Fix traffic limiting #526
Conversation
…g processed. Signed-off-by: Greg Schohn <[email protected]>
…impler and support trying to recreate channels that are not active for subsequent request setups. Signed-off-by: Greg Schohn <[email protected]>
…e tests to pass Signed-off-by: Greg Schohn <[email protected]>
Signed-off-by: Greg Schohn <[email protected]>
fb13bfc to cb084e0
Signed-off-by: Greg Schohn <[email protected]>
Signed-off-by: Greg Schohn <[email protected]> # Conflicts: # TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml
… into their own package in test fixtures. That should make it easier to see how to write new functional tests with the TrafficReplayer. Signed-off-by: Greg Schohn <[email protected]>
…tion is now in test fixtures rather than inner classes. Signed-off-by: Greg Schohn <[email protected]>
TrafficCapture/build.gradle
Outdated
systemProperty 'junit.jupiter.execution.parallel.mode.default', "concurrent"
systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'concurrent'
}
// useJUnitPlatform {
Instead of just commenting these out, can you set junit.jupiter.execution.parallel.enabled = false for the individual projects, with a comment explaining why?
I've uncommented these. We'll see how they do over the next couple of builds.
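As an aside, parallelism can also be pinned for an individual test class rather than per project. The sketch below is a minimal, hypothetical example (the class and test names are made up, not from this PR) using JUnit 5's @Execution annotation; it is an alternative to the Gradle-level system properties discussed above, not what the PR itself does.

```java
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

// Hypothetical test class: forces this one class to run its tests on a single
// thread even when junit.jupiter.execution.parallel.mode.default is 'concurrent'.
@Execution(ExecutionMode.SAME_THREAD)
class SlowReplayerTest {
    @Test
    void runsSerially() {
        // ... test body that is not safe to run concurrently ...
    }
}
```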
Signed-off-by: Greg Schohn <[email protected]> # Conflicts: # TrafficCapture/captureKafkaOffloader/src/main/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactory.java
…ng with a fix for it. The fix is to add a nonce close that pertains to the replayer's starting point (in traffic stream index) and pass that all the way down to the session keys. That lets us work on a target session basis to close out streams rather than on the source, which might get split any number of times. Signed-off-by: Greg Schohn <[email protected]>
…ed. Cap the number of retries to 16 and don't retry things that aren't IOExceptions. There's a lot of work that still needs to happen for retries; the code in place now isn't meant to finish that, but rather to set up for the possibility that a connection has been broken so that, when a new request comes in and things are working well, it can get a new connection. Without this code, FullTrafficReplayerTests were not completing because IllegalStateExceptions were being thrown during TrafficReplayer teardowns, since the netty eventloops weren't accepting work. I've also re-enabled the slowTests that had been disabled. Signed-off-by: Greg Schohn <[email protected]>
void onConnectionClose(int channelInteractionNumber,
                       @NonNull IReplayContexts.IChannelKeyContext ctx,
void onConnectionClose(int channelInteractionNum,
                       IReplayContexts.@NonNull IChannelKeyContext ctx, int channelSessionNumber,
Did you intend to keep this ReplayContexts.@NonNull IChannelKeyContext ctx?
That's a very strange way to annotate. I've changed it to @NonNull ReplayContexts.IChannelKeyContext.
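For anyone puzzled by the two spellings, here is a minimal, self-contained sketch of where the annotation lands in each form. The @NonNull, IReplayContexts, and method names below are local stand-ins (the annotation mirrors how Lombok's @NonNull declares both PARAMETER and TYPE_USE targets); both placements compile, but the type-use placement reads oddly.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;

// Hypothetical annotation; Lombok's @NonNull is declared with a similar set of targets.
@Target({ElementType.PARAMETER, ElementType.TYPE_USE})
@interface NonNull {}

// Local stand-in for the real IReplayContexts.IChannelKeyContext nested interface.
interface IReplayContexts {
    interface IChannelKeyContext {}
}

interface IExample {
    // Type-use placement: the annotation must sit directly before the nested type's simple name.
    void onConnectionClose(IReplayContexts.@NonNull IChannelKeyContext ctx);

    // Declaration-style placement: the annotation precedes the whole parameter and reads more naturally.
    void onConnectionOpened(@NonNull IReplayContexts.IChannelKeyContext ctx);
}
```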
… be more careful about polling for metrics from the InMemory verifications. Using specific name prefixes for threads allows us to wait for specific threads to die rather than ones that haven't been started by the current test. Signed-off-by: Greg Schohn <[email protected]>
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #526 +/- ##
============================================
- Coverage 76.54% 76.32% -0.22%
- Complexity 1373 1387 +14
============================================
Files 154 154
Lines 5930 5986 +56
Branches 532 537 +5
============================================
+ Hits 4539 4569 +30
- Misses 1038 1054 +16
- Partials 353 363 +10
Flags with carried forward coverage won't be shown. View full report in Codecov by Sentry.
Signed-off-by: Andre Kurait <[email protected]>
…8x increase), by reducing the level of some logs for BlockingTrafficSource, and by using a dedicated thread to run the blocking async activities. Signed-off-by: Greg Schohn <[email protected]>
…assembled. As expected, that created more latency, which was enough to break all of the e2e tests. They are admittedly brittle, but if a response takes 5 minutes, holding up the request for that long is unacceptable. We'll need another way to tackle the selective auth-transformation problem (which is easier if you know the source status code of the response). Anyway, I didn't bring this back the same way that it was when it went out. Now the callback for the request returns a Consumer that's called once the full request comes in. That eliminates the need for another Map and a lot more bookkeeping. Now all of the e2e tests pass locally for me, as do all of the unit tests and slowTests. Signed-off-by: Greg Schohn <[email protected]>
…-migrations into FixTrafficLimiting
50f4a6c to 9de5c74
@@ -64,7 +64,7 @@ class CommonUtils {
// constructed in the configuration phase and the classpath won't be realized until the
// execution phase. Therefore, we need to have docker run the command to resolve the classpath
// and it's simplest to pack that up into a helper script.
runCommand("printf \"#!/bin/sh\\njava -cp `echo /jars/*.jar | tr \\ :` \\\"\\\$@\\\" \" > /runJavaWithClasspath.sh");
runCommand("printf \"#!/bin/sh\\njava -XX:MaxRAMPercentage=80.0 -cp `echo /jars/*.jar | tr \\ :` \\\"\\\$@\\\" \" > /runJavaWithClasspath.sh");
Thanks for this; should we document it somewhere?
Do we need to define RAM limits in the docker solution to limit each container?
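As a quick way to see what the flag actually does, here is a small, hypothetical Java check (not part of this PR or the migrations codebase): run inside a memory-limited container with -XX:MaxRAMPercentage=80.0, the printed ceiling should come out near 80% of the container's limit.

```java
// Hypothetical one-off check: prints the effective heap ceiling so the MaxRAMPercentage
// setting can be verified inside a container, e.g.
//   docker run -m 512m ... java -XX:MaxRAMPercentage=80.0 HeapCeilingCheck
public class HeapCeilingCheck {
    public static void main(String[] args) {
        long maxHeapBytes = Runtime.getRuntime().maxMemory();
        System.out.printf("Max heap the JVM will use: %.1f MiB%n", maxHeapBytes / (1024.0 * 1024.0));
    }
}
```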
I read through all the code in this PR. I don't understand all of it completely, but I didn't see anything concerning; each file seems to be an improvement over the previous version.
We have an open question about whether connection-close behavior is impacted by this CR. Overall, this CR seems to be an improvement to replayer scaling, but I'd like to make sure we don't hamper the existing lower-throughput use case with it. At a minimum, I'd like to see that as a fast follow.
I'm good if you force the TrafficReplayer slowTests to run in series to get them to pass for this PR.
Signed-off-by: Greg Schohn <[email protected]>
9de5c74 to f2bf136
…urce cleanup too.
* There was a surprising and minor bug in ReplayEngine::isWorkOutstanding - it was simply off by one.
* Consider this a response to PR feedback from my own review...
* General refactoring on RequestResponsePacketPair. I wasn't happy that the lambdas were attached to the RRPair for the lifetime of that pair (through tuple outputting), so I introduced a holder class, RequestResponsePacketPairWithCallback, which is held directly by the Accumulation and recycled for every new request. When the object is passed downstream, only the RequestResponsePacketPair is persisted, allowing the callbacks to evaporate to the GC.
* A unit test was failing with a ConcurrentModificationException because close() wasn't being called on the same thread as polls. That was remedied via the kafkaExecutor in KafkaTrafficCaptureSource.
* More threads/executors are shut down, including within BlockingTrafficSource::close (executorForBlockingActivity).
* I've added a timestamp to progress lines so that it's easier to follow the progress.
* For testing, the TimeShifter class now takes an additive value to shift the realtime value. That allows a new test, testLongRequestEndingAfterEOFStillCountsCorrectly, to schedule a close far out into the future so that we can confirm that an EOF/teardown doesn't destroy pending work.
* Many unit tests now wrap closeable resources within try-with-finally blocks. I was tired of looking at large numbers of threads.
* I had to refactor the TrafficReplayer a bit to allow others to override the wait-for-work mechanisms within teardown so that post-confirmation can be completed for some tests.
Signed-off-by: Greg Schohn <[email protected]>
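The holder-class idea mentioned above can be summarized with a short sketch. The field and method names here are illustrative guesses, not the PR's actual code; the point is only that the callback lives on the holder, so once the bare pair is handed downstream, the callback reference can be garbage-collected.

```java
import java.util.function.Consumer;

// Illustrative stand-in for the real class; shape and contents are guesses.
class RequestResponsePacketPair {
    // ... accumulated request and response bytes ...
}

// Holder kept by the accumulation for the lifetime of a single request. Only the
// inner pair is ever passed downstream, so the callback reference is dropped (and
// becomes collectable) as soon as the holder itself is recycled.
class RequestResponsePacketPairWithCallback {
    private final RequestResponsePacketPair pair = new RequestResponsePacketPair();
    private final Consumer<RequestResponsePacketPair> onFullyAccumulated;

    RequestResponsePacketPairWithCallback(Consumer<RequestResponsePacketPair> onFullyAccumulated) {
        this.onFullyAccumulated = onFullyAccumulated;
    }

    void fireAndDetach() {
        // Hand only the bare pair to the next stage; the lambda is not carried along.
        onFullyAccumulated.accept(pair);
    }
}
```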
Description
Divide traffic source accumulation into a producer and consumer.
Previously, a single thread acquired TrafficStreams from the TrafficSource and then sent them through the CapturedTrafficToHttpTransactionAccumulator. That accumulator itself calls a callback object when requests and responses have been rematerialized. The callbacks that the TrafficReplayer supplied, however, also blocked when the number of requests was high and they needed to be throttled.
Now, the TrafficStreamLimiter takes a callback task to execute instead of just providing blocking functions. The new queueWork() function never blocks; instead, it moves work onto a transfer queue. That transfer queue is read by a new worker thread, also owned by the TrafficStreamLimiter, which consumes items (or throttles their consumption) in the order they were received. A simplified sketch of this split follows below.
We still throttle the number of requests, but we allow them to accumulate in memory without processing them or sending them to the target server, which is what the original design had always intended. Now we can do that without impacting the traffic source objects, which can expect to be running on a dedicated thread where only the traffic source itself is responsible for any blocking.
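Below is a minimal sketch of that producer/consumer split. The class, method, and thread names are illustrative, not the PR's actual TrafficStreamLimiter API: callers enqueue work without blocking, and a single consumer thread applies the throttle (one permit per in-flight request) in arrival order.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.Semaphore;

// Hypothetical simplification of the limiter idea described above.
public class WorkQueueLimiter implements AutoCloseable {
    private final LinkedTransferQueue<Runnable> workQueue = new LinkedTransferQueue<>();
    private final Semaphore inFlightPermits;
    private final Thread consumerThread;
    private volatile boolean stopped = false;

    public WorkQueueLimiter(int maxConcurrentRequests) {
        inFlightPermits = new Semaphore(maxConcurrentRequests);
        consumerThread = new Thread(this::consumeLoop, "workQueueLimiter-consumer");
        consumerThread.start();
    }

    /** Never blocks the caller; the task is handed to the consumer thread. */
    public void queueWork(Runnable task) {
        workQueue.add(task);
    }

    /** Must be called once a request has fully completed, freeing a slot. */
    public void releasePermit() {
        inFlightPermits.release();
    }

    private void consumeLoop() {
        try {
            while (!stopped) {
                Runnable task = workQueue.take();   // wait for queued work, in arrival order
                inFlightPermits.acquire();          // throttle: blocks only this thread
                task.run();                         // caller-supplied callback
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();     // exit quietly on shutdown
        }
    }

    @Override
    public void close() {
        stopped = true;
        consumerThread.interrupt();
    }
}
```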
Issues Resolved
https://opensearch.atlassian.net/browse/MIGRATIONS-1515
Testing
A new test was added specifically to mimic the up-front burst pattern. Some other tests failed because of subtle design changes that were then reverted. A number of other tests had failed due to concurrency and general flakiness. Those were fixed/improved, but given the novelty of concurrent test runs (from an earlier PR), there are likely other ones that still need to be cleaned up.
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.