diff --git a/packages/pieces/community/snowflake/package.json b/packages/pieces/community/snowflake/package.json index fd3f6c9625..7c5617b394 100644 --- a/packages/pieces/community/snowflake/package.json +++ b/packages/pieces/community/snowflake/package.json @@ -1,4 +1,4 @@ { "name": "@activepieces/piece-snowflake", - "version": "0.0.9" + "version": "0.0.10" } diff --git a/packages/pieces/community/snowflake/src/index.ts b/packages/pieces/community/snowflake/src/index.ts index 09d648b285..64b02368a0 100644 --- a/packages/pieces/community/snowflake/src/index.ts +++ b/packages/pieces/community/snowflake/src/index.ts @@ -3,6 +3,7 @@ import { PieceAuth, Property, } from '@activepieces/pieces-framework'; +import { runMultipleQueries } from './lib/actions/run-multiple-queries'; import { runQuery } from './lib/actions/run-query'; import { PieceCategory } from '@activepieces/shared'; import { insertRowAction } from './lib/actions/insert-row'; @@ -53,7 +54,7 @@ export const snowflake = createPiece({ minimumSupportedRelease: '0.27.1', logoUrl: 'https://cdn.activepieces.com/pieces/snowflake.png', categories: [PieceCategory.DEVELOPER_TOOLS], - authors: ['AdamSelene', 'abuaboud'], - actions: [runQuery, insertRowAction], + authors: ['AdamSelene', 'abuaboud', 'valentin-mourtialon'], + actions: [runQuery, runMultipleQueries, insertRowAction], triggers: [], }); diff --git a/packages/pieces/community/snowflake/src/lib/actions/run-multiple-queries.ts b/packages/pieces/community/snowflake/src/lib/actions/run-multiple-queries.ts new file mode 100644 index 0000000000..bb6f730744 --- /dev/null +++ b/packages/pieces/community/snowflake/src/lib/actions/run-multiple-queries.ts @@ -0,0 +1,166 @@ +import { createAction, Property } from '@activepieces/pieces-framework'; +import snowflake, { Statement, SnowflakeError } from 'snowflake-sdk'; +import { snowflakeAuth } from '../../index'; + +type QueryResult = unknown[] | undefined; +type QueryResults = { query: string; result: QueryResult }[]; + +const DEFAULT_APPLICATION_NAME = 'ActivePieces'; +const DEFAULT_QUERY_TIMEOUT = 30000; + +export const runMultipleQueries = createAction({ + name: 'runMultipleQueries', + displayName: 'Run Multiple Queries', + description: 'Run Multiple Queries', + auth: snowflakeAuth, + props: { + sqlTexts: Property.Array({ + displayName: 'SQL queries', + description: + 'Array of SQL queries to execute in order, in the same transaction. Use :1, :2… placeholders to use binding parameters. ' + + 'Avoid using "?" to avoid unexpected behaviors when having multiple queries.', + required: true, + }), + binds: Property.Array({ + displayName: 'Parameters', + description: + 'Binding parameters shared across all queries to prevent SQL injection attacks. ' + + 'Use :1, :2, etc. to reference parameters in order. ' + + 'Avoid using "?" to avoid unexpected behaviors when having multiple queries. ' + + 'Unused parameters are allowed.', + required: false, + }), + useTransaction: Property.Checkbox({ + displayName: 'Use Transaction', + description: + 'When enabled, all queries will be executed in a single transaction. If any query fails, all changes will be rolled back.', + required: false, + defaultValue: false, + }), + timeout: Property.Number({ + displayName: 'Query timeout (ms)', + description: + 'An integer indicating the maximum number of milliseconds to wait for a query to complete before timing out.', + required: false, + defaultValue: DEFAULT_QUERY_TIMEOUT, + }), + application: Property.ShortText({ + displayName: 'Application name', + description: + 'A string indicating the name of the client application connecting to the server.', + required: false, + defaultValue: DEFAULT_APPLICATION_NAME, + }), + }, + + async run(context) { + const { username, password, role, database, warehouse, account } = + context.auth; + + const connection = snowflake.createConnection({ + application: context.propsValue.application, + timeout: context.propsValue.timeout, + username, + password, + role, + database, + warehouse, + account, + }); + + return new Promise((resolve, reject) => { + connection.connect(async function (err: SnowflakeError | undefined) { + if (err) { + reject(err); + return; + } + + const { sqlTexts, binds, useTransaction } = context.propsValue; + const queryResults: QueryResults = []; + + function handleError(err: SnowflakeError) { + if (useTransaction) { + connection.execute({ + sqlText: 'ROLLBACK', + complete: () => { + connection.destroy(() => { + reject(err); + }); + }, + }); + } else { + connection.destroy(() => { + reject(err); + }); + } + } + + async function executeQueriesSequentially() { + try { + if (useTransaction) { + await new Promise((resolveBegin, rejectBegin) => { + connection.execute({ + sqlText: 'BEGIN', + complete: (err: SnowflakeError | undefined) => { + if (err) rejectBegin(err); + else resolveBegin(); + }, + }); + }); + } + for (const sqlText of sqlTexts) { + const result = await new Promise( + (resolveQuery, rejectQuery) => { + connection.execute({ + sqlText: sqlText as string, + binds: binds as snowflake.Binds, + complete: ( + err: SnowflakeError | undefined, + stmt: Statement, + rows: QueryResult + ) => { + if (err) { + rejectQuery(err); + return; + } + resolveQuery(rows); + }, + }); + } + ); + + queryResults.push({ + query: sqlText as string, + result, + }); + } + + if (useTransaction) { + await new Promise((resolveCommit, rejectCommit) => { + connection.execute({ + sqlText: 'COMMIT', + complete: (err: SnowflakeError | undefined) => { + if (err) rejectCommit(err); + else resolveCommit(); + }, + }); + }); + } + + connection.destroy((err: SnowflakeError | undefined) => { + if (err) { + reject(err); + return; + } + resolve(queryResults); + }); + } catch (err) { + handleError(err as SnowflakeError); // Reject with the original error! + } + } + + executeQueriesSequentially(); + }); + }); + }, +});