Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription Handling #225

Open
JaggerJo opened this issue Jan 22, 2025 · 0 comments
Open

Subscription Handling #225

JaggerJo opened this issue Jan 22, 2025 · 0 comments
Labels
bug Something isn't working

Comments

@JaggerJo
Copy link
Contributor

🐛 Bug Report

The client currently exposes a list of Subscriptions.

When subscribing to a new topic, a subscription is added to the list. When unsubscribing the topic is removed.

There are certain issues with the current implementation, and I would argue that subscriptions should not be held in the client at all.

IMHO it would be best to have a low level client that does not try to do anything smart, and then add a ManagedClient that can focus on providing nice user features built on the solid low level client. It would also reduce the API surface of the low level client and simplify thread safety.

@pglombardo what do you think? I'm happy to chat about this is private, feel free to send me an e-mail.

Issues:

        var client =
            new HiveMQClient(
                new HiveMQClientOptionsBuilder()
                    .WithClientId("ConcurrentSubscribe")
                    .Build()
            );

        var inbox = new ConcurrentBag<string>();

        client.OnMessageReceived += (_, e) =>
        {
            inbox.Add(e.PublishMessage.Topic!);
        };

        var connectResult = await client.ConnectAsync();

        Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success);
        Assert.True(client.IsConnected());

        var subscribeResult = await client.SubscribeAsync("/test/#", QualityOfService.ExactlyOnceDelivery, false);

        Assert.True(subscribeResult.Subscriptions.Count == 1);
        Assert.True(subscribeResult.Subscriptions[0].SubscribeReasonCode == SubAckReasonCode.GrantedQoS2);

        Assert.Equal(1, client.Subscriptions.Count(s => s.TopicFilter.Topic == "/test/#"));

        await client.PublishAsync("/test/1", "whatever");

        while (!inbox.Contains("/test/1"))
        {
            await Task.Delay(50);
        }

        await client.DisconnectAsync();

        await Task.Delay(1000);

        var reconnectResult = await client.ConnectAsync();

        Assert.True(reconnectResult.ReasonCode == ConnAckReasonCode.Success);
        Assert.True(client.IsConnected());

        await client.PublishAsync("/test/2", "whatever");

        // client still contains subscription.
        Assert.True(client.Subscriptions.Any(s => s.TopicFilter.Topic == "/test/#"));

        // we never receive the message, because the subscription is not re-established
        while (!inbox.Contains("/test/2"))
        {
            await Task.Delay(50);
        }
  • Subscribing multiple times results in multiple subscriptions (for the same topic) in the subscription list. This also included the handler.
        var subscribeResultA = await client.SubscribeAsync("/test/#", QualityOfService.ExactlyOnceDelivery, false);

        Assert.True(subscribeResultA.Subscriptions.Count == 1);
        Assert.True(subscribeResultA.Subscriptions[0].SubscribeReasonCode == SubAckReasonCode.GrantedQoS2);

        var subscribeResultB = await client.SubscribeAsync("/test/#", QualityOfService.ExactlyOnceDelivery, false);

        Assert.True(subscribeResultB.Subscriptions.Count == 1);
        Assert.True(subscribeResultB.Subscriptions[0].SubscribeReasonCode == SubAckReasonCode.GrantedQoS2);

        Assert.Equal(2, client.Subscriptions.Count(s => s.TopicFilter.Topic == "/test/#"));
  • Subscription level message handlers burn up CPU time, even when not used. All Subscriptions are iterated and checked for a topic match using MatchTopic, which is quite expensive.

foreach (var subscription in this.Subscriptions)
{
if (packet.Message.Topic != null && MatchTopic(subscription.TopicFilter.Topic, packet.Message.Topic))
{
if (subscription.MessageReceivedHandler != null && subscription.MessageReceivedHandler.GetInvocationList().Length > 0)
{
// We have a per-subscription message handler.
_ = Task.Run(() => subscription.MessageReceivedHandler?.Invoke(this, eventArgs)).ContinueWith(
t =>
{
if (t.IsFaulted)
{
Logger.Error($"per-subscription MessageReceivedEventLauncher faulted ({packet.Message.Topic}): " + t.Exception?.Message);
}
},
TaskScheduler.Default);
messageHandled = true;
}
}
}

🔬 How To Reproduce

Archive.zip

Environment

Current master or latest Nuget

@JaggerJo JaggerJo added the bug Something isn't working label Jan 22, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

1 participant