Drop user agent matches and a bugfix for over-committing KafkaRecords too early #468
Conversation
…, for requests when a specific header matches some regex. Since captures are really on a connection (socket) basis and not a request one, there are additional complications. Data is captured as it is received. We may not know that we want to drop packets early enough. To compensate and to give the traffic stream enough details to differentiate this occurrence from a bug where packets were dropped, a new "RequestIntentionallyDropped" TrafficObservation has been introduced. This observation will only be used when data for a request has been included. That means that if we get a packet that has the entire HTTP preamble (first line + headers) that matches the drop filter, we won't capture anything at all. The --suppressCaptureForHeaderMatch argument has been added to the CaptureProxy. It expects two elements for a header name and a regex to match that header's value. When a request matches, it will be passed to the HTTP server, but will NOT be captured as per the above. Because the capture proxy only captures a handful of headers so that it can understand request boundaries, the predicate to scan additional headers must tweak the set of headers that the LoggingHttpRequestHandler's internal classes will preserve. Signed-off-by: Greg Schohn <[email protected]>
…estIntentionallyDropped observation. Along the way, I also fixed some issues with tests. One was a bug within the test (RawPackets weren't being properly compared) and the other was a more serious issue that could have resulted in over-committing messages before they were completely handled. The wrong commit came about because the Accumulation's RequestResponsePacketPair ('RRPair') needs a TrafficStreamKey for some maintenance (like closing the stream). Since that RRPair also tracks which TrafficStreamKeys should be committed once the message has been fully handled, there was an optimization to just use the first traffic stream being held as the key for the maintenance. To do that, the TrafficStreamKey was passed into the RRPair constructor, which immediately added it to the trafficStreamKeysBeingHeld list. While TrafficStreamKeys are routinely added to the RRPairs, they're otherwise only added AFTER we've done all of the processing for all of the TrafficStream's observations. That behavior is what protects the completed messages passed to the onFullDataReceived callback from advertising that they have ownership (holding) of a TrafficStreamKey whose TrafficStream may still contain observations for additional, yet-to-be-processed requests. Note that the callbacks' awareness of which TrafficStreams they hold may often be looser than what was required. However, close observations and expirations will cause the connection's last observed TrafficStreamKeys to be committed. Therefore, to preserve 'at-least-once' guaranteed delivery, the RRPair class no longer appends the constructor's incoming key to the trafficStreamKeysBeingHeld list and instead tracks it as a separate field (and concern). Signed-off-by: Greg Schohn <[email protected]>
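A minimal sketch of the shape of that change, with illustrative names (the real class has more responsibilities and the actual field/method names may differ):

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the real replayer interface; only its identity matters for this sketch.
interface ITrafficStreamKey {}

// Sketch of the fix described above; names are illustrative, not copied verbatim from the PR.
class RequestResponsePacketPairSketch {
    // Kept only for maintenance (e.g. closing the stream); NOT a signal that the
    // key's TrafficStream has had all of its observations handled.
    private final ITrafficStreamKey firstTrafficStreamKeyForRequest;
    // Keys land here only AFTER every observation in their TrafficStream has been processed,
    // so advertising them to onFullDataReceived can never cause a premature commit.
    private final List<ITrafficStreamKey> trafficStreamKeysBeingHeld = new ArrayList<>();

    RequestResponsePacketPairSketch(ITrafficStreamKey startingKey) {
        // Previously this key was also appended to trafficStreamKeysBeingHeld here,
        // which is what allowed over-committing.
        this.firstTrafficStreamKeyForRequest = startingKey;
    }

    void holdTrafficStream(ITrafficStreamKey fullyProcessedKey) {
        trafficStreamKeysBeingHeld.add(fullyProcessedKey);
    }

    List<ITrafficStreamKey> getKeysSafeToCommitAfterFullHandling() {
        return trafficStreamKeysBeingHeld;
    }
}
```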
…ction when it was closed. The message beginning "Work items are still remaining for this connection..." was logged because the close() task that was running still counted as remaining work, and that alone could trigger the message. Now the type of each task is passed around with the task itself so that we can do a smarter check. Note that there may be other work scheduled for the connection/eventLoop that we aren't able to track in this class right now. See https://opensearch.atlassian.net/browse/MIGRATIONS-1447. Signed-off-by: Greg Schohn <[email protected]>
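A rough illustration of the "pass the task type along with the task" idea (the enum, class, and method names here are assumptions, not the PR's actual identifiers):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative names only; the real replayer classes differ.
enum ChannelTaskType { TRANSMIT, CLOSE }

final class ChannelTask {
    final ChannelTaskType kind;
    final Runnable work;
    ChannelTask(ChannelTaskType kind, Runnable work) { this.kind = kind; this.work = work; }
}

final class ConnectionWorkTracker {
    private final Deque<ChannelTask> pendingTasks = new ArrayDeque<>();

    void add(ChannelTask task) { pendingTasks.add(task); }

    // The warning should only fire when something OTHER than the connection's own
    // close() task is still outstanding.
    boolean hasWorkRemainingBesidesClose() {
        return pendingTasks.stream().anyMatch(t -> t.kind != ChannelTaskType.CLOSE);
    }
}
```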
… of only requests that were suppressed, nothing (not even a close) will be emitted. This test, if enabled, breaks because we don't differentiate between the paths that can lead to a close. Signed-off-by: Greg Schohn <[email protected]>
@@ -274,6 +286,14 @@ private static SslContext loadBacksideSslContext(URI serverUri, boolean allowIns
        }
    }

    private static Map<String, String> convertPairListToMap(List<String> list) {
        var map = new TreeMap<String, String>();
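The rest of the helper isn't visible in this hunk; presumably it walks the list two elements at a time, roughly as in this sketch (the real body may differ):

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PairListSketch {
    // Sketch of a plausible body for the helper shown in the hunk above.
    static Map<String, String> convertPairListToMap(List<String> list) {
        var map = new TreeMap<String, String>();
        // Arguments arrive flattened as [name1, value1, name2, value2, ...];
        // JCommander's arity check keeps the length even (see the usage output below).
        for (int i = 0; i + 1 < list.size(); i += 2) {
            map.put(list.get(i), list.get(i + 1));
        }
        return map;
    }
}
```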
nit: We could add validation in case a user accidentally provides an odd number of arguments
Beust takes care of that for us already & the user will see the usage. Here's what the user will see with the following command args...
--kafkaConnection kafka:9092 --otelCollectorEndpoint http://localhost:4317 --destinationUri https://localhost:19200 --insecureDestination --listenPort 59200 --suppressCaptureForHeaderMatch user-agent foo --suppressCaptureForHeaderMatch cookie
Starting Capture Proxy
Got args: --kafkaConnection; kafka:9092; --otelCollectorEndpoint; http://localhost:4317; --destinationUri; https://localhost:19200; --insecureDestination; --listenPort; 59200; --suppressCaptureForHeaderMatch; user-agent; foo; --suppressCaptureForHeaderMatch; cookie
Expected 2 values after --suppressCaptureForHeaderMatch
Got args: --kafkaConnection; kafka:9092; --otelCollectorEndpoint; http://localhost:4317; --destinationUri; https://localhost:19200; --insecureDestination; --listenPort; 59200; --suppressCaptureForHeaderMatch; user-agent; foo; --suppressCaptureForHeaderMatch; cookie
Usage: <main class> [options]
Options:
--destinationConnectionPoolSize
...
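That error and usage dump come from JCommander's arity check. For reference, the declaration probably looks something like this sketch (the field name and description text are assumptions, not copied from the proxy):

```java
import com.beust.jcommander.Parameter;
import java.util.ArrayList;
import java.util.List;

class ParametersSketch {
    // arity = 2 makes JCommander demand a header name AND a regex for every occurrence,
    // producing the "Expected 2 values after --suppressCaptureForHeaderMatch" error above.
    // Because the target is a List, repeating the flag appends another pair rather than
    // replacing the previous one.
    @Parameter(names = "--suppressCaptureForHeaderMatch",
            arity = 2,
            description = "Header name and value regex; matching requests are forwarded but not captured")
    List<String> suppressCaptureHeaderPairs = new ArrayList<>();
}
```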
Oh so a user could specify --suppressCaptureForHeaderMatch multiple times (if the proper number of arguments is used each time) and the variable in code will hold all of them? I didn't realize we had that functionality
@@ -23,17 +23,17 @@ public enum ReconstructionStatus {

    HttpMessageAndTimestamp requestData;
    HttpMessageAndTimestamp responseData;
    final ITrafficStreamKey trafficStreamKey;
So this is just the original trafficStreamKey?
What do you mean by original? Either way, I don't really have a good answer from looking at the code to know what it's for. I've renamed this to 'firstTrafficStreamKeyForRequest'. Does that make it more clear?
@@ -184,6 +175,23 @@ private <T> void scheduleOnConnectionReplaySession(ISourceTrafficChannelKey chan
        schedule.appendTask(atTime, task);
        log.atTrace().setMessage(()->channelInteraction + " added a scheduled event at " + atTime +
                "... " + schedule).log();

        futureToBeCompletedByTask.map(f->f.whenComplete((v,t)-> {
Why did we move this?
Honestly, I'm just nervous. If, by some fluke, the futureToBeCompletedByTask was already completed, we'd run the whenComplete callback BEFORE we added the task to the schedule. This way, we're guaranteed that if that DOES happen, we'll exit the function with the item properly removed, as opposed to it being run once and zombie-scheduled again.
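A tiny standalone illustration of the concern (not the replayer's code): whenComplete on an already-completed future runs its callback immediately on the registering thread, which is why registering it after appendTask matters.

```java
import java.util.concurrent.CompletableFuture;

public class WhenCompleteOrderingDemo {
    public static void main(String[] args) {
        var alreadyDone = CompletableFuture.completedFuture("done");
        // Because the future is already complete, this callback fires right here,
        // synchronously, before the next statement runs.
        alreadyDone.whenComplete((v, t) -> System.out.println("cleanup ran"));
        System.out.println("task would only be added to the schedule here");
        // In the replayer, registering the cleanup AFTER appendTask() means that even in
        // this fluke case the cleanup sees the task already in the schedule and can
        // remove it, rather than leaving a zombie entry behind.
    }
}
```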
@@ -204,6 +205,16 @@ public CompletableFuture<T> flushCommitAndResetStream(boolean isFinal) throws IO
        return future;
    }

    @Override
    public void cancelCaptureForCurrentRequest(Instant timestamp) throws IOException {
        beginSubstreamObservation(timestamp, TrafficObservation.REQUESTDROPPED_FIELD_NUMBER, 1);
Do we have any logic on the Replayer to handle this new observation?
Good question - as of when you reviewed this, no. I've pushed a new commit.
I had gotten distracted yesterday when I published it & had forgotten to walk through the unit tests to make sure that they were complete (they weren't & they broke the same way that the code broke).
        };
    }

    // This test doesn't work yet, but this is an optimization. Getting connections with only a
Is this still true?
Yes.
@@ -204,6 +205,16 @@ public CompletableFuture<T> flushCommitAndResetStream(boolean isFinal) throws IO
        return future;
    }

    @Override
    public void cancelCaptureForCurrentRequest(Instant timestamp) throws IOException {
If I understand correctly, we will still write any packets up to the point that we can decipher the headers, at which point we will add this new observation to signify that the preceding observations can be ignored. It seems like ideally we wouldn't even write them to Kafka, but I do agree that's not necessarily an easy thing to do with our current logic.
That is correct. If a caller wants to make sure that there's no trace, it's reasonable to expect that they can send a request whose filter-matching headers arrive early enough (in the first packet). That said, we don't parse the headers as they come in, but rather once they've all arrived. Parsing them incrementally would be a really nice-to-have.
However, there may be cases where a request is spread out across time. We have two choices: buffer and manage, or offload ASAP (keeping in mind that there's a lot of buffering through the stacks, just not buffering that's super easy to retract). Not capturing is an optimization on the wire protocol, but at the expense of compute & memory for the proxy. Not replaying is the visible high-level requirement, which we'll meet by adding the tombstone. Trying to do more for what could be very rare cases, considering that we're going to be set up to log nearly all of the traffic anyway, doesn't seem like an investment that would ever pay off (as much as it pains me to NOT do that wire optimization).
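To make the tombstone behavior concrete, here's a stripped-down sketch of the accumulation handling described above (hypothetical class and method names; the real replayer works over Netty byte buffers and the protobuf observation types):

```java
// Illustrative only: a stand-in for the replayer's per-request accumulation state.
class RequestAccumulationSketch {
    private final StringBuilder pendingRequestBytes = new StringBuilder();

    // Bytes captured from the client before the drop decision could be made.
    void onRead(String data) {
        pendingRequestBytes.append(data);
    }

    // The new RequestIntentionallyDropped observation: discard everything accumulated
    // for this request and reset, so the observations that follow are treated as the
    // start of the next request rather than a continuation of this one.
    void onRequestIntentionallyDropped() {
        pendingRequestBytes.setLength(0);
    }

    boolean hasReplayableRequest() {
        return pendingRequestBytes.length() > 0;
    }
}
```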
Since the observations that follow a Drop will be more reads from the next request, rather than writes, the state model in reconstruction needed some changes. The tests also needed to be improved to generate well-formed sequences; the invalid ones that HAD been generated were flawed in the same way that the accumulation logic was. Both effectively just ignored the DropRequest observation. Now neither does, & changes have been made to each accordingly. Since there's a new observation, I had to update the list of seeds to get complete coverage. Signed-off-by: Greg Schohn <[email protected]>
… when the first observed trafficStream had a carry-over request that was ALSO going to be ignored (in a subsequent TrafficStream). There are some other changes included here because it wasn't clear WHAT the cause of the duplicates was. Now the test prints out the first characters (which are now always printable ascii) of each read/write observation and the callback checks that the result isn't the same as the previous one (which was the issue before this patch). Signed-off-by: Greg Schohn <[email protected]>
…aProtobufConsumer references from a previous merge Signed-off-by: Greg Schohn <[email protected]> # Conflicts: # TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java
Not sure why it is showing me this file, I'm pretty sure this file doesn't exist on main already?
It wasn't deleted when the branch point happened for another branch that I might have merged into this one. Either way, it had come back in and I had to delete it again here.
Description
Add the ability to drop captures, while still forwarding the contents, for requests when a specific header matches some regex.
Since captures are really on a connection (socket) basis and not a request one, there are additional complications.
Data is captured as it is received. We may not know that we want to drop packets early enough. To compensate and to give the traffic stream enough details to differentiate this occurrence from a bug where packets were dropped, a new "RequestIntentionallyDropped" TrafficObservation has been introduced. This observation will only be used when data for a request has been included. That means that if we get a packet that has the entire HTTP preamble (first line + headers) that matches the drop filter, we won't capture anything at all.
The --suppressCaptureForHeaderMatch argument has been added to the CaptureProxy. It expects two elements for a header name and a regex to match that header's value. When a request matches, it will be passed to the HTTP server, but will NOT be captured as per the above. Because the capture proxy only captures a handful of headers so that it can understand request boundaries, the predicate to scan additional headers must tweak the set of headers that the LoggingHttpRequestHandler's internal classes will preserve.
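For illustration, the header check that this flag configures plausibly boils down to something like the sketch below (illustrative names; whether the regex must match the whole header value or only part of it isn't specified here, and the real proxy works with Netty's decoded HTTP headers rather than a plain Map):

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Sketch only; not the proxy's actual predicate class.
class CaptureSuppressionPredicateSketch {
    private final Map<String, Pattern> headerNameToValuePattern;

    CaptureSuppressionPredicateSketch(Map<String, String> headerToRegex) {
        // Header names are case-insensitive per the HTTP spec.
        headerNameToValuePattern = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        headerToRegex.forEach((name, regex) ->
                headerNameToValuePattern.put(name, Pattern.compile(regex)));
    }

    /** True when any configured header is present and its value matches the configured regex. */
    boolean shouldSuppressCapture(Map<String, String> requestHeaders) {
        return requestHeaders.entrySet().stream().anyMatch(h -> {
            var pattern = headerNameToValuePattern.get(h.getKey());
            return pattern != null && pattern.matcher(h.getValue()).matches();
        });
    }
}
```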
There was also a bugfix for over-committing KafkaRecords too early and a spurious log message has been disabled.
Issues Resolved
https://opensearch.atlassian.net/browse/MIGRATIONS-1444
Testing
./gradlew :dockerSolution:composeUp
... OSB --test script and doc count check
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.