Skip to content

Commit

Permalink
use protobufjs rpcImpl (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
dckc committed Aug 24, 2019
1 parent e58d3f9 commit 2903258
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 46 deletions.
68 changes: 44 additions & 24 deletions src/rnodeAPI.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ refs:
// @flow strict

const assert = require('assert');
const protoLoader = require('@grpc/proto-loader');
const {
DeployService,
BlockQueryResponse,
BlockInfoWithoutTuplespace,
DataWithBlockInfo,
Expand All @@ -22,7 +22,8 @@ const {
ListeningNameContinuationResponse,
ListeningNameDataResponse,
PrivateNamePreviewResponse,
} = require('../protobuf/CasperMessage').coop.rchain.casper.protocol;
} = require('../protobuf/DeployService').coop.rchain.casper.protocol;
const { ProposeService } = require('../protobuf/ProposeService').coop.rchain.casper.protocol;
const RHOCore = require('./RHOCore');
const Hex = require('./hex');
const { RholangCrypto } = require('./signing');
Expand All @@ -31,15 +32,6 @@ const { blake2b256Hash, ed25519Verify } = RholangCrypto;

const def = obj => Object.freeze(obj); // cf. ocap design note

// Options for similarity to grpc.load behavior
// https://grpc.io/docs/tutorials/basic/node.html#loading-service-descriptors-from-proto-files
const likeLoad = { keepCase: true, longs: String, enums: String, defaults: true, oneofs: true };
const packageDefinition = protoLoader.loadSync(
__dirname + '/../protobuf/CasperMessage.proto', // eslint-disable-line
likeLoad,
);


/*::
import type { JsonExt } from './RHOCore';
import type { KeyPair } from './signing';
Expand All @@ -51,10 +43,9 @@ type JSData = JsonExt<URL | GPrivate>;
type EndPoint = { host: string, port: number };
import GRPCAccess from 'grpc';
export type IRNode = $Call<typeof RNode, GRPCAccess, EndPoint>;
export type IRNode = $Call<typeof RNode, GRPCAccess, EndPoint, EndPoint>;
export type IDeployData = coop$rchain$casper$protocol$IDeployData;
type DeployService = coop$rchain$casper$protocol$DeployService;
type Decoder<T> = { decode(reader: Uint8Array): T };
*/
Expand Down Expand Up @@ -106,17 +97,41 @@ exports.RNode = RNode;
function RNode(
grpc /*: GRPCAccess */,
endPoint /*: { host: string, port: number } */,
internalEndPoint /*: ?{ host: string, port: number } */,
) /*: IRNode */ {
const { host, port } = endPoint;
assert.ok(host, 'endPoint.host missing');
assert.ok(port, 'endPoint.port missing');
function makeRpcImpl(base, { host, port }) { // limit scope of client
assert.ok(host, 'endPoint.host missing');
assert.ok(port, 'endPoint.port missing');

const Client = grpc.makeGenericClientConstructor({});
const client = new Client(
`${host}:${port}`,
grpc.credentials.createInsecure(), // ISSUE: let caller do secure?
);

const proto = grpc.loadPackageDefinition(packageDefinition);
const casper = proto.coop.rchain.casper.protocol;
function rpc(method, requestData, callback) {
console.log('@@rpc', { method, name: method.name, requestData, client });
client.makeUnaryRequest(
base + method.name,
arg => arg,
arg => arg,
requestData,
(...args) => {
console.log('@@rpc callback:', args);
callback(...args);
},
);
}

return rpc;
}

const client /*: DeployService */ = new casper.DeployService(
`${host}:${port}`, grpc.credentials.createInsecure(), // ISSUE: let caller do secure?
const client = DeployService.create(
makeRpcImpl('/coop.rchain.casper.protocol.DeployService/', endPoint), false, false,
);
const proposeClient = internalEndPoint ? ProposeService.create(
makeRpcImpl('/coop.rchain.casper.protocol.ProposeService/', internalEndPoint), false, false,
) : null;

/**
* Ask rnode to compute ids of top level private names, given deploy parameters.
Expand Down Expand Up @@ -176,7 +191,9 @@ function RNode(
}
let out = await either(DeployServiceResponse, send(f => client.doDeploy(deployData, f)));
if (autoCreateBlock) {
out = await either(DeployServiceResponse, send(f => client.createBlock({}, f)));
// TODO: printUnmatchedSends
if (!proposeClient) { throw new TypeError('need internalEndPoint to propose'); }
out = await either(DeployServiceResponse, send(f => proposeClient.propose({}, f)));
}
return out.message;
}
Expand All @@ -187,8 +204,9 @@ function RNode(
* @instance
* @return A promise for response message
*/
async function createBlock() /*: Promise<string>*/ {
const r = await either(DeployServiceResponse, send(f => client.createBlock({}, f)));
async function propose() /*: Promise<string>*/ {
if (!proposeClient) { throw new TypeError('need internalEndPoint to propose'); }
const r = await either(DeployServiceResponse, send(f => proposeClient.propose({}, f)));
return r.message;
}

Expand All @@ -199,6 +217,8 @@ function RNode(

function eitherSync/*::<T>*/(cls /*: Decoder<T>*/, x /*: IEither*/) /*: T*/{
if (x.success) {
console.log('@@@either success', { cls, value: x.success.response.value });
console.log('@@constructor?', new cls());
/* $FlowFixMe$ ISSUE: Either.proto fibs a bit*/
return cls.decode(x.success.response.value);
}
Expand Down Expand Up @@ -368,7 +388,7 @@ function RNode(

return def({
doDeploy,
createBlock,
propose,
listenForDataAtName,
listenForDataAtPrivateName,
listenForDataAtPublicName,
Expand Down
57 changes: 35 additions & 22 deletions test/testRNode.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ function testRNode(suite2) {
const defaultSec = Hex.decode('b18e1d0045995ec3d010c387ccfeb984d783af8fbb0f40fa7db126d889f6dadd');

function netTests({ grpc, clock, rng }) {
const localNode = () => RNode(grpc, { host: 'localhost', port: 40401 });
const localNode = () => RNode(grpc, { host: 'localhost', port: 40401, internalPort: 40402 });

function hashTest(test, hashBytes, hashData, fname) {
const returnChannel = rng().toString(36).substring(7);
Expand Down Expand Up @@ -153,37 +153,50 @@ function payFor(d0, key, phloPrice = 1, phloLimit = 10000000) {

exports.grpcMock = grpcMock;
function grpcMock() {
function DeployService(_hostPort /*: Object */, _chan /*: Object */) {
function getBlocks(_depth /*: number */) {
const block4 = {
value: LightBlockInfo
.encode({ blockHash: 'deadbeef' }).finish(),
};

return Object.freeze({
doDeploy(_dd /*: Object */, _auto /*: boolean */ = false) { return 'Success!'; },
showBlocks(_depth /*: number */) {
const block4 = {
value: BlockInfoWithoutTuplespace
.encode({ blockHash: 'deadbeef' }).finish(),
};

return Object.freeze({
on(name /*: string */, handler /*: (...args: any[]) => void */) {
if (name === 'data') {
handler({ success: { response: block4 } });
} else if (name === 'end') {
handler();
}
},
});
on(name /*: string */, handler /*: (...args: any[]) => void */) {
if (name === 'data') {
handler({ success: { response: block4 } });
} else if (name === 'end') {
handler();
}
},
});
}

const casper = { DeployService };
const proto = { coop: { rchain: { casper: { protocol: casper } } } };
function makeGenericClientConstructor(_opts /*: {}*/) {
function Client(_url, _thingy) {
return Object.freeze({
makeUnaryRequest(name, fn1, fn2, requestData, callback) {
switch (name) {
case 'doDeploy':
callback(null, 'Success!');
break;
case 'getBlocks':
callback(null, getBlocks(1));
break;
default:
throw new Error(`not implemented: ${name}`);
}
},
});
}

return Client;
}

return Object.freeze({
loadPackageDefinition(_d /*: Object */) { return proto; },
makeGenericClientConstructor,
credentials: { createInsecure() { } },
});
}


if (require.main === module) {
// Access ambient stuff only when invoked as main module.
/* eslint-disable global-require */
Expand Down

0 comments on commit 2903258

Please sign in to comment.