Skip to content

Commit

Permalink
refactor: make a connection with mediator asynchronously (#231)
Browse files Browse the repository at this point in the history
* add `return_route` parameter to outbound when there is no inbound connection (no service with endpoint other than `didcomm:transport/queue`)
* add keylist-update-response message and handler with a naive implementation
* add batch message handler with agent event emitter 

BREAKING CHANGE: extracts outbound transporter from Agent's constructor.

Signed-off-by: Jakub Koci <[email protected]>
  • Loading branch information
jakubkoci authored Apr 13, 2021
1 parent 6b33dbf commit bafa839
Show file tree
Hide file tree
Showing 24 changed files with 283 additions and 148 deletions.
50 changes: 26 additions & 24 deletions samples/__tests__/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { OutboundPackage, InitConfig } from '../../src/types'
import { get, post } from '../http'
import { sleep, toBeConnectedWith, waitForBasicMessage } from '../../src/__tests__/helpers'
import indy from 'indy-sdk'
import testLogger from '../../src/__tests__/logger'
import logger from '../../src/__tests__/logger'

expect.extend({ toBeConnectedWith })

Expand All @@ -13,7 +13,7 @@ const aliceConfig: InitConfig = {
walletConfig: { id: 'e2e-alice' },
walletCredentials: { key: '00000000000000000000000000000Test01' },
autoAcceptConnections: true,
logger: testLogger,
logger: logger,
indy,
}

Expand All @@ -23,7 +23,7 @@ const bobConfig: InitConfig = {
walletConfig: { id: 'e2e-bob' },
walletCredentials: { key: '00000000000000000000000000000Test02' },
autoAcceptConnections: true,
logger: testLogger,
logger: logger,
indy,
}

Expand All @@ -44,26 +44,26 @@ describe('with mediator', () => {
})

test('Alice and Bob make a connection with mediator', async () => {
const aliceAgentSender = new HttpOutboundTransporter()
const aliceAgentReceiver = new PollingInboundTransporter()
const bobAgentSender = new HttpOutboundTransporter()
const bobAgentReceiver = new PollingInboundTransporter()

aliceAgent = new Agent(aliceConfig, aliceAgentReceiver, aliceAgentSender)
aliceAgent = new Agent(aliceConfig, aliceAgentReceiver)
aliceAgent.setOutboundTransporter(new HttpOutboundTransporter(aliceAgent))
await aliceAgent.init()

bobAgent = new Agent(bobConfig, bobAgentReceiver, bobAgentSender)
bobAgent = new Agent(bobConfig, bobAgentReceiver)
bobAgent.setOutboundTransporter(new HttpOutboundTransporter(bobAgent))
await bobAgent.init()

const aliceInbound = aliceAgent.routing.getInboundConnection()
const aliceInboundConnection = aliceInbound?.connection
const aliceKeyAtAliceMediator = aliceInboundConnection?.verkey
testLogger.test('aliceInboundConnection', aliceInboundConnection)
logger.test('aliceInboundConnection', aliceInboundConnection)

const bobInbound = bobAgent.routing.getInboundConnection()
const bobInboundConnection = bobInbound?.connection
const bobKeyAtBobMediator = bobInboundConnection?.verkey
testLogger.test('bobInboundConnection', bobInboundConnection)
logger.test('bobInboundConnection', bobInboundConnection)

// TODO This endpoint currently exists at mediator only for the testing purpose. It returns mediator's part of the pairwise connection.
const mediatorConnectionAtAliceMediator = JSON.parse(
Expand All @@ -73,8 +73,8 @@ describe('with mediator', () => {
await get(`${bobAgent.getMediatorUrl()}/api/connections/${bobKeyAtBobMediator}`)
)

testLogger.test('mediatorConnectionAtAliceMediator', mediatorConnectionAtAliceMediator)
testLogger.test('mediatorConnectionAtBobMediator', mediatorConnectionAtBobMediator)
logger.test('mediatorConnectionAtAliceMediator', mediatorConnectionAtAliceMediator)
logger.test('mediatorConnectionAtBobMediator', mediatorConnectionAtBobMediator)

expect(aliceInboundConnection).toBeConnectedWith(mediatorConnectionAtAliceMediator)
expect(bobInboundConnection).toBeConnectedWith(mediatorConnectionAtBobMediator)
Expand Down Expand Up @@ -104,7 +104,7 @@ describe('with mediator', () => {
throw new Error(`There is no connection for id ${aliceAtAliceBobId}`)
}

testLogger.test('aliceConnectionAtAliceBob\n', aliceConnectionAtAliceBob)
logger.test('aliceConnectionAtAliceBob\n', aliceConnectionAtAliceBob)

const message = 'hello, world'
await aliceAgent.basicMessages.sendMessage(aliceConnectionAtAliceBob, message)
Expand Down Expand Up @@ -141,14 +141,7 @@ class PollingInboundTransporter implements InboundTransporter {
private pollDownloadMessages(agent: Agent) {
const loop = async () => {
while (!this.stop) {
const downloadedMessages = await agent.routing.downloadMessages()
const messages = [...downloadedMessages]
testLogger.test('downloaded messages', messages)
while (messages && messages.length > 0) {
const message = messages.shift()
await agent.receiveMessage(message)
}

await agent.routing.downloadMessages()
await sleep(1000)
}
}
Expand All @@ -159,20 +152,29 @@ class PollingInboundTransporter implements InboundTransporter {
}

class HttpOutboundTransporter implements OutboundTransporter {
private agent: Agent

public constructor(agent: Agent) {
this.agent = agent
}
public async sendMessage(outboundPackage: OutboundPackage, receiveReply: boolean) {
const { payload, endpoint } = outboundPackage

if (!endpoint) {
throw new Error(`Missing endpoint. I don't know how and where to send the message.`)
}

testLogger.test(`Sending outbound message to connection ${outboundPackage.connection.id}`, outboundPackage.payload)
logger.debug(`Sending outbound message to connection ${outboundPackage.connection.id}`, outboundPackage.payload)

if (receiveReply) {
const response = await post(`${endpoint}`, JSON.stringify(payload))
const wireMessage = JSON.parse(response)
testLogger.test('received response', wireMessage)
return wireMessage
if (response) {
logger.debug(`Response received:\n ${response}`)
const wireMessage = JSON.parse(response)
this.agent.receiveMessage(wireMessage)
} else {
logger.debug(`No response received.`)
}
} else {
await post(`${endpoint}`, JSON.stringify(payload))
}
Expand Down
3 changes: 2 additions & 1 deletion samples/mediator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ app.set('json spaces', 2)
const messageRepository = new InMemoryMessageRepository()
const messageSender = new StorageOutboundTransporter(messageRepository)
const messageReceiver = new HttpInboundTransporter(app)
const agent = new Agent(config, messageReceiver, messageSender, messageRepository)
const agent = new Agent(config, messageReceiver, messageRepository)
agent.setOutboundTransporter(messageSender)

app.get('/', async (req, res) => {
const agentDid = agent.publicDid
Expand Down
12 changes: 6 additions & 6 deletions src/__tests__/agents.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ describe('agents', () => {
const aliceMessages = new Subject()
const bobMessages = new Subject()

const aliceAgentInbound = new SubjectInboundTransporter(aliceMessages)
const aliceAgentOutbound = new SubjectOutboundTransporter(bobMessages)
const bobAgentInbound = new SubjectInboundTransporter(bobMessages)
const bobAgentOutbound = new SubjectOutboundTransporter(aliceMessages)
const aliceAgentInbound = new SubjectInboundTransporter(aliceMessages, bobMessages)
const bobAgentInbound = new SubjectInboundTransporter(bobMessages, aliceMessages)

aliceAgent = new Agent(aliceConfig, aliceAgentInbound, aliceAgentOutbound)
aliceAgent = new Agent(aliceConfig, aliceAgentInbound)
aliceAgent.setOutboundTransporter(new SubjectOutboundTransporter(bobMessages))
await aliceAgent.init()

bobAgent = new Agent(bobConfig, bobAgentInbound, bobAgentOutbound)
bobAgent = new Agent(bobConfig, bobAgentInbound)
bobAgent.setOutboundTransporter(new SubjectOutboundTransporter(aliceMessages))
await bobAgent.init()

const aliceConnectionAtAliceBob = await aliceAgent.connections.createConnection()
Expand Down
12 changes: 6 additions & 6 deletions src/__tests__/credentials.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,13 @@ describe('credentials', () => {
const faberMessages = new Subject()
const aliceMessages = new Subject()

const faberAgentInbound = new SubjectInboundTransporter(faberMessages)
const faberAgentOutbound = new SubjectOutboundTransporter(aliceMessages)
const aliceAgentInbound = new SubjectInboundTransporter(aliceMessages)
const aliceAgentOutbound = new SubjectOutboundTransporter(faberMessages)
const faberAgentInbound = new SubjectInboundTransporter(faberMessages, aliceMessages)
const aliceAgentInbound = new SubjectInboundTransporter(aliceMessages, faberMessages)

faberAgent = new Agent(faberConfig, faberAgentInbound, faberAgentOutbound)
aliceAgent = new Agent(aliceConfig, aliceAgentInbound, aliceAgentOutbound)
faberAgent = new Agent(faberConfig, faberAgentInbound)
aliceAgent = new Agent(aliceConfig, aliceAgentInbound)
faberAgent.setOutboundTransporter(new SubjectOutboundTransporter(aliceMessages))
aliceAgent.setOutboundTransporter(new SubjectOutboundTransporter(faberMessages))
await faberAgent.init()
await aliceAgent.init()

Expand Down
17 changes: 12 additions & 5 deletions src/__tests__/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,25 @@ export async function waitForBasicMessage(

export class SubjectInboundTransporter implements InboundTransporter {
private subject: Subject<WireMessage>
private theirSubject: Subject<WireMessage>

public constructor(subject: Subject<WireMessage>) {
public constructor(subject: Subject<WireMessage>, theirSubject: Subject<WireMessage>) {
this.subject = subject
this.theirSubject = theirSubject
}

public start(agent: Agent) {
this.subscribe(agent, this.subject)
this.subscribe(agent)
}

private subscribe(agent: Agent, subject: Subject<WireMessage>) {
subject.subscribe({
next: (message: WireMessage) => agent.receiveMessage(message),
private subscribe(agent: Agent) {
this.subject.subscribe({
next: async (message: WireMessage) => {
const outboundMessage = await agent.receiveMessage(message)
if (outboundMessage) {
this.theirSubject.next(outboundMessage.payload)
}
},
})
}
}
Expand Down
10 changes: 2 additions & 8 deletions src/__tests__/ledger.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import indy from 'indy-sdk'
import type { SchemaId } from 'indy-sdk'
import { Agent, InboundTransporter, OutboundTransporter } from '..'
import { Agent, InboundTransporter } from '..'
import { DID_IDENTIFIER_REGEX, VERKEY_REGEX, isFullVerkey, isAbbreviatedVerkey } from '../utils/did'
import { genesisPath, sleep } from './helpers'
import { InitConfig } from '../types'
Expand All @@ -22,7 +22,7 @@ describe('ledger', () => {
let schemaId: SchemaId

beforeAll(async () => {
faberAgent = new Agent(faberConfig, new DummyInboundTransporter(), new DummyOutboundTransporter())
faberAgent = new Agent(faberConfig, new DummyInboundTransporter())
await faberAgent.init()
})

Expand Down Expand Up @@ -140,9 +140,3 @@ class DummyInboundTransporter implements InboundTransporter {
testLogger.test('Starting agent...')
}
}

class DummyOutboundTransporter implements OutboundTransporter {
public async sendMessage() {
testLogger.test('Sending message...')
}
}
12 changes: 6 additions & 6 deletions src/__tests__/proofs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ describe('Present Proof', () => {
const faberMessages = new Subject()
const aliceMessages = new Subject()

const faberAgentInbound = new SubjectInboundTransporter(faberMessages)
const faberAgentOutbound = new SubjectOutboundTransporter(aliceMessages)
const aliceAgentInbound = new SubjectInboundTransporter(aliceMessages)
const aliceAgentOutbound = new SubjectOutboundTransporter(faberMessages)
const faberAgentInbound = new SubjectInboundTransporter(faberMessages, aliceMessages)
const aliceAgentInbound = new SubjectInboundTransporter(aliceMessages, faberMessages)

faberAgent = new Agent(faberConfig, faberAgentInbound, faberAgentOutbound)
aliceAgent = new Agent(aliceConfig, aliceAgentInbound, aliceAgentOutbound)
faberAgent = new Agent(faberConfig, faberAgentInbound)
aliceAgent = new Agent(aliceConfig, aliceAgentInbound)
faberAgent.setOutboundTransporter(new SubjectOutboundTransporter(aliceMessages))
aliceAgent.setOutboundTransporter(new SubjectOutboundTransporter(faberMessages))
await faberAgent.init()
await aliceAgent.init()

Expand Down
18 changes: 15 additions & 3 deletions src/agent/Agent.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { EventEmitter } from 'events'
import { Logger } from '../logger'
import { InitConfig } from '../types'
import { IndyWallet } from '../wallet/IndyWallet'
Expand Down Expand Up @@ -33,6 +34,7 @@ import { LedgerModule } from '../modules/ledger/LedgerModule'

export class Agent {
protected logger: Logger
protected eventEmitter: EventEmitter
protected wallet: Wallet
protected agentConfig: AgentConfig
protected messageReceiver: MessageReceiver
Expand Down Expand Up @@ -66,7 +68,6 @@ export class Agent {
public constructor(
initialConfig: InitConfig,
inboundTransporter: InboundTransporter,
outboundTransporter: OutboundTransporter,
messageRepository?: MessageRepository
) {
this.agentConfig = new AgentConfig(initialConfig)
Expand All @@ -79,10 +80,16 @@ export class Agent {
indy: initialConfig.indy != undefined,
logger: initialConfig.logger != undefined,
})

this.eventEmitter = new EventEmitter()
this.eventEmitter.addListener('agentMessage', async (payload) => {
await this.receiveMessage(payload)
})

this.wallet = new IndyWallet(this.agentConfig)
const envelopeService = new EnvelopeService(this.wallet, this.agentConfig)

this.messageSender = new MessageSender(envelopeService, outboundTransporter)
this.messageSender = new MessageSender(envelopeService)
this.dispatcher = new Dispatcher(this.messageSender)
this.inboundTransporter = inboundTransporter

Expand Down Expand Up @@ -119,6 +126,10 @@ export class Agent {
this.registerModules()
}

public setOutboundTransporter(outboundTransporter: OutboundTransporter) {
this.messageSender.setOutboundTransporter(outboundTransporter)
}

public async init() {
await this.wallet.init()

Expand Down Expand Up @@ -182,7 +193,8 @@ export class Agent {
this.provisioningService,
this.messagePickupService,
this.connectionService,
this.messageSender
this.messageSender,
this.eventEmitter
)

this.basicMessages = new BasicMessagesModule(this.dispatcher, this.basicMessageService, this.messageSender)
Expand Down
7 changes: 5 additions & 2 deletions src/agent/Dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Handler } from './Handler'
import { MessageSender } from './MessageSender'
import { AgentMessage } from './AgentMessage'
import { InboundMessageContext } from './models/InboundMessageContext'
import { ReturnRouteTypes } from '../decorators/transport/TransportDecorator'

class Dispatcher {
private handlers: Handler[] = []
Expand All @@ -29,15 +30,17 @@ class Dispatcher {
if (outboundMessage) {
const threadId = outboundMessage.payload.threadId

if (!outboundMessage.connection.hasInboundEndpoint()) {
outboundMessage.payload.setReturnRouting(ReturnRouteTypes.all)
}

// check for return routing, with thread id
if (message.hasReturnRouting(threadId)) {
return await this.messageSender.packMessage(outboundMessage)
}

await this.messageSender.sendMessage(outboundMessage)
}

return outboundMessage || undefined
}

private getHandlerForType(messageType: string): Handler | undefined {
Expand Down
Loading

0 comments on commit bafa839

Please sign in to comment.