Source: client.js

"use strict";

const events = require("events");
const util = require("util");
const { throwNotSupported } = require("./new-utils.js");

const utils = require("./utils.js");
const errors = require("./errors.js");
const types = require("./types");
const { ProfileManager } = require("./execution-profile");
const clientOptions = require("./client-options");
const ClientState = require("./metadata/client-state");
const description = require("../package.json").description;
const { version } = require("../package.json");
const ExecOptions = require("./execution-options");
const promiseUtils = require("./promise-utils");
const rust = require("../index");
const ResultSet = require("./types/result-set.js");
const { parseParams, convertHints } = require("./types/cql-utils.js");
const queryOptions = require("./query-options.js");
const { PreparedCache } = require("./cache.js");

/**
 * Represents a database client that maintains multiple connections to the cluster nodes, providing methods to
 * execute CQL statements.
 *
 * The `Client` uses [policies]{@link module:policies} to decide which nodes to connect to, which node
 * to use per each query execution, when it should retry failed or timed-out executions and how reconnection to down
 * nodes should be made.
 * @extends EventEmitter
 * @example <caption>Creating a new client instance</caption>
 * const client = new Client({
 *   contactPoints: ['10.0.1.101', '10.0.1.102'],
 * });
 * @example <caption>Executing a query</caption>
 * const result = await client.connect();
 * console.log(`Connected to ${client.hosts.length} nodes in the cluster: ${client.hosts.keys().join(', ')}`);
 * @example <caption>Executing a query</caption>
 * const result = await client.execute('SELECT key FROM system.local');
 * const row = result.first();
 * console.log(row['key']);
 */
class Client extends events.EventEmitter {
    /**
     * Creates a new instance of {@link Client}.
     * @param {clientOptions.ClientOptions} options The options for this instance.
     */
    constructor(options) {
        super();
        this.rustOptions = clientOptions.setRustOptions(options);

        this.options = clientOptions.extend(
            { logEmitter: this.emit.bind(this), id: types.Uuid.random() },
            options,
        );
        Object.defineProperty(this, "profileManager", {
            value: new ProfileManager(this.options),
        });
        // Unlimited amount of listeners for internal event queues by default
        this.setMaxListeners(0);
        this.connected = false;
        this.isShuttingDown = false;
        /**
         * Gets the schema and cluster metadata information.
         * TODO: This field is currently not supported
         * @type {Metadata}
         */
        this.metadata = undefined;
        /**
         * Gets an associative array of cluster hosts.
         * TODO: This field is currently not supported
         * @type {HostMap}
         */
        this.hosts = undefined;

        /**
         * The [ClientMetrics]{@link module:metrics~ClientMetrics} instance used to expose measurements of its internal
         * behavior and of the server as seen from the driver side.
         *
         * By default, a [DefaultMetrics]{@link module:metrics~DefaultMetrics} instance is used.
         * @type {ClientMetrics}
         */
        this.metrics = this.options.metrics;
    }

    /**
     * Emitted when a new host is added to the cluster.
     * - {@link Host} The host being added.
     * @event Client#hostAdd
     */
    /**
     * Emitted when a host is removed from the cluster
     * - {@link Host} The host being removed.
     * @event Client#hostRemove
     */
    /**
     * Emitted when a host in the cluster changed status from down to up.
     * - {@link Host host} The host that changed the status.
     * @event Client#hostUp
     */
    /**
     * Emitted when a host in the cluster changed status from up to down.
     * - {@link Host host} The host that changed the status.
     * @event Client#hostDown
     */

    /**
     * Gets the name of the active keyspace.
     * @type {string | undefined}
     */
    get keyspace() {
        return this.rust_client.getKeyspace();
    }

    set keyspace(_) {
        throw new SyntaxError("Client keyspace is read-only");
    }

