Skip to content

Commit

Permalink
refactor!: add agent context (#920)
Browse files Browse the repository at this point in the history
Signed-off-by: Timo Glastra <[email protected]>

BREAKING CHANGE: To make AFJ multi-tenancy ready, all services and repositories have been made stateless. A new `AgentContext` is introduced that holds the current context, which is passed to each method call. The public API hasn't been affected, but due to the large impact of this change it is marked as breaking.
  • Loading branch information
TimoGlastra authored Jul 6, 2022
1 parent d045dd6 commit 65579dd
Show file tree
Hide file tree
Showing 161 changed files with 3,861 additions and 2,768 deletions.
49 changes: 33 additions & 16 deletions packages/core/src/agent/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ import type { Logger } from '../logger'
import type { InboundTransport } from '../transport/InboundTransport'
import type { OutboundTransport } from '../transport/OutboundTransport'
import type { InitConfig } from '../types'
import type { Wallet } from '../wallet/Wallet'
import type { AgentDependencies } from './AgentDependencies'
import type { AgentMessageReceivedEvent } from './Events'
import type { TransportSession } from './TransportService'
import type { Subscription } from 'rxjs'
import type { DependencyContainer } from 'tsyringe'

import { Subject } from 'rxjs'
import { concatMap, takeUntil } from 'rxjs/operators'
import { container as baseContainer } from 'tsyringe'

Expand Down Expand Up @@ -42,6 +42,7 @@ import { WalletModule } from '../wallet/WalletModule'
import { WalletError } from '../wallet/error'

import { AgentConfig } from './AgentConfig'
import { AgentContext } from './AgentContext'
import { Dispatcher } from './Dispatcher'
import { EnvelopeService } from './EnvelopeService'
import { EventEmitter } from './EventEmitter'
Expand All @@ -60,8 +61,9 @@ export class Agent {
protected messageSender: MessageSender
private _isInitialized = false
public messageSubscription: Subscription
private walletService: Wallet
private routingService: RoutingService
private agentContext: AgentContext
private stop$ = new Subject<boolean>()

public readonly connections: ConnectionsModule
public readonly proofs: ProofsModule
Expand Down Expand Up @@ -112,8 +114,8 @@ export class Agent {
this.messageSender = this.dependencyManager.resolve(MessageSender)
this.messageReceiver = this.dependencyManager.resolve(MessageReceiver)
this.transportService = this.dependencyManager.resolve(TransportService)
this.walletService = this.dependencyManager.resolve(InjectionSymbols.Wallet)
this.routingService = this.dependencyManager.resolve(RoutingService)
this.agentContext = this.dependencyManager.resolve(AgentContext)

// We set the modules in the constructor because that allows to set them as read-only
this.connections = this.dependencyManager.resolve(ConnectionsModule)
Expand All @@ -134,8 +136,12 @@ export class Agent {
this.messageSubscription = this.eventEmitter
.observable<AgentMessageReceivedEvent>(AgentEventTypes.AgentMessageReceived)
.pipe(
takeUntil(this.agentConfig.stop$),
concatMap((e) => this.messageReceiver.receiveMessage(e.payload.message, { connection: e.payload.connection }))
takeUntil(this.stop$),
concatMap((e) =>
this.messageReceiver.receiveMessage(this.agentContext, e.payload.message, {
connection: e.payload.connection,
})
)
)
.subscribe()
}
Expand Down Expand Up @@ -185,7 +191,7 @@ export class Agent {

// Make sure the storage is up to date
const storageUpdateService = this.dependencyManager.resolve(StorageUpdateService)
const isStorageUpToDate = await storageUpdateService.isUpToDate()
const isStorageUpToDate = await storageUpdateService.isUpToDate(this.agentContext)
this.logger.info(`Agent storage is ${isStorageUpToDate ? '' : 'not '}up to date.`)

if (!isStorageUpToDate && this.agentConfig.autoUpdateStorageOnStartup) {
Expand All @@ -194,7 +200,7 @@ export class Agent {
await updateAssistant.initialize()
await updateAssistant.update()
} else if (!isStorageUpToDate) {
const currentVersion = await storageUpdateService.getCurrentStorageVersion()
const currentVersion = await storageUpdateService.getCurrentStorageVersion(this.agentContext)
// Close wallet to prevent un-initialized agent with initialized wallet
await this.wallet.close()
throw new AriesFrameworkError(
Expand All @@ -208,9 +214,11 @@ export class Agent {

if (publicDidSeed) {
// If an agent has publicDid it will be used as routing key.
await this.walletService.initPublicDid({ seed: publicDidSeed })
await this.agentContext.wallet.initPublicDid({ seed: publicDidSeed })
}

// set the pools on the ledger.
this.ledger.setPools(this.agentContext.config.indyLedgers)
// As long as value isn't false we will async connect to all genesis pools on startup
if (connectToIndyLedgersOnStartup) {
this.ledger.connectToPools().catch((error) => {
Expand Down Expand Up @@ -243,7 +251,7 @@ export class Agent {
public async shutdown() {
// All observables use takeUntil with the stop$ observable
// this means all observables will stop running if a value is emitted on this observable
this.agentConfig.stop$.next(true)
this.stop$.next(true)

// Stop transports
const allTransports = [...this.inboundTransports, ...this.outboundTransports]
Expand All @@ -258,11 +266,11 @@ export class Agent {
}

public get publicDid() {
return this.walletService.publicDid
return this.agentContext.wallet.publicDid
}

public async receiveMessage(inboundMessage: unknown, session?: TransportSession) {
return await this.messageReceiver.receiveMessage(inboundMessage, { session })
return await this.messageReceiver.receiveMessage(this.agentContext, inboundMessage, { session })
}

public get injectionContainer() {
Expand All @@ -273,6 +281,10 @@ export class Agent {
return this.agentConfig
}

public get context() {
return this.agentContext
}

private async getMediationConnection(mediatorInvitationUrl: string) {
const outOfBandInvitation = this.oob.parseInvitation(mediatorInvitationUrl)
const outOfBandRecord = await this.oob.findByInvitationId(outOfBandInvitation.id)
Expand All @@ -281,7 +293,7 @@ export class Agent {
if (!connection) {
this.logger.debug('Mediation connection does not exist, creating connection')
// We don't want to use the current default mediator when connecting to another mediator
const routing = await this.routingService.getRouting({ useDefaultMediator: false })
const routing = await this.routingService.getRouting(this.agentContext, { useDefaultMediator: false })

this.logger.debug('Routing created', routing)
const { connectionRecord: newConnection } = await this.oob.receiveInvitation(outOfBandInvitation, {
Expand All @@ -303,7 +315,7 @@ export class Agent {
}

private registerDependencies(dependencyManager: DependencyManager) {
dependencyManager.registerInstance(AgentConfig, this.agentConfig)
const dependencies = this.agentConfig.agentDependencies

// Register internal dependencies
dependencyManager.registerSingleton(EventEmitter)
Expand All @@ -318,11 +330,14 @@ export class Agent {
dependencyManager.registerSingleton(StorageVersionRepository)
dependencyManager.registerSingleton(StorageUpdateService)

dependencyManager.registerInstance(AgentConfig, this.agentConfig)
dependencyManager.registerInstance(InjectionSymbols.AgentDependencies, dependencies)
dependencyManager.registerInstance(InjectionSymbols.FileSystem, new dependencies.FileSystem())
dependencyManager.registerInstance(InjectionSymbols.Stop$, this.stop$)

// Register possibly already defined services
if (!dependencyManager.isRegistered(InjectionSymbols.Wallet)) {
this.dependencyManager.registerSingleton(IndyWallet)
const wallet = this.dependencyManager.resolve(IndyWallet)
dependencyManager.registerInstance(InjectionSymbols.Wallet, wallet)
dependencyManager.registerContextScoped(InjectionSymbols.Wallet, IndyWallet)
}
if (!dependencyManager.isRegistered(InjectionSymbols.Logger)) {
dependencyManager.registerInstance(InjectionSymbols.Logger, this.logger)
Expand Down Expand Up @@ -352,5 +367,7 @@ export class Agent {
IndyModule,
W3cVcModule
)

dependencyManager.registerInstance(AgentContext, new AgentContext({ dependencyManager }))
}
}
8 changes: 0 additions & 8 deletions packages/core/src/agent/AgentConfig.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import type { Logger } from '../logger'
import type { FileSystem } from '../storage/FileSystem'
import type { InitConfig } from '../types'
import type { AgentDependencies } from './AgentDependencies'

import { Subject } from 'rxjs'

import { DID_COMM_TRANSPORT_QUEUE } from '../constants'
import { AriesFrameworkError } from '../error'
import { ConsoleLogger, LogLevel } from '../logger'
Expand All @@ -17,17 +14,12 @@ export class AgentConfig {
public label: string
public logger: Logger
public readonly agentDependencies: AgentDependencies
public readonly fileSystem: FileSystem

// $stop is used for agent shutdown signal
public readonly stop$ = new Subject<boolean>()

public constructor(initConfig: InitConfig, agentDependencies: AgentDependencies) {
this.initConfig = initConfig
this.label = initConfig.label
this.logger = initConfig.logger ?? new ConsoleLogger(LogLevel.off)
this.agentDependencies = agentDependencies
this.fileSystem = new agentDependencies.FileSystem()

const { mediatorConnectionsInvite, clearDefaultMediator, defaultMediatorId } = this.initConfig

Expand Down
32 changes: 32 additions & 0 deletions packages/core/src/agent/AgentContext.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import type { DependencyManager } from '../plugins'
import type { Wallet } from '../wallet'

import { InjectionSymbols } from '../constants'

import { AgentConfig } from './AgentConfig'

export class AgentContext {
/**
* Dependency manager holds all dependencies for the current context. Possibly a child of a parent dependency manager,
* in which case all singleton dependencies from the parent context are also available to this context.
*/
public readonly dependencyManager: DependencyManager

public constructor({ dependencyManager }: { dependencyManager: DependencyManager }) {
this.dependencyManager = dependencyManager
}

/**
* Convenience method to access the agent config for the current context.
*/
public get config() {
return this.dependencyManager.resolve(AgentConfig)
}

/**
* Convenience method to access the wallet for the current context.
*/
public get wallet() {
return this.dependencyManager.resolve<Wallet>(InjectionSymbols.Wallet)
}
}
20 changes: 12 additions & 8 deletions packages/core/src/agent/Dispatcher.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import type { Logger } from '../logger'
import type { OutboundMessage, OutboundServiceMessage } from '../types'
import type { AgentMessage } from './AgentMessage'
import type { AgentMessageProcessedEvent } from './Events'
import type { Handler } from './Handler'
import type { InboundMessageContext } from './models/InboundMessageContext'

import { AgentConfig } from '../agent/AgentConfig'
import { InjectionSymbols } from '../constants'
import { AriesFrameworkError } from '../error/AriesFrameworkError'
import { injectable } from '../plugins'
import { Logger } from '../logger'
import { injectable, inject } from '../plugins'
import { canHandleMessageType, parseMessageType } from '../utils/messageType'

import { ProblemReportMessage } from './../modules/problem-reports/messages/ProblemReportMessage'
Expand All @@ -23,10 +23,14 @@ class Dispatcher {
private eventEmitter: EventEmitter
private logger: Logger

public constructor(messageSender: MessageSender, eventEmitter: EventEmitter, agentConfig: AgentConfig) {
public constructor(
messageSender: MessageSender,
eventEmitter: EventEmitter,
@inject(InjectionSymbols.Logger) logger: Logger
) {
this.messageSender = messageSender
this.eventEmitter = eventEmitter
this.logger = agentConfig.logger
this.logger = logger
}

public registerHandler(handler: Handler) {
Expand Down Expand Up @@ -70,19 +74,19 @@ class Dispatcher {
}

if (outboundMessage && isOutboundServiceMessage(outboundMessage)) {
await this.messageSender.sendMessageToService({
await this.messageSender.sendMessageToService(messageContext.agentContext, {
message: outboundMessage.payload,
service: outboundMessage.service,
senderKey: outboundMessage.senderKey,
returnRoute: true,
})
} else if (outboundMessage) {
outboundMessage.sessionId = messageContext.sessionId
await this.messageSender.sendMessage(outboundMessage)
await this.messageSender.sendMessage(messageContext.agentContext, outboundMessage)
}

// Emit event that allows to hook into received messages
this.eventEmitter.emit<AgentMessageProcessedEvent>({
this.eventEmitter.emit<AgentMessageProcessedEvent>(messageContext.agentContext, {
type: AgentEventTypes.AgentMessageProcessed,
payload: {
message: messageContext.message,
Expand Down
35 changes: 18 additions & 17 deletions packages/core/src/agent/EnvelopeService.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import type { Logger } from '../logger'
import type { EncryptedMessage, PlaintextMessage } from '../types'
import type { AgentContext } from './AgentContext'
import type { AgentMessage } from './AgentMessage'

import { InjectionSymbols } from '../constants'
import { Key, KeyType } from '../crypto'
import { Logger } from '../logger'
import { ForwardMessage } from '../modules/routing/messages'
import { inject, injectable } from '../plugins'
import { Wallet } from '../wallet/Wallet'

import { AgentConfig } from './AgentConfig'

export interface EnvelopeKeys {
recipientKeys: Key[]
Expand All @@ -18,28 +16,28 @@ export interface EnvelopeKeys {

@injectable()
export class EnvelopeService {
private wallet: Wallet
private logger: Logger
private config: AgentConfig

public constructor(@inject(InjectionSymbols.Wallet) wallet: Wallet, agentConfig: AgentConfig) {
this.wallet = wallet
this.logger = agentConfig.logger
this.config = agentConfig
public constructor(@inject(InjectionSymbols.Logger) logger: Logger) {
this.logger = logger
}

public async packMessage(payload: AgentMessage, keys: EnvelopeKeys): Promise<EncryptedMessage> {
public async packMessage(
agentContext: AgentContext,
payload: AgentMessage,
keys: EnvelopeKeys
): Promise<EncryptedMessage> {
const { recipientKeys, routingKeys, senderKey } = keys
let recipientKeysBase58 = recipientKeys.map((key) => key.publicKeyBase58)
const routingKeysBase58 = routingKeys.map((key) => key.publicKeyBase58)
const senderKeyBase58 = senderKey && senderKey.publicKeyBase58

// pass whether we want to use legacy did sov prefix
const message = payload.toJSON({ useLegacyDidSovPrefix: this.config.useLegacyDidSovPrefix })
const message = payload.toJSON({ useLegacyDidSovPrefix: agentContext.config.useLegacyDidSovPrefix })

this.logger.debug(`Pack outbound message ${message['@type']}`)

let encryptedMessage = await this.wallet.pack(message, recipientKeysBase58, senderKeyBase58 ?? undefined)
let encryptedMessage = await agentContext.wallet.pack(message, recipientKeysBase58, senderKeyBase58 ?? undefined)

// If the message has routing keys (mediator) pack for each mediator
for (const routingKeyBase58 of routingKeysBase58) {
Expand All @@ -51,17 +49,20 @@ export class EnvelopeService {
recipientKeysBase58 = [routingKeyBase58]
this.logger.debug('Forward message created', forwardMessage)

const forwardJson = forwardMessage.toJSON({ useLegacyDidSovPrefix: this.config.useLegacyDidSovPrefix })
const forwardJson = forwardMessage.toJSON({ useLegacyDidSovPrefix: agentContext.config.useLegacyDidSovPrefix })

// Forward messages are anon packed
encryptedMessage = await this.wallet.pack(forwardJson, [routingKeyBase58], undefined)
encryptedMessage = await agentContext.wallet.pack(forwardJson, [routingKeyBase58], undefined)
}

return encryptedMessage
}

public async unpackMessage(encryptedMessage: EncryptedMessage): Promise<DecryptedMessageContext> {
const decryptedMessage = await this.wallet.unpack(encryptedMessage)
public async unpackMessage(
agentContext: AgentContext,
encryptedMessage: EncryptedMessage
): Promise<DecryptedMessageContext> {
const decryptedMessage = await agentContext.wallet.unpack(encryptedMessage)
const { recipientKey, senderKey, plaintextMessage } = decryptedMessage
return {
recipientKey: recipientKey ? Key.fromPublicKeyBase58(recipientKey, KeyType.Ed25519) : undefined,
Expand Down
Loading

0 comments on commit 65579dd

Please sign in to comment.