Skip to content

Commit

Permalink
FABN-1054: Private pool in EventHubFactory
Browse files Browse the repository at this point in the history
Using channel.getChannelEventHub() to obtain event hubs in
EventHubFactory meant that users might inadvertently use the same
event hubs in their own code. This could potentially cause problems
with use of the fabric-network API if users performed a disruptive
action, such as using the event hub to replay previous block events.

Instead, maintain a private pool of event hubs in EventHubFactory,
with event hubs created using channel.newChannelEventHub().

Change-Id: If8ff4254500f22ea0ef4d4ff8732a440476aa0b1
Signed-off-by: Mark S. Lewis <[email protected]>
  • Loading branch information
bestbeforetoday committed Dec 12, 2018
1 parent 48d73d8 commit 54026a4
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 82 deletions.
45 changes: 43 additions & 2 deletions fabric-network/lib/impl/event/eventhubfactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ class EventHubFactory {
throw new Error(message);
}
logger.debug('constructor:', channel.getName());
this.channel = channel;
this._channel = channel;
this._savedEventHubs = new Map();
}

/**
Expand All @@ -35,7 +36,47 @@ class EventHubFactory {
* @returns {ChannelEventHub[]} Event hubs, which may or may not be connected.
*/
getEventHubs(peers) {
return peers.map((peer) => this.channel.getChannelEventHub(peer.getName()));
return peers.map((peer) => this.getEventHub(peer));
}

getEventHub(peer) {
const peerName = peer.getName();
let eventHub = this._getSavedEventHub(peerName);
if (!eventHub) {
eventHub = this._channel.newChannelEventHub(peer);
eventHub = this._setAndGetSavedEventHub(peerName, eventHub);
}
return eventHub;
}

_getSavedEventHub(peerName) {
const saved = this._savedEventHubs.get(peerName);
return saved ? saved.proxy : undefined;
}

_setAndGetSavedEventHub(peerName, eventHub) {
const proxy = new Proxy(eventHub, {
get: (target, property, receiver) => {
if (property === 'close' || property === 'disconnect') {
return () => {
// No-op to prevent client code from disconnecting a shared event hub
};
}
return Reflect.get(target, property, receiver);
}
});

const saved = {
original: eventHub,
proxy
};
this._savedEventHubs.set(peerName, saved);
return proxy;
}

dispose() {
this._savedEventHubs.forEach((saved) => saved.original.disconnect());
this._savedEventHubs.clear();
}
}

Expand Down
3 changes: 2 additions & 1 deletion fabric-network/lib/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ class Network {
this.queryHandler.dispose();
}

this.channel.close(); // Tidies up event hubs obtained from the channel
this.eventHubFactory.dispose();
this.channel.close();

this.initialized = false;
}
Expand Down
81 changes: 69 additions & 12 deletions fabric-network/test/impl/event/eventhubfactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,23 @@ describe('EventHubFactory', () => {
}
};

// Connected event hub
stubEventHub1 = sinon.createStubInstance(ChannelEventHub);
stubEventHub1._stubInfo = 'eventHub1';
stubEventHub1.getName.returns('eventHub1');
let eventHubCount = 0;
function newStubEventHub(peerName) {
const stubEventHub = sinon.createStubInstance(ChannelEventHub);
stubEventHub._stubInfo = `${peerName}-eventHub${++eventHubCount}`;
stubEventHub.getName.returns(peerName);

// Unconnected event hub that will successfully connect
stubEventHub2 = sinon.createStubInstance(ChannelEventHub);
stubEventHub2._stubInfo = 'eventHub2';
stubEventHub2.getName.returns('eventHub2');
return stubEventHub;
}

stubEventHub1 = newStubEventHub(stubPeer1.getName());
stubEventHub2 = newStubEventHub(stubPeer2.getName());

stubChannel = sinon.createStubInstance(Channel);
stubChannel.getName.returns('channel');
stubChannel.getChannelEventHub.withArgs(stubPeer1.getName()).returns(stubEventHub1);
stubChannel.getChannelEventHub.withArgs(stubPeer2.getName()).returns(stubEventHub2);
stubChannel.newChannelEventHub.callsFake((peer) => newStubEventHub(peer.getName()));
});

afterEach(() => {
Expand All @@ -75,19 +78,73 @@ describe('EventHubFactory', () => {
factory = new EventHubFactory(stubChannel);
});

function assertEventHubsMatchPeers(eventHubs, peers) {
expect(eventHubs).to.be.an('Array').with.lengthOf(peers.length);
const eventHubNames = eventHubs.map((eventHub) => eventHub.getName());
const peerNames = peers.map((peer) => peer.getName());
expect(eventHubNames).to.have.ordered.members(peerNames);
}

it('returns empty array for no peer arguments', () => {
const results = factory.getEventHubs([]);
expect(results).to.be.an('Array').that.is.empty;
});

it('returns eventHub for peer1', () => {
const results = factory.getEventHubs([stubPeer1]);
expect(results).to.have.members([stubEventHub1]);
const peers = [stubPeer1];
const eventHubs = factory.getEventHubs(peers);
assertEventHubsMatchPeers(eventHubs, peers);
});

it('returns eventHubs for peer1 and peer2', () => {
const results = factory.getEventHubs([stubPeer1, stubPeer2]);
expect(results).to.have.members([stubEventHub1, stubEventHub2]);
const peers = [stubPeer1, stubPeer2];
const eventHubs = factory.getEventHubs(peers);
assertEventHubsMatchPeers(eventHubs, peers);
});

it('does not return same eventHub as channel.getChannelEventHub()', () => {
const eventHubs = factory.getEventHubs([stubPeer1]);
expect(eventHubs).to.not.deep.equal(stubEventHub1);
});

it('returns the same eventHub on subsequent calls', () => {
const peers = [stubPeer1, stubPeer2];
const eventHubs1 = factory.getEventHubs(peers);
const eventHubs2 = factory.getEventHubs(peers);
expect(eventHubs1).to.have.deep.ordered.members(eventHubs2);
});

it('client code can\'t close event hubs', () => {
stubChannel.newChannelEventHub.returns(stubEventHub1);
const close = stubEventHub1.close;

const eventHub = factory.getEventHubs([stubPeer1])[0];
eventHub.close();

sinon.assert.notCalled(close);
});

it('client code can\'t disconnect event hubs', () => {
stubChannel.newChannelEventHub.returns(stubEventHub1);
const disconnect = stubEventHub1.disconnect;

const eventHub = factory.getEventHubs([stubPeer1])[0];
eventHub.disconnect();

sinon.assert.notCalled(disconnect);
});
});

describe('#dispose', () => {
it('disconnects created event hubs', () => {
stubChannel.newChannelEventHub.returns(stubEventHub1);
const disconnect = stubEventHub1.disconnect;
const factory = new EventHubFactory(stubChannel);
factory.getEventHubs([stubPeer1]);

factory.dispose();

sinon.assert.called(disconnect);
});
});
});
6 changes: 6 additions & 0 deletions fabric-network/test/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,12 @@ describe('Network', () => {
network._dispose();
sinon.assert.calledOnce(mockChannel.close);
});

it('calls dispose() on the event hub factory', () => {
const spy = sinon.spy(network.getEventHubFactory(), 'dispose');
network._dispose();
sinon.assert.called(spy);
});
});

describe('#getEventHubFactory', () => {
Expand Down
28 changes: 0 additions & 28 deletions test/integration/network-e2e/invoke-hsm.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,6 @@ async function createContract(t, gateway, gatewayOptions) {
return contract;
}

const getEventHubForOrg = async (gateway, orgMSP) => {
const network = await gateway.getNetwork(channelName);
const channel = network.getChannel();
const orgPeer = channel.getPeersForOrg(orgMSP)[0];
return channel.getChannelEventHub(orgPeer.getName());
};

test('\n\n****** Network End-to-end flow: import identity into wallet using hsm *****\n\n', async (t) => {
await setupAdmin();
await hsmIdentitySetup();
Expand All @@ -99,7 +92,6 @@ test('\n\n****** Network End-to-end flow: import identity into wallet using hsm

test('\n\n***** Network End-to-end flow: invoke transaction to move money using file hsm wallet and default event strategy *****\n\n', async (t) => {
const gateway = new Gateway();
let org1EventHub;

try {
await setupAdmin();
Expand All @@ -113,27 +105,8 @@ test('\n\n***** Network End-to-end flow: invoke transaction to move money using
discovery: {enabled: false}
});

// Obtain an event hub that that will be used by the underlying implementation
org1EventHub = await getEventHubForOrg(gateway, 'Org1MSP');
const org2EventHub = await getEventHubForOrg(gateway, 'Org2MSP');

// initialize eventFired to 0
let eventFired = -1;

// have to register for all transaction events (a new feature in 1.3) as
// there is no way to know what the initial transaction id is
org1EventHub.registerTxEvent('all', (txId, code) => {
if (code === 'VALID') {
eventFired++;
}
}, () => {});

const response = await contract.submitTransaction('move', 'a', 'b', '100');

t.true(org1EventHub.isconnected(), 'org1 event hub correctly connected');
t.false(org2EventHub.isconnected(), 'org2 event hub correctly not connected');
t.equal(eventFired, 1, 'single event for org1 correctly unblocked submitTransaction');

const expectedResult = 'move succeed';
if (response.toString() === expectedResult) {
t.pass('Successfully invoked transaction chaincode on channel');
Expand All @@ -144,7 +117,6 @@ test('\n\n***** Network End-to-end flow: invoke transaction to move money using
t.fail('Failed to invoke transaction chaincode on channel. ' + err.stack ? err.stack : err);
} finally {
gateway.disconnect();
t.false(org1EventHub.isconnected(), 'org1 event hub correctly been disconnected');
}
t.end();
});
Loading

0 comments on commit 54026a4

Please sign in to comment.