    /**
     * Manually create final execution options, applying client and default setting.
     *
     * Creating those options requires a native call, but they can be reused
     * without any additional native calls, which improves performance
     * for queries with the same QueryOptions.
     * @param {queryOptions.QueryOptions | ExecOptions.ExecutionOptions} [options]
     * @returns {ExecOptions.ExecutionOptions}
     * @package
     */
    createOptions(options) {
        if (options instanceof ExecOptions.ExecutionOptions) {
            options.wrapOptionsIfNotWrappedYet();
            return options;
        }
        let fullOptions = ExecOptions.DefaultExecutionOptions.create(
            options,
            this,
        );
        fullOptions.wrapOptionsIfNotWrappedYet();
        return fullOptions;
    }

    /**
     * Manually prepare query into prepared statement
     * @param {string} query
     * @returns {Promise<rust.PreparedStatementWrapper>}
     * @package
     */
    async prepareQuery(query) {
        return await this.rustClient.prepareStatement(query);
    }

    /**
     * Attempts to connect to one of the [contactPoints]{@link ClientOptions} and discovers the rest the nodes of the
     * cluster.
     *
     * When the {@link Client} is already connected, it resolves immediately.
     *
     * It returns a `Promise` when a `callback` is not provided.
     * @param {function} [callback] The optional callback that is invoked when the pool is connected or it failed to
     * connect.
     * @example <caption>Usage example</caption>
     * await client.connect();
     */
    connect(callback) {
        if (this.connected && callback) {
            // Avoid creating Promise to immediately resolve them
            return callback();
        }

        return promiseUtils.optionalCallback(this.#connect(), callback);
    }

    /**
     * Async-only version of {@link Client#connect()}.
     * @private
     */
    async #connect() {
        if (this.connected) {
            return;
        }

        if (this.isShuttingDown) {
            // it is being shutdown, don't allow further calls to connect()
            throw new errors.NoHostAvailableError(
                null,
                "Connecting after shutdown is not supported",
            );
        }

        if (this.connecting) {
            return promiseUtils.fromEvent(this, "connected");
        }

        this.connecting = true;
        this.log(
            "info",
            util.format(
                "Connecting to cluster using '%s' version %s",
                description,
                version,
            ),
        );

        try {
            this.rustClient = await rust.SessionWrapper.createSession(
                this.rustOptions,
            );
        } catch (err) {
            // We should close the pools (if any) and reset the state to allow successive calls to connect()
            this.connected = false;
            this.connecting = false;
            this.emit("connected", err);
            throw err;
        }

        this.connected = true;
        this.connecting = false;
        this.emit("connected");
    }

    /**
     * Executes a query on an available connection.
     *
     * The query can be prepared (recommended) or not depending on the [prepare]{@linkcode QueryOptions} flag.
     *
     * Some execution failures can be handled transparently by the driver, according to the
     * [RetryPolicy]{@linkcode module:policies/retry~RetryPolicy} or the
     * [SpeculativeExecutionPolicy]{@linkcode module:policies/speculativeExecution} used.
     *
     * It returns a `Promise` when a `callback` is not provided.
     *
     * @param {string} query The query to execute.
     * @param {Array|Object} [params] Array of parameter values or an associative array (object) containing parameter names
     * as keys and its value.
     * @param {queryOptions.QueryOptions} [options] The query options for the execution.
     * @param {ResultCallback} [callback] Executes callback(err, result) when execution completed. When not defined, the
     * method will return a promise.
     * @example <caption>Promise-based API, using async/await</caption>
     * const query = 'SELECT name, email FROM users WHERE id = ?';
     * const result = await client.execute(query, [ id ], { prepare: true });
     * const row = result.first();
     * console.log('%s: %s', row['name'], row['email']);
     * @example <caption>Callback-based API</caption>
     * const query = 'SELECT name, email FROM users WHERE id = ?';
     * client.execute(query, [ id ], { prepare: true }, function (err, result) {
     *   assert.ifError(err);
     *   const row = result.first();
     *   console.log('%s: %s', row['name'], row['email']);
     * });
     * @see {@link ExecutionProfile} to reuse a set of options across different query executions.
     */
    execute(query, params, options, callback) {
        // This method acts as a wrapper for the async method #execute(), replaced by #rustyExecute()

        if (!callback) {
            // Set default argument values for optional parameters
            if (typeof options === "function") {
                callback = options;
                options = null;
            } else if (typeof params === "function") {
                callback = params;
                params = null;
            }
        }

        try {
            const execOptions = this.createOptions(options);
            if (execOptions.isPaged()) {
                return promiseUtils.optionalCallback(
                    this.#rustyPaged(
                        query,
                        params,
                        execOptions,
                        execOptions.getPageState(),
                    ).then((e) => e[1]),
                    callback,
                );
            }

            return promiseUtils.optionalCallback(
                this.rustyExecute(query, params, execOptions),
                callback,
            );
        } catch (err) {
            // There was an error when parsing the user options
            if (callback) {
                return callback(err);
            }

            return Promise.reject(err);
        }
    }

    /**
     * Wrapper for executing queries by rust driver
     * @param {string | rust.PreparedStatementWrapper} query
     * @param {Array} params
     * @param {ExecOptions.ExecutionOptions} execOptions
     * @returns {Promise<ResultSet>}
     * @package
     */
    async rustyExecute(query, params, execOptions) {
        if (
            // !execOptions.isPrepared() &&
            params &&
            !Array.isArray(params)
            // && !types.protocolVersion.supportsNamedParameters(version)
        ) {
            throw new Error(`TODO: Implement any support for named parameters`);
            // // Only Cassandra 2.1 and above supports named parameters
            // throw new errors.ArgumentError(
            //   "Named parameters for simple statements are not supported, use prepare flag",
            // );
        }

        if (!this.connected) {
            // TODO: Check this logic and decide if it's needed. Probably do it while implementing (better) connection
            // // Micro optimization to avoid an async execution for a simple check
            await this.#connect();
        }

        let rustOptions = execOptions.getRustOptions();
        let result;

        if (execOptions.isPrepared()) {
            // If the statement is already prepared, skip the preparation process
            // Otherwise call Rust part to prepare a statement
            let statement =
                query instanceof rust.PreparedStatementWrapper
                    ? query
                    : await this.rustClient.prepareStatement(query);

            // Parse parameters according to expected types
            let parsedParams = parseParams(
                statement.getExpectedTypes(),
                params,
                false,
            );

            // Execute query
            result = await this.rustClient.executePreparedUnpaged(
                statement,
                parsedParams,
                rustOptions,
            );
        } else {
            // We do not accept already prepared statements for unprepared queries
            if (query instanceof rust.PreparedStatementWrapper) {
                throw new Error(
                    "Unexpected prepared statement wrapper for unprepared queries",
                );
            }
            // Get expected types from user provided hints
            let expectedTypes = convertHints(execOptions.getHints() || []);
            // Parse parameters according to provided hints, with type guessing
            let parsedParams = parseParams(expectedTypes, params, true);

            // Execute query
            result = await this.rustClient.queryUnpaged(
                query,
                parsedParams,
                rustOptions,
            );
        }
        return new ResultSet(result);
    }

    /**
     * @deprecated Not supported by the driver. Usage will throw an error.
     */
    executeGraph() {
        throwNotSupported("Client.executeGraph");
    }

    /**
     * Executes the query and calls `rowCallback` for each row as soon as they are received. Calls the final
     * `callback` after all rows have been sent, or when there is an error.
     *
     * The query can be prepared (recommended) or not depending on the [prepare]{@linkcode QueryOptions} flag.
     *
     * @param {string} query The query to execute
     * @param {Array|Object} [params] Array of parameter values or an associative array (object) containing parameter names
     * as keys and its value.
     * @param {queryOptions.QueryOptions} [options] The query options.
     * @param {function} rowCallback Executes `rowCallback(n, row)` per each row received, where n is the row
     * index and row is the current Row.
     * @param {function} [callback] Executes `callback(err, result)` after all rows have been received.
     *
     * When dealing with paged results, [ResultSet#nextPage()]{@link module:types~ResultSet#nextPage} method can be used
     * to retrieve the following page. In that case, `rowCallback()` will be again called for each row and
     * the final callback will be invoked when all rows in the following page has been retrieved.
     *
     * @example <caption>Using per-row callback and arrow functions</caption>
     * client.eachRow(query, params, { prepare: true }, (n, row) => console.log(n, row), err => console.error(err));
     * @example <caption>Overloads</caption>
     * client.eachRow(query, rowCallback);
     * client.eachRow(query, params, rowCallback);
     * client.eachRow(query, params, options, rowCallback);
     * client.eachRow(query, params, rowCallback, callback);
     * client.eachRow(query, params, options, rowCallback, callback);
     */
    eachRow(query, params, options, rowCallback, callback) {
        if (!callback && rowCallback && typeof options === "function") {
            callback = utils.validateFn(rowCallback, "rowCallback");
            rowCallback = options;
        } else {
            callback = callback || utils.noop;
            rowCallback = utils.validateFn(
                rowCallback || options || params,
                "rowCallback",
            );
        }

        params = typeof params !== "function" ? params : null;

        /**
         * @type {ExecOptions.ExecutionOptions}
         */
        let execOptions;
        try {
            execOptions = ExecOptions.DefaultExecutionOptions.create(
                options,
                this,
            );
        } catch (e) {
            return callback(e);
        }

        let rowLength = 0;
        let pagingState = null;

        const nextPage = () => {
            promiseUtils.toCallback(
                this.#rustyPaged(query, params, execOptions, pagingState),
                pageCallback,
            );
        };

        /**
         * @param {Error} err
         * @param {Array<rust.PagingStateResponseWrapper, ResultSet>} result
         * Should be [rust.PagingStateResponseWrapper, ResultSet]
         */
        function pageCallback(err, result) {
            if (err) {
                return callback(err);
            }
            /**
             * Next requests in case paging (auto or explicit) is used
             */
            let lastPagingState = result[0];
            let queryResult = result[1];

            rowLength += queryResult.rowLength;

            if (queryResult.rows) {
                queryResult.rows.forEach((value, index) => {
                    rowCallback(index, value);
                });
            }

            if (lastPagingState.hasNextPage()) {
                // Use new page state as next request page state
                pagingState = lastPagingState.nextPage();

                if (execOptions.isAutoPage()) {
                    // Issue next request for the next page
                    return nextPage();
                }
                // Allows for explicit (manual) paging, in case the caller needs it
                queryResult.nextPage = nextPage;
            }

            // Finished auto-paging
            queryResult.rowLength = rowLength;
            callback(null, queryResult);
        }

        promiseUtils.toCallback(
            this.#rustyPaged(query, params, execOptions, pagingState),
            pageCallback,
        );
    }

    /**
     * Execute a single page of query
     * @param {string} query
     * @param {Array} params
     * @param {ExecOptions.ExecutionOptions} execOptions
     * @param {rust.PagingStateWrapper|Buffer} [pageState]
     * @returns {Promise<Array<rust.PagingStateResponseWrapper, ResultSet>>} should be Promise<[rust.PagingStateResponseWrapper, ResultSet]>
     * @private
     */
    async #rustyPaged(query, params, execOptions, pageState) {
        if (
            !execOptions.isPrepared() &&
            params &&
            !Array.isArray(params)
            // && !types.protocolVersion.supportsNamedParameters(version)
        ) {
            throw new Error(`TODO: Implement any support for named parameters`);
            // // Only Cassandra 2.1 and above supports named parameters
            // throw new errors.ArgumentError(
            //   "Named parameters for simple statements are not supported, use prepare flag",
            // );
        }

        if (!this.connected) {
            // TODO: Check this logic and decide if it's needed. Probably do it while implementing (better) connection
            // // Micro optimization to avoid an async execution for a simple check
            await this.#connect();
        }

        if (pageState instanceof Buffer) {
            pageState = rust.PagingStateWrapper.fromBuffer(pageState);
        }
        const rustOptions = queryOptions.queryOptionsIntoWrapper(execOptions);
        let result;
        if (execOptions.isPrepared()) {
            // If the statement is already prepared, skip the preparation process
            // Otherwise call Rust part to prepare a statement
            let statement =
                query instanceof rust.PreparedStatementWrapper
                    ? query
                    : await this.rustClient.prepareStatement(query);

            // Parse parameters according to expected types
            let parsedParams = parseParams(
                statement.getExpectedTypes(),
                params,
            );

            // Execute query
            result = await this.rustClient.executeSinglePage(
                statement,
                parsedParams,
                rustOptions,
                pageState,
            );
        } else {
            // We do not accept already prepared statements for unprepared queries
            if (query instanceof rust.PreparedStatementWrapper) {
                throw new Error(
                    "Unexpected prepared statement wrapper for unprepared queries",
                );
            }
            // Get expected types from user provided hints
            const expectedTypes = convertHints(execOptions.getHints() || []);
            // Parse parameters according to provided hints, with type guessing
            const parsedParams = parseParams(expectedTypes, params, true);
            // Execute query
            result = await this.rustClient.querySinglePage(
                query,
                parsedParams,
                rustOptions,
                pageState,
            );
        }
        // result[0] - information about page state
        // result[1] - object representing result itself
        result[1] = new ResultSet(result[1], result[0]);
        return result;
    }

    /**
     * Executes the query and pushes the rows to the result stream as soon as they received.
     *
     * The stream is a [ReadableStream]{@linkcode https://nodejs.org/api/stream.html#stream_class_stream_readable} object
     * that emits rows.
     * It can be piped downstream and provides automatic pause/resume logic (it buffers when not read).
     *
     * The query can be prepared (recommended) or not depending on {@link QueryOptions}.prepare flag. Retries on multiple
     * hosts if needed.
     *
     * @param {string} query The query to prepare and execute.
     * @param {Array|Object} [params] Array of parameter values or an associative array (object) containing parameter names
     * as keys and its value
     * @param {queryOptions.QueryOptions} [options] The query options.
     * @param {function} [callback] executes callback(err) after all rows have been received or if there is an error
     * @returns {types.types.ResultStream}
     */
    stream(query, params, options, callback) {
        callback = callback || utils.noop;
        // NOTE: the nodejs stream maintains yet another internal buffer
        // we rely on the default stream implementation to keep memory
        // usage reasonable.
        const resultStream = new types.ResultStream({ objectMode: 1 });
        function onFinish(err, result) {
            if (err) {
                resultStream.emit("error", err);
            }
            if (result && result.nextPage) {
                // allows for throttling as per the
                // default nodejs stream implementation
                resultStream._valve(function pageValve() {
                    try {
                        result.nextPage();
                    } catch (ex) {
                        resultStream.emit("error", ex);
                    }
                });
                return;
            }
            // Explicitly dropping the valve (closure)
            resultStream._valve(null);
            resultStream.add(null);
            callback(err);
        }
        let sync = true;
        this.eachRow(
            query,
            params,
            options,
            function rowCallback(n, row) {
                resultStream.add(row);
            },
            function eachRowFinished(err, result) {
                if (sync) {
                    // Prevent sync callback
                    return setImmediate(function eachRowFinishedImmediate() {
                        onFinish(err, result);
                    });
                }
                onFinish(err, result);
            },
        );
        sync = false;
        return resultStream;
    }

    /**
     * Executes batch of queries on an available connection to a host.
     *
     * It returns a `Promise` when a `callback` is not provided.
     *
     * @param {Array.<string>|Array.<{query, params}>} queries The queries to execute as an Array of strings or as an array
     * of object containing the query and params
     * @param {queryOptions.QueryOptions} [options] The query options.
     * @param {ResultCallback} [callback] Executes callback(err, result) when the batch was executed
     */
    batch(queries, options, callback) {
        if (!callback && typeof options === "function") {
            callback = options;
            options = null;
        }

        return promiseUtils.optionalCallback(
            this.#rustyBatch(queries, options),
            callback,
        );
    }

    /**
     * Async-only version of {@link Client#batch()} .
     * @param {Array.<string>|Array.<{query, params}>}queries
     * @param {queryOptions.QueryOptions} options
     * @returns {Promise<ResultSet>}
     * @private
     */
    async #rustyBatch(queries, options) {
        if (!Array.isArray(queries)) {
            throw new errors.ArgumentError("Queries should be an Array");
        }

        if (queries.length === 0) {
            throw new errors.ArgumentError("Queries array should not be empty");
        }

        await this.#connect();

        const execOptions = this.createOptions(options);

        let shouldBePrepared = execOptions.isPrepared();
        let allQueries = [];
        let parametersRows = [];
        let hints = execOptions.getHints() || [];
        let preparedCache = new PreparedCache();

        for (let i = 0; i < queries.length; i++) {
            let element = queries[i];
            if (!element) {
                throw new errors.ArgumentError(`Invalid query at index ${i}`);
            }
            /**
             * @type {rust.PreparedStatementWrapper | string}
             */
            let statement =
                typeof element === "string" ? element : element.query;
            let params = element.params || [];
            let types;

            if (!statement) {
                throw new errors.ArgumentError(`Invalid query at index ${i}`);
            }

            if (shouldBePrepared) {
                let prepared = preparedCache.getElement(statement);
                if (!prepared) {
                    prepared =
                        await this.rustClient.prepareStatement(statement);
                    preparedCache.storeElement(statement, prepared);
                }
                types = prepared.getExpectedTypes();
                statement = prepared;
            } else {
                types = convertHints(hints[i] || []);
            }

            if (params) {
                params = parseParams(types, params, shouldBePrepared === false);
            }
            allQueries.push(statement);
            parametersRows.push(params);
        }

        let rustOptions = queryOptions.queryOptionsIntoWrapper(execOptions);
        let batch = shouldBePrepared
            ? rust.createPreparedBatch(allQueries, rustOptions)
            : rust.createUnpreparedBatch(allQueries, rustOptions);
        let wrappedResult = await this.rustClient.batch(batch, parametersRows);
        return new ResultSet(wrappedResult);
    }

    /**
     * Gets the host that are replicas of a given token.
     * @param {string} keyspace
     * @param {Buffer} token
     * @returns {Array<Host>}
     */
    getReplicas(keyspace, token) {
        throw new Error(`TODO: Not implemented`);
        // return this.metadata.getReplicas(keyspace, token);
    }

    /**
     * @returns {ClientState} A dummy [ClientState]{@linkcode module:metadata~ClientState} instance.
     *
     * @deprecated This is not planned feature for the driver. Currently this remains in place, but returns Client state with
     * no information. This endpoint may be removed at any point.
     */
    getState() {
        return ClientState.from(this);
    }

    log = utils.log;

    /**
     * The only effect of calling shutdown is rejecting any following queries to the database.
     *
     * It returns a `Promise` when a `callback` is not provided.
     *
     * @param {Function} [callback] Optional callback to be invoked when finished closing all connections.
     *
     * @deprecated Explicit connection shutdown is deprecated and may be removed in the future.
     * Drop this client to close the connection to the database.
     */
    shutdown(callback) {
        return promiseUtils.optionalCallback(this.#shutdown(), callback);
    }

    /** @private */
    async #shutdown() {
        this.log(
            "warning",
            "Explicit shutdown is deprecated and may be removed in the future.\n" +
                "Drop this client to close the connection to the database.",
        );

        if (!this.connected) {
            // not initialized
            return;
        }

        if (this.connecting) {
            this.log("warning", "Shutting down while connecting");
            // wait until finish connecting for easier troubleshooting
            await promiseUtils.fromEvent(this, "connected");
        }

        this.connected = false;
        this.isShuttingDown = true;
    }

    /**
     * Reject callback
     *
     * @callback RejectCallback
     * @param {any} reason
     * @returns {void}
     */

    /**
     * Resolve callback
     *
     * @callback ResolveCallback
     * @param {ResultSet | PromiseLike<ResultSet>} value
     * @returns {void}
     */
}
/**
 * Callback used by execution methods.
 * @callback ResultCallback
 * @param {Error} err Error occurred in the execution of the query.
 * @param {ResultSet} [result] Result of the execution of the query.
 */

module.exports = Client;