Skip to content

Commit

Permalink
feat: use AsyncLocalStorage to refactor transaction, to make it more …
Browse files Browse the repository at this point in the history
…safe (#108)

BREAKING CHANGE: In `Promise.all` case, Parallel beginTransactionScope will create isolated transactions.
  • Loading branch information
gxkl authored Jun 3, 2023
1 parent b364afb commit ae327fa
Show file tree
Hide file tree
Showing 5 changed files with 467 additions and 208 deletions.
33 changes: 13 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ const db = new RDSClient({
// before returning an error from getConnection.
// If set to 0, there is no limit to the number of queued connection requests. (Default: 0)
// queueLimit: 0,
// Set asyncLocalStorage manually for transaction
// connectionStorage: new AsyncLocalStorage(),
// If create multiple RDSClient instances with the same connectionStorage, use this key to distinguish between the instances
// connectionStorageKey: 'datasource',
});
```

Expand Down Expand Up @@ -309,28 +313,17 @@ const result = await db.beginTransactionScope(async conn => {
// if error throw on scope, will auto rollback
```

#### Transaction on koa

API: `async beginTransactionScope(scope, ctx)`

Use koa's context to make sure only one active transaction on one ctx.
In `Promise.all` case, Parallel beginTransactionScope will create isolated transactions.

```js
async function foo(ctx, data1) {
return await db.beginTransactionScope(async conn => {
await conn.insert(table1, data1);
return { success: true };
}, ctx);
}

async function bar(ctx, data2) {
return await db.beginTransactionScope(async conn => {
// execute foo with the same transaction scope
await foo(ctx, { foo: 'bar' });
await conn.insert(table2, data2);
return { success: true };
}, ctx);
}
const result = await Promise.all([
db.beginTransactionScope(async conn => {
// commit and success
}),
db.beginTransactionScope(async conn => {
// throw err and rollback
}),
])
```

### Raw Queries
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"eslint": "^8.29.0",
"eslint-config-egg": "^12.1.0",
"git-contributor": "^2.0.0",
"mm": "^3.3.0",
"typescript": "^4.9.5"
},
"homepage": "https://github.com/ali-sdk/ali-rds",
Expand Down
163 changes: 118 additions & 45 deletions src/client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { AsyncLocalStorage } from 'node:async_hooks';
import { promisify } from 'node:util';
import mysql from 'mysql';
import type { PoolConfig, Pool } from 'mysql';
import type { PoolConnectionPromisify } from './types';
import type { Pool } from 'mysql';
import type { PoolConnectionPromisify, RDSClientOptions, TransactionContext, TransactionScope } from './types';
import { Operator } from './operator';
import { RDSConnection } from './connection';
import { RDSTransaction } from './transaction';
Expand All @@ -24,17 +25,26 @@ export class RDSClient extends Operator {
static get format() { return mysql.format; }
static get raw() { return mysql.raw; }

static #DEFAULT_STORAGE_KEY = Symbol('RDSClient#storage#default');
static #TRANSACTION_NEST_COUNT = Symbol('RDSClient#transaction#nestCount');

#pool: PoolPromisify;
constructor(options: PoolConfig) {
#connectionStorage: AsyncLocalStorage<TransactionContext>;
#connectionStorageKey: string | symbol;

constructor(options: RDSClientOptions) {
super();
this.#pool = mysql.createPool(options) as unknown as PoolPromisify;
const { connectionStorage, connectionStorageKey, ...mysqlOptions } = options;
this.#pool = mysql.createPool(mysqlOptions) as unknown as PoolPromisify;
[
'query',
'getConnection',
'end',
].forEach(method => {
this.#pool[method] = promisify(this.#pool[method]);
});
this.#connectionStorage = connectionStorage || new AsyncLocalStorage();
this.#connectionStorageKey = connectionStorageKey || RDSClient.#DEFAULT_STORAGE_KEY;
}

// impl Operator._query
Expand Down Expand Up @@ -92,6 +102,7 @@ export class RDSClient extends Operator {
throw err;
}
const tran = new RDSTransaction(conn);
tran[RDSClient.#TRANSACTION_NEST_COUNT] = 1;
if (this.beforeQueryHandlers.length > 0) {
for (const handler of this.beforeQueryHandlers) {
tran.beforeQuery(handler);
Expand All @@ -109,75 +120,137 @@ export class RDSClient extends Operator {
* Auto commit or rollback on a transaction scope
*
* @param {Function} scope - scope with code
* @param {Object} [ctx] - transaction env context, like koa's ctx.
* To make sure only one active transaction on this ctx.
* @param {Object} [ctx] - transaction context
* @return {Object} - scope return result
*/
async beginTransactionScope(scope: (transaction: RDSTransaction) => Promise<any>, ctx?: any): Promise<any> {
ctx = ctx || {};
if (!ctx._transactionConnection) {
// Create only one conn if concurrent call `beginTransactionScope`
ctx._transactionConnection = this.beginTransaction();
}
const tran = await ctx._transactionConnection;

if (!ctx._transactionScopeCount) {
ctx._transactionScopeCount = 1;
async #beginTransactionScope(scope: TransactionScope, ctx: TransactionContext): Promise<any> {
let tran: RDSTransaction;
let shouldRelease = false;
if (!ctx[this.#connectionStorageKey]) {
// there is no transaction in ctx, create a new one
tran = await this.beginTransaction();
ctx[this.#connectionStorageKey] = tran;
shouldRelease = true;
} else {
ctx._transactionScopeCount++;
// use transaction in ctx
tran = ctx[this.#connectionStorageKey]!;
tran[RDSClient.#TRANSACTION_NEST_COUNT]++;
}

let result: any;
let scopeError: any;
let internalError: any;
try {
const result = await scope(tran);
ctx._transactionScopeCount--;
if (ctx._transactionScopeCount === 0) {
ctx._transactionConnection = null;
await tran.commit();
result = await scope(tran);
} catch (err: any) {
scopeError = err;
}
tran[RDSClient.#TRANSACTION_NEST_COUNT]--;

// null connection means the nested scope has been rollback, we can do nothing here
if (tran.conn) {
try {
// execution error, should rollback
if (scopeError) {
await tran.rollback();
} else if (tran[RDSClient.#TRANSACTION_NEST_COUNT] < 1) {
// nestedCount smaller than 1 means all the nested scopes have executed successfully
await tran.commit();
}
} catch (err) {
internalError = err;
}
return result;
} catch (err) {
if (ctx._transactionConnection) {
ctx._transactionConnection = null;
await tran.rollback();
}

// remove transaction in ctx
if (shouldRelease && tran[RDSClient.#TRANSACTION_NEST_COUNT] < 1) {
ctx[this.#connectionStorageKey] = null;
}

if (internalError) {
if (scopeError) {
internalError.cause = scopeError;
}
throw err;
throw internalError;
}
if (scopeError) {
throw scopeError;
}
return result;
}

/**
* Auto commit or rollback on a transaction scope
*
* @param scope - scope with code
* @return {Object} - scope return result
*/
async beginTransactionScope(scope: TransactionScope) {
let ctx = this.#connectionStorage.getStore();
if (ctx) {
return await this.#beginTransactionScope(scope, ctx);
}
ctx = {};
return await this.#connectionStorage.run(ctx, async () => {
return await this.#beginTransactionScope(scope, ctx!);
});
}

/**
* doomed to be rollbacked after transaction scope
* useful on writing tests which are related with database
*
* @param {Function} scope - scope with code
* @param {Object} [ctx] - transaction env context, like koa's ctx.
* To make sure only one active transaction on this ctx.
* @param scope - scope with code
* @param ctx - transaction context
* @return {Object} - scope return result
*/
async beginDoomedTransactionScope(scope: (transaction: RDSTransaction) => Promise<any>, ctx?: any): Promise<any> {
ctx = ctx || {};
if (!ctx._transactionConnection) {
ctx._transactionConnection = await this.beginTransaction();
ctx._transactionScopeCount = 1;
async #beginDoomedTransactionScope(scope: TransactionScope, ctx: TransactionContext): Promise<any> {
let tran: RDSTransaction;
if (!ctx[this.#connectionStorageKey]) {
// there is no transaction in ctx, create a new one
tran = await this.beginTransaction();
ctx[this.#connectionStorageKey] = tran;
} else {
ctx._transactionScopeCount++;
// use transaction in ctx
tran = ctx[this.#connectionStorageKey]!;
tran[RDSClient.#TRANSACTION_NEST_COUNT]++;
}
const tran = ctx._transactionConnection;

try {
const result = await scope(tran);
ctx._transactionScopeCount--;
if (ctx._transactionScopeCount === 0) {
ctx._transactionConnection = null;
tran[RDSClient.#TRANSACTION_NEST_COUNT]--;
if (tran[RDSClient.#TRANSACTION_NEST_COUNT] === 0) {
ctx[this.#connectionStorageKey] = null;
await tran.rollback();
}
return result;
} catch (err) {
if (ctx._transactionConnection) {
ctx._transactionConnection = null;
if (ctx[this.#connectionStorageKey]) {
ctx[this.#connectionStorageKey] = null;
await tran.rollback();
}
throw err;
} finally {
await tran.rollback();
}
}

/**
* doomed to be rollbacked after transaction scope
* useful on writing tests which are related with database
*
* @param scope - scope with code
* @return {Object} - scope return result
*/
async beginDoomedTransactionScope(scope: TransactionScope): Promise<any> {
let ctx = this.#connectionStorage.getStore();
if (ctx) {
return await this.#beginDoomedTransactionScope(scope, ctx);
}
ctx = {};
return await this.#connectionStorage.run(ctx, async () => {
return await this.#beginDoomedTransactionScope(scope, ctx!);
});
}

async end() {
await this.#pool.end();
}
Expand Down
12 changes: 11 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import type { PoolConnection } from 'mysql';
import { AsyncLocalStorage } from 'async_hooks';
import type { PoolConnection, PoolConfig } from 'mysql';
import { RDSTransaction } from './transaction';

export interface RDSClientOptions extends PoolConfig {
connectionStorageKey?: string;
connectionStorage?: AsyncLocalStorage<Record<PropertyKey, RDSTransaction>>;
}

export interface PoolConnectionPromisify extends Omit<PoolConnection, 'query'> {
query(sql: string): Promise<any>;
Expand Down Expand Up @@ -53,3 +60,6 @@ export type LockTableOption = {

export type BeforeQueryHandler = (sql: string) => string | undefined | void;
export type AfterQueryHandler = (sql: string, result: any, execDuration: number, err?: Error) => void;

export type TransactionContext = Record<PropertyKey, RDSTransaction | null>;
export type TransactionScope = (transaction: RDSTransaction) => Promise<any>;
Loading

0 comments on commit ae327fa

Please sign in to comment.