Skip to content

Commit

Permalink
Cancel link now actually cancels queries in BigQuery/Athena (growthbo…
Browse files Browse the repository at this point in the history
  • Loading branch information
jdorn authored and itsgrimetime committed Nov 17, 2023
1 parent 853f74f commit 1692661
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 31 deletions.
14 changes: 10 additions & 4 deletions packages/back-end/src/integrations/Athena.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { decryptDataSourceParams } from "../services/datasource";
import { runAthenaQuery } from "../services/athena";
import { QueryResponse } from "../types/Integration";
import { cancelAthenaQuery, runAthenaQuery } from "../services/athena";
import { ExternalIdCallback, QueryResponse } from "../types/Integration";
import { AthenaConnectionParams } from "../../types/integrations/athena";
import { FormatDialect } from "../util/sql";
import SqlIntegration from "./SqlIntegration";
Expand All @@ -24,8 +24,14 @@ export default class Athena extends SqlIntegration {
toTimestamp(date: Date) {
return `from_iso8601_timestamp('${date.toISOString()}')`;
}
runQuery(sql: string): Promise<QueryResponse> {
return runAthenaQuery(this.params, sql);
runQuery(
sql: string,
setExternalId: ExternalIdCallback
): Promise<QueryResponse> {
return runAthenaQuery(this.params, sql, setExternalId);
}
/**
 * Cancels a running Athena query.
 *
 * Delegates to `cancelAthenaQuery`, passing this integration's connection
 * params along with the identifier of the query to stop.
 *
 * @param externalId - Identifier of the Athena query to cancel (presumably the
 *   `QueryExecutionId` reported through `setExternalId` when the query was
 *   started — confirm against `runAthenaQuery`).
 */
async cancelQuery(externalId: string): Promise<void> {
await cancelAthenaQuery(this.params, externalId);
}
addTime(
col: string,
Expand Down
24 changes: 23 additions & 1 deletion packages/back-end/src/integrations/BigQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { decryptDataSourceParams } from "../services/datasource";
import { BigQueryConnectionParams } from "../../types/integrations/bigquery";
import { IS_CLOUD } from "../util/secrets";
import {
ExternalIdCallback,
InformationSchema,
QueryResponse,
RawInformationSchema,
Expand Down Expand Up @@ -49,14 +50,35 @@ export default class BigQuery extends SqlIntegration {
});
}

async runQuery(sql: string): Promise<QueryResponse> {
/**
 * Requests cancellation of a BigQuery job and logs the status returned by
 * the API at debug level.
 *
 * @param externalId - The BigQuery job id to cancel.
 */
async cancelQuery(externalId: string): Promise<void> {
  // Resolve the job handle and issue the cancel request in one chain.
  const [cancelResponse] = await this.getClient().job(externalId).cancel();

  // The response may not carry a job; optional chaining keeps the log safe.
  const serializedStatus = JSON.stringify(cancelResponse.job?.status);
  logger.debug(`Cancelled BigQuery job ${externalId} - ${serializedStatus}`);
}

async runQuery(
sql: string,
setExternalId?: ExternalIdCallback
): Promise<QueryResponse> {
const client = this.getClient();

const [job] = await client.createQueryJob({
labels: { integration: "growthbook" },
query: sql,
useLegacySql: false,
});

if (setExternalId && job.id) {
await setExternalId(job.id);
}

const [rows] = await job.getQueryResults();
const [metadata] = await job.getMetadata();
const statistics = {
Expand Down
32 changes: 23 additions & 9 deletions packages/back-end/src/integrations/SqlIntegration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import {
TrackedEventData,
TrackedEventResponseRow,
ExperimentUnitsQueryResponse,
ExternalIdCallback,
} from "../types/Integration";
import { DimensionInterface } from "../../types/dimension";
import { IMPORT_LIMIT_DAYS } from "../util/secrets";
Expand All @@ -55,6 +56,7 @@ import { formatInformationSchema } from "../util/informationSchemas";
import { ExperimentSnapshotSettings } from "../../types/experiment-snapshot";
import { TemplateVariables } from "../../types/sql";
import { FactTableMap } from "../models/FactTableModel";
import { logger } from "../util/logger";

export default abstract class SqlIntegration
implements SourceIntegrationInterface {
Expand All @@ -66,7 +68,13 @@ export default abstract class SqlIntegration
params: any;
type!: DataSourceType;
abstract setParams(encryptedParams: string): void;
abstract runQuery(sql: string): Promise<QueryResponse>;
/**
 * Executes a SQL statement against the underlying data source.
 *
 * @param sql - The SQL text to run.
 * @param setExternalId - Optional callback an implementation may invoke with
 *   the identifier assigned to the query by the data source, so that the
 *   query can later be passed to `cancelQuery`.
 * @returns The query response produced by the concrete integration.
 */
abstract runQuery(
sql: string,
setExternalId?: ExternalIdCallback
): Promise<QueryResponse>;
/**
 * Default cancellation hook for SQL integrations.
 *
 * This base implementation does not cancel anything: it only writes a debug
 * log entry saying cancellation is not implemented. Integrations that can
 * cancel a running query are expected to override it.
 *
 * @param externalId - Identifier of the query that was asked to be cancelled.
 */
async cancelQuery(externalId: string): Promise<void> {
logger.debug(`Cancel query: ${externalId} - not implemented`);
}
abstract getSensitiveParamKeys(): string[];

constructor(encryptedParams: string, settings: DataSourceSettings) {
Expand Down Expand Up @@ -358,9 +366,10 @@ export default abstract class SqlIntegration
);
}
async runPastExperimentQuery(
query: string
query: string,
setExternalId: ExternalIdCallback
): Promise<PastExperimentQueryResponse> {
const { rows, statistics } = await this.runQuery(query);
const { rows, statistics } = await this.runQuery(query, setExternalId);

return {
rows: rows.map((row) => {
Expand Down Expand Up @@ -511,9 +520,10 @@ export default abstract class SqlIntegration
}

async runExperimentMetricQuery(
query: string
query: string,
setExternalId: ExternalIdCallback
): Promise<ExperimentMetricQueryResponse> {
const { rows, statistics } = await this.runQuery(query);
const { rows, statistics } = await this.runQuery(query, setExternalId);
return {
rows: rows.map((row) => {
return {
Expand Down Expand Up @@ -551,13 +561,17 @@ export default abstract class SqlIntegration
}

async runExperimentUnitsQuery(
query: string
query: string,
setExternalId: ExternalIdCallback
): Promise<ExperimentUnitsQueryResponse> {
return await this.runQuery(query);
return await this.runQuery(query, setExternalId);
}

async runMetricValueQuery(query: string): Promise<MetricValueQueryResponse> {
const { rows, statistics } = await this.runQuery(query);
async runMetricValueQuery(
query: string,
setExternalId: ExternalIdCallback
): Promise<MetricValueQueryResponse> {
const { rows, statistics } = await this.runQuery(query, setExternalId);

return {
rows: rows.map((row) => {
Expand Down
8 changes: 7 additions & 1 deletion packages/back-end/src/models/ImpactEstimateModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,13 @@ export async function getImpactEstimate(
segment: segmentObj || undefined,
});

const queryResponse = await integration.runMetricValueQuery(query);
const queryResponse = await integration.runMetricValueQuery(
query,
// We're not storing a query in Mongo for this, so we don't support cancelling here
async () => {
// Ignore calls to setExternalId
}
);
const value = processMetricValueQueryResponse(queryResponse.rows);

let daysWithData = numDays;
Expand Down
1 change: 1 addition & 0 deletions packages/back-end/src/models/QueryModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const querySchema = new mongoose.Schema({
startedAt: Date,
finishedAt: Date,
heartbeat: Date,
externalId: String,
result: {},
rawResult: [],
error: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ export const startExperimentResultQueries = async (
name: queryParentId,
query: integration.getExperimentUnitsTableQuery(unitQueryParams),
dependencies: [],
run: (query) => integration.runExperimentUnitsQuery(query),
run: (query, setExternalId) =>
integration.runExperimentUnitsQuery(query, setExternalId),
process: (rows) => rows,
});
queries.push(unitQuery);
Expand Down Expand Up @@ -162,7 +163,8 @@ export const startExperimentResultQueries = async (
name: m.id,
query: integration.getExperimentMetricQuery(queryParams),
dependencies: unitQuery ? [unitQuery.query] : [],
run: (query) => integration.runExperimentMetricQuery(query),
run: (query, setExternalId) =>
integration.runExperimentMetricQuery(query, setExternalId),
process: (rows) => rows,
})
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ export class MetricAnalysisQueryRunner extends QueryRunner<
name: "metric",
query: this.integration.getMetricValueQuery(params),
dependencies: [],
run: (query) => this.integration.runMetricValueQuery(query),
run: (query, setExternalId) =>
this.integration.runMetricValueQuery(query, setExternalId),
process: (rows) => processMetricValueQueryResponse(rows),
}),
];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ export class PastExperimentsQueryRunner extends QueryRunner<
name: "experiments",
query: this.integration.getPastExperimentQuery(params),
dependencies: [],
run: (query) => this.integration.runPastExperimentQuery(query),
run: (query, setExternalId) =>
this.integration.runPastExperimentQuery(query, setExternalId),
process: (rows) => this.processPastExperimentQueryResponse(rows),
}),
];
Expand Down
54 changes: 50 additions & 4 deletions packages/back-end/src/queryRunners/QueryRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import {
updateQuery,
} from "../models/QueryModel";
import {
ExternalIdCallback,
QueryResponse,
SourceIntegrationInterface,
} from "../types/Integration";
import { logger } from "../util/logger";
import { promiseAllChunks } from "../util/promise";

export type QueryMap = Map<string, QueryInterface>;

Expand Down Expand Up @@ -44,7 +46,10 @@ export type StartQueryParams<Rows, ProcessedRows> = {
name: string;
query: string;
dependencies: string[];
run: (query: string) => Promise<QueryResponse<Rows>>;
run: (
query: string,
setExternalId: ExternalIdCallback
) => Promise<QueryResponse<Rows>>;
process: (rows: Rows) => ProcessedRows;
};

Expand Down Expand Up @@ -84,7 +89,10 @@ export abstract class QueryRunner<
public error = "";
public runCallbacks: {
[key: string]: {
run: (query: string) => Promise<QueryResponse<RowsType>>;
run: (
query: string,
setExternalId: ExternalIdCallback
) => Promise<QueryResponse<RowsType>>;
process: (rows: RowsType) => ProcessedRowsType;
};
} = {};
Expand Down Expand Up @@ -359,6 +367,35 @@ export abstract class QueryRunner<
(q) => q.status === "running" || q.status === "queued"
)
) {
const runningIds = this.model.queries
.filter((q) => q.status === "running")
.map((q) => q.query);

if (runningIds.length) {
const queryDocs = await getQueriesByIds(
this.model.organization,
runningIds
);

const externalIds = queryDocs.map((q) => q.externalId).filter(Boolean);

if (externalIds.length) {
await promiseAllChunks(
externalIds.map((id) => {
return async () => {
if (!id || !this.integration.cancelQuery) return;
try {
await this.integration.cancelQuery(id);
} catch (e) {
logger.debug(`Failed to cancel query - ${e.message}`);
}
};
}),
5
);
}
}

const newModel = await this.updateModel({
queries: [],
status: "failed",
Expand All @@ -375,7 +412,10 @@ export abstract class QueryRunner<
ProcessedRows extends ProcessedRowsType
>(
doc: QueryInterface,
run: (query: string) => Promise<QueryResponse<Rows>>,
run: (
query: string,
setExternalId: ExternalIdCallback
) => Promise<QueryResponse<Rows>>,
process: (rows: Rows) => ProcessedRows
): Promise<void> {
// Update heartbeat for the query once every 30 seconds
Expand All @@ -395,7 +435,13 @@ export abstract class QueryRunner<
});
}

run(doc.query)
const setExternalId = async (id: string) => {
await updateQuery(doc, {
externalId: id,
});
};

run(doc.query, setExternalId)
.then(async ({ rows, statistics }) => {
clearInterval(timer);
logger.debug("Query succeeded");
Expand Down
25 changes: 21 additions & 4 deletions packages/back-end/src/services/athena.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { ResultSet } from "aws-sdk/clients/athena";
import { AthenaConnectionParams } from "../../types/integrations/athena";
import { logger } from "../util/logger";
import { IS_CLOUD } from "../util/secrets";
import { QueryResponse } from "../types/Integration";
import { ExternalIdCallback, QueryResponse } from "../types/Integration";

function getAthenaInstance(params: AthenaConnectionParams) {
if (!IS_CLOUD && params.authType === "auto") {
Expand All @@ -19,9 +19,22 @@ function getAthenaInstance(params: AthenaConnectionParams) {
});
}

/**
 * Asks Athena to stop the query execution with the given id.
 *
 * @param conn - Connection parameters used to build the Athena client.
 * @param id - The `QueryExecutionId` of the execution to stop.
 */
export async function cancelAthenaQuery(
  conn: AthenaConnectionParams,
  id: string
) {
  const stopRequest = getAthenaInstance(conn).stopQueryExecution({
    QueryExecutionId: id,
  });
  // Wait for the stop request to complete; its response payload is not used.
  await stopRequest.promise();
}

export async function runAthenaQuery(
conn: AthenaConnectionParams,
sql: string
sql: string,
setExternalId: ExternalIdCallback
): Promise<QueryResponse> {
const athena = getAthenaInstance(conn);

Expand All @@ -48,6 +61,10 @@ export async function runAthenaQuery(
throw new Error("Failed to start query");
}

if (setExternalId) {
await setExternalId(QueryExecutionId);
}

const waitAndCheck = (delay: number) => {
return new Promise<false | ResultSet>((resolve, reject) => {
setTimeout(() => {
Expand Down Expand Up @@ -99,7 +116,7 @@ export async function runAthenaQuery(
return {
rows: result.Rows.slice(1).map((row) => {
// eslint-disable-next-line
const obj: any = {};
const obj: any = {};
if (row.Data) {
row.Data.forEach((value, i) => {
obj[keys[i]] = value.VarCharValue || null;
Expand All @@ -113,5 +130,5 @@ export async function runAthenaQuery(

// Cancel the query if it reaches this point
await athena.stopQueryExecution({ QueryExecutionId }).promise();
throw new Error("Query timed out after 5 minutes");
throw new Error("Query timed out after 30 minutes");
}
Loading

0 comments on commit 1692661

Please sign in to comment.