-
Notifications
You must be signed in to change notification settings - Fork 68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for concurrent dispatch and configurable prefetch #418
Add support for concurrent dispatch and configurable prefetch #418
Conversation
@benmoss: GitHub didn't allow me to assign the following users: embano1. Note that only knative-sandbox members, repo collaborators and people who have commented on this issue/PR can be assigned. Additionally, issues/PRs can only have 10 assignees at the same time. In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: benmoss The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
/hold |
40e71a2
to
c70eab3
Compare
Codecov Report
@@ Coverage Diff @@
## main #418 +/- ##
==========================================
- Coverage 75.54% 75.54% -0.01%
==========================================
Files 44 44
Lines 2781 2785 +4
==========================================
+ Hits 2101 2104 +3
- Misses 548 549 +1
Partials 132 132
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Questions:
-
Regarding this comment: https://github.com/knative-sandbox/eventing-rabbitmq/blob/c70eab331eeb664526a2b54c6e630f33c7938789/pkg/dispatcher/dispatcher.go#L144 -> IMHO the SDK won't retry at all if set to 0.
-
https://github.com/knative-sandbox/eventing-rabbitmq/blob/c70eab331eeb664526a2b54c6e630f33c7938789/pkg/dispatcher/dispatcher.go#L166 < this is also available as env, so use this instead?
-
https://github.com/knative-sandbox/eventing-rabbitmq/blob/c70eab331eeb664526a2b54c6e630f33c7938789/pkg/dispatcher/dispatcher.go#L168 < use retries defined in env?
-
Should we define a custom consumer name (ie dispatcher) to simplify troubleshooting (especially when adding auto-scaling later)? https://github.com/knative-sandbox/eventing-rabbitmq/blob/c70eab331eeb664526a2b54c6e630f33c7938789/pkg/dispatcher/dispatcher.go#L68
-
Do you also want to tackle Be smarter about retries #409 in this PR?
Retry int `envconfig:"RETRY" required:"false"` | ||
BackoffPolicy string `envconfig:"BACKOFF_POLICY" required:"false"` | ||
BackoffDelay time.Duration `envconfig:"BACKOFF_DELAY" required:"false"` | ||
} | ||
|
||
const ( | ||
defaultBackoffDelay = 50 * time.Millisecond | ||
defaultPrefetch = 1 | ||
defaultPrefetchSize = 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's safe to drop this since we force no-ack
"The prefetch-size is ignored if the no-ack option is set" https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.prefetch-size
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where do you see that we force no-ack? https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.no-ack
That would imply we don't use acknowledgements, which is not true
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is how I'm reading go-amqp and the Rabbit docs:
When autoAck (also known as noAck) is true, the server will acknowledge
deliveries to this consumer prior to writing the delivery to the network. When
autoAck is true, the consumer should not call Delivery.Ack.
The prefetch-size is ignored if the no-ack option is set.
https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.prefetch-size
We set autoAck=false
: https://github.com/knative-sandbox/eventing-rabbitmq/blob/c944a0c5a3bea768ce200c6812d44072b4bb7001/pkg/dispatcher/dispatcher.go#L70
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I think this is bad documentation on rabbit's part. I think they mean
The prefetch-size is ignored if the no-ack option is set to true.
Since we set it to false, we are not using autoAck/noAck, and therefore prefetch size/prefetch count is still relevant
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gerhard can you back me up on this interpretation?
c70eab3
to
6976202
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A prefetch of 10 is a decent default, and better than 1 if consumer message ordering is not needed. This portion of the RabbitMQ docs explain it well: Consumer Acknowledgement Modes, Prefetch and Throughput. While a value of 100 or more can result in higher throughput, this tends to be workload specific (e.g. message payload, number of consumers etc.) and it can result in the broker using more memory than necessary. Messages that have been delivered and are waiting for ACKs will be kept in memory not just on the client side, but also on the broker side. So yes, 10 is a decent prefetch default.
@@ -41,14 +41,15 @@ type envConfig struct { | |||
// Should failed deliveries be requeued in the RabbitMQ? | |||
Requeue bool `envconfig:"REQUEUE" default:"false" required:"true"` | |||
|
|||
// Number of concurrent messages in flight | |||
PrefetchCount int `envconfig:"PREFETCH_COUNT" default:"10" required:"false"` | |||
Retry int `envconfig:"RETRY" required:"false"` | |||
BackoffPolicy string `envconfig:"BACKOFF_POLICY" required:"false"` | |||
BackoffDelay time.Duration `envconfig:"BACKOFF_DELAY" required:"false"` | |||
} | |||
|
|||
const ( | |||
defaultBackoffDelay = 50 * time.Millisecond |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would set this as a BackoffDelay
default.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure what you mean. The prefetch count in milliseconds should be the backoff delay?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/hold cancel |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I understand the comment here
Sorry for the confusion, I left my comments inline. Looks good overall to me!
/assign @salaboy |
This lets us define different topologies and is a little more explicit in the test what's being setup
Set a default of 10 and dispatch in parallel. Had to change the unit tests to not care about ordering now that messages are being handled asynchronously.
Easier to just construct with named fields rather than a positional-argument constructor
This guarantees we don't exceed the concurrency factor set by the prefetch count. This also allows us to wait for the workers to shut down gracefully if we are cancelled.
79e1c9e
to
753c41c
Compare
@benmoss after reviewing this PR with my morning coffee at hand I can see that it is doing what we discussed yesterday. |
/lgtm |
💯 |
Is there documentation around how to use this feature? For instance, is this correct? apiVersion: sources.knative.dev/v1alpha1
kind: RabbitmqSource
metadata:
name: rabbitmq-source
annotations:
rabbitmq.eventing.knative.dev/prefetchCount: "10000"
# ...
# additional config below |
@JamesMcMahon IMHO the annotation is only respected on a |
Changes
/kind enhancement
Fixes #329
Release Note
/assign @embano1 @gabo1208
Users can now set the annotation
rabbitmq.eventing.knative.dev/prefetchCount
to control the concurrency of the dispatcher and the number of in-flight events allowed. This has the implication that event order is no longer guaranteed with prefetch counts > 1. The default prefetch if not specified is 10, this is totally arbitrary and I'm open to opinions on a more sensible default if anyone has any 😄 .