Skip to content

Commit

Permalink
feat: add inbound message queue
Browse files Browse the repository at this point in the history
Signed-off-by: Timo Glastra <[email protected]>
  • Loading branch information
TimoGlastra committed Jun 30, 2021
1 parent 4e9a48b commit f4b5035
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 2 deletions.
13 changes: 11 additions & 2 deletions src/agent/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { EventEmitter } from './EventEmitter'
import { AgentEventTypes } from './Events'
import { MessageReceiver } from './MessageReceiver'
import { MessageSender } from './MessageSender'
import { Queue } from './Queue'

export class Agent {
protected agentConfig: AgentConfig
Expand All @@ -37,6 +38,7 @@ export class Agent {
protected messageSender: MessageSender
public inboundTransporter?: InboundTransporter
private _isInitialized = false
private inboundMessageQueue: Queue<unknown> = new Queue()

public readonly connections!: ConnectionsModule
public readonly proofs!: ProofsModule
Expand Down Expand Up @@ -97,10 +99,17 @@ export class Agent {
this.listenForMessages()
}

private listenForMessages() {
private async listenForMessages() {
// Listen for incoming agent messages from the event listener
this.eventEmitter.on<AgentMessageReceivedEvent>(AgentEventTypes.AgentMessageReceived, async (event) => {
await this.receiveMessage(event.payload.message)
this.inboundMessageQueue.enqueue(event.payload.message)
})

// Continuously listen for new messages in the queue
while (await this.inboundMessageQueue.waitForMessage()) {
const message = this.inboundMessageQueue.dequeue()
await this.messageReceiver.receiveMessage(message)
}
}

public setInboundTransporter(inboundTransporter: InboundTransporter) {
Expand Down
35 changes: 35 additions & 0 deletions src/agent/Queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
export class Queue<T> {
private elements: T[] = []

public get length() {
return this.elements.length
}

public enqueue(element: T) {
this.elements.push(element)
}

public dequeue() {
return this.elements.shift()
}

public isEmpty() {
return this.elements.length == 0
}

public peek() {
return !this.isEmpty() ? this.elements[0] : undefined
}

public async waitForMessage(): Promise<boolean> {
return new Promise((resolve) => {
if (this.peek()) resolve(true)

setInterval(() => {
if (this.peek()) {
resolve(true)
}
}, 0)
})
}
}

0 comments on commit f4b5035

Please sign in to comment.