Skip to content

Commit

Permalink
FABN-1524: Checkpoint contract events (#199)
Browse files Browse the repository at this point in the history
Signed-off-by: Mark S. Lewis <[email protected]>
  • Loading branch information
bestbeforetoday authored Mar 27, 2020
1 parent 9ba41f8 commit a299938
Show file tree
Hide file tree
Showing 10 changed files with 692 additions and 409 deletions.
4 changes: 2 additions & 2 deletions fabric-network/src/checkpointer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import Long = require('long');

export interface Checkpointer {
addTransactionId(transactionId: string): Promise<void>;
getBlockNumber(): Long | undefined;
getTransactionIds(): Set<string>;
getBlockNumber(): Promise<Long | undefined>;
getTransactionIds(): Promise<Set<string>>;
setBlockNumber(blockNumber: Long): Promise<void>;
}
29 changes: 29 additions & 0 deletions fabric-network/src/defaultcheckpointers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Copyright 2020 IBM All Rights Reserved.
*
* SPDX-License-Identifier: Apache-2.0
*/

import { Checkpointer } from './checkpointer';

// TODO: Implement, remove @private tag, and export in both index.js and index.d.ts

/**
* Provides static factory functions used to create instances of default
* {@link module:fabric-network.Checkpointer|Checkpointer} implementations.
* @private
*/
export class DefaultCheckpointers {
/**
* Create a checkpointer that uses the specified file to store persistent state. If the file does not exist, it will
* be created and the checkpointer will have an uninitialized state that will accept any events. If the file does
* exist, it must contain valid checkpointer state.
* @param {string} path Path to a file holding persistent checkpoint state.
* @returns {Promise<module:fabric-network.Checkpointer>} A checkpointer.
*/
static async file(path: string): Promise<Checkpointer> {
throw new Error('Not yet implemented');
// TODO: Create file system checkpointer and call async initialization to load initial state asynchronously
// from the file.
}
}
22 changes: 8 additions & 14 deletions fabric-network/src/impl/event/contractlistenersession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { BlockEvent, BlockListener, ContractEvent, ContractListener, ListenerOpt
import * as Logger from '../../logger';
import { Network } from '../../network';
import { ListenerSession } from './listenersession';
import * as GatewayUtils from '../gatewayutils';
import * as Listeners from './listeners';
const logger = Logger.getLogger('ContractListenerSession');

export class ContractListenerSession implements ListenerSession {
Expand All @@ -22,7 +22,7 @@ export class ContractListenerSession implements ListenerSession {
this.listener = listener;
this.chaincodeId = chaincodeId;
this.network = network;
this.blockListener = (blockEvent: BlockEvent) => this.onBlockEvent(blockEvent);
this.blockListener = this.newBlockListener(options);
this.options = options;
}

Expand All @@ -34,20 +34,14 @@ export class ContractListenerSession implements ListenerSession {
this.network.removeBlockListener(this.blockListener);
}

private async onBlockEvent(blockEvent: BlockEvent): Promise<void> {
const transactionPromises = blockEvent.getTransactionEvents()
.filter((transactionEvent) => transactionEvent.isValid)
.map((transactionEvent) => this.onTransactionEvent(transactionEvent));

// Don't use Promise.all() as it returns early if any promises are rejected
await GatewayUtils.allSettled(transactionPromises);
private newBlockListener(options?: ListenerOptions): BlockListener {
const callback = this.onContractEvent.bind(this);
return Listeners.blockFromContractListener(callback, options?.checkpointer);
}

private async onTransactionEvent(transactionEvent: TransactionEvent): Promise<void> {
for (const contractEvent of transactionEvent.getContractEvents()) {
if (this.isMatch(contractEvent)) {
await this.notifyListener(contractEvent);
}
private async onContractEvent(event: ContractEvent): Promise<void> {
if (this.isMatch(event)) {
await this.notifyListener(event);
}
}

Expand Down
74 changes: 66 additions & 8 deletions fabric-network/src/impl/event/listeners.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,73 @@
*/

import { Checkpointer } from '../../checkpointer';
import { BlockListener } from '../../events';

export function newCheckpointBlockListener(listener: BlockListener, checkpointer: Checkpointer): BlockListener {
return async (event) => {
const checkpointBlockNumber = checkpointer.getBlockNumber();
if (!checkpointBlockNumber || checkpointBlockNumber.equals(event.blockNumber)) {
await listener(event);
const nextBlockNumber = event.blockNumber.add(1);
import { BlockListener, TransactionEvent, ContractListener } from '../../events';
import * as GatewayUtils from '../gatewayutils';
import * as Logger from '../../logger';

const logger = Logger.getLogger('Listener');

type TransactionListener = (event: TransactionEvent) => Promise<void>;

export function checkpointBlockListener(listener: BlockListener, checkpointer: Checkpointer): BlockListener {
return async (blockEvent) => {
const checkpointBlockNumber = await checkpointer.getBlockNumber();
if (!checkpointBlockNumber || checkpointBlockNumber.equals(blockEvent.blockNumber)) {
await listener(blockEvent);
const nextBlockNumber = blockEvent.blockNumber.add(1);
await checkpointer.setBlockNumber(nextBlockNumber);
}
};
}

export function blockFromContractListener(listener: ContractListener, checkpointer?: Checkpointer): BlockListener {
if (checkpointer) {
const transactionListener = transactionFromContractListener(listener);
const checkpointTxListener = checkpointTransactionListener(transactionListener, checkpointer);
const blockListener = blockFromTransactionListener(checkpointTxListener);
return checkpointBlockListener(blockListener, checkpointer);
} else {
const transactionListener = transactionFromContractListener(listener);
return blockFromTransactionListener(transactionListener);
}
}

function transactionFromContractListener(listener: ContractListener): TransactionListener {
return async (transactionEvent) => {
for (const contractEvent of transactionEvent.getContractEvents()) {
await listener(contractEvent);
}
};
}

function checkpointTransactionListener(listener: TransactionListener, checkpointer: Checkpointer): TransactionListener {
return async (transactionEvent) => {
const checkpointTransactionIds = await checkpointer.getTransactionIds();
if (!checkpointTransactionIds.has(transactionEvent.transactionId)) {
await listener(transactionEvent);
await checkpointer.addTransactionId(transactionEvent.transactionId);
}
};
}

function blockFromTransactionListener(listener: TransactionListener): BlockListener {
return async (blockEvent) => {
const transactionPromises = blockEvent.getTransactionEvents()
.filter((transactionEvent) => transactionEvent.isValid)
.map((transactionEvent) => listener(transactionEvent));

// Don't use Promise.all() as it returns early if any promises are rejected
const results = await GatewayUtils.allSettled(transactionPromises);
logAndThrowErrors(results);
};
}

function logAndThrowErrors(results: GatewayUtils.SettledPromiseResult<void>[]): void {
const errors = results
.filter((result) => result.status === 'rejected')
.map((result) => (result as GatewayUtils.RejectedPromiseResult).reason);
if (errors.length > 0) {
errors.forEach((error) => logger.warn('Error notifying transaction listener', error));
throw new Error('Error notifying listener: ' + errors[0].stack || errors[0].message);
}
}
4 changes: 2 additions & 2 deletions fabric-network/src/impl/event/listenersession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ export interface ListenerSession {
close(): void;
}

export async function addListener<T>(listener: T, listenerSessions: Map<T, ListenerSession>, sessionSupplier: () => ListenerSession) {
export async function addListener<T>(listener: T, listenerSessions: Map<T, ListenerSession>, sessionSupplier: () => Promise<ListenerSession>) {
if (!listenerSessions.has(listener)) {
const session = sessionSupplier();
const session = await sessionSupplier();
// Store listener before starting in case start fires error events that trigger remove of the listener
listenerSessions.set(listener, session);
await session.start();
Expand Down
16 changes: 8 additions & 8 deletions fabric-network/src/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { BlockEventSource } from './impl/event/blockeventsource';
import { CommitListenerSession } from './impl/event/commitlistenersession';
import { EventServiceManager } from './impl/event/eventservicemanager';
import { IsolatedBlockListenerSession } from './impl/event/isolatedblocklistenersession';
import { newCheckpointBlockListener } from './impl/event/listeners';
import { checkpointBlockListener } from './impl/event/listeners';
import { addListener, ListenerSession, removeListener } from './impl/event/listenersession';
import { SharedBlockListenerSession } from './impl/event/sharedblocklistenersession';
import { QueryHandlerFactory } from './impl/query/queryhandler';
Expand All @@ -22,13 +22,13 @@ import Gateway = require('./gateway');

const logger = Logger.getLogger('Network');

function listenerOptionsWithDefaults(options: ListenerOptions): ListenerOptions {
async function listenerOptionsWithDefaults(options: ListenerOptions): Promise<ListenerOptions> {
const defaultOptions = {
type: 'full'
};
const result = Object.assign(defaultOptions, options);

const checkpointBlock = options.checkpointer?.getBlockNumber();
const checkpointBlock = await options.checkpointer?.getBlockNumber();
if (checkpointBlock) {
result.startBlock = checkpointBlock;
}
Expand Down Expand Up @@ -223,7 +223,7 @@ export class NetworkImpl implements Network {
}

async addCommitListener(listener: CommitListener, peers: Endorser[], transactionId: string) {
const sessionSupplier = () => new CommitListenerSession(listener, this.eventServiceManager, peers, transactionId);
const sessionSupplier = async () => new CommitListenerSession(listener, this.eventServiceManager, peers, transactionId);
return await addListener(listener, this.commitListeners, sessionSupplier);
}

Expand All @@ -232,19 +232,19 @@ export class NetworkImpl implements Network {
}

async addBlockListener(listener: BlockListener, options = {} as ListenerOptions) {
const sessionSupplier = () => this.newBlockListenerSession(listener, options);
const sessionSupplier = async () => await this.newBlockListenerSession(listener, options);
return await addListener(listener, this.blockListeners, sessionSupplier);
}

removeBlockListener(listener: BlockListener) {
removeListener(listener, this.blockListeners);
}

private newBlockListenerSession(listener: BlockListener, options: ListenerOptions) {
options = listenerOptionsWithDefaults(options);
private async newBlockListenerSession(listener: BlockListener, options: ListenerOptions): Promise<ListenerSession> {
options = await listenerOptionsWithDefaults(options);

if (options.checkpointer) {
listener = newCheckpointBlockListener(listener, options.checkpointer);
listener = checkpointBlockListener(listener, options.checkpointer);
}

if (options.startBlock) {
Expand Down
Loading

0 comments on commit a299938

Please sign in to comment.