From 0e7096d527d9f4d39c7bedbcdaf1a504226c620d Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Tue, 30 Aug 2022 10:33:39 +0200 Subject: [PATCH] docs: update message filtering example (#1362) Updates the example to use the new pubsub `addEventListener`-style API along with the README. Also updates the test to actually test that the relevant messages were received. Fixes https://github.com/libp2p/js-libp2p/issues/1288 --- examples/pubsub/message-filtering/1.js | 53 ++++++++++++++------- examples/pubsub/message-filtering/README.md | 37 +++++++------- examples/pubsub/message-filtering/test.js | 53 +++++++-------------- 3 files changed, 74 insertions(+), 69 deletions(-) diff --git a/examples/pubsub/message-filtering/1.js b/examples/pubsub/message-filtering/1.js index f83ecc98a5..86e43ec16a 100644 --- a/examples/pubsub/message-filtering/1.js +++ b/examples/pubsub/message-filtering/1.js @@ -40,24 +40,42 @@ const createNode = async () => { await node2.dial(node3.peerId) //subscribe - node1.pubsub.addEventListener(topic, (evt) => { + node1.pubsub.addEventListener('message', (evt) => { + if (evt.detail.topic !== topic) { + return + } + // Will not receive own published messages by default console.log(`node1 received: ${uint8ArrayToString(evt.detail.data)}`) }) node1.pubsub.subscribe(topic) - node2.pubsub.addEventListener(topic, (evt) => { + node2.pubsub.addEventListener('message', (evt) => { + if (evt.detail.topic !== topic) { + return + } + console.log(`node2 received: ${uint8ArrayToString(evt.detail.data)}`) }) + node2.pubsub.subscribe(topic) + + node3.pubsub.addEventListener('message', (evt) => { + if (evt.detail.topic !== topic) { + return + } - node3.pubsub.addEventListener(topic, (evt) => { console.log(`node3 received: ${uint8ArrayToString(evt.detail.data)}`) }) + node3.pubsub.subscribe(topic) + + // wait for subscriptions to propagate + await delay(1000) const validateFruit = (msgTopic, msg) => { const fruit = uint8ArrayToString(msg.data) const validFruit = ['banana', 'apple', 'orange'] + // car is not a fruit ! if (!validFruit.includes(fruit)) { throw new Error('no valid fruit received') } @@ -68,18 +86,19 @@ const createNode = async () => { node2.pubsub.topicValidators.set(topic, validateFruit) node3.pubsub.topicValidators.set(topic, validateFruit) - // node1 publishes "fruits" every five seconds - var count = 0; - const myFruits = ['banana', 'apple', 'car', 'orange']; - // car is not a fruit ! - setInterval(() => { - console.log('############## fruit ' + myFruits[count] + ' ##############') - node1.pubsub.publish(topic, uint8ArrayFromString(myFruits[count])).catch(err => { - console.info(err) - }) - count++ - if (count == myFruits.length) { - count = 0 - } - }, 5000) + // node1 publishes "fruits" + for (const fruit of ['banana', 'apple', 'car', 'orange']) { + console.log('############## fruit ' + fruit + ' ##############') + await node1.pubsub.publish(topic, uint8ArrayFromString(fruit)) + } + + // wait a few seconds for messages to be received + await delay(5000) + console.log('############## all messages sent ##############') })() + +async function delay (ms) { + await new Promise((resolve) => { + setTimeout(() => resolve(), ms) + }) +} \ No newline at end of file diff --git a/examples/pubsub/message-filtering/README.md b/examples/pubsub/message-filtering/README.md index 99cec8aa7d..11aad74207 100644 --- a/examples/pubsub/message-filtering/README.md +++ b/examples/pubsub/message-filtering/README.md @@ -48,18 +48,30 @@ Now we' can subscribe to the fruit topic and log incoming messages. ```JavaScript const topic = 'fruit' -node1.pubsub.on(topic, (msg) => { +node1.pubsub.addEventListener('message', (msg) => { + if (msg.detail.topic !== topic) { + return + } + console.log(`node1 received: ${uint8ArrayToString(msg.data)}`) }) await node1.pubsub.subscribe(topic) -node2.pubsub.on(topic, (msg) => { +node2.pubsub.addEventListener('message', (msg) => { + if (msg.detail.topic !== topic) { + return + } + console.log(`node2 received: ${uint8ArrayToString(msg.data)}`) }) await node2.pubsub.subscribe(topic) -node3.pubsub.on(topic, (msg) => { - console.log(`node3 received: ${uint8ArrayToString(msg.data)}`) +node3.pubsub.addEventListener('message', (msg) => { + if (msg.detail.topic !== topic) { + return + } + +console.log(`node3 received: ${uint8ArrayToString(msg.data)}`) }) await node3.pubsub.subscribe(topic) ``` @@ -83,19 +95,10 @@ node3.pubsub.topicValidators.set(topic, validateFruit) In this example, node one has an outdated version of the system, or is a malicious node. When it tries to publish fruit, the messages are re-shared and all the nodes share the message. However, when it tries to publish a vehicle the message is not re-shared. ```JavaScript -var count = 0; -const myFruits = ['banana', 'apple', 'car', 'orange']; - -setInterval(() => { - console.log('############## fruit ' + myFruits[count] + ' ##############') - node1.pubsub.publish(topic, new TextEncoder().encode(myFruits[count])).catch(err => { - console.error(err) - }) - count++ - if (count == myFruits.length) { - count = 0 - } -}, 5000) +for (const fruit of ['banana', 'apple', 'car', 'orange']) { + console.log('############## fruit ' + fruit + ' ##############') + await node1.pubsub.publish(topic, uint8ArrayFromString(fruit)) +} ``` Result diff --git a/examples/pubsub/message-filtering/test.js b/examples/pubsub/message-filtering/test.js index 97736f4985..e7d202df2a 100644 --- a/examples/pubsub/message-filtering/test.js +++ b/examples/pubsub/message-filtering/test.js @@ -6,29 +6,11 @@ import { fileURLToPath } from 'url' const __dirname = path.dirname(fileURLToPath(import.meta.url)) -const stdout = [ - { - topic: 'banana', - messageCount: 2 - }, - { - topic: 'apple', - messageCount: 2 - }, - { - topic: 'car', - messageCount: 0 - }, - { - topic: 'orange', - messageCount: 2 - }, -] +// holds messages received by peers +const messages = {} export async function test () { const defer = pDefer() - let topicCount = 0 - let topicMessageCount = 0 process.stdout.write('message-filtering/1.js\n') @@ -38,26 +20,27 @@ export async function test () { }) proc.all.on('data', async (data) => { - // End - if (topicCount === stdout.length) { - defer.resolve() - proc.all.removeAllListeners('data') - } - process.stdout.write(data) const line = uint8ArrayToString(data) - if (stdout[topicCount] && line.includes(stdout[topicCount].topic)) { - // Validate previous number of messages - if (topicCount > 0 && topicMessageCount > stdout[topicCount - 1].messageCount) { - defer.reject() - throw new Error(`topic ${stdout[topicCount - 1].topic} had ${topicMessageCount} messages instead of ${stdout[topicCount - 1].messageCount}`) + // End + if (line.includes('all messages sent')) { + if (messages.car > 0) { + defer.reject(new Error('Message validation failed - peers failed to filter car messages')) + } + + for (const fruit of ['banana', 'apple', 'orange']) { + if (messages[fruit] !== 2) { + defer.reject(new Error(`Not enough ${fruit} messages - received ${messages[fruit] ?? 0}, expected 2`)) + } } - topicCount++ - topicMessageCount = 0 - } else { - topicMessageCount++ + defer.resolve() + } + + if (line.includes('received:')) { + const fruit = line.split('received:')[1].trim() + messages[fruit] = (messages[fruit] ?? 0) + 1 } })