Source: metadata/index.js

"use strict";

const events = require("events");
const util = require("util");

/**
 * Module containing classes and fields related to metadata.
 * @module metadata
 */

const utils = require("../utils");
const errors = require("../errors");
const types = require("../types");
const schemaParserFactory = require("./schema-parser");
const promiseUtils = require("../promise-utils");
const { TokenRange } = require("../token");

/**
 * @const
 * @private
 */
const _selectTraceSession =
    "SELECT * FROM system_traces.sessions WHERE session_id=%s";
/**
 * @const
 * @private
 */
const _selectTraceEvents =
    "SELECT * FROM system_traces.events WHERE session_id=%s";
/**
 * @const
 * @private
 */
const _selectSchemaVersionPeers = "SELECT schema_version FROM system.peers";
/**
 * @const
 * @private
 */
const _selectSchemaVersionLocal = "SELECT schema_version FROM system.local";
/**
 * @const
 * @private
 */
const _traceMaxAttemps = 5;
/**
 * @const
 * @private
 */
const _traceAttemptDelay = 400;

/**
 * Represents cluster and schema information.
 * The metadata class acts as a internal state of the driver.
 */
class Metadata {
    /**
     * Creates a new instance of {@link Metadata}.
     * @param {ClientOptions} options
     * @param {ControlConnection} controlConnection Control connection used to retrieve information.
     */
    constructor(options, controlConnection) {
        if (!options) {
            throw new errors.ArgumentError("Options are not defined");
        }

        Object.defineProperty(this, "options", {
            value: options,
            enumerable: false,
            writable: false,
        });
        Object.defineProperty(this, "controlConnection", {
            value: controlConnection,
            enumerable: false,
            writable: false,
        });
        this.keyspaces = {};
        this.initialized = false;
        this._isDbaas = false;
        this._schemaParser = schemaParserFactory.getByVersion(
            options,
            controlConnection,
            this.getUdt.bind(this),
        );
        this.log = utils.log;
        this._preparedQueries = new PreparedQueries(
            options.maxPrepared,
            (...args) => this.log(...args),
        );
    }

    /**
     * Sets the cassandra version
     * @internal
     * @ignore
     * @param {Array.<Number>} version
     */
    setCassandraVersion(version) {
        this._schemaParser = schemaParserFactory.getByVersion(
            this.options,
            this.controlConnection,
            this.getUdt.bind(this),
            version,
            this._schemaParser,
        );
    }

    /**
     * Determines whether the cluster is provided as a service.
     * @returns {boolean} true when the cluster is provided as a service (DataStax Astra), <code>false<code> when it's a
     * different deployment (on-prem).
     */
    isDbaas() {
        return this._isDbaas;
    }

    /**
     * Sets the product type as DBaaS.
     * @internal
     * @ignore
     */
    setProductTypeAsDbaas() {
        this._isDbaas = true;
    }

    /**
     * Populates the information regarding primary replica per token, datacenters (+ racks) and sorted token ring.
     * @ignore
     * @param {HostMap} hosts
     */
    buildTokens(hosts) {
        if (!this.tokenizer) {
            return this.log("error", "Tokenizer could not be determined");
        }
        // Get a sorted array of tokens
        const allSorted = [];
        // Get a map of <token, primaryHost>
        const primaryReplicas = {};
        // Depending on the amount of tokens, this could be an expensive operation
        const hostArray = hosts.values();
        const stringify = this.tokenizer.stringify;
        const datacenters = {};
        hostArray.forEach((h) => {
            if (!h.tokens) {
                return;
            }
            h.tokens.forEach((tokenString) => {
                const token = this.tokenizer.parse(tokenString);
                utils.insertSorted(allSorted, token, (t1, t2) =>
                    t1.compare(t2),
                );
                primaryReplicas[stringify(token)] = h;
            });
            let dc = datacenters[h.datacenter];
            if (!dc) {
                dc = datacenters[h.datacenter] = {
                    hostLength: 0,
                    racks: new utils.HashSet(),
                };
            }
            dc.hostLength++;
            dc.racks.add(h.rack);
        });
        // Primary replica for given token
        this.primaryReplicas = primaryReplicas;
        // All the tokens in ring order
        this.ring = allSorted;
        // Build TokenRanges.
        const tokenRanges = new Set();
        if (this.ring.length === 1) {
            // If there is only one token, return the range ]minToken, minToken]
            const min = this.tokenizer.minToken();
            tokenRanges.add(new TokenRange(min, min, this.tokenizer));
        } else {
            for (let i = 0; i < this.ring.length; i++) {
                const start = this.ring[i];
                const end = this.ring[(i + 1) % this.ring.length];
                tokenRanges.add(new TokenRange(start, end, this.tokenizer));
            }
        }
        this.tokenRanges = tokenRanges;
        // Compute string versions as it's potentially expensive and frequently reused later
        this.ringTokensAsStrings = new Array(allSorted.length);
        for (let i = 0; i < allSorted.length; i++) {
            this.ringTokensAsStrings[i] = stringify(allSorted[i]);
        }
        // Datacenter metadata (host length and racks)
        this.datacenters = datacenters;
    }

    /**
     * Gets the keyspace metadata information and updates the internal state of the driver.
     *
     * If a `callback` is provided, the callback is invoked when the keyspaces metadata refresh completes.
     * Otherwise, it returns a `Promise`.
     * @param {String} name Name of the keyspace.
     * @param {Function} [callback] Optional callback.
     */
    refreshKeyspace(name, callback) {
        return promiseUtils.optionalCallback(
            this._refreshKeyspace(name),
            callback,
        );
    }

    /**
     * @param {String} name
     * @private
     */
    async _refreshKeyspace(name) {
        if (!this.initialized) {
            throw this._uninitializedError();
        }
        this.log("info", util.format("Retrieving keyspace %s metadata", name));
        try {
            const ksInfo = await this._schemaParser.getKeyspace(name);
            if (!ksInfo) {
                // the keyspace was dropped
                delete this.keyspaces[name];
                return null;
            }
            // Tokens are lazily init on the keyspace, once a replica from that keyspace is retrieved.
            this.keyspaces[ksInfo.name] = ksInfo;
            return ksInfo;
        } catch (err) {
            this.log(
                "error",
                "There was an error while trying to retrieve keyspace information",
                err,
            );
            throw err;
        }
    }

    /**
     * Gets the metadata information of all the keyspaces and updates the internal state of the driver.
     *
     * If a `callback` is provided, the callback is invoked when the keyspace metadata refresh completes.
     * Otherwise, it returns a `Promise`.
     * @param {Boolean|Function} [waitReconnect] Determines if it should wait for reconnection in case the control connection is not
     * connected at the moment. Default: true.
     * @param {Function} [callback] Optional callback.
     */
    refreshKeyspaces(waitReconnect, callback) {
        if (
            typeof waitReconnect === "function" ||
            typeof waitReconnect === "undefined"
        ) {
            callback = waitReconnect;
            waitReconnect = true;
        }
        if (!this.initialized) {
            const err = this._uninitializedError();
            if (callback) {
                return callback(err);
            }
            return Promise.reject(err);
        }
        return promiseUtils.optionalCallback(
            this.refreshKeyspacesInternal(waitReconnect),
            callback,
        );
    }

    /**
     * @param {Boolean} waitReconnect
     * @returns {Promise<Object<string, Object>>}
     * @ignore
     * @internal
     */
    async refreshKeyspacesInternal(waitReconnect) {
        this.log("info", "Retrieving keyspaces metadata");
        try {
            this.keyspaces =
                await this._schemaParser.getKeyspaces(waitReconnect);
            return this.keyspaces;
        } catch (err) {
            this.log(
                "error",
                "There was an error while trying to retrieve keyspaces information",
                err,
            );
            throw err;
        }
    }

    _getKeyspaceReplicas(keyspace) {
        if (!keyspace.replicas) {
            // Calculate replicas the first time for the keyspace
            keyspace.replicas = keyspace.tokenToReplica(
                this.tokenizer,
                this.ringTokensAsStrings,
                this.primaryReplicas,
                this.datacenters,
            );
        }
        return keyspace.replicas;
    }

