Reconnection problems #586

Closed
uhfath opened this issue Jul 29, 2024 · 6 comments

uhfath (Contributor) commented Jul 29, 2024

Observed behavior

While fetching data with an ephemeral consumer, if there is a timeout, the client tries to reconnect endlessly and never succeeds.
Here is sample initialization code:

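using System.Text.Json;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using NATS.Client.Serializers.Json;

async Task Main()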
{
	using var loggerFactory = Microsoft.Extensions.Logging.LoggerFactory.Create(builder => builder
		.AddFilter((Microsoft.Extensions.Logging.LogLevel level) => true)
		.AddSimpleConsole(options =>
		{
			options.SingleLine = true;
			options.TimestampFormat = "[HH:mm:ss] ";
		})
	);
	
	var logger = loggerFactory.CreateLogger("Main");
	
	var opts = new NatsOpts
	{
		Url = "localhost",
		SerializerRegistry = NatsJsonSerializerRegistry.Default,
		LoggerFactory = loggerFactory,
	};

	await using var connection = new NatsConnection(opts);
	
	var natsJSContext = new NatsJSContext(connection);
	var consumer = await natsJSContext.CreateConsumerAsync(
		"QUEUE",
		new ConsumerConfig
		{
			FilterSubjects = [ "QUEUE.agent.responses.software", "QUEUE.agent.responses.linux_software" ],
			AckPolicy = ConsumerConfigAckPolicy.None,
			DeliverPolicy = ConsumerConfigDeliverPolicy.ByStartSequence,
			OptStartSeq = 1,
		});

	logger.LogInformation("---=== STARTING ===---");

	var counter = 0;
	while (true)
	{
		await foreach (var message in consumer.FetchNoWaitAsync<JsonElement>(new NatsJSFetchOpts
		{
			MaxMsgs = 10,
		}))
		{
			++counter;
			logger.LogInformation("Current message: {Message}", counter);
		}
	}

	// Never reached: the while (true) loop above has no exit condition.
	logger.LogInformation("---=== FINISHED ===---");
}

Here is the log output when everything works:

[10:54:03] info: NATS.Client.Core.NatsConnection[1001] Try to connect NATS nats://localhost:4222
[10:54:03] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 411 Elapsed: 0.5043ms
[10:54:03] info: NATS.Client.Core.Internal.NatsReadProtocolProcessor[1005] Received server info: ServerInfo { Id = NCPT77WCFZ2YMMOY2LFMSAJZ444WHB3LFF5IAPZTQVOYZKWJ33RVRSGO, Name = NCPT77WCFZ2YMMOY2LFMSAJZ444WHB3LFF5IAPZTQVOYZKWJ33RVRSGO, Version = 2.10.17, ProtocolVersion = 1, GitCommit = b91de03, GoVersion = go1.22.4, Host = 0.0.0.0, Port = 4222, HeadersSupported = True, AuthRequired = False, TlsRequired = False, TlsVerify = False, TlsAvailable = False, MaxPayload = 1048576, JetStreamAvailable = True, ClientId = 83, ClientIp = 172.20.0.1, Nonce = , Cluster = , ClusterDynamic = False, ClientConnectUrls = , WebSocketConnectUrls = , LameDuckMode = False }
[10:54:03] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 6 Elapsed: 11.7188ms
[10:54:03] info: NATS.Client.Core.NatsConnection[1001] Connect succeed NATS .NET Client, NATS nats://localhost:4222
[10:54:03] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 6 Elapsed: 13.4998ms
[10:54:03] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 708 Elapsed: 58.9068ms
[10:54:03] info: Main[0] ---=== STARTING ===---
[10:54:03] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2001] Fetch setup maxMsgs:10 maxBytes:0 expires:00:00:30 idle:00:00:15 hbTimeout:30000
[10:54:03] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 4088 Elapsed: 61.4958ms
[10:54:03] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 60317 Elapsed: 0.0304ms
[10:54:03] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 27225 Elapsed: 0.0631ms
[10:54:03] info: Main[0] Current message: 1
[10:54:03] info: Main[0] Current message: 2
[10:54:03] info: Main[0] Current message: 3
[10:54:03] info: Main[0] Current message: 4
[10:54:03] info: Main[0] Current message: 5
[10:54:03] info: Main[0] Current message: 6
[10:54:03] info: Main[0] Current message: 7
[10:54:03] info: Main[0] Current message: 8
[10:54:03] info: Main[0] Current message: 9
[10:54:03] info: Main[0] Current message: 10
[10:54:03] dbug: NATS.Client.Core.NatsSubBase[1002] End subscription MaxMsgs
[10:54:03] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2001] Fetch setup maxMsgs:10 maxBytes:0 expires:00:00:30 idle:00:00:15 hbTimeout:30000
[10:54:03] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 4088 Elapsed: 43.9324ms
[10:54:03] info: Main[0] Current message: 11
[10:54:03] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 34223 Elapsed: 0.0777ms
[10:54:03] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 65536 Elapsed: 0.1312ms
[10:54:03] info: Main[0] Current message: 12
[10:54:03] info: Main[0] Current message: 13
[10:54:03] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 65536 Elapsed: 0.0621ms
[10:54:03] info: Main[0] Current message: 14
[10:54:03] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 65536 Elapsed: 0.0144ms
[10:54:03] info: Main[0] Current message: 15
[10:54:03] info: Main[0] Current message: 16
[10:54:03] info: Main[0] Current message: 17
[10:54:03] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 65536 Elapsed: 0.0157ms
[10:54:03] info: Main[0] Current message: 18
[10:54:03] info: Main[0] Current message: 19
[10:54:03] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 7364 Elapsed: 0.0311ms
[10:54:03] dbug: NATS.Client.Core.NatsSubBase[1002] End subscription MaxMsgs
[10:54:03] info: Main[0] Current message: 20

And here is what the log looks like if there is a delay after "---=== STARTING ===---" and before the fetch:

[10:55:05] info: NATS.Client.Core.NatsConnection[1001] Try to connect NATS nats://localhost:4222
[10:55:05] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 411 Elapsed: 0.2997ms
[10:55:05] info: NATS.Client.Core.Internal.NatsReadProtocolProcessor[1005] Received server info: ServerInfo { Id = NCPT77WCFZ2YMMOY2LFMSAJZ444WHB3LFF5IAPZTQVOYZKWJ33RVRSGO, Name = NCPT77WCFZ2YMMOY2LFMSAJZ444WHB3LFF5IAPZTQVOYZKWJ33RVRSGO, Version = 2.10.17, ProtocolVersion = 1, GitCommit = b91de03, GoVersion = go1.22.4, Host = 0.0.0.0, Port = 4222, HeadersSupported = True, AuthRequired = False, TlsRequired = False, TlsVerify = False, TlsAvailable = False, MaxPayload = 1048576, JetStreamAvailable = True, ClientId = 86, ClientIp = 172.20.0.1, Nonce = , Cluster = , ClusterDynamic = False, ClientConnectUrls = , WebSocketConnectUrls = , LameDuckMode = False }
[10:55:05] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 6 Elapsed: 15.9152ms
[10:55:05] info: NATS.Client.Core.NatsConnection[1001] Connect succeed NATS .NET Client, NATS nats://localhost:4222
[10:55:05] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 6 Elapsed: 11.9685ms
[10:55:05] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 708 Elapsed: 56.1542ms
[10:57:36] info: Main[0] ---=== STARTING ===---
[10:57:36] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 6 Elapsed: 150640.5573ms
[10:57:36] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 6 Elapsed: 0.0263ms
[10:57:36] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 6 Elapsed: 0.0128ms
[10:57:36] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2001] Fetch setup maxMsgs:10 maxBytes:0 expires:00:00:30 idle:00:00:15 hbTimeout:30000
[10:58:06] dbug: NATS.Client.Core.NatsSubBase[1002] End subscription IdleHeartbeatTimeout
[10:58:06] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2003] Idle heartbeat timed-out after 00:00:15ns
[10:58:06] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2001] Fetch setup maxMsgs:10 maxBytes:0 expires:00:00:30 idle:00:00:15 hbTimeout:30000
[10:58:36] dbug: NATS.Client.Core.NatsSubBase[1002] End subscription IdleHeartbeatTimeout
[10:58:36] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2003] Idle heartbeat timed-out after 00:00:15ns
[10:58:36] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2001] Fetch setup maxMsgs:10 maxBytes:0 expires:00:00:30 idle:00:00:15 hbTimeout:30000
[10:59:05] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 6 Elapsed: 89273.6352ms
[10:59:06] dbug: NATS.Client.Core.NatsSubBase[1002] End subscription IdleHeartbeatTimeout
[10:59:06] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2003] Idle heartbeat timed-out after 00:00:15ns
[10:59:06] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2001] Fetch setup maxMsgs:10 maxBytes:0 expires:00:00:30 idle:00:00:15 hbTimeout:30000
[10:59:36] dbug: NATS.Client.Core.NatsSubBase[1002] End subscription IdleHeartbeatTimeout
[10:59:36] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2003] Idle heartbeat timed-out after 00:00:15ns
[10:59:36] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2001] Fetch setup maxMsgs:10 maxBytes:0 expires:00:00:30 idle:00:00:15 hbTimeout:30000
[11:00:06] dbug: NATS.Client.Core.NatsSubBase[1002] End subscription IdleHeartbeatTimeout
[11:00:06] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2003] Idle heartbeat timed-out after 00:00:15ns
[11:00:06] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2001] Fetch setup maxMsgs:10 maxBytes:0 expires:00:00:30 idle:00:00:15 hbTimeout:30000
[11:00:36] dbug: NATS.Client.Core.NatsSubBase[1002] End subscription IdleHeartbeatTimeout
[11:00:36] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2003] Idle heartbeat timed-out after 00:00:15ns
[11:00:36] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2001] Fetch setup maxMsgs:10 maxBytes:0 expires:00:00:30 idle:00:00:15 hbTimeout:30000
[11:01:05] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 6 Elapsed: 119995.1108ms
[11:01:06] dbug: NATS.Client.Core.NatsSubBase[1002] End subscription IdleHeartbeatTimeout
[11:01:06] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2003] Idle heartbeat timed-out after 00:00:15ns
[11:01:06] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2001] Fetch setup maxMsgs:10 maxBytes:0 expires:00:00:30 idle:00:00:15 hbTimeout:30000
[11:01:36] dbug: NATS.Client.Core.NatsSubBase[1002] End subscription IdleHeartbeatTimeout
[11:01:36] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2003] Idle heartbeat timed-out after 00:00:15ns
[11:01:36] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2001] Fetch setup maxMsgs:10 maxBytes:0 expires:00:00:30 idle:00:00:15 hbTimeout:30000
[11:02:06] dbug: NATS.Client.Core.NatsSubBase[1002] End subscription IdleHeartbeatTimeout
[11:02:06] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2003] Idle heartbeat timed-out after 00:00:15ns
[11:02:06] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2001] Fetch setup maxMsgs:10 maxBytes:0 expires:00:00:30 idle:00:00:15 hbTimeout:30000

And here is the log if there is a delay right after the first fetch and its log output:

[11:02:22] info: NATS.Client.Core.NatsConnection[1001] Try to connect NATS nats://localhost:4222
[11:02:22] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 411 Elapsed: 0.3608ms
[11:02:22] info: NATS.Client.Core.Internal.NatsReadProtocolProcessor[1005] Received server info: ServerInfo { Id = NCPT77WCFZ2YMMOY2LFMSAJZ444WHB3LFF5IAPZTQVOYZKWJ33RVRSGO, Name = NCPT77WCFZ2YMMOY2LFMSAJZ444WHB3LFF5IAPZTQVOYZKWJ33RVRSGO, Version = 2.10.17, ProtocolVersion = 1, GitCommit = b91de03, GoVersion = go1.22.4, Host = 0.0.0.0, Port = 4222, HeadersSupported = True, AuthRequired = False, TlsRequired = False, TlsVerify = False, TlsAvailable = False, MaxPayload = 1048576, JetStreamAvailable = True, ClientId = 89, ClientIp = 172.20.0.1, Nonce = , Cluster = , ClusterDynamic = False, ClientConnectUrls = , WebSocketConnectUrls = , LameDuckMode = False }
[11:02:22] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 6 Elapsed: 14.4337ms
[11:02:22] info: NATS.Client.Core.NatsConnection[1001] Connect succeed NATS .NET Client, NATS nats://localhost:4222
[11:02:22] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 6 Elapsed: 13.2562ms
[11:02:22] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 708 Elapsed: 62.5155ms
[11:02:22] info: Main[0] ---=== STARTING ===---
[11:02:22] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2001] Fetch setup maxMsgs:10 maxBytes:0 expires:00:00:30 idle:00:00:15 hbTimeout:30000
[11:02:22] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 4088 Elapsed: 62.6256ms
[11:02:22] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 60317 Elapsed: 0.0791ms
[11:02:22] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 27225 Elapsed: 0.0381ms
[11:03:13] info: Main[0] Current message: 1
[11:03:13] dbug: NATS.Client.Core.NatsSubBase[1002] End subscription IdleHeartbeatTimeout
[11:03:13] dbug: NATS.Client.Core.NatsSubBase[1002] End subscription Timeout
[11:03:13] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2004] JetStream pull request expired 00:00:30ns
[11:03:13] warn: NATS.Client.Core.Internal.SubscriptionManager[1002] Can't find subscription for QUEUE.agent.responses.software/2
[11:03:13] warn: NATS.Client.Core.Internal.SubscriptionManager[1002] Can't find subscription for QUEUE.agent.responses.software/2
[11:03:13] warn: NATS.Client.Core.Internal.SubscriptionManager[1002] Can't find subscription for QUEUE.agent.responses.software/2
[11:03:13] warn: NATS.Client.Core.Internal.SubscriptionManager[1002] Can't find subscription for QUEUE.agent.responses.software/2
[11:03:13] warn: NATS.Client.Core.Internal.SubscriptionManager[1002] Can't find subscription for QUEUE.agent.responses.software/2
[11:03:13] warn: NATS.Client.Core.Internal.SubscriptionManager[1002] Can't find subscription for QUEUE.agent.responses.software/2
[11:03:13] warn: NATS.Client.Core.Internal.SubscriptionManager[1002] Can't find subscription for QUEUE.agent.responses.software/2
[11:03:13] warn: NATS.Client.Core.Internal.SubscriptionManager[1002] Can't find subscription for QUEUE.agent.responses.software/2
[11:03:13] trce: NATS.Client.Core.Internal.SocketReader[1006] Socket.ReceiveAsync Size: 6 Elapsed: 0.0308ms
[11:03:13] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2003] Idle heartbeat timed-out after 00:00:15ns
[11:03:13] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2001] Fetch setup maxMsgs:10 maxBytes:0 expires:00:00:30 idle:00:00:15 hbTimeout:30000
[11:03:43] dbug: NATS.Client.Core.NatsSubBase[1002] End subscription IdleHeartbeatTimeout
[11:03:43] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2003] Idle heartbeat timed-out after 00:00:15ns
[11:03:43] dbug: NATS.Client.JetStream.Internal.NatsJSFetch[2001] Fetch setup maxMsgs:10 maxBytes:0 expires:00:00:30 idle:00:00:15 hbTimeout:30000

Expected behavior

The client should reconnect successfully if there is a timeout during any phase.

Server and client version

Server used:
Docker image with tag 2.10

Client used:
NATS.net version 2.3.2

Host environment

Docker used under Windows 11 using WSL2.
Client used under Windows 11.

Steps to reproduce

No response

mtmk (Collaborator) commented Jul 29, 2024

Thanks @uhfath, how do you introduce the delay?

uhfath (Contributor, Author) commented Jul 29, 2024

@mtmk I stumbled upon this while debugging, so even a simple Thread.Sleep(TimeSpan.FromSeconds(10)) would suffice.
This should emulate something like a network outage.

mtmk (Collaborator) commented Jul 29, 2024

OK, got it, thank you. You're hitting the consumer's 'Inactive Threshold' (5 seconds), after which the consumer is deleted by the server. You should use an ordered consumer instead.
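A minimal sketch of that suggestion, assuming NATS.Net 2.x with the NATS.Client.Serializers.Json package and the same `QUEUE` stream and filter subjects as the repro above. `CreateOrderedConsumerAsync`, `NatsJSOrderedConsumerOpts` and `ConsumeAsync` are, as far as I know, the client's ordered-consumer API, but the option names should be checked against the client version in use; the class name `OrderedConsumerExample` is hypothetical. There is no `ConsumerConfig` or `AckPolicy` to set, because the client creates and manages the ordered consumer itself.

```csharp
using System;
using System.Text.Json;
using System.Threading.Tasks;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using NATS.Client.Serializers.Json;

// Hypothetical example class; assumes a NATS server with JetStream on localhost
// and an existing stream named "QUEUE".
public static class OrderedConsumerExample
{
    public static async Task Main()
    {
        var opts = new NatsOpts
        {
            Url = "localhost",
            SerializerRegistry = NatsJsonSerializerRegistry.Default,
        };

        await using var connection = new NatsConnection(opts);
        var js = new NatsJSContext(connection);

        // Ordered consumer: the client owns its lifecycle instead of the application
        // holding on to an ephemeral consumer the server may delete while idle.
        var consumer = await js.CreateOrderedConsumerAsync(
            "QUEUE",
            new NatsJSOrderedConsumerOpts
            {
                FilterSubjects = ["QUEUE.agent.responses.software", "QUEUE.agent.responses.linux_software"],
                DeliverPolicy = ConsumerConfigDeliverPolicy.ByStartSequence,
                OptStartSeq = 1,
            });

        var counter = 0;

        // ConsumeAsync keeps issuing pull requests in the background,
        // so there is no outer while (true) fetch loop.
        await foreach (var message in consumer.ConsumeAsync<JsonElement>())
        {
            ++counter;
            Console.WriteLine($"Current message: {counter} (subject {message.Subject})");
        }
    }
}
```

Using `ConsumeAsync` instead of the repro's repeated `FetchNoWaitAsync` calls is a design choice in this sketch; the repro's polling style could be kept if needed.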

uhfath (Contributor, Author) commented Jul 29, 2024

@mtmk thanks.
Please correct me if I'm wrong, but aren't ordered consumers 'push' consumers, and so considered almost 'legacy', since the docs and examples say that 'pull' consumers are recommended?
And how do we detect this event so that we can re-create the consumer while the application is running?

mtmk (Collaborator) commented Jul 29, 2024

That's true, push consumer usage is discouraged at this time; however, this client implements ordered consumers using the pull approach.

Unfortunately, there is no good way of detecting a missing consumer in this case, since the server simply doesn't respond to the request if there is no consumer. You can try to recreate the consumer if you don't receive anything after a timeout, or you can extend the inactive threshold and make sure to delete the consumer yourself, etc.
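A sketch of that manual alternative (longer inactive threshold, recreate after silence, explicit delete), assuming NATS.Net 2.x. The five-minute `InactiveThreshold`, the one-second back-off, the `BuildConfig` helper and the class name are hypothetical choices for illustration. Judging by the logs above, a fetch against a deleted consumer only comes back empty after the 30-second idle-heartbeat timeout, so an empty fetch is used as the trigger to ask the JetStream API whether the consumer still exists; `RefreshAsync` is assumed to throw `NatsJSApiException` with code 404 when it does not. The last seen stream sequence is tracked so a recreated consumer resumes rather than restarting from sequence 1.

```csharp
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using NATS.Client.Serializers.Json;

// Hypothetical example class; assumes JetStream on localhost and a stream named "QUEUE".
public static class RecreatingConsumerExample
{
    private const string StreamName = "QUEUE";

    // Hypothetical helper: the repro's consumer config, starting at a given stream sequence.
    private static ConsumerConfig BuildConfig(ulong startSeq) => new ConsumerConfig
    {
        FilterSubjects = ["QUEUE.agent.responses.software", "QUEUE.agent.responses.linux_software"],
        AckPolicy = ConsumerConfigAckPolicy.None,
        DeliverPolicy = ConsumerConfigDeliverPolicy.ByStartSequence,
        OptStartSeq = startSeq,
        // Assumed value: much longer than the 5 s mentioned in this thread,
        // so short pauses (debugger, brief outage) don't get the consumer deleted.
        InactiveThreshold = TimeSpan.FromMinutes(5),
    };

    public static async Task Main()
    {
        using var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

        await using var connection = new NatsConnection(new NatsOpts
        {
            Url = "localhost",
            SerializerRegistry = NatsJsonSerializerRegistry.Default,
        });
        var js = new NatsJSContext(connection);

        ulong nextSeq = 1; // next stream sequence not yet processed
        var consumer = await js.CreateConsumerAsync(StreamName, BuildConfig(nextSeq));
        try
        {
            while (!cts.IsCancellationRequested)
            {
                var received = 0;
                await foreach (var msg in consumer.FetchNoWaitAsync<JsonElement>(
                    new NatsJSFetchOpts { MaxMsgs = 10 }, cancellationToken: cts.Token))
                {
                    received++;
                    if (msg.Metadata is { } meta)
                        nextSeq = meta.Sequence.Stream + 1;
                }

                if (received > 0)
                    continue;

                // Empty fetch: either we are caught up, or the server deleted the consumer
                // and never answered the pull request. Ask the JetStream API which one it is.
                try
                {
                    await consumer.RefreshAsync(cts.Token);
                    await Task.Delay(TimeSpan.FromSeconds(1), cts.Token); // caught up: back off
                }
                catch (NatsJSApiException e) when (e.Error.Code == 404)
                {
                    // Consumer is gone: recreate it, resuming after the last seen message.
                    consumer = await js.CreateConsumerAsync(StreamName, BuildConfig(nextSeq));
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Ctrl+C pressed; fall through to cleanup.
        }
        finally
        {
            // Delete the consumer ourselves instead of relying on the inactive threshold.
            await js.DeleteConsumerAsync(StreamName, consumer.Info.Name);
        }
    }
}
```

The ordered consumer suggested earlier in this thread is meant to handle this bookkeeping inside the client, which is why this manual version is only an alternative.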

We do try to take care of all these issues in our ordered consumer implementations. We would be more than happy to see your feedback on it.

uhfath (Contributor, Author) commented Jul 30, 2024

@mtmk thank you for the help!
Indeed, using ordered consumers helped.
Also, thanks for the tip regarding 'Inactive Threshold'. Things are much clearer now.

uhfath closed this as completed Jul 30, 2024
2 participants