-
Notifications
You must be signed in to change notification settings - Fork 207
/
Copy pathAgent.ts
241 lines (203 loc) · 9.93 KB
/
Agent.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
import type { InboundTransport } from '../transport/InboundTransport'
import type { OutboundTransport } from '../transport/OutboundTransport'
import type { InitConfig } from '../types'
import type { AgentDependencies } from './AgentDependencies'
import type { AgentModulesInput, ModulesMap } from './AgentModules'
import type { AgentMessageReceivedEvent } from './Events'
import type { Subscription } from 'rxjs'
import { Subject } from 'rxjs'
import { concatMap, takeUntil } from 'rxjs/operators'
import { CacheRepository } from '../cache'
import { InjectionSymbols } from '../constants'
import { SigningProviderToken } from '../crypto'
import { JwsService } from '../crypto/JwsService'
import { AriesFrameworkError } from '../error'
import { DependencyManager } from '../plugins'
import { DidCommMessageRepository, StorageUpdateService, StorageVersionRepository } from '../storage'
import { InMemoryMessageRepository } from '../storage/InMemoryMessageRepository'
import { IndyStorageService } from '../storage/IndyStorageService'
import { IndyWallet } from '../wallet/IndyWallet'
import { AgentConfig } from './AgentConfig'
import { extendModulesWithDefaultModules } from './AgentModules'
import { BaseAgent } from './BaseAgent'
import { Dispatcher } from './Dispatcher'
import { EnvelopeService } from './EnvelopeService'
import { EventEmitter } from './EventEmitter'
import { AgentEventTypes } from './Events'
import { FeatureRegistry } from './FeatureRegistry'
import { MessageReceiver } from './MessageReceiver'
import { MessageSender } from './MessageSender'
import { TransportService } from './TransportService'
import { AgentContext, DefaultAgentContextProvider } from './context'
/**
 * Options accepted by the `Agent` constructor.
 */
interface AgentOptions<AgentModules extends AgentModulesInput> {
/** Agent configuration; passed to `AgentConfig` together with `dependencies`. */
config: InitConfig
/** Optional modules to register; the constructor extends these with the default modules. */
modules?: AgentModules
/** Agent dependencies; passed to `AgentConfig` together with `config`. */
dependencies: AgentDependencies
}
/**
 * Agent that registers the framework's internal services on a dependency manager,
 * forwards received messages to the message receiver and manages the lifecycle
 * (initialize / shutdown) of the registered transports, ledger pools and mediation setup.
 */
export class Agent<AgentModules extends AgentModulesInput = ModulesMap> extends BaseAgent<AgentModules> {
  /**
   * Subscription that forwards `AgentMessageReceived` events to the message receiver.
   * It is created in the constructor and ends once a value is emitted on the `Stop$` subject.
   */
  public messageSubscription: Subscription

  /**
   * Creates the agent and registers all dependencies it needs.
   *
   * Services for the wallet, logger, storage service, message repository and agent context
   * provider are only registered when the passed `dependencyManager` does not have a
   * registration for them yet, so callers can supply their own implementations up front.
   *
   * @param options agent config, optional modules and agent dependencies
   * @param dependencyManager dependency manager to register everything on; a new one is created when omitted
   */
  public constructor(options: AgentOptions<AgentModules>, dependencyManager = new DependencyManager()) {
    const agentConfig = new AgentConfig(options.config, options.dependencies)
    const modulesWithDefaultModules = extendModulesWithDefaultModules(agentConfig, options.modules)

    // Register internal dependencies
    dependencyManager.registerSingleton(EventEmitter)
    dependencyManager.registerSingleton(MessageSender)
    dependencyManager.registerSingleton(MessageReceiver)
    dependencyManager.registerSingleton(TransportService)
    dependencyManager.registerSingleton(Dispatcher)
    dependencyManager.registerSingleton(EnvelopeService)
    dependencyManager.registerSingleton(FeatureRegistry)
    dependencyManager.registerSingleton(JwsService)
    dependencyManager.registerSingleton(CacheRepository)
    dependencyManager.registerSingleton(DidCommMessageRepository)
    dependencyManager.registerSingleton(StorageVersionRepository)
    dependencyManager.registerSingleton(StorageUpdateService)

    // This is a really ugly hack to make tsyringe work without any SigningProviders registered.
    // It is currently impossible to use @injectAll if there are no instances registered for the
    // token. We register a value of `default` by default and will filter that out in the registry.
    // Once we have a signing provider that should always be registered we can remove this. We can
    // make an ed25519 signer using the @stablelib/ed25519 library.
    dependencyManager.registerInstance(SigningProviderToken, 'default')

    dependencyManager.registerInstance(AgentConfig, agentConfig)
    dependencyManager.registerInstance(InjectionSymbols.AgentDependencies, agentConfig.agentDependencies)
    dependencyManager.registerInstance(InjectionSymbols.Stop$, new Subject<boolean>())
    dependencyManager.registerInstance(InjectionSymbols.FileSystem, new agentConfig.agentDependencies.FileSystem())

    // Register defaults only for services that have not already been registered by the caller
    if (!dependencyManager.isRegistered(InjectionSymbols.Wallet)) {
      dependencyManager.registerContextScoped(InjectionSymbols.Wallet, IndyWallet)
    }
    if (!dependencyManager.isRegistered(InjectionSymbols.Logger)) {
      dependencyManager.registerInstance(InjectionSymbols.Logger, agentConfig.logger)
    }
    if (!dependencyManager.isRegistered(InjectionSymbols.StorageService)) {
      dependencyManager.registerSingleton(InjectionSymbols.StorageService, IndyStorageService)
    }
    if (!dependencyManager.isRegistered(InjectionSymbols.MessageRepository)) {
      dependencyManager.registerSingleton(InjectionSymbols.MessageRepository, InMemoryMessageRepository)
    }

    // Register all modules. This will also include the default modules
    dependencyManager.registerModules(modulesWithDefaultModules)

    // TODO: contextCorrelationId for base wallet
    // Bind the default agent context to the container for use in modules etc.
    dependencyManager.registerInstance(
      AgentContext,
      new AgentContext({
        dependencyManager,
        contextCorrelationId: 'default',
      })
    )

    // If no agent context provider has been registered we use the default agent context provider.
    if (!dependencyManager.isRegistered(InjectionSymbols.AgentContextProvider)) {
      dependencyManager.registerSingleton(InjectionSymbols.AgentContextProvider, DefaultAgentContextProvider)
    }

    super(agentConfig, dependencyManager)

    const stop$ = this.dependencyManager.resolve<Subject<boolean>>(InjectionSymbols.Stop$)

    // Listen for new messages (either from transports or somewhere else in the framework / extensions).
    // concatMap hands the messages to the receiver one at a time, in the order they were emitted.
    this.messageSubscription = this.eventEmitter
      .observable<AgentMessageReceivedEvent>(AgentEventTypes.AgentMessageReceived)
      .pipe(
        takeUntil(stop$),
        concatMap((e) =>
          this.messageReceiver
            .receiveMessage(e.payload.message, {
              connection: e.payload.connection,
              contextCorrelationId: e.payload.contextCorrelationId,
            })
            // A message that fails to process is logged and must not end the subscription
            .catch((error) => {
              this.logger.error('Failed to process message', { error })
            })
        )
      )
      .subscribe()
  }

  /**
   * Registers an inbound transport on the message receiver.
   */
  public registerInboundTransport(inboundTransport: InboundTransport) {
    this.messageReceiver.registerInboundTransport(inboundTransport)
  }

  /**
   * Inbound transports registered on the message receiver.
   */
  public get inboundTransports() {
    return this.messageReceiver.inboundTransports
  }

  /**
   * Registers an outbound transport on the message sender.
   */
  public registerOutboundTransport(outboundTransport: OutboundTransport) {
    this.messageSender.registerOutboundTransport(outboundTransport)
  }

  /**
   * Outbound transports registered on the message sender.
   */
  public get outboundTransports() {
    return this.messageSender.outboundTransports
  }

  /**
   * Agent's event emitter
   */
  public get events() {
    return this.eventEmitter
  }

  /**
   * Agent's feature registry
   */
  public get features() {
    return this.featureRegistry
  }

  /**
   * Initializes the agent: runs the base initialization, sets the ledger pools, starts all
   * registered transports, optionally provisions mediation from the configured invitation url
   * and initializes the mediator and mediation recipient modules.
   */
  public async initialize() {
    await super.initialize()

    // set the pools on the ledger.
    this.ledger.setPools(this.ledger.config.indyLedgers)

    // When enabled in the ledger config, connect to all genesis pools on startup. The connection
    // is deliberately not awaited; a failure is only logged as a warning.
    if (this.ledger.config.connectToIndyLedgersOnStartup) {
      this.ledger.connectToPools().catch((error) => {
        this.logger.warn('Error connecting to ledger, will try to reconnect when needed.', { error })
      })
    }

    // Transports are started one after the other, inbound before outbound
    for (const transport of this.inboundTransports) {
      await transport.start(this)
    }

    for (const transport of this.outboundTransports) {
      await transport.start(this)
    }

    // Connect to the mediator through the invitation url if one is provided in the config.
    // This also provisions mediation over the resulting connection.
    // This requires the connections module, so it is done here during initialization.
    if (this.mediationRecipient.config.mediatorInvitationUrl) {
      this.logger.debug('Provision mediation with invitation', {
        mediatorInvitationUrl: this.mediationRecipient.config.mediatorInvitationUrl,
      })
      const mediationConnection = await this.getMediationConnection(
        this.mediationRecipient.config.mediatorInvitationUrl
      )
      await this.mediationRecipient.provision(mediationConnection)
    }

    await this.mediator.initialize()
    await this.mediationRecipient.initialize()

    this._isInitialized = true
  }

  /**
   * Shuts the agent down: signals all observables to stop, stops every registered transport
   * and closes the wallet if it is initialized.
   *
   * If stopping a transport fails the error is still propagated to the caller, but the wallet
   * is closed and the agent is marked as not initialized regardless.
   */
  public async shutdown() {
    const stop$ = this.dependencyManager.resolve<Subject<boolean>>(InjectionSymbols.Stop$)

    // All observables use takeUntil with the stop$ observable
    // this means all observables will stop running if a value is emitted on this observable
    stop$.next(true)

    // Stop transports
    const allTransports = [...this.inboundTransports, ...this.outboundTransports]

    try {
      await Promise.all(allTransports.map((transport) => transport.stop()))
    } finally {
      // Always release the wallet and reset the initialized flag, even when a transport failed
      // to stop. Otherwise a single failing transport would leave the wallet open.
      if (this.wallet.isInitialized) {
        await this.wallet.close()
      }

      this._isInitialized = false
    }
  }

  /**
   * Returns a connection for the mediator invitation, creating one when none exists yet.
   *
   * An existing connection is looked up through the out-of-band record that matches the
   * invitation id. When there is none, the invitation is received with routing that does not
   * use the current default mediator. In both cases a connection that is not ready yet is
   * awaited through `connections.returnWhenIsConnected`.
   *
   * @param mediatorInvitationUrl invitation url of the mediator to connect to
   * @returns the connection record for the mediator
   * @throws {AriesFrameworkError} when receiving the invitation yields no connection record
   */
  protected async getMediationConnection(mediatorInvitationUrl: string) {
    const outOfBandInvitation = await this.oob.parseInvitation(mediatorInvitationUrl)
    const outOfBandRecord = await this.oob.findByInvitationId(outOfBandInvitation.id)
    // Only the first connection found for the out-of-band record is considered
    const [connection] = outOfBandRecord ? await this.connections.findAllByOutOfBandId(outOfBandRecord.id) : []

    if (!connection) {
      this.logger.debug('Mediation connection does not exist, creating connection')
      // We don't want to use the current default mediator when connecting to another mediator
      const routing = await this.mediationRecipient.getRouting({ useDefaultMediator: false })

      this.logger.debug('Routing created', routing)
      const { connectionRecord: newConnection } = await this.oob.receiveInvitation(outOfBandInvitation, {
        routing,
      })
      this.logger.debug(`Mediation invitation processed`, { outOfBandInvitation })

      if (!newConnection) {
        throw new AriesFrameworkError('No connection record to provision mediation.')
      }

      return this.connections.returnWhenIsConnected(newConnection.id)
    }

    if (!connection.isReady) {
      return this.connections.returnWhenIsConnected(connection.id)
    }
    return connection
  }
}