"use strict";
const _Client = require("../client");
const utils = require("../utils");
const { Stream } = require("stream");
const { PreparedCache } = require("../cache");
/**
* Utilities for concurrent query execution with the DataStax Node.js Driver.
* @module concurrent
*/
/**
* Executes multiple queries concurrently at the defined concurrency level.
* @static
* @param {Client} client The {@link Client} instance.
* @param {String|Array<{query, params}>} query The query to execute per each parameter item.
* @param {Array<Array>|Stream|Object} parameters An {@link Array} or a readable {@link Stream} composed of {@link Array}
* items representing each individual set of parameters. Per each item in the {@link Array} or {@link Stream}, an
* execution is going to be made.
* @param {Object} [options] The execution options.
* @param {String} [options.executionProfile] The execution profile to be used.
* @param {Number} [options.concurrencyLevel=100] The concurrency level to determine the maximum amount of in-flight
* operations at any given time
* @param {Boolean} [options.raiseOnFirstError=true] Determines whether execution should stop after the first failed
* execution and the corresponding exception will be raised.
* @param {Boolean} [options.collectResults=false] Determines whether each individual
* [ResultSet]{@link module:types~ResultSet} instance should be collected in the grouped result.
* @param {Number} [options.maxErrors=100] The maximum amount of errors to be collected before ignoring the rest of
* the error results.
* @returns {Promise<ResultSetGroup>} A `Promise` of {@link ResultSetGroup} that is resolved when all the
* executions completed and it's rejected when `raiseOnFirstError` is `true` and there is one
* or more failures.
* @example <caption>Using a fixed query and an Array of Arrays as parameters</caption>
* const query = 'INSERT INTO table1 (id, value) VALUES (?, ?)';
* const parameters = [[1, 'a'], [2, 'b'], [3, 'c'], ]; // ...
* const result = await executeConcurrent(client, query, parameters);
* @example <caption>Using a fixed query and a readable stream</caption>
* const stream = csvStream.pipe(transformLineToArrayStream);
* const result = await executeConcurrent(client, query, stream);
* @example <caption>Using different queries</caption>
* const queryAndParameters = [
* { query: 'INSERT INTO videos (id, name, user_id) VALUES (?, ?, ?)',
* params: [ id, name, userId ] },
* { query: 'INSERT INTO user_videos (user_id, id, name) VALUES (?, ?, ?)',
* params: [ userId, id, name ] },
* { query: 'INSERT INTO latest_videos (id, name, user_id) VALUES (?, ?, ?)',
* params: [ id, name, userId ] },
* ];
*
* const result = await executeConcurrent(client, queryAndParameters);
*/
function executeConcurrent(client, query, parameters, options) {
    if (!client) {
        throw new TypeError("Client instance is not defined");
    }

    // Multi-query form: `query` is an array of { query, params } items, so the
    // third positional argument carries the options rather than parameters.
    if (Array.isArray(query)) {
        const multiQueryExecutor = new ArrayBasedExecutor(
            client,
            null,
            query,
            parameters,
        );
        return multiQueryExecutor.execute();
    }

    if (typeof query !== "string") {
        throw new TypeError(
            "A string query or query and parameters array should be provided",
        );
    }

    // Fixed-query form: one execution per parameter set, taken either from an
    // in-memory array or from a readable stream.
    let executor;
    if (Array.isArray(parameters)) {
        executor = new ArrayBasedExecutor(client, query, parameters, options);
    } else if (parameters instanceof Stream) {
        executor = new StreamBasedExecutor(client, query, parameters, options);
    } else {
        throw new TypeError(
            "parameters should be an Array or a Stream instance",
        );
    }
    return executor.execute();
}
/**
 * Wraps the functionality to execute given an Array.
 *
 * The parameter array is split between `concurrencyLevel` workers: worker `i`
 * handles the items at indexes `i`, `i + concurrencyLevel`,
 * `i + 2 * concurrencyLevel`, ... one at a time.
 * @ignore
 */
class ArrayBasedExecutor {
    /**
     * @param {_Client} client
     * @param {String} query The fixed query, or `null` when every item of `parameters` carries its own
     * `query` and `params`.
     * @param {Array<Array>|Array<{query, params}>} parameters
     * @param {Object} [options] The execution options.
     * @private
     */
    constructor(client, query, parameters, options) {
        this._client = client;
        this._query = query;
        this._parameters = parameters;
        options = options || utils.emptyObject;
        this._raiseOnFirstError = options.raiseOnFirstError !== false;
        // Never start more workers than there are items to execute.
        this._concurrencyLevel = Math.min(
            options.concurrencyLevel || 100,
            this._parameters.length,
        );
        // Create ExecutionOptions here, to avoid creation of new
        // rust QueryOptionsWrapper for each of the executed queries.
        this._queryOptions = client.createOptions({
            prepare: true,
            executionProfile: options.executionProfile,
        });
        this._result = new ResultSetGroup(options);
        this._stop = false;
        this._cache = new PreparedCache();
    }

    /**
     * Starts all the workers and waits for them to finish.
     * @returns {Promise<ResultSetGroup>} Resolves with the grouped result; rejects with the first error
     * rethrown by a worker (which happens when `raiseOnFirstError` is enabled).
     */
    async execute() {
        const promises = new Array(this._concurrencyLevel);
        for (let i = 0; i < this._concurrencyLevel; i++) {
            promises[i] = this._executeOneAtATime(i, 0);
        }
        await Promise.all(promises);
        return this._result;
    }

    /**
     * Runs a single worker: sequentially executes the items at indexes
     * `initialIndex + concurrencyLevel * k` for `k >= iteration`, until the
     * array is exhausted or the executor has been stopped.
     * @param {Number} initialIndex The index of the first item assigned to this worker.
     * @param {Number} iteration The iteration to start from.
     * @returns {Promise<void>}
     * @private
     */
    async _executeOneAtATime(initialIndex, iteration) {
        const step = this._concurrencyLevel;
        for (
            let index = initialIndex + step * iteration;
            index < this._parameters.length && !this._stop;
            index += step
        ) {
            const item = this._parameters[index];
            let query;
            let params;
            if (this._query === null) {
                query = item.query;
                params = item.params;
            } else {
                query = this._query;
                params = item;
            }
            try {
                // Preparation happens inside the try block on purpose: a failed
                // prepare has to be reported for this item in the same way as a
                // failed execution, instead of escaping the error collection.
                // NOTE: workers that miss the cache at the same time each issue
                // their own prepare request for the same query.
                let prepared = this._cache.getElement(query);
                if (!prepared) {
                    prepared = await this._client.prepareQuery(query);
                    this._cache.storeElement(query, prepared);
                }
                const rs = await this._client.rustyExecute(
                    prepared,
                    params,
                    this._queryOptions,
                );
                this._result.setResultItem(index, rs);
            } catch (err) {
                // Rethrows when raiseOnFirstError is enabled, which ends this
                // worker and rejects the promise returned by execute().
                this._setError(index, err);
            }
        }
    }

    /**
     * Records a failed execution and, when `raiseOnFirstError` is enabled,
     * flags the other workers to stop and rethrows the error.
     * @param {Number} index The index of the item that failed.
     * @param {Error} err
     * @private
     */
    _setError(index, err) {
        this._result.setError(index, err);
        if (this._raiseOnFirstError) {
            this._stop = true;
            throw err;
        }
    }
}
/**
 * Wraps the functionality to execute given a Stream.
 *
 * One execution is started per item emitted by the stream; the stream is
 * paused while the number of in-flight executions is at the concurrency level.
 * @ignore
 */
class StreamBasedExecutor {
    /**
     * @param {_Client} client
     * @param {String} query
     * @param {Stream} stream
     * @param {Object} [options] The execution options.
     * @private
     */
    constructor(client, query, stream, options) {
        this._client = client;
        this._query = query;
        this._stream = stream;
        const settings = options || utils.emptyObject;
        this._raiseOnFirstError = settings.raiseOnFirstError !== false;
        this._concurrencyLevel = settings.concurrencyLevel || 100;
        // Create ExecutionOptions here, to avoid creation of new
        // rust QueryOptionsWrapper for each of the executed queries.
        this._queryOptions = client.createOptions({
            prepare: true,
            executionProfile: settings.executionProfile,
        });
        this._inFlight = 0;
        this._index = 0;
        this._result = new ResultSetGroup(settings);
        this._resolveCallback = null;
        this._rejectCallback = null;
        this._readEnded = false;
    }

    /**
     * Subscribes to the stream events and starts executing.
     * @returns {Promise<ResultSetGroup>} Settled through the stored resolve / reject callbacks.
     */
    execute() {
        return new Promise((resolve, reject) => {
            this._resolveCallback = resolve;
            this._rejectCallback = reject;

            const onData = (params) => this._executeOne(params);
            const onError = (err) => this._setReadEnded(err);
            const onEnd = () => this._setReadEnded();

            this._stream.on("data", onData).on("error", onError).on("end", onEnd);
        });
    }

    /**
     * Starts the execution for a single item emitted by the stream.
     * @param {Array} params The parameters of the execution.
     * @private
     */
    async _executeOne(params) {
        if (!Array.isArray(params)) {
            this._setReadEnded(
                new TypeError(
                    "Stream should be in objectMode and should emit Array instances",
                ),
            );
            return;
        }
        if (this._readEnded) {
            // Reading was already ended (wrong item format, stream error or a
            // raised execution error): further items are not considered.
            return;
        }

        this._inFlight++;
        const index = this._index++;

        const onSuccess = (rs) => {
            this._result.setResultItem(index, rs);
            this._inFlight--;
        };
        const onFailure = (err) => {
            this._inFlight--;
            this._setError(index, err);
        };

        this._client
            .execute(this._query, params, this._queryOptions)
            .then(onSuccess)
            .catch(onFailure)
            .then(() => this._afterExecution());

        // Stop reading while the concurrency limit is reached.
        if (this._inFlight >= this._concurrencyLevel) {
            this._stream.pause();
        }
    }

    /**
     * Runs after an execution has been accounted for: lets a paused stream
     * flow again and yields the result once nothing else is pending.
     * @private
     */
    _afterExecution() {
        if (this._stream.isPaused()) {
            this._stream.resume();
        }
        const allDone = this._readEnded && this._inFlight === 0;
        if (allDone) {
            // Reading ended and there are no more in-flight requests, so the
            // result is yielded to the user. Reading may have ended early
            // because of a read error or a raised execution error; the promise
            // was rejected then and resolving it here has no effect.
            this._resolveCallback(this._result);
        }
    }

    /**
     * Marks the stream read process as ended.
     * @param {Error} [err] The stream read error.
     * @private
     */
    _setReadEnded(err) {
        if (this._readEnded) {
            return;
        }
        this._readEnded = true;
        if (err) {
            // A failure while reading from the input stream is surfaced as a
            // rejection.
            this._rejectCallback(err);
            return;
        }
        if (this._inFlight === 0) {
            // End was signaled and there are no more pending executions.
            this._resolveCallback(this._result);
        }
    }

    /**
     * Records a failed execution; when `raiseOnFirstError` is enabled it also
     * ends reading and rejects the returned promise.
     * @param {Number} index The index of the failed execution.
     * @param {Error} err
     * @private
     */
    _setError(index, err) {
        this._result.setError(index, err);
        if (!this._raiseOnFirstError) {
            return;
        }
        this._readEnded = true;
        this._rejectCallback(err);
    }
}
/**
 * Represents results from different related executions.
 *
 * Keeps the count of finished executions, the collected errors (up to
 * `maxErrors`) and, when `collectResults` is enabled, the outcome of each
 * execution by index.
 */
class ResultSetGroup {
    /**
     * Creates a new instance of {@link ResultSetGroup}.
     * @ignore
     */
    constructor(options) {
        this._collectResults = options.collectResults;
        this._maxErrors = options.maxErrors || 100;
        this.totalExecuted = 0;
        this.errors = [];

        if (!this._collectResults) {
            // Results are not being collected: expose a non-enumerable
            // accessor that fails loudly instead of returning nothing.
            Object.defineProperty(this, "resultItems", {
                enumerable: false,
                get() {
                    throw new Error(
                        "Property resultItems can not be accessed when collectResults is set to false",
                    );
                },
            });
            return;
        }

        /**
         * Gets an {@link Array} containing the [ResultSet]{@link module:types~ResultSet} instances from each execution.
         *
         * Note that when `collectResults` is set to `false`, accessing this property will
         * throw an error.
         * @type {Array}
         */
        this.resultItems = [];
    }

    /**
     * Internal method to set the result of a successful execution.
     * @ignore
     */
    setResultItem(index, rs) {
        this.totalExecuted++;
        if (!this._collectResults) {
            return;
        }
        this.resultItems[index] = rs;
    }

    /**
     * Internal method to set the error of an execution.
     * @ignore
     */
    setError(index, err) {
        this.totalExecuted++;
        const hasRoomForError = this.errors.length < this._maxErrors;
        if (hasRoomForError) {
            this.errors.push(err);
        }
        if (!this._collectResults) {
            return;
        }
        // The error takes the slot of the failed execution.
        this.resultItems[index] = err;
    }
}
// Public API of this module: the concurrent execution entry point and the
// grouped result type. The executor classes are not exported.
exports.executeConcurrent = executeConcurrent;
exports.ResultSetGroup = ResultSetGroup;