Skip to content

Commit

Permalink
Support ECMAScript resource management in Node API
Browse files Browse the repository at this point in the history
[Symbol.dispose]() implementation added to Gateway and
CloseableAsyncIterable, which allows the `using` keyword to be used to
automatically close chaincode and block event iterables when they go out
of scope instead of explcitly calling close with a try/finally block.

Signed-off-by: Mark S. Lewis <[email protected]>
  • Loading branch information
bestbeforetoday committed Sep 30, 2023
1 parent 59ddd68 commit 2f2a564
Show file tree
Hide file tree
Showing 15 changed files with 129 additions and 19 deletions.
17 changes: 15 additions & 2 deletions node/src/blockevents.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import { common, ledger, msp, orderer, peer } from '@hyperledger/fabric-protos';
import { CloseableAsyncIterable } from '.';
import { BlockEventsOptions } from './blockeventsbuilder';
import { BlockAndPrivateDataEventsRequest, BlockEventsRequest, FilteredBlockEventsRequest } from './blockeventsrequest';
import { assertDefined, Gateway, internalConnect, InternalConnectOptions } from './gateway';
import * as checkpointers from './checkpointers';
import { Gateway, InternalConnectOptions, assertDefined, internalConnect } from './gateway';
import { GatewayError } from './gatewayerror';
import { Identity } from './identity/identity';
import { Network } from './network';
import { DuplexStreamResponseStub, MockGatewayGrpcClient, newDuplexStreamResponse, readElements } from './testutils.test';
import * as checkpointers from './checkpointers';

function assertStartPositionToBeSpecified(seekInfo: orderer.SeekInfo, blockNumber: number): void {
const start = seekInfo.getStart();
Expand Down Expand Up @@ -435,5 +435,18 @@ describe('Block Events', () => {

expect(stream.cancel).toHaveBeenCalled();
});

it('resource clean-up cancels gRPC stream', async () => {
const responses = [testCase.newBlockResponse(1), testCase.newBlockResponse(2)];
const stream = newDuplexStreamResponse<common.Envelope, peer.DeliverResponse>(responses);
testCase.mockResponse(stream);

{
// @ts-expect-error Assigned to unused variable for resource cleanup
using events = await testCase.getEvents(); // eslint-disable-line @typescript-eslint/no-unused-vars
}

expect(stream.cancel).toHaveBeenCalled();
});
}));
});
6 changes: 6 additions & 0 deletions node/src/blockeventsrequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ class SignableBlockEventsRequest implements Signable {
}
}

// @ts-expect-error Polyfill for Symbol.dispose if not present
Symbol.dispose ??= Symbol('Symbol.dispose');

export class BlockEventsRequestImpl extends SignableBlockEventsRequest implements BlockEventsRequest {
readonly #client: GatewayClient;

Expand All @@ -150,6 +153,7 @@ export class BlockEventsRequestImpl extends SignableBlockEventsRequest implement
response => getBlock(response, () => response.getBlock()),
),
close: () => responses.close(),
[Symbol.dispose]: () => responses.close(),
};
}
}
Expand All @@ -171,6 +175,7 @@ export class FilteredBlockEventsRequestImpl extends SignableBlockEventsRequest i
response => getBlock(response, () => response.getFilteredBlock()),
),
close: () => responses.close(),
[Symbol.dispose]: () => responses.close(),
};
}
}
Expand All @@ -192,6 +197,7 @@ export class BlockAndPrivateDataEventsRequestImpl extends SignableBlockEventsReq
response => getBlock(response, () => response.getBlockAndPrivateData()),
),
close: () => responses.close(),
[Symbol.dispose]: () => responses.close(),
};
}
}
Expand Down
8 changes: 5 additions & 3 deletions node/src/chaincodeevent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ export interface ChaincodeEvent {
payload: Uint8Array;
}

// @ts-expect-error Polyfill for Symbol.dispose if not present
Symbol.dispose ??= Symbol('Symbol.dispose');

