Skip to content

Commit

Permalink
Adding Orderer Class
Browse files Browse the repository at this point in the history
Adding the initial orderer code, including the class
structure, broadcast method, and associated unit
tests.

Change-Id: I3f65a7a93cd5cb0164f5fc8435046fad45fc1f2e
Signed-off-by: Anna D Derbakova <[email protected]>
Signed-off-by: Bret E Harrison <[email protected]>
  • Loading branch information
Mr. Angry committed Oct 13, 2016
1 parent 69af5e3 commit e5d06ea
Show file tree
Hide file tree
Showing 3 changed files with 397 additions and 0 deletions.
162 changes: 162 additions & 0 deletions lib/Orderer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
Copyright 2016 IBM All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the 'License');
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an 'AS IS' BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

'use strict';

var api = require('./api.js');
var utils = require('./utils.js');

var grpc = require('grpc');
var debugModule = require('debug');
var debug = debugModule('hfc');
var isDebug = debugModule.enabled('hfc');

var _abProto = grpc.load(__dirname + '/protos/atomicbroadcast/ab.proto').atomicbroadcast;

/**
* The Orderer class represents a peer in the target blockchain network to which
* HFC sends a block of transactions of endorsed proposals requiring ordering.
*
* @class
*/
var Orderer = class {

/**
* Constructs an Orderer given its endpoint configuration settings.
*
* @param {string} url The orderer URL with format of 'grpcs://host:port'.
* @param {Chain} chain The chain of which this orderer is a member.
* @param {string} pem The certificate file, in PEM format,
* to use with the gRPC protocol (that is, with TransportCredentials).
* Required when using the grpcs protocol.
*/
constructor(url, chain, pem) {
if(isDebug) debug('Orderer.constructor');

this._url = url;
this._chain = chain;

// Orderer service connection
this._epOrderer = new utils.Endpoint(url, pem);
this._ordererClient = new _abProto.AtomicBroadcast(this._epOrderer.addr, this._epOrderer.creds);
}

/**
* Get the Chain of the orderer.
* @returns {Chain} Get the Chain associated with the Orderer.
*/
getChain() {
if(isDebug) debug('Orderer.getChain::'+this._chain);
return this._chain;
}


/**
* Get the URL of the orderer.
* @returns {string} Get the URL associated with the Orderer.
*/
getUrl() {
if(isDebug) debug('Orderer.getUrl::'+this._url);
return this._url;
}

/**
* Send a BroadcastMessage to the orderer service.
*
* @param {Object} data to be included in the BroadcastMessage
* see the ./proto/atomicbroadcast/ab.proto
* @returns Promise for a BroadcastResponse
* see the ./proto/atomicbroadcast/ab.proto
*/
sendBroadcast(send_data) {
if(isDebug) debug('Orderer.sendBroadcast - start');

if(!send_data || send_data == '') {
if(isDebug) debug('Orderer.sendBroadcast ERROR - missing data');
var err = new Error('Missing data - Nothing to order');
return Promise.reject(err);
}

var self = this;
var data = new Buffer(send_data);

// Build up the broadcast message
// This will be fleshed out we add more functionality and send fully
// structured requests, with all fields filled in.
var _broadcastMessage = {Data: data};

if(isDebug) debug('Orderer.sendBroadcast - _broadcastMessage = ' + JSON.stringify(_broadcastMessage));

// Send the endorsed proposals to the peer node (orderer) via grpc
// The rpc specification on the peer side is:
// rpc Broadcast(stream BroadcastMessage) returns (stream BroadcastResponse) {}
return new Promise(function(resolve, reject) {
var broadcast = self._ordererClient.broadcast();

broadcast.on('data', function (response) {
if(isDebug) debug('Orderer.sendBroadcast - on data response: ' + JSON.stringify(response));

if(response.Status) {
if (response.Status === 'SUCCESS') {
if(isDebug) debug('Orderer.sendBroadcast - resolve with %s', response.Status);
return resolve(response);
} else {
if(isDebug) debug('Orderer.sendBroadcast - reject with %s', response.Status);
return reject(new Error(response.Status));
}
}
else {
if(isDebug) debug('Orderer.sendBroadcast ERROR - reject with invalid response from the orderer');
return reject(new Error('SYSTEM_ERROR'));
}

});

broadcast.on('end', function (response) {
if(isDebug) debug('Orderer.sendBroadcast - on end:');
// Removing the promise reject here as on an 'error', this case
// will hit before the 'error' event, and we loose the error
// information coming back to the caller
// return reject(response);
});

broadcast.on('error', function (err) {
if(isDebug) debug('Orderer.sendBroadcast - on error: ' + JSON.stringify(err));
if(err && err.code) {
if(err.code == 14) {
return reject(new Error('SERVICE_UNAVAILABLE'));
}
}
return reject(new Error(err));
});

broadcast.write(_broadcastMessage);
broadcast.end();
if(isDebug) debug('Orderer.sendBroadcast - write/end complete');
});
}

/**
* return a printable representation of this object
*/
toString() {
return ' Orderer : {' +
'url:' + this._url +
'}';
}
};

module.exports = Orderer;
92 changes: 92 additions & 0 deletions lib/protos/atomicbroadcast/ab.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

syntax = "proto3";

package atomicbroadcast;

// These status codes are intended to resemble selected HTTP status codes
enum Status {
SUCCESS = 0;
BAD_REQUEST = 400;
FORBIDDEN = 403;
NOT_FOUND = 404;
SERVICE_UNAVAILABLE = 503;
}

message BroadcastResponse {
Status Status = 1;
}

