-
Notifications
You must be signed in to change notification settings - Fork 698
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CHANGED] Ordered consumer recreate on missing heartbeat #1097
Conversation
js_test.go
Outdated
@@ -292,7 +293,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) { | |||
testSyncConsumer() | |||
} | |||
|
|||
func TestJetStreamOrderedConsumerWithErrors(t *testing.T) { | |||
func TestOrderedConsumerDeleteAssets(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be nice to maintain TestJetStream
prefix so that if you want to quickly check that your JS changes are having an impact, you can limit the tests locally to run JetStream tests...
test/js_test.go
Outdated
@@ -8126,3 +8145,59 @@ func TestJetStreamMsgAckShouldErrForConsumerAckNone(t *testing.T) { | |||
t.Fatalf("Expected error indicating that sub is AckNone, got %v", err) | |||
} | |||
} | |||
|
|||
func TestOrderedConsumerRecreateAfterReconnect(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment than before.
js.go
Outdated
} | ||
pushErr(err) | ||
pushErr(fmt.Errorf("%w: recreating ordered consumer", err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the %w
doing? Also, since you seem to have this fmt.Errorf() almost every time you call pushErr(), should you have that part of pushErr() itself? (I know there is a check/pushErr in case of request marshaling failing, but that is very unlikely, and still it would be a failure recreating the ordered consumer anyway).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
%w
is the error wrapping semantic. Thanks to it, if you call e.g. errors.Is(err, ErrStreamNotFound)
on this error it will unwrap correctly and return true for fmt.Errorf("%w: recreating ordered consumer", ErrStreamNotFound)
.
As to your second point, you're right, I'll change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Additionally, I added the recreating ordered consumer
part to at least give some insight to the user as to where an error happened (async error callback does not give any information on where the error comes from, so this is better than nothing).
In the near future I'll be working on the async errors revamp (at least for ordered consumers) to be able to identify them and react more easily)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Addresses: nats-io/nats-architecture-and-design#162
Resolves: #1094
This PR changes: