Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add base agent class #922

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 47 additions & 196 deletions packages/core/src/agent/Agent.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import type { Logger } from '../logger'
import type { DependencyManager } from '../plugins'
import type { InboundTransport } from '../transport/InboundTransport'
import type { OutboundTransport } from '../transport/OutboundTransport'
import type { InitConfig } from '../types'
import type { AgentDependencies } from './AgentDependencies'
import type { AgentMessageReceivedEvent } from './Events'
import type { TransportSession } from './TransportService'
import type { Subscription } from 'rxjs'
import type { DependencyContainer } from 'tsyringe'

Expand All @@ -29,19 +28,15 @@ import { ProofsModule } from '../modules/proofs/ProofsModule'
import { QuestionAnswerModule } from '../modules/question-answer/QuestionAnswerModule'
import { MediatorModule } from '../modules/routing/MediatorModule'
import { RecipientModule } from '../modules/routing/RecipientModule'
import { RoutingService } from '../modules/routing/services/RoutingService'
import { W3cVcModule } from '../modules/vc/module'
import { DependencyManager } from '../plugins'
import { StorageUpdateService, DidCommMessageRepository, StorageVersionRepository } from '../storage'
import { DidCommMessageRepository, StorageUpdateService, StorageVersionRepository } from '../storage'
import { InMemoryMessageRepository } from '../storage/InMemoryMessageRepository'
import { IndyStorageService } from '../storage/IndyStorageService'
import { UpdateAssistant } from '../storage/migration/UpdateAssistant'
import { DEFAULT_UPDATE_CONFIG } from '../storage/migration/updates'
import { IndyWallet } from '../wallet/IndyWallet'
import { WalletModule } from '../wallet/WalletModule'
import { WalletError } from '../wallet/error'

import { AgentConfig } from './AgentConfig'
import { BaseAgent } from './BaseAgent'
import { Dispatcher } from './Dispatcher'
import { EnvelopeService } from './EnvelopeService'
import { EventEmitter } from './EventEmitter'
Expand All @@ -51,92 +46,25 @@ import { MessageSender } from './MessageSender'
import { TransportService } from './TransportService'
import { AgentContext, DefaultAgentContextProvider } from './context'

