diff --git a/packages/back-end/src/integrations/Athena.ts b/packages/back-end/src/integrations/Athena.ts index 497b3d831877..f5ffdf663c1b 100644 --- a/packages/back-end/src/integrations/Athena.ts +++ b/packages/back-end/src/integrations/Athena.ts @@ -1,6 +1,6 @@ import { decryptDataSourceParams } from "../services/datasource"; -import { runAthenaQuery } from "../services/athena"; -import { QueryResponse } from "../types/Integration"; +import { cancelAthenaQuery, runAthenaQuery } from "../services/athena"; +import { ExternalIdCallback, QueryResponse } from "../types/Integration"; import { AthenaConnectionParams } from "../../types/integrations/athena"; import { FormatDialect } from "../util/sql"; import SqlIntegration from "./SqlIntegration"; @@ -24,8 +24,14 @@ export default class Athena extends SqlIntegration { toTimestamp(date: Date) { return `from_iso8601_timestamp('${date.toISOString()}')`; } - runQuery(sql: string): Promise { - return runAthenaQuery(this.params, sql); + runQuery( + sql: string, + setExternalId: ExternalIdCallback + ): Promise { + return runAthenaQuery(this.params, sql, setExternalId); + } + async cancelQuery(externalId: string): Promise { + await cancelAthenaQuery(this.params, externalId); } addTime( col: string, diff --git a/packages/back-end/src/integrations/BigQuery.ts b/packages/back-end/src/integrations/BigQuery.ts index 4b779e38e3cc..534686ccf12d 100644 --- a/packages/back-end/src/integrations/BigQuery.ts +++ b/packages/back-end/src/integrations/BigQuery.ts @@ -6,6 +6,7 @@ import { decryptDataSourceParams } from "../services/datasource"; import { BigQueryConnectionParams } from "../../types/integrations/bigquery"; import { IS_CLOUD } from "../util/secrets"; import { + ExternalIdCallback, InformationSchema, QueryResponse, RawInformationSchema, @@ -49,7 +50,23 @@ export default class BigQuery extends SqlIntegration { }); } - async runQuery(sql: string): Promise { + async cancelQuery(externalId: string): Promise { + const client = this.getClient(); + const job = client.job(externalId); + + // Attempt to cancel job + const [apiResult] = await job.cancel(); + logger.debug( + `Cancelled BigQuery job ${externalId} - ${JSON.stringify( + apiResult.job?.status + )}` + ); + } + + async runQuery( + sql: string, + setExternalId?: ExternalIdCallback + ): Promise { const client = this.getClient(); const [job] = await client.createQueryJob({ @@ -57,6 +74,11 @@ export default class BigQuery extends SqlIntegration { query: sql, useLegacySql: false, }); + + if (setExternalId && job.id) { + await setExternalId(job.id); + } + const [rows] = await job.getQueryResults(); const [metadata] = await job.getMetadata(); const statistics = { diff --git a/packages/back-end/src/integrations/SqlIntegration.ts b/packages/back-end/src/integrations/SqlIntegration.ts index c32c6fa59598..c54ec5d22196 100644 --- a/packages/back-end/src/integrations/SqlIntegration.ts +++ b/packages/back-end/src/integrations/SqlIntegration.ts @@ -40,6 +40,7 @@ import { TrackedEventData, TrackedEventResponseRow, ExperimentUnitsQueryResponse, + ExternalIdCallback, } from "../types/Integration"; import { DimensionInterface } from "../../types/dimension"; import { IMPORT_LIMIT_DAYS } from "../util/secrets"; @@ -55,6 +56,7 @@ import { formatInformationSchema } from "../util/informationSchemas"; import { ExperimentSnapshotSettings } from "../../types/experiment-snapshot"; import { TemplateVariables } from "../../types/sql"; import { FactTableMap } from "../models/FactTableModel"; +import { logger } from "../util/logger"; export default abstract class SqlIntegration implements SourceIntegrationInterface { @@ -66,7 +68,13 @@ export default abstract class SqlIntegration params: any; type!: DataSourceType; abstract setParams(encryptedParams: string): void; - abstract runQuery(sql: string): Promise; + abstract runQuery( + sql: string, + setExternalId?: ExternalIdCallback + ): Promise; + async cancelQuery(externalId: string): Promise { + logger.debug(`Cancel query: ${externalId} - not implemented`); + } abstract getSensitiveParamKeys(): string[]; constructor(encryptedParams: string, settings: DataSourceSettings) { @@ -358,9 +366,10 @@ export default abstract class SqlIntegration ); } async runPastExperimentQuery( - query: string + query: string, + setExternalId: ExternalIdCallback ): Promise { - const { rows, statistics } = await this.runQuery(query); + const { rows, statistics } = await this.runQuery(query, setExternalId); return { rows: rows.map((row) => { @@ -511,9 +520,10 @@ export default abstract class SqlIntegration } async runExperimentMetricQuery( - query: string + query: string, + setExternalId: ExternalIdCallback ): Promise { - const { rows, statistics } = await this.runQuery(query); + const { rows, statistics } = await this.runQuery(query, setExternalId); return { rows: rows.map((row) => { return { @@ -551,13 +561,17 @@ export default abstract class SqlIntegration } async runExperimentUnitsQuery( - query: string + query: string, + setExternalId: ExternalIdCallback ): Promise { - return await this.runQuery(query); + return await this.runQuery(query, setExternalId); } - async runMetricValueQuery(query: string): Promise { - const { rows, statistics } = await this.runQuery(query); + async runMetricValueQuery( + query: string, + setExternalId: ExternalIdCallback + ): Promise { + const { rows, statistics } = await this.runQuery(query, setExternalId); return { rows: rows.map((row) => { diff --git a/packages/back-end/src/models/ImpactEstimateModel.ts b/packages/back-end/src/models/ImpactEstimateModel.ts index 73706adb309e..168963cfb952 100644 --- a/packages/back-end/src/models/ImpactEstimateModel.ts +++ b/packages/back-end/src/models/ImpactEstimateModel.ts @@ -94,7 +94,13 @@ export async function getImpactEstimate( segment: segmentObj || undefined, }); - const queryResponse = await integration.runMetricValueQuery(query); + const queryResponse = await integration.runMetricValueQuery( + query, + // We're not storing a query in Mongo for this, so we don't support cancelling here + async () => { + // Ignore calls to setExternalId + } + ); const value = processMetricValueQueryResponse(queryResponse.rows); let daysWithData = numDays; diff --git a/packages/back-end/src/models/QueryModel.ts b/packages/back-end/src/models/QueryModel.ts index 79589d4ea2b4..6be18a132f1d 100644 --- a/packages/back-end/src/models/QueryModel.ts +++ b/packages/back-end/src/models/QueryModel.ts @@ -34,6 +34,7 @@ const querySchema = new mongoose.Schema({ startedAt: Date, finishedAt: Date, heartbeat: Date, + externalId: String, result: {}, rawResult: [], error: String, diff --git a/packages/back-end/src/queryRunners/ExperimentResultsQueryRunner.ts b/packages/back-end/src/queryRunners/ExperimentResultsQueryRunner.ts index 2fdd9b98acef..ea74e87e8cde 100644 --- a/packages/back-end/src/queryRunners/ExperimentResultsQueryRunner.ts +++ b/packages/back-end/src/queryRunners/ExperimentResultsQueryRunner.ts @@ -128,7 +128,8 @@ export const startExperimentResultQueries = async ( name: queryParentId, query: integration.getExperimentUnitsTableQuery(unitQueryParams), dependencies: [], - run: (query) => integration.runExperimentUnitsQuery(query), + run: (query, setExternalId) => + integration.runExperimentUnitsQuery(query, setExternalId), process: (rows) => rows, }); queries.push(unitQuery); @@ -162,7 +163,8 @@ export const startExperimentResultQueries = async ( name: m.id, query: integration.getExperimentMetricQuery(queryParams), dependencies: unitQuery ? [unitQuery.query] : [], - run: (query) => integration.runExperimentMetricQuery(query), + run: (query, setExternalId) => + integration.runExperimentMetricQuery(query, setExternalId), process: (rows) => rows, }) ); diff --git a/packages/back-end/src/queryRunners/MetricAnalysisQueryRunner.ts b/packages/back-end/src/queryRunners/MetricAnalysisQueryRunner.ts index c82690cfcbc2..7a76a6500c67 100644 --- a/packages/back-end/src/queryRunners/MetricAnalysisQueryRunner.ts +++ b/packages/back-end/src/queryRunners/MetricAnalysisQueryRunner.ts @@ -21,7 +21,8 @@ export class MetricAnalysisQueryRunner extends QueryRunner< name: "metric", query: this.integration.getMetricValueQuery(params), dependencies: [], - run: (query) => this.integration.runMetricValueQuery(query), + run: (query, setExternalId) => + this.integration.runMetricValueQuery(query, setExternalId), process: (rows) => processMetricValueQueryResponse(rows), }), ]; diff --git a/packages/back-end/src/queryRunners/PastExperimentsQueryRunner.ts b/packages/back-end/src/queryRunners/PastExperimentsQueryRunner.ts index 4de467c992f1..16c08a36ca4d 100644 --- a/packages/back-end/src/queryRunners/PastExperimentsQueryRunner.ts +++ b/packages/back-end/src/queryRunners/PastExperimentsQueryRunner.ts @@ -26,7 +26,8 @@ export class PastExperimentsQueryRunner extends QueryRunner< name: "experiments", query: this.integration.getPastExperimentQuery(params), dependencies: [], - run: (query) => this.integration.runPastExperimentQuery(query), + run: (query, setExternalId) => + this.integration.runPastExperimentQuery(query, setExternalId), process: (rows) => this.processPastExperimentQueryResponse(rows), }), ]; diff --git a/packages/back-end/src/queryRunners/QueryRunner.ts b/packages/back-end/src/queryRunners/QueryRunner.ts index 631c957d05b2..09176c4f16b9 100644 --- a/packages/back-end/src/queryRunners/QueryRunner.ts +++ b/packages/back-end/src/queryRunners/QueryRunner.ts @@ -13,10 +13,12 @@ import { updateQuery, } from "../models/QueryModel"; import { + ExternalIdCallback, QueryResponse, SourceIntegrationInterface, } from "../types/Integration"; import { logger } from "../util/logger"; +import { promiseAllChunks } from "../util/promise"; export type QueryMap = Map; @@ -44,7 +46,10 @@ export type StartQueryParams = { name: string; query: string; dependencies: string[]; - run: (query: string) => Promise>; + run: ( + query: string, + setExternalId: ExternalIdCallback + ) => Promise>; process: (rows: Rows) => ProcessedRows; }; @@ -84,7 +89,10 @@ export abstract class QueryRunner< public error = ""; public runCallbacks: { [key: string]: { - run: (query: string) => Promise>; + run: ( + query: string, + setExternalId: ExternalIdCallback + ) => Promise>; process: (rows: RowsType) => ProcessedRowsType; }; } = {}; @@ -359,6 +367,35 @@ export abstract class QueryRunner< (q) => q.status === "running" || q.status === "queued" ) ) { + const runningIds = this.model.queries + .filter((q) => q.status === "running") + .map((q) => q.query); + + if (runningIds.length) { + const queryDocs = await getQueriesByIds( + this.model.organization, + runningIds + ); + + const externalIds = queryDocs.map((q) => q.externalId).filter(Boolean); + + if (externalIds.length) { + await promiseAllChunks( + externalIds.map((id) => { + return async () => { + if (!id || !this.integration.cancelQuery) return; + try { + await this.integration.cancelQuery(id); + } catch (e) { + logger.debug(`Failed to cancel query - ${e.message}`); + } + }; + }), + 5 + ); + } + } + const newModel = await this.updateModel({ queries: [], status: "failed", @@ -375,7 +412,10 @@ export abstract class QueryRunner< ProcessedRows extends ProcessedRowsType >( doc: QueryInterface, - run: (query: string) => Promise>, + run: ( + query: string, + setExternalId: ExternalIdCallback + ) => Promise>, process: (rows: Rows) => ProcessedRows ): Promise { // Update heartbeat for the query once every 30 seconds @@ -395,7 +435,13 @@ export abstract class QueryRunner< }); } - run(doc.query) + const setExternalId = async (id: string) => { + await updateQuery(doc, { + externalId: id, + }); + }; + + run(doc.query, setExternalId) .then(async ({ rows, statistics }) => { clearInterval(timer); logger.debug("Query succeeded"); diff --git a/packages/back-end/src/services/athena.ts b/packages/back-end/src/services/athena.ts index b307b619ba4b..93e7545c30d8 100644 --- a/packages/back-end/src/services/athena.ts +++ b/packages/back-end/src/services/athena.ts @@ -3,7 +3,7 @@ import { ResultSet } from "aws-sdk/clients/athena"; import { AthenaConnectionParams } from "../../types/integrations/athena"; import { logger } from "../util/logger"; import { IS_CLOUD } from "../util/secrets"; -import { QueryResponse } from "../types/Integration"; +import { ExternalIdCallback, QueryResponse } from "../types/Integration"; function getAthenaInstance(params: AthenaConnectionParams) { if (!IS_CLOUD && params.authType === "auto") { @@ -19,9 +19,22 @@ function getAthenaInstance(params: AthenaConnectionParams) { }); } +export async function cancelAthenaQuery( + conn: AthenaConnectionParams, + id: string +) { + const athena = getAthenaInstance(conn); + await athena + .stopQueryExecution({ + QueryExecutionId: id, + }) + .promise(); +} + export async function runAthenaQuery( conn: AthenaConnectionParams, - sql: string + sql: string, + setExternalId: ExternalIdCallback ): Promise { const athena = getAthenaInstance(conn); @@ -48,6 +61,10 @@ export async function runAthenaQuery( throw new Error("Failed to start query"); } + if (setExternalId) { + await setExternalId(QueryExecutionId); + } + const waitAndCheck = (delay: number) => { return new Promise((resolve, reject) => { setTimeout(() => { @@ -99,7 +116,7 @@ export async function runAthenaQuery( return { rows: result.Rows.slice(1).map((row) => { // eslint-disable-next-line - const obj: any = {}; + const obj: any = {}; if (row.Data) { row.Data.forEach((value, i) => { obj[keys[i]] = value.VarCharValue || null; @@ -113,5 +130,5 @@ export async function runAthenaQuery( // Cancel the query if it reaches this point await athena.stopQueryExecution({ QueryExecutionId }).promise(); - throw new Error("Query timed out after 5 minutes"); + throw new Error("Query timed out after 30 minutes"); } diff --git a/packages/back-end/src/types/Integration.ts b/packages/back-end/src/types/Integration.ts index 366ccb1a6393..967c9410bc72 100644 --- a/packages/back-end/src/types/Integration.ts +++ b/packages/back-end/src/types/Integration.ts @@ -14,6 +14,8 @@ import { FormatDialect } from "../util/sql"; import { TemplateVariables } from "../../types/sql"; import { FactTableMap } from "../models/FactTableModel"; +export type ExternalIdCallback = (id: string) => Promise; + export class MissingDatasourceParamsError extends Error { constructor(message: string) { super(message); @@ -356,12 +358,22 @@ export interface SourceIntegrationInterface { getExperimentMetricQuery(params: ExperimentMetricQueryParams): string; getExperimentUnitsTableQuery(params: ExperimentUnitsQueryParams): string; getPastExperimentQuery(params: PastExperimentParams): string; - runMetricValueQuery(query: string): Promise; + runMetricValueQuery( + query: string, + setExternalId: ExternalIdCallback + ): Promise; runExperimentMetricQuery( - query: string + query: string, + setExternalId: ExternalIdCallback ): Promise; - runExperimentUnitsQuery(query: string): Promise; - runPastExperimentQuery(query: string): Promise; + runExperimentUnitsQuery( + query: string, + setExternalId: ExternalIdCallback + ): Promise; + runPastExperimentQuery( + query: string, + setExternalId: ExternalIdCallback + ): Promise; getEventsTrackedByDatasource?: ( schemaFormat: SchemaFormat, existingMetrics: MetricInterface[], @@ -379,4 +391,5 @@ export interface SourceIntegrationInterface { database?: string, requireSchema?: boolean ): string; + cancelQuery?(externalId: string): Promise; } diff --git a/packages/back-end/types/query.d.ts b/packages/back-end/types/query.d.ts index 25529d585d3b..5194717ef01f 100644 --- a/packages/back-end/types/query.d.ts +++ b/packages/back-end/types/query.d.ts @@ -41,4 +41,5 @@ export interface QueryInterface { dependencies?: string[]; cachedQueryUsed?: string; statistics?: QueryStatistics; + externalId?: string; }