Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplification Review, Tuning, Examples, Testing #790

Merged
merged 5 commits into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 47 additions & 9 deletions src/NATS.Client/JetStream/ConsumerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
// limitations under the License.

using System;
using System.Threading.Tasks;
using NATS.Client.Internals;
using static NATS.Client.JetStream.BaseConsumeOptions;
using static NATS.Client.JetStream.ConsumeOptions;
using static NATS.Client.JetStream.JetStreamPullSubscription;

namespace NATS.Client.JetStream
{
Expand Down Expand Up @@ -50,19 +52,16 @@ public ConsumerInfo GetCachedConsumerInfo()
}

public Msg Next() {
return Next(DefaultExpiresInMillis);
return new NextSub(js, bindPso, DefaultExpiresInMillis).Next();
}

public Msg Next(int maxWaitMillis) {
if (maxWaitMillis < MinExpiresMills) {
public Msg Next(int maxWaitMillis)
{
if (maxWaitMillis < MinExpiresMills)
{
throw new ArgumentException($"Max wait must be at least {MinExpiresMills} milliseconds.");
}

long expires = maxWaitMillis - JetStreamPullSubscription.ExpireAdjustment;
JetStreamPullSubscription sub
= (JetStreamPullSubscription)new SubscriptionMaker(js, bindPso).MakeSubscription();
sub.pullImpl.Pull(false, null, PullRequestOptions.Builder(1).WithExpiresIn(expires).Build());
return sub.NextMessage(maxWaitMillis);
return new NextSub(js, bindPso, maxWaitMillis).Next();
}

public IFetchConsumer FetchMessages(int maxMessages) {
Expand Down Expand Up @@ -99,6 +98,45 @@ public IMessageConsumer consume(EventHandler<MsgHandlerEventArgs> handler, Consu
}
}

internal class NextSub
{
private int maxWaitMillis;
private JetStreamPullSubscription sub;

public NextSub(IJetStream js, PullSubscribeOptions pso, int maxWaitMillis)
{
sub = (JetStreamPullSubscription)new SubscriptionMaker(js, pso).MakeSubscription();
this.maxWaitMillis = maxWaitMillis;
sub.pullImpl.Pull(PullRequestOptions.Builder(1).WithExpiresIn(maxWaitMillis - ExpireAdjustment).Build(), false, null);
}

internal Msg Next()
{
try
{
return sub.NextMessage(maxWaitMillis);
}
catch (NATSTimeoutException)
{
return null;
}
finally
{
Task.Run(() =>
{
try
{
sub.Unsubscribe();
}
catch (Exception)
{
// intentionally ignored, nothing we can do anyway
}
});
}
}
}

internal class SubscriptionMaker
{
private readonly IJetStream js;
Expand Down
40 changes: 24 additions & 16 deletions src/NATS.Client/JetStream/FetchConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,38 @@ internal FetchConsumer(SubscriptionMaker subscriptionMaker, FetchConsumeOptions
.WithExpiresIn(opts.ExpiresIn)
.WithIdleHeartbeat(opts.IdleHeartbeat)
.Build();
((JetStreamPullSubscription)sub).pullImpl.Pull(false, null, pro);
((JetStreamPullSubscription)sub).pullImpl.Pull(pro, false, null);
}

public Msg NextMessage()
{
int timeLeftMillis;
if (sw == null) {
sw = Stopwatch.StartNew();
timeLeftMillis = maxWaitMillis;
}
else
try
{
timeLeftMillis = maxWaitMillis - (int)sw.ElapsedMilliseconds;
}
int timeLeftMillis;
if (sw == null)
{
sw = Stopwatch.StartNew();
timeLeftMillis = maxWaitMillis;
}
else
{
timeLeftMillis = maxWaitMillis - (int)sw.ElapsedMilliseconds;
}

// if the manager thinks it has received everything in the pull, it means
// that all the messages are already in the internal queue and there is
// no waiting necessary
if (timeLeftMillis < 1 | pmm.pendingMessages < 1 || (pmm.trackingBytes && pmm.pendingBytes < 1))
{
return ((JetStreamPullSubscription)sub).NextMessage(1); // 1 is the shortest time I can give
}

// if the manager thinks it has received everything in the pull, it means
// that all the messages are already in the internal queue and there is
// no waiting necessary
if (timeLeftMillis < 1 | pmm.pendingMessages < 1 || (pmm.trackingBytes && pmm.pendingBytes < 1))
return ((JetStreamPullSubscription)sub).NextMessage(timeLeftMillis);
}
catch (NATSTimeoutException)
{
return ((JetStreamPullSubscription)sub).NextMessage(1); // 1 is the shortest time I can give
return null;
}

return ((JetStreamPullSubscription)sub).NextMessage(timeLeftMillis);
}
}
}
7 changes: 7 additions & 0 deletions src/NATS.Client/JetStream/IFetchConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ namespace NATS.Client.JetStream
/// </summary>
public interface IFetchConsumer : IMessageConsumer
{
/// <summary>
/// Read the next message. Return null if the fetch has been fulfilled either
/// because max messages or bytes max bytes have been reached,
/// or because the fetch was not fulfilled in the timeout set byt the fetch options.
/// @return the next message for this subscriber or null if there is a timeout
/// </summary>
/// <returns>the next message or null if there is a timeout</returns>
Msg NextMessage();
}
}
3 changes: 1 addition & 2 deletions src/NATS.Client/JetStream/IMessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// limitations under the License.

using System;
using System.Threading.Tasks;

namespace NATS.Client.JetStream
{
Expand All @@ -34,6 +33,6 @@ public interface IMessageConsumer : IDisposable
/// <param name="timeout">The time to wait for the stop to succeed, pass 0 to wait forever.
/// Stop involves moving messages to and from the server so a very short timeout is not recommended.</param>
/// <returns>A task so you could wait for the stop to know when there are no more messages.</returns>
Task Stop(int timeout);
void Stop(int timeout);
}
}
2 changes: 1 addition & 1 deletion src/NATS.Client/JetStream/IStreamContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public interface IStreamContext
/// </summary>
/// <param name="config">the consumer configuration to use.</param>
/// <returns>consumer information.</returns>
IConsumerContext AddConsumer(ConsumerConfiguration config);
IConsumerContext CreateOrUpdateConsumer(ConsumerConfiguration config);

/// <summary>
/// Management function to deletes a consumer.
Expand Down
12 changes: 6 additions & 6 deletions src/NATS.Client/JetStream/IterableConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ public Msg NextMessage(int timeoutMillis)
{
return ((JetStreamPullSubscription)sub).NextMessage(timeoutMillis);
}
catch (NATSBadSubscriptionException e)
catch (NATSTimeoutException)
{
return null;
}
catch (NATSBadSubscriptionException)
{
// this happens if the consumer is stopped, since it is
// drained/unsubscribed, so don't pass it on if it's expected
if (stopped)
{
throw new NATSTimeoutException();
}
throw e;
return null;
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/NATS.Client/JetStream/JetStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ Subscription CreateSubscription(string subject, string queueName,
}
}

Conn.FlushBuffer();
return sub;
}

Expand Down Expand Up @@ -494,8 +495,9 @@ public IJetStreamPullSubscription PullSubscribe(string subject, PullSubscribeOpt

public IJetStreamPullAsyncSubscription PullSubscribeAsync(string subject, EventHandler<MsgHandlerEventArgs> handler, PullSubscribeOptions options)
{
ValidateNotNull(options, "Pull Subscribe Options");
ValidateSubject(subject, IsSubjectRequired(options));
ValidateNotNull(handler, "Handler");
ValidateNotNull(options, "Pull Subscribe Options");
return (IJetStreamPullAsyncSubscription) CreateSubscription(subject, null, handler, false, null, options);
}

Expand Down
29 changes: 24 additions & 5 deletions src/NATS.Client/JetStream/JetStreamAbstractSyncSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,24 @@ public override void Unsubscribe()
base.Unsubscribe();
}

public override void AutoUnsubscribe(int max)
{
MessageManager.Shutdown();
base.AutoUnsubscribe(max);
}

internal override void close()
{
MessageManager.Shutdown();
base.close();
}

protected override void Dispose(bool disposing)
{
MessageManager.Shutdown();
base.Dispose(disposing);
}

public override Msg NextMessage()
{
return _nextUnmanagedWaitForever(null);
Expand Down Expand Up @@ -93,9 +105,12 @@ protected Msg _nextUnmanagedWaitForever(String expectedPullSubject)
{
throw new NATSJetStreamStatusException(msg.Status, this);
}

break;
}
// StatusHandled, StatusTerminus and StatusError that isn't for expected pullSubject: check again since waiting forever
// Check again since waiting forever when:
// 1. Any StatusHandled or StatusTerminus
// 2. StatusError that aren't for expected pullSubject
}
}

Expand All @@ -108,18 +123,22 @@ protected Msg _nextUnmanagedNoWait(string expectedPullSubject)
return msg;
case ManageResult.StatusTerminus:
// if the status applies return null, otherwise it's ignored, fall through
if (expectedPullSubject == null || expectedPullSubject.Equals(msg.Subject)) {
if (expectedPullSubject == null || expectedPullSubject.Equals(msg.Subject))
{
throw new NATSTimeoutException();
}
break;
case ManageResult.StatusError:
// if the status applies throw exception, otherwise it's ignored, fall through
if (expectedPullSubject == null || expectedPullSubject.Equals(msg.Subject)) {
if (expectedPullSubject == null || expectedPullSubject.Equals(msg.Subject))
{
throw new NATSJetStreamStatusException(msg.Status, this);
}
break;
}
// StatusHandled and StatusTerminus / StatusError that aren't for expected pullSubject: check again
// Check again when, regular messages might have arrived
// 1. Any StatusHandled
// 2. StatusTerminus or StatusError that aren't for expected pullSubject
}
}

Expand Down Expand Up @@ -149,7 +168,7 @@ protected Msg _nextUnmanaged(int timeout, string expectedPullSubject)
}
break;
}
// StatusHandled and StatusTerminus / StatusError that aren't for expected pullSubject: check again while have time
// anything else, try again while we have time
timeLeft = timeout - (int)sw.ElapsedMilliseconds;
}
throw new NATSTimeoutException();
Expand Down
1 change: 1 addition & 0 deletions src/NATS.Client/JetStream/JetStreamManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public PurgeResponse PurgeStream(string streamName)
public PurgeResponse PurgeStream(string streamName, PurgeOptions options)
{
Validator.ValidateStreamName(streamName, true);
Validator.ValidateNotNull(options, nameof(options));
string subj = string.Format(JetStreamConstants.JsapiStreamPurge, streamName);
Msg m = RequestResponseRequired(subj, options.Serialize(), Timeout);
return new PurgeResponse(m, true);
Expand Down
3 changes: 1 addition & 2 deletions src/NATS.Client/JetStream/JetStreamPullApiImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ internal void UpdateConsumer(string consumer)
_consumer = consumer;
}

internal string Pull(bool raiseStatusWarnings, ITrackPendingListener trackPendingListener,
PullRequestOptions pullRequestOptions) {
internal string Pull(PullRequestOptions pullRequestOptions, bool raiseStatusWarnings, ITrackPendingListener trackPendingListener) {
string publishSubject = _js.PrependPrefix(string.Format(JetStreamConstants.JsapiConsumerMsgNext, _stream, _consumer));
string pullSubject = _subject.Replace("*", _pullSubjectIdHolder.Increment().ToString());
_mm.StartPullRequest(pullSubject, pullRequestOptions, raiseStatusWarnings, trackPendingListener);
Expand Down
10 changes: 5 additions & 5 deletions src/NATS.Client/JetStream/JetStreamPullAsyncSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,28 @@ internal override void UpdateConsumer(string consumer)

public void Pull(int batchSize)
{
pullImpl.Pull(true, null, PullRequestOptions.Builder(batchSize).Build());
pullImpl.Pull(PullRequestOptions.Builder(batchSize).Build(), true, null);
}

public void Pull(PullRequestOptions pullRequestOptions) {
pullImpl.Pull(true, null, pullRequestOptions);
pullImpl.Pull(pullRequestOptions, true, null);
}

public void PullExpiresIn(int batchSize, int expiresInMillis)
{
Validator.ValidateDurationGtZeroRequired(expiresInMillis, "Expires In");
pullImpl.Pull(true, null, PullRequestOptions.Builder(batchSize).WithExpiresIn(expiresInMillis).Build());
pullImpl.Pull(PullRequestOptions.Builder(batchSize).WithExpiresIn(expiresInMillis).Build(), true, null);
}

public void PullNoWait(int batchSize)
{
pullImpl.Pull(true, null, PullRequestOptions.Builder(batchSize).WithNoWait().Build());
pullImpl.Pull(PullRequestOptions.Builder(batchSize).WithNoWait().Build(), true, null);
}

public void PullNoWait(int batchSize, int expiresInMillis)
{
Validator.ValidateDurationGtZeroRequired(expiresInMillis, "NoWait Expires In");
pullImpl.Pull(true, null, PullRequestOptions.Builder(batchSize).WithNoWait().WithExpiresIn(expiresInMillis).Build());
pullImpl.Pull(PullRequestOptions.Builder(batchSize).WithNoWait().WithExpiresIn(expiresInMillis).Build(), true, null);
}
}
}
12 changes: 6 additions & 6 deletions src/NATS.Client/JetStream/JetStreamPullSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,28 @@ internal override void UpdateConsumer(string consumer)

public void Pull(int batchSize)
{
pullImpl.Pull(true, null, PullRequestOptions.Builder(batchSize).Build());
pullImpl.Pull(PullRequestOptions.Builder(batchSize).Build(), true, null);
}

public void Pull(PullRequestOptions pullRequestOptions) {
pullImpl.Pull(true, null, pullRequestOptions);
pullImpl.Pull(pullRequestOptions, true, null);
}

public void PullExpiresIn(int batchSize, int expiresInMillis)
{
Validator.ValidateDurationGtZeroRequired(expiresInMillis, "Expires In");
pullImpl.Pull(true, null, PullRequestOptions.Builder(batchSize).WithExpiresIn(expiresInMillis).Build());
pullImpl.Pull(PullRequestOptions.Builder(batchSize).WithExpiresIn(expiresInMillis).Build(), true, null);
}

public void PullNoWait(int batchSize)
{
pullImpl.Pull(true, null, PullRequestOptions.Builder(batchSize).WithNoWait().Build());
pullImpl.Pull(PullRequestOptions.Builder(batchSize).WithNoWait().Build(), true, null);
}

public void PullNoWait(int batchSize, int expiresInMillis)
{
Validator.ValidateDurationGtZeroRequired(expiresInMillis, "NoWait Expires In");
pullImpl.Pull(true, null, PullRequestOptions.Builder(batchSize).WithNoWait().WithExpiresIn(expiresInMillis).Build());
pullImpl.Pull(PullRequestOptions.Builder(batchSize).WithNoWait().WithExpiresIn(expiresInMillis).Build(), true, null);
}

internal const int ExpireAdjustment = 10;
Expand All @@ -77,7 +77,7 @@ public IList<Msg> Fetch(int batchSize, int maxWaitMillis)

Duration expires = Duration.OfMillis(
maxWaitMillis > ExpireAdjustment ? maxWaitMillis - ExpireAdjustment : maxWaitMillis);
string pullSubject = pullImpl.Pull(false, null, PullRequestOptions.Builder(batchLeft).WithExpiresIn(expires).Build());
string pullSubject = pullImpl.Pull(PullRequestOptions.Builder(batchLeft).WithExpiresIn(expires).Build(), false, null);

try
{
Expand Down
Loading