    /**
     * Gets the host list representing the replicas that contain the given partition key, token or token range.
     * 
     * It uses the pre-loaded keyspace metadata to retrieve the replicas for a token for a given keyspace.
     * When the keyspace metadata has not been loaded, it returns null.
     * @param {String} keyspaceName
     * @param {Buffer|Token|TokenRange} token Can be Buffer (serialized partition key), Token or TokenRange
     * @returns {Array}
     */
    getReplicas(keyspaceName, token) {
        if (!this.ring) {
            return null;
        }
        if (Buffer.isBuffer(token)) {
            token = this.tokenizer.hash(token);
        }
        if (token instanceof TokenRange) {
            token = token.end;
        }
        let keyspace;
        if (keyspaceName) {
            keyspace = this.keyspaces[keyspaceName];
            if (!keyspace) {
                // the keyspace was not found, the metadata should be loaded beforehand
                return null;
            }
        }
        let i = utils.binarySearch(this.ring, token, (t1, t2) =>
            t1.compare(t2),
        );
        if (i < 0) {
            i = ~i;
        }
        if (i >= this.ring.length) {
            // it circled back
            i = i % this.ring.length;
        }
        const closestToken = this.ringTokensAsStrings[i];
        if (!keyspaceName) {
            return [this.primaryReplicas[closestToken]];
        }
        const replicas = this._getKeyspaceReplicas(keyspace);
        return replicas[closestToken];
    }

    /**
     * Gets the token ranges that define data distribution in the ring.
     * @returns {Set<TokenRange>} The ranges of the ring or empty set if schema metadata is not enabled.
     */
    getTokenRanges() {
        return this.tokenRanges;
    }

    /**
     * Gets the token ranges that are replicated on the given host, for
     * the given keyspace.
     * @param {String} keyspaceName The name of the keyspace to get ranges for.
     * @param {Host} host The host.
     * @returns {Set<TokenRange>|null} Ranges for the keyspace on this host or null if keyspace isn't found or hasn't been loaded.
     */
    getTokenRangesForHost(keyspaceName, host) {
        if (!this.ring) {
            return null;
        }
        let keyspace;
        if (keyspaceName) {
            keyspace = this.keyspaces[keyspaceName];
            if (!keyspace) {
                // the keyspace was not found, the metadata should be loaded beforehand
                return null;
            }
        }
        // If the ring has only 1 token, just return the ranges as we should only have a single node cluster.
        if (this.ring.length === 1) {
            return this.getTokenRanges();
        }
        const replicas = this._getKeyspaceReplicas(keyspace);
        const ranges = new Set();
        // for each range, find replicas for end token, if replicas include host, add range.
        this.tokenRanges.forEach((tokenRange) => {
            const replicasForToken =
                replicas[this.tokenizer.stringify(tokenRange.end)];
            if (replicasForToken.indexOf(host) !== -1) {
                ranges.add(tokenRange);
            }
        });
        return ranges;
    }

    /**
     * Constructs a Token from the input buffer(s) or string input.  If a string is passed in
     * it is assumed this matches the token representation reported by cassandra.
     * @param {Array<Buffer>|Buffer|String} components
     * @returns {Token} constructed token from the input buffer.
     */
    newToken(components) {
        if (!this.tokenizer) {
            throw new Error(
                "Partitioner not established.  This should only happen if metadata was disabled or you have not connected yet.",
            );
        }
        if (Array.isArray(components)) {
            return this.tokenizer.hash(Buffer.concat(components));
        } else if (util.isString(components)) {
            return this.tokenizer.parse(components);
        }
        return this.tokenizer.hash(components);
    }

    /**
     * Constructs a TokenRange from the given start and end tokens.
     * @param {Token} start
     * @param {Token} end
     * @returns TokenRange build range spanning from start (exclusive) to end (inclusive).
     */
    newTokenRange(start, end) {
        if (!this.tokenizer) {
            throw new Error(
                "Partitioner not established.  This should only happen if metadata was disabled or you have not connected yet.",
            );
        }
        return new TokenRange(start, end, this.tokenizer);
    }

    /**
     * Gets the metadata information already stored associated to a prepared statement
     * @param {String} keyspaceName
     * @param {String} query
     * @internal
     * @ignore
     */
    getPreparedInfo(keyspaceName, query) {
        return this._preparedQueries.getOrAdd(keyspaceName, query);
    }

    /**
     * Clears the internal state related to the prepared statements.
     * Following calls to the Client using the prepare flag will re-prepare the statements.
     */
    clearPrepared() {
        this._preparedQueries.clear();
    }

    /** @ignore */
    getPreparedById(id) {
        return this._preparedQueries.getById(id);
    }

    /** @ignore */
    setPreparedById(info) {
        return this._preparedQueries.setById(info);
    }

    /** @ignore */
    getAllPrepared() {
        return this._preparedQueries.getAll();
    }

    /** @ignore */
    _uninitializedError() {
        return new Error(
            "Metadata has not been initialized.  This could only happen if you have not connected yet.",
        );
    }

    /**
     * Gets the definition of an user-defined type.
     *
     * If a `callback` is provided, the callback is invoked when the metadata retrieval completes.
     * Otherwise, it returns a `Promise`.
     *
     * When trying to retrieve the same UDT definition concurrently, it will query once and invoke all callbacks
     * with the retrieved information.
     * @param {String} keyspaceName Name of the keyspace.
     * @param {String} name Name of the UDT.
     * @param {Function} [callback] The callback to invoke when retrieval completes.
     */
    getUdt(keyspaceName, name, callback) {
        return promiseUtils.optionalCallback(
            this._getUdt(keyspaceName, name),
            callback,
        );
    }

    /**
     * @param {String} keyspaceName
     * @param {String} name
     * @returns {Promise<Object|null>}
     * @private
     */
    async _getUdt(keyspaceName, name) {
        if (!this.initialized) {
            throw this._uninitializedError();
        }
        let cache;
        if (this.options.isMetadataSyncEnabled) {
            const keyspace = this.keyspaces[keyspaceName];
            if (!keyspace) {
                return null;
            }
            cache = keyspace.udts;
        }
        return await this._schemaParser.getUdt(keyspaceName, name, cache);
    }

    /**
     * Gets the definition of a table.
     *
     * If a `callback` is provided, the callback is invoked when the metadata retrieval completes.
     * Otherwise, it returns a `Promise`.
     *
     * When trying to retrieve the same table definition concurrently, it will query once and invoke all callbacks
     * with the retrieved information.
     * @param {String} keyspaceName Name of the keyspace.
     * @param {String} name Name of the Table.
     * @param {Function} [callback] The callback with the err as a first parameter and the {@link TableMetadata} as
     * second parameter.
     */
    getTable(keyspaceName, name, callback) {
        return promiseUtils.optionalCallback(
            this._getTable(keyspaceName, name),
            callback,
        );
    }

    /**
     * @param {String} keyspaceName
     * @param {String} name
     * @private
     */
    async _getTable(keyspaceName, name) {
        if (!this.initialized) {
            throw this._uninitializedError();
        }
        let cache;
        let virtual;
        if (this.options.isMetadataSyncEnabled) {
            const keyspace = this.keyspaces[keyspaceName];
            if (!keyspace) {
                return null;
            }
            cache = keyspace.tables;
            virtual = keyspace.virtual;
        }
        return await this._schemaParser.getTable(
            keyspaceName,
            name,
            cache,
            virtual,
        );
    }

    /**
     * Gets the definition of CQL functions for a given name.
     *
     *  If a `callback` is provided, the callback is invoked when the metadata retrieval completes.
     *  Otherwise, it returns a `Promise`.
     *
     * When trying to retrieve the same function definition concurrently, it will query once and invoke all callbacks
     * with the retrieved information.
     * @param {String} keyspaceName Name of the keyspace.
     * @param {String} name Name of the Function.
     * @param {Function} [callback] The callback with the err as a first parameter and the array of {@link SchemaFunction}
     * as second parameter.
     */
    getFunctions(keyspaceName, name, callback) {
        return promiseUtils.optionalCallback(
            this._getFunctionsWrapper(keyspaceName, name),
            callback,
        );
    }

    /**
     * @param {String} keyspaceName
     * @param {String} name
     * @private
     */
    async _getFunctionsWrapper(keyspaceName, name) {
        if (!keyspaceName || !name) {
            throw new errors.ArgumentError(
                "You must provide the keyspace name and cql function name to retrieve the metadata",
            );
        }
        const functionsMap = await this._getFunctions(
            keyspaceName,
            name,
            false,
        );
        return Array.from(functionsMap.values());
    }

    /**
     * Gets a definition of CQL function for a given name and signature.
     * 
     * If a `callback` is provided, the callback is invoked when the metadata retrieval completes.
     * Otherwise, it returns a `Promise`.
     *
     * When trying to retrieve the same function definition concurrently, it will query once and invoke all callbacks
     * with the retrieved information.
     * @param {String} keyspaceName Name of the keyspace
     * @param {String} name Name of the Function
     * @param {Array.<String>|Array.<{code, info}>} signature Array of types of the parameters.
     * @param {Function} [callback] The callback with the err as a first parameter and the {@link SchemaFunction} as second
     * parameter.
     */
    getFunction(keyspaceName, name, signature, callback) {
        return promiseUtils.optionalCallback(
            this._getSingleFunction(keyspaceName, name, signature, false),
            callback,
        );
    }

    /**
     * Gets the definition of CQL aggregate for a given name.
     *
     * If a `callback` is provided, the callback is invoked when the metadata retrieval completes.
     * Otherwise, it returns a `Promise`.
     *
     * When trying to retrieve the same aggregates definition concurrently, it will query once and invoke all callbacks
     * with the retrieved information.
     * @param {String} keyspaceName Name of the keyspace
     * @param {String} name Name of the Function
     * @param {Function} [callback] The callback with the err as a first parameter and the array of {@link Aggregate} as
     * second parameter.
     */
    getAggregates(keyspaceName, name, callback) {
        return promiseUtils.optionalCallback(
            this._getAggregates(keyspaceName, name),
            callback,
        );
    }

    /**
     * @param {String} keyspaceName
     * @param {String} name
     * @private
     */
    async _getAggregates(keyspaceName, name) {
        if (!keyspaceName || !name) {
            throw new errors.ArgumentError(
                "You must provide the keyspace name and cql aggregate name to retrieve the metadata",
            );
        }
        const functionsMap = await this._getFunctions(keyspaceName, name, true);
        return Array.from(functionsMap.values());
    }

    /**
     * Gets a definition of CQL aggregate for a given name and signature.
     *
     * If a `callback` is provided, the callback is invoked when the metadata retrieval completes.
     * Otherwise, it returns a `Promise`.
     *
     * When trying to retrieve the same aggregate definition concurrently, it will query once and invoke all callbacks
     * with the retrieved information.
     * @param {String} keyspaceName Name of the keyspace
     * @param {String} name Name of the aggregate
     * @param {Array.<String>|Array.<{code, info}>} signature Array of types of the parameters.
     * @param {Function} [callback] The callback with the err as a first parameter and the {@link Aggregate} as second parameter.
     */
    getAggregate(keyspaceName, name, signature, callback) {
        return promiseUtils.optionalCallback(
            this._getSingleFunction(keyspaceName, name, signature, true),
            callback,
        );
    }

    /**
     * Gets the definition of a CQL materialized view for a given name.
     *
     * If a `callback` is provided, the callback is invoked when the metadata retrieval completes.
     * Otherwise, it returns a `Promise`.
     *
     * Note that, unlike the rest of the {@link Metadata} methods, this method does not cache the result for following
     * calls, as the current version of the Cassandra native protocol does not support schema change events for
     * materialized views. Each call to this method will produce one or more queries to the cluster.
     * @param {String} keyspaceName Name of the keyspace
     * @param {String} name Name of the materialized view
     * @param {Function} [callback] The callback with the err as a first parameter and the {@link MaterializedView} as
     * second parameter.
     */
    getMaterializedView(keyspaceName, name, callback) {
        return promiseUtils.optionalCallback(
            this._getMaterializedView(keyspaceName, name),
            callback,
        );
    }

    /**
     * @param {String} keyspaceName
     * @param {String} name
     * @returns {Promise<MaterializedView|null>}
     * @private
     */
    async _getMaterializedView(keyspaceName, name) {
        if (!this.initialized) {
            throw this._uninitializedError();
        }
        let cache;
        if (this.options.isMetadataSyncEnabled) {
            const keyspace = this.keyspaces[keyspaceName];
            if (!keyspace) {
                return null;
            }
            cache = keyspace.views;
        }
        return await this._schemaParser.getMaterializedView(
            keyspaceName,
            name,
            cache,
        );
    }

    /**
     * Gets a map of cql function definitions or aggregates based on signature.
     * @param {String} keyspaceName
     * @param {String} name Name of the function or aggregate
     * @param {Boolean} aggregate
     * @returns {Promise<Map>}
     * @private
     */
    async _getFunctions(keyspaceName, name, aggregate) {
        if (!this.initialized) {
            throw this._uninitializedError();
        }
        let cache;
        if (this.options.isMetadataSyncEnabled) {
            const keyspace = this.keyspaces[keyspaceName];
            if (!keyspace) {
                return new Map();
            }
            cache = aggregate ? keyspace.aggregates : keyspace.functions;
        }
        return await this._schemaParser.getFunctions(
            keyspaceName,
            name,
            aggregate,
            cache,
        );
    }

    /**
     * Gets a single cql function or aggregate definition
     * @param {String} keyspaceName
     * @param {String} name
     * @param {Array} signature
     * @param {Boolean} aggregate
     * @returns {Promise<SchemaFunction|Aggregate|null>}
     * @private
     */
    async _getSingleFunction(keyspaceName, name, signature, aggregate) {
        if (!keyspaceName || !name) {
            throw new errors.ArgumentError(
                "You must provide the keyspace name and cql function name to retrieve the metadata",
            );
        }
        if (!Array.isArray(signature)) {
            throw new errors.ArgumentError(
                "Signature must be an array of types",
            );
        }
        signature = signature.map((item) => {
            if (typeof item === "string") {
                return item;
            }
            return types.getDataTypeNameByCode(item);
        });
        const functionsMap = await this._getFunctions(
            keyspaceName,
            name,
            aggregate,
        );
        return functionsMap.get(signature.join(",")) || null;
    }

    /**
     * Gets the trace session generated by Cassandra when query tracing is enabled for the
     * query. The trace itself is stored in Cassandra in the `sessions` and
     * `events` table in the `system_traces` keyspace and can be
     * retrieve manually using the trace identifier.
     *
     * If a `callback` is provided, the callback is invoked when the metadata retrieval completes.
     * Otherwise, it returns a `Promise`.
     * @param {Uuid} traceId Identifier of the trace session.
     * @param {Number} [consistency] The consistency level to obtain the trace.
     * @param {Function} [callback] The callback with the err as first parameter and the query trace as second parameter.
     */
    getTrace(traceId, consistency, callback) {
        if (!callback && typeof consistency === "function") {
            // Both callback and consistency are optional parameters
            // In this case, the second parameter is the callback
            callback = consistency;
            consistency = null;
        }

        return promiseUtils.optionalCallback(
            this._getTrace(traceId, consistency),
            callback,
        );
    }

    /**
     * @param {Uuid} traceId
     * @param {Number} consistency
     * @returns {Promise<Object>}
     * @private
     */
    async _getTrace() {
        throw new Error(`TODO: Not implemented`);
    }

    /**
     * Checks whether hosts that are currently up agree on the schema definition.
     *
     * This method performs a one-time check only, without any form of retry; therefore
     * `protocolOptions.maxSchemaAgreementWaitSeconds` setting does not apply in this case.
     * @param {Function} [callback] A function that is invoked with a value
     * `true` when all hosts agree on the schema and `false` when there is no agreement or when
     * the check could not be performed (for example, if the control connection is down).
     * @returns {Promise} Returns a `Promise` when a callback is not provided. The promise resolves to
     * `true` when all hosts agree on the schema and `false` when there is no agreement or when
     * the check could not be performed (for example, if the control connection is down).
     */
    checkSchemaAgreement(callback) {
        return promiseUtils.optionalCallback(
            this._checkSchemaAgreement(),
            callback,
        );
    }

    /**
     * Async-only version of check schema agreement.
     * @private
     */
    async _checkSchemaAgreement() {
        const connection = this.controlConnection.connection;
        if (!connection) {
            return false;
        }
        try {
            return await this.compareSchemaVersions(connection);
        } catch (err) {
            return false;
        }
    }

