Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lost messages when using the bus protocol. #1571

Closed
sailxjx opened this issue Jan 29, 2022 · 3 comments
Closed

Lost messages when using the bus protocol. #1571

sailxjx opened this issue Jan 29, 2022 · 3 comments
Labels

Comments

@sailxjx
Copy link

sailxjx commented Jan 29, 2022

I'm trying to benchmark nng against zmq and redis, but found that nng loses messages under the bus protocol; even setting send_buffer_size and recv_buffer_size does not improve this.

This problem does not exist with nng's pubsub, nor does it exist with zmq or redis (because they only have pubsub). I'm curious what is causing this and how to fix it.

Here is the log and code:

Log

Subs: 1 Msgs: 100
Count: 99 Time: 0.02 s
Subs: 2 Msgs: 100
Count: 98 Time: 0.01 s
Count: 95 Time: 0.01 s
Subs: 4 Msgs: 100
Count: 100 Time: 0.02 s
Count: 100 Time: 0.02 s
Count: 100 Time: 0.02 s
Count: 100 Time: 0.02 s
Subs: 6 Msgs: 100
Count: 100 Time: 0.02 s
Count: 100 Time: 0.02 s
Count: 100 Time: 0.02 s
Count: 100 Time: 0.02 s
Count: 100 Time: 0.02 s
Count: 0 Time: 0.00 s
Subs: 1 Msgs: 1000
Count: 551 Time: 0.04 s
Subs: 2 Msgs: 1000
Count: 682 Time: 0.05 s
Count: 687 Time: 0.05 s
Subs: 4 Msgs: 1000
Count: 915 Time: 0.09 s
Count: 911 Time: 0.09 s
Count: 900 Time: 0.09 s
Count: 0 Time: 0.00 s

Code:

from time import sleep
import pynng
import time
import redis
import zmq
from mpire.pool import WorkerPool
from pynng import Bus0, Pub0, Sub0

from random import random
from string import ascii_lowercase

# One single-byte ``bytes`` object per lowercase ASCII letter (b'a' .. b'z').
bal = [bytes([code]) for code in ascii_lowercase.encode('ascii')]

# Random 100000-byte lowercase payload shared by every benchmark below.
_PAYLOAD_LEN = int(1e5)
msg_100000 = b''.join(
    bal[int(random() * 26)] for _ in range(_PAYLOAD_LEN)
)


def main_nng_pub_sub(i, n_msgs):
    """Run one participant of the nng Pub0/Sub0 benchmark.

    Worker 0 is the publisher; every other worker is a subscriber that
    counts the payload messages it receives until it sees an ``end::``
    marker, then prints the count and the elapsed time.

    Args:
        i: Worker index. ``0`` publishes, any other value subscribes.
        n_msgs: Number of payload messages the publisher sends before it
            starts sending ``end::`` markers.
    """
    def publish():
        with Pub0() as sock:
            sock.send_buffer_size = 1000
            sock.listen("tcp://127.0.0.1:12345")
            # Presumably gives the subscriber processes time to dial in
            # before the first payload is sent.
            time.sleep(0.7)

            # ``_`` instead of ``i`` so the worker index is not shadowed.
            for _ in range(n_msgs):
                sock.send(b"msg::" + msg_100000)
            # The end marker is repeated 50 times, 10 ms apart.
            for _ in range(50):
                sock.send(b"end::")
                time.sleep(0.01)

    def subscribe():
        with Sub0() as sock:
            sock.recv_buffer_size = 1000
            sock.dial("tcp://127.0.0.1:12345")
            sock.subscribe("")

            start_time = None
            count = 0

            while True:
                msg = sock.recv(block=True)
                # The clock starts when the first message is received.
                if start_time is None:
                    start_time = time.time()
                event, _ = msg.decode().split("::", maxsplit=1)
                if event == "end":
                    print("Count: {}".format(count),
                          "Time: {:.2f} s".format(time.time() - start_time))
                    break
                count += 1

    if i == 0:
        publish()
    else:
        subscribe()


def main_nng(i, n_msgs):
    """Run one participant of the nng Bus0 benchmark.

    Worker 0 listens and sends; every other worker dials worker 0 and
    counts the payload messages it receives until it sees an ``end::``
    marker, then prints the count and the elapsed time.

    NOTE(review): the nng bus protocol is documented as best-effort, so a
    receiver count below ``n_msgs`` is presumably the protocol dropping
    messages when the sender outpaces delivery -- confirm against the
    nng_bus(7) manual for the nng version in use.

    Args:
        i: Worker index. ``0`` sends, any other value receives.
        n_msgs: Number of payload messages the sender emits before it
            starts sending ``end::`` markers.
    """
    def publish():
        with Bus0() as sock:
            sock.send_buffer_size = 1000
            sock.listen("tcp://127.0.0.1:12345")
            # Presumably gives the receiver processes time to dial in
            # before the first payload is sent.
            time.sleep(0.7)

            # ``_`` instead of ``i`` so the worker index is not shadowed.
            for _ in range(n_msgs):
                sock.send(b"msg::" + msg_100000)
            # The end marker is repeated 500 times, 10 ms apart.
            for _ in range(500):
                sock.send(b"end::")
                time.sleep(0.01)

    def subscribe():
        with Bus0() as sock:
            sock.recv_buffer_size = 1000
            sock.dial("tcp://127.0.0.1:12345")

            start_time = None
            count = 0

            while True:
                msg = sock.recv(block=True)
                # The clock starts when the first message is received.
                if start_time is None:
                    start_time = time.time()
                event, _ = msg.decode().split("::", maxsplit=1)
                if event == "end":
                    # If the very first message received is an end marker,
                    # this prints a count of 0.
                    print("Count: {}".format(count),
                          "Time: {:.2f} s".format(time.time() - start_time))
                    break
                count += 1

    if i == 0:
        publish()
    else:
        subscribe()


