-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
rewroks impl to use unsafe unicast sink with MpScQueue #2892
base: main
Are you sure you want to change the base?
Conversation
Sinks.Many<String> sink = Sinks.unsafe() | ||
.many() | ||
.unicast() | ||
.onBackpressureBuffer(new MpscUnboundedArrayQueue<>(256)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should resolve problems with high CPU contention since that queue is designed for Multiple producers scenarios
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternatively, we can have a bounded Multiproducer queue here since as far as I understand we can drop part of the values
.onBackpressureLatest(); | ||
} else { | ||
publisher = this.sink.asFlux(); | ||
publisher = this.sink.asFlux().publish().autoConnect(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually - this may be optional. I don't know if we have tests for that but if we do I will try to remove that and put only in cases where we have retry In place
sink = new LogbackMetricsSuppressingManySink(sink); | ||
} | ||
|
||
this.sink = sink; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updating this.sink
here does not update the reference that already registered meters have, which is why some of the StatsdMeterRegistryPublishTest
tests are failing. For example, when the registry is stopped and then started, this.sink
will be a new sink, but an already created StatsdCounter
will still reference the old sink.
closes #2880