Skip to content

Commit

Permalink
FABN-1493: Block event replay (#147)
Browse files Browse the repository at this point in the history
Signed-off-by: Mark S. Lewis <[email protected]>
  • Loading branch information
bestbeforetoday authored Feb 27, 2020
1 parent ddf1d71 commit ff3f86d
Show file tree
Hide file tree
Showing 32 changed files with 292 additions and 1,033 deletions.
4 changes: 2 additions & 2 deletions fabric-common/lib/EventService.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class EventService extends ServiceAction {
this.lastBlockNumber = null;

this.startBlock = NEWEST;
this.endBlock = null;
this.endBlock = undefined;
this._end_block_seen = false;

this._eventListenerRegistrations = new Map();
Expand All @@ -91,7 +91,7 @@ class EventService extends ServiceAction {

// remember the blockType this EventService is listening
// will be set during the .build call
this.blockType = null;
this.blockType = FILTERED_BLOCK;
this.replay = false;

this.myNumber = count++;
Expand Down
18 changes: 15 additions & 3 deletions fabric-common/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ export class Eventer extends ServiceEndpoint {
constructor(name: string, client: Client, mspid: string);
public disconnect(): void;
public checkConnection(): Promise<boolean>;
public setEndpoint(endpoint: Endpoint): void;
}

export class Discoverer extends ServiceEndpoint {
Expand Down Expand Up @@ -209,7 +210,7 @@ export class EventListener {
export type EventCallback = (error?: Error, event?: EventInfo) => void;

export interface EventInfo {
eventHub: EventService;
eventService: EventService;
blockNumber: Long;
transactionId?: string;
status?: string;
Expand All @@ -234,20 +235,31 @@ export interface ChaincodeEvent {
payload: Buffer;
}

export type BlockType = 'filtered' | 'full' | 'private';

export class EventService extends ServiceAction {
public startBlock: Long | string;
public endBlock: Long | string;
endBlock?: Long | string;
blockType: BlockType;
constructor(chaincodeName: string, channel: Channel);
public setEventer(discoverer: Eventer): EventService;
public getLastBlockNumber(): Long;
public close(): void;
public build(idContext: IdentityContext, request: any): Buffer;
public send(request: any): Promise<any>;
public send(request: StartRequestOptions): Promise<any>;
public isListening(): boolean;
public unregisterEventListener(eventListener: EventListener): EventService;
public registerTransactionListener(txid: string, callback: EventCallback, options: EventRegistrationOptions): EventListener;
public registerChaincodeListener(eventName: string, callback: EventCallback, options: EventRegistrationOptions): EventListener;
public registerBlockListener(callback: EventCallback, options: EventRegistrationOptions): EventListener;
setTargets(targets: Eventer[]): void;
isStarted(): boolean;
}

export interface StartRequestOptions {
blockType?: BlockType;
startBlock?: number | string | Long;
endBlock?: number | string | Long;
}

export class Client {
Expand Down
7 changes: 7 additions & 0 deletions fabric-network/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@
* @returns {Promise<void>}
*/

/**
* @typedef {object} Network~ListenerOptions
* @memberof module:fabric-network
* @property {number | string | Long} startBlock The block number from which events should be received.
*/

/**
* @typedef {EventInfo} Network~BlockEvent
* @memberof module:fabric-network
Expand Down Expand Up @@ -323,6 +329,7 @@
* @method Network#addBlockListener
* @memberof module:fabric-network
* @param {module:fabric-network.Network~BlockListener} listener A block listener callback function.
* @param {module:fabric-network.Network~ListenerOptions} [options] Listener options.
* @returns {module:fabric-network.Network~BlockListener} The added listener.
* @example
* const listener: BlockListener = async (event) => {
Expand Down
1 change: 0 additions & 1 deletion fabric-network/src/impl/event/allfortxstrategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import { EventCount, SuccessCallback, FailCallback, TransactionEventStrategy } from './transactioneventstrategy';

// @ts-ignore no implicit any
import Logger = require('../../logger');
const logger = Logger.getLogger('AllForTxStrategy');

Expand Down
3 changes: 1 addition & 2 deletions fabric-network/src/impl/event/anyfortxstrategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@

import { EventCount, SuccessCallback, FailCallback, TransactionEventStrategy } from './transactioneventstrategy';

// @ts-ignore no implicit any
import Logger = require('../../logger');
import * as Logger from '../../logger';
const logger = Logger.getLogger('AnyForTxStrategy');

/**
Expand Down
4 changes: 2 additions & 2 deletions fabric-network/src/impl/event/baseeventlistener.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ class BaseEventListener {

if (!this.eventService) {
if (this.replay) {
this.eventService = this.eventServiceManager.getReplayEventService();
this.eventService = this.eventServiceManager.newFailoverEventService();
// TODO should we check to see if running and then reset ???
} else {
this.eventService = this.eventServiceManager.getEventService(this.eventServiceOptions.blockType);
this.eventService = this.eventServiceManager.newFailoverEventService(this.eventServiceOptions.blockType);
}
}

Expand Down
7 changes: 3 additions & 4 deletions fabric-network/src/impl/event/blockeventsource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@
import { BlockEvent, BlockListener } from './blocklistener';
import { OrderedBlockQueue } from './orderedblockqueue';
import { AsyncNotifier } from './asyncnotifier';
import { EventServiceManager } from './eventservicemanager';
import {
EventCallback,
EventInfo,
EventListener,
EventRegistrationOptions,
EventService
} from 'fabric-common';
// @ts-ignore no implicit any
import EventServiceManager = require('./eventservicemanager');
import Long = require('long');

import * as Logger from '../../logger';
Expand Down Expand Up @@ -78,7 +77,7 @@ export class BlockEventSource {
this.started = true;

try {
this.eventService = this.eventServiceManager.getEventService();
this.eventService = this.eventServiceManager.newFailoverEventService();
this.registerListener(); // Register before start so no events are missed
await this.startEventService();
} catch (error) {
Expand Down Expand Up @@ -106,7 +105,7 @@ export class BlockEventSource {

private async startEventService() {
const options = { startBlock: this.getNextBlockNumber() };
await this.eventServiceManager.startEventService(this.eventService, options);
await this.eventServiceManager.startEventService(this.eventService!, options);
}

private blockEventCallback(error?: Error, event?: EventInfo) {
Expand Down
24 changes: 3 additions & 21 deletions fabric-network/src/impl/event/blocklistenersession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,23 @@
*/

import { ListenerSession } from './listenersession';
import { BlockEvent, BlockListener } from './blocklistener';
import { BlockListener } from './blocklistener';
import { BlockEventSource } from './blockeventsource';
import {
EventCallback,
EventListener,
EventService
} from 'fabric-common';

import * as Logger from '../../logger';
const logger = Logger.getLogger('BlockListenerSession');

export class BlockListenerSession implements ListenerSession {
private readonly listener: BlockListener;
private readonly eventSource: BlockEventSource;
private readonly eventListener: BlockListener = this.notifyListener.bind(this);

constructor(listener: BlockListener, eventSource: BlockEventSource) {
this.listener = listener;
this.eventSource = eventSource;
}

public async start() {
await this.eventSource.addBlockListener(this.eventListener);
await this.eventSource.addBlockListener(this.listener);
}

public close() {
this.eventSource.removeBlockListener(this.eventListener);
}

private async notifyListener(blockEvent: BlockEvent) {
try {
await this.listener(blockEvent);
} catch (error) {
logger.error('Error notifying listener:', error);
}
this.eventSource.removeBlockListener(this.listener);
}

}
68 changes: 0 additions & 68 deletions fabric-network/src/impl/event/commiteventlistener.js

This file was deleted.

5 changes: 2 additions & 3 deletions fabric-network/src/impl/event/commitlistenersession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import {
CommitEvent,
CommitListener
} from './commitlistener';
// @ts-ignore: no typings for EventServiceManager
import EventServiceManager = require('./eventservicemanager');
import { EventServiceManager } from './eventservicemanager';
import {
Endorser,
EventListener,
Expand All @@ -33,7 +32,7 @@ export class CommitListenerSession implements ListenerSession {
constructor(listener: CommitListener, eventServiceManager: EventServiceManager, endorsers: Endorser[], transactionId: string) {
this.listener = listener;
this.eventServiceManager = eventServiceManager;
this.eventServices = eventServiceManager.getEventServices(endorsers);
this.eventServices = endorsers.map((endorser) => eventServiceManager.getCachedEventService(endorser));
this.transactionId = transactionId;

for (const endorser of endorsers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import { AllForTxStrategy } from './allfortxstrategy';
import { AnyForTxStrategy } from './anyfortxstrategy';
import { TxEventHandlerFactory, TransactionEventHandler } from './transactioneventhandler';
// @ts-ignore no implicit any
import { Network } from '../../network';
import { Endorser } from 'fabric-common';

Expand Down
Loading

0 comments on commit ff3f86d

Please sign in to comment.