Transport Agnostic RPC #249
Note that all fields in … The default values are all based on the types of the fields. So for primitives like numbers, it's … Optional fields returned in proto 3.5. This means the generated code usually includes a handler to check whether a field was set at all. That way you can tell the difference between the client not setting a value and the client setting it to the default value. This matters when you want different behaviour for the case where the client really didn't set the value.
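Here is a minimal TypeScript sketch of why presence matters for a field such as a page size. Presence is modelled as `undefined`, which is an assumption; the generated JS code may expose it differently. `ListRequest`, `DEFAULT_PAGE_SIZE` and `effectiveLimit` are hypothetical names.

```typescript
// Hypothetical request shape; not generated protobuf code.
// `limit` models a proto3 `optional` field (presence tracked),
// `offset` a plain proto3 scalar (no presence, default 0).
interface ListRequest {
  limit?: number; // undefined means "the client never set it"
  offset: number; // 0 is both the type default and a valid value
}

const DEFAULT_PAGE_SIZE = 20; // hypothetical server-side default

// Only an explicitly set `limit` overrides the server default,
// even when the client explicitly sends 0 (the proto3 default for numbers).
function effectiveLimit(req: ListRequest): number {
  return req.limit !== undefined ? req.limit : DEFAULT_PAGE_SIZE;
}

console.log(effectiveLimit({ offset: 0 })); // 20
console.log(effectiveLimit({ limit: 0, offset: 0 })); // 0
```

Without presence tracking, the second call would be indistinguishable from the first on the wire.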
Regarding the number types in proto3, there are 64-bit numbers:
Due to protocolbuffers/protobuf#3666, there's no support for bigint yet, so by default these values lose precision. The workaround is:
This turns it into a string.
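To make the precision problem concrete: JS numbers are IEEE-754 doubles, so not every integer above 2^53 - 1 can be represented exactly. This small sketch assumes the 64-bit value arrives as a decimal string, which is what the string workaround produces. `wireValue` is a made-up example value.

```typescript
// 2^53 + 1: the smallest positive integer a JS number cannot represent exactly
const wireValue = "9007199254740993";

// Decoding the 64-bit value into a JS number silently rounds it
const asNumber = Number(wireValue);
// Keeping it as a string, or converting it to a BigInt, preserves every digit
const asBigInt = BigInt(wireValue);

console.log(asNumber); // 9007199254740992
console.log(Number.isSafeInteger(asNumber)); // false
console.log(asBigInt.toString() === wireValue); // true
```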
This shows the mapping from proto3 code to the generated JS code: https://developers.google.com/protocol-buffers/docs/reference/javascript-generated. Of particular interest are map, bytes, oneof, and enums.
The guide says that if we want to stop using a certain field in a message, we should reserve both the field number and the field name so that neither can be used again. This keeps older clients from getting confused. The field numbers don't actually have to be in sequence; they are unique identifiers for each field in the message.
Regarding pagination, I like this usage:
message SearchResponse {
  repeated Result results = 1;
}
message Result {
  string url = 1;
  string title = 2;
  repeated string snippets = 3;
}
It seems to make it easier to define a single message type and then lists of messages. I had done this previously in OpenAPI. I imagine that responses may carry further metadata beyond the core data, so I could imagine types being represented like:
That way, "response" messages are kept separate from the domain structures we want to work with. In our proto definitions we can identify the relevant domain structures. These should really be derived manually by the programmer, by investigating each domain. For example, the notifications domain has a domain structure representing a notification message. That is not the same type as the request and response messages sent back and forth on the gRPC API about notifications.
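Here is a rough TypeScript rendering of that separation, assuming index-based page tokens. `NotificationData` is the domain structure, while `NotificationsListResponse` is a response message that wraps it together with pagination metadata. All names are hypothetical.

```typescript
// Domain structure: what the notifications domain itself works with
interface NotificationData {
  id: string;
  message: string;
}

// Response message: wraps domain structures with transport-level metadata
interface NotificationsListResponse {
  results: NotificationData[];
  nextPageToken?: string; // absent when there are no further pages
}

// Hypothetical handler: pages over an in-memory list with index-based tokens
function listNotifications(
  all: NotificationData[],
  pageSize: number,
  pageToken?: string,
): NotificationsListResponse {
  const start = pageToken !== undefined ? Number(pageToken) : 0;
  const end = start + pageSize;
  const results = all.slice(start, end);
  return end < all.length ? { results, nextPageToken: String(end) } : { results };
}
```

The domain type stays the same even if pagination metadata is later extended.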
It's also possible to import proto files. This can make our proto definitions easier to manage if we separate our type definitions from our message definitions and from our service definitions. It could allow us to form subdirectories in … Importing is just a matter of:
To re-export, just use … I believe the name should start from where the …
I forget whether we were ever able to use any of the Google protos, like … However, there are a number of other types that are quite useful, like …
The The The Although you should be able to put another |
Re-reading https://medium.com/expedia-group-tech/the-weird-world-of-grpc-tooling-for-node-js-part-3-d994de02bedc, it seems that our current use of static tooling may mean that importing packages does not work. Not sure at this point.
This issue seems to indicate that all we need to do is copy the proto source code verbatim into our …
Note that a gRPC channel tends to have a limit on the maximum number of concurrent requests (HTTP/2 streams); beyond that point, RPC requests are queued. I believe we have separate channels to each node. We aren't likely to hit this maximum in production right now, but chattiness may increase when we integrate gestalt synchronisation #190. This issue has more details: grpc/grpc#21386. The current workaround is to create new channels and load balance between them, but this is considered a temporary solution.
This will affect any benchmarking we do that launches multiple concurrent gRPC requests. The limit is mentioned here: https://github.com/grpc/grpc-node/blob/master/PACKAGE-COMPARISON.md, which also compares the …
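The workaround of creating several channels and load balancing between them could look roughly like this round-robin pool. `Channel` is a hypothetical stand-in for a real client channel, not the `@grpc/grpc-js` API. Round-robin is just one simple balancing policy.

```typescript
// Hypothetical minimal channel abstraction; not the @grpc/grpc-js API.
interface Channel {
  readonly id: number;
}

// Spreads calls over several channels so that no single channel
// reaches its concurrent-stream limit and starts queueing requests.
class ChannelPool<C extends Channel> {
  protected next = 0;

  constructor(protected readonly channels: ReadonlyArray<C>) {
    if (channels.length === 0) {
      throw new RangeError("ChannelPool needs at least one channel");
    }
  }

  // Round-robin selection: each call gets the next channel in turn
  public acquire(): C {
    const channel = this.channels[this.next];
    this.next = (this.next + 1) % this.channels.length;
    return channel;
  }
}

const pool = new ChannelPool([{ id: 0 }, { id: 1 }, { id: 2 }]);
const picked = [1, 2, 3, 4].map(() => pool.acquire().id);
console.log(picked.join(",")); // 0,1,2,0
```

A smarter policy could pick the channel with the fewest in-flight calls, but that needs per-channel bookkeeping.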
Regarding API versioning: gRPC is naturally backwards compatible. As long as there are no breaking changes to the API, we can keep using the same version and clients continue to work as normal. When there is a breaking change, the MS guide recommends using package names. So right now we have … This basically means one can create different service interfaces and run multiple versions of the same API. Right now we have the agent service, but we could also run 2 agent services on the same port. This would be similar to having …
However, this doesn't tell the client side which version it is using. The client simply selects the version it wants and hopes the agent side still offers it. Perhaps the absence of that version's service is enough to signal to the client that its source code is too old and needs updating. This needs to be prototyped in the test gRPC service, to see what exceptions/errors occur when the relevant versioned package is no longer offered as a service. It would also help to break up the proto files into subdirectories, so that common messages and type declarations can be shared.
There was apparently an idea to use server reflection so the client can query the protobuf descriptions: https://stackoverflow.com/a/41646864/582917. Server reflection is not available directly in grpc-js (grpc/grpc-node#79). However, there are libraries built on top of it that add it:
The examples given are that … It has some relationship to the descriptor proto: https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/descriptor.proto. There's also the option declaration; not sure how that works yet.
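Returning to the versioning question: here is a rough in-memory sketch of hosting several versioned services side by side, and of what an outdated client would see once its version is withdrawn. Everything here is hypothetical, including `VersionedServer`, the `agent.v1.Agent` naming and the error text. What real gRPC returns in this situation is exactly what still needs prototyping.

```typescript
type Handler = (input: unknown) => Promise<unknown>;

// Hypothetical server hosting several versions of the same service side by side,
// keyed by fully qualified name, e.g. "agent.v1.Agent" and "agent.v2.Agent".
class VersionedServer {
  protected services = new Map<string, Map<string, Handler>>();

  public addService(name: string, methods: Record<string, Handler>): void {
    this.services.set(name, new Map(Object.entries(methods)));
  }

  public removeService(name: string): void {
    this.services.delete(name);
  }

  public async call(service: string, method: string, input: unknown): Promise<unknown> {
    const methods = this.services.get(service);
    if (methods === undefined) {
      // What an outdated client would run into once its version is withdrawn
      throw new Error(`Unimplemented: service ${service} is not offered`);
    }
    const handler = methods.get(method);
    if (handler === undefined) {
      throw new Error(`Unimplemented: ${service}/${method}`);
    }
    return handler(input);
  }
}

const server = new VersionedServer();
server.addService("agent.v1.Agent", { echo: async (input) => input });
server.addService("agent.v2.Agent", { echo: async (input) => ({ echoed: input }) });
```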
Found an interesting library that builds on top of
Relevant errors that we should have more tests for: https://www.grpc.io/docs/guides/error/
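Recommend reading over this, @tegefaulkes, when you're finished with your checklist in the vaults refactoring.
Playing around with getting the message definitions to exist within their own .proto files and packages. It seems doable: I can define, for example, the vault message inside domains/Vaults.proto and import it into Client.proto with import public "domains/Vaults.proto";. From there I can use the messages inside a service by doing:
//OLD rpc VaultsRename(VaultRenameMessage) returns (VaultMessage) {};
//NEW rpc VaultsRename(Vault.Rename) returns (Vault.Vault);
When constructing the messages, we can now do VaultMessage = new clientPB.Vaults.Rename(). I'm likely going to rename clientPB to messages at some point.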
Are package/file names meant to be lowercase? It seems that way from other examples. Have a look at how one is supposed to import the Google proto library.
On 10/20/21 3:34 PM, Brian Botha wrote:
Playing around with getting the message definitions to exist within their own .proto files and packages.
It seems doable, I can define for example the vault message inside domains/Vaults.proto and import them into Client.Proto with import public "domains/Vaults.proto";. From there I can use the messages inside a service by doing
//OLD rpc VaultsRename(VaultRenameMessage) returns (VaultMessage) {};
//NEW rpc VaultsRename(Vault.Rename) returns (Vault.Vault);
When construction the messages now we can do VaultMessage = new clientPB.Vaults.Rename(). I'm likely going to rename clientPB to messages at some point.
I didn't check what it should be, but I can change it if needed.
I think we should have in our
The … However, I'm not sure whether this will work under vercel/pkg. @tegefaulkes, can you check this out as well?
@tegefaulkes, remember to tick things off as you make progress. If you have a new PR/branch, create a PR for it and link it here too. The task list should be copied there.
I'm still working on creating common domain types and common message types; almost done with that. I've created a new branch off master for this, called API_Review.
Looks like
public closeServerForce(): void {
  this.server.forceShutdown();
}
I wonder how backpressure works with rxjs streams. Normally it would depend on the underlying network protocol. But if the RPC client is not subscribed to the rxjs stream, the rxjs stream may be reading data into an internal buffer. Or maybe not; it depends on how data from the socket/stream is read into the local rxjs stream. More info here: https://xgrommx.github.io/rx-book/content/getting_started_with_rxjs/creating_and_querying_observable_sequences/backpressure.html. With pull streams, by contrast, backpressure is driven by the client.
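A pull-based source, by contrast, only does work when the consumer asks for it. This small sketch uses an async generator, the same abstraction discussed further down; `countingSource` and `pullThree` are illustrative names. Even though the generator loops forever, it produces exactly as many values as the consumer pulls.

```typescript
// An infinite pull-based source, plus a counter to inspect how much it produced
function countingSource() {
  const state = { produced: 0 };
  async function* source(): AsyncGenerator<number> {
    while (true) {
      // The body only resumes when the consumer calls next()
      state.produced++;
      yield state.produced;
    }
  }
  return { state, source: source() };
}

async function pullThree(): Promise<number> {
  const { state, source } = countingSource();
  await source.next();
  await source.next();
  await source.next();
  // Stop the generator; its loop never ran ahead of the consumer
  await source.return(undefined);
  return state.produced;
}

void pullThree().then((n) => console.log(n)); // 3
```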
I've tried out MuxRPC. It has revealed some interesting ideas, but it also has some problems. As in deepkit, each side can have a manifest. When creating an "MRPC" instance, you pass a "remote manifest" and a "local manifest":
const rpc = MRPC(remoteManifest, localManifest);
The idea being that servers would only have a … These manifests correspond to the controllers in deepkit RPC. Again, though, neither the docs nor the examples make clear how the handlers in one side's manifest are supposed to use the other side's manifest. None of the handlers mention a "remote side". I checked the arguments to the handlers, and there's still nothing. It appears you'd have to somehow acquire this information when a connection is established, but this seems very underbaked. It would be easy if each handler just received a …
One good thing is that the underlying socket is definitely not part of MuxRPC; you handle it yourself. All MuxRPC really needs is a "duplex reliable stream", and it uses an abstraction called "pull-stream" for that. Any NodeJS stream can be converted to a pull-stream using their utility packages, such as … More info to be written.
I wrote a comparison of some multiplexed transport protocols here: #234 (comment). The conclusion is that, given the transport protocols we are going to use, we do not need muxing/demuxing built into the RPC system. The RPC system should instead rely on the underlying transport to provide muxing/demuxing. Here are some examples:
This is great news: our RPC system becomes significantly simpler, since no custom muxing/demuxing is required. Third-party integration is also simpler, because third parties don't need a custom muxer/demuxer either.
Now, regarding the streaming calls in MuxRPC, there are some interesting ideas here. Unlike deepkit RPC, MuxRPC requires a runtime specification of how the handlers are supposed to be called. Deepkit seems to rely on TS types, decorators and maybe some meta-programming. The manifest basically specifies 4 types of handlers:
When the client calls a handler, the handler is automatically converted to asynchronous, even if the server declares it synchronous, because the call goes over the network. For …
So in MuxRPC, the handler is just a regular function with any number of parameters. I recommend we change this so that the function always takes a single "input" and a single context object, and returns a single output. This makes parsing of the input and plumbing of exceptions explicit. I believe this is better than letting the RPC system decide how to parse the input and plumb exceptions; functional composition is better than implicit magic. However, a local exception in the handler could prevent it from returning anything. In that case, the RPC system should catch the exception and send the caller a particular default error response (a sort of HTTP 500 error).
The … Notice that in all 3 cases, the stream is the returned object; the stream is never processed via input parameters. In fact, in all 3 cases the handler can still be called with arbitrary serialisable/deserialisable parameters. The stream object it returns is a custom "pull-stream" concept.
What exactly is a pull stream? It is an abstraction that lets sinks pull from sources: a source may not emit any data until the sink reads from it. I looked into the code for pull-streams, and both sinks and sources are functional closures that use event emitters. I think the original reason for developing this was to provide some form of backpressure. NodeJS streams also have native backpressure: they enforce it by pausing the source until the sink reads again, every single time. This is why the examples in MuxRPC show how to convert NodeJS streams to pull streams. Web streams also do the same thing.
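Here is a minimal sketch of the recommended handler shape: one input, one context object, one output. The RPC system, not the handler, turns an escaping exception into a fixed default error response. All names (`HandlerContext`, `Handler`, `RpcResponse`, `invoke`) and the numeric 500 code are assumptions for illustration.

```typescript
// Hypothetical per-call context (metadata now; cancellation, deadlines, ... later)
interface HandlerContext {
  metadata: Record<string, string>;
}

// Every handler takes exactly one input and one context, and returns one output
type Handler<I, O> = (input: I, ctx: HandlerContext) => Promise<O>;

// What goes back to the caller: a result or an error value, never a raw throw
type RpcResponse<O> =
  | { ok: true; result: O }
  | { ok: false; error: { code: number; message: string } };

// RPC-system side: an exception escaping the handler becomes a fixed,
// generic error response (the equivalent of an HTTP 500)
async function invoke<I, O>(
  handler: Handler<I, O>,
  input: I,
  ctx: HandlerContext,
): Promise<RpcResponse<O>> {
  try {
    return { ok: true, result: await handler(input, ctx) };
  } catch {
    return { ok: false, error: { code: 500, message: "Internal error" } };
  }
}
```

Handlers that want richer errors can still return them explicitly as part of their output. Only unexpected exceptions collapse into the default response.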
Web streams were optimised for IO patterns and are used internally by the … This article compares streams (specifically web streams) to RxJS observables: https://surma.dev/things/streams-for-reactive-programming/. It leads to the conclusion that RxJS observables are not the right abstraction for streams in RPC. Because RPC needs backpressure, pull-oriented streams are better suited to it. It's always possible to build push-flow on top of pull-flow if you need it. Pull-flow is also designed around a single consumer, which matches RPC, while push-flow is designed around multiple consumers. I think pull-flow as the default is better for RPC design. If an application requires push-flow, it can build that on top of the RPC's pull-flow, for example by transforming the RPC's pull-stream into an rxjs observable.
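To show that push-flow can be layered on top of pull-flow, here is a sketch of a driver loop that pulls from an async iterable and pushes each value into callbacks. It deliberately avoids rxjs: a plain callback stands in for an observable subscriber. `subscribe` is a hypothetical helper, not an rxjs API.

```typescript
// Push-flow on top of pull-flow: a driver loop pulls values from the source
// and pushes each one to the subscriber. Returns an unsubscribe function.
function subscribe<T>(
  source: AsyncIterable<T>,
  onNext: (value: T) => void,
  onDone: (error?: unknown) => void,
): () => void {
  let active = true;
  void (async () => {
    let error: unknown;
    try {
      for await (const value of source) {
        // Leaving the loop early calls the source's return(), closing it
        if (!active) break;
        onNext(value);
      }
    } catch (e) {
      error = e;
    }
    onDone(error);
  })();
  return () => {
    active = false;
  };
}
```

The reverse, turning a push source into a pull source, needs an internal buffer plus a policy for what to do when it fills up. That is the extra code mentioned next.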
The opposite, building pull on top of push, takes much more code. This more or less means there's no need for pull-streams anymore: we can just use web streams, which are more portable and more modern. But hold on... there's another, more "language-level" pull-based dataflow abstraction that we already use all over the PK codebase: the iterators and async iterators provided by generator and async generator functions. Node and web streams also satisfy the async iterable interface, so they can easily be treated as async iterators: https://nodejs.org/api/stream.html#streams-compatibility-with-async-generators-and-async-iterators
// Readable Stream to async iterator
for await (const chunk of readableStream) {
  // use the chunks
}
// Async iterator to Readable Stream (source)
// Using https://nodejs.org/api/stream.html#streamreadablefromiterable-options
import { Readable } from 'stream';
const abortController = new AbortController();
async function *generate () {
  let n = 0;
  // Infinite source that stops once the stream is closed
  while (!abortController.signal.aborted) {
    yield n;
    n++;
  }
}
const readableStream = Readable.from(generate());
readableStream.on('close', () => {
  abortController.abort();
});
// Piping async iterator to writable stream
import { pipeline } from 'stream/promises';
const abortController = new AbortController();
async function *generate() {
  let n = 0;
  while (!abortController.signal.aborted) {
    yield n;
    n++;
  }
}
const iterator = generate();
try {
  // pipeline() accepts an async iterable as its source and resolves once
  // writableStream (an existing object-mode Writable, since numbers are yielded) has finished
  await pipeline(iterator, writableStream);
} catch (e) {
  abortController.abort();
  throw e;
}
The above examples are for node streams, but web streams support the same concepts.
// Readable Stream to async iterator
// If the loop exits early, the readable stream is cancelled
for await (const chunk of readableStream) {
  // do something
}
// If the loop exits early, the readable stream stays open
for await (const chunk of readableStream.values({ preventCancel: true })) {
  // do something
}
// Async iterator to Readable Stream (source)
function iteratorToStream(iterator) {
return new ReadableStream({
async pull(controller) {
const { value, done } = await iterator.next();
if (done) {
controller.close();
} else {
controller.enqueue(value);
}
},
});
}
// Piping async iterator to writable web stream
import { WritableStream } from 'web-streams-polyfill/ponyfill';
const abortController = new AbortController();
async function *generate() {
let n = 0;
while (!abortController.signal.aborted) {
yield n;
n++;
}
}
const iterator = generate();
const writableStream = new WritableStream({
  close() {
    abortController.abort();
  },
});
const writer = writableStream.getWriter();
// Note: this loop only ends once the generator finishes,
// i.e. once abortController has been aborted elsewhere
for await (const chunk of iterator) {
  // Wait for it to be ready
  await writer.ready;
  // Write a chunk
  await writer.write(chunk);
}
// Wait for it to be ready (flushed)
await writer.ready;
// Close the writable stream
await writer.close();
Note that NodeJS streams support both objects and buffers, while web streams are naturally …
Let's suppose our RPC handlers looked like this:
// Stream oriented
const rpc = new RPC({
  async handler(input: ReadableStream): Promise<ReadableStream> {
    // Parse the input data (assuming this produces an async iterable of chunks)
    const inputParsed = parse(input);
    // The caller reads from `readable`, while the handler writes into `writable`
    const { readable, writable } = new TransformStream();
    const writer = writable.getWriter();
    // The writing must be done asynchronously:
    // the stream has to be returned before it is written to
    void (async () => {
      for await (const chunk of inputParsed) {
        await writer.ready;
        await writer.write(chunk);
      }
      await writer.ready;
      await writer.close();
    })();
    return readable;
  }
});
You can see it's a bit awkward to write, because you have to return the stream before you write to it. Alternatively, we can use first-class features of the language: instead of taking and returning streams, we can take and return async iterators.
const rpc = new RPC({
  async *handler(input: AsyncIterable<Buffer>): AsyncGenerator<Buffer> {
    // Now that you have 2 generators,
    // you can independently read data from the input generator
    // while yielding data to the output generator
    for await (const chunk of input) {
      yield chunk;
    }
    // Or more simply `yield* input;`
    // which is simply an "echo" handler
  }
});
Notice here that the … Why use a generator instead of …? Well, the … The 2 independent generators are necessary to have concurrent sources and sinks. We naturally get a form of backpressure here: we only take data from the input generator when we pull from it by asking for the next piece of data. Likewise, our yield stays suspended until the caller pulls from us. The client call then has to do something like this:
// And we want to pass in a generator here
async function* input () {
  yield Buffer.from('hello');
  yield Buffer.from('world');
}
// Pass the generator object, i.e. call input()
const g = client.handler(input());
for await (const chunk of g) {
  console.log(chunk);
}
Combining this with … This design starts with first-class language features. It also starts from the most powerful communication paradigm, duplex communication, and then abstracts over duplex to constrain it to server streaming, client streaming or unary calls. The key point is that server streaming just means no input stream is provided; more precisely, the input generator returns immediately (after at most the single request message). Client streaming means the output generator returns immediately in the same way. Unary calls are where both the input generator and the output generator return immediately.
Behind the scenes, the RPC system has to plumb the generator data into streams offered by the underlying transport layer. Ideally these would be web streams, so we would use the appropriate transformations. Serialisation/deserialisation is not part of the RPC system; it is done inside each handler. This gives a significant amount of flexibility, since each handler can choose how to serialise/deserialise its data. The generators are limited to working with buffer chunks (…).
We can prototype this first in a single process, without any RPC serialisation/deserialisation. Further prototypes can show how to convert this pull-dataflow into push-dataflow. I'm starting to see how network communication should be pull-based, converted to push-flows either in memory through rxjs or through other, non-RPC, message-based protocols... this might end up being a very elegant design!
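Here is a sketch of how duplex async-generator handlers could be constrained to the other call types. Each single-message side is treated as an iterable that yields exactly one message. The adapters (`unary`, `serverStreaming`, `clientStreaming`, `firstMessage`) are hypothetical. They run in a single process with no transport or serialisation, as suggested for the first prototype.

```typescript
// A duplex handler consumes an async iterable and produces one
type DuplexHandler<I, O> = (input: AsyncIterable<I>) => AsyncIterable<O>;

// Reads the single expected message; returning early closes the input iterator
async function firstMessage<T>(input: AsyncIterable<T>): Promise<T> {
  for await (const value of input) {
    return value;
  }
  throw new Error("Protocol error: expected one input message");
}

// Server streaming: one input message, many outputs
function serverStreaming<I, O>(f: (input: I) => AsyncIterable<O>): DuplexHandler<I, O> {
  return async function* (input) {
    yield* f(await firstMessage(input));
  };
}

// Client streaming: many inputs, one output
function clientStreaming<I, O>(f: (input: AsyncIterable<I>) => Promise<O>): DuplexHandler<I, O> {
  return async function* (input) {
    yield await f(input);
  };
}

// Unary: one input, one output
function unary<I, O>(f: (input: I) => Promise<O>): DuplexHandler<I, O> {
  return async function* (input) {
    yield await f(await firstMessage(input));
  };
}
```

Since every adapter returns a `DuplexHandler`, the RPC system only needs to plumb one kind of handler to the transport.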
We still need to address how "metadata", such as "header" information or session token information (for authentication), gets passed. Also how exceptions are passed around, and how this interacts with the …
Here's an idea for communicating stream exceptions to the handler's input async iterator:
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
async function *gf() {
while (true) {
await sleep(100);
try {
yield 'string';
} catch (e) {
yield; // <-- this is the trick
throw e;
}
}
}
async function main() {
const g = gf();
setTimeout(() => {
void g.throw(new Error('Stream Failure'));
}, 250);
for (let i = 0; i < 10; i++) {
try {
console.log(await g.next());
} catch (e) {
console.log('Consumed an exception!', e.message);
break;
}
}
}
void main();
It was a bit tricky to realise how this works. The … What we want is to essentially "signal" to the handler consuming the input iterator that there is a problem. But because the input iterator is a "pull-based" dataflow, this error can only be signalled when the handler calls … When I first tried this, I thought rethrowing the exception inside the generator would be sufficient. But it didn't work because the … So the trick is, in the generator function, to catch the exception on the … In this way, only when the handler wants to …
Alternatively, if the async generator function itself is what pulls data from the stream, then it can just … The web stream API is actually promise-oriented, not event-oriented, so it fits well with the async iterator API too. We could also just use web streams directly. See: https://github.com/ReactiveX/IxJS/blob/master/src/asynciterable/fromdomstream.ts
Just a note about gRPC metadata. Every gRPC call can have 1 leading metadata and 1 trailing metadata. A given handler, for example a unary call, is given a … For the server, sending the leading metadata is done with … Trailing metadata has to be sent as part of the callback, together with the response value. In a duplex stream handler, one can again do … But when it ends the stream with … So the important thing to remember is that leading and trailing metadata each occur only once, at the beginning and at the end. Each side can receive and process the metadata sent by the other side. Unary handlers can get the leading metadata with … On the unary caller side, the leading metadata from the server is available with …
You can see I've completely forgotten how the API works; it's too complex in gRPC. For our RPC, we need to simplify this metadata situation. It does make sense to have leading and trailing metadata, but it needs to be simpler. Assume we have 2 abstraction levels, web streams and async generators. The web streams will basically emit the raw data, while the async generators can provide processed data (we can also have raw async generators). If the data is processed, we would naturally expect the first message to be the leading metadata and the last message to be the trailing metadata. But this depends on our "protocol abstraction" above the raw binary stream. For RPC purposes, we will need at least the ability to:
If a handler expects only 1 structured message, we can parse until that message is complete. If there is more than 1 structured message, or data we don't expect, that can be treated as a protocol error. So unlike other RPC systems, we want to be able to expose the lower level in case we need to do something special, and then specialise to higher-level abstractions like JSON RPC or BSON RPC.
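Here is a sketch of that alternative, loosely in the spirit of the IxJS `fromdomstream.ts` source linked above (this is not that code). The async generator pulls from a web `ReadableStream` reader, so a stream failure rejects `reader.read()` and surfaces at the consumer's `next()` without any `g.throw()` trick. It assumes Node's `stream/web` module; `fromStream` is an illustrative name.

```typescript
import { ReadableStream } from "stream/web";

// The generator pulls from the stream itself, so a stream error rejects
// reader.read(), is rethrown inside the generator, and surfaces at the
// consumer's next() call.
async function* fromStream<T>(stream: ReadableStream<T>): AsyncGenerator<T> {
  const reader = stream.getReader();
  try {
    while (true) {
      const result = await reader.read();
      if (result.done) return;
      yield result.value;
    }
  } finally {
    // Runs on completion, on error, and when the consumer stops early;
    // releasing the lock lets the stream be read or cancelled elsewhere
    reader.releaseLock();
  }
}
```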
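Here is one possible shape for the "protocol abstraction" above the raw stream: tagged frames, where the first is the leading metadata and the last is the trailing metadata. They are parsed for a handler that expects exactly one structured message. `Frame`, `ProtocolError` and `parseUnary` are hypothetical. Any other arrangement of frames is rejected as a protocol error.

```typescript
type Metadata = Record<string, string>;

// Hypothetical message framing above the raw byte stream
type Frame<T> =
  | { type: "leading"; metadata: Metadata }
  | { type: "data"; data: T }
  | { type: "trailing"; metadata: Metadata };

class ProtocolError extends Error {}

// Expects exactly: leading metadata, one data frame, trailing metadata
async function parseUnary<T>(
  frames: AsyncIterable<Frame<T>>,
): Promise<{ leading: Metadata; data: T; trailing: Metadata }> {
  let leading: Metadata | undefined;
  let data: { value: T } | undefined;
  let trailing: Metadata | undefined;
  for await (const frame of frames) {
    if (trailing !== undefined) throw new ProtocolError("frame after trailing metadata");
    switch (frame.type) {
      case "leading":
        if (leading !== undefined) throw new ProtocolError("duplicate leading metadata");
        leading = frame.metadata;
        break;
      case "data":
        if (leading === undefined) throw new ProtocolError("data before leading metadata");
        if (data !== undefined) throw new ProtocolError("expected exactly one data frame");
        data = { value: frame.data };
        break;
      case "trailing":
        if (data === undefined) throw new ProtocolError("trailing metadata before data");
        trailing = frame.metadata;
        break;
    }
  }
  if (leading === undefined || data === undefined || trailing === undefined) {
    throw new ProtocolError("stream ended early");
  }
  return { leading, data: data.value, trailing };
}
```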
Moving this to todo; it will be the next priority for @tegefaulkes after merging the crypto feature upgrade.
Once we build higher-level RPC protocols like JSONRPC or BSONRPC, with defined error behaviour and metadata structure, we can also add rules for versioning/backwards-compatibility and pagination. I'm considering supporting JSON RPC first, as it's the easiest for third-party APIs to consume, and then adding a more efficient interface for binary data. Choices are:
The key point is that JSON should be supported first. We also need to define a consistent way of representing binary data. Right now that's always been … We can review Ethereum's API style to see how ours fits in: https://ethereum.org/da/developers/docs/apis/json-rpc/. Also see https://github.com/microsoft/vs-streamjsonrpc/blob/main/doc/index.md, which discusses why JSON RPC is suitable for peer-to-peer communication.
@tegefaulkes, my initial experiments with a generator-based API are in https://github.com/MatrixAI/Polykey/tree/feature-rpc. Please take over that branch and create a new PR for it. The experiments can be copied over after rebasing. The only relevant code is in the …
Based on our discussions last time, I think it's worthwhile to spec out the details of how this will work in #495. This issue is about transport agnostic RPC in general, while #495 is specifically about how we will use JSON RPC. A combination of our generator ideas and decorators should be used to enable JSON RPC based handlers. We're still at the prototyping stage, so nothing is set in stone. We will need to meet again on Friday to discuss.
Not sure why I got so many emails from this thread, but let me say something I noticed before I unsubscribe: In the hundreds of hours you spent looking at all these implementations, trying them out, and especially writing so much text, you could have instead written your own RPC implementation that is perfectly tailored to your needs.
That's very strange... how did you get so many emails from this thread? We are working on our own RPC implementation now! The text is just examining all the trade-offs.
All the tasks above regarding gRPC should be crossed out, @tegefaulkes, and replaced with tasks related to JSONRPC.
In the future we shouldn't repurpose issues like this; it makes it confusing to see what's going on during review. When we change track on an issue like this, I think we should close the old one and outline the change of approach, such as creating our own RPC to solve the problems. Then we should create new issues for the problem. To a degree, issues should be immutable. Normal changes like updating the spec are fine, but a complete revamp of the topic isn't good.
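Here is a minimal sketch of a JSON-RPC 2.0 dispatcher, using the message shapes and reserved error codes from the JSON-RPC 2.0 specification. Base64 strings are used for binary data purely as one option, since the current representation isn't spelled out here. It assumes Node's `Buffer`. `dispatch`, `isRequest` and `echoBytes` are illustrative names, and batches and notifications are not handled.

```typescript
type Id = string | number | null;

// Request shape from the JSON-RPC 2.0 specification
interface JsonRpcRequest {
  jsonrpc: "2.0";
  method: string;
  params?: unknown;
  id?: Id; // absent for notifications, which this sketch does not special-case
}

type Method = (params: unknown) => Promise<unknown>;

function isRequest(value: unknown): value is JsonRpcRequest {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  return v.jsonrpc === "2.0" && typeof v.method === "string";
}

function errorResponse(id: Id, code: number, message: string): string {
  return JSON.stringify({ jsonrpc: "2.0", id, error: { code, message } });
}

// Single-request dispatcher; the error codes are those reserved by the spec
async function dispatch(registry: Record<string, Method>, raw: string): Promise<string> {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return errorResponse(null, -32700, "Parse error");
  }
  if (!isRequest(parsed)) {
    return errorResponse(null, -32600, "Invalid Request");
  }
  const id: Id = parsed.id ?? null;
  if (!Object.prototype.hasOwnProperty.call(registry, parsed.method)) {
    return errorResponse(id, -32601, "Method not found");
  }
  try {
    const result = await registry[parsed.method](parsed.params);
    return JSON.stringify({ jsonrpc: "2.0", id, result });
  } catch {
    return errorResponse(id, -32603, "Internal error");
  }
}

// One option for binary data: base64-encode the bytes into a JSON string field
const methods: Record<string, Method> = {
  async echoBytes(params) {
    const { data } = params as { data: string };
    const bytes = Buffer.from(data, "base64");
    return { data: bytes.toString("base64"), length: bytes.length };
  },
};
```

Base64 inflates binary payloads by roughly a third, which is one reason to add a more efficient binary interface later.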
While this has been closed, this work has now been broken down into smaller issues:
@tegefaulkes, CQRS is still a pending problem, but only once we start tackling the GUI app again.
We have reviewed the gRPC API and worked with it extensively. It's time to work out a better RPC layer for PK. There are several problems to consider here:
src/client
authenticator
1.0.0 -> 1.1.0 -> 2.0.0 -> 2.0.1
1 -> 2 -> 3
1 -> 2 -> 3
We have 2 main proto files:
proto/schemas/Client.proto
proto/schemas/Agent.proto
And a Test.proto as well; this will need to be used to generate the marshaling code.
Additional context
timedCancellable
toSigchain
such as the deadline in nodes claim process #243 - it's a good idea for the RPC to have timeouts on its calls and to handle such timeouts. In particular, we would want to be able to create "custom" protocols on top of any streams, or better, to "lift" such protocols into the underlying RPC system, such as the nodes claiming process
Tasks
.proto definition subdomains. #279
Client.proto and Agent.proto with version names and test out multiple-version services, and find out what the client does when the version wanted is not available
MAX_CONCURRENT_CONNECTIONS is used
1.4.1 version has some incompatibility: TypeError: http2Server.on is not a function.