export function newChaincodeEvents(responses: CloseableAsyncIterable<gateway.ChaincodeEventsResponse>): CloseableAsyncIterable<ChaincodeEvent> {
return {
async* [Symbol.asyncIterator]() { // eslint-disable-line @typescript-eslint/require-await
Expand All @@ -48,9 +51,8 @@ export function newChaincodeEvents(responses: CloseableAsyncIterable<gateway.Cha
}
}
},
close: () => {
responses.close();
},
close: () => responses.close(),
[Symbol.dispose]: () => responses.close(),
};
}

Expand Down
16 changes: 14 additions & 2 deletions node/src/chaincodeevents.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { CallOptions, Metadata, ServiceError, status } from '@grpc/grpc-js';
import { gateway as gatewayproto, orderer, peer } from '@hyperledger/fabric-protos';
import { ChaincodeEvent } from './chaincodeevent';
import * as checkpointers from './checkpointers';
import { Gateway, internalConnect, InternalConnectOptions } from './gateway';
import { Gateway, InternalConnectOptions, internalConnect } from './gateway';
import { GatewayError } from './gatewayerror';
import { Identity } from './identity/identity';
import { Network } from './network';
Expand All @@ -30,7 +30,7 @@ function newChaincodeEvent(blockNumber: number, event: peer.ChaincodeEvent): Cha
};
}

interface ExpectedRequest{
interface ExpectedRequest {
channelName: string;
chaincodeName: string;
typeCase: orderer.SeekPosition.TypeCase;
Expand Down Expand Up @@ -370,5 +370,17 @@ describe('Chaincode Events', () => {
cause: serviceError,
});
});

it('resource clean-up cancels gRPC client stream', async () => {
const responseStream = newServerStreamResponse([ response1, response2 ]);
client.mockChaincodeEventsResponse(responseStream);

{
// @ts-expect-error Assigned to unused variable for resource cleanup
using events = await network.getChaincodeEvents('CHAINCODE'); // eslint-disable-line @typescript-eslint/no-unused-vars
}

expect(responseStream.cancel).toHaveBeenCalled();
});
});
});
13 changes: 13 additions & 0 deletions node/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,25 @@ export interface GatewayClient {
blockAndPrivateDataEvents(request: common.Envelope, options?: CallOptions): CloseableAsyncIterable<peer.DeliverResponse>;
}

// @ts-expect-error Polyfill for Symbol.dispose if not present
Symbol.dispose ??= Symbol('Symbol.dispose');

/**
* An async iterable that can be closed when the consumer does not want to read any more elements, freeing up resources
* that may be held by the iterable.
*
* This type implements the Disposable interface and instances can be disposed of with ECMAScript explicit resource
* management and the `using` keyword instead of calling {@link close} explicitly.
*
* @see [ECMAScript explicit resource management](https://github.com/tc39/proposal-explicit-resource-management)
*/
export interface CloseableAsyncIterable<T> extends AsyncIterable<T> {
/**
* Close the iterable to free up resources when no more elements are required.
*/
close(): void;

[Symbol.dispose](): void;
}

/**
Expand Down Expand Up @@ -157,6 +167,7 @@ class GatewayClientImpl implements GatewayClient {
return {
[Symbol.asyncIterator]: () => wrapAsyncIterator(serverStream[Symbol.asyncIterator]()),
close: () => serverStream.cancel(),
[Symbol.dispose]: () => serverStream.cancel(),
};
} catch (err) {
rethrowGrpcError(err);
Expand Down Expand Up @@ -202,6 +213,8 @@ class GatewayClientImpl implements GatewayClient {
return {
[Symbol.asyncIterator]: () => wrapAsyncIterator(duplexStream[Symbol.asyncIterator]()),
close: () => duplexStream.cancel(),
[Symbol.dispose]: () => duplexStream.cancel(),

};
} catch (err) {
rethrowGrpcError(err);
Expand Down
6 changes: 5 additions & 1 deletion node/src/commiterror.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ export class CommitError extends Error {
*/
transactionId: string;

constructor(properties: Readonly<Omit<CommitError, keyof Error> & Partial<Pick<Error, 'message'>>>) {
constructor(properties: Readonly<{
code: peer.TxValidationCodeMap[keyof peer.TxValidationCodeMap];
transactionId: string;
message?: string;
}>) {
super(properties.message);

this.name = CommitError.name;
Expand Down
11 changes: 9 additions & 2 deletions node/src/commitstatuserror.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { GatewayError } from './gatewayerror';
import { ServiceError } from '@grpc/grpc-js';
import { ErrorDetail, GatewayError } from './gatewayerror';

/**
* CommitStatusError is thrown when a failure occurs obtaining the commit status of a transaction.
Expand All @@ -15,7 +16,13 @@ export class CommitStatusError extends GatewayError {
*/
transactionId: string;

constructor(properties: Readonly<Omit<CommitStatusError, keyof Error> & Partial<Pick<Error, 'message'>>>) {
constructor(properties: Readonly<{
code: number;
details: ErrorDetail[];
cause: ServiceError;
transactionId: string;
message?: string;
}>) {
super(properties);

this.name = CommitStatusError.name;
Expand Down
11 changes: 9 additions & 2 deletions node/src/endorseerror.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { GatewayError } from './gatewayerror';
import { ServiceError } from '@grpc/grpc-js';
import { ErrorDetail, GatewayError } from './gatewayerror';

/**
* EndorseError is thrown when a failure occurs endorsing a transaction proposal.
Expand All @@ -15,7 +16,13 @@ export class EndorseError extends GatewayError {
*/
transactionId: string;

constructor(properties: Readonly<Omit<EndorseError, keyof Error> & Partial<Pick<Error, 'message'>>>) {
constructor(properties: Readonly<{
code: number;
details: ErrorDetail[];
cause: ServiceError;
transactionId: string;
message?: string;
}>) {
super(properties);

this.name = EndorseError.name;
Expand Down
15 changes: 15 additions & 0 deletions node/src/gateway.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,20 @@ describe('Gateway', () => {

expect(closeStub).not.toHaveBeenCalled();
});

it('called by resource clean-up', () => {
const client = new grpc.Client('example.org:1337', grpc.credentials.createInsecure());
const options: ConnectOptions = {
identity,
client,
};
const closeStub = jest.fn();
{
// @ts-expect-error Assigned to unused variable for resource cleanup
using gateway = Object.assign(connect(options), { close: closeStub }); // eslint-disable-line @typescript-eslint/no-unused-vars
}

expect(closeStub).toHaveBeenCalled();
});
});
});
14 changes: 14 additions & 0 deletions node/src/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,17 @@ export function internalConnect(options: Readonly<InternalConnectOptions>): Gate
return new GatewayImpl(gatewayClient, signingIdentity);
}

// @ts-expect-error Polyfill for Symbol.dispose if not present
Symbol.dispose ??= Symbol('Symbol.dispose');

/**
* Gateway represents the connection of a specific client identity to a Fabric Gateway. A Gateway is obtained using the
* {@link connect} function.
*
* This type implements the Disposable interface and instances can be disposed of with ECMAScript explicit resource
* management and the `using` keyword instead of calling {@link close} explicitly.
*
* @see [ECMAScript explicit resource management](https://github.com/tc39/proposal-explicit-resource-management)
*/
export interface Gateway {
/**
Expand Down Expand Up @@ -284,6 +292,8 @@ export interface Gateway {
* contracts obtained using the Gateway, including removing event listeners.
*/
close(): void;

[Symbol.dispose](): void;
}

class GatewayImpl implements Gateway {
Expand Down Expand Up @@ -450,6 +460,10 @@ class GatewayImpl implements Gateway {
close(): void {
// Nothing for now
}

[Symbol.dispose](): void {
this.close();
}
}

export function assertDefined<T>(value: T | null | undefined, message: string): T {
Expand Down
7 changes: 6 additions & 1 deletion node/src/gatewayerror.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,12 @@ export class GatewayError extends Error {
*/
cause: ServiceError;

constructor(properties: Readonly<Omit<GatewayError, keyof Error> & Partial<Pick<Error, 'message'>>>) {
constructor(properties: Readonly<{
code: number;
details: ErrorDetail[];
cause: ServiceError;
message?: string;
}>) {
super(properties.message);

this.name = GatewayError.name;
Expand Down
6 changes: 3 additions & 3 deletions node/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,20 @@
*/

export { BlockEventsOptions } from './blockeventsbuilder';
export { BlockEventsRequest, BlockAndPrivateDataEventsRequest, FilteredBlockEventsRequest } from './blockeventsrequest';
export { BlockAndPrivateDataEventsRequest, BlockEventsRequest, FilteredBlockEventsRequest } from './blockeventsrequest';
export { ChaincodeEvent } from './chaincodeevent';
export { ChaincodeEventsOptions } from './chaincodeeventsbuilder';
export { ChaincodeEventsRequest } from './chaincodeeventsrequest';
export { Checkpoint, Checkpointer } from './checkpointer';
export * as checkpointers from './checkpointers';
export { ChaincodeEventsRequest } from './chaincodeeventsrequest';
export { CloseableAsyncIterable } from './client';
export { Commit } from './commit';
export { CommitError } from './commiterror';
export { CommitStatusError } from './commitstatuserror';
export { Contract } from './contract';
export { EndorseError } from './endorseerror';
export { EventsOptions } from './eventsbuilder';
export { connect, ConnectOptions, Gateway, GrpcClient } from './gateway';
export { ConnectOptions, Gateway, GrpcClient, connect } from './gateway';
export { ErrorDetail, GatewayError } from './gatewayerror';
export { Hash } from './hash/hash';
export * as hash from './hash/hashes';
Expand Down
11 changes: 9 additions & 2 deletions node/src/submiterror.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { GatewayError } from './gatewayerror';
import { ServiceError } from '@grpc/grpc-js';
import { ErrorDetail, GatewayError } from './gatewayerror';

/**
* SubmitError is thrown when a failure occurs submitting an endorsed transaction to the orderer.
Expand All @@ -15,7 +16,13 @@ export class SubmitError extends GatewayError {
*/
transactionId: string;

constructor(properties: Readonly<Omit<SubmitError, keyof Error> & Partial<Pick<Error, 'message'>>>) {
constructor(properties: Readonly<{
code: number;
details: ErrorDetail[];
cause: ServiceError;
transactionId: string;
message?: string;
}>) {
super(properties);

this.name = SubmitError.name;
Expand Down
6 changes: 5 additions & 1 deletion node/src/testutils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { common, gateway, peer } from '@hyperledger/fabric-protos';
import fs from 'fs';
import os from 'os';
import path from 'path';
import { chaincodeEventsMethod, CloseableAsyncIterable, commitStatusMethod, deliverFilteredMethod, deliverMethod, deliverWithPrivateDataMethod, DuplexStreamResponse, endorseMethod, evaluateMethod, GatewayGrpcClient, ServerStreamResponse, submitMethod } from './client';
import { CloseableAsyncIterable, DuplexStreamResponse, GatewayGrpcClient, ServerStreamResponse, chaincodeEventsMethod, commitStatusMethod, deliverFilteredMethod, deliverMethod, deliverWithPrivateDataMethod, endorseMethod, evaluateMethod, submitMethod } from './client';

/* eslint-disable jest/no-export */

Expand Down Expand Up @@ -320,9 +320,13 @@ export interface CloseableAsyncIterableStub<T> extends CloseableAsyncIterable<T>
close: jest.Mock<void, void[]>;
}

// @ts-expect-error Polyfill for Symbol.dispose if not present
Symbol.dispose ??= Symbol('Symbol.dispose');

export function newCloseableAsyncIterable<T>(values: T[]): CloseableAsyncIterableStub<T> {
return Object.assign(newAsyncIterable(values), {
close: jest.fn<void, void[]>(),
[Symbol.dispose]: jest.fn<void, void[]>(),
});
}

Expand Down
1 change: 1 addition & 0 deletions node/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"$schema": "https://json.schemastore.org/tsconfig",
"extends": "@tsconfig/node16/tsconfig.json",
"compilerOptions": {
"lib": ["es2021", "esnext.disposable"],
"declaration": true,
"declarationMap": true,
"sourceMap": true,
Expand Down

0 comments on commit 2f2a564

Please sign in to comment.