def main_redis(i, n_msgs):
    """Run one participant of the Redis pub/sub benchmark.

    Worker 0 publishes to the ``topic`` channel; every other worker
    subscribes to it and counts payload messages until it sees an
    ``end::`` marker, then prints the count and the elapsed time.

    Args:
        i: Worker index. ``0`` publishes, any other value subscribes.
        n_msgs: Number of payload messages the publisher sends before the
            single ``end::`` marker.
    """
    r = redis.Redis(host='127.0.0.1', port=6379, db=0)

    def publish():
        # Presumably gives the subscriber processes time to subscribe
        # before the first payload is published.
        sleep(0.7)
        # ``_`` instead of ``i`` so the worker index is not shadowed.
        for _ in range(n_msgs):
            r.publish("topic", b"msg::" + msg_100000)
        r.publish("topic", b"end::")
        time.sleep(1)

    def subscribe():
        p: redis.client.PubSub = r.pubsub()
        try:
            p.subscribe("topic")
            count = 0
            start_time = None

            while True:
                msg = p.get_message(ignore_subscribe_messages=True)
                # Identity check: None is a singleton, so use ``is``.
                if msg is None:
                    # Nothing pending; back off briefly, then poll again.
                    sleep(0.001)
                    continue
                # The clock starts when the first message is received.
                if start_time is None:
                    start_time = time.time()
                event, _ = msg["data"].decode().split("::", maxsplit=1)
                if event == "end":
                    print("Count: {}".format(count),
                          "Time: {:.2f} s".format(time.time() - start_time))
                    break
                count += 1
        finally:
            # Release the pub/sub connection even if the loop raises.
            p.close()

    if i == 0:
        publish()
    else:
        subscribe()


def main_zmq(i, n_msgs):
    """Run one participant of the ZeroMQ PUB/SUB benchmark.

    Worker 0 binds a PUB socket on port 23456; every other worker connects
    a SUB socket to it and counts payload messages until it sees an
    ``end::`` marker, then prints the count and the elapsed time.

    Args:
        i: Worker index. ``0`` publishes, any other value subscribes.
        n_msgs: Number of payload messages the publisher sends before it
            starts sending ``end::`` markers.
    """
    def publish():
        # Context managers close the socket and then terminate the context
        # instead of leaving both to interpreter shutdown.
        with zmq.Context() as context, context.socket(zmq.PUB) as socket:
            socket.setsockopt(zmq.SNDHWM, 0)  # Maximize Queue Length
            socket.bind("tcp://*:%s" % 23456)

            # Presumably gives the subscriber processes time to connect
            # before the first payload is sent.
            sleep(0.7)

            # ``_`` instead of ``i`` so the worker index is not shadowed.
            for _ in range(n_msgs):
                socket.send(b"msg::" + msg_100000)
            # The end marker is repeated 50 times, 10 ms apart.
            for _ in range(50):
                socket.send(b"end::")
                time.sleep(0.01)

    def subscribe():
        with zmq.Context() as context, context.socket(zmq.SUB) as socket:
            socket.setsockopt(zmq.SUBSCRIBE, b"")
            socket.connect("tcp://127.0.0.1:23456")

            count = 0
            start_time = None

            while True:
                msg = socket.recv()
                # The clock starts when the first message is received.
                if start_time is None:
                    start_time = time.time()
                event, _ = msg.decode().split("::", maxsplit=1)
                if event == "end":
                    print("Count: {}".format(count),
                          "Time: {:.2f} s".format(time.time() - start_time))
                    break
                count += 1

    if i == 0:
        publish()
    else:
        subscribe()


if __name__ == "__main__":
    # Sweep message counts (outer loop) against subscriber counts (inner).
    for message_count in (100, 1000, 10000):
        for subscriber_count in (1, 2, 4, 6):
            print("Subs: {}".format(subscriber_count),
                  "Msgs: {}".format(message_count))

            # One extra worker: index 0 publishes, the others subscribe.
            worker_total = subscriber_count + 1
            job_args = [[worker_index, message_count]
                        for worker_index in range(worker_total)]

            pool = WorkerPool(n_jobs=worker_total, start_method="spawn",
                              daemon=False)
            with pool:
                pool.map(main_nng, job_args)

            # Pause between runs before the next configuration starts.
            time.sleep(1)
@gdamore
Copy link
Contributor

gdamore commented Feb 9, 2022

I recently did a lot of work on the bus protocol. What version of NNG are you using?

@sailxjx
Copy link
Author

sailxjx commented Feb 9, 2022

I used pynng 0.7.1. After searching through its setup files, I believe the nng revision it builds against is this one: https://github.com/nanomsg/nng/tree/4f5e11c391c4a8f1b2731aee5ad47bc0c925042a

@gdamore
Copy link
Contributor

gdamore commented Apr 16, 2022

That's quite an old version of NNG now (from 1.5 years ago).

I recommend updating. The bus protocol should be very much better in recent builds (from this year).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants