diff --git a/integration-tests/e2e-tests/src/abilities/WalletSdk.ts b/integration-tests/e2e-tests/src/abilities/WalletSdk.ts index 5418afb78..72584db30 100644 --- a/integration-tests/e2e-tests/src/abilities/WalletSdk.ts +++ b/integration-tests/e2e-tests/src/abilities/WalletSdk.ts @@ -1,137 +1,97 @@ -import { Ability, Discardable, Initialisable, Interaction, Question, QuestionAdapter } from "@serenity-js/core" -import SDK from "@atala/prism-wallet-sdk" -import { Message } from "@atala/prism-wallet-sdk/build/typings/domain" -import axios from "axios" -import { CloudAgentConfiguration } from "../configuration/CloudAgentConfiguration" -import { Utils } from "../Utils" -import { InMemoryStore } from "../configuration/InMemoryStore" -const { - Agent, - ApiImpl, - Apollo, - BasicMediatorHandler, - Castor, - ConnectionsManager, - DIDCommWrapper, - Domain, ListenerKey, - Mercury, - PublicMediatorStore -} = SDK; +import { Ability, Discardable, Initialisable, Interaction, Question, QuestionAdapter } from "@serenity-js/core"; +import SDK from "@atala/prism-wallet-sdk"; +import { Message } from "@atala/prism-wallet-sdk/build/typings/domain"; +import axios from "axios"; +import { CloudAgentConfiguration } from "../configuration/CloudAgentConfiguration"; +import { Utils } from "../Utils"; +import { InMemoryStore } from "../configuration/InMemoryStore"; + +const { Agent, Apollo, Domain, ListenerKey, } = SDK; export class WalletSdk extends Ability implements Initialisable, Discardable { - sdk!: SDK.Agent - messages: MessageQueue = new MessageQueue() + sdk!: SDK.Agent; + messages: MessageQueue = new MessageQueue(); static async withANewInstance(): Promise { const instance: SDK.Agent = await Utils.retry(2, async () => { - return await WalletSdkBuilder.createInstance() - }) - return new WalletSdk(instance) + return await WalletSdkBuilder.createInstance(); + }); + return new WalletSdk(instance); } constructor(sdk: SDK.Agent) { - super() - this.sdk = sdk + super(); + this.sdk = sdk; } static credentialOfferStackSize(): QuestionAdapter { return Question.about("credential offer stack", actor => { - return WalletSdk.as(actor).messages.credentialOfferStack.length - }) + return WalletSdk.as(actor).messages.credentialOfferStack.length; + }); } static issuedCredentialStackSize(): QuestionAdapter { return Question.about("issued credential stack", actor => { - return WalletSdk.as(actor).messages.issuedCredentialStack.length - }) + return WalletSdk.as(actor).messages.issuedCredentialStack.length; + }); } static proofOfRequestStackSize(): QuestionAdapter { return Question.about("proof of request stack", actor => { - return WalletSdk.as(actor).messages.proofRequestStack.length - }) + return WalletSdk.as(actor).messages.proofRequestStack.length; + }); } static execute(callback: (sdk: SDK.Agent, messages: { - credentialOfferStack: Message[], - issuedCredentialStack: Message[], - proofRequestStack: Message[] + credentialOfferStack: Message[]; + issuedCredentialStack: Message[]; + proofRequestStack: Message[]; }) => Promise): Interaction { return Interaction.where("#actor uses wallet sdk", async actor => { await callback(WalletSdk.as(actor).sdk, { credentialOfferStack: WalletSdk.as(actor).messages.credentialOfferStack, issuedCredentialStack: WalletSdk.as(actor).messages.issuedCredentialStack, proofRequestStack: WalletSdk.as(actor).messages.proofRequestStack - }) - }) + }); + }); } async discard(): Promise { - await this.sdk.stop() + await this.sdk.stop(); } async initialise(): Promise { this.sdk.addListener( ListenerKey.MESSAGE, (messages: SDK.Domain.Message[]) => { for (const message of messages) { - this.messages.enqueue(message) + this.messages.enqueue(message); } } - ) + ); - await this.sdk.start() + await this.sdk.start(); } isInitialised(): boolean { - return this.sdk.state != "stopped" + return this.sdk.state != "stopped"; } } class WalletSdkBuilder { private static async getMediatorDidThroughOob(): Promise { - const response = await axios.get(CloudAgentConfiguration.mediatorOobUrl) - const encodedData = response.data.split("?_oob=")[1] - const oobData = JSON.parse(Buffer.from(encodedData, "base64").toString()) - return oobData.from + const response = await axios.get(CloudAgentConfiguration.mediatorOobUrl); + const encodedData = response.data.split("?_oob=")[1]; + const oobData = JSON.parse(Buffer.from(encodedData, "base64").toString()); + return oobData.from; } static async createInstance() { - const apollo = new Apollo() - const castor = new Castor(apollo); - const store = new InMemoryStore() + const apollo = new Apollo(); + const store = new InMemoryStore(); const pluto = new SDK.Pluto(store, apollo); - await pluto.start() - - const api = new ApiImpl() - const didcomm = new DIDCommWrapper(apollo, castor, pluto) - const mercury = new Mercury(castor, didcomm, api) - - const mediatorDID = Domain.DID.fromString(await WalletSdkBuilder.getMediatorDidThroughOob()) - const mediatorStore = new PublicMediatorStore(pluto) - - const mediatorHandler = new BasicMediatorHandler( - mediatorDID, - mercury, - mediatorStore, - ) - - const connectionsManager = new ConnectionsManager( - castor, - mercury, - pluto, - mediatorHandler, - ) - - const seed = apollo.createRandomSeed().seed - return new Agent( - apollo, - castor, - pluto, - mercury, - mediatorHandler, - connectionsManager, - seed, - ) + const mediatorDID = Domain.DID.fromString(await WalletSdkBuilder.getMediatorDidThroughOob()); + + return Agent.initialize({ apollo, pluto, mediatorDID }); } } @@ -139,58 +99,59 @@ class WalletSdkBuilder { * Helper class for message queueing processor */ class MessageQueue { - private processingId: NodeJS.Timeout | null = null - private queue: Message[] = [] + private processingId: NodeJS.Timeout | null = null; + private queue: Message[] = []; - credentialOfferStack: Message[] = [] - proofRequestStack: Message[] = [] - issuedCredentialStack: Message[] = [] - receivedMessages: string[] = [] + credentialOfferStack: Message[] = []; + proofRequestStack: Message[] = []; + issuedCredentialStack: Message[] = []; + receivedMessages: string[] = []; enqueue(message: Message) { - this.queue.push(message) + this.queue.push(message); // auto start processing messages if (!this.processingId) { - this.processMessages() + this.processMessages(); } } dequeue(): Message { - return this.queue.shift()! + return this.queue.shift()!; } // Check if the queue is empty isEmpty(): boolean { - return this.queue.length === 0 + return this.queue.length === 0; } // Get the number of messages in the queue size(): number { - return this.queue.length + return this.queue.length; } processMessages() { this.processingId = setInterval(() => { if (!this.isEmpty()) { - const message: Message = this.dequeue() + const message: Message = this.dequeue(); // checks if sdk already received message if (this.receivedMessages.includes(message.id)) { - return + return; } - this.receivedMessages.push(message.id) + this.receivedMessages.push(message.id); + if (message.piuri.includes("/offer-credential")) { - this.credentialOfferStack.push(message) + this.credentialOfferStack.push(message); } else if (message.piuri.includes("/present-proof")) { - this.proofRequestStack.push(message) + this.proofRequestStack.push(message); } else if (message.piuri.includes("/issue-credential")) { - this.issuedCredentialStack.push(message) + this.issuedCredentialStack.push(message); } } else { - clearInterval(this.processingId!) - this.processingId = null + clearInterval(this.processingId!); + this.processingId = null; } - }, 50) + }, 50); } } diff --git a/src/prism-agent/connectionsManager/ConnectionsManager.ts b/src/prism-agent/connectionsManager/ConnectionsManager.ts index af07c0856..5ac7e7c9c 100644 --- a/src/prism-agent/connectionsManager/ConnectionsManager.ts +++ b/src/prism-agent/connectionsManager/ConnectionsManager.ts @@ -115,8 +115,7 @@ export class ConnectionsManager implements ConnectionsManagerClass { async awaitMessageResponse(id: string): Promise { console.log("Deprecated, use agent.addListener('THREAD-{{Your thread || messageId}}', fn), this method does not support live-mode."); const messages = await this.mediationHandler.pickupUnreadMessages(10); - return messages - .find(({ message }) => message.thid === id)?.message + return messages.find(x => x.message.thid === id)?.message; } /** @@ -133,25 +132,19 @@ export class ConnectionsManager implements ConnectionsManagerClass { } if (unreadMessages.length) { - const received = unreadMessages - .filter(({ message }) => message.direction === MessageDirection.RECEIVED) + const received = unreadMessages.filter(x => x.message.direction === MessageDirection.RECEIVED); + const messages = received.map(x => x.message); + const messageIds = received.map(x => x.attachmentId); - const messages = received - .map(({ message }) => message); - - const messageIds = received - .map(({ attachmentId }) => attachmentId); - - if (messages.length) { + if (messages.length > 0) { await this.pluto.storeMessages(messages); } - const revokeMessages = messages - .filter((message) => message.piuri === ProtocolType.PrismRevocation); + const revokeMessages = messages.filter(x => x.piuri === ProtocolType.PrismRevocation); + const allMessages = await this.pluto.getAllMessages(); - const allMessages = await this.pluto.getAllMessages() for (let message of revokeMessages) { - const revokeMessage = RevocationNotification.fromMessage(message) + const revokeMessage = RevocationNotification.fromMessage(message); const threadId = revokeMessage.body.issueCredentialProtocolThreadId; const matchingMessages = allMessages.filter(({ thid, piuri }) => @@ -159,27 +152,23 @@ export class ConnectionsManager implements ConnectionsManagerClass { piuri === ProtocolType.DidcommIssueCredential ); - if (matchingMessages.length) { - + if (matchingMessages.length > 0) { for (let message of matchingMessages) { - const issueMessage = IssueCredential.fromMessage(message) + const issueMessage = IssueCredential.fromMessage(message); const credential = await this.agentCredentials.processIssuedCredentialMessage( issueMessage - ) - await this.pluto.revokeCredential(credential) - this.events.emit(ListenerKey.REVOKE, credential) + ); + await this.pluto.revokeCredential(credential); + this.events.emit(ListenerKey.REVOKE, credential); } } - } if (messageIds.length) { await this.mediationHandler.registerMessagesAsRead(messageIds); } - this.events.emit(ListenerKey.MESSAGE, unreadMessages); - - + this.events.emit(ListenerKey.MESSAGE, messages); } } @@ -272,16 +261,16 @@ export class ConnectionsManager implements ConnectionsManagerClass { return; } const currentMediator = this.mediationHandler.mediator.mediatorDID; - const resolvedMediator = await this.castor.resolveDID(currentMediator.toString()) + const resolvedMediator = await this.castor.resolveDID(currentMediator.toString()); const hasWebsocket = resolvedMediator.services.find(({ serviceEndpoint: { uri } }) => uri.startsWith("ws://") || uri.startsWith("wss://") - ) + ); if (!hasWebsocket) { const timeInterval = Math.max(iterationPeriod, 5) * 1000; this.cancellable = new CancellableTask(async () => { const unreadMessages = await this.mediationHandler.pickupUnreadMessages(10); - await this.processMessages(unreadMessages) + await this.processMessages(unreadMessages); }, timeInterval); } else { //Connecting to websockets, do not repeat the task @@ -304,13 +293,13 @@ export class ConnectionsManager implements ConnectionsManagerClass { message: message, attachmentId: attachment.id } - ] + ]; }, []); - await this.processMessages(unreadMessages) + await this.processMessages(unreadMessages); } - ) - }) + ); + }); } this.cancellable.then().catch((err) => {