New async bindings #614

Merged
merged 14 commits into from
Apr 30, 2024

Conversation

gregschohn
Collaborator

@gregschohn gregschohn commented Apr 25, 2024

Description

These changes begin to bind netty Futures to CompletableFutures by using a proxy CompletableFuture (and a DiagnosticTrackableCompletableFuture wrapper) that is simply signaled asynchronously from the addListener callback. From there, the promise graphs can be extended to be much more complete. This change attempts to fill out much more of the promise graph so that we can diagnose why a workflow would hang. Below is a sample of a top-level DiagnosticTrackableCompletableFuture, which used to render as something closer to...

waiting for accumulation to combine with target response[…]<-[410316111] releasing work item for the traffic limiter[…]<-[1163112884] Waiting to get response from target[…]<-[438999857] waiting for f0dc8e1c-27aa-4d47-9750-5133773e45b1.061d69fffe23c617-0000000b-00000274-2d95f9ca22bca938-99d1c56b.3259.3257 to be queued and run through TrafficStreamLimiter[…] ([WARN ] 2024-04-26 02:21:07,425)
  • Category Operational Enhancement
  • Why these changes are required? Determining exactly where work is broken has been a very challenging problem since this project's inception. This should provide much better traceability with promise graphs. The new rendering is shown below. Note that the JSON can be copy-pasted into its own file and rendered with Firefox, jq, etc.
[{"idHash":1797461143,"label":"waiting for accumulation to combine with target response","value":"…"},{"idHash":32979567,"label":"releasing work item for the traffic limiter","value":"…"},{"idHash":98256953,"label":"Waiting to get response from target","pending":[{"idHash":12868091,"label":"Checking for exception out of sending data to the target server","value":"…"},{"idHash":1932550872,"label":"transitioning transformed packets onto the wire","pending":[{"idHash":1269570195,"label":"(if applicable) packaging transformed result into a failed TransformedTargetRequestAndResponse object","value":"…"},{"idHash":342427999,"label":"(if applicable) packaging transformed result into a completed TransformedTargetRequestAndResponse object","value":"…"},{"idHash":918585768,"label":"Updating fields for callers to poll progress and updating backpressure","value":"…"},{"idHash":1184240394,"label":"Waiting for sequencer to finish for slot 1671","pending":[{"idHash":1841689033,"label":"Caller-task completion for idx=1671","value":"…"},{"idHash":834289375,"label":"Work callback on replay session","value":"…"},{"idHash":63251585,"label":"Kickoff for slot #1671","value":"…"},{"idHash":1975347632,"label":"Work to finish for slot #1670 is awaiting [slotsOutstanding: >1702,]","value":"…"},{"idHash":860885033,"label":"Work callback on replay session","value":"…"},{"idHash":887533551,"label":"Kickoff for slot #1670","value":"…"},{"idHash":1934384233,"label":"Work to finish for slot #1669 is awaiting [slotsOutstanding: >1702,]","value":"…"},{"idHash":583970497,"label":"Work callback on replay session","value":"…"},{"idHash":1932086048,"label":"Kickoff for slot #1669","value":"…"},{"idHash":463461409,"label":"Work to finish for slot #1668 is awaiting [slotsOutstanding: >1702,]","value":"…"},{"idHash":824729065,"label":"Work callback on replay session","value":"…"},{"idHash":575846809,"label":"Kickoff for slot #1668","value":"…"},{"idHash":2143009051,"label":"Work to finish for slot #1667 
is awaiting [slotsOutstanding: >1702,]","value":"…"},{"idHash":2108613294,"label":"Work callback on replay session","value":"…"},{"idHash":1804126577,"label":"Kickoff for slot #1667","value":"…"},{"idHash":232728894,"label":"Work to finish for slot #1666 is awaiting [slotsOutstanding: >1702,]","value":"…"},{"idHash":2100286241,"label":"Work callback on replay session","value":"…"},{"idHash":1835158206,"label":"Kickoff for slot #1666","value":"…"},{"idHash":841816948,"label":"Work to finish for slot #1665 is awaiting [slotsOutstanding: >1702,]","value":"…"},{"idHash":365172230,"label":"Work callback on replay session","value":"…"},{"idHash":582353890,"label":"Kickoff for slot #1665","value":"…"},{"idHash":1970140327,"label":"Work to finish for slot #1664 is awaiting [slotsOutstanding: >1702,]","value":"…"},{"idHash":1698849040,"label":"Work callback on replay session","value":"…"},{"idHash":918207866,"label":"Kickoff for slot #1664","value":"…"},{"idHash":2025866340,"label":"Work to finish for slot #1663 is awaiting [slotsOutstanding: >1702,]","value":"…"},{"idHash":78029053,"label":"Work callback on replay session","value":"…"},{"idHash":707050315,"label":"Kickoff for slot #1663","value":"…"},{"idHash":1647133406,"label":"Work to finish for slot #1662 is awaiting [slotsOutstanding: >1702,]","value":"…"},{"idHash":292254606,"label":"Work callback on replay session","value":"…"},{"idHash":612777999,"label":"Kickoff for slot #1662","value":"…"},{"idHash":84781237,"label":"Work to finish for slot #1661 is awaiting [slotsOutstanding: >1702,]","value":"…"},{"idHash":1982640638,"label":"Work callback on replay session","value":"…"},{"idHash":39097331,"label":"Kickoff for slot #1661","value":"…"},{"idHash":451545807,"label":"Work to finish for slot #1660 is awaiting [slotsOutstanding: >1702,]","value":"…"},{"idHash":892560753,"label":"Work callback on replay session","value":"…"},{"idHash":1524069678,"label":"Kickoff for slot 
#1660","value":"…"},{"idHash":77266375,"label":"Work to finish for slot #1659 is awaiting [slotsOutstanding: >1702,]","value":"…"},{"idHash":1557276290,"label":"Work callback on replay session","value":"…"},{"idHash":706665392,"label":"Kickoff for slot #1659","value":"…"},{"idHash":1315040873,"label":"Work to finish for slot #1658 is awaiting [slotsOutstanding: >1702,]","value":"…"},{"idHash":1384219985,"label":"Work callback on replay session","value":"…"},{"idHash":123118183,"label":"Kickoff for slot #1658","value":"…"},{"idHash":85754298,"label":"Work to finish for slot #1657 is awaiting [slotsOutstanding: >1702,]","value":"…"},{"idHash":1314702309,"label":"Work callback on replay session","value":"…"},{"idHash":464771969,"label":"Kickoff for slot #1657","value":"…"},{"idHash":1745555924,"label":"Work to finish for slot #1656 is awaiting [slotsOutstanding: >1702,]","value":"…"},{"idHash":1410671552,"label":"Work callback on replay session","pending":[{"idHash":1007398097,"label":"sending packets for request","pending":[{"idHash":1267829242,"label":"recursing, once ready","pending":[{"idHash":186641080,"label":"sending next packet","pending":[{"idHash":1986646840,"label":"finalizing, once ready","pending":[{"idHash":400917504,"label":"clearing pipeline","value":"…"},{"idHash":1358692703,"label":"Waiting for previous consumes to set the future","pending":[{"idHash":616725458,"label":"NettyPacketToHttpConsumer.finalizeRequest()","value":"…"}]},{"idHash":478841953,"label":"consumeBytes - after channel is fully initialized (potentially waiting on TLS handshake)","value":"^"},{"idHash":661966452,"label":"consumeBytes - after channel is fully initialized (potentially waiting on TLS handshake)","value":"^"}]},{"idHash":478841953,"label":"consumeBytes - after channel is fully initialized (potentially waiting on TLS handshake)","value":"^"},{"idHash":661966452,"label":"consumeBytes - after channel is fully initialized (potentially waiting on TLS 
handshake)","value":"^"}]},{"idHash":397591868,"label":"scheduling to run next send in PT-3M-53.681298S (clipped: 0ms)","value":"^"}]},{"idHash":661966452,"label":"consumeBytes - after channel is fully initialized (potentially waiting on TLS handshake)","value":"^"}]},{"idHash":538521843,"label":"scheduled start for 2024-04-28T21:43:27.672020Z","value":"^"}]},{"idHash":1756907797,"label":"Kickoff for slot #1656","value":"^"},{"idHash":1765686552,"label":"Work to finish for slot #1655 is awaiting [slotsOutstanding: >1702,]","value":"^"}]},{"idHash":926193015,"label":"waiting for event loop submission","value":"^"}]},{"idHash":1461193096,"label":"Updating fields for callers to poll progress and updating backpressure","value":"^"},{"idHash":2111037126,"label":"The scheduled callback is running work for f0dc8e1c-27aa-4d47-9750-5133773e45b1.061d69fffe23c617-0000000b-0000043e-6e14660205c769b8-d93f93e3.1672.1671","value":"^"}]},{"idHash":1673386021,"label":"waiting for f0dc8e1c-27aa-4d47-9750-5133773e45b1.061d69fffe23c617-0000000b-0000043e-6e14660205c769b8-d93f93e3.1672.1671 to be queued and run through TrafficStreamLimiter","value":"^"}]

Issues Resolved

[List any issues this PR will resolve]

Is this a backport? If so, please add backport PR # and/or commits #

Testing

[Please provide details of testing done: unit testing, integration testing and manual testing]

Check List

  • [partial] New functionality includes testing (though not all of it is automated yet)
    • All tests pass, including unit test, integration test and doctest
  • New functionality has been documented
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

A memory leak for errantly formed http messages going through HttpByteBufFormatter; log4j2 config bug that stopped old replayer logs from compressing.
I've also further improved the logging on shutdown.

Signed-off-by: Greg Schohn <[email protected]>
1) Netty futures are now bound to CompletableFutures via a utility function that has the netty future simply mark a CompletableFuture<Void> as finished (with the null value) or propagate the exception into the CompletableFuture.  That allows the rest of the codebase to leverage CompletableFuture and DiagnosticTrackableCompletableFuture objects and methods rather than a cobbled approach.
2) Helper classes that managed coordination and scheduling (TimeToResponseFulfillmentFutureMap and OnlineRadixSorter)  have been redesigned to be more functional so that future values can be derived from earlier values rather than rely upon side effects to connect the dots.
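
A minimal sketch of the binding described in item 1, using only the JDK. `ListenableOp` and `ManualOp` are hypothetical stand-ins for a listener-style future such as netty's; they are not netty's API and not this PR's utility function, and the sketch ignores thread safety.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical stand-in for a listener-style future (NOT netty's interface).
interface ListenableOp {
    boolean isSuccess();
    Throwable cause();
    void addListener(Consumer<ListenableOp> listener);
}

class FutureBindingSketch {
    // Bind the listener-style future to a CompletableFuture<Void>:
    // complete with null on success, otherwise propagate the failure cause.
    static CompletableFuture<Void> bind(ListenableOp op) {
        CompletableFuture<Void> bound = new CompletableFuture<>();
        op.addListener(done -> {
            if (done.isSuccess()) {
                bound.complete(null);
            } else {
                bound.completeExceptionally(done.cause());
            }
        });
        return bound;
    }

    // Manually completed operation, used only to exercise bind().
    static class ManualOp implements ListenableOp {
        private final List<Consumer<ListenableOp>> listeners = new ArrayList<>();
        private boolean done;
        private Throwable cause;

        public boolean isSuccess() { return done && cause == null; }
        public Throwable cause() { return cause; }

        public void addListener(Consumer<ListenableOp> listener) {
            if (done) {
                listener.accept(this); // already finished: notify immediately
            } else {
                listeners.add(listener);
            }
        }

        void succeed() { finish(null); }
        void fail(Throwable failure) { finish(failure); }

        private void finish(Throwable failure) {
            done = true;
            cause = failure;
            listeners.forEach(listener -> listener.accept(this));
        }
    }

    public static void main(String[] args) {
        ManualOp ok = new ManualOp();
        CompletableFuture<Void> okFuture = bind(ok);
        ok.succeed();
        System.out.println("success bound: " + okFuture.isDone());

        ManualOp bad = new ManualOp();
        CompletableFuture<Void> badFuture = bind(bad);
        bad.fail(new IllegalStateException("channel closed"));
        System.out.println("failure bound: " + badFuture.isCompletedExceptionally());
    }
}
```

Once the operation is exposed as a CompletableFuture<Void>, downstream code can chain it with thenCompose, handle, and similar methods without knowing that a listener callback sits underneath.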

Some tests still don't pass and intense testing will be required before these changes are better than what was previously there.

Signed-off-by: Greg Schohn <[email protected]>
Signed-off-by: Greg Schohn <[email protected]>

# Conflicts:
#	TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java
#	TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayerCore.java
#	TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/TimeToResponseFulfillmentFutureMap.java
…FulfillmentFutureMap to use a deque instead of a queue.

If all of the events for the time fulfillment map come in order thanks to the sequencer, there's no reason to use a more complicated/expensive data structure.  When the items are in order, they lend themselves more to chaining too.
Also...
Refactor netty-CompletableFuture binding helpers into a new utility class.
Open up the DiagnosticTrackableCompletableFuture class to make it easier to test that chains are properly formed and updated.
Start to build out the RequestSenderOrchestratorTest a little bit more.
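
The deque reasoning above can be sketched as follows. This is an illustration only: `InOrderScheduleSketch`, `Item`, and the method names are hypothetical and do not reflect the real TimeToResponseFulfillmentFutureMap. It assumes, as the commit message does, that items arrive already ordered by time.

```java
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;

class InOrderScheduleSketch {
    // Hypothetical schedule entry: when to run and a label for diagnostics.
    static final class Item {
        final Instant startTime;
        final String label;

        Item(Instant startTime, String label) {
            this.startTime = startTime;
            this.label = label;
        }
    }

    // Because arrivals are already ordered, a plain deque keeps them sorted
    // without the cost of a tree or heap.
    private final Deque<Item> items = new ArrayDeque<>();

    void appendInOrder(Instant startTime, String label) {
        Item last = items.peekLast();
        if (last != null && startTime.isBefore(last.startTime)) {
            throw new IllegalArgumentException("items must arrive in time order");
        }
        items.addLast(new Item(startTime, label));
    }

    Item peekFirst() { return items.peekFirst(); }
    Item pollFirst() { return items.pollFirst(); }
    int size() { return items.size(); }

    public static void main(String[] args) {
        InOrderScheduleSketch schedule = new InOrderScheduleSketch();
        Instant base = Instant.parse("2024-04-28T21:43:27Z");
        schedule.appendInOrder(base, "first packet");
        schedule.appendInOrder(base.plusSeconds(1), "second packet");
        System.out.println(schedule.pollFirst().label); // prints "first packet"
    }
}
```

Having cheap access to both ends is what makes chaining natural: the newest entry can be linked to the one just before it when it is appended.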

Signed-off-by: Greg Schohn <[email protected]>
… to allow for chaining.

In addition to chaining sequential stages together (which now, again, need to create all intermediates before the current work item), the class now emits which sequential stages are missing (outstanding) in the DCF's diagnostic supplier.
Sample output may look like this:
[504336483] Caller-task completion for idx=3[…]<-[335107734] OnlineRadixSorterForIntegratedKeys.addFutureForWork[…]<-[215078753] Kickoff for slot #3[…]<-[1384454980] Work to finish for slot #2 is awaiting [slotsOutstanding:2][…]
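
A rough sketch of the chaining idea, assuming each slot's work may only start once the previous slot has finished and that all intermediate slots are created up front. `SlotSequencerSketch` and its methods are hypothetical; this is not the OnlineRadixSorter implementation. In this simplified version a failed slot also fails every later slot.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class SlotSequencerSketch {
    private static final class Slot {
        // Completed when a caller supplies the work for this slot.
        final CompletableFuture<Supplier<CompletableFuture<Void>>> workReady =
            new CompletableFuture<>();
        // Completed when this slot's work has run to completion.
        CompletableFuture<Void> finished;
    }

    private final List<Slot> slots = new ArrayList<>();

    // Create every missing slot up to index, chaining each to its predecessor.
    private Slot slotAt(int index) {
        while (slots.size() <= index) {
            Slot slot = new Slot();
            CompletableFuture<Void> previousFinished = slots.isEmpty()
                ? CompletableFuture.<Void>completedFuture(null)
                : slots.get(slots.size() - 1).finished;
            slot.finished = previousFinished
                .thenCompose(v -> slot.workReady)   // wait for the caller's work
                .thenCompose(work -> work.get());   // then run it
            slots.add(slot);
        }
        return slots.get(index);
    }

    synchronized CompletableFuture<Void> addWork(int index,
                                                 Supplier<CompletableFuture<Void>> work) {
        Slot slot = slotAt(index);
        if (!slot.workReady.complete(work)) {
            throw new IllegalStateException("slot " + index + " already has work");
        }
        return slot.finished;
    }

    // Slots that exist but have not been given work yet (useful in diagnostics).
    synchronized List<Integer> slotsOutstanding() {
        List<Integer> outstanding = new ArrayList<>();
        for (int i = 0; i < slots.size(); i++) {
            if (!slots.get(i).workReady.isDone()) {
                outstanding.add(i);
            }
        }
        return outstanding;
    }

    public static void main(String[] args) {
        SlotSequencerSketch sequencer = new SlotSequencerSketch();
        List<Integer> order = new ArrayList<>();
        for (int index : new int[] {2, 0, 1}) {
            sequencer.addWork(index, () -> {
                order.add(index);
                return CompletableFuture.<Void>completedFuture(null);
            });
            System.out.println("added " + index + ", outstanding=" + sequencer.slotsOutstanding());
        }
        System.out.println("run order: " + order);
    }
}
```

Work is added here for slots 2, 0, and 1, yet it runs in slot order 0, 1, 2. While slot 2 waits, `slotsOutstanding()` names the slots still missing, which is the kind of information the diagnostic supplier surfaces in the sample output above.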

Signed-off-by: Greg Schohn <[email protected]>
…ies.

Cull grandparents (if they exist) when the current future is done, checking when traversing the ancestry chain and in a whenComplete handler (which may be pretty late to the mix given how many other dependent functions might be in line before it).
There's also now the ability to set the dependent parent AFTER construction.  This helps to bind sequential stages from the OnlineRadixSorter together.
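
One reading of the culling rule, sketched with plain JDK types. `TrackedNodeSketch` is hypothetical and far simpler than the real class: each node keeps a link to its parent, and once a node's future is done, its parent's link to the grandparent is dropped so that long finished ancestries can be garbage collected and left out of diagnostics.

```java
import java.util.concurrent.CompletableFuture;

class TrackedNodeSketch {
    final CompletableFuture<?> future;
    final String label;
    private volatile TrackedNodeSketch parent; // may be set after construction

    TrackedNodeSketch(CompletableFuture<?> future, String label, TrackedNodeSketch parent) {
        this.future = future;
        this.label = label;
        this.parent = parent;
        // Late cull: runs after any dependents registered earlier on the same future.
        future.whenComplete((value, error) -> cullGrandparent());
    }

    void setParent(TrackedNodeSketch newParent) {
        this.parent = newParent;
    }

    // Drop the parent's link to its own parent.
    private void cullGrandparent() {
        TrackedNodeSketch p = parent;
        if (p != null) {
            p.parent = null;
        }
    }

    // Walk the ancestry chain, culling behind any node that is already done.
    String describeChain() {
        StringBuilder sb = new StringBuilder();
        for (TrackedNodeSketch node = this; node != null; node = node.parent) {
            if (node.future.isDone()) {
                node.cullGrandparent();
            }
            if (sb.length() > 0) {
                sb.append("<-");
            }
            sb.append(node.label);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        TrackedNodeSketch kickoff =
            new TrackedNodeSketch(new CompletableFuture<Void>(), "kickoff", null);
        TrackedNodeSketch send =
            new TrackedNodeSketch(new CompletableFuture<Void>(), "send packets", kickoff);
        CompletableFuture<Void> responseFuture = new CompletableFuture<>();
        TrackedNodeSketch response =
            new TrackedNodeSketch(responseFuture, "await response", null);
        response.setParent(send); // parent bound after construction

        System.out.println(response.describeChain()); // await response<-send packets<-kickoff
        responseFuture.complete(null);
        System.out.println(response.describeChain()); // await response<-send packets
    }
}
```

The check during traversal covers the case where a parent is attached after the future has already completed, since the whenComplete handler will have run before there was anything to cull.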

Signed-off-by: Greg Schohn <[email protected]>
Signed-off-by: Greg Schohn <[email protected]>

# Conflicts:
#	TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java
@gregschohn gregschohn marked this pull request as ready for review April 26, 2024 11:40
return consumeFuture.thenCompose(cf ->
    NettyToCompletableFutureBinders.bindNettyScheduleToCompletableFuture(eventLoop,
            Duration.between(now(), startAt.plus(interval.multipliedBy(counter.get()))))
        .getDeferredFutureThroughHandle((v, t) -> sendSendingRestOfPackets(packetReceiver, eventLoop,
Member

Should this be getDeferredFutureThroughCompose to fast fail?

Collaborator Author

Maybe (thenCompose)? I'm trying that. A netty schedule operation will fail during shutdown, and I don't see why I wouldn't want to fail faster in that situation. The other call is already in a thenCompose, so failures already propagate fairly quickly, but I think you're right that it could be quicker still.
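
The fail-fast difference discussed in this thread can be seen with plain CompletableFuture, assuming the project's getDeferredFutureThroughHandle and getDeferredFutureThroughCompose helpers follow the semantics of handle and thenCompose respectively (an assumption, since their implementations are not shown here). `FailFastSketch` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class FailFastSketch {
    // A future that has already failed, like a schedule call during shutdown.
    private static CompletableFuture<Void> failedSchedule() {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("event loop shut down"));
        return failed;
    }

    // handle() always invokes its function, even when the upstream failed.
    static boolean followUpRanWithHandle() {
        AtomicBoolean ran = new AtomicBoolean();
        failedSchedule().handle((value, error) -> {
            ran.set(true);
            return "next send attempted";
        });
        return ran.get();
    }

    // thenCompose() skips its function and simply propagates the failure.
    static boolean followUpRanWithCompose() {
        AtomicBoolean ran = new AtomicBoolean();
        failedSchedule().thenCompose(value -> {
            ran.set(true);
            return CompletableFuture.completedFuture("next send attempted");
        });
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("handle ran follow-up: " + followUpRanWithHandle());       // true
        System.out.println("thenCompose ran follow-up: " + followUpRanWithCompose()); // false
    }
}
```

With handle, the follow-up has to inspect the throwable itself if it wants to stop; with thenCompose, a failed schedule short-circuits the rest of the chain.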

…ingText shorter by doing run-length compression.

Signed-off-by: Greg Schohn <[email protected]>
The old format is still present and is used to print work that's still outstanding when the replayer is shut down.
JSON should make it easier to pretty-print the dependency graph.
I think it might make more sense to re-reverse the order of the graph for JSON output though.  I should also shift the LEVEL line to the beginning so that it's easier to copy-paste.

Signed-off-by: Greg Schohn <[email protected]>
…letableFutureBinders + other assorted tweaks.

I've also stripped the longRunningActivity file of the log levels and timestamps.  They were really just getting in the way of understanding what was going on and being able to quickly copy-paste from there into a json file for further analysis.

Signed-off-by: Greg Schohn <[email protected]>

codecov bot commented Apr 29, 2024

Codecov Report

Attention: Patch coverage is 78.65707%, with 89 lines in your changes missing coverage. Please review.

Project coverage is 75.77%. Comparing base (0c6d19d) to head (4994898).
Report is 5 commits behind head on main.

Files | Patch % | Lines
...replay/datahandlers/NettyPacketToHttpConsumer.java | 65.95% | 13 Missing and 3 partials ⚠️
...ensearch/migrations/replay/util/TrackedFuture.java | 68.62% | 7 Missing and 9 partials ⚠️
...arch/migrations/replay/util/OnlineRadixSorter.java | 81.63% | 8 Missing and 1 partial ⚠️
...ations/replay/util/TrackedFutureJsonFormatter.java | 66.66% | 4 Missing and 5 partials ⚠️
...h/migrations/replay/RequestSenderOrchestrator.java | 87.30% | 5 Missing and 3 partials ⚠️
.../datatypes/TimeToResponseFulfillmentFutureMap.java | 63.15% | 5 Missing and 2 partials ⚠️
...nsearch/migrations/replay/TrafficReplayerCore.java | 86.20% | 3 Missing and 1 partial ⚠️
.../http/NettySendByteBufsToPacketHandlerHandler.java | 55.55% | 4 Missing ⚠️
...ions/replay/util/TrackedFutureStringFormatter.java | 75.00% | 1 Missing and 3 partials ⚠️
...search/migrations/replay/ClientConnectionPool.java | 87.50% | 3 Missing ⚠️
... and 4 more
Additional details and impacted files
@@             Coverage Diff              @@
##               main     #614      +/-   ##
============================================
- Coverage     75.93%   75.77%   -0.16%     
- Complexity     1496     1533      +37     
============================================
  Files           165      168       +3     
  Lines          6362     6395      +33     
  Branches        573      570       -3     
============================================
+ Hits           4831     4846      +15     
- Misses         1150     1172      +22     
+ Partials        381      377       -4     
Flag Coverage Δ
unittests 75.77% <78.65%> (-0.16%) ⬇️


…be shorter.

s/DiagnosticTrackableCompletableFuture/TrackedFuture/
s/NettyToCompletableFutureBinders/NettyFutureBinders/
Identifier names were also updated.  Where types could be implicitly deduced, code was updated so that these type names didn't need to be specified.

Signed-off-by: Greg Schohn <[email protected]>
Member

@AndreKurait AndreKurait left a comment

Thanks for addressing everything, this is a great improvement

@gregschohn gregschohn merged commit a94008c into opensearch-project:main Apr 30, 2024
6 of 7 checks passed
@gregschohn gregschohn deleted the NewAsyncBindings branch July 13, 2024 01:43