From ae327fa5a350b48c4e1f56c2769524c5786e1152 Mon Sep 17 00:00:00 2001 From: Gxkl Date: Sat, 3 Jun 2023 17:31:44 +0800 Subject: [PATCH] feat: use AsyncLocalStorage to refactor transaction, to make it more safe (#108) BREAKING CHANGE: In `Promise.all` case, Parallel beginTransactionScope will create isolated transactions. --- README.md | 33 ++-- package.json | 1 + src/client.ts | 163 +++++++++++----- src/types.ts | 12 +- test/client.test.ts | 466 ++++++++++++++++++++++++++++++-------------- 5 files changed, 467 insertions(+), 208 deletions(-) diff --git a/README.md b/README.md index 651fcad..033c90a 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,10 @@ const db = new RDSClient({ // before returning an error from getConnection. // If set to 0, there is no limit to the number of queued connection requests. (Default: 0) // queueLimit: 0, + // Set asyncLocalStorage manually for transaction + // connectionStorage: new AsyncLocalStorage(), + // If create multiple RDSClient instances with the same connectionStorage, use this key to distinguish between the instances + // connectionStorageKey: 'datasource', }); ``` @@ -309,28 +313,17 @@ const result = await db.beginTransactionScope(async conn => { // if error throw on scope, will auto rollback ``` -#### Transaction on koa - -API: `async beginTransactionScope(scope, ctx)` - -Use koa's context to make sure only one active transaction on one ctx. +In `Promise.all` case, Parallel beginTransactionScope will create isolated transactions. ```js -async function foo(ctx, data1) { - return await db.beginTransactionScope(async conn => { - await conn.insert(table1, data1); - return { success: true }; - }, ctx); -} - -async function bar(ctx, data2) { - return await db.beginTransactionScope(async conn => { - // execute foo with the same transaction scope - await foo(ctx, { foo: 'bar' }); - await conn.insert(table2, data2); - return { success: true }; - }, ctx); -} +const result = await Promise.all([ + db.beginTransactionScope(async conn => { + // commit and success + }), + db.beginTransactionScope(async conn => { + // throw err and rollback + }), +]) ``` ### Raw Queries diff --git a/package.json b/package.json index 30102ba..c3d8ad7 100644 --- a/package.json +++ b/package.json @@ -27,6 +27,7 @@ "eslint": "^8.29.0", "eslint-config-egg": "^12.1.0", "git-contributor": "^2.0.0", + "mm": "^3.3.0", "typescript": "^4.9.5" }, "homepage": "https://github.com/ali-sdk/ali-rds", diff --git a/src/client.ts b/src/client.ts index 1538c34..d22d4bb 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,7 +1,8 @@ +import { AsyncLocalStorage } from 'node:async_hooks'; import { promisify } from 'node:util'; import mysql from 'mysql'; -import type { PoolConfig, Pool } from 'mysql'; -import type { PoolConnectionPromisify } from './types'; +import type { Pool } from 'mysql'; +import type { PoolConnectionPromisify, RDSClientOptions, TransactionContext, TransactionScope } from './types'; import { Operator } from './operator'; import { RDSConnection } from './connection'; import { RDSTransaction } from './transaction'; @@ -24,10 +25,17 @@ export class RDSClient extends Operator { static get format() { return mysql.format; } static get raw() { return mysql.raw; } + static #DEFAULT_STORAGE_KEY = Symbol('RDSClient#storage#default'); + static #TRANSACTION_NEST_COUNT = Symbol('RDSClient#transaction#nestCount'); + #pool: PoolPromisify; - constructor(options: PoolConfig) { + #connectionStorage: AsyncLocalStorage; + #connectionStorageKey: string | symbol; + + constructor(options: RDSClientOptions) { super(); - this.#pool = mysql.createPool(options) as unknown as PoolPromisify; + const { connectionStorage, connectionStorageKey, ...mysqlOptions } = options; + this.#pool = mysql.createPool(mysqlOptions) as unknown as PoolPromisify; [ 'query', 'getConnection', @@ -35,6 +43,8 @@ export class RDSClient extends Operator { ].forEach(method => { this.#pool[method] = promisify(this.#pool[method]); }); + this.#connectionStorage = connectionStorage || new AsyncLocalStorage(); + this.#connectionStorageKey = connectionStorageKey || RDSClient.#DEFAULT_STORAGE_KEY; } // impl Operator._query @@ -92,6 +102,7 @@ export class RDSClient extends Operator { throw err; } const tran = new RDSTransaction(conn); + tran[RDSClient.#TRANSACTION_NEST_COUNT] = 1; if (this.beforeQueryHandlers.length > 0) { for (const handler of this.beforeQueryHandlers) { tran.beforeQuery(handler); @@ -109,75 +120,137 @@ export class RDSClient extends Operator { * Auto commit or rollback on a transaction scope * * @param {Function} scope - scope with code - * @param {Object} [ctx] - transaction env context, like koa's ctx. - * To make sure only one active transaction on this ctx. + * @param {Object} [ctx] - transaction context * @return {Object} - scope return result */ - async beginTransactionScope(scope: (transaction: RDSTransaction) => Promise, ctx?: any): Promise { - ctx = ctx || {}; - if (!ctx._transactionConnection) { - // Create only one conn if concurrent call `beginTransactionScope` - ctx._transactionConnection = this.beginTransaction(); - } - const tran = await ctx._transactionConnection; - - if (!ctx._transactionScopeCount) { - ctx._transactionScopeCount = 1; + async #beginTransactionScope(scope: TransactionScope, ctx: TransactionContext): Promise { + let tran: RDSTransaction; + let shouldRelease = false; + if (!ctx[this.#connectionStorageKey]) { + // there is no transaction in ctx, create a new one + tran = await this.beginTransaction(); + ctx[this.#connectionStorageKey] = tran; + shouldRelease = true; } else { - ctx._transactionScopeCount++; + // use transaction in ctx + tran = ctx[this.#connectionStorageKey]!; + tran[RDSClient.#TRANSACTION_NEST_COUNT]++; } + + let result: any; + let scopeError: any; + let internalError: any; try { - const result = await scope(tran); - ctx._transactionScopeCount--; - if (ctx._transactionScopeCount === 0) { - ctx._transactionConnection = null; - await tran.commit(); + result = await scope(tran); + } catch (err: any) { + scopeError = err; + } + tran[RDSClient.#TRANSACTION_NEST_COUNT]--; + + // null connection means the nested scope has been rollback, we can do nothing here + if (tran.conn) { + try { + // execution error, should rollback + if (scopeError) { + await tran.rollback(); + } else if (tran[RDSClient.#TRANSACTION_NEST_COUNT] < 1) { + // nestedCount smaller than 1 means all the nested scopes have executed successfully + await tran.commit(); + } + } catch (err) { + internalError = err; } - return result; - } catch (err) { - if (ctx._transactionConnection) { - ctx._transactionConnection = null; - await tran.rollback(); + } + + // remove transaction in ctx + if (shouldRelease && tran[RDSClient.#TRANSACTION_NEST_COUNT] < 1) { + ctx[this.#connectionStorageKey] = null; + } + + if (internalError) { + if (scopeError) { + internalError.cause = scopeError; } - throw err; + throw internalError; + } + if (scopeError) { + throw scopeError; } + return result; + } + + /** + * Auto commit or rollback on a transaction scope + * + * @param scope - scope with code + * @return {Object} - scope return result + */ + async beginTransactionScope(scope: TransactionScope) { + let ctx = this.#connectionStorage.getStore(); + if (ctx) { + return await this.#beginTransactionScope(scope, ctx); + } + ctx = {}; + return await this.#connectionStorage.run(ctx, async () => { + return await this.#beginTransactionScope(scope, ctx!); + }); } /** * doomed to be rollbacked after transaction scope * useful on writing tests which are related with database * - * @param {Function} scope - scope with code - * @param {Object} [ctx] - transaction env context, like koa's ctx. - * To make sure only one active transaction on this ctx. + * @param scope - scope with code + * @param ctx - transaction context * @return {Object} - scope return result */ - async beginDoomedTransactionScope(scope: (transaction: RDSTransaction) => Promise, ctx?: any): Promise { - ctx = ctx || {}; - if (!ctx._transactionConnection) { - ctx._transactionConnection = await this.beginTransaction(); - ctx._transactionScopeCount = 1; + async #beginDoomedTransactionScope(scope: TransactionScope, ctx: TransactionContext): Promise { + let tran: RDSTransaction; + if (!ctx[this.#connectionStorageKey]) { + // there is no transaction in ctx, create a new one + tran = await this.beginTransaction(); + ctx[this.#connectionStorageKey] = tran; } else { - ctx._transactionScopeCount++; + // use transaction in ctx + tran = ctx[this.#connectionStorageKey]!; + tran[RDSClient.#TRANSACTION_NEST_COUNT]++; } - const tran = ctx._transactionConnection; + try { const result = await scope(tran); - ctx._transactionScopeCount--; - if (ctx._transactionScopeCount === 0) { - ctx._transactionConnection = null; + tran[RDSClient.#TRANSACTION_NEST_COUNT]--; + if (tran[RDSClient.#TRANSACTION_NEST_COUNT] === 0) { + ctx[this.#connectionStorageKey] = null; + await tran.rollback(); } return result; } catch (err) { - if (ctx._transactionConnection) { - ctx._transactionConnection = null; + if (ctx[this.#connectionStorageKey]) { + ctx[this.#connectionStorageKey] = null; + await tran.rollback(); } throw err; - } finally { - await tran.rollback(); } } + /** + * doomed to be rollbacked after transaction scope + * useful on writing tests which are related with database + * + * @param scope - scope with code + * @return {Object} - scope return result + */ + async beginDoomedTransactionScope(scope: TransactionScope): Promise { + let ctx = this.#connectionStorage.getStore(); + if (ctx) { + return await this.#beginDoomedTransactionScope(scope, ctx); + } + ctx = {}; + return await this.#connectionStorage.run(ctx, async () => { + return await this.#beginDoomedTransactionScope(scope, ctx!); + }); + } + async end() { await this.#pool.end(); } diff --git a/src/types.ts b/src/types.ts index b65dd6b..2f074aa 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,4 +1,11 @@ -import type { PoolConnection } from 'mysql'; +import { AsyncLocalStorage } from 'async_hooks'; +import type { PoolConnection, PoolConfig } from 'mysql'; +import { RDSTransaction } from './transaction'; + +export interface RDSClientOptions extends PoolConfig { + connectionStorageKey?: string; + connectionStorage?: AsyncLocalStorage>; +} export interface PoolConnectionPromisify extends Omit { query(sql: string): Promise; @@ -53,3 +60,6 @@ export type LockTableOption = { export type BeforeQueryHandler = (sql: string) => string | undefined | void; export type AfterQueryHandler = (sql: string, result: any, execDuration: number, err?: Error) => void; + +export type TransactionContext = Record; +export type TransactionScope = (transaction: RDSTransaction) => Promise; diff --git a/test/client.test.ts b/test/client.test.ts index c468852..972343b 100644 --- a/test/client.test.ts +++ b/test/client.test.ts @@ -1,10 +1,20 @@ +import { AsyncLocalStorage } from 'async_hooks'; import { strict as assert } from 'node:assert'; -import { setTimeout } from 'node:timers/promises'; import fs from 'node:fs/promises'; import path from 'node:path'; +import mm from 'mm'; +import { RDSTransaction } from '../src/transaction'; import config from './config'; import { RDSClient } from '../src/client'; +interface MockMateSpyObject any> { + called?: number; + calledArguments?: Array>; + lastCalledArguments?: Parameters; +} + +const mmSpy = any>(target: T) => target as MockMateSpyObject; + describe('test/client.test.ts', () => { const prefix = 'prefix-' + process.version + '-'; const table = 'ali-sdk-test-user'; @@ -24,6 +34,10 @@ describe('test/client.test.ts', () => { return await db.end(); }); + afterEach(() => { + mm.restore(); + }); + describe('new RDSClient(options)', () => { it('should access pool', async () => { assert(db.pool.config.connectionConfig.database); @@ -336,6 +350,163 @@ describe('test/client.test.ts', () => { }); }); + it('should throw rollback error with cause error when rollback failed', async () => { + mm(RDSTransaction.prototype, 'rollback', async () => { + throw new Error('fake rollback error'); + }); + await assert.rejects( + db.beginTransactionScope(async conn => { + await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) + valuefail(?, ?, now(), now())`, + [ table, prefix + 'beginTransactionScope-fail12', 'm@beginTransactionScope-fail.com' ]); + }), + (err: any) => + err.message === 'fake rollback error' && + err.cause.code === 'ER_PARSE_ERROR', + ); + }); + + it('should rollback when query fail', async () => { + await assert.rejects( + db.beginTransactionScope(async conn => { + await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) + values(?, ?, now(), now())`, + [ table, prefix + 'beginTransactionScope-fail1', 'm@beginTransactionScope-fail.com' ]); + await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) + valuefail(?, ?, now(), now())`, + [ table, prefix + 'beginTransactionScope-fail12', 'm@beginTransactionScope-fail.com' ]); + return true; + }), + (err: any) => err.code === 'ER_PARSE_ERROR', + ); + + const rows = await db.query('select * from ?? where email=? order by id', + [ table, prefix + 'm@beginTransactionScope-fail.com' ]); + assert.equal(rows.length, 0); + }); + + it('should rollback all when query failed in nested scope', async () => { + mm.spy(RDSTransaction.prototype, 'rollback'); + + const inner = async () => { + return db.beginTransactionScope(async conn => { + await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) + values(?, ?, now(), now())`, + [ table, prefix + 'beginTransactionScope-fail1', 'm@beginTransactionScope-nested-fail.com' ]); + await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) + valuefail(?, ?, now(), now())`, + [ table, prefix + 'beginTransactionScope-fail12', 'm@beginTransactionScope-nested-fail.com' ]); + return true; + }); + }; + + const outter = async () => { + return await db.beginTransactionScope(async conn => { + await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) + values(?, ?, now(), now())`, + [ table, prefix + 'beginTransactionScopeCtx3', 'm@beginTransactionScope-nested-fail.com' ]); + await inner(); + await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) + values(?, ?, now(), now())`, + [ table, prefix + 'beginTransactionScopeCtx4', 'm@beginTransactionScope-nested-fail.com' ]); + return true; + }); + }; + + await assert.rejects(outter(), (err: any) => err.code === 'ER_PARSE_ERROR'); + assert.strictEqual(mmSpy(RDSTransaction.prototype.rollback).called, 1); + + const rows = await db.query('select * from ?? where email=? order by id', + [ table, 'm@beginTransactionScope-nested-fail.com' ]); + assert.equal(rows.length, 0); + }); + + it('should not commit when catch query error in nested scope', async () => { + mm.spy(RDSTransaction.prototype, 'commit'); + mm.spy(RDSTransaction.prototype, 'rollback'); + + const inner = async () => { + return await db.beginTransactionScope(async conn => { + await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) + valuefail(?, ?, now(), now())`, + [ table, prefix + 'beginTransactionScope-fail12', 'm@beginTransactionScope-catch-nested-error.com' ]); + return true; + }); + }; + + const err = await db.beginTransactionScope(async conn => { + await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) + values(?, ?, now(), now())`, + [ table, prefix + 'beginTransactionScope-fail1', 'm@beginTransactionScope-catch-nested-error.com' ]); + return await inner().catch(err => err); + }); + + assert.strictEqual(err.code, 'ER_PARSE_ERROR'); + assert.strictEqual(mmSpy(RDSTransaction.prototype.rollback).called, 1); + assert.strictEqual(mmSpy(RDSTransaction.prototype.commit).called, undefined); + const rows = await db.query('select * from ?? where email=? order by id', + [ table, 'm@beginTransactionScope-catch-nested-error.com' ]); + assert.equal(rows.length, 0); + }); + + it('should fail when query after catch nested error', async () => { + mm.spy(RDSTransaction.prototype, 'rollback'); + + const inner = async () => { + return await db.beginTransactionScope(async conn => { + await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) + valuefail(?, ?, now(), now())`, + [ table, prefix + 'beginTransactionScope-fail12', 'm@beginTransactionScope-query-catch-nested-error.com' ]); + return true; + }); + }; + + await assert.rejects( + db.beginTransactionScope(async conn => { + await inner().catch(() => { /* ignore */ }); + await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) + values(?, ?, now(), now())`, + [ table, prefix + 'beginTransactionScope-fail1', 'm@beginTransactionScope-query-catch-nested-error.com' ]); + }), + (err: Error) => err.message === 'transaction was commit or rollback', + ); + + assert.strictEqual(mmSpy(RDSTransaction.prototype.rollback).called, 1); + }); + + it('should partially success when query failed in parallel transaction', async () => { + const p1 = async () => { + return await db.beginTransactionScope(async conn => { + await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) + values(?, ?, now(), now())`, + [ table, prefix + 'beginTransactionScopeCtx-partial1', 'm@beginTransactionScope-parallel-fail.com' ]); + await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) + values(?, ?, now(), now())`, + [ table, prefix + 'beginTransactionScopeCtx-partial2', 'm@beginTransactionScope-parallel-fail.com' ]); + return true; + }); + }; + + const p2 = async () => { + return db.beginTransactionScope(async conn => { + await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) + values(?, ?, now(), now())`, + [ table, prefix + 'beginTransactionScope-fail1', 'm@beginTransactionScope-parallel-fail.com' ]); + await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) + valuefail(?, ?, now(), now())`, + [ table, prefix + 'beginTransactionScope-fail12', 'm@beginTransactionScope-parallel-fail.com' ]); + return true; + }); + }; + + const [ p1Res, p2Res ] = await Promise.all([ p1(), p2().catch(err => err) ]); + assert.strictEqual(p1Res, true); + assert.strictEqual(p2Res.code, 'ER_PARSE_ERROR'); + const rows = await db.query('select * from ?? where email=? order by id', + [ table, 'm@beginTransactionScope-parallel-fail.com' ]); + assert.equal(rows.length, 2); + }); + it('should insert 2 rows in a transaction', async () => { const result = await db.beginTransactionScope(async conn => { await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) @@ -354,170 +525,181 @@ describe('test/client.test.ts', () => { assert.equal(rows[1].name, prefix + 'beginTransactionScope2'); }); - it('should rollback when query fail', async () => { - try { - await db.beginTransactionScope(async conn => { + it('should insert 4 rows in nested and parallel scopes', async () => { + mm.spy(RDSTransaction.prototype, 'commit'); + const nestedNestedScope = async () => { + return await db.beginTransactionScope(async conn => { await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) - values(?, ?, now(), now())`, - [ table, prefix + 'beginTransactionScope-fail1', 'm@beginTransactionScope-fail.com' ]); + values(?, ?, now(), now())`, + [ table, prefix + 'beginTransactionScopeCtx1', prefix + 'm@beginTransactionScope-success.com' ]); + return conn; + }); + }; + + const nestedScope = async () => { + return await db.beginTransactionScope(async conn => { + const nested = await nestedNestedScope(); await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) - valuefail(?, ?, now(), now())`, - [ table, prefix + 'beginTransactionScope-fail12', 'm@beginTransactionScope-fail.com' ]); - return true; + values(?, ?, now(), now())`, + [ table, prefix + 'beginTransactionScopeCtx2', prefix + 'm@beginTransactionScope-success.com' ]); + return [ conn, nested ]; }); - throw new Error('should not run this'); - } catch (err) { - assert.equal(err.code, 'ER_PARSE_ERROR'); - } + }; + + const scope = async () => { + return await db.beginTransactionScope(async conn => { + const nested = await nestedScope(); + await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) + values(?, ?, now(), now())`, + [ table, prefix + 'beginTransactionScopeCtx3', prefix + 'm@beginTransactionScope-success.com' ]); + return [ conn, ...nested ]; + }); + }; + + const parallelScope = async () => { + return await db.beginTransactionScope(async conn => { + await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) + values(?, ?, now(), now())`, + [ table, prefix + 'beginTransactionScopeCtx4', prefix + 'm@beginTransactionScope-success.com' ]); + return conn; + }); + }; + + const [[ conn1, conn2, conn3 ], conn4 ] = await Promise.all([ scope(), parallelScope() ]); + assert.strictEqual(conn1, conn2); + assert.strictEqual(conn2, conn3); + assert.notStrictEqual(conn1, conn4); const rows = await db.query('select * from ?? where email=? order by id', - [ table, prefix + 'm@beginTransactionScope-fail.com' ]); - assert.equal(rows.length, 0); + [ table, prefix + 'm@beginTransactionScope-success.com' ]); + assert.equal(rows.length, 4); + assert.strictEqual(mmSpy(RDSTransaction.prototype.commit).called, 2); }); - describe('beginTransactionScope(fn, ctx)', () => { - it('should insert 7 rows in a transaction with ctx', async () => { - const ctx = {} as any; - async function hiInsert() { - return await db.beginTransactionScope(async conn => { - await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) - values(?, ?, now(), now())`, - [ table, prefix + 'beginTransactionScopeCtx3', prefix + 'm@beginTransactionScopeCtx1.com' ]); - await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) - values(?, ?, now(), now())`, - [ table, prefix + 'beginTransactionScopeCtx4', prefix + 'm@beginTransactionScopeCtx1.com' ]); - return true; - }, ctx); - } + it('should multiple instances works', async () => { + mm.spy(RDSTransaction.prototype, 'commit'); + const db2 = new RDSClient(config); - async function fooInsert() { - return await db.beginTransactionScope(async conn => { - await hiInsert(); + const [ conn1, conn2 ] = await Promise.all([ + db.beginTransactionScope(async conn => { + await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) + values(?, ?, now(), now())`, + [ table, prefix + 'multiple-instance1', prefix + 'm@multiple-instance.com' ]); + return conn; + }), + db2.beginTransactionScope(async conn => { + await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) + values(?, ?, now(), now())`, + [ table, prefix + 'multiple-instance2', prefix + 'm@multiple-instance.com' ]); + return conn; + }), + ]); - await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) - values(?, ?, now(), now())`, - [ table, prefix + 'beginTransactionScopeCtx5', prefix + 'm@beginTransactionScopeCtx1.com' ]); - await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) - values(?, ?, now(), now())`, - [ table, prefix + 'beginTransactionScopeCtx6', prefix + 'm@beginTransactionScopeCtx1.com' ]); - return true; - }, ctx); - } + assert.notStrictEqual(conn1, conn2); + assert.strictEqual(mmSpy(RDSTransaction.prototype.commit).called, 2); - async function barInsert() { - return await db.beginTransactionScope(async conn => { - await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) - values(?, ?, now(), now())`, - [ table, prefix + 'beginTransactionScopeCtx7', prefix + 'm@beginTransactionScopeCtx1.com' ]); - return true; - }, ctx); - } + const rows = await db.query('select * from ?? where email=? order by id', + [ table, prefix + 'm@multiple-instance.com' ]); + assert.equal(rows.length, 2); + + await db2.end(); + }); + + it('should multiple instances with the same asyncLocalStorage works', async () => { + mm.spy(RDSTransaction.prototype, 'commit'); - const result = await db.beginTransactionScope(async conn => { + const storage = new AsyncLocalStorage(); + const db1 = new RDSClient({ + ...config, + connectionStorage: storage, + connectionStorageKey: 'datasource1', + }); + const db2 = new RDSClient({ + ...config, + connectionStorage: storage, + connectionStorageKey: 'datasource2', + }); + + const [ conn1, conn2 ] = await Promise.all([ + db1.beginTransactionScope(async conn => { await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) values(?, ?, now(), now())`, - [ table, prefix + 'beginTransactionScopeCtx1', prefix + 'm@beginTransactionScopeCtx1.com' ]); + [ table, prefix + 'multiple-instance-als1', prefix + 'm@multiple-instance-als.com' ]); + return conn; + }), + db2.beginTransactionScope(async conn => { await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) values(?, ?, now(), now())`, - [ table, prefix + 'beginTransactionScopeCtx2', prefix + 'm@beginTransactionScopeCtx1.com' ]); + [ table, prefix + 'multiple-instance-als2', prefix + 'm@multiple-instance-als.com' ]); + return conn; + }), + ]); - // test query one - const row = await conn.queryOne('select * from ?? where name=?', - [ table, prefix + 'beginTransactionScopeCtx1' ]); - assert(row); - assert.equal(row.name, prefix + 'beginTransactionScopeCtx1'); + assert.notStrictEqual(conn1, conn2); + assert.strictEqual(mmSpy(RDSTransaction.prototype.commit).called, 2); - const fooResult = await fooInsert(); - assert.equal(fooResult, true); - const barResult = await barInsert(); - assert.equal(barResult, true); + const rows = await db.query('select * from ?? where email=? order by id', + [ table, prefix + 'm@multiple-instance-als.com' ]); + assert.equal(rows.length, 2); - return true; - }, ctx); - - assert.equal(result, true); - - const rows = await db.query('select * from ?? where email=? order by id', - [ table, prefix + 'm@beginTransactionScopeCtx1.com' ]); - assert.equal(rows.length, 7); - assert.equal(rows[0].name, prefix + 'beginTransactionScopeCtx1'); - assert.equal(rows[1].name, prefix + 'beginTransactionScopeCtx2'); - assert.equal(rows[2].name, prefix + 'beginTransactionScopeCtx3'); - assert.equal(rows[3].name, prefix + 'beginTransactionScopeCtx4'); - assert.equal(rows[4].name, prefix + 'beginTransactionScopeCtx5'); - assert.equal(rows[5].name, prefix + 'beginTransactionScopeCtx6'); - assert.equal(rows[6].name, prefix + 'beginTransactionScopeCtx7'); - assert.equal(ctx._transactionConnection, null); - assert.equal(ctx._transactionScopeCount, 0); - }); - - it('should auto rollback on fail', async () => { - const ctx = {} as any; - async function fooInsert() { - return await db.beginTransactionScope(async conn => { - await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) - values(?, ?, now(), now())`, - [ table, prefix + 'beginTransactionScope-ctx-fail1', prefix + 'm@beginTransactionScope-ctx-fail1.com' ]); - await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) - values(?, ?, now(), now())`, - [ table, prefix + 'beginTransactionScope-ctx-fail2', prefix + 'm@beginTransactionScope-ctx-fail1.com' ]); - return true; - }, ctx); - } + await db1.end(); + await db2.end(); + }); + }); - async function barInsert() { - return await db.beginTransactionScope(async conn => { - await fooInsert(); - await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) + describe('beginDoomedTransactionScope(scope)', () => { + it('should rollback when query success', async () => { + mm.spy(RDSTransaction.prototype, 'rollback'); + const inner = async () => { + return await db.beginDoomedTransactionScope(async conn => { + await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) values(?, ?, now(), now())`, - [ table, prefix + 'beginTransactionScope-ctx-fail3', prefix + 'm@beginTransactionScope-ctx-fail1.com' ]); - return true; - }, ctx); - } + [ table, prefix + 'beginDoomedTransactionScopeCtx1', prefix + 'm@beginDoomedTransactionScope-success.com' ]); + return conn; + }); + }; - try { - await db.beginTransactionScope(async conn => { - await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) - values(?, ?, now(), now())`, - [ table, prefix + 'beginTransactionScope-ctx-fail1', prefix + 'm@beginTransactionScope-ctx-fail1.com' ]); - await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) - values(?, ?, now(), now())`, - [ table, prefix + 'beginTransactionScope-ctx-fail2', prefix + 'm@beginTransactionScope-ctx-fail1.com' ]); + const [ conn, nestedConn ] = await db.beginDoomedTransactionScope(async conn => { + await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) + values(?, ?, now(), now())`, + [ table, prefix + 'beginDoomedTransactionScopeCtx2', prefix + 'm@beginDoomedTransactionScope-success.com' ]); + const nestedConn = await inner(); + return [ conn, nestedConn ]; + }); - await barInsert(); - throw new Error('should not run this'); - }, ctx); - } catch (err) { - assert.equal(err.code, 'ER_DUP_ENTRY'); - } + assert.strictEqual(conn, nestedConn); + assert.strictEqual(mmSpy(RDSTransaction.prototype.rollback).called, 1); - const rows = await db.query('select * from ?? where email=? order by id', - [ table, prefix + 'm@beginTransactionScope-ctx-fail1.com' ]); - assert.equal(rows.length, 0); - assert.equal(ctx._transactionConnection, null); - assert.equal(ctx._transactionScopeCount, 3); - }); - }); - - it('should safe with await Array', async () => { - const ctx = {}; - await Promise.all([ - await db.beginTransactionScope(async conn => { - await conn.query( - 'INSERT INTO `ali-sdk-test-user` (name, email, mobile) values(?, ?, "12345678901")', - [ prefix + 'should-safe-with-yield-array-1', prefix + 'm@should-safe-with-yield-array-1.com' ]); - await setTimeout(100); - }, ctx), - await db.beginTransactionScope(async conn => { - await conn.query( - 'INSERT INTO `ali-sdk-test-user` (name, email, mobile) values(?, ?, "12345678901")', - [ prefix + 'should-safe-with-yield-array-2', prefix + 'm@should-safe-with-yield-array-1.com' ]); - await setTimeout(200); - }, ctx), - ]); - const rows = await db.query( - 'SELECT * FROM `ali-sdk-test-user` where name like "%should-safe-with-yield-array%"'); - assert(rows.length === 2); + const rows = await db.query('select * from ?? where email=? order by id', + [ table, prefix + 'm@beginDoomedTransactionScope-success.com' ]); + assert.equal(rows.length, 0); + }); + + it('should rollback when query fail', async () => { + mm.spy(RDSTransaction.prototype, 'rollback'); + const inner = async () => { + return await db.beginDoomedTransactionScope(async conn => { + await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) + valuefail(?, ?, now(), now())`, + [ table, prefix + 'beginDoomedTransactionScopeCtx1', prefix + 'm@beginDoomedTransactionScope-fail.com' ]); + }); + }; + + await assert.rejects( + db.beginDoomedTransactionScope(async conn => { + await conn.query(`insert into ??(name, email, gmt_create, gmt_modified) + values(?, ?, now(), now())`, + [ table, prefix + 'beginDoomedTransactionScopeCtx2', prefix + 'm@beginDoomedTransactionScope-fail.com' ]); + await inner(); + }), + (err: any) => err.code === 'ER_PARSE_ERROR', + ); + + assert.strictEqual(mmSpy(RDSTransaction.prototype.rollback).called, 1); + + const rows = await db.query('select * from ?? where email=? order by id', + [ table, prefix + 'm@beginDoomedTransactionScope-fail.com' ]); + assert.equal(rows.length, 0); }); });