Skip to content

Commit

Permalink
feat(core): connection-less issuance and verification (#359)
Browse files Browse the repository at this point in the history
Signed-off-by: Timo Glastra <[email protected]>
  • Loading branch information
TimoGlastra authored Jul 22, 2021
1 parent d84acc7 commit fb46ade
Show file tree
Hide file tree
Showing 60 changed files with 2,167 additions and 992 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,13 @@ Some features are not yet supported, but are on our roadmap. Check [the roadmap]
- ✅ Mediator Coordination Protocol ([RFC 0211](https://github.com/hyperledger/aries-rfcs/blob/master/features/0211-route-coordination/README.md))
- ✅ Indy Credentials (with `did:sov` support)
- ✅ HTTP Transport
- ✅ Auto accept proofs
- ✅ Connection-less Issuance and Verification
- ✅ Mediator Coordination Protocol ([RFC 0211](https://github.com/hyperledger/aries-rfcs/blob/master/features/0211-route-coordination/README.md))
- ✅ Smart Auto Acceptance of Connections, Credentials and Proofs
- 🚧 Revocation of Indy Credentials
- 🚧 Electron
- 🚧 WebSocket Transport
- ❌ Browser
- ❌ Connection-less Issuance and Verification
- ❌ Issue Credential V2, Present Proof V2, DID Exchange Protocol, Out-Of-Band
- ❌ W3C Linked Data VCs, BBS+ Signatures

Expand Down
8 changes: 4 additions & 4 deletions packages/core/src/agent/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ export class Agent {
}

public async shutdown({ deleteWallet = false }: { deleteWallet?: boolean } = {}) {
// All observables use takeUntil with the stop$ observable
// this means all observables will stop running if a value is emitted on this observable
this.agentConfig.stop$.next(true)

// Stop transports
await this.outboundTransporter?.stop()
await this.inboundTransporter?.stop()
Expand All @@ -199,10 +203,6 @@ export class Agent {
await this.wallet.close()
}
}

// All observables use takeUntil with the stop$ observable
// this means all observables will stop running if a value is emitted on this observable
this.agentConfig.stop$.next(true)
}

public get publicDid() {
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/agent/AgentMessage.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { AckDecorated } from '../decorators/ack/AckDecoratorExtension'
import { AttachmentDecorated } from '../decorators/attachment/AttachmentExtension'
import { L10nDecorated } from '../decorators/l10n/L10nDecoratorExtension'
import { ServiceDecorated } from '../decorators/service/ServiceDecoratorExtension'
import { ThreadDecorated } from '../decorators/thread/ThreadDecoratorExtension'
import { TimingDecorated } from '../decorators/timing/TimingDecoratorExtension'
import { TransportDecorated } from '../decorators/transport/TransportDecoratorExtension'
Expand All @@ -16,6 +17,7 @@ const DefaultDecorators = [
TimingDecorated,
AckDecorated,
AttachmentDecorated,
ServiceDecorated,
]

export class AgentMessage extends Compose(BaseMessage, DefaultDecorators) {
Expand Down
16 changes: 12 additions & 4 deletions packages/core/src/agent/Dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { AriesFrameworkError } from '../error/AriesFrameworkError'
import { EventEmitter } from './EventEmitter'
import { AgentEventTypes } from './Events'
import { MessageSender } from './MessageSender'
import { isOutboundServiceMessage } from './helpers'

@scoped(Lifecycle.ContainerScoped)
class Dispatcher {
Expand All @@ -36,6 +37,17 @@ class Dispatcher {

const outboundMessage = await handler.handle(messageContext)

if (outboundMessage && isOutboundServiceMessage(outboundMessage)) {
await this.messageSender.sendMessageToService({
message: outboundMessage.payload,
service: outboundMessage.service,
senderKey: outboundMessage.senderKey,
returnRoute: true,
})
} else if (outboundMessage) {
await this.messageSender.sendMessage(outboundMessage)
}

// Emit event that allows to hook into received messages
this.eventEmitter.emit<AgentMessageProcessedEvent>({
type: AgentEventTypes.AgentMessageProcessed,
Expand All @@ -44,10 +56,6 @@ class Dispatcher {
connection: messageContext.connection,
},
})

if (outboundMessage) {
await this.messageSender.sendMessage(outboundMessage)
}
}

private getHandlerForType(messageType: string): Handler | undefined {
Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/agent/EnvelopeService.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Logger } from '../logger'
import type { PackedMessage, UnpackedMessageContext } from '../types'
import type { UnpackedMessageContext, WireMessage } from '../types'
import type { AgentMessage } from './AgentMessage'
import type { Verkey } from 'indy-sdk'

Expand Down Expand Up @@ -27,7 +27,7 @@ class EnvelopeService {
this.logger = agentConfig.logger
}

public async packMessage(payload: AgentMessage, keys: EnvelopeKeys): Promise<PackedMessage> {
public async packMessage(payload: AgentMessage, keys: EnvelopeKeys): Promise<WireMessage> {
const { routingKeys, recipientKeys, senderKey: senderVk } = keys
const message = payload.toJSON()

Expand All @@ -50,7 +50,7 @@ class EnvelopeService {
return wireMessage
}

public async unpackMessage(packedMessage: PackedMessage): Promise<UnpackedMessageContext> {
public async unpackMessage(packedMessage: WireMessage): Promise<UnpackedMessageContext> {
return this.wallet.unpack(packedMessage)
}
}
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/agent/Handler.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import type { OutboundMessage } from '../types'
import type { OutboundMessage, OutboundServiceMessage } from '../types'
import type { AgentMessage } from './AgentMessage'
import type { InboundMessageContext } from './models/InboundMessageContext'

export interface Handler<T extends typeof AgentMessage = typeof AgentMessage> {
readonly supportedMessages: readonly T[]

handle(messageContext: InboundMessageContext): Promise<OutboundMessage | void>
handle(messageContext: InboundMessageContext): Promise<OutboundMessage | OutboundServiceMessage | void>
}

/**
Expand Down
25 changes: 13 additions & 12 deletions packages/core/src/agent/MessageReceiver.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Logger } from '../logger'
import type { UnpackedMessageContext, UnpackedMessage } from '../types'
import type { UnpackedMessageContext, UnpackedMessage, WireMessage } from '../types'
import type { AgentMessage } from './AgentMessage'
import type { TransportSession } from './TransportService'

Expand Down Expand Up @@ -53,13 +53,15 @@ export class MessageReceiver {

this.logger.debug(`Agent ${this.config.label} received message`)

const unpackedMessage = await this.unpackMessage(inboundPackedMessage as Record<string, unknown>)
const senderKey = unpackedMessage.sender_verkey
const unpackedMessage = await this.unpackMessage(inboundPackedMessage as WireMessage)
const senderKey = unpackedMessage.senderVerkey
const recipientKey = unpackedMessage.recipientVerkey

let connection = undefined
if (senderKey && unpackedMessage.recipient_verkey) {
if (senderKey && recipientKey) {
// TODO: only attach if theirKey is present. Otherwise a connection that may not be complete, validated or correct will
// be attached to the message context. See #76
connection = (await this.connectionService.findByVerkey(unpackedMessage.recipient_verkey)) || undefined
connection = (await this.connectionService.findByVerkey(recipientKey)) || undefined

// We check whether the sender key is the same as the key we have stored in the connection
// otherwise everyone could send messages to our key and we would just accept
Expand All @@ -80,26 +82,25 @@ export class MessageReceiver {
const messageContext = new InboundMessageContext(message, {
connection,
senderVerkey: senderKey,
recipientVerkey: unpackedMessage.recipient_verkey,
recipientVerkey: recipientKey,
})

// We want to save a session if there is a chance of returning outbound message via inbound transport.
// That can happen when inbound message has `return_route` set to `all` or `thread`.
// If `return_route` defines just `thread`, we decide later whether to use session according to outbound message `threadId`.
if (connection && message.hasAnyReturnRoute() && session) {
if (senderKey && recipientKey && message.hasAnyReturnRoute() && session) {
const keys = {
// TODO handle the case when senderKey is missing
recipientKeys: senderKey ? [senderKey] : [],
recipientKeys: [senderKey],
routingKeys: [],
senderKey: connection?.verkey || null,
senderKey: recipientKey,
}
session.keys = keys
session.inboundMessage = message
session.connection = connection
this.transportService.saveSession(session)
}

return await this.dispatcher.dispatch(messageContext)
await this.dispatcher.dispatch(messageContext)
}

/**
Expand All @@ -108,7 +109,7 @@ export class MessageReceiver {
*
* @param packedMessage the received, probably packed, message to unpack
*/
private async unpackMessage(packedMessage: Record<string, unknown>): Promise<UnpackedMessageContext> {
private async unpackMessage(packedMessage: WireMessage): Promise<UnpackedMessageContext> {
// If the inbound message has no @type field we assume
// the message is packed and must be unpacked first
if (!this.isUnpackedMessage(packedMessage)) {
Expand Down
Loading

0 comments on commit fb46ade

Please sign in to comment.