export class Agent {
protected agentConfig: AgentConfig
protected logger: Logger
public readonly dependencyManager: DependencyManager
protected eventEmitter: EventEmitter
protected messageReceiver: MessageReceiver
protected transportService: TransportService
protected messageSender: MessageSender
private _isInitialized = false
export class Agent extends BaseAgent {
public messageSubscription: Subscription
private routingService: RoutingService
private agentContext: AgentContext
private stop$ = new Subject<boolean>()

public readonly connections: ConnectionsModule
public readonly proofs: ProofsModule
public readonly basicMessages: BasicMessagesModule
public readonly genericRecords: GenericRecordsModule
public readonly ledger: LedgerModule
public readonly questionAnswer!: QuestionAnswerModule
public readonly credentials: CredentialsModule
public readonly mediationRecipient: RecipientModule
public readonly mediator: MediatorModule
public readonly discovery: DiscoverFeaturesModule
public readonly dids: DidsModule
public readonly wallet: WalletModule
public readonly oob!: OutOfBandModule

public constructor(
initialConfig: InitConfig,
dependencies: AgentDependencies,
injectionContainer?: DependencyContainer
) {
// Take input container or child container so we don't interfere with anything outside of this agent
const container = injectionContainer ?? baseContainer.createChildContainer()

this.dependencyManager = new DependencyManager(container)

this.agentConfig = new AgentConfig(initialConfig, dependencies)
this.logger = this.agentConfig.logger

this.logger.info('Creating agent with config', {
...initialConfig,
// Prevent large object being logged.
// Will display true/false to indicate if value is present in config
logger: initialConfig.logger != undefined,
})

if (!this.agentConfig.walletConfig) {
this.logger.warn(
'Wallet config has not been set on the agent config. ' +
'Make sure to initialize the wallet yourself before initializing the agent, ' +
'or provide the required wallet configuration in the agent constructor'
)
}
// NOTE: we can't create variables before calling super as TS will complain that the super call must be the
// the first statement in the constructor.
super(new AgentConfig(initialConfig, dependencies), injectionContainer ?? baseContainer.createChildContainer())

this.registerDependencies(this.dependencyManager)

// Resolve instances after everything is registered
this.eventEmitter = this.dependencyManager.resolve(EventEmitter)
this.messageSender = this.dependencyManager.resolve(MessageSender)
this.messageReceiver = this.dependencyManager.resolve(MessageReceiver)
this.transportService = this.dependencyManager.resolve(TransportService)
this.routingService = this.dependencyManager.resolve(RoutingService)
this.agentContext = this.dependencyManager.resolve(AgentContext)

// We set the modules in the constructor because that allows to set them as read-only
this.connections = this.dependencyManager.resolve(ConnectionsModule)
this.credentials = this.dependencyManager.resolve(CredentialsModule) as CredentialsModule
this.proofs = this.dependencyManager.resolve(ProofsModule)
this.mediator = this.dependencyManager.resolve(MediatorModule)
this.mediationRecipient = this.dependencyManager.resolve(RecipientModule)
this.basicMessages = this.dependencyManager.resolve(BasicMessagesModule)
this.questionAnswer = this.dependencyManager.resolve(QuestionAnswerModule)
this.genericRecords = this.dependencyManager.resolve(GenericRecordsModule)
this.ledger = this.dependencyManager.resolve(LedgerModule)
this.discovery = this.dependencyManager.resolve(DiscoverFeaturesModule)
this.dids = this.dependencyManager.resolve(DidsModule)
this.wallet = this.dependencyManager.resolve(WalletModule)
this.oob = this.dependencyManager.resolve(OutOfBandModule)
const stop$ = this.dependencyManager.resolve<Subject<boolean>>(InjectionSymbols.Stop$)

// Listen for new messages (either from transports or somewhere else in the framework / extensions)
this.messageSubscription = this.eventEmitter
.observable<AgentMessageReceivedEvent>(AgentEventTypes.AgentMessageReceived)
.pipe(
takeUntil(this.stop$),
takeUntil(stop$),
concatMap((e) =>
this.messageReceiver.receiveMessage(e.payload.message, {
connection: e.payload.connection,
Expand Down Expand Up @@ -172,51 +100,9 @@ export class Agent {
}

public async initialize() {
const { connectToIndyLedgersOnStartup, publicDidSeed, walletConfig, mediatorConnectionsInvite } = this.agentConfig

if (this._isInitialized) {
throw new AriesFrameworkError(
'Agent already initialized. Currently it is not supported to re-initialize an already initialized agent.'
)
}

if (!this.wallet.isInitialized && walletConfig) {
await this.wallet.initialize(walletConfig)
} else if (!this.wallet.isInitialized) {
throw new WalletError(
'Wallet config has not been set on the agent config. ' +
'Make sure to initialize the wallet yourself before initializing the agent, ' +
'or provide the required wallet configuration in the agent constructor'
)
}

// Make sure the storage is up to date
const storageUpdateService = this.dependencyManager.resolve(StorageUpdateService)
const isStorageUpToDate = await storageUpdateService.isUpToDate(this.agentContext)
this.logger.info(`Agent storage is ${isStorageUpToDate ? '' : 'not '}up to date.`)

if (!isStorageUpToDate && this.agentConfig.autoUpdateStorageOnStartup) {
const updateAssistant = new UpdateAssistant(this, DEFAULT_UPDATE_CONFIG)

await updateAssistant.initialize()
await updateAssistant.update()
} else if (!isStorageUpToDate) {
const currentVersion = await storageUpdateService.getCurrentStorageVersion(this.agentContext)
// Close wallet to prevent un-initialized agent with initialized wallet
await this.wallet.close()
throw new AriesFrameworkError(
// TODO: add link to where documentation on how to update can be found.
`Current agent storage is not up to date. ` +
`To prevent the framework state from getting corrupted the agent initialization is aborted. ` +
`Make sure to update the agent storage (currently at ${currentVersion}) to the latest version (${UpdateAssistant.frameworkStorageVersion}). ` +
`You can also downgrade your version of Aries Framework JavaScript.`
)
}
const { connectToIndyLedgersOnStartup, mediatorConnectionsInvite } = this.agentConfig

if (publicDidSeed) {
// If an agent has publicDid it will be used as routing key.
await this.agentContext.wallet.initPublicDid({ seed: publicDidSeed })
}
await super.initialize()

// set the pools on the ledger.
this.ledger.setPools(this.agentContext.config.indyLedgers)
Expand Down Expand Up @@ -250,84 +136,20 @@ export class Agent {
}

public async shutdown() {
const stop$ = this.dependencyManager.resolve<Subject<boolean>>(InjectionSymbols.Stop$)
// All observables use takeUntil with the stop$ observable
// this means all observables will stop running if a value is emitted on this observable
this.stop$.next(true)
stop$.next(true)

// Stop transports
const allTransports = [...this.inboundTransports, ...this.outboundTransports]
const transportPromises = allTransports.map((transport) => transport.stop())
await Promise.all(transportPromises)

// close wallet if still initialized
if (this.wallet.isInitialized) {
await this.wallet.close()
}
this._isInitialized = false
}

public get publicDid() {
return this.agentContext.wallet.publicDid
}

/**
* Receive a message. This should mainly be used for receiving connection-less messages.
*
* If you want to receive messages that originated from e.g. a transport make sure to use the {@link MessageReceiver}
* for this. The `receiveMessage` method on the `Agent` class will associate the current context to the message, which
* may not be what should happen (e.g. in case of multi tenancy).
*/
public async receiveMessage(inboundMessage: unknown, session?: TransportSession) {
return await this.messageReceiver.receiveMessage(inboundMessage, {
session,
contextCorrelationId: this.agentContext.contextCorrelationId,
})
}

public get injectionContainer() {
return this.dependencyManager.container
}

public get config() {
return this.agentConfig
}

public get context() {
return this.agentContext
}

private async getMediationConnection(mediatorInvitationUrl: string) {
const outOfBandInvitation = this.oob.parseInvitation(mediatorInvitationUrl)
const outOfBandRecord = await this.oob.findByInvitationId(outOfBandInvitation.id)
const [connection] = outOfBandRecord ? await this.connections.findAllByOutOfBandId(outOfBandRecord.id) : []

if (!connection) {
this.logger.debug('Mediation connection does not exist, creating connection')
// We don't want to use the current default mediator when connecting to another mediator
const routing = await this.routingService.getRouting(this.agentContext, { useDefaultMediator: false })

this.logger.debug('Routing created', routing)
const { connectionRecord: newConnection } = await this.oob.receiveInvitation(outOfBandInvitation, {
routing,
})
this.logger.debug(`Mediation invitation processed`, { outOfBandInvitation })

if (!newConnection) {
throw new AriesFrameworkError('No connection record to provision mediation.')
}

return this.connections.returnWhenIsConnected(newConnection.id)
}

if (!connection.isReady) {
return this.connections.returnWhenIsConnected(connection.id)
}
return connection
await super.shutdown()
}

private registerDependencies(dependencyManager: DependencyManager) {
const dependencies = this.agentConfig.agentDependencies

protected registerDependencies(dependencyManager: DependencyManager) {
// Register internal dependencies
dependencyManager.registerSingleton(EventEmitter)
dependencyManager.registerSingleton(MessageSender)
Expand All @@ -342,9 +164,9 @@ export class Agent {
dependencyManager.registerSingleton(StorageUpdateService)

dependencyManager.registerInstance(AgentConfig, this.agentConfig)
dependencyManager.registerInstance(InjectionSymbols.AgentDependencies, dependencies)
dependencyManager.registerInstance(InjectionSymbols.FileSystem, new dependencies.FileSystem())
dependencyManager.registerInstance(InjectionSymbols.Stop$, this.stop$)
dependencyManager.registerInstance(InjectionSymbols.AgentDependencies, this.agentConfig.agentDependencies)
dependencyManager.registerInstance(InjectionSymbols.Stop$, new Subject<boolean>())
dependencyManager.registerInstance(InjectionSymbols.FileSystem, new this.agentConfig.agentDependencies.FileSystem())

// Register possibly already defined services
if (!dependencyManager.isRegistered(InjectionSymbols.Wallet)) {
Expand Down Expand Up @@ -391,4 +213,33 @@ export class Agent {
this.dependencyManager.registerSingleton(InjectionSymbols.AgentContextProvider, DefaultAgentContextProvider)
}
}

protected async getMediationConnection(mediatorInvitationUrl: string) {
const outOfBandInvitation = this.oob.parseInvitation(mediatorInvitationUrl)
const outOfBandRecord = await this.oob.findByInvitationId(outOfBandInvitation.id)
const [connection] = outOfBandRecord ? await this.connections.findAllByOutOfBandId(outOfBandRecord.id) : []

if (!connection) {
this.logger.debug('Mediation connection does not exist, creating connection')
// We don't want to use the current default mediator when connecting to another mediator
const routing = await this.mediationRecipient.getRouting({ useDefaultMediator: false })

this.logger.debug('Routing created', routing)
const { connectionRecord: newConnection } = await this.oob.receiveInvitation(outOfBandInvitation, {
routing,
})
this.logger.debug(`Mediation invitation processed`, { outOfBandInvitation })

if (!newConnection) {
throw new AriesFrameworkError('No connection record to provision mediation.')
}

return this.connections.returnWhenIsConnected(newConnection.id)
}

if (!connection.isReady) {
return this.connections.returnWhenIsConnected(connection.id)
}
return connection
}
}
Loading