-
Notifications
You must be signed in to change notification settings - Fork 204
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: make a connection with mediator asynchronously #231
refactor: make a connection with mediator asynchronously #231
Conversation
Signed-off-by: Jakub Koci <[email protected]>
Codecov Report
@@ Coverage Diff @@
## main #231 +/- ##
==========================================
- Coverage 89.06% 88.95% -0.12%
==========================================
Files 174 176 +2
Lines 3302 3358 +56
Branches 364 370 +6
==========================================
+ Hits 2941 2987 +46
- Misses 358 368 +10
Partials 3 3
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like these changes @jakubkoci! :)
The only thing I'm not sure about is the addition of sendAndReceiveMessage
to the OutboundTransporter
interface. Maybe you can clarify a bit on that
src/agent/MessageSender.ts
Outdated
outboundMessage.payload.setReturnRouting(ReturnRouteTypes.all) | ||
|
||
const outboundPackage = await this.envelopeService.packMessage(outboundMessage) | ||
const inboundPackedMessage = await this.outboundTransporter.sendMessage(outboundPackage, true) | ||
const inboundPackedMessage = await this.outboundTransporter.sendAndReceiveMessage(outboundPackage) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not fully understanding. Why did you add a sendAndReceiveMessage
to the outbound transporter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question. I wanted to preserve the current behavior of await agent.routing.downloadMessages()
.
First, I thought that using sendMessage(outboundPackage, true)
would be good enough. The problem is that in HTTP outbound we now pass the response immediately to the agent calling this.agent.receiveMessage(wireMessage)
. It means that the batch
response is not processed inside downloadMessages
anymore.
This is related to the problem we discussed in our last AFJ call about dispatching messages to the agent from a message handler. I listed as last point in the description:
"Update download messages functionality to process batch message and to remove sendAndReceiveMessage that is still implemented in the old way relaying on synchronicity of HTTP request-response."
I'm thinking if we could implement the solution as part of this PR. I usually tend to work in smaller batches. Maybe prioritize it at least and do it before WebSockets 🤷♂️ Maybe it's not so big change 🤔
const { message: connectionRequest } = await this.connectionService.createRequest(connectionRecord.id) | ||
|
||
const outboundMessage = createOutboundMessage(connectionRecord, connectionRequest, connectionRecord.invitation) | ||
outboundMessage.payload.setReturnRouting(ReturnRouteTypes.all) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're already setting return routing if the connection has no inbound endpoint. this shouldn't be needed right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had to move it into Dispatcher
, where is not called because of the manual invitation acceptance process here. When return_route
is set, we don't call sendMessage
where I had it previously in #218
public supportedMessages = [KeylistUpdateResponseMessage] | ||
|
||
public async handle(messageContext: HandlerInboundMessage<KeylistUpdateResponseHandler>) { | ||
// TODO It should handle the response when agent calls `await this.consumerRoutingService.createRoute(connectionRecord.verkey)` and notify about the result. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why aren't we doing anything with the response? Are you going to address this in a separate PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We think should do it in separate PR which would also add the step to wait for the response after calling createRoute
. I didn't plan to add this at first, but I had to when I set return_route
and update HTTP inbound transporter. I wasn't 100 % sure what to do here and it seemed kind of out of the scope of this PR.
I created #234 to address this.
const { connection, message } = messageContext | ||
|
||
if (!connection) { | ||
// TODO We could eventually remove this check if we do it at some higher level where we create messageContext that must have a connection. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup would be nice to be able to declare if a connection is required for a handler or something...
Signed-off-by: Jakub Koci <[email protected]>
Signed-off-by: Jakub Koci <[email protected]>
Thanks, @TimoGlastra for the good suggestions. I see we have 3 different areas to address (ordered by priority from my perspective):
I don't think we have to solve them all here. I'm quite curious about the solution to the 1. issue so I'm going to look at it briefly at least, considering prioritizing that. |
Signed-off-by: Jakub Koci <[email protected]>
I added an event emitter to Agent and Batch message handler to emit incoming messages to Agent. That allowed to remove I'm not saying it's perfect, but I'm quite happy with this for now. What do you think @TimoGlastra? |
I updated the PR name according to conventional commits. I guess this also brings some breaking change in agent API which I expressed in the description of the PR. That can go into the commit message when merging. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! I like it.
With the addition of the new event emitter I think it'll be good to look at a global event emitter rather sooner than later
forwardedMessages.forEach((message) => { | ||
this.eventEmitter.emit('agentMessage', message.message) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think it will cause problems that all messages are emitted at the same time? E.g. If we receive 10 messages, they all get processed at the same time, meaning message 10 could be processed before message 5. It could also mean we fetch a record, another received message updated the record in the meantime and then we overwrite the new data.
I'm not sure if we want to take such things into account. A message queue could solve this, but being able to handle message in parallel would be nice. ACA-Py handles this by fetching a record 'for-update' which means it'll lock other sessions from writing until you are done. Reading is always possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. I'm afraid that the problem can occur. My first idea was also a message queue. You're right that it could be convenient to handle messages in parallel in some cases. I'm surprised that ACA-Py decided to go with locking of objects which sounds quite sophisticated to me, but maybe it's the only option 🤷♂️
Agree. It could solve even more problems like:
|
…foundation#231) * add `return_route` parameter to outbound when there is no inbound connection (no service with endpoint other than `didcomm:transport/queue`) * add keylist-update-response message and handler with a naive implementation * add batch message handler with agent event emitter BREAKING CHANGE: extracts outbound transporter from Agent's constructor. Signed-off-by: Jakub Koci <[email protected]>
The previous implementation assumed that a connection with a mediator is made “synchronously “ via HTTP transport. By “synchronously” I mean that the provision method handled every step of making a connection and waiting for the response message assuming HTTP request-response communication. Now it should be independent on the transport layer. It just accepts the invitation and waits until the connection is made regardless of how the messages are exchanged.
I needed to do the following changes:
return_route
parameter to outbound when there is no inbound connection (there is no service with endpoint other thandidcomm:transport/queue
)keylist-update-response
message and handler, just with naive implementation.outboundTransporter
as direct dependency in Agent’sconstructor
.Next steps on the way to support WebSockets transport I would like to do:
inboundTransporter
from Agent’sconstructor
. (I can perhaps do it as part of this PR, but the changes started growing and I wanted to share what I have now.)DidDoc
.Next steps I see we will need to do someday in near future but not necessarily before or right after adding the WebSockets:
batch
message and to removesendAndReceiveMessage
that is still implemented in the old way relaying on synchronicity of HTTP request-response.BREAKING CHANGE: extracts outbound transporter from
Agent
's constructor.