Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JetStream pull consumer redesign #115

Merged
merged 26 commits into from
Aug 31, 2023
Merged

Conversation

mtmk
Copy link
Collaborator

@mtmk mtmk commented Aug 18, 2023

JetStream pull consumers are implemented as direct subclasses of NatsSubBase in NatsJSSubConsume and NatsJSSubFetch classes. All the pending message tracking logic implemented in the subclasses.

Other fixes:

  • Messages size calculation precedence error fixed
  • Using Opts as short name
  • Multipart sequence parsing error in header parsing
  • RawData type introduced to potentially remove byte[] like calls (API discussion)

@mtmk mtmk changed the title [WIP] JetStream pull consumer redesign JetStream pull consumer redesign Aug 29, 2023
@mtmk mtmk marked this pull request as ready for review August 29, 2023 20:12
@mtmk mtmk requested a review from caleblloyd August 29, 2023 20:13
This test was checking if the server was resending a message
which was not ACKed, after the ack_wait times out. Resending
unacknowledged messages is a server function we could test
under more controlled test setup and it's this part of the
test isn't testing NATS NET codebase. For some reason resend
function isn't working most of the time when run under the
GitHub Actions environment and creating false positives.
Hence I decided to remove this part of the test to avoid
unnecessary noise for the time being.
This is a case where server request timeout might not
reach the client because of network anomalies.
* Options -> Opts refactor
* Ignore extra messages on consumer dispose
* Propagate JS options for ACK
{
var isVersionLineRead = false;

while (!reader.End)
{
var span = reader.UnreadSpan;
while (span.Length > 0)
if (reader.TryReadTo(out ReadOnlySpan<byte> line, ByteCRLF))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public bool TryReadTo (out ReadOnlySpan<T> span ... will copy a multi-span header wheras public bool TryReadTo (out System.Buffers.ReadOnlySequence<T> sequence ... would not.

If it's easy to change TryParseHeaderLine to operate on a ReadOnlySequence<byte> instead of ReadOnlySpan<byte> then that might be a worthwhile optimization

I suppose this is why Kestrel has separate paths for single-span and multi-span headers

It would be fine to shelf this optimization and do it later, the copy for a multi-span header does make the code significantly less complicated

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. Let's do it in another PR.


namespace NATS.Client.Core;

public class RawData
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should evaluate RawData a little more before committing it to the public API. I think it is only used in the Example project, maybe move it there for now?


internal NatsJSOpts Opts { get; }

public string NewInbox() => $"{Opts.InboxPrefix}.{Guid.NewGuid():n}";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be internal? Confusing to have a public INatsConnection.NewInbox and NatsJSContext.NewInbox, one using the Mux Subscription and one not.

Comment on lines 13 to 17
public NatsJSMsg(NatsMsg<T> msg, NatsJSContext jsContext)
{
Msg = msg;
JSContext = jsContext;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't think of a case where NatsJSMsg needs a public constructor. Shouldn't a JS Msg always come from a consumer?

JSContext = jsContext;
}

public NatsJSContext JSContext { get; }

public NatsMsg<T> Msg { get; }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe instead of passing through NatsMsg<T>, we should proxy all of the public properties that are interesting:

    string Subject,
    int Size,
    NatsHeaders? Headers,
    T? Data,
    INatsConnection? Connection

We could leave out the string? replyTo property, and all of the Reply methods as well... Since the only thing that should be replied with are the Ack methods which are already on the NatsJSMsg<T>

@caleblloyd caleblloyd mentioned this pull request Aug 31, 2023
* JSMsg hides the NatsMsg and proxy relevant fields
* Moved RawData to example
* JSContext NewInbox is now internal
Copy link
Collaborator

@caleblloyd caleblloyd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@mtmk mtmk merged commit 6f9df6e into main Aug 31, 2023
6 checks passed
@mtmk mtmk deleted the jetstream-pull-consumer-redesign branch August 31, 2023 17:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants