From ddf1d7153614e982cdecf31824b6f191f946bcab Mon Sep 17 00:00:00 2001 From: "Mark S. Lewis" Date: Thu, 27 Feb 2020 09:14:24 +0000 Subject: [PATCH] FABN-1491: Updated realtime block event listening (#144) Signed-off-by: Mark S. Lewis --- fabric-common/types/index.d.ts | 35 ++- fabric-network/index.js | 199 +++++++++++++- fabric-network/src/contract.js | 2 +- fabric-network/src/gateway.js | 43 +--- .../src/impl/event/asyncnotifier.ts | 29 +++ .../src/impl/event/baseeventlistener.js | 4 +- .../src/impl/event/blockeventlistener.js | 84 ------ .../src/impl/event/blockeventsource.ts | 142 ++++++++++ .../src/impl/event/blocklistener.ts | 10 + .../src/impl/event/blocklistenersession.ts | 45 ++++ .../src/impl/event/commitlistener.ts | 91 ------- .../src/impl/event/commitlistenersession.ts | 117 +++++++++ .../event/defaulteventhandlerstrategies.ts | 42 ++- .../src/impl/event/eventservicemanager.js | 41 ++- .../src/impl/event/orderedblockqueue.ts | 61 +++++ .../src/impl/event/transactioneventhandler.ts | 5 +- .../query/defaultqueryhandlerstrategies.ts | 34 +++ .../src/impl/query/{query.js => query.ts} | 66 ++--- fabric-network/src/impl/query/queryhandler.ts | 14 + .../src/impl/query/queryhandlerstrategies.js | 39 --- ...ryhandler.js => roundrobinqueryhandler.ts} | 37 +-- ...equeryhandler.js => singlequeryhandler.ts} | 35 +-- fabric-network/src/{logger.js => logger.ts} | 6 +- fabric-network/src/{network.js => network.ts} | 225 +++++++--------- fabric-network/src/transaction.js | 15 +- fabric-network/test/contract.js | 8 +- fabric-network/test/gateway.js | 2 +- .../test/impl/event/asyncnotifier.ts | 102 ++++++++ .../test/impl/event/baseeventlistener.js | 20 +- .../test/impl/event/blockeventlistener.js | 242 ------------------ .../test/impl/event/blocklistener.ts | 214 ++++++++++++++++ .../test/impl/event/commiteventlistener.js | 2 +- .../test/impl/event/commitlistener.ts | 20 +- .../test/impl/event/contracteventlistener.js | 2 +- .../event/defaulteventhandlerstrategies.ts | 4 +- .../test/impl/event/eventservicemanager.js | 32 +-- .../test/impl/event/orderedblockqueue.ts | 153 +++++++++++ .../test/impl/event/stubeventservice.ts | 28 +- .../impl/event/transactioneventhandler.ts | 6 +- ...es.js => defaultqueryhandlerstrategies.js} | 10 +- fabric-network/test/impl/query/query.js | 2 +- .../test/impl/query/queryhandlers.js | 54 ++-- fabric-network/test/network.js | 39 +-- fabric-network/test/testutils.ts | 45 ++++ fabric-network/test/transaction.js | 25 +- fabric-network/tsconfig-declaration.json | 16 +- fabric-network/tsconfig.json | 11 +- fabric-network/types/index.d.ts | 56 ++-- package-lock.json | 209 ++++++++++----- package.json | 11 +- test/ts-scenario/features/events.feature | 36 +-- test/ts-scenario/steps/lib/gateway.ts | 10 +- test/ts-scenario/steps/lib/listeners.ts | 56 ++-- .../steps/lib/utility/clientUtils.ts | 6 +- 54 files changed, 1733 insertions(+), 1109 deletions(-) create mode 100644 fabric-network/src/impl/event/asyncnotifier.ts delete mode 100644 fabric-network/src/impl/event/blockeventlistener.js create mode 100644 fabric-network/src/impl/event/blockeventsource.ts create mode 100644 fabric-network/src/impl/event/blocklistener.ts create mode 100644 fabric-network/src/impl/event/blocklistenersession.ts create mode 100644 fabric-network/src/impl/event/commitlistenersession.ts create mode 100644 fabric-network/src/impl/event/orderedblockqueue.ts create mode 100644 fabric-network/src/impl/query/defaultqueryhandlerstrategies.ts rename fabric-network/src/impl/query/{query.js => query.ts} (61%) create mode 100644 fabric-network/src/impl/query/queryhandler.ts delete mode 100644 fabric-network/src/impl/query/queryhandlerstrategies.js rename fabric-network/src/impl/query/{roundrobinqueryhandler.js => roundrobinqueryhandler.ts} (55%) rename fabric-network/src/impl/query/{singlequeryhandler.js => singlequeryhandler.ts} (57%) rename fabric-network/src/{logger.js => logger.ts} (70%) rename fabric-network/src/{network.js => network.ts} (50%) create mode 100644 fabric-network/test/impl/event/asyncnotifier.ts delete mode 100644 fabric-network/test/impl/event/blockeventlistener.js create mode 100644 fabric-network/test/impl/event/blocklistener.ts create mode 100644 fabric-network/test/impl/event/orderedblockqueue.ts rename fabric-network/test/impl/query/{queryhandlerstrategies.js => defaultqueryhandlerstrategies.js} (84%) create mode 100644 fabric-network/test/testutils.ts diff --git a/fabric-common/types/index.d.ts b/fabric-common/types/index.d.ts index 49ab1ac86d..d7877deaec 100644 --- a/fabric-common/types/index.d.ts +++ b/fabric-common/types/index.d.ts @@ -52,20 +52,31 @@ export interface UserConfig { roles?: string[]; } +export interface ConnectionInfo { + type: string; + name: string; + url: string; + options: object; +} +export interface ServiceError extends Error { + connection: ConnectionInfo; +} + export interface ProposalResponse { - errors: Error[]; + errors: ServiceError[]; responses: EndorsementResponse[]; queryResults: Buffer[]; } export interface EndorsementResponse { + connection: ConnectionInfo; response: { status: number; message: string; payload: Buffer; }; payload: Buffer; - endorsment: { + endorsement: { endorser: Buffer; signature: Buffer; }; @@ -113,8 +124,10 @@ export class DiscoveryHandler extends ServiceHandler { export class ServiceEndpoint { public readonly name: string; + public readonly mspid: string; + public readonly endpoint: Endpoint; constructor(name: string, client: Client, mspid?: string); - public connect(endpoint: Endpoint, options: ConnectOptions): Promise; + public connect(endpoint: Endpoint, options?: ConnectOptions): Promise; public disconnect(): void; public checkConnection(): Promise; public isTLS(): boolean; @@ -152,8 +165,8 @@ export class ServiceAction { export class Commit extends Proposal { constructor(chaincodeName: string, channel: Channel, endorsement: Endorsement); - public build(idContext: IdentityContext, request: any): Buffer; - public send(request: any, options: any): Promise; + public build(idContext: IdentityContext, request?: any): Buffer; + public send(request?: any): Promise; } export class Endorsement extends Proposal { @@ -171,8 +184,8 @@ export class Proposal extends ServiceAction { public buildProposalInterest(): any; public addCollectionInterest(collectionName: string): Proposal; public addChaincodeCollectionsInterest(collectionName: string, collectionNames: string[]): Proposal; - public build(idContext: IdentityContext, request: any): Buffer; - public send(request: any, options: any): Promise; + public build(idContext: IdentityContext, request?: any): Buffer; + public send(request?: any): Promise; public verifyProposalResponse(proposalResponse?: any): boolean; public compareProposalResponseResults(proposalResponses: any[]): boolean; } @@ -181,8 +194,8 @@ export class DiscoveryService extends ServiceAction { constructor(chaincodeName: string, channel: Channel); public setDiscoverer(discoverer: Discoverer): DiscoveryService; public newHandler(): DiscoveryHandler; - public build(idContext: IdentityContext, request: any): Buffer; - public send(request: any): Promise; + public build(idContext: IdentityContext, request?: any): Buffer; + public send(request?: any): Promise; public getDiscoveryResults(refresh?: boolean): Promise; public close(): void; } @@ -247,6 +260,7 @@ export class Client { public newEndpoint(options: ConnectOptions): Endpoint; public newEndorser(name: string, mspid?: string): Endorser; public getEndorser(name: string, mspid?: string): Endorser; + public getEndorsers(mspid?: string): Endorser[]; public newCommitter(name: string, mspid?: string): Committer; public getCommitter(name: string, mspid?: string): Committer; public newEventer(name: string, mspid?: string): Eventer; @@ -270,6 +284,9 @@ export interface ConnectOptions { } export class Channel { + readonly name: string; + readonly client: Client; + constructor(name: string, client: Client); public close(): void; diff --git a/fabric-network/index.js b/fabric-network/index.js index 5be4f2df0e..8b4ab563a2 100644 --- a/fabric-network/index.js +++ b/fabric-network/index.js @@ -143,11 +143,208 @@ * @returns {Promise} */ +/** + * Factory function to obtain transaction event handler instances. Called on every transaction submit. + * @typedef {Function} TxEventHandlerFactory + * @memberof module:fabric-network + * @param {string} transactionId The ID of the transaction being submitted. + * @param {module:fabric-network.Network} network The network on which this transaction is being submitted. + * @returns {module:fabric-network.TxEventHandler} A transaction event handler. + */ + +/** + * Handler used to wait for commit events when a transaction is submitted. + * @interface TxEventHandler + * @memberof module:fabric-network + */ +/** + * Resolves when the handler has started listening for transaction commit events. Called after the transaction proposal + * has been accepted and prior to submission of the transaction to the orderer. + * @function module:fabric-network.TxEventHandler#startListening + * @async + * @returns {Promise} + */ +/** + * Resolves (or rejects) when suitable transaction commit events have been received. Called after submission of the + * transaction to the orderer. + * @function module:fabric-network.TxEventHandler#waitForEvents + * @async + * @returns {Promise} + */ +/** + * Called if submission of the transaction to the orderer fails. + * @function module:fabric-network.TxEventHandler#cancelListening + * @returns {void} + */ + +/** + * Factory function to obtain query handler instances. Called on every network creation. + * @typedef {Function} QueryHandlerFactory + * @memberof module:fabric-network + * @param {module:fabric-network.Network} network The network on which queries are being evaluated. + * @returns {module:fabric-network.QueryHandler} A query handler. + */ + +/** + * Handler used to obtain query results from peers when a transaction is evaluated. + * @interface QueryHandler + * @memberof module:fabric-network + */ +/** + * Called when a transaction is evaluated to obtain query results from suitable network peers. + * @function module:fabric-network.QueryHandler#evaluate + * @async + * @param {module:fabric-network.Query} query Query object that can be used by the handler to send the query to + * specific peers. + * @returns {Promise} + */ + +/** + * Used by query handler implementations to evaluate transactions on peers of their choosing. + * @interface Query + * @memberof module:fabric-network + */ +/** + * Get query results from specified peers. + * @function module:fabric-network.Query#evaluate + * @async + * @param {Endorser[]} peers + * @returns {Promise>} + */ + +/** + * @typedef {Object} Query~QueryResponse + * @memberof module:fabric-network + * @property {boolean} isEndorsed True if the proposal was endorsed by the peer. + * @property {number} status The status value from the endorsement. This attriibute will be set by the chaincode. + * @property {Buffer} payload The payload value from the endorsement. This attribute may be considered the query value + * if the proposal was endorsed by the peer. + * @property {string} message The message value from the endorsement. This property contains the error message from + * the peer if it did not endorse the proposal. + */ + +/** + * A callback function that will be invoked when either a peer communication error occurs or a transaction commit event + * is received. Only one of the two arguments will have a value for any given invocation. + * @callback Network~CommitListener + * @memberof module:fabric-network + * @param {module:fabric-network.Network~CommitError} [error] Peer communication error. + * @param {module:fabric-network.Network~CommitEvent} [event] Transaction commit event from a specific peer. + */ + +/** + * @typedef {Error} Network~CommitError + * @memberof module:fabric-network + * @property {Endorser} peer The peer that raised this error. + */ + +/** + * @typedef {EventInfo} Network~CommitEvent + * @memberof module:fabric-network + * @property {Endorser} peer The peer that raised this error. + */ + +/** + * A callback function that will be invoked when a block event is received. Block events will be received in order and + * without duplication. + * @callback Network~CommitListener + * @memberof module:fabric-network + * @async + * @param {module:fabric-network.Network~BlockEvent} event Block event. + * @returns {Promise} + */ + +/** + * @typedef {EventInfo} Network~BlockEvent + * @memberof module:fabric-network + */ + +/** + * A Network represents the set of peers in a Fabric network. + * Applications should get a Network instance using the + * gateway's [getNetwork]{@link module:fabric-network.Gateway#getNetwork} method. + * @interface Network + * @memberof module:fabric-network + */ + +/** + * Get the owning Gateway connection. + * @method Network#getGateway + * @memberof module:fabric-network + * @returns {module:fabric-network.Gateway} A Gateway. + */ + +/** + * Get an instance of a contract (chaincode) on the current network. + * @method Network#getContract + * @memberof module:fabric-network + * @param {string} chaincodeId - the chaincode identifier. + * @param {string} [name] - the name of the contract. + * @param {string[]} [collections] - the names of collections defined for this chaincode. + * @returns {module:fabric-network.Contract} the contract. + */ + +/** + * Get the underlying channel object representation of this network. + * @method Network#getChannel + * @memberof module:fabric-network + * @returns {Channel} A channel. + */ + +/** + * Add a listener to receive transaction commit and peer disconnect events for a set of peers. + * @method Network#addCommitListener + * @memberof module:fabric-network + * @param {module:fabric-network.Network~CommitListener} listener A transaction commit listener callback function. + * @param {Endorser[]} peers The peers from which to receive events. + * @param {string} transactionId A transaction ID. + * @returns {module:fabric-network.Network~CommitListener} The added listener. + * @example + * const listener: CommitListener = (error, event) => { + * if (error) { + * // Handle peer communication error + * } else { + * // Handle transaction commit event + * } + * } + * const peers = network.channel.getEndorsers(); + * await network.addCommitListener(listener, peers, transactionId); + */ + +/** + * Remove a previously added transaction commit listener. + * @method Network#removeCommitListener + * @memberof module:fabric-network + * @param {module:fabric-network.Network~CommitListener} listener A transaction commit listener callback function. + */ + +/** + * Add a listener to receive block events for this network. Blocks will be received in order and without duplication. + * @method Network#addBlockListener + * @memberof module:fabric-network + * @param {module:fabric-network.Network~BlockListener} listener A block listener callback function. + * @returns {module:fabric-network.Network~BlockListener} The added listener. + * @example + * const listener: BlockListener = async (event) => { + * // Handle block event, then (optionally) remove myself as a listener + * network.removeBlockListener(listener); + * } + * await network.addBlockListener(listener); + */ + +/** + * Remove a previously added block listener. + * @method Network#removeBlockListener + * @memberof module:fabric-network + * @param listener {module:fabric-network.Network~BlockListener} A block listener callback function. + */ + + module.exports.Gateway = require('./lib/gateway'); module.exports.Wallet = require('./lib/impl/wallet/wallet').Wallet; module.exports.Wallets = require('./lib/impl/wallet/wallets').Wallets; module.exports.IdentityProviderRegistry = require('./lib/impl/wallet/identityproviderregistry').IdentityProviderRegistry; module.exports.HsmX509Provider = require('./lib/impl/wallet/hsmx509identity').HsmX509Provider; module.exports.DefaultEventHandlerStrategies = require('./lib/impl/event/defaulteventhandlerstrategies'); -module.exports.QueryHandlerStrategies = require('./lib/impl/query/queryhandlerstrategies'); +module.exports.DefaultQueryHandlerStrategies = require('./lib/impl/query/defaultqueryhandlerstrategies'); module.exports.TimeoutError = require('./lib/errors/timeouterror').TimeoutError; diff --git a/fabric-network/src/contract.js b/fabric-network/src/contract.js index 49727fbf4c..9b5cabd5d8 100644 --- a/fabric-network/src/contract.js +++ b/fabric-network/src/contract.js @@ -133,7 +133,7 @@ class Contract { if (eventService) { listener.eventService = eventService; } - this.network.saveListener(listener, listener); + this.network.saveListener(listener); await listener.register(); return listener; diff --git a/fabric-network/src/gateway.js b/fabric-network/src/gateway.js index 35e48523df..6da4dc6f04 100644 --- a/fabric-network/src/gateway.js +++ b/fabric-network/src/gateway.js @@ -6,11 +6,11 @@ 'use strict'; -const Network = require('./network'); +const {NetworkImpl: Network} = require('./network'); const NetworkConfig = require('./impl/ccp/networkconfig'); const {Client} = require('fabric-common'); const EventStrategies = require('./impl/event/defaulteventhandlerstrategies'); -const QueryStrategies = require('./impl/query/queryhandlerstrategies'); +const QueryStrategies = require('./impl/query/defaultqueryhandlerstrategies'); const logger = require('./logger').getLogger('Gateway'); @@ -35,57 +35,22 @@ const logger = require('./logger').getLogger('Gateway'); * for commit notification to complete. * @property {number} [endorseTimeout = 30] The timeout period in seconds to wait * for the endorsement to complete. - * @property {?module:fabric-network.Gateway~TxEventHandlerFactory} [strategy=MSPID_SCOPE_ALLFORTX] + * @property {?module:fabric-network.TxEventHandlerFactory} [strategy=MSPID_SCOPE_ALLFORTX] * Event handling strategy to identify successful transaction commits. A null value indicates * that no event handling is desired. The default is * [MSPID_SCOPE_ALLFORTX]{@link module:fabric-network.EventHandlerStrategies}. */ -/** - * @typedef {Function} Gateway~TxEventHandlerFactory - * @memberof module:fabric-network - * @param {Transaction} transaction The transaction for which the handler should listen for commit events. - * @param {module:fabric-network.Network} network The network on which this transaction is being submitted. - * @returns {module:fabric-network.Gateway~TxEventHandler} A transaction event handler. - */ - -/** - * @typedef {Object} Gateway~TxEventHandler - * @memberof module:fabric-network - * @property {Function} startListening Async function that resolves when the handler has started listening for - * transaction commit events. Called after the transaction proposal has been accepted and prior to submission of - * the transaction to the orderer. - * @property {Function} waitForEvents Async function that resolves (or rejects) when suitable transaction - * commit events have been received. Called after submission of the transaction to the orderer. - * @property {Function} cancelListening Cancel listening. Called if submission of the transaction to the orderer - * fails. - */ - /** * @typedef {Object} Gateway~QueryOptions * @memberof module:fabric-network * @property {number} [timeout = 30] The timeout period in seconds to wait for the query to * complete. - * @property {module:fabric-network.Gateway~QueryHandlerFactory} [strategy=MSPID_SCOPE_SINGLE] + * @property {module:fabric-network.QueryHandlerFactory} [strategy=MSPID_SCOPE_SINGLE] * Query handling strategy used to evaluate queries. The default is * [MSPID_SCOPE_SINGLE]{@link module:fabric-network.QueryHandlerStrategies}. */ -/** - * @typedef {Function} Gateway~QueryHandlerFactory - * @memberof module:fabric-network - * @param {module:fabric-network.Network} network The network on which queries are being evaluated. - * @param {Object} options The request options to use when queries are being evaluated. - * @returns {module:fabric-network.Gateway~QueryHandler} A query handler. - */ - -/** - * @typedef {Object} Gateway~QueryHandler - * @memberof module:fabric-network - * @property {Function} evaluate Async function that takes a [Query]{@link module:fabric-common.Query} - * and resolves with the result of the query evaluation. - */ - /** * @typedef {Object} Gateway~DiscoveryOptions * @memberof module:fabric-network diff --git a/fabric-network/src/impl/event/asyncnotifier.ts b/fabric-network/src/impl/event/asyncnotifier.ts new file mode 100644 index 0000000000..9fa75fd26f --- /dev/null +++ b/fabric-network/src/impl/event/asyncnotifier.ts @@ -0,0 +1,29 @@ +/** + * Copyright 2020 IBM All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +export class AsyncNotifier { + private readonly readCallback: () => T | undefined; + private readonly notifyCallback: (event: T) => Promise; + private running: boolean = false; + + constructor(readCallback: () => T | undefined, notifyCallback: (event: T) => Promise) { + this.readCallback = readCallback; + this.notifyCallback = notifyCallback; + } + + public notify() { + if (!this.running) { + this.running = true; + this.run(); // tslint:disable-line: no-floating-promises + } + } + private async run() { + for (let event; event = this.readCallback(); ) { // tslint:disable-line: no-conditional-assignment + await this.notifyCallback(event); + } + this.running = false; + } +} diff --git a/fabric-network/src/impl/event/baseeventlistener.js b/fabric-network/src/impl/event/baseeventlistener.js index bf48295038..3a737ab052 100644 --- a/fabric-network/src/impl/event/baseeventlistener.js +++ b/fabric-network/src/impl/event/baseeventlistener.js @@ -139,8 +139,8 @@ class BaseEventListener { this.registration = null; } - if (this.network.listeners.has(this)) { - this.network.listeners.delete(this); + if (this.network.oldListeners.has(this)) { + this.network.oldListeners.delete(this); } } diff --git a/fabric-network/src/impl/event/blockeventlistener.js b/fabric-network/src/impl/event/blockeventlistener.js deleted file mode 100644 index bcad9aa8af..0000000000 --- a/fabric-network/src/impl/event/blockeventlistener.js +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Copyright 2019 IBM All Rights Reserved. - * - * SPDX-License-Identifier: Apache-2.0 - */ - -'use strict'; - -const BaseEventListener = require('./baseeventlistener'); - -const logger = require('../../logger').getLogger('BlockEventListener'); - -/** - * The Block Event listener class handles block events from the channel. - * - * - * @private - * @memberof module:fabric-network - * @class - */ -class BlockEventListener extends BaseEventListener { - /** - * - * @param {module:fabric-network.Network} network The fabric network - * @param {Function} eventCallback The event callback called when a transaction is committed. - * It has signature (err, block) - * @param {module:fabric-network.Network~EventListenerOptions} options - */ - constructor(network, eventCallback, options) { - - super(network, eventCallback, options); - } - - _registerListener() { - const method = '_registerListener'; - logger.debug('%s - start', method); - this.registration = this.eventService.registerBlockListener( - this.onEvent.bind(this), - this.eventServiceOptions - ); - logger.debug('%s - end', method); - } - - /* - * This is the called by the base.onEvent() class event processing. - * This will be the sending of the unique data for this event Listener type - * to the user's callback. - */ - async _onEvent(event) { - const method = `_onEvent[${this.listenerCount}]`; - logger.debug('%s - start', method); - - const {block, filteredBlock, privateData, blockNumber} = event; - - let _block; - - if (filteredBlock) { - logger.debug('%s - have filtered block data', method); - _block = filteredBlock; - } else if (block) { - logger.debug('%s - have full block data', method); - _block = block; - if (privateData) { - logger.debug('%s - have private data', method); - _block.privateData = privateData; - } - } else { - logger.error('%s - missing block data in event %s', method, blockNumber.toString()); - this.eventCallback(new Error('Event is missing block data')); - return; - } - - try { - logger.debug('%s - calling user callback', method); - await this.eventCallback(null, blockNumber.toString(), _block); - logger.debug('%s - completed calling user callback', method); - } catch (err) { - logger.error('%s - Error executing callback: %s', method, err); - } - - } -} - -module.exports = BlockEventListener; diff --git a/fabric-network/src/impl/event/blockeventsource.ts b/fabric-network/src/impl/event/blockeventsource.ts new file mode 100644 index 0000000000..38c4106646 --- /dev/null +++ b/fabric-network/src/impl/event/blockeventsource.ts @@ -0,0 +1,142 @@ +/** + * Copyright 2020 IBM All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +import { BlockEvent, BlockListener } from './blocklistener'; +import { OrderedBlockQueue } from './orderedblockqueue'; +import { AsyncNotifier } from './asyncnotifier'; +import { + EventCallback, + EventInfo, + EventListener, + EventRegistrationOptions, + EventService +} from 'fabric-common'; +// @ts-ignore no implicit any +import EventServiceManager = require('./eventservicemanager'); +import Long = require('long'); + +import * as Logger from '../../logger'; +const logger = Logger.getLogger('BlockEventSource'); + +function settle(promise: Promise): Promise<{ status: 'fulfilled', value: T } | { status: 'rejected', reason: Error }> { + return promise.then( + (value: T) => { + return { status: 'fulfilled', value }; + }, + (reason: Error) => { + return { status: 'rejected', reason }; + } + ); +} + +function allSettled(promises: Promise[]) { + return Promise.all(promises.map((promise) => settle(promise))); +} + +export class BlockEventSource { + private readonly eventServiceManager: EventServiceManager; + private eventService?: EventService; + private listeners = new Set(); + private eventListener?: EventListener; + private readonly blockQueue: OrderedBlockQueue; + private readonly asyncNotifier: AsyncNotifier; + private started = false; + + constructor(eventServiceManager: EventServiceManager, startBlock?: Long) { + this.eventServiceManager = eventServiceManager; + this.blockQueue = new OrderedBlockQueue(startBlock); + this.asyncNotifier = new AsyncNotifier( + this.blockQueue.getNextBlock.bind(this.blockQueue), + this.notifyListeners.bind(this) + ); + } + + async addBlockListener(listener: BlockListener): Promise { + this.listeners.add(listener); + await this.start(); + return listener; + } + + removeBlockListener(listener: BlockListener): void { + this.listeners.delete(listener); + } + + close() { + this.unregisterListener(); + this.eventService?.close(); + this.started = false; + } + + private async start() { + if (this.started) { + return; + } + + this.started = true; + + try { + this.eventService = this.eventServiceManager.getEventService(); + this.registerListener(); // Register before start so no events are missed + await this.startEventService(); + } catch (error) { + logger.error('Failed to start event service', error); + this.close(); + } + } + + private registerListener() { + const callback: EventCallback = this.blockEventCallback.bind(this); + const options: EventRegistrationOptions = { + startBlock: this.getNextBlockNumber(), + unregister: false + }; + this.eventListener = this.eventService!.registerBlockListener(callback, options); + } + + private unregisterListener() { + try { + this.eventListener?.unregisterEventListener(); + } catch (error) { + logger.warn('Failed to unregister listener', error); + } + } + + private async startEventService() { + const options = { startBlock: this.getNextBlockNumber() }; + await this.eventServiceManager.startEventService(this.eventService, options); + } + + private blockEventCallback(error?: Error, event?: EventInfo) { + if (error) { + this.close(); + setImmediate(() => this.start()); // Must schedule after current event loop to avoid recursion in event service notification + } else { + this.onBlockEvent(event!); + } + } + + private onBlockEvent(event: EventInfo) { + this.blockQueue.addBlock(event!); + if (this.blockQueue.size() > 0) { + this.asyncNotifier.notify(); + } + } + + private async notifyListeners(event: BlockEvent) { + const promises = Array.from(this.listeners).map((listener) => listener(event)); + const results = await allSettled(promises); + + for (const result of results) { + if (result.status === 'rejected') { + logger.error('Error notifying listener', result.reason); + } + } + } + + private getNextBlockNumber() { + return this.blockQueue.getNextBlockNumber(); + } +} diff --git a/fabric-network/src/impl/event/blocklistener.ts b/fabric-network/src/impl/event/blocklistener.ts new file mode 100644 index 0000000000..5a549a7a2f --- /dev/null +++ b/fabric-network/src/impl/event/blocklistener.ts @@ -0,0 +1,10 @@ +/** + * Copyright 2020 IBM All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +import { EventInfo } from 'fabric-common'; + +export type BlockEvent = EventInfo; +export type BlockListener = (event: BlockEvent) => Promise; diff --git a/fabric-network/src/impl/event/blocklistenersession.ts b/fabric-network/src/impl/event/blocklistenersession.ts new file mode 100644 index 0000000000..f0487408f4 --- /dev/null +++ b/fabric-network/src/impl/event/blocklistenersession.ts @@ -0,0 +1,45 @@ +/** + * Copyright 2020 IBM All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +import { ListenerSession } from './listenersession'; +import { BlockEvent, BlockListener } from './blocklistener'; +import { BlockEventSource } from './blockeventsource'; +import { + EventCallback, + EventListener, + EventService +} from 'fabric-common'; + +import * as Logger from '../../logger'; +const logger = Logger.getLogger('BlockListenerSession'); + +export class BlockListenerSession implements ListenerSession { + private readonly listener: BlockListener; + private readonly eventSource: BlockEventSource; + private readonly eventListener: BlockListener = this.notifyListener.bind(this); + + constructor(listener: BlockListener, eventSource: BlockEventSource) { + this.listener = listener; + this.eventSource = eventSource; + } + + public async start() { + await this.eventSource.addBlockListener(this.eventListener); + } + + public close() { + this.eventSource.removeBlockListener(this.eventListener); + } + + private async notifyListener(blockEvent: BlockEvent) { + try { + await this.listener(blockEvent); + } catch (error) { + logger.error('Error notifying listener:', error); + } + } + +} diff --git a/fabric-network/src/impl/event/commitlistener.ts b/fabric-network/src/impl/event/commitlistener.ts index 6b37d33aa6..56a7edcd7d 100644 --- a/fabric-network/src/impl/event/commitlistener.ts +++ b/fabric-network/src/impl/event/commitlistener.ts @@ -4,14 +4,9 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { ListenerSession } from './listenersession'; -// @ts-ignore: no typings for EventServiceManager -import EventServiceManager = require('./eventservicemanager'); import { Endorser, EventInfo, - EventListener, - EventService } from 'fabric-common'; export interface CommitError extends Error { @@ -23,89 +18,3 @@ export interface CommitEvent extends EventInfo { } export type CommitListener = (error?: CommitError, event?: CommitEvent) => void; - -export class CommitListenerSession implements ListenerSession { - private readonly listener: CommitListener; - private readonly eventServiceManager: EventServiceManager; - private readonly eventServices: EventService[]; - private readonly transactionId: string; - private readonly endorsers: {[name: string]: Endorser} = {}; - private eventListeners: EventListener[] = []; - - constructor(listener: CommitListener, eventServiceManager: EventServiceManager, endorsers: Endorser[], transactionId: string) { - this.listener = listener; - this.eventServiceManager = eventServiceManager; - this.eventServices = eventServiceManager.getEventServices(endorsers); - this.transactionId = transactionId; - - for (const endorser of endorsers) { - this.endorsers[endorser.name] = endorser; - } - } - - public async start() { - const startErrors = await this.registerTransactionListeners(); - // Notify listeners of errors after all registrations are complete so listeners can remove themselves in response - for (const error of startErrors) { - this.listener(error, undefined); - } - } - - public close() { - for (const eventListener of this.eventListeners) { - eventListener.unregisterEventListener(); - } - } - - private async registerTransactionListeners(): Promise { - const startErrors = []; - - for (const eventService of this.eventServices) { - const error = await this.startEventService(eventService); - if (error) { - startErrors.push(error); - } else { - // Only register listener for event services that start successfully - const eventListener = this.registerTransactionListener(eventService); - this.eventListeners.push(eventListener); - } - } - - return startErrors; - } - - private async startEventService(eventService: EventService): Promise { - try { - await this.eventServiceManager.startEventService(eventService); - } catch (error) { - const commitError = error as CommitError; - commitError.peer = this.getEndorserForEventService(eventService); - return commitError; - } - } - - private getEndorserForEventService(eventService: EventService): Endorser { - return this.endorsers[eventService.name]; - } - - private registerTransactionListener(eventService: EventService): EventListener { - const endorser = this.getEndorserForEventService(eventService); - return eventService.registerTransactionListener( - this.transactionId, - (error, event) => { - const commitError = error as CommitError; - if (commitError) { - commitError.peer = endorser; - } - const commitEvent = event as CommitEvent; - if (commitEvent) { - commitEvent.peer = endorser; - } - this.listener(commitError, commitEvent); - }, - { - unregister: false - } - ); - } -} diff --git a/fabric-network/src/impl/event/commitlistenersession.ts b/fabric-network/src/impl/event/commitlistenersession.ts new file mode 100644 index 0000000000..a132f9680b --- /dev/null +++ b/fabric-network/src/impl/event/commitlistenersession.ts @@ -0,0 +1,117 @@ +/** + * Copyright 2020 IBM All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +import { ListenerSession } from './listenersession'; +import { + CommitError, + CommitEvent, + CommitListener +} from './commitlistener'; +// @ts-ignore: no typings for EventServiceManager +import EventServiceManager = require('./eventservicemanager'); +import { + Endorser, + EventListener, + EventService, + EventCallback +} from 'fabric-common'; + +import * as Logger from '../../logger'; +const logger = Logger.getLogger('CommitListenerSession'); + +export class CommitListenerSession implements ListenerSession { + private readonly listener: CommitListener; + private readonly eventServiceManager: EventServiceManager; + private readonly eventServices: EventService[]; + private readonly transactionId: string; + private readonly endorsers: {[name: string]: Endorser} = {}; + private eventListeners: EventListener[] = []; + + constructor(listener: CommitListener, eventServiceManager: EventServiceManager, endorsers: Endorser[], transactionId: string) { + this.listener = listener; + this.eventServiceManager = eventServiceManager; + this.eventServices = eventServiceManager.getEventServices(endorsers); + this.transactionId = transactionId; + + for (const endorser of endorsers) { + this.endorsers[endorser.name] = endorser; + } + } + + public async start() { + const startErrors = await this.registerTransactionListeners(); + // Notify listeners of errors after all registrations are complete so listeners can remove themselves in response + for (const error of startErrors) { + this.listener(error, undefined); + } + } + + public close() { + for (const eventListener of this.eventListeners) { + eventListener.unregisterEventListener(); + } + } + + private async registerTransactionListeners(): Promise { + const startErrors = []; + + for (const eventService of this.eventServices) { + const error = await this.startEventService(eventService); + if (error) { + startErrors.push(error); + } else { + // Only register listener for event services that start successfully + const eventListener = this.registerTransactionListener(eventService); + this.eventListeners.push(eventListener); + } + } + + return startErrors; + } + + private async startEventService(eventService: EventService): Promise { + try { + await this.eventServiceManager.startEventService(eventService); + } catch (error) { + const commitError = error as CommitError; + commitError.peer = this.getEndorserForEventService(eventService); + return commitError; + } + } + + private getEndorserForEventService(eventService: EventService): Endorser { + return this.endorsers[eventService.name]; + } + + private registerTransactionListener(eventService: EventService): EventListener { + const endorser = this.getEndorserForEventService(eventService); + const callback: EventCallback = (error, event) => { + const commitError = error as CommitError; + if (commitError) { + commitError.peer = endorser; + } + const commitEvent = event as CommitEvent; + if (commitEvent) { + commitEvent.peer = endorser; + } + this.notifyListener(commitError, commitEvent); + }; + + const registrationOptions = { + unregister: false + }; + + return eventService.registerTransactionListener(this.transactionId, callback, registrationOptions); + } + + private notifyListener(commitError: CommitError, commitEvent: CommitEvent) { + try { + this.listener(commitError, commitEvent); + } catch (error) { + logger.error('Error notifying listener:', error); + } + } +} diff --git a/fabric-network/src/impl/event/defaulteventhandlerstrategies.ts b/fabric-network/src/impl/event/defaulteventhandlerstrategies.ts index 07128a57c2..c1098ce0d3 100644 --- a/fabric-network/src/impl/event/defaulteventhandlerstrategies.ts +++ b/fabric-network/src/impl/event/defaulteventhandlerstrategies.ts @@ -8,7 +8,7 @@ import { AllForTxStrategy } from './allfortxstrategy'; import { AnyForTxStrategy } from './anyfortxstrategy'; import { TxEventHandlerFactory, TransactionEventHandler } from './transactioneventhandler'; // @ts-ignore no implicit any -import Network = require('../../network'); +import { Network } from '../../network'; import { Endorser } from 'fabric-common'; function getOrganizationPeers(network: Network): Endorser[] { @@ -23,51 +23,39 @@ function getNetworkPeers(network: Network): Endorser[] { /** * @typedef DefaultEventHandlerStrategies * @memberof module:fabric-network - */ - -/** - * @property {module:fabric-network.Gateway~TxEventHandlerFactory} MSPID_SCOPE_ALLFORTX Listen for transaction commit + * @property {module:fabric-network.TxEventHandlerFactory} MSPID_SCOPE_ALLFORTX Listen for transaction commit + * events from all peers in the client identity's organization. + * The [submitTransaction]{@link module:fabric-network.Contract#submitTransaction} function will wait until successful + * events are received from all currently connected peers (minimum 1). + * @property {module:fabric-network.TxEventHandlerFactory} MSPID_SCOPE_ALLFORTX Listen for transaction commit + * events from all peers in the client identity's organization. + * The [submitTransaction]{@link module:fabric-network.Contract#submitTransaction} function will wait until successful + * events are received from all currently connected peers (minimum 1). + * @property {module:fabric-network.TxEventHandlerFactory} MSPID_SCOPE_ALLFORTX Listen for transaction commit * events from all peers in the client identity's organization. * The [submitTransaction]{@link module:fabric-network.Contract#submitTransaction} function will wait until successful * events are received from all currently connected peers (minimum 1). - * @memberof module:fabric-network.DefaultEventHandlerStrategies + * @property {module:fabric-network.TxEventHandlerFactory} NETWORK_SCOPE_ANYFORTX Listen for transaction commit + * events from all peers in the network. + * The [submitTransaction]{@link module:fabric-network.Contract#submitTransaction} function will wait until a + * successful event is received from any peer. */ + export const MSPID_SCOPE_ALLFORTX: TxEventHandlerFactory = (transactionId, network) => { const eventStrategy = new AllForTxStrategy(getOrganizationPeers(network)); return new TransactionEventHandler(transactionId, network, eventStrategy); }; -/** - * @property {module:fabric-network.Gateway~TxEventHandlerFactory} MSPID_SCOPE_ANYFORTX Listen for transaction commit - * events from all peers in the client identity's organization. - * The [submitTransaction]{@link module:fabric-network.Contract#submitTransaction} function will wait until a - * successful event is received from any peer. - * @memberof module:fabric-network.DefaultEventHandlerStrategies - */ export const MSPID_SCOPE_ANYFORTX: TxEventHandlerFactory = (transactionId, network) => { const eventStrategy = new AnyForTxStrategy(getOrganizationPeers(network)); return new TransactionEventHandler(transactionId, network, eventStrategy); }; -/** - * @property {module:fabric-network.Gateway~TxEventHandlerFactory} MSPID_SCOPE_ALLFORTX Listen for transaction commit - * events from all peers in the client identity's organization. - * The [submitTransaction]{@link module:fabric-network.Contract#submitTransaction} function will wait until successful - * events are received from all currently connected peers (minimum 1). - * @memberof module:fabric-network.DefaultEventHandlerStrategies - */ export const NETWORK_SCOPE_ALLFORTX: TxEventHandlerFactory = (transactionId, network) => { const eventStrategy = new AllForTxStrategy(getNetworkPeers(network)); return new TransactionEventHandler(transactionId, network, eventStrategy); }; -/** - * @property {module:fabric-network.Gateway~TxEventHandlerFactory} NETWORK_SCOPE_ANYFORTX Listen for transaction commit - * events from all peers in the network. - * The [submitTransaction]{@link module:fabric-network.Contract#submitTransaction} function will wait until a - * successful event is received from any peer. - * @memberof module:fabric-network.DefaultEventHandlerStrategies - */ export const NETWORK_SCOPE_ANYFORTX: TxEventHandlerFactory = (transactionId, network) => { const eventStrategy = new AnyForTxStrategy(getNetworkPeers(network)); return new TransactionEventHandler(transactionId, network, eventStrategy); diff --git a/fabric-network/src/impl/event/eventservicemanager.js b/fabric-network/src/impl/event/eventservicemanager.js index 13d4820c76..3c307e36ea 100644 --- a/fabric-network/src/impl/event/eventservicemanager.js +++ b/fabric-network/src/impl/event/eventservicemanager.js @@ -8,6 +8,15 @@ const logger = require('../../logger').getLogger('EventServiceManager'); +function getOrganizationPeers(network) { + const mspId = network.getGateway().getIdentity().mspId; + return network.getChannel().getEndorsers(mspId); +} + +function getNetworkPeers(network) { + return network.getChannel().getEndorsers(); +} + /** * The Event Service Manager is responsible for creating and distributing {@link EventService} instances. * It uses the event Service factory to reuse event Services that exists, and maintains @@ -251,18 +260,16 @@ class EventServiceManager { * spreads out the load onto different event services from the same organization */ class RoundRobinPeerPool { - /* - * Constructor. - * @param {Endorser[]} peers The list of peers that the strategy can choose from - */ constructor(network) { - const mspId = network.getGateway().getIdentity().mspId; - const peers = network.getChannel().getEndorsers(mspId); - if (!peers || peers.length === 0) { + let peers = getOrganizationPeers(network); + if (peers.length === 0) { + peers = getNetworkPeers(network); + } + if (peers.length === 0) { throw Error('No peers available'); } this.peers = peers; - this.lastPeerIndex = -1; + this.currentPeerIndex = 0; } /* @@ -270,12 +277,9 @@ class RoundRobinPeerPool { * @returns {Endorser} */ getNextPeer() { - this.lastPeerIndex++; - if (this.lastPeerIndex >= this.peers.length) { - this.lastPeerIndex = 0; - } - - return this.peers[this.lastPeerIndex]; + const peer = this.peers[this.currentPeerIndex]; + this.currentPeerIndex = (this.currentPeerIndex + 1) % this.peers.length; + return peer; } /* @@ -287,14 +291,9 @@ class RoundRobinPeerPool { * @returns {Endorser[]} */ getNextPeers() { - const peers = []; - for (let count = 0; count < this.peers.length; count++) { - peers.push(this.getNextPeer()); - } - // call one more time to move the index so the next call will - // start on the next peer after the start index of this call. + const peers = this.peers.map(() => this.getNextPeer()); + // Move the peer index along so a subsequent call to this function will start at a different peer this.getNextPeer(); - return peers; } diff --git a/fabric-network/src/impl/event/orderedblockqueue.ts b/fabric-network/src/impl/event/orderedblockqueue.ts new file mode 100644 index 0000000000..58c8f02e4c --- /dev/null +++ b/fabric-network/src/impl/event/orderedblockqueue.ts @@ -0,0 +1,61 @@ +/** + * Copyright 2020 IBM All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +import { BlockEvent } from './blocklistener'; +import Long = require('long'); + +export class OrderedBlockQueue { + private readonly queue = new Map(); + private nextBlockNumber?: Long; + + constructor(startBlock?: Long) { + this.nextBlockNumber = startBlock; + } + + addBlock(event: BlockEvent) { + const blockNumber = event.blockNumber; + if (!this.isNewBlockNumber(blockNumber)) { + return; + } + + const key = this.blockNumberToKey(blockNumber); + this.queue.set(key, event); + if (!this.nextBlockNumber) { + this.nextBlockNumber = blockNumber; + } + } + + getNextBlock(): BlockEvent | undefined { + if (!this.nextBlockNumber) { + return; + } + + const key = this.blockNumberToKey(this.nextBlockNumber); + const event = this.queue.get(key); + if (event) { + this.queue.delete(key); + this.nextBlockNumber = this.nextBlockNumber.add(Long.ONE); + } + + return event; + } + + getNextBlockNumber() { + return this.nextBlockNumber; + } + + size() { + return this.queue.size; + } + + private isNewBlockNumber(blockNumber: Long) { + return !this.nextBlockNumber || this.nextBlockNumber.lessThanOrEqual(blockNumber); + } + + private blockNumberToKey(blockNumber: Long) { + return blockNumber.toString(); + } +} diff --git a/fabric-network/src/impl/event/transactioneventhandler.ts b/fabric-network/src/impl/event/transactioneventhandler.ts index bf4828de8d..8326231f32 100644 --- a/fabric-network/src/impl/event/transactioneventhandler.ts +++ b/fabric-network/src/impl/event/transactioneventhandler.ts @@ -7,12 +7,11 @@ import { TimeoutError } from '../../errors/timeouterror'; import { TransactionEventStrategy } from './transactioneventstrategy'; // @ts-ignore: no implicit any -import Network = require('../../network'); +import { Network } from '../../network'; import { Endorser } from 'fabric-common'; import { CommitError, CommitEvent, CommitListener } from './commitlistener'; -// @ts-ignore no implicit any -import Logger = require('../../logger'); +import * as Logger from '../../logger'; const logger = Logger.getLogger('TransactionEventHandler'); export interface TxEventHandler { diff --git a/fabric-network/src/impl/query/defaultqueryhandlerstrategies.ts b/fabric-network/src/impl/query/defaultqueryhandlerstrategies.ts new file mode 100644 index 0000000000..116252cc25 --- /dev/null +++ b/fabric-network/src/impl/query/defaultqueryhandlerstrategies.ts @@ -0,0 +1,34 @@ +/** + * Copyright 2018, 2019 IBM All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +import { SingleQueryHandler } from './singlequeryhandler'; +import { RoundRobinQueryHandler } from './roundrobinqueryhandler'; +import { Network } from '../../network'; +import { QueryHandlerFactory } from './queryhandler'; + +function getOrganizationPeers(network: Network) { + const mspId = network.getGateway().getIdentity().mspId; + return network.getChannel().getEndorsers(mspId); +} + +/** + * @typedef DefaultQueryHandlerStrategies + * @memberof module:fabric-network + * @property {module:fabric-network.QueryHandlerFactory} MSPID_SCOPE_SINGLE Query any one of the event services for the connected organisation. Continue + * to use the same event service for all queries unless it fails. + * @property {module:fabric-network.QueryHandlerFactory} MSPID_SCOPE_ROUND_ROBIN Query any one of the event services for the connected organisation. + * Use the next available peer for each successive query. + */ + +export const MSPID_SCOPE_SINGLE: QueryHandlerFactory = (network) => { + const peers = getOrganizationPeers(network); + return new SingleQueryHandler(peers); +}; + +export const MSPID_SCOPE_ROUND_ROBIN: QueryHandlerFactory = (network) => { + const peers = getOrganizationPeers(network); + return new RoundRobinQueryHandler(peers); +}; diff --git a/fabric-network/src/impl/query/query.js b/fabric-network/src/impl/query/query.ts similarity index 61% rename from fabric-network/src/impl/query/query.js rename to fabric-network/src/impl/query/query.ts index 0326e94683..0336f45f89 100644 --- a/fabric-network/src/impl/query/query.js +++ b/fabric-network/src/impl/query/query.ts @@ -4,26 +4,33 @@ * SPDX-License-Identifier: Apache-2.0 */ -'use strict'; +import { Query as CommonQuery, Endorser } from 'fabric-common'; -const logger = require('../../logger').getLogger('Query'); +import * as Logger from '../../logger'; +const logger = Logger.getLogger('Query'); + +export interface QueryResults { + [peerName: string]: Error | QueryResponse; +} + +export interface QueryResponse { + isEndorsed: boolean; + payload: Buffer; + status: number; + message: string; +} + +export interface Query { + evaluate(peers: Endorser[]): Promise; +} /** - * @typedef {Object} Query~QueryResponse - * @memberof module:fabric-network - * @property {number} status - The status value from the endorsement. This attriibute - * will be set by the chaincode. - * @property {Buffer} payload - The payload value from the endorsement. This attribute - * may be considered the query value if the status code is exceptable. - * @property {Buffer} payload - The message value from the endorsement. This attribute - * will have a value when there is not a payload and status value indicates an issue - * with determining the payload (the query value). - */ -/** - * Used by query handler implementations to evaluate transactions on peers of their choosing. - * @memberof module:fabric-network + * @private */ -class Query { +export class QueryImpl implements Query { + private readonly query: CommonQuery; + private readonly requestTimeout: number; + /** * Builds a Query instance to send and then work with the results returned * by the fabric-common/Query. @@ -31,7 +38,7 @@ class Query { * @returns {Object} options - options to be used when sending the request to * fabric-common service endpoint {Endorser} peer. */ - constructor(query, options = {}) { + constructor(query: CommonQuery, options: any = {}) { this.query = query; this.requestTimeout = 3000; // default 3 seconds if (Number.isInteger(options.timeout)) { @@ -46,11 +53,11 @@ class Query { * @returns {Object.} Object with peer name keys and associated values that are either * QueryResponse objects or Error objects. */ - async evaluate(peers) { + async evaluate(peers: Endorser[]): Promise { const method = 'evaluate'; logger.debug('%s - start', method); - const results = {}; + const results: QueryResults = {}; try { const responses = await this.query.send({targets: peers, requestTimeout: this.requestTimeout}); if (responses) { @@ -61,17 +68,18 @@ class Query { } } if (responses.responses) { - for (const peer_response of responses.responses) { - if (peer_response.response) { - const response = {}; - response.status = peer_response.response.status; - response.payload = peer_response.response.payload; - response.message = peer_response.response.message; - response.isEndorsed = peer_response.endorsement ? true : false; - results[peer_response.connection.name] = response; + for (const peerResponse of responses.responses) { + if (peerResponse.response) { + const response: QueryResponse = { + status: peerResponse.response.status, + payload: peerResponse.response.payload, + message: peerResponse.response.message, + isEndorsed: peerResponse.endorsement ? true : false + }; + results[peerResponse.connection.name] = response; logger.debug('%s - have results - peer: %s with status:%s', method, - peer_response.connection.name, + peerResponse.connection.name, response.status); } } @@ -99,5 +107,3 @@ class Query { return results; } } - -module.exports = Query; diff --git a/fabric-network/src/impl/query/queryhandler.ts b/fabric-network/src/impl/query/queryhandler.ts new file mode 100644 index 0000000000..358e76392c --- /dev/null +++ b/fabric-network/src/impl/query/queryhandler.ts @@ -0,0 +1,14 @@ +/** + * Copyright 2020 IBM All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +import { Network } from '../../network'; +import { Query } from './query'; + +export type QueryHandlerFactory = (network: Network) => QueryHandler; + +export interface QueryHandler { + evaluate(query: Query): Promise; +} diff --git a/fabric-network/src/impl/query/queryhandlerstrategies.js b/fabric-network/src/impl/query/queryhandlerstrategies.js deleted file mode 100644 index c43f577f0e..0000000000 --- a/fabric-network/src/impl/query/queryhandlerstrategies.js +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Copyright 2018, 2019 IBM All Rights Reserved. - * - * SPDX-License-Identifier: Apache-2.0 - */ - -'use strict'; - -const SingleQueryHandler = require('./singlequeryhandler'); -const RoundRobinQueryHandler = require('./roundrobinqueryhandler'); - -function getOrganizationPeers(network) { - const mspId = network.getGateway().getIdentity().mspId; - return network.getChannel().getEndorsers(mspId); -} - -function MSPID_SCOPE_SINGLE(network) { - const peers = getOrganizationPeers(network); - return new SingleQueryHandler(peers); -} - -function MSPID_SCOPE_ROUND_ROBIN(network) { - const peers = getOrganizationPeers(network); - return new RoundRobinQueryHandler(peers); -} - -/** - * @typedef QueryHandlerStrategies - * @memberof module:fabric-network - * @property {function} MSPID_SCOPE_SINGLE Query any one of the event services for the connected organisation. Continue - * to use the same event service for all queries unless it fails. - * @property {function} MSPID_SCOPE_ROUND_ROBIN Query any one of the event services for the connected organisation. - * Use the next available peer for each successive query. - */ - -module.exports = { - MSPID_SCOPE_SINGLE, - MSPID_SCOPE_ROUND_ROBIN -}; diff --git a/fabric-network/src/impl/query/roundrobinqueryhandler.js b/fabric-network/src/impl/query/roundrobinqueryhandler.ts similarity index 55% rename from fabric-network/src/impl/query/roundrobinqueryhandler.js rename to fabric-network/src/impl/query/roundrobinqueryhandler.ts index 74d8d23037..15330dea65 100644 --- a/fabric-network/src/impl/query/roundrobinqueryhandler.js +++ b/fabric-network/src/impl/query/roundrobinqueryhandler.ts @@ -4,35 +4,38 @@ * SPDX-License-Identifier: Apache-2.0 */ -'use strict'; +import { FabricError } from '../../errors/fabricerror'; +import { QueryHandler } from './queryhandler'; +import { Query } from './query'; -const {FabricError} = require('../../errors/fabricerror'); +import { Endorser } from 'fabric-common'; -const util = require('util'); +import util = require('util'); -const logger = require('../../logger').getLogger('RoundRobinQueryHandler'); +import * as Logger from '../../logger'; +const logger = Logger.getLogger('RoundRobinQueryHandler'); -class RoundRobinQueryHandler { - constructor(peers) { +export class RoundRobinQueryHandler implements QueryHandler { + private readonly peers: Endorser[]; + private currentPeerIndex = 0; + + constructor(peers: Endorser[]) { logger.debug('constructor: peers=%j', peers.map((peer) => peer.name)); - this._peers = peers; - this._currentPeerIndex = 0; + this.peers = peers; } - async evaluate(query) { + async evaluate(query: Query) { const method = 'evaluate'; logger.debug('%s - start', method); - const startPeerIndex = this._currentPeerIndex; - - this._currentPeerIndex = (this._currentPeerIndex + 1) % this._peers.length; - + const startPeerIndex = this.currentPeerIndex; + this.currentPeerIndex = (this.currentPeerIndex + 1) % this.peers.length; const errorMessages = []; - for (let i = 0; i < this._peers.length; i++) { - const peerIndex = (startPeerIndex + i) % this._peers.length; + for (let i = 0; i < this.peers.length; i++) { + const peerIndex = (startPeerIndex + i) % this.peers.length; - const peer = this._peers[peerIndex]; + const peer = this.peers[peerIndex]; logger.debug('%s - sending to peer %s', method, peer.name); const results = await query.evaluate([peer]); @@ -56,5 +59,3 @@ class RoundRobinQueryHandler { throw error; } } - -module.exports = RoundRobinQueryHandler; diff --git a/fabric-network/src/impl/query/singlequeryhandler.js b/fabric-network/src/impl/query/singlequeryhandler.ts similarity index 57% rename from fabric-network/src/impl/query/singlequeryhandler.js rename to fabric-network/src/impl/query/singlequeryhandler.ts index d85971a626..2c9d8008d8 100644 --- a/fabric-network/src/impl/query/singlequeryhandler.js +++ b/fabric-network/src/impl/query/singlequeryhandler.ts @@ -4,33 +4,38 @@ * SPDX-License-Identifier: Apache-2.0 */ -'use strict'; +import { FabricError } from '../../errors/fabricerror'; +import { QueryHandler } from './queryhandler'; +import { Query } from './query'; -const {FabricError} = require('../../errors/fabricerror'); +import { Endorser } from 'fabric-common'; -const util = require('util'); +import util = require('util'); -const logger = require('../../logger').getLogger('SingleQueryHandler'); +import * as Logger from '../../logger'; +const logger = Logger.getLogger('SingleQueryHandler'); -class SingleQueryHandler { - constructor(peers) { +export class SingleQueryHandler implements QueryHandler { + private readonly peers: Endorser[]; + private currentPeerIndex = 0; + + constructor(peers: Endorser[]) { logger.debug('constructor: peers=%j', peers.map((peer) => peer.name)); - this._peers = peers; - this._currentPeerIndex = 0; + this.peers = peers; } - async evaluate(query) { + async evaluate(query: Query) { const method = 'evaluate'; logger.debug('%s - start', method); - const startPeerIndex = this._currentPeerIndex; + const startPeerIndex = this.currentPeerIndex; const errorMessages = []; - for (let i = 0; i < this._peers.length; i++) { - const peerIndex = (startPeerIndex + i) % this._peers.length; - this._currentPeerIndex = peerIndex; + for (let i = 0; i < this.peers.length; i++) { + const peerIndex = (startPeerIndex + i) % this.peers.length; + this.currentPeerIndex = peerIndex; - const peer = this._peers[peerIndex]; + const peer = this.peers[peerIndex]; logger.debug('%s - sending to peer %s', method, peer.name); const results = await query.evaluate([peer]); @@ -54,5 +59,3 @@ class SingleQueryHandler { throw error; } } - -module.exports = SingleQueryHandler; diff --git a/fabric-network/src/logger.js b/fabric-network/src/logger.ts similarity index 70% rename from fabric-network/src/logger.js rename to fabric-network/src/logger.ts index f2637d6347..f2398a3cfa 100644 --- a/fabric-network/src/logger.js +++ b/fabric-network/src/logger.ts @@ -4,9 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -'use strict'; - // reuse the client implementation of the logger as we are part of the client // abstracted out in case we want to change this in the future. -const {Utils} = require('fabric-common'); -module.exports.getLogger = Utils.getLogger; +import { Utils } from 'fabric-common'; +export const getLogger = Utils.getLogger; diff --git a/fabric-network/src/network.js b/fabric-network/src/network.ts similarity index 50% rename from fabric-network/src/network.js rename to fabric-network/src/network.ts index 87234d6ad0..155b74c00c 100644 --- a/fabric-network/src/network.js +++ b/fabric-network/src/network.ts @@ -4,13 +4,26 @@ * SPDX-License-Identifier: Apache-2.0 */ -'use strict'; -const Contract = require('./contract'); -const EventServiceManager = require('./impl/event/eventservicemanager'); -const BlockEventListener = require('./impl/event/blockeventlistener'); -const {CommitListenerSession} = require('./impl/event/commitlistener'); - -const logger = require('./logger').getLogger('Network'); +// @ts-ignore no implicit any +import Contract = require('./contract'); +// @ts-ignore no implicit any +import EventServiceManager = require('./impl/event/eventservicemanager'); +import { ListenerSession } from './impl/event/listenersession'; +import { CommitListener } from './impl/event/commitlistener'; +import { CommitListenerSession } from './impl/event/commitlistenersession'; +import { QueryHandlerFactory } from './impl/query/queryhandler'; +import { BlockListener } from './impl/event/blocklistener'; +import { BlockListenerSession } from './impl/event/blocklistenersession'; +// @ts-ignore no implicit any +import Gateway = require('./gateway'); +// @ts-ignore no implicit any +import BaseEventListener = require('./impl/event/baseeventlistener'); + +import { Channel, DiscoveryService, Endorser } from 'fabric-common'; + +import * as Logger from './logger'; +import { BlockEventSource } from './impl/event/blockeventsource'; +const logger = Logger.getLogger('Network'); /** * @typedef {Object} Network~EventListenerOptions @@ -31,66 +44,74 @@ const logger = require('./logger').getLogger('Network'); * every transaction committed to the ledger. */ -/** - * A callback function that will be invoked when either a peer communication error occurs or a transaction commit event - * is received. Only one of the two arguments will have a value for any given invocation. - * @callback Network~CommitListener - * @memberof module:fabric-network - * @param {module:fabric-network.Network~CommitError} [error] Peer communication error. - * @param {module:fabric-network.Network~CommitEvent} [event] Transaction commit event from a specific peer. - */ +export interface Network { + getGateway(): Gateway; + getContract(chaincodeId: string, name?: string, collections?: string[]): Contract; + getChannel(): Channel; + addCommitListener(listener: CommitListener, peers: Endorser[], transactionId: string): Promise; + removeCommitListener(listener: CommitListener): void; + addBlockListener(listener: BlockListener): Promise; + removeBlockListener(listener: BlockListener): void; +} -/** - * @typedef {Error} Network~CommitError - * @memberof module:fabric-network - * @property {Endorser} peer The peer that raised this error. - */ +async function addListener(listener: T, listenerSessions: Map, sessionSupplier: () => ListenerSession) { + if (!listenerSessions.has(listener)) { + const session = sessionSupplier(); + // Store listener before starting in case start fires error events that trigger remove of the listener + listenerSessions.set(listener, session); + await session.start(); + } + return listener; -/** - * @typedef {EventInfo} Network~CommitEvent - * @memberof module:fabric-network - * @property {Endorser} peer The peer that raised this error. - */ +} + +function removeListener(listener: T, listenerSessions: Map) { + const session = listenerSessions.get(listener); + if (session) { + session.close(); + listenerSessions.delete(listener); + } +} + +export class NetworkImpl implements Network { + public queryHandler?: QueryHandlerFactory; + private readonly gateway: Gateway; + private readonly channel: Channel; + private readonly contracts = new Map(); + private initialized = false; + private discoveryService?: DiscoveryService; + private eventServiceManager?: EventServiceManager; + private readonly commitListeners = new Map(); + private readonly blockListeners = new Map(); + private readonly oldListeners = new Set(); + private readonly realtimeBlockEventSource: BlockEventSource; -/** - * A Network represents the set of peers in a Fabric network. - * Applications should get a Network instance using the - * gateway's [getNetwork]{@link module:fabric-network.Gateway#getNetwork} method. - * @memberof module:fabric-network - * @hideconstructor - */ -class Network { /* * Network constructor for internal use only. * @param {Gateway} gateway The owning gateway instance * @param {Channel} channel The fabric-base channel instance */ - constructor(gateway, channel) { + constructor(gateway: Gateway, channel: Channel) { const method = 'constructor'; logger.debug('%s - start', method); this.gateway = gateway; this.channel = channel; - this.contracts = new Map(); - this.initialized = false; - this.listeners = new Map(); - this.discoveryService = null; - this.queryHandler = null; - this.eventServiceManager = null; - this.commitListeners = new Map(); + this.eventServiceManager = new EventServiceManager(this); + this.realtimeBlockEventSource = new BlockEventSource(this.eventServiceManager); } /** * initialize the channel if it hasn't been done * @private */ - async _initializeInternalChannel(options) { + async _initializeInternalChannel(options: any) { // TODO: fix types const method = '_initializeInternalChannel'; logger.debug('%s - start', method); if (options.enabled) { logger.debug('%s - initialize with discovery', method); - let targets; + let targets: Endorser[]; if (options.targets) { if (Array.isArray(options.targets) && options.targets.length > 0) { for (const target of options.targets) { @@ -148,9 +169,6 @@ class Network { // or get a handler later from the discoverService // to be used on endorsement, queries, and commits logger.debug('%s - discovery complete - channel is populated', method); - - } else { - this.discoveryService = null; } logger.debug('%s - end', method); @@ -160,7 +178,7 @@ class Network { * Initialize this network instance * @private */ - async _initialize(discover) { + async _initialize(discover: any) { // TODO: fix types const method = '_initialize'; logger.debug('%s - start', method); @@ -175,26 +193,14 @@ class Network { // Must be created after channel initialization to ensure discovery has located the peers const queryOptions = this.gateway.getOptions().query; this.queryHandler = queryOptions.strategy(this, queryOptions); - this.eventServiceManager = new EventServiceManager(this); logger.debug('%s - end', method); } - /** - * Get the owning Gateway connection. - * @returns {module:fabric-network.Gateway} A Gateway. - */ getGateway() { return this.gateway; } - /** - * Get an instance of a contract (chaincode) on the current network. - * @param {string} chaincodeId - the chaincode identifier. - * @param {string} [name] - the name of the contract. - * @param {string[]} [collections] - the names of collections defined for this chaincode. - * @returns {module:fabric-network.Contract} the contract. - */ - getContract(chaincodeId, name = '', collections) { + getContract(chaincodeId: string, name = '', collections?: string[]) { const method = 'getContract'; logger.debug('%s - start - name %s', method, name); @@ -215,10 +221,6 @@ class Network { return contract; } - /** - * Get the underlying channel object representation of this network. - * @returns {Channel} A channel. - */ getChannel() { return this.channel; } @@ -227,90 +229,43 @@ class Network { const method = '_dispose'; logger.debug('%s - start', method); - this.listeners.forEach(listener => listener.unregister()); this.contracts.clear(); - this.eventServiceManager.dispose(); + this.oldListeners.forEach((listener) => listener.unregister()); + this.oldListeners.clear(); + + this.commitListeners.forEach((listener) => listener.close()); + this.commitListeners.clear(); + + this.blockListeners.forEach((listener) => listener.close()); + this.blockListeners.clear(); + + this.realtimeBlockEventSource?.close(); + this.eventServiceManager?.dispose(); this.channel.close(); this.initialized = false; } - /** - * Create a block event listener. - * @param {Function} callback - the function to be called when an event is - * triggered with signature (error, block) - * @param {module:fabric-network.Network~EventListenerOptions} [options] Optional. - * @param {EventService} [eventService] - Optional. Used to override the - * event service selection - * @returns {module:fabric-network~BlockEventListener} - * @async - * @private - */ - async addBlockListener(callback, options = {}, eventService) { - const method = 'addBlockListener'; - logger.debug('%s - start', method); - - const listener = new BlockEventListener(this, callback, options); - this.saveListener(listener, listener); - if (eventService) { - listener.eventService = eventService; - } - await listener.register(); - return listener; + async addCommitListener(listener: CommitListener, peers: Endorser[], transactionId: string) { + const sessionSupplier = () => new CommitListenerSession(listener, this.eventServiceManager!, peers, transactionId); + return await addListener(listener, this.commitListeners, sessionSupplier); } - /** - * Add a listener to receive transaction commit and peer disconnect events for a set of peers. - * @param {module:fabric-network.Network~CommitListener} listener A transaction commit listener callback function. - * @param {Endorser[]} peers The peers from which to receive events. - * @param {string} transactionId A transaction ID. - * @returns {module:fabric-network.Network~CommitListener} The added listener. - * @example - * const listener: CommitListener = (error, event) => { - * if (error) { - * // Handle peer communication error - * } else { - * // Handle transaction commit event - * } - * } - * const peers = network.channel.getEndorsers(); - * await network.addCommitListener(listener, peers, transactionId); - */ - async addCommitListener(listener, peers, transactionId) { - if (!this.commitListeners.has(listener)) { - const session = new CommitListenerSession(listener, this.eventServiceManager, peers, transactionId); - // Store listener before starting in case start fires error events that trigger remove of the listener - this.commitListeners.set(listener, session); - await session.start(); - } - return listener; + removeCommitListener(listener: CommitListener) { + removeListener(listener, this.commitListeners); } - /** - * Removes a previously added transaction commit listener. - * @param {module:fabric-network.Network~CommitListener} listener A transaction commit listener callback function. - */ - removeCommitListener(listener) { - const session = this.commitListeners.get(listener); - if (session) { - session.close(); - this.commitListeners.delete(listener); - } + async addBlockListener(listener: BlockListener) { + const sessionSupplier = () => new BlockListenerSession(listener, this.realtimeBlockEventSource); + return await addListener(listener, this.blockListeners, sessionSupplier); } - /* - * Save the listener to a map in Network - * @param {String} listenerName the name of the listener being saved - * @param {AbstractEventListener} listener the listener to be saved - * @private - */ - saveListener(listenerName, listener) { - const method = 'saveListener'; - logger.debug('%s - start', method); + removeBlockListener(listener: BlockListener) { + removeListener(listener, this.blockListeners); + } - this.listeners.set(listenerName, listener); + saveListener(listener: BaseEventListener) { + this.oldListeners.add(listener); } } - -module.exports = Network; diff --git a/fabric-network/src/transaction.js b/fabric-network/src/transaction.js index c5c1de2a2b..cd44c11b37 100644 --- a/fabric-network/src/transaction.js +++ b/fabric-network/src/transaction.js @@ -6,7 +6,7 @@ 'use strict'; -const Query = require('./impl/query/query'); +const {QueryImpl: Query} = require('./impl/query/query'); const logger = require('./logger').getLogger('Transaction'); const util = require('util'); @@ -109,19 +109,6 @@ class Transaction { return this._name; } - /** - * Override the Gateway option for event handler strategy - * @private - * @param {*} eventHandlerStrategyFactory - */ - setEventHandlerStrategy(eventHandlerStrategyFactory) { - const method = `setEventHandlerStrategy[${this._name}]`; - logger.debug('%s - start', method); - - this._eventHandlerStrategyFactory = eventHandlerStrategyFactory; - - return this; - } /** * Set transient data that will be passed to the transaction function * but will not be stored on the ledger. This can be used to pass diff --git a/fabric-network/test/contract.js b/fabric-network/test/contract.js index dffdd6fdfd..aa9cb7f852 100644 --- a/fabric-network/test/contract.js +++ b/fabric-network/test/contract.js @@ -19,7 +19,7 @@ chai.should(); const Contract = rewire('../lib/contract'); const Gateway = require('../lib/gateway'); -const Network = require('fabric-network/lib/network'); +const {NetworkImpl: Network} = require('../lib/network'); const Transaction = require('../lib/transaction'); class FakeListener { constructor(network, listenerName) { @@ -129,18 +129,18 @@ describe('Contract', () => { it('should create if no options', async () => { const listener = await contract.addContractListener('testEventName', () => {}); expect(listener).to.be.instanceof(FakeListener); - expect(network.listeners.get(listener)).to.equal(listener); + expect(network.oldListeners.has(listener)).to.be.true; }); it('should create if options', async () => { const listener = await contract.addContractListener('testEventName', () => {}, {something: 'something'}); expect(listener).to.be.instanceof(FakeListener); - expect(network.listeners.get(listener)).to.equal(listener); + expect(network.oldListeners.has(listener)).to.be.true; }); it('should create if event service', async () => { const listener = await contract.addContractListener('testEventName', () => {}, null, 'eventService'); expect(listener).to.be.instanceof(FakeListener); - expect(network.listeners.get(listener)).to.equal(listener); + expect(network.oldListeners.has(listener)).to.be.true; }); }); diff --git a/fabric-network/test/gateway.js b/fabric-network/test/gateway.js index ef31cc9e2e..7be6d7f58b 100644 --- a/fabric-network/test/gateway.js +++ b/fabric-network/test/gateway.js @@ -18,7 +18,7 @@ chai.use(require('chai-as-promised')); const Gateway = rewire('../lib/gateway'); const Client = rewire('fabric-common/lib/Client'); -const QueryStrategies = require('../lib/impl/query/queryhandlerstrategies'); +const QueryStrategies = require('../lib/impl/query/defaultqueryhandlerstrategies'); describe('Gateway', () => { let client; diff --git a/fabric-network/test/impl/event/asyncnotifier.ts b/fabric-network/test/impl/event/asyncnotifier.ts new file mode 100644 index 0000000000..dcdee9c95c --- /dev/null +++ b/fabric-network/test/impl/event/asyncnotifier.ts @@ -0,0 +1,102 @@ +/** + * Copyright 2020 IBM All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +import { AsyncNotifier } from '../../../src/impl/event/asyncnotifier'; + +import * as testUtils from '../../testutils'; + +import sinon = require('sinon'); +import chai = require('chai'); +const expect = chai.expect; + +describe('AsyncNotifier', () => { + function newSupplier(events: number[]) { + let callCount = 0; + return () => { + const event = events[callCount]; + if (event) { + callCount++; + return event; + } else { + return undefined; + } + }; + } + + afterEach(() => { + sinon.restore(); + }); + + it('does not call listener if no events', () => { + const supplier = newSupplier([]); + const listener = sinon.fake(); + const notifier = new AsyncNotifier(supplier, listener); + + notifier.notify(); + + sinon.assert.notCalled(listener); + }); + + it('passes event to listener', async () => { + const expected = [1]; + const supplier = newSupplier(expected); + const listener = testUtils.newAsyncListener(); + const notifier = new AsyncNotifier(supplier, listener); + + notifier.notify(); + const actual = await listener.completePromise; + + expect(actual).to.deep.equal(expected); + }); + + it('passes events to listener in order', async () => { + const expected = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + const supplier = newSupplier(expected); + const listener = testUtils.newAsyncListener(expected.length, 10); + const notifier = new AsyncNotifier(supplier, listener); + + notifier.notify(); + const actual = await listener.completePromise; + + expect(actual).to.deep.equal(expected); + }); + + it('unnecessary notifies have no effect', async () => { + const expected = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + const supplier = newSupplier(expected); + const listener = testUtils.newAsyncListener(expected.length); + const notifier = new AsyncNotifier(supplier, listener); + + notifier.notify(); + notifier.notify(); + const actual = await listener.completePromise; + + expect(actual).to.deep.equal(expected); + + }); + + it('interleaved send and receive delivers all events in order', async () => { + const numEvents = 10; + const maxSleep = 10; + const events = []; + const supplier = newSupplier(events); + const listener = testUtils.newAsyncListener(numEvents, maxSleep); + const notifier = new AsyncNotifier(supplier, listener); + + async function run() { + for (let i = 0; i < numEvents; ) { + events.push(++i); + notifier.notify(); + const random = testUtils.getRandomInt(maxSleep); + await testUtils.sleep(random); + } + } + await run(); + const actual = await listener.completePromise; + + expect(actual).to.deep.equal(events); + }); +}); diff --git a/fabric-network/test/impl/event/baseeventlistener.js b/fabric-network/test/impl/event/baseeventlistener.js index bb495fadf3..ab25477e59 100644 --- a/fabric-network/test/impl/event/baseeventlistener.js +++ b/fabric-network/test/impl/event/baseeventlistener.js @@ -11,7 +11,7 @@ const expect = chai.expect; const sinon = require('sinon'); const EventService = require('fabric-common/lib/EventService'); -const Network = require('./../../../lib/network'); +const {NetworkImpl: Network} = require('../../../lib/network'); const EventServiceManager = require('./../../../lib/impl/event/eventservicemanager'); const BaseEventListener = require('./../../../lib/impl/event/baseeventlistener'); const FileSystemCheckpointer = require('./../../../lib/impl/event/filesystemcheckpointer'); @@ -184,24 +184,24 @@ describe('BaseEventListener', () => { testListener.registration = sinon.stub(); testListener.registration.eventService = sinon.stub(); testListener.registration.eventService.unregisterEventListener = sinon.stub(); - network.listeners = new Map(); - network.listeners.set(testListener, testListener); + network.oldListeners = new Map(); + network.oldListeners.set(testListener, testListener); testListener.unregister(); - expect(network.listeners.has(testListener)).to.be.false; + expect(network.oldListeners.has(testListener)).to.be.false; }); it('should unregister with no registration', () => { testListener.registration = null; - network.listeners = new Map(); - network.listeners.set(testListener, testListener); + network.oldListeners = new Map(); + network.oldListeners.set(testListener, testListener); testListener.unregister(); - expect(network.listeners.has(testListener)).to.be.false; + expect(network.oldListeners.has(testListener)).to.be.false; }); it('should unregister with listener on the network', () => { testListener.registration = sinon.stub(); testListener.registration.eventService = sinon.stub(); testListener.registration.eventService.unregisterEventListener = sinon.stub(); - network.listeners = new Map(); - network.listeners.set(testListener, testListener); + network.oldListeners = new Map(); + network.oldListeners.set(testListener, testListener); testListener.unregister(); expect(testListener.registration).to.be.null; }); @@ -209,7 +209,7 @@ describe('BaseEventListener', () => { testListener.registration = sinon.stub(); testListener.registration.eventService = sinon.stub(); testListener.registration.eventService.unregisterEventListener = sinon.stub(); - network.listeners = new Map(); + network.oldListeners = new Map(); testListener.unregister(); expect(testListener.registration).to.be.null; }); diff --git a/fabric-network/test/impl/event/blockeventlistener.js b/fabric-network/test/impl/event/blockeventlistener.js deleted file mode 100644 index 8b32f14bf2..0000000000 --- a/fabric-network/test/impl/event/blockeventlistener.js +++ /dev/null @@ -1,242 +0,0 @@ -/** - * Copyright 2019 IBM All Rights Reserved. - * - * SPDX-License-Identifier: Apache-2.0 - */ - -'use strict'; - -const Long = require('long'); -const rewire = require('rewire'); -const chai = require('chai'); -const expect = chai.expect; -const sinon = require('sinon'); -chai.use(require('chai-as-promised')); - -const EventService = require('fabric-common/lib/EventService'); -const Network = require('./../../../lib/network'); -const EventServiceManager = require('./../../../lib/impl/event/eventservicemanager'); -const BlockEventListener = rewire('fabric-network/lib/impl/event/blockeventlistener'); -const FileSystemCheckpointer = require('./../../../lib/impl/event/filesystemcheckpointer'); - -describe('BlockEventListener', () => { - let sandbox; - let FakeLogger; - let eventService; - let checkpointer; - let network; - let eventServiceManager; - let baseListener; - let revert; - - let listener; - - beforeEach(() => { - revert = []; - sandbox = sinon.createSandbox(); - FakeLogger = { - debug: () => { - }, - error: () => { - }, - warn: () => { - } - }; - sandbox.stub(FakeLogger); - revert.push(BlockEventListener.__set__('logger', FakeLogger)); - baseListener = sinon.stub(); - checkpointer = sandbox.createStubInstance(FileSystemCheckpointer); - checkpointer.check.returns(false); - checkpointer.save.resolves(); - eventServiceManager = sandbox.createStubInstance(EventServiceManager); - eventServiceManager.startEventService.resolves(); - eventService = sandbox.createStubInstance(EventService); - eventService.registerBlockListener = sinon.stub().returns(baseListener); - eventServiceManager.getEventService = sinon.stub().returns(eventService); - eventServiceManager.getReplayEventService = sinon.stub().returns(eventService); - network = sandbox.createStubInstance(Network); - network.eventServiceManager = eventServiceManager; - - listener = new BlockEventListener(network, () => {}, {replay: true}); - listener.eventService = eventService; - listener.checkpointer = checkpointer; - }); - - afterEach(() => { - if (revert.length) { - revert.forEach(Function.prototype.call, Function.prototype.call); - } - sandbox.restore(); - }); - - describe('#_registerListener', () => { - it('should register a block event', () => { - listener._registerListener(); - expect(listener.registration).to.be.equal(baseListener); - }); - }); - - - describe('#onEvent', () => { - beforeEach(() => { - listener.registration = baseListener; - sandbox.stub(listener, 'eventCallback'); - }); - - it('should handle no event data', async () => { - try { - await listener.onEvent(); - } catch (error) { - expect(error.message).to.contain('Missing event information'); - } - }); - - it('should handle the endblockReceived', async () => { - listener.eventServiceOptions = { - endBlock: Long.fromValue(10) - }; - const blockNumber = Long.fromValue(10); - const event = { - endBlockReceived: true, - blockNumber - }; - - await listener.onEvent(null, event); - sinon.assert.notCalled(listener.eventCallback); - sinon.assert.notCalled(checkpointer.save); - }); - - it('should handle the endblockReceived received too soon', async () => { - listener.eventServiceOptions = { - endBlock: Long.fromValue(11) - }; - const blockNumber = Long.fromValue(10); - const event = { - endBlockReceived: true, - blockNumber - }; - - await listener.onEvent(null, event); - sinon.assert.called(listener.eventCallback); - }); - - it('should handle the endblockReceived when not defined', async () => { - listener.eventServiceOptions = {}; - const blockNumber = Long.fromValue(10); - const event = { - endBlockReceived: true, - blockNumber - }; - - await listener.onEvent(null, event); - sinon.assert.called(listener.eventCallback); - }); - - it('should call the event callback with full block', async () => { - const blockNumber = Long.fromValue(10); - const block = {header: blockNumber}; - const event = { - block, - blockNumber - }; - - await listener.onEvent(null, event); - sinon.assert.calledWith(listener.eventCallback, null, blockNumber.toString(), block); - sinon.assert.calledWith(checkpointer.check, '10'); - sinon.assert.calledWith(checkpointer.save, '10'); - }); - - it('should call the event callback with full block with private', async () => { - const blockNumber = Long.fromValue(10); - const block = {header: blockNumber}; - const event = { - block, - blockNumber, - privateData: 'privateData' - }; - - await listener.onEvent(null, event); - sinon.assert.calledWith(listener.eventCallback, null, blockNumber.toString(), block); - sinon.assert.calledWith(checkpointer.check, '10'); - sinon.assert.calledWith(checkpointer.save, '10'); - }); - - it('should call the event callback with full block and no checkpointer', async () => { - const blockNumber = Long.fromValue(10); - const block = {header: blockNumber}; - const event = { - block, - blockNumber, - privateData: 'privateData' - }; - listener.checkpointer = null; - - await listener.onEvent(null, event); - sinon.assert.calledWith(listener.eventCallback, null, blockNumber.toString(), block); - }); - - it('should call the event callback with filtered', async () => { - const blockNumber = Long.fromValue(10); - const block = {header: blockNumber}; - const event = { - filteredBlock: block, - blockNumber - }; - - await listener.onEvent(null, event); - sinon.assert.calledWith(listener.eventCallback, null, blockNumber.toString(), block); - sinon.assert.calledWith(checkpointer.check, '10'); - sinon.assert.calledWith(checkpointer.save, '10'); - }); - - it('should call the event callback with error', async () => { - const error = new Error('SOMETHING BAD'); - - await listener.onEvent(error); - sinon.assert.calledWith(listener.eventCallback, error); - }); - - it('should call the event callback with filtered', async () => { - const blockNumber = Long.fromValue(10); - const event = { - blockNumber - }; - - await listener.onEvent(null, event); - sinon.assert.called(listener.eventCallback); - sinon.assert.calledWith(checkpointer.check, '10'); - sinon.assert.calledWith(checkpointer.save, '10'); - sinon.assert.calledWith(FakeLogger.error, '%s - missing block data in event %s'); - }); - - it('should not call the event callback when checkpoint has seen', async () => { - checkpointer.check.returns(true); - - const blockNumber = Long.fromValue(10); - const block = {header: blockNumber}; - const event = { - filteredBlock: block, - blockNumber - }; - - await listener.onEvent(null, event); - sinon.assert.notCalled(listener.eventCallback); - sinon.assert.calledWith(checkpointer.check, '10'); - sinon.assert.calledWith(checkpointer.save, '10'); - }); - - it('should call the event callback and have an error', async () => { - const blockNumber = Long.fromValue(10); - const block = {header: blockNumber}; - const event = { - filteredBlock: block, - blockNumber - }; - listener.eventCallback = sinon.stub().rejects(new Error('CALLBACK ERROR')); - await listener.onEvent(null, event); - sinon.assert.calledWith(checkpointer.check, '10'); - sinon.assert.calledWith(checkpointer.save, '10'); - sinon.assert.calledWith(FakeLogger.error, '%s - Error executing callback: %s'); - }); - }); -}); diff --git a/fabric-network/test/impl/event/blocklistener.ts b/fabric-network/test/impl/event/blocklistener.ts new file mode 100644 index 0000000000..702e79ff88 --- /dev/null +++ b/fabric-network/test/impl/event/blocklistener.ts @@ -0,0 +1,214 @@ +/** + * Copyright 2020 IBM All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +import sinon = require('sinon'); +import chai = require('chai'); +const expect = chai.expect; + +import * as testUtils from '../../testutils'; + +import { + EventInfo, + IdentityContext +} from 'fabric-common'; +import Long = require('long'); + +import { Network, NetworkImpl } from '../../../src/network'; +import EventServiceManager = require('../../../src/impl/event/eventservicemanager'); +import Gateway = require('../../../src/gateway'); +import { StubEventService } from './stubeventservice'; +import { BlockEvent, BlockListener } from '../../../src/impl/event/blocklistener'; +import { BlockEventSource } from '../../../src/impl/event/blockeventsource'; + +interface StubBlockListener extends BlockListener { + completePromise: Promise; +} + +describe('block listener', () => { + let eventServiceManager: sinon.SinonStubbedInstance; + let eventService: StubEventService; + let gateway: Gateway; + let network: Network; + let listener: StubBlockListener; + + beforeEach(async () => { + eventService = new StubEventService('stub'); + + eventServiceManager = sinon.createStubInstance(EventServiceManager); + eventServiceManager.getEventService.returns(eventService); + + gateway = sinon.createStubInstance(Gateway); + gateway.identityContext = sinon.createStubInstance(IdentityContext); + + network = new NetworkImpl(gateway, null); + (network as any).realtimeBlockEventSource = new BlockEventSource(eventServiceManager); + + listener = testUtils.newAsyncListener(); + }); + + afterEach(() => { + sinon.restore(); + }); + + function newEvent(blockNumber: number) { + return { + eventHub: null, + blockNumber: new Long(blockNumber) + }; + } + + it('add listener returns the listener', async () => { + const result = await network.addBlockListener(listener); + expect(result).to.equal(listener); + }); + + it('listener receives events', async () => { + const event = newEvent(1); + await network.addBlockListener(listener); + eventService.sendEvent(event); + + const actual = await listener.completePromise; + expect(actual).to.deep.equal([event]); + }); + + it('removed listener does not receive events', async () => { + const removedListener = sinon.spy(testUtils.newAsyncListener()); + + await network.addBlockListener(listener); + await network.addBlockListener(removedListener); + network.removeBlockListener(removedListener); + eventService.sendEvent(newEvent(1)); + + await listener.completePromise; + sinon.assert.notCalled(removedListener); + }); + + it('add listener multiple times has no effect', async () => { + const spy = sinon.spy(listener); + + await network.addBlockListener(spy); + await network.addBlockListener(spy); + eventService.sendEvent(newEvent(1)); + + await listener.completePromise; + sinon.assert.calledOnce(spy); + }); + + it('remove listener multiple times has no effect', async () => { + const removedListener = sinon.spy(testUtils.newAsyncListener()); + + await network.addBlockListener(listener); + await network.addBlockListener(removedListener); + network.removeBlockListener(removedListener); + network.removeBlockListener(removedListener); + eventService.sendEvent(newEvent(1)); + + await listener.completePromise; + sinon.assert.notCalled(removedListener); + }); + + it('listener can remove itself when receiving event', async () => { + listener = testUtils.newAsyncListener(2); + const fake = sinon.fake(async (event: BlockEvent) => { + network.removeBlockListener(fake); + }); + + await network.addBlockListener(listener); + await network.addBlockListener(fake); + eventService.sendEvent(newEvent(1)); + // fake listener should have removed itself + eventService.sendEvent(newEvent(2)); + + await listener.completePromise; + sinon.assert.calledOnce(fake); + }); + + it('listener does not auto-unregister when receiving events', async () => { + listener = testUtils.newAsyncListener(2); + const event1 = newEvent(1); + const event2 = newEvent(2); + + await network.addBlockListener(listener); + eventService.sendEvent(event1); + eventService.sendEvent(event2); + + const actual = await listener.completePromise; + network.removeBlockListener(listener); + + expect(actual).to.deep.equal([event1, event2]); + }); + + it('error thrown by listener is handled', async () => { + listener = testUtils.newAsyncListener(2); + const errorListener = sinon.fake.rejects(new Error('LISTENER_ERROR')); + const event1 = newEvent(1); + const event2 = newEvent(2); + + await network.addBlockListener(listener); + await network.addBlockListener(errorListener); + eventService.sendEvent(event1); + eventService.sendEvent(event2); + + const actual = await listener.completePromise; + expect(actual).to.deep.equal([event1, event2]); + }); + + it('listener receives blocks in order', async () => { + listener = testUtils.newAsyncListener(3); + const event1 = newEvent(1); + const event2 = newEvent(2); + const event3 = newEvent(3); + + await network.addBlockListener(listener); + eventService.sendEvent(event1); + eventService.sendEvent(event3); + eventService.sendEvent(event2); + + const actual = await listener.completePromise; + expect(actual).to.deep.equal([event1, event2, event3]); + }); + + it('listener does not receive old blocks', async () => { + listener = testUtils.newAsyncListener(2); + const event1 = newEvent(1); + const event2 = newEvent(2); + const event3 = newEvent(3); + + await network.addBlockListener(listener); + eventService.sendEvent(event2); + eventService.sendEvent(event1); // Ignored as older than first block received + eventService.sendEvent(event3); + + const actual = await listener.completePromise; + expect(actual).to.deep.equal([event2, event3]); + }); + + it('errors trigger reconnect of event service with no start block if no events received', async () => { + await network.addBlockListener(listener); + eventServiceManager.startEventService.resetHistory(); + const startListener = testUtils.newAsyncListener(1); + eventServiceManager.startEventService.callsFake(() => startListener()); + + eventService.sendError(new Error('DISCONNECT')); + + await startListener.completePromise; + sinon.assert.calledWith(eventServiceManager.startEventService, eventService); + sinon.assert.neverCalledWith(eventServiceManager.startEventService, sinon.match.any, sinon.match.has('startBlock', sinon.match.number)); + }); + + it('errors trigger reconnect of event service with next block as start block if events received', async () => { + await network.addBlockListener(listener); + eventServiceManager.startEventService.resetHistory(); + const startListener = testUtils.newAsyncListener(1); + eventServiceManager.startEventService.callsFake(() => startListener()); + + eventService.sendEvent(newEvent(1)); + eventService.sendError(new Error('DISCONNECT')); + + await startListener.completePromise; + sinon.assert.calledWith(eventServiceManager.startEventService, eventService, sinon.match.has('startBlock', Long.fromNumber(2))); + }); +}); diff --git a/fabric-network/test/impl/event/commiteventlistener.js b/fabric-network/test/impl/event/commiteventlistener.js index c9b2582735..47e12c667d 100644 --- a/fabric-network/test/impl/event/commiteventlistener.js +++ b/fabric-network/test/impl/event/commiteventlistener.js @@ -14,7 +14,7 @@ const sinon = require('sinon'); chai.use(require('chai-as-promised')); const EventService = require('fabric-common/lib/EventService'); -const Network = require('./../../../lib/network'); +const {NetworkImpl: Network} = require('../../../lib/network'); const EventServiceManager = require('./../../../lib/impl/event/eventservicemanager'); const CommitEventListener = rewire('fabric-network/lib/impl/event/commiteventlistener'); const FileSystemCheckpointer = require('./../../../lib/impl/event/filesystemcheckpointer'); diff --git a/fabric-network/test/impl/event/commitlistener.ts b/fabric-network/test/impl/event/commitlistener.ts index ab42ad7161..3ea8f32b0f 100644 --- a/fabric-network/test/impl/event/commitlistener.ts +++ b/fabric-network/test/impl/event/commitlistener.ts @@ -6,8 +6,6 @@ import sinon = require('sinon'); import chai = require('chai'); -import chaiAsPromised = require('chai-as-promised'); -chai.use(chaiAsPromised); const expect = chai.expect; import { @@ -17,10 +15,11 @@ import { } from 'fabric-common'; import Long = require('long'); -import Network = require('../../../src/network'); +import { Network, NetworkImpl } from '../../../src/network'; import EventServiceManager = require('../../../src/impl/event/eventservicemanager'); -import { Gateway } from 'fabric-network'; +import Gateway = require('../../../src/gateway'); import { StubEventService } from './stubeventservice'; +import { CommitListener } from '../../../src/impl/event/commitlistener'; describe('commit listener', () => { let eventServiceManager: sinon.SinonStubbedInstance; @@ -51,8 +50,8 @@ describe('commit listener', () => { gateway = sinon.createStubInstance(Gateway); gateway.identityContext = sinon.createStubInstance(IdentityContext); - network = new Network(gateway, null); - network.eventServiceManager = eventServiceManager; + network = new NetworkImpl(gateway, null); + (network as any).eventServiceManager = eventServiceManager; }); afterEach(() => { @@ -195,4 +194,13 @@ describe('commit listener', () => { sinon.assert.calledTwice(listener); }); + + it('error thrown by listener is handled', async () => { + const listener = sinon.fake.throws(new Error('LISTENER_ERROR')); + + await network.addCommitListener(listener, peers, transactionId); + const f = () => eventService.sendEvent(eventInfo); + + expect(f).to.not.throw(); + }); }); diff --git a/fabric-network/test/impl/event/contracteventlistener.js b/fabric-network/test/impl/event/contracteventlistener.js index 8248f2ae43..f0fef02c41 100644 --- a/fabric-network/test/impl/event/contracteventlistener.js +++ b/fabric-network/test/impl/event/contracteventlistener.js @@ -14,7 +14,7 @@ const sinon = require('sinon'); chai.use(require('chai-as-promised')); const EventService = require('fabric-common/lib/EventService'); -const Network = require('./../../../lib/network'); +const {NetworkImpl: Network} = require('../../../lib/network'); const Contract = require('./../../../lib/contract'); const EventServiceManager = require('./../../../lib/impl/event/eventservicemanager'); const ContractEventListener = rewire('fabric-network/lib/impl/event/contracteventlistener'); diff --git a/fabric-network/test/impl/event/defaulteventhandlerstrategies.ts b/fabric-network/test/impl/event/defaulteventhandlerstrategies.ts index 185b7ac8a9..ff17acc2d0 100644 --- a/fabric-network/test/impl/event/defaulteventhandlerstrategies.ts +++ b/fabric-network/test/impl/event/defaulteventhandlerstrategies.ts @@ -10,7 +10,7 @@ const expect = chai.expect; import { Channel, Endorser } from 'fabric-common'; -import Network = require('fabric-network/lib/network'); +import { Network, NetworkImpl } from '../../../src/network'; import Gateway = require('fabric-network/lib/gateway'); import { AllForTxStrategy } from '../../../src/impl/event/allfortxstrategy'; @@ -53,7 +53,7 @@ describe('DefaultEventHandlerStrategies', () => { mspId }); - network = sinon.createStubInstance(Network); + network = sinon.createStubInstance(NetworkImpl); network.getChannel.returns(channel); network.getGateway.returns(gateway); }); diff --git a/fabric-network/test/impl/event/eventservicemanager.js b/fabric-network/test/impl/event/eventservicemanager.js index d989836784..f4fb37d59e 100644 --- a/fabric-network/test/impl/event/eventservicemanager.js +++ b/fabric-network/test/impl/event/eventservicemanager.js @@ -13,12 +13,12 @@ chai.use(require('chai-as-promised')); const rewire = require('rewire'); const {Channel, Client, EventService, Endorser, Eventer} = require('fabric-common'); -const Network = require('fabric-network/lib/network'); +const {NetworkImpl: Network} = require('../../../lib/network'); const Gateway = require('fabric-network/lib/gateway'); const EventServiceManager = require('fabric-network/lib/impl/event/eventservicemanager'); const EventServiceManagerRewire = rewire('fabric-network/lib/impl/event/eventservicemanager'); const EventServiceStrategies = require('fabric-network/lib/impl/event/defaulteventhandlerstrategies'); -const QueryHandlerStrategies = require('fabric-network/lib/impl/query/queryhandlerstrategies'); +const QueryHandlerStrategies = require('fabric-network/lib/impl/query/defaultqueryhandlerstrategies'); describe('EventServiceManager', () => { let sandbox; @@ -310,37 +310,17 @@ describe('EventServiceManager', () => { }); describe('#constructor', () => { - it('should create an of peers', () => { + it('should create an array of peers', () => { expect(pool.peers).to.be.instanceOf(Array); expect(pool.peers).to.deep.equal([peer1, peer2]); - expect(pool.lastPeerIndex).to.be.equal(-1); }); it('should throw error when no peers found', () => { - try { - // const network2 = sinon.stub(); - // network2.getChannel = sinon.stub(); - // network2.getChannel.getEndorsers = sinon.stub().returns([]); - // network2.getGateway = sinon.stub().returns(gateway); + const f = () => { channel.getEndorsers.returns([]); new RoundRobinPeerPool(network); - expect(1, 'should have throw error').to.be.false; - } catch (error) { - expect(error.message).to.contain('No peers available'); - } - }); - it('should throw error when no peers found', () => { - try { - // const network2 = sinon.stub(); - // network2.getChannel = sinon.stub(); - // network2.getChannel.getEndorsers = sinon.stub().returns(); - // network2.getGateway = sinon.stub().returns(gateway); - channel.getEndorsers.returns(); - new RoundRobinPeerPool(network); - expect(1, 'should have throw error').to.be.false; - } catch (error) { - expect(error.message).to.contain('No peers available'); - } + }; + expect(f).to.throw('No peers available'); }); }); diff --git a/fabric-network/test/impl/event/orderedblockqueue.ts b/fabric-network/test/impl/event/orderedblockqueue.ts new file mode 100644 index 0000000000..adf65b0e79 --- /dev/null +++ b/fabric-network/test/impl/event/orderedblockqueue.ts @@ -0,0 +1,153 @@ +/** + * Copyright 2020 IBM All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +import { OrderedBlockQueue } from '../../../src/impl/event/orderedblockqueue'; +import { BlockEvent } from '../../../src/impl/event/blocklistener'; +import Long = require('long'); + +import chai = require('chai'); +const expect = chai.expect; + +// tslint:disable: no-unused-expression + +describe('OrderedBlockQueue', () => { + let queue: OrderedBlockQueue; + + beforeEach(() => { + queue = new OrderedBlockQueue(); + }); + + function newBlock(blockNumber: number): BlockEvent { + return { + blockNumber: Long.fromNumber(blockNumber), + eventHub: null + }; + } + + it('next block for empty queue is undefined', () => { + const result = queue.getNextBlock(); + expect(result).to.be.undefined; + }); + + it('first block added can be retrieved', () => { + const block = newBlock(1); + + queue.addBlock(block); + const result = queue.getNextBlock(); + + expect(result).to.equal(block); + }); + + it('lower block numbers are ignored', () => { + const block1 = newBlock(1); + const block2 = newBlock(2); + + queue.addBlock(block2); + queue.addBlock(block1); + const result1 = queue.getNextBlock(); + const result2 = queue.getNextBlock(); + + expect(result1).to.equal(block2); + expect(result2).to.be.undefined; + }); + + it('orders out-of-order blocks', () => { + const block1 = newBlock(1); + const block2 = newBlock(2); + const block3 = newBlock(3); + + queue.addBlock(block1); + queue.addBlock(block3); + queue.addBlock(block2); + const result1 = queue.getNextBlock(); + const result2 = queue.getNextBlock(); + const result3 = queue.getNextBlock(); + + expect(result1).to.equal(block1); + expect(result2).to.equal(block2); + expect(result3).to.equal(block3); + }); + + it('empty queue has size zero', () => { + const result = queue.size(); + expect(result).to.equal(0); + }); + + it('add block increases queue size', () => { + queue.addBlock(newBlock(1)); + const result = queue.size(); + + expect(result).to.equal(1); + }); + + it('retrieve block decreases queue size', () => { + queue.addBlock(newBlock(1)); + queue.getNextBlock(); + const result = queue.size(); + + expect(result).to.equal(0); + }); + + it('queue size does not go below zero', () => { + queue.getNextBlock(); + const result = queue.size(); + + expect(result).to.equal(0); + }); + + it('ignores block numbers lower than start block', () => { + queue = new OrderedBlockQueue(Long.fromNumber(2)); + + queue.addBlock(newBlock(1)); + const result = queue.size(); + + expect(result).to.equal(0); + }); + + it('accepts block numbers same or higher than start block', () => { + queue = new OrderedBlockQueue(Long.ONE); + + queue.addBlock(newBlock(2)); + queue.addBlock(newBlock(1)); + const result = queue.size(); + + expect(result).to.equal(2); + }); + + it('next block number for empty queue without start block is undefined', () => { + const result = queue.getNextBlockNumber(); + expect(result).to.be.undefined; + }); + + it('next block number for empty queue with start block is start block', () => { + const startBlock = Long.ONE; + queue = new OrderedBlockQueue(startBlock); + + const result = queue.getNextBlockNumber(); + + expect(result?.toString()).to.equal(startBlock.toString()); + }); + + it('next block number is first added block number', () => { + const block = newBlock(1); + + queue.addBlock(block); + const result = queue.getNextBlockNumber(); + + expect(result?.toString()).to.equal(block.blockNumber.toString()); + }); + + it('next block number is last retrieved block number plus one', () => { + const block = newBlock(1); + + queue.addBlock(block); + queue.getNextBlock(); + const result = queue.getNextBlockNumber(); + + const expected = block.blockNumber.add(1).toString(); + expect(result?.toString()).to.equal(expected); + }); +}); diff --git a/fabric-network/test/impl/event/stubeventservice.ts b/fabric-network/test/impl/event/stubeventservice.ts index 065c49ca2c..132143862a 100644 --- a/fabric-network/test/impl/event/stubeventservice.ts +++ b/fabric-network/test/impl/event/stubeventservice.ts @@ -51,6 +51,28 @@ class StubTransactionEventListener implements EventListener { } } +class StubBlockEventListener implements EventListener { + readonly callback: EventCallback; + readonly options: EventRegistrationOptions; + private readonly eventService: EventService; + + constructor(eventService: EventService, callback: EventCallback, options: EventRegistrationOptions) { + this.eventService = eventService; + this.callback = callback; + this.options = options; + } + + onEvent(error: Error, event: EventInfo) { + if (error || event.blockNumber) { + this.callback(error, event); + } + } + + unregisterEventListener() { + this.eventService.unregisterEventListener(this); + } +} + export class StubEventService implements EventService { readonly name: string; startBlock: string | Long; @@ -71,7 +93,7 @@ export class StubEventService implements EventService { } close() { - throw new Error('Method not implemented.'); + this.eventListeners.clear(); } build(idContext: IdentityContext, request: any): Buffer { @@ -105,7 +127,9 @@ export class StubEventService implements EventService { } registerBlockListener(callback: EventCallback, options: EventRegistrationOptions): EventListener { - throw new Error('Method not implemented.'); + const listener = new StubBlockEventListener(this, callback, options); + this.eventListeners.add(listener); + return listener; } sign(parm: IdentityContext | Buffer): ServiceAction { diff --git a/fabric-network/test/impl/event/transactioneventhandler.ts b/fabric-network/test/impl/event/transactioneventhandler.ts index f1edbf8fce..1b0d47db62 100644 --- a/fabric-network/test/impl/event/transactioneventhandler.ts +++ b/fabric-network/test/impl/event/transactioneventhandler.ts @@ -18,7 +18,7 @@ import { import Long = require('long'); import Gateway = require('../../../src/gateway'); -import Network = require('../../../src/network'); +import { Network, NetworkImpl } from '../../../src/network'; import EventServiceManager = require('../../../src/impl/event/eventservicemanager'); import { TransactionEventStrategy } from '../../../src/impl/event/transactioneventstrategy'; import { StubEventService } from './stubeventservice'; @@ -69,8 +69,8 @@ describe('TransactionEventHandler', () => { gateway.identityContext = sinon.createStubInstance(IdentityContext); gateway.getOptions.returns(options); - network = new Network(gateway, null); - network.eventServiceManager = eventServiceManager; + network = new NetworkImpl(gateway, null); + (network as any).eventServiceManager = eventServiceManager; stubStrategy = sinon.createStubInstance(TransactionEventStrategy); stubStrategy.getPeers.returns([peer]); diff --git a/fabric-network/test/impl/query/queryhandlerstrategies.js b/fabric-network/test/impl/query/defaultqueryhandlerstrategies.js similarity index 84% rename from fabric-network/test/impl/query/queryhandlerstrategies.js rename to fabric-network/test/impl/query/defaultqueryhandlerstrategies.js index f8189b9aa3..16972b02c9 100644 --- a/fabric-network/test/impl/query/queryhandlerstrategies.js +++ b/fabric-network/test/impl/query/defaultqueryhandlerstrategies.js @@ -11,13 +11,13 @@ const chai = require('chai'); const expect = chai.expect; const Gateway = require('fabric-network/lib/gateway'); -const Network = require('fabric-network/lib/network'); +const {NetworkImpl: Network} = require('../../../lib/network'); const Channel = require('fabric-common').Channel; -const SingleQueryHandler = require('fabric-network/lib/impl/query/singlequeryhandler'); -const RoundRobinQueryHandler = require('fabric-network/lib/impl/query/roundrobinqueryhandler'); -const QueryStrategies = require('fabric-network/lib/impl/query/queryhandlerstrategies'); +const {SingleQueryHandler} = require('fabric-network/lib/impl/query/singlequeryhandler'); +const {RoundRobinQueryHandler} = require('fabric-network/lib/impl/query/roundrobinqueryhandler'); +const QueryStrategies = require('fabric-network/lib/impl/query/defaultqueryhandlerstrategies'); -describe('QueryHandlerStrategies', () => { +describe('DefaultQueryHandlerStrategies', () => { const expectedStrategyTypes = { 'MSPID_SCOPE_SINGLE': SingleQueryHandler, 'MSPID_SCOPE_ROUND_ROBIN': RoundRobinQueryHandler diff --git a/fabric-network/test/impl/query/query.js b/fabric-network/test/impl/query/query.js index 7cd2a37408..4aec968885 100644 --- a/fabric-network/test/impl/query/query.js +++ b/fabric-network/test/impl/query/query.js @@ -10,7 +10,7 @@ const sinon = require('sinon'); const expect = require('chai').expect; const QueryProposal = require('fabric-common/lib/Query'); -const Query = require('fabric-network/lib/impl/query/query'); +const {QueryImpl: Query} = require('fabric-network/lib/impl/query/query'); describe('Query', () => { let queryProposal; diff --git a/fabric-network/test/impl/query/queryhandlers.js b/fabric-network/test/impl/query/queryhandlers.js index ad65befc46..6ed6ea703f 100644 --- a/fabric-network/test/impl/query/queryhandlers.js +++ b/fabric-network/test/impl/query/queryhandlers.js @@ -19,10 +19,10 @@ }); */ -const Endorser = require('fabric-common/lib/Endorser'); -const Query = require('fabric-network/lib/impl/query/query'); -const SingleQueryHandler = require('fabric-network/lib/impl/query/singlequeryhandler'); -const RoundRobinQueryHandler = require('fabric-network/lib/impl/query/roundrobinqueryhandler'); +const {Endorser} = require('fabric-common'); +const {QueryImpl: Query} = require('fabric-network/lib/impl/query/query'); +const {SingleQueryHandler} = require('fabric-network/lib/impl/query/singlequeryhandler'); +const {RoundRobinQueryHandler} = require('fabric-network/lib/impl/query/roundrobinqueryhandler'); const sinon = require('sinon'); const chai = require('chai'); @@ -168,15 +168,15 @@ describe('QueryHandlers', () => { query.evaluate.withArgs([endorser2]).resolves(s400ProposalResponse2); let result = await handler.evaluate(query); - expect(handler._currentPeerIndex).to.be.equal(0); + expect(handler.currentPeerIndex).to.be.equal(0); expect(result).to.be.equal('peer1-200'); result = await handler.evaluate(query); - expect(handler._currentPeerIndex).to.be.equal(0); + expect(handler.currentPeerIndex).to.be.equal(0); expect(result).to.be.equal('peer1-200'); result = await handler.evaluate(query); - expect(handler._currentPeerIndex).to.be.equal(0); + expect(handler.currentPeerIndex).to.be.equal(0); expect(result).to.be.equal('peer1-200'); }); it('continues to use second peer', async () => { @@ -184,15 +184,15 @@ describe('QueryHandlers', () => { query.evaluate.withArgs([endorser1]).resolves(errorProposalResponse1); let result = await handler.evaluate(query); - expect(handler._currentPeerIndex).to.be.equal(1); + expect(handler.currentPeerIndex).to.be.equal(1); expect(result).to.be.equal('peer2-200'); result = await handler.evaluate(query); - expect(handler._currentPeerIndex).to.be.equal(1); + expect(handler.currentPeerIndex).to.be.equal(1); expect(result).to.be.equal('peer2-200'); result = await handler.evaluate(query); - expect(handler._currentPeerIndex).to.be.equal(1); + expect(handler.currentPeerIndex).to.be.equal(1); expect(result).to.be.equal('peer2-200'); }); it('switches to second peer after error', async () => { @@ -200,21 +200,21 @@ describe('QueryHandlers', () => { query.evaluate.withArgs([endorser2]).resolves(s200ProposalResponse2); let result = await handler.evaluate(query); - expect(handler._currentPeerIndex).to.be.equal(0); + expect(handler.currentPeerIndex).to.be.equal(0); expect(result).to.be.equal('peer1-200'); result = await handler.evaluate(query); - expect(handler._currentPeerIndex).to.be.equal(0); + expect(handler.currentPeerIndex).to.be.equal(0); expect(result).to.be.equal('peer1-200'); query.evaluate.withArgs([endorser1]).resolves(errorProposalResponse1); result = await handler.evaluate(query); - expect(handler._currentPeerIndex).to.be.equal(1); + expect(handler.currentPeerIndex).to.be.equal(1); expect(result).to.be.equal('peer2-200'); result = await handler.evaluate(query); - expect(handler._currentPeerIndex).to.be.equal(1); + expect(handler.currentPeerIndex).to.be.equal(1); expect(result).to.be.equal('peer2-200'); }); it('returns an error with the both peers 500 results', async () => { @@ -346,7 +346,7 @@ describe('QueryHandlers', () => { query.evaluate.withArgs([endorser2]).resolves(s400ProposalResponse2); let result = await handler.evaluate(query); - expect(handler._currentPeerIndex).to.be.equal(1); + expect(handler.currentPeerIndex).to.be.equal(1); expect(result).to.be.equal('peer1-200'); try { @@ -356,7 +356,7 @@ describe('QueryHandlers', () => { } result = await handler.evaluate(query); - expect(handler._currentPeerIndex).to.be.equal(1); + expect(handler.currentPeerIndex).to.be.equal(1); expect(result).to.be.equal('peer1-200'); }); it('switches between peers', async () => { @@ -364,19 +364,19 @@ describe('QueryHandlers', () => { query.evaluate.withArgs([endorser1]).resolves(s200ProposalResponse1); let result = await handler.evaluate(query); - expect(handler._currentPeerIndex).to.be.equal(1); + expect(handler.currentPeerIndex).to.be.equal(1); expect(result).to.be.equal('peer1-200'); result = await handler.evaluate(query); - expect(handler._currentPeerIndex).to.be.equal(0); + expect(handler.currentPeerIndex).to.be.equal(0); expect(result).to.be.equal('peer2-200'); result = await handler.evaluate(query); - expect(handler._currentPeerIndex).to.be.equal(1); + expect(handler.currentPeerIndex).to.be.equal(1); expect(result).to.be.equal('peer1-200'); result = await handler.evaluate(query); - expect(handler._currentPeerIndex).to.be.equal(0); + expect(handler.currentPeerIndex).to.be.equal(0); expect(result).to.be.equal('peer2-200'); }); it('continues to use second peer', async () => { @@ -384,15 +384,15 @@ describe('QueryHandlers', () => { query.evaluate.withArgs([endorser1]).resolves(errorProposalResponse1); let result = await handler.evaluate(query); - expect(handler._currentPeerIndex).to.be.equal(1); + expect(handler.currentPeerIndex).to.be.equal(1); expect(result).to.be.equal('peer2-200'); result = await handler.evaluate(query); - expect(handler._currentPeerIndex).to.be.equal(0); + expect(handler.currentPeerIndex).to.be.equal(0); expect(result).to.be.equal('peer2-200'); result = await handler.evaluate(query); - expect(handler._currentPeerIndex).to.be.equal(1); + expect(handler.currentPeerIndex).to.be.equal(1); expect(result).to.be.equal('peer2-200'); }); it('stays with second peer after error', async () => { @@ -400,21 +400,21 @@ describe('QueryHandlers', () => { query.evaluate.withArgs([endorser2]).resolves(s200ProposalResponse2); let result = await handler.evaluate(query); - expect(handler._currentPeerIndex).to.be.equal(1); + expect(handler.currentPeerIndex).to.be.equal(1); expect(result).to.be.equal('peer1-200'); result = await handler.evaluate(query); - expect(handler._currentPeerIndex).to.be.equal(0); + expect(handler.currentPeerIndex).to.be.equal(0); expect(result).to.be.equal('peer2-200'); query.evaluate.withArgs([endorser1]).resolves(errorProposalResponse1); result = await handler.evaluate(query); - expect(handler._currentPeerIndex).to.be.equal(1); + expect(handler.currentPeerIndex).to.be.equal(1); expect(result).to.be.equal('peer2-200'); result = await handler.evaluate(query); - expect(handler._currentPeerIndex).to.be.equal(0); + expect(handler.currentPeerIndex).to.be.equal(0); expect(result).to.be.equal('peer2-200'); }); it('returns an error with the both peers 500 results', async () => { diff --git a/fabric-network/test/network.js b/fabric-network/test/network.js index 60e99e050a..096ecde9b8 100644 --- a/fabric-network/test/network.js +++ b/fabric-network/test/network.js @@ -22,12 +22,11 @@ const should = chai.should; should(); chai.use(require('chai-as-promised')); -const Network = require('../lib/network'); +const {NetworkImpl: Network} = require('../lib/network'); const Gateway = require('../lib/gateway'); const Contract = require('../lib/contract'); const EventStrategies = require('fabric-network/lib/impl/event/defaulteventhandlerstrategies'); const EventServiceManager = require('fabric-network/lib/impl/event/eventservicemanager'); -const BlockEventListener = require('fabric-network/lib/impl/event/blockeventlistener'); describe('Network', () => { let channel; @@ -110,7 +109,7 @@ describe('Network', () => { describe('#_initializeInternalChannel', () => { it('should initialize with no discovery', async () => { await network._initializeInternalChannel({enabled:false}); - expect(network.discoveryService).to.equal(null); + expect(network.discoveryService).to.not.exist; }); it('should initialize the network using the discovery with user specified targets', async () => { @@ -228,39 +227,5 @@ describe('Network', () => { network._dispose(); sinon.assert.called(spy); }); - - it('calls unregister on its listeners', () => { - const listener = sinon.createStubInstance(BlockEventListener); - network.listeners.set('listener', listener); - network._dispose(); - sinon.assert.calledOnce(listener.unregister); - }); }); - - describe('#addBlockListener', () => { - let callback; - beforeEach(() => { - callback = () => {}; - }); - - it('should create options if the options param is undefined', async () => { - const listener = await network.addBlockListener(callback); - listener.should.to.be.instanceof(BlockEventListener); - network.listeners.get(listener).should.to.equal(listener); - }); - - it('should create an instance of BlockEventListener and add it to the list of listeners', async () => { - const listener = await network.addBlockListener(callback, {}); - listener.should.to.be.instanceof(BlockEventListener); - network.listeners.get(listener).should.to.equal(listener); - }); - - it('should create an instance of BlockEventListener and add it to the list of listeners', async () => { - const listener = await network.addBlockListener(callback, {}, eventService); - listener.should.to.be.instanceof(BlockEventListener); - network.listeners.get(listener).should.to.equal(listener); - listener.eventService.should.to.equal(eventService); - }); - }); - }); diff --git a/fabric-network/test/testutils.ts b/fabric-network/test/testutils.ts new file mode 100644 index 0000000000..cffd34ac25 --- /dev/null +++ b/fabric-network/test/testutils.ts @@ -0,0 +1,45 @@ +/** + * Copyright 2020 IBM All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * Returns a new async function that taken one argument of the generic type. The returned function also has a + * 'completePromise' property of type Promise, which resolves when the expected number of calls have been made + * to the listener function, and returns an array of the arguments passed to the listener function. + * @param expectedCallCount Number of calls after which to resolve the completePromise. + * @param maxSleep Maximum number of milliseconds to randomly sleep when invoked. + */ +export function newAsyncListener(expectedCallCount = 1, maxSleep = 0) { + let resolve; + const completePromise = new Promise((_resolve, _) => resolve = _resolve); + + const events: T[] = []; + const listener = async (event: T) => { + if (maxSleep > 0) { + // Some random delay to similate async work in the listener and catch timing bugs + await sleep(getRandomInt(maxSleep)); + } + events.push(event); + expectedCallCount--; + if (expectedCallCount === 0) { + resolve(events); + } + }; + listener.completePromise = completePromise; + + return listener; +} + +export function sleep(ms: number) { + if (ms > 0) { + return new Promise((resolve, _) => setTimeout(resolve, ms)); + } else { + return Promise.resolve(); + } +} + +export function getRandomInt(max: number) { + return Math.floor(Math.random() * Math.floor(max)); +} diff --git a/fabric-network/test/transaction.js b/fabric-network/test/transaction.js index 88869a5f9f..df1bdb169c 100644 --- a/fabric-network/test/transaction.js +++ b/fabric-network/test/transaction.js @@ -22,12 +22,12 @@ const DiscoveryHandler = require('fabric-common/lib/DiscoveryHandler'); const Committer = require('fabric-common/lib/Committer'); const Contract = require('fabric-network/lib/contract'); -const Network = require('fabric-network/lib/network'); +const {NetworkImpl: Network} = require('../lib/network'); const Gateway = require('fabric-network/lib/gateway'); const Transaction = require('fabric-network/lib/transaction'); const {TransactionEventHandler} = require('fabric-network/lib/impl/event/transactioneventhandler'); -const Query = require('fabric-network/lib/impl/query/query'); -const QueryStrategies = require('fabric-network/lib/impl/query/queryhandlerstrategies'); +const {QueryImpl: Query} = require('fabric-network/lib/impl/query/query'); +const QueryStrategies = require('fabric-network/lib/impl/query/defaultqueryhandlerstrategies'); describe('Transaction', () => { const transactionName = 'TRANSACTION_NAME'; @@ -174,15 +174,6 @@ describe('Transaction', () => { }); }); - describe('#setEventHandlerStrategy', () => { - it('returns this', () => { - const stubEventHandler = sinon.createStubInstance(TransactionEventHandler); - const stubEventHandlerFactoryFn = () => stubEventHandler; - const result = transaction.setEventHandlerStrategy(stubEventHandlerFactoryFn); - expect(result).to.equal(transaction); - }); - }); - describe('#setTransient', () => { it('returns this', () => { const result = transaction.setTransient(new Map()); @@ -278,16 +269,6 @@ describe('Transaction', () => { return expect(promise).to.be.rejectedWith(status); }); - it('uses a supplied event handler strategy', async () => { - const stubEventHandler = sinon.createStubInstance(TransactionEventHandler); - const stubEventHandlerFactoryFn = sinon.stub().withArgs(transactionId, network).returns(stubEventHandler); - - await transaction.setEventHandlerStrategy(stubEventHandlerFactoryFn).submit(); - - sinon.assert.called(stubEventHandler.startListening); - sinon.assert.called(stubEventHandler.waitForEvents); - }); - it('uses event handler strategy from gateway options', async () => { const stubEventHandler = sinon.createStubInstance(TransactionEventHandler); const stubEventHandlerFactoryFn = sinon.stub().withArgs(transactionId, network).returns(stubEventHandler); diff --git a/fabric-network/tsconfig-declaration.json b/fabric-network/tsconfig-declaration.json index e66fe7fbe2..442278983d 100644 --- a/fabric-network/tsconfig-declaration.json +++ b/fabric-network/tsconfig-declaration.json @@ -1,17 +1,7 @@ { + "extends": "./tsconfig.json", "compilerOptions": { - "alwaysStrict": true, + "allowJs": false, "declaration": true, - "module": "commonjs", - "outDir": "lib", - "rootDir": "src", - "strict": true, - "target": "es2017" - }, - "include": [ - "src/**/*" - ], - "exclude": [ - "node_modules/**/*" - ] + } } diff --git a/fabric-network/tsconfig.json b/fabric-network/tsconfig.json index 5274d2a7f6..5ca66389cb 100644 --- a/fabric-network/tsconfig.json +++ b/fabric-network/tsconfig.json @@ -3,16 +3,17 @@ "allowJs": true, "alwaysStrict": true, "module": "commonjs", - "outDir": "lib", - "rootDir": "src", + "outDir": "./lib", + "rootDir": "./src", "sourceMap": true, "strict": true, - "target": "es2017" + "target": "es2017", + "forceConsistentCasingInFileNames": true }, "include": [ - "src/**/*" + "src/" ], "exclude": [ - "node_modules/**/*" + "node_modules/" ] } diff --git a/fabric-network/types/index.d.ts b/fabric-network/types/index.d.ts index b5df6d2ea1..2d63af5065 100644 --- a/fabric-network/types/index.d.ts +++ b/fabric-network/types/index.d.ts @@ -9,6 +9,8 @@ import { Wallet } from '../lib/impl/wallet/wallet'; import { CommitListener } from '../lib/impl/event/commitlistener'; import { Identity } from '../lib/impl/wallet/identity'; +import { QueryHandlerFactory } from '../lib/impl/query/queryhandler'; +import { Network } from '../lib/network'; import { ChaincodeEvent, Channel, Client, Endorser, EventService, IdentityContext, ProposalResponse, User } from 'fabric-common'; export { Wallet }; @@ -21,8 +23,22 @@ export { IdentityProviderRegistry } from '../lib/impl/wallet/identityproviderreg export { HsmOptions, HsmX509Provider, HsmX509Identity } from '../lib/impl/wallet/hsmx509identity'; export { X509Identity } from '../lib/impl/wallet/x509identity'; export { CommitEvent, CommitError, CommitListener } from '../lib/impl/event/commitlistener'; +export { BlockEvent, BlockListener } from '../lib/impl/event/blocklistener'; export { FabricError } from '../lib/errors/fabricerror'; export { TimeoutError } from '../lib/errors/timeouterror'; +export { QueryHandlerFactory }; +export { QueryHandler } from '../lib/impl/query/queryhandler'; +export { Query, QueryResults, QueryResponse } from '../lib/impl/query/query'; +export { Network }; + +import * as DefaultEventHandlerStrategies from '../lib/impl/event/defaulteventhandlerstrategies'; +export { DefaultEventHandlerStrategies }; + +import { TxEventHandler, TxEventHandlerFactory } from '../lib/impl/event/transactioneventhandler'; +export { TxEventHandler, TxEventHandlerFactory }; + +import * as DefaultQueryHandlerStrategies from '../lib/impl/query/defaultqueryhandlerstrategies'; +export { DefaultQueryHandlerStrategies }; // Main fabric network classes // ------------------------------------------- @@ -47,43 +63,11 @@ export interface TransactionOptions { strategy?: TxEventHandlerFactory | null; } -import * as DefaultEventHandlerStrategies from '../lib/impl/event/defaulteventhandlerstrategies'; -export { DefaultEventHandlerStrategies }; - -import { TxEventHandler, TxEventHandlerFactory } from '../lib/impl/event/transactioneventhandler'; -export { TxEventHandler, TxEventHandlerFactory }; - export interface QueryOptions { strategy?: QueryHandlerFactory; timeout?: number; } -export class QueryHandlerStrategies { - public static MSPID_SCOPE_ROUND_ROBIN: QueryHandlerFactory; - public static MSPID_SCOPE_SINGLE: QueryHandlerFactory; -} - -export type QueryHandlerFactory = (network: Network) => QueryHandler; - -export interface QueryHandler { - evaluate(query: Query): Promise; -} - -export interface Query { - evaluate(targets: Endorser[]): Promise; -} - -export interface QueryResults { - [peerName: string]: Error | QueryResponse; -} - -export interface QueryResponse { - isEndorsed: boolean; - payload: Buffer; - status: number; - message: string; -} - export class Gateway { public client: Client; public identityContext: IdentityContext; @@ -95,14 +79,6 @@ export class Gateway { public getOptions(): GatewayOptions; } -export class Network { - getGateway(): Gateway; - getChannel(): Channel; - getContract(chaincodeId: string, name?: string): Contract; - addCommitListener(listener: CommitListener, peers: Endorser[], transactionId: string): Promise; - removeCommitListener(listener: CommitListener): void; -} - export class Contract { createTransaction(name: string): Transaction; evaluateTransaction(name: string, ...args: string[]): Promise; diff --git a/package-lock.json b/package-lock.json index 85587898db..a456e57264 100644 --- a/package-lock.json +++ b/package-lock.json @@ -122,6 +122,16 @@ "integrity": "sha512-0fKu/QqildpXmPVaRBoXOlyBb3MC+J0A66x97qEfLOMkn3u6nfY5esWogQwi/K0BjASYy4DbnsEWnpNL6qT5Mw==", "dev": true }, + "@babel/runtime-corejs3": { + "version": "7.8.4", + "resolved": "https://registry.npmjs.org/@babel/runtime-corejs3/-/runtime-corejs3-7.8.4.tgz", + "integrity": "sha512-+wpLqy5+fbQhvbllvlJEVRIpYj+COUWnnsm+I4jZlA8Lo7/MJmBhGTCHyk1/RWfOqBRJ2MbadddG6QltTKTlrg==", + "dev": true, + "requires": { + "core-js-pure": "^3.0.0", + "regenerator-runtime": "^0.13.2" + } + }, "@babel/template": { "version": "7.8.3", "resolved": "https://registry.npmjs.org/@babel/template/-/template-7.8.3.tgz", @@ -582,14 +592,22 @@ "dev": true }, "assertion-error-formatter": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/assertion-error-formatter/-/assertion-error-formatter-2.0.1.tgz", - "integrity": "sha512-cjC3jUCh9spkroKue5PDSKH5RFQ/KNuZJhk3GwHYmB/8qqETxLOmMdLH+ohi/VukNzxDlMvIe7zScvLoOdhb6Q==", + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/assertion-error-formatter/-/assertion-error-formatter-3.0.0.tgz", + "integrity": "sha512-6YyAVLrEze0kQ7CmJfUgrLHb+Y7XghmL2Ie7ijVa2Y9ynP3LV+VDiwFk62Dn0qtqbmY0BT0ss6p1xxpiF2PYbQ==", "dev": true, "requires": { - "diff": "^3.0.0", + "diff": "^4.0.1", "pad-right": "^0.2.2", "repeat-string": "^1.6.1" + }, + "dependencies": { + "diff": { + "version": "4.0.2", + "resolved": "https://registry.npmjs.org/diff/-/diff-4.0.2.tgz", + "integrity": "sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==", + "dev": true + } } }, "astral-regex": { @@ -681,16 +699,6 @@ } } }, - "babel-runtime": { - "version": "6.26.0", - "resolved": "https://registry.npmjs.org/babel-runtime/-/babel-runtime-6.26.0.tgz", - "integrity": "sha1-llxwWGaOgrVde/4E/yM3vItWR/4=", - "dev": true, - "requires": { - "core-js": "^2.4.0", - "regenerator-runtime": "^0.11.0" - } - }, "balanced-match": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/balanced-match/-/balanced-match-1.0.0.tgz", @@ -1161,10 +1169,10 @@ } } }, - "core-js": { - "version": "2.6.11", - "resolved": "https://registry.npmjs.org/core-js/-/core-js-2.6.11.tgz", - "integrity": "sha512-5wjnpaT/3dV+XB4borEsnAYQchn00XSgTAWKDkEqv+K8KevjbzmofK6hfJ9TZIlpj2N0xQpazy7PiRQiWHqzWg==", + "core-js-pure": { + "version": "3.6.4", + "resolved": "https://registry.npmjs.org/core-js-pure/-/core-js-pure-3.6.4.tgz", + "integrity": "sha512-epIhRLkXdgv32xIUFaaAry2wdxZYBi6bgM7cB136dzzXXa+dFyRLTZeLUJxnd8ShrmyVXBub63n2NHo2JAt8Cw==", "dev": true }, "core-util-is": { @@ -1195,54 +1203,107 @@ } }, "cucumber": { - "version": "5.0.1", - "resolved": "https://registry.npmjs.org/cucumber/-/cucumber-5.0.1.tgz", - "integrity": "sha512-sY9zW1E6G60b7bhUpPEcuJDfAowKgAPabMtr6+8tn2iMeBtF37hiKJvZIS/FO/KBvrxRJ5HDriLP3905NXcbDQ==", + "version": "6.0.5", + "resolved": "https://registry.npmjs.org/cucumber/-/cucumber-6.0.5.tgz", + "integrity": "sha512-x+W9Fwk6TvcapQsYMxwFU5AsQJDOIJVGrPKmH15OC7jzb9/Dk7Hb0ZAyw4WcpaDcUDRc8bi2k2yJejDp5eTRlg==", "dev": true, "requires": { - "assertion-error-formatter": "^2.0.1", - "babel-runtime": "^6.11.6", + "assertion-error-formatter": "^3.0.0", "bluebird": "^3.4.1", "cli-table3": "^0.5.1", "colors": "^1.1.2", - "commander": "^2.9.0", - "cucumber-expressions": "^6.0.0", - "cucumber-tag-expressions": "^1.1.1", + "commander": "^3.0.1", + "cucumber-expressions": "^8.1.0", + "cucumber-tag-expressions": "^2.0.2", "duration": "^0.2.1", - "escape-string-regexp": "^1.0.5", - "figures": "2.0.0", - "gherkin": "^5.0.0", - "glob": "^7.0.0", - "indent-string": "^3.1.0", + "escape-string-regexp": "^2.0.0", + "figures": "^3.0.0", + "gherkin": "5.0.0", + "glob": "^7.1.3", + "indent-string": "^4.0.0", "is-generator": "^1.0.2", - "is-stream": "^1.1.0", + "is-stream": "^2.0.0", "knuth-shuffle-seeded": "^1.0.6", - "lodash": "^4.17.10", + "lodash": "^4.17.14", "mz": "^2.4.0", "progress": "^2.0.0", "resolve": "^1.3.3", - "serialize-error": "^2.1.0", + "serialize-error": "^4.1.0", "stack-chain": "^2.0.0", "stacktrace-js": "^2.0.0", - "string-argv": "0.1.1", + "string-argv": "^0.3.0", "title-case": "^2.1.1", "util-arity": "^1.0.2", "verror": "^1.9.0" + }, + "dependencies": { + "commander": { + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/commander/-/commander-3.0.2.tgz", + "integrity": "sha512-Gar0ASD4BDyKC4hl4DwHqDrmvjoxWKZigVnAbn5H1owvm4CxCPdb0HQDehwNYMJpla5+M2tPmPARzhtYuwpHow==", + "dev": true + }, + "escape-string-regexp": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/escape-string-regexp/-/escape-string-regexp-2.0.0.tgz", + "integrity": "sha512-UpzcLCXolUWcNu5HtVMHYdXJjArjsF9C0aNnquZYY4uW/Vu0miy5YoWvbV345HauVvcAUnpRuhMMcqTcGOY2+w==", + "dev": true + }, + "figures": { + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/figures/-/figures-3.2.0.tgz", + "integrity": "sha512-yaduQFRKLXYOGgEn6AZau90j3ggSOyiqXU0F9JZfeXYhNa+Jk4X+s45A2zg5jns87GAFa34BBm2kXw4XpNcbdg==", + "dev": true, + "requires": { + "escape-string-regexp": "^1.0.5" + }, + "dependencies": { + "escape-string-regexp": { + "version": "1.0.5", + "resolved": "https://registry.npmjs.org/escape-string-regexp/-/escape-string-regexp-1.0.5.tgz", + "integrity": "sha1-G2HAViGQqN/2rjuyzwIAyhMLhtQ=", + "dev": true + } + } + } } }, "cucumber-expressions": { - "version": "6.6.2", - "resolved": "https://registry.npmjs.org/cucumber-expressions/-/cucumber-expressions-6.6.2.tgz", - "integrity": "sha512-WcFSVBiWNLJbIcAAC3t/ACU46vaOKfe1UIF5H3qveoq+Y4XQm9j3YwHurQNufRKBBg8nCnpU7Ttsx7egjS3hwA==", + "version": "8.3.0", + "resolved": "https://registry.npmjs.org/cucumber-expressions/-/cucumber-expressions-8.3.0.tgz", + "integrity": "sha512-cP2ya0EiorwXBC7Ll7Cj7NELYbasNv9Ty42L4u7sso9KruWemWG1ZiTq4PMqir3SNDSrbykoqI5wZgMbLEDjLQ==", "dev": true, "requires": { - "becke-ch--regex--s0-0-v1--base--pl--lib": "^1.2.0" + "becke-ch--regex--s0-0-v1--base--pl--lib": "^1.4.0", + "xregexp": "^4.2.4" + } + }, + "cucumber-pretty": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/cucumber-pretty/-/cucumber-pretty-6.0.0.tgz", + "integrity": "sha512-ddx/VInPVKFB7N86QujgLivihJhuzexKwExMuFaUjSlEs5zVVqBgaf55f88h97VafXTWX+ZAcxTUwMBS4mYj/g==", + "dev": true, + "requires": { + "cli-table3": "^0.5.1", + "colors": "^1.4.0", + "figures": "^3.0.0" + }, + "dependencies": { + "figures": { + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/figures/-/figures-3.2.0.tgz", + "integrity": "sha512-yaduQFRKLXYOGgEn6AZau90j3ggSOyiqXU0F9JZfeXYhNa+Jk4X+s45A2zg5jns87GAFa34BBm2kXw4XpNcbdg==", + "dev": true, + "requires": { + "escape-string-regexp": "^1.0.5" + } + } } }, "cucumber-tag-expressions": { - "version": "1.1.1", - "resolved": "https://registry.npmjs.org/cucumber-tag-expressions/-/cucumber-tag-expressions-1.1.1.tgz", - "integrity": "sha1-f1x7cACbwrZmWRv+ZIVFeL7e6Fo=", + "version": "2.0.3", + "resolved": "https://registry.npmjs.org/cucumber-tag-expressions/-/cucumber-tag-expressions-2.0.3.tgz", + "integrity": "sha512-+x5j1IfZrBtbvYHuoUX0rl4nUGxaey6Do9sM0CABmZfDCcWXuuRm1fQeCaklIYQgOFHQ6xOHvDSdkMHHpni6tQ==", "dev": true }, "cycle": { @@ -2194,9 +2255,9 @@ } }, "gherkin": { - "version": "5.1.0", - "resolved": "https://registry.npmjs.org/gherkin/-/gherkin-5.1.0.tgz", - "integrity": "sha1-aEu7A63STq9731RPWAM+so+zxtU=", + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/gherkin/-/gherkin-5.0.0.tgz", + "integrity": "sha1-lt70EZjsOQgli1Ea909lWidk0qE=", "dev": true }, "glob": { @@ -2917,9 +2978,9 @@ "dev": true }, "indent-string": { - "version": "3.2.0", - "resolved": "https://registry.npmjs.org/indent-string/-/indent-string-3.2.0.tgz", - "integrity": "sha1-Sl/W0nzDMvN+VBmlBNu4NxBckok=", + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/indent-string/-/indent-string-4.0.0.tgz", + "integrity": "sha512-EdDDZu4A2OyIK7Lr/2zG+w5jmbuk1DVBnEwREQvBzspBJkCEbRa8GxU1lghYcaGJCnRWibjDXlq779X1/y5xwg==", "dev": true }, "inflight": { @@ -3120,9 +3181,9 @@ "dev": true }, "is-stream": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/is-stream/-/is-stream-1.1.0.tgz", - "integrity": "sha1-EtSj3U5o4Lec6428hBc66A2RykQ=", + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/is-stream/-/is-stream-2.0.0.tgz", + "integrity": "sha512-XCoy+WlUr7d1+Z8GgSuXmpuUFC9fOhRXglJMx+dwLKTkL44Cjd4W1Z5P+BQZpr+cR93aGP4S/s7Ftw6Nd/kiEw==", "dev": true }, "is-symbol": { @@ -6062,9 +6123,9 @@ } }, "regenerator-runtime": { - "version": "0.11.1", - "resolved": "https://registry.npmjs.org/regenerator-runtime/-/regenerator-runtime-0.11.1.tgz", - "integrity": "sha512-MguG95oij0fC3QV3URf4V2SDYGJhJnJGqvIIgdECeODCT98wSWDAJ94SSuVpYQUoTcGUIL6L4yNB7j1DFFHSBg==", + "version": "0.13.3", + "resolved": "https://registry.npmjs.org/regenerator-runtime/-/regenerator-runtime-0.13.3.tgz", + "integrity": "sha512-naKIZz2GQ8JWh///G7L3X6LaQUAMp2lvb1rvwwsURe/VXwD6VMfr+/1NuNw3ag8v2kY1aQ/go5SNn79O9JU7yw==", "dev": true }, "regexp.prototype.flags": { @@ -6630,10 +6691,21 @@ "dev": true }, "serialize-error": { - "version": "2.1.0", - "resolved": "https://registry.npmjs.org/serialize-error/-/serialize-error-2.1.0.tgz", - "integrity": "sha1-ULZ51WNc34Rme9yOWa9OW4HV9go=", - "dev": true + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/serialize-error/-/serialize-error-4.1.0.tgz", + "integrity": "sha512-5j9GgyGsP9vV9Uj1S0lDCvlsd+gc2LEPVK7HHHte7IyPwOD4lVQFeaX143gx3U5AnoCi+wbcb3mvaxVysjpxEw==", + "dev": true, + "requires": { + "type-fest": "^0.3.0" + }, + "dependencies": { + "type-fest": { + "version": "0.3.1", + "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.3.1.tgz", + "integrity": "sha512-cUGJnCdr4STbePCgqNFbpVNCepa+kAVohJs1sLhxzdH+gnEoOd8VhbYa7pD3zZYGiURWM2xzEII3fQcRizDkYQ==", + "dev": true + } + } }, "set-blocking": { "version": "2.0.0", @@ -6911,9 +6983,9 @@ } }, "string-argv": { - "version": "0.1.1", - "resolved": "https://registry.npmjs.org/string-argv/-/string-argv-0.1.1.tgz", - "integrity": "sha512-El1Va5ehZ0XTj3Ekw4WFidXvTmt9SrC0+eigdojgtJMVtPkF0qbBe9fyNSl9eQf+kUHnTSQxdQYzuHfZy8V+DQ==", + "version": "0.3.1", + "resolved": "https://registry.npmjs.org/string-argv/-/string-argv-0.3.1.tgz", + "integrity": "sha512-a1uQGz7IyVy9YwhqjZIZu1c8JO8dNIe20xBmSS6qu9kv++k3JGzCVmprbNN5Kn+BgzD5E7YYwg1CcjuJMRNsvg==", "dev": true }, "string-width": { @@ -7448,9 +7520,9 @@ } }, "typescript": { - "version": "3.7.5", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-3.7.5.tgz", - "integrity": "sha512-/P5lkRXkWHNAbcJIiHPfRoKqyd7bsyCma1hZNUGfn20qm64T6ZBlrzprymeu918H+mB/0rIg2gGK/BXkhhYgBw==", + "version": "3.8.2", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-3.8.2.tgz", + "integrity": "sha512-EgOVgL/4xfVrCMbhYKUQTdF37SQn4Iw73H5BgCrF1Abdun7Kwy/QZsE/ssAy0y4LxBbvua3PIbFsbRczWWnDdQ==", "dev": true }, "uc.micro": { @@ -7715,6 +7787,15 @@ "integrity": "sha512-MjGsXhKG8YjTKrDCXseFo3ClbMGvUD4en29H2Cev1dv4P/chlpw6KdYmlCWDkhosBVKRDjM836+3e3pm1cBNJA==", "dev": true }, + "xregexp": { + "version": "4.3.0", + "resolved": "https://registry.npmjs.org/xregexp/-/xregexp-4.3.0.tgz", + "integrity": "sha512-7jXDIFXh5yJ/orPn4SXjuVrWWoi4Cr8jfV1eHv9CixKSbU+jY4mxfrBwAuDvupPNKpMUY+FeIqsVw/JLT9+B8g==", + "dev": true, + "requires": { + "@babel/runtime-corejs3": "^7.8.3" + } + }, "xtend": { "version": "4.0.2", "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz", diff --git a/package.json b/package.json index c0d3e24c59..4951781907 100644 --- a/package.json +++ b/package.json @@ -34,11 +34,11 @@ "dockerReady": "npm run dockerClean && node -e 'require(\"./scripts/npm_scripts/dockerReady.js\").dockerReady()'", "tapeIntegration": "./scripts/npm_scripts/runTape.sh", "tapeIntegrationNoHSM": "./scripts/npm_scripts/runTape.sh noHSM", - "cucumberScenario": "npm run setupCucumbers && export HFC_LOGGING='{\"debug\":\"test/temp/debugc.log\"}' && npm run test:cucumber && npm run dockerClean && nyc --check-coverage --statements 64 --branches 40 --functions 58 --lines 64 npm run test:ts-cucumber", + "cucumberScenario": "npm run setupCucumbers && export HFC_LOGGING='{\"debug\":\"test/temp/debugc.log\"}' && npm run test:cucumber && npm run dockerClean && nyc --check-coverage --statements 63 --branches 38 --functions 58 --lines 63 npm run test:ts-cucumber", "setupCucumbers": "node -e 'require(\"./scripts/npm_scripts/testFunctions.js\").createCucumberLogFile()'", "test:cucumber": "cucumber-js ./test/scenario/features/*.feature", - "test:ts-cucumber": "cucumber-js ./test/ts-scenario/features/*.feature --require './test/ts-scenario/steps/**/*.ts' --require './test/ts-scenario/support/**/*.ts' --require-module ts-node/register", - "test:ts-cucumber-tagged": "cucumber-js ./test/ts-scenario/features/*.feature --require './test/ts-scenario/steps/**/*.ts' --require './test/ts-scenario/support/**/*.ts' --require-module ts-node/register --tags @discovery", + "test:ts-cucumber": "cucumber-js -f node_modules/cucumber-pretty ./test/ts-scenario/features/*.feature --require './test/ts-scenario/steps/**/*.ts' --require './test/ts-scenario/support/**/*.ts' --require-module ts-node/register", + "test:ts-cucumber-tagged": "npm run test:ts-cucumber -- --tags @discovery", "testHeadless": "run-s cleanUp compile lint unitTest:all", "tapeAndCucumber": "run-s tapeIntegration dockerClean cucumberScenario" }, @@ -55,7 +55,8 @@ "bn.js": "^4.11.8", "chai": "^4.2.0", "chai-as-promised": "^7.1.1", - "cucumber": "5.0.1", + "cucumber": "^6.0.5", + "cucumber-pretty": "^6.0.0", "elliptic": "^6.3.2", "eslint": "^6.8.0", "fabric-ca-client": "file:./fabric-ca-client", @@ -81,7 +82,7 @@ "tape-promise": "^3.0.0", "ts-node": "^8.6.2", "tslint": "5.19.0", - "typescript": "^3.7.5", + "typescript": "^3.8.2", "winston": "^2.2.0" }, "license": "Apache-2.0", diff --git a/test/ts-scenario/features/events.feature b/test/ts-scenario/features/events.feature index e7bfd7a444..0a80fc12cc 100644 --- a/test/ts-scenario/features/events.feature +++ b/test/ts-scenario/features/events.feature @@ -37,16 +37,16 @@ Feature: Node SDK Events And I use the gateway named event_gateway to submit a total of 5 transactions with args [createValueDisconnect] for contract events instantiated on channel eventschannel Then I receive 0 events from the listener named dcValueListener - Scenario: Using a Contract I can listen to unfiltered block events emitted by networks - When I use the gateway named event_gateway to listen for unfiltered block events with a listener named unfilteredBlockListener on channel eventschannel - When I use the gateway named event_gateway to submit a transaction with args [createValue] for contract events instantiated on channel eventschannel - Then I receive a minimum 1 events from the listener unfilteredBlockListener +# Scenario: Using a Contract I can listen to unfiltered block events emitted by networks +# When I use the gateway named event_gateway to listen for unfiltered block events with a listener named unfilteredBlockListener on channel eventschannel +# When I use the gateway named event_gateway to submit a transaction with args [createValue] for contract events instantiated on channel eventschannel +# Then I receive a minimum 1 events from the listener unfilteredBlockListener - Scenario: Using a Contract I can stop listening to unfiltered block events emitted by networks - Given I am listening for unfiltered block events with a listener named unfilteredBlockListener - When I unregister the listener named unfilteredBlockListener - And I use the gateway named event_gateway to submit a total of 5 transactions with args [createValue] for contract events instantiated on channel eventschannel - Then I receive 0 events from the listener named unfilteredBlockListener +# Scenario: Using a Contract I can stop listening to unfiltered block events emitted by networks +# Given I am listening for unfiltered block events with a listener named unfilteredBlockListener +# When I unregister the listener named unfilteredBlockListener +# And I use the gateway named event_gateway to submit a total of 5 transactions with args [createValue] for contract events instantiated on channel eventschannel +# Then I receive 0 events from the listener named unfilteredBlockListener Scenario: Using a Contract I can listen to filtered block events emitted by networks When I use the gateway named event_gateway to listen for filtered block events with a listener named filteredBlockListener on channel eventschannel @@ -59,13 +59,13 @@ Feature: Node SDK Events And I use the gateway named event_gateway to submit a total of 5 transactions with args [createValue] for contract events instantiated on channel eventschannel Then I receive 0 events from the listener named filteredBlockListener - Scenario: Using a Contract I can listen to unfiltered block events emitted by networks between a start and end block - When I use the gateway named event_gateway to listen for unfiltered block events between 0 and 2 with a listener named unfilteredNumberedBlockListener on channel eventschannel - And I use the gateway named event_gateway to submit a total of 10 transactions with args [createValue] for contract events instantiated on channel eventschannel - Then I receive a minimum 2 events from the listener unfilteredNumberedBlockListener +# Scenario: Using a Contract I can listen to unfiltered block events emitted by networks between a start and end block +# When I use the gateway named event_gateway to listen for unfiltered block events between 0 and 2 with a listener named unfilteredNumberedBlockListener on channel eventschannel +# And I use the gateway named event_gateway to submit a total of 10 transactions with args [createValue] for contract events instantiated on channel eventschannel +# Then I receive a minimum 2 events from the listener unfilteredNumberedBlockListener - Scenario: Using a Contract I can stop listening to unfiltered block events emitted by networks between a start and end block - Given I am listening for unfiltered block events with a listener named unfilteredNumberedBlockListener - When I unregister the listener named unfilteredNumberedBlockListener - And I use the gateway named event_gateway to submit a total of 5 transactions with args [createValue] for contract events instantiated on channel eventschannel - Then I receive 0 events from the listener named unfilteredNumberedBlockListener \ No newline at end of file +# Scenario: Using a Contract I can stop listening to unfiltered block events emitted by networks between a start and end block +# Given I am listening for unfiltered block events with a listener named unfilteredNumberedBlockListener +# When I unregister the listener named unfilteredNumberedBlockListener +# And I use the gateway named event_gateway to submit a total of 5 transactions with args [createValue] for contract events instantiated on channel eventschannel +# Then I receive 0 events from the listener named unfilteredNumberedBlockListener diff --git a/test/ts-scenario/steps/lib/gateway.ts b/test/ts-scenario/steps/lib/gateway.ts index 3976571caa..21c0545c04 100644 --- a/test/ts-scenario/steps/lib/gateway.ts +++ b/test/ts-scenario/steps/lib/gateway.ts @@ -13,7 +13,7 @@ import { StateStore } from './utility/stateStore'; import { createQueryHandler as sampleQueryStrategy } from '../../config/handlers/sample-query-handler'; import { createTransactionEventHandler as sampleTxnEventStrategy } from '../../config/handlers/sample-transaction-event-handler'; -import { DefaultEventHandlerStrategies, QueryHandlerStrategies, Gateway, GatewayOptions, Wallet, Wallets, Identity, Contract, Network, TxEventHandlerFactory, QueryHandlerFactory, Transaction, TransientMap } from 'fabric-network'; +import { DefaultEventHandlerStrategies, DefaultQueryHandlerStrategies, Gateway, GatewayOptions, Wallet, Wallets, Identity, Contract, Network, TxEventHandlerFactory, QueryHandlerFactory, Transaction, TransientMap } from 'fabric-network'; import * as fs from 'fs'; import * as path from 'path'; @@ -22,16 +22,16 @@ const txnTypes: string[] = ['evaluate', 'submit']; const txnResponseTypes: string[] = ['evaluate', 'error', 'submit']; const supportedWallets: string[] = [Constants.FILE_WALLET as string, Constants.MEMORY_WALLET as string, Constants.COUCH_WALLET as string]; -const EventStrategies: any = { +const EventStrategies: { [key: string]: TxEventHandlerFactory } = { MSPID_SCOPE_ALLFORTX : DefaultEventHandlerStrategies.MSPID_SCOPE_ALLFORTX, MSPID_SCOPE_ANYFORTX : DefaultEventHandlerStrategies.MSPID_SCOPE_ANYFORTX, NETWORK_SCOPE_ALLFORTX : DefaultEventHandlerStrategies.NETWORK_SCOPE_ALLFORTX, NETWORK_SCOPE_ANYFORTX : DefaultEventHandlerStrategies.NETWORK_SCOPE_ANYFORTX, }; -const QueryStrategies: any = { - MSPID_SCOPE_SINGLE : QueryHandlerStrategies.MSPID_SCOPE_SINGLE, - MSPID_SCOPE_ROUND_ROBIN : QueryHandlerStrategies.MSPID_SCOPE_ROUND_ROBIN, +const QueryStrategies: { [key: string]: QueryHandlerFactory } = { + MSPID_SCOPE_SINGLE : DefaultQueryHandlerStrategies.MSPID_SCOPE_SINGLE, + MSPID_SCOPE_ROUND_ROBIN : DefaultQueryHandlerStrategies.MSPID_SCOPE_ROUND_ROBIN, }; /** diff --git a/test/ts-scenario/steps/lib/listeners.ts b/test/ts-scenario/steps/lib/listeners.ts index ab6924d11a..6f6b967625 100644 --- a/test/ts-scenario/steps/lib/listeners.ts +++ b/test/ts-scenario/steps/lib/listeners.ts @@ -4,7 +4,7 @@ 'use strict'; -import { Contract, Gateway, Network } from 'fabric-network'; +import { Contract, Gateway, Network, BlockEvent, BlockListener } from 'fabric-network'; import { Constants } from '../constants'; import * as GatewayHelper from './gateway'; import * as BaseUtils from './utility/baseUtils'; @@ -93,7 +93,7 @@ export async function createBlockListener(gatewayName: string, channelName: stri filtered, listener: {}, payloads: [], - type: Constants.BLOCK, + type: Constants.BLOCK }; // If no listeners, then create the new map item @@ -102,45 +102,33 @@ export async function createBlockListener(gatewayName: string, channelName: stri } // Create the listener - const listener = await (network as any).addBlockListener((err: any, blockNumber: string, block: any) => { // TODO: remove cast - if (err) { - BaseUtils.logMsg('-> Received a block event error', err); - throw err; - } + const listener: BlockListener = async (blockEvent: BlockEvent) => { BaseUtils.logMsg('->Received a block event', listenerName); - - if (filtered) { - BaseUtils.checkProperty(block, 'channel_id', true); - BaseUtils.checkProperty(block, 'number', true); - BaseUtils.checkProperty(block, 'filtered_transactions', true); - blockNumber = block.number; - } else { - BaseUtils.checkProperty(block, 'header', true); - BaseUtils.checkProperty(block, 'data', true); - BaseUtils.checkProperty(block, 'metadata', true); - } - - if (startBlock) { - BaseUtils.checkSizeEquality(Number(blockNumber), Number(startBlock) - 1, true, true); - } - if (endBlock) { - BaseUtils.checkSizeEquality(Number(blockNumber), Number(endBlock) + 1, false, true); - } + // if (startBlock) { + // BaseUtils.checkSizeEquality(Number(blockNumber), Number(startBlock) - 1, true, true); + // } + // if (endBlock) { + // BaseUtils.checkSizeEquality(Number(blockNumber), Number(endBlock) + 1, false, true); + // } const tlisteners: any = stateStore.get(Constants.LISTENERS); if (tlisteners) { const listenerUpdate: any = tlisteners.get(listenerName); if (listenerUpdate) { - listenerUpdate.payloads.push(block); + listenerUpdate.payloads.push(blockEvent); listenerUpdate.calls = listenerUpdate.payloads.length; } } - return Promise.resolve(); - }, {filtered, replay, startBlock, endBlock}); + if (endBlock && blockEvent.blockNumber.greaterThanOrEqual(endBlock)) { + network.removeBlockListener(listener); + } + }; + await network.addBlockListener(listener); // Roll into a listener object to store listenerObject.listener = listener; + listenerObject.remove = () => network.removeBlockListener(listener); listeners.set(listenerName, listenerObject); stateStore.set(Constants.LISTENERS, listeners); } @@ -256,9 +244,13 @@ export function checkTransactionListenerDetails(listenerName: string, listenerTy } } -export function unregisterListener(listenerName: string): void { - const listenerObject: any = getListenerObject(listenerName); - const listener: any = listenerObject.listener; - listener.unregister(); +export function unregisterListener(listenerName: string) { + const listenerObject = getListenerObject(listenerName); + if (typeof listenerObject.remove === 'function') { + listenerObject.remove(); + } else { + const listener = listenerObject.listener; + listener.unregister(); + } listenerObject.active = false; } diff --git a/test/ts-scenario/steps/lib/utility/clientUtils.ts b/test/ts-scenario/steps/lib/utility/clientUtils.ts index d8bbdf24b0..5f06c569bc 100644 --- a/test/ts-scenario/steps/lib/utility/clientUtils.ts +++ b/test/ts-scenario/steps/lib/utility/clientUtils.ts @@ -198,7 +198,7 @@ export async function commitChannelRequest(requestName: string, clientName: stri }; // Send the signed endorsement to the requested peers. - const endorsementResponse: ProposalResponse = await endorsement.send(endorsementRequest, {}); + const endorsementResponse: ProposalResponse = await endorsement.send(endorsementRequest); assertNoErrors(endorsementResponse); // Connect to 'Eventer' @@ -279,7 +279,7 @@ export async function commitChannelRequest(requestName: string, clientName: stri try { // Send commit, having started the event listener, wait for all - const commitSubmission: any = commit.send(commitRequest, {}); + const commitSubmission: any = commit.send(commitRequest); const commitResults: any[] = await Promise.all([eventListener, commitSubmission]); requestObject.results = { @@ -382,7 +382,7 @@ export async function submitChannelRequest(clientName: string, channelName: stri // Send query to target peers const queryObject: any = {}; try { - const queryResponse: ProposalResponse = await query.send(queryRequest, {}); + const queryResponse: ProposalResponse = await query.send(queryRequest); if (queryResponse.errors.length > 0) { // failure