Source: types/result-stream.js

"use strict";

const { Readable } = require("stream");
const utils = require("../utils");
const errors = require("../errors");
const clientOptions = require("../client-options");

/** @module types */
/**
 * Readable stream using to yield data from a result or a field
 */
class ResultStream extends Readable {
    constructor(opt) {
        super(opt);
        this.buffer = [];
        this.paused = true;
        this._cancelAllowed = false;
        this._handlersObject = null;
        this._highWaterMarkRows = 0;
    }

    _read() {
        this.paused = false;
        if (this.buffer.length === 0) {
            this._readableState.reading = false;
        }
        while (!this.paused && this.buffer.length > 0) {
            this.paused = !this.push(this.buffer.shift());
        }
        this._checkBelowHighWaterMark();
        if (!this.paused && !this.buffer.length && this._readNext) {
            this._readNext();
            this._readNext = null;
        }
    }

    /**
     * Allows for throttling, helping nodejs keep the internal buffers reasonably sized.
     * @param {function} readNext function that triggers reading the next result chunk
     * @ignore
     */
    _valve(readNext) {
        this._readNext = null;
        if (!readNext) {
            return;
        }
        if (this.paused || this.buffer.length) {
            this._readNext = readNext;
        } else {
            readNext();
        }
    }

    add(chunk) {
        const length = this.buffer.push(chunk);
        this.read(0);
        this._checkAboveHighWaterMark();
        return length;
    }

    _checkAboveHighWaterMark() {
        if (
            !this._handlersObject ||
            !this._handlersObject.resumeReadingHandler
        ) {
            return;
        }
        if (
            this._highWaterMarkRows === 0 ||
            this.buffer.length !== this._highWaterMarkRows
        ) {
            return;
        }
        this._handlersObject.resumeReadingHandler(false);
    }

    _checkBelowHighWaterMark() {
        if (
            !this._handlersObject ||
            !this._handlersObject.resumeReadingHandler
        ) {
            return;
        }
        if (
            this._highWaterMarkRows === 0 ||
            this.buffer.length >= this._highWaterMarkRows
        ) {
            return;
        }
        // The consumer has dequeued below the watermark
        this._handlersObject.resumeReadingHandler(true);
    }

    /**
     * When continuous paging is enabled, allows the client to notify to the server to stop pushing further pages.
     *
     * Note: This is not part of the public API yet.
     * @param {Function} [callback] The cancel method accepts an optional callback.
     * @example <caption>Cancelling a continuous paging execution</caption>
     * const stream = client.stream(query, params, { prepare: true, continuousPaging: true });
     * // ...
     * // Ask the server to stop pushing rows.
     * stream.cancel();
     * @ignore
     */
    cancel(callback) {
        if (!this._cancelAllowed) {
            const err = new Error(
                "You can only cancel streaming executions when continuous paging is enabled",
            );
            if (!callback) {
                throw err;
            }
            return callback(err);
        }
        if (!this._handlersObject) {
            throw new errors.DriverInternalError(
                "ResultStream cancel is allowed but the cancel options were not set",
            );
        }
        callback = callback || utils.noop;
        if (!this._handlersObject.cancelHandler) {
            // The handler is not yet set
            // Set the callback as a flag to identify that the cancel handler must be invoked when set
            this._handlersObject.cancelHandler = callback;
            return;
        }
        this._handlersObject.cancelHandler(callback);
    }

    /**
     * Sets the pointer to the handler to be used to cancel the continuous page execution.
     * @param options
     * @internal
     * @ignore
     */
    setHandlers(options) {
        if (!options.continuousPaging) {
            return;
        }
        this._cancelAllowed = true;
        this._handlersObject = options;
        this._highWaterMarkRows =
            options.continuousPaging.highWaterMarkRows ||
            clientOptions.continuousPageDefaultHighWaterMark;
    }
}

module.exports = ResultStream;