    /**
     * Uses the metadata to fill the user provided parameter hints
     * @param {String} keyspace
     * @param {Array} hints
     * @internal
     * @ignore
     */
    async adaptUserHints(keyspace, hints) {
        if (!Array.isArray(hints)) {
            return;
        }
        const udts = [];
        // Check for udts and get the metadata
        for (let i = 0; i < hints.length; i++) {
            const hint = hints[i];
            if (typeof hint !== "string") {
                continue;
            }

            const type = types.dataTypes.getByName(hint);
            this._checkUdtTypes(udts, type, keyspace);
            hints[i] = type;
        }

        for (const type of udts) {
            const udtInfo = await this.getUdt(
                type.info.keyspace,
                type.info.name,
            );
            if (!udtInfo) {
                throw new TypeError(
                    "User defined type not found: " +
                    type.info.keyspace +
                    "." +
                    type.info.name,
                );
            }
            type.info = udtInfo;
        }
    }

    /**
     * @param {Array} udts
     * @param {{code, info}} type
     * @param {string} keyspace
     * @private
     */
    _checkUdtTypes(udts, type, keyspace) {
        if (type.code === types.dataTypes.udt) {
            const udtName = type.info.split(".");
            type.info = {
                keyspace: udtName[0],
                name: udtName[1],
            };
            if (!type.info.name) {
                if (!keyspace) {
                    throw new TypeError(
                        "No keyspace specified for udt: " + udtName.join("."),
                    );
                }
                // use the provided keyspace
                type.info.name = type.info.keyspace;
                type.info.keyspace = keyspace;
            }
            udts.push(type);
            return;
        }

        if (!type.info) {
            return;
        }
        if (
            type.code === types.dataTypes.list ||
            type.code === types.dataTypes.set
        ) {
            return this._checkUdtTypes(udts, type.info, keyspace);
        }
        if (type.code === types.dataTypes.map) {
            this._checkUdtTypes(udts, type.info[0], keyspace);
            this._checkUdtTypes(udts, type.info[1], keyspace);
        }
    }

    /**
     * Uses the provided connection to query the schema versions and compare them.
     * @param {Connection} connection
     * @internal
     * @ignore
     */
    async compareSchemaVersions() {
        throw new Error(`TODO: Not implemented`);
    }
}

/**
 * Allows to store prepared queries and retrieval by query or query id.
 * @ignore
 */
class PreparedQueries {
    /**
     * @param {Number} maxPrepared
     * @param {Function} logger
     */
    constructor(maxPrepared, logger) {
        this.length = 0;
        this._maxPrepared = maxPrepared;
        this._mapByKey = new Map();
        this._mapById = new Map();
        this._logger = logger;
    }

    _getKey(keyspace, query) {
        return (keyspace || "") + query;
    }

    getOrAdd(keyspace, query) {
        const key = this._getKey(keyspace, query);
        let info = this._mapByKey.get(key);
        if (info) {
            return info;
        }

        this._validateOverflow();

        info = new events.EventEmitter();
        info.setMaxListeners(0);
        info.query = query;
        // The keyspace in which it was prepared
        info.keyspace = keyspace;
        this._mapByKey.set(key, info);
        this.length++;
        return info;
    }

    _validateOverflow() {
        if (this.length < this._maxPrepared) {
            return;
        }

        const toRemove = [];
        this._logger(
            "warning",
            "Prepared statements exceeded maximum. This could be caused by preparing queries that contain parameters",
        );

        const toRemoveLength = this.length - this._maxPrepared + 1;

        for (const [key, info] of this._mapByKey) {
            if (!info.queryId) {
                // Only remove queries that contain queryId
                continue;
            }

            const length = toRemove.push([key, info]);
            if (length >= toRemoveLength) {
                break;
            }
        }

        for (const [key, info] of toRemove) {
            this._mapByKey.delete(key);
            this._mapById.delete(info.queryId.toString("hex"));
            this.length--;
        }
    }

    setById(info) {
        this._mapById.set(info.queryId.toString("hex"), info);
    }

    getById(id) {
        return this._mapById.get(id.toString("hex"));
    }

    clear() {
        this._mapByKey = new Map();
        this._mapById = new Map();
        this.length = 0;
    }

    getAll() {
        return Array.from(this._mapByKey.values()).filter(
            (info) => !!info.queryId,
        );
    }
}

module.exports = Metadata;