message BroadcastMessage {
bytes Data = 1;
}

message SeekInfo {
// Start may be specified to a specific block number, or may be request from the newest or oldest available
// The start location is always inclusive, so the first reply from NEWEST will contain the newest block at the time
// of reception, it will must not wait until a new block is created. Similarly, when SPECIFIED, and SpecifiedNumber = 10
// The first block received must be block 10, not block 11
enum StartType {
NEWEST = 0;
OLDEST = 1;
SPECIFIED = 2;
}
StartType Start = 1;
uint64 SpecifiedNumber = 2; // Only used when start = SPECIFIED
uint64 WindowSize = 3; // The window size is the maximum number of blocks that will be sent without Acknowledgement, the base of the window moves to the most recently received acknowledgment
}

message Acknowledgement {
uint64 Number = 1;
}

// The update message either causes a seek to a new stream start with a new window, or acknowledges a received block and advances the base of the window
message DeliverUpdate {
oneof Type {
Acknowledgement Acknowledgement = 1; // Acknowledgement should be sent monotonically and only for a block which has been received, Acknowledgements received non-monotonically has undefined behavior
SeekInfo Seek = 2; // When set, SeekInfo causes a seek and potential reconfiguration of the window size
}
}

// This is a temporary data structure, meant to hold the place of the finalized block structure
// This must be a 'block' structure and not a 'batch' structure, although the terminology is slightly confusing
// The requirement is to allow for a consumer of the orderer to declare the unvalidated blockchain as the definitive
// blockchain, without breaking the hash chain or existing proof
message Block {
uint64 Number = 2;
bytes PrevHash = 3;
bytes Proof = 4;
repeated BroadcastMessage Messages = 5;
}

message DeliverResponse {
oneof Type {
Status Error = 1;
Block Block = 2;
}
}

service AtomicBroadcast {
// broadcast receives a reply of Acknowledgement for each BroadcastMessage in order, indicating success or type of failure
rpc Broadcast(stream BroadcastMessage) returns (stream BroadcastResponse) {}

// deliver first requires an update containing a seek message, then a stream of block replies is received.
// The receiver may choose to send an Acknowledgement for any block number it receives, however Acknowledgements must never be more than WindowSize apart
// To avoid latency, clients will likely acknowledge before the WindowSize has been exhausted, preventing the server from stopping and waiting for an Acknowledgement
rpc Deliver(stream DeliverUpdate) returns (stream DeliverResponse) {}
}

143 changes: 143 additions & 0 deletions test/unit/orderer-tests.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/**
* Copyright 2016 IBM All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

var tape = require('tape');
var _test = require('tape-promise');
var test = _test(tape);

var hfc = require('../..');
var util = require('util');
var fs = require('fs');

var Orderer = require('../../lib/Orderer.js');

var keyValStorePath = '/tmp/keyValStore';

//
// Orderer happy path test
//
// Send a broadcast message containing data as Buffer. A successful response is
// expected in this case.
//
test('orderer happy path test', function(t) {
var client = new Orderer('grpc://127.0.0.1:5151');

client.sendBroadcast('some data')
.then(
function(status) {
console.log('response status: ' + JSON.stringify(status));
t.pass('Successfully sent to orderer.');
t.end();
},
function(err) {
t.fail('Failed to send to orderer!' + err.stack ? err.stack : err);
t.end();
}
).catch(function(err) {
t.fail('Caught Error: ' + err.stack ? err.stack : err);
t.end();
});
});

//
// Orderer bad address test
//
// Attempt to initialize an orderer with a bad URL address. An invalid protocol
// error is expected in this case.
//

test('orderer bad address test', function(t) {
try {
var client = new Orderer('xxxxx');
t.fail('Orderer allowed setting a bad URL.');
}
catch(err) {
console.log('Caught Error: ' + err);
t.pass('Orderer did not allow setting bad URL.');
}
t.end();
});

//
// Orderer missing address test
//
// Attempt to initialize an orderer with a missing URL address. A TypeError
// indicating that the URL must be a "string" is expected in this case.
//

test('orderer missing address test', function(t) {
try {
var client = new Orderer();
t.fail('Orderer allowed setting a missing address.');
}
catch(err) {
console.log('Caught Error: ' + err);
t.pass('Orderer did not allow setting a missing address.');
}
t.end();
});

//
// Orderer missing data test
//
// Send an empty broadcast message to an orderer. An error indicating that no
// data was sent is expected in this case.
//

test('orderer missing data test', function(t) {
var client = new Orderer('grpc://127.0.0.1:5005');

client.sendBroadcast()
.then(
function(status) {
console.log('response status: ' + JSON.stringify(status));
t.fail('Should have noticed missing data.');
},
function(err) {
console.log('Caught Error: ' + err);
t.pass('Successfully found missing data: ' + err);
}
).catch(function(err) {
t.fail('Caught Error: should not be here if we defined promise error function: ' + err);
});
t.end();
});

//
// Orderer unknown address test
//
// Send a broadcast message to a bad orderer address. An error indicating
// a connection failure is expected in this case.
//

test('orderer unknown address test', function(t) {
var client = new Orderer('grpc://127.0.0.1:51006');

client.sendBroadcast('some data')
.then(
function(status) {
console.log('response status: ' + JSON.stringify(status));
t.fail('Should have noticed a bad address.');
},
function(err) {
t.pass('Successfully found bad address!');
}
).catch(function(err) {
t.fail('Caught Error: should not be here if we defined promise error function: '
+ err);
});
t.end();
});

0 comments on commit e5d06ea

Please sign in to comment.