-
Notifications
You must be signed in to change notification settings - Fork 204
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: mediator updates #432
Changes from 6 commits
03c6636
df4af99
174b8d2
4dd6e4e
5d79591
b17d57f
63c9254
4a73348
18a2ed4
5ea2270
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,11 @@ | ||
import type { Logger } from '../../logger' | ||
import type { OutboundWebSocketClosedEvent } from '../../transport' | ||
import type { ConnectionRecord } from '../connections' | ||
import type { MediationStateChangedEvent } from './RoutingEvents' | ||
import type { MediationRecord } from './index' | ||
|
||
import { firstValueFrom, interval, ReplaySubject } from 'rxjs' | ||
import { filter, first, takeUntil, timeout } from 'rxjs/operators' | ||
import { filter, first, takeUntil, throttleTime, timeout, delay, tap } from 'rxjs/operators' | ||
import { Lifecycle, scoped } from 'tsyringe' | ||
|
||
import { AgentConfig } from '../../agent/AgentConfig' | ||
|
@@ -13,6 +14,7 @@ import { EventEmitter } from '../../agent/EventEmitter' | |
import { MessageSender } from '../../agent/MessageSender' | ||
import { createOutboundMessage } from '../../agent/helpers' | ||
import { AriesFrameworkError } from '../../error' | ||
import { TransportEventTypes } from '../../transport' | ||
import { ConnectionInvitationMessage } from '../connections' | ||
import { ConnectionService } from '../connections/services' | ||
|
||
|
@@ -71,6 +73,57 @@ export class RecipientModule { | |
} | ||
} | ||
|
||
private async openMediationWebSocket(mediator: MediationRecord) { | ||
const { message, connectionRecord } = await this.connectionService.createTrustPing(mediator.connectionId) | ||
|
||
const websocketSchemes = ['ws', 'wss'] | ||
const hasWebSocketTransport = | ||
connectionRecord.didDoc.didCommServices.filter((s) => websocketSchemes.includes(s.protocolScheme)).length > 0 | ||
|
||
if (!hasWebSocketTransport) { | ||
throw new AriesFrameworkError('Cannot open websocket to connection without websocket service endpoint') | ||
} | ||
|
||
await this.messageSender.sendMessage(createOutboundMessage(connectionRecord, message), { | ||
transportPriority: { | ||
schemes: websocketSchemes, | ||
restrictive: true, | ||
// TODO: add keepAlive: true to enforce through the public api | ||
// we need to keep the socket alive. It already works this way, but would | ||
// be good to make more explicit from the public facing API. | ||
// This would also make it easier to change the internal API later on. | ||
// keepAlive: true, | ||
}, | ||
}) | ||
} | ||
|
||
private async initiateImplicitPickup(mediator: MediationRecord) { | ||
let interval = 50 | ||
|
||
// Listens to Outbound websocket closed events and will reopen the websocket connection | ||
// in a recursive back off strategy if it matches the following criteria: | ||
// - Agent is not shutdown | ||
// - Socket was for current mediator connection id | ||
this.eventEmitter | ||
.observable<OutboundWebSocketClosedEvent>(TransportEventTypes.OutboundWebSocketClosedEvent) | ||
.pipe( | ||
// Stop when the agent shuts down | ||
takeUntil(this.agentConfig.stop$), | ||
filter((e) => e.payload.connectionId === mediator.connectionId), | ||
// Make sure we're not reconnecting multiple times | ||
throttleTime(interval), | ||
// Increase the interval (recursive back-off) | ||
tap(() => (interval *= 2)), | ||
// Wait for interval time before reconnecting | ||
delay(interval) | ||
) | ||
.subscribe(() => { | ||
this.openMediationWebSocket(mediator) | ||
}) | ||
|
||
await this.openMediationWebSocket(mediator) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't those methods be part of the transport layer rather than inside the core of the framework? Or, it's just temporary? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What it does is sending a trust ping message with a restriction to only use websocket. The websocket will be kept alive autmoatically (I'd like to make the keep alive more explicit) I'm not sure yet how we could nicely integrate this into the transport layer. Would be nice to not make it WebSocket specific (so just send a ping and keep the socket alive), but we need to specify somewhere which transports are capable of this, as e.g. HTTP isn't. Maybe a transport should be able to specify whether it can send multiple messages (WebSocket is ∞, while HTTP is 1 request, 1 response). That way we wouldn't have to specify WS explicitly, but rather whether we want a transport that can stay open (for implicit pickup) I'm going to think about it for a while. We can have a discussion about it in the WG call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have opened #435. Please leave your thoughts :) |
||
|
||
public async initiateMessagePickup(mediator: MediationRecord) { | ||
const { mediatorPickupStrategy, mediatorPollingInterval } = this.agentConfig | ||
|
||
|
@@ -92,8 +145,7 @@ export class RecipientModule { | |
// such as WebSockets to work | ||
else if (mediatorPickupStrategy === MediatorPickupStrategy.Implicit) { | ||
this.agentConfig.logger.info(`Starting implicit pickup of messages from mediator '${mediator.id}'`) | ||
const { message, connectionRecord } = await this.connectionService.createTrustPing(mediatorConnection.id) | ||
await this.messageSender.sendMessage(createOutboundMessage(connectionRecord, message)) | ||
await this.initiateImplicitPickup(mediator) | ||
} else { | ||
this.agentConfig.logger.info( | ||
`Skipping pickup of messages from mediator '${mediator.id}' due to pickup strategy none` | ||
|
@@ -189,10 +241,14 @@ export class RecipientModule { | |
// Also requests mediation and sets as default mediator | ||
// Assumption: processInvitation is a URL-encoded invitation | ||
const invitation = await ConnectionInvitationMessage.fromUrl(mediatorConnInvite) | ||
|
||
// Check if invitation has been used already | ||
if (!invitation || !invitation.recipientKeys || !invitation.recipientKeys[0]) { | ||
throw new AriesFrameworkError(`Invalid mediation invitation. Invitation must have at least one recipient key.`) | ||
} | ||
|
||
let mediationRecord: MediationRecord | null = null | ||
|
||
const connection = await this.connectionService.findByInvitationKey(invitation.recipientKeys[0]) | ||
if (!connection) { | ||
this.logger.debug('Mediation Connection does not exist, creating connection') | ||
|
@@ -209,25 +265,22 @@ export class RecipientModule { | |
const outbound = createOutboundMessage(connectionRecord, message) | ||
await this.messageSender.sendMessage(outbound) | ||
|
||
// TODO: add timeout to returnWhenIsConnected | ||
const completedConnectionRecord = await this.connectionService.returnWhenIsConnected(connectionRecord.id) | ||
this.logger.debug('Connection completed, requesting mediation') | ||
const mediationRecord = await this.requestAndAwaitGrant(completedConnectionRecord, 60000) // TODO: put timeout as a config parameter | ||
mediationRecord = await this.requestAndAwaitGrant(completedConnectionRecord, 60000) // TODO: put timeout as a config parameter | ||
this.logger.debug('Mediation Granted, setting as default mediator') | ||
await this.setDefaultMediator(mediationRecord) | ||
this.logger.debug('Default mediator set') | ||
return | ||
} else if (connection && !connection.isReady) { | ||
const connectionRecord = await this.connectionService.returnWhenIsConnected(connection.id) | ||
const mediationRecord = await this.requestAndAwaitGrant(connectionRecord, 60000) // TODO: put timeout as a config parameter | ||
mediationRecord = await this.requestAndAwaitGrant(connectionRecord, 60000) // TODO: put timeout as a config parameter | ||
await this.setDefaultMediator(mediationRecord) | ||
return | ||
} else if (connection.isReady) { | ||
} else { | ||
this.agentConfig.logger.warn('Mediator Invitation in configuration has already been used to create a connection.') | ||
const mediator = await this.findByConnectionId(connection.id) | ||
if (!mediator) { | ||
this.agentConfig.logger.warn('requesting mediation over connection.') | ||
const mediationRecord = await this.requestAndAwaitGrant(connection, 60000) // TODO: put timeout as a config parameter | ||
mediationRecord = await this.requestAndAwaitGrant(connection, 60000) // TODO: put timeout as a config parameter | ||
await this.setDefaultMediator(mediationRecord) | ||
} else { | ||
this.agentConfig.logger.warn( | ||
|
@@ -237,7 +290,10 @@ export class RecipientModule { | |
) | ||
} | ||
} | ||
|
||
return mediationRecord | ||
} | ||
|
||
// Register handlers for the several messages for the mediator. | ||
private registerHandlers(dispatcher: Dispatcher) { | ||
dispatcher.registerHandler(new KeylistUpdateResponseHandler(this.mediationRecipientService)) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
import type { TagsBase } from '../../../storage/BaseRecord' | ||
|
||
import { BaseRecord } from '../../../storage/BaseRecord' | ||
import { uuid } from '../../../utils/uuid' | ||
|
||
export interface MediatorRoutingRecordProps { | ||
id?: string | ||
createdAt?: Date | ||
routingKeys?: string[] | ||
tags?: TagsBase | ||
} | ||
|
||
export class MediatorRoutingRecord extends BaseRecord implements MediatorRoutingRecordProps { | ||
public routingKeys!: string[] | ||
|
||
public static readonly type = 'MediatorRoutingRecord' | ||
public readonly type = MediatorRoutingRecord.type | ||
|
||
public constructor(props: MediatorRoutingRecordProps) { | ||
super() | ||
|
||
if (props) { | ||
this.id = props.id ?? uuid() | ||
this.createdAt = props.createdAt ?? new Date() | ||
this.routingKeys = props.routingKeys || [] | ||
} | ||
} | ||
|
||
public getTags() { | ||
return this._tags | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
import { inject, scoped, Lifecycle } from 'tsyringe' | ||
|
||
import { InjectionSymbols } from '../../../constants' | ||
import { Repository } from '../../../storage/Repository' | ||
import { StorageService } from '../../../storage/StorageService' | ||
|
||
import { MediatorRoutingRecord } from './MediatorRoutingRecord' | ||
|
||
@scoped(Lifecycle.ContainerScoped) | ||
export class MediatorRoutingRepository extends Repository<MediatorRoutingRecord> { | ||
public readonly MEDIATOR_ROUTING_RECORD_ID = 'MEDIATOR_ROUTING_RECORD' | ||
|
||
public constructor(@inject(InjectionSymbols.StorageService) storageService: StorageService<MediatorRoutingRecord>) { | ||
super(MediatorRoutingRecord, storageService) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,4 @@ | ||
export * from './MediationRepository' | ||
export * from './MediatorRoutingRepository' | ||
export * from './MediationRecord' | ||
export * from './MediatorRoutingRecord' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@JamesKEbert @burdettadam The services were not being reassigned meaning the sort and filter did not do anything. This should fix it.
This means that currently it will take the http endpoint, send the ping and receive the ping response after which the http endpoint is closed 😄
So if we could merge this rather sooner than later that would be great!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How embarrassing! Thanks for catching that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whoooops--my bad!
As a side note, the
array.sort()
method does actually do its operation in place and returns too, so it doesn't necessarily have to be reassigned too, but doesn't hurt to reassign it. :)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah you're right. So weird that filter does not do its operation in place, but sort does. Thanks