Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
fix: make tests more reliable (#103)
Browse files Browse the repository at this point in the history
await finishing processing rpc messages instead of using setTimeout.

Also return promises and await during processing message stream so we don't get unhandled promise rejections (potentially).
  • Loading branch information
achingbrain authored Jul 8, 2021
1 parent d8d46f0 commit cd4c409
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 29 deletions.
28 changes: 8 additions & 20 deletions packages/compliance-tests/src/pubsub/messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,11 @@ module.exports = (common) => {
}

pubsub.subscribe(topic)
pubsub._processRpc(peerStream.id.toB58String(), peerStream, rpc)
await pubsub._processRpc(peerStream.id.toB58String(), peerStream, rpc)

return new Promise((resolve) => {
setTimeout(() => {
expect(pubsub.validate.callCount).to.eql(1)
expect(pubsub._emitMessage.called).to.eql(false)
expect(pubsub._publish.called).to.eql(false)

resolve()
}, 50)
})
expect(pubsub.validate.callCount).to.eql(1)
expect(pubsub._emitMessage.called).to.eql(false)
expect(pubsub._publish.called).to.eql(false)
})

it('should not drop unsigned messages if strict signing is disabled', async () => {
Expand All @@ -99,17 +93,11 @@ module.exports = (common) => {
}

pubsub.subscribe(topic)
pubsub._processRpc(peerStream.id.toB58String(), peerStream, rpc)
await pubsub._processRpc(peerStream.id.toB58String(), peerStream, rpc)

return new Promise((resolve) => {
setTimeout(() => {
expect(pubsub.validate.callCount).to.eql(1)
expect(pubsub._emitMessage.called).to.eql(true)
expect(pubsub._publish.called).to.eql(true)

resolve()
}, 50)
})
expect(pubsub.validate.callCount).to.eql(1)
expect(pubsub._emitMessage.called).to.eql(true)
expect(pubsub._publish.called).to.eql(true)
})
})
}
1 change: 0 additions & 1 deletion packages/interfaces/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
"@types/bl": "^5.0.1",
"@types/debug": "^4.1.5",
"aegir": "^33.0.0",
"cids": "^1.1.6",
"events": "^3.3.0",
"it-pair": "^1.0.0",
"p-wait-for": "^3.2.0",
Expand Down
16 changes: 8 additions & 8 deletions packages/interfaces/src/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ class PubsubBaseProtocol extends EventEmitter {
const rpcBytes = data instanceof Uint8Array ? data : data.slice()
const rpcMsg = this._decodeRpc(rpcBytes)

this._processRpc(idB58Str, peerStreams, rpcMsg)
await this._processRpc(idB58Str, peerStreams, rpcMsg)
}
}
)
Expand All @@ -371,9 +371,9 @@ class PubsubBaseProtocol extends EventEmitter {
* @param {string} idB58Str
* @param {PeerStreams} peerStreams
* @param {RPC} rpc
* @returns {boolean}
* @returns {Promise<boolean>}
*/
_processRpc (idB58Str, peerStreams, rpc) {
async _processRpc (idB58Str, peerStreams, rpc) {
this.log('rpc from', idB58Str)
const subs = rpc.subscriptions
const msgs = rpc.msgs
Expand All @@ -393,14 +393,14 @@ class PubsubBaseProtocol extends EventEmitter {

if (msgs.length) {
// @ts-ignore RPC message is modified
msgs.forEach((message) => {
for (const message of msgs) {
if (!(this.canRelayMessage || (message.topicIDs && message.topicIDs.some((topic) => this.subscriptions.has(topic))))) {
this.log('received message we didn\'t subscribe to. Dropping.')
return
continue
}
const msg = utils.normalizeInRpcMessage(message, idB58Str)
this._processRpcMessage(msg)
})
await this._processRpcMessage(msg)
}
}
return true
}
Expand Down Expand Up @@ -455,7 +455,7 @@ class PubsubBaseProtocol extends EventEmitter {
// Emit to self
this._emitMessage(msg)

this._publish(utils.normalizeOutRpcMessage(msg))
return this._publish(utils.normalizeOutRpcMessage(msg))
}

/**
Expand Down

0 comments on commit cd4c409

Please sign in to comment.