From 44ec296a41da2fec2fa2cf3fe77c354fd65caacd Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Fri, 8 Dec 2023 10:06:07 -0500 Subject: [PATCH] More context on throws (#453) * Add more context to a number of warn/error log statements and tighten RequestKey.toString(). Also pass the request key through ParsedHttpMessagesAsDicts so that the request key is present in the (which I've just realized should be sending back optionals or sneaky throws & letting the callers do this conversion... next change). * Delint - Fix a bug in metrics logging where a fluent API's response was being dropped. This can create an error down the line if the underlying implementation changes, even if it works now. I've also removed a getValue() that didn't do anything. Signed-off-by: Greg Schohn --- .../coreutils/MetricsLogBuilder.java | 2 +- ...allyReliableLoggingHttpRequestHandler.java | 2 +- .../proxyserver/netty/FrontsideHandler.java | 2 +- ...edTrafficToHttpTransactionAccumulator.java | 8 +- .../replay/ClientConnectionPool.java | 1 - .../replay/ParsedHttpMessagesAsDicts.java | 36 ++- .../replay/RequestSenderOrchestrator.java | 5 +- .../migrations/replay/TrafficReplayer.java | 7 +- .../http/HttpJsonTransformingConsumer.java | 6 +- .../http/NettyJsonToByteBufHandler.java | 3 +- .../datatypes/PojoTrafficStreamKey.java | 13 +- .../replay/kafka/KafkaProtobufConsumer.java | 297 ++++++++++++++++++ .../kafka/KafkaTrafficCaptureSource.java | 2 +- .../TrafficStreamKeyWithKafkaRecordId.java | 12 +- .../replay/ParsedHttpMessagesAsDictsTest.java | 8 + .../kafka/KafkaTrafficCaptureSourceTest.java | 12 + 16 files changed, 380 insertions(+), 36 deletions(-) create mode 100644 TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumer.java diff --git a/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/coreutils/MetricsLogBuilder.java b/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/coreutils/MetricsLogBuilder.java index 0e18e6218..d5b2e8480 100644 --- a/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/coreutils/MetricsLogBuilder.java +++ b/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/coreutils/MetricsLogBuilder.java @@ -17,7 +17,7 @@ public MetricsLogBuilder(Logger logger) { } public MetricsLogBuilder setAttribute(MetricsAttributeKey key, Object value) { - loggingEventBuilder.addKeyValue(key.getKeyName(), value); + loggingEventBuilder = loggingEventBuilder.addKeyValue(key.getKeyName(), value); return this; } diff --git a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandler.java b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandler.java index 7b5161ed6..e155593f0 100644 --- a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandler.java +++ b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandler.java @@ -28,7 +28,7 @@ protected void channelFinishedReadingAnHttpMessage(ChannelHandlerContext ctx, Ob // This is a spot where we would benefit from having a behavioral policy that different users // could set as needed. Some users may be fine with just logging a failed offloading of a request // where other users may want to stop entirely. 
JIRA here: https://opensearch.atlassian.net/browse/MIGRATIONS-1276 - log.warn("Got error: " + t.getMessage()); + log.atWarn().setCause(t).setMessage("Got error").log(); ReferenceCountUtil.release(msg); } else { try { diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/FrontsideHandler.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/FrontsideHandler.java index 14416838b..da0f8564a 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/FrontsideHandler.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/FrontsideHandler.java @@ -57,7 +57,7 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) { } }); outboundChannel.config().setAutoRead(true); - } else { // if the outbound channel has died, so be it... let this frontside finish with it's caller naturally + } else { // if the outbound channel has died, so be it... let this frontside finish with its call naturally log.warn("Output channel (" + outboundChannel + ") is NOT active"); ReferenceCountUtil.release(msg); } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator.java index dc5aca221..22ee2fb0a 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator.java @@ -185,7 +185,8 @@ public CONNECTION_STATUS addObservationToAccumulation(@NonNull Accumulation accu .or(() -> handleObservationForReadState(accum, observation, trafficStreamKey, timestamp)) .or(() -> handleObservationForWriteState(accum, observation, trafficStreamKey, timestamp)) .orElseGet(() -> { - log.atWarn().setMessage(()->"unaccounted for observation type " + observation).log(); + log.atWarn().setMessage(()->"unaccounted for observation type " + observation + + " for " + accum.trafficChannelKey).log(); return CONNECTION_STATUS.ALIVE; }); } @@ -380,8 +381,9 @@ private void fireAccumulationsCallbacksAndClose(Accumulation accumulation, // It might be advantageous to replicate these to provide stress to the target server, but // it's a difficult decision and one to be managed with a policy. // TODO - add Jira/github issue here. 
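These hunks, like the MetricsLogBuilder change above, replace string-concatenated log.warn calls with SLF4J 2.x fluent builders and keep the builder instance that each fluent call returns. A minimal sketch of that pattern, assuming an illustrative requestKey argument that is not taken from this patch:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.spi.LoggingEventBuilder;

class FluentLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(FluentLoggingSketch.class);

    static void warnWithContext(Throwable t, Object requestKey) {
        // Keep the builder returned by each call; the fluent contract allows an
        // implementation to hand back a different instance rather than mutate the receiver.
        LoggingEventBuilder builder = log.atWarn().setCause(t);
        builder = builder.addKeyValue("requestKey", requestKey);
        builder.setMessage("Got error for " + requestKey).log();
    }
}

Reassigning the returned builder is the same move the MetricsLogBuilder fix makes: it works with today's implementation and stays correct if a future one returns a new instance instead of mutating in place.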
- log.warn("Terminating a TrafficStream reconstruction w/out an accumulated value, " + - "assuming an empty server interaction and NOT reproducing this to the target cluster."); + log.atWarn().setMessage("Terminating a TrafficStream reconstruction before data was accumulated " + + "for " + accumulation.trafficChannelKey + " assuming an empty server interaction and NOT " + + "reproducing this to the target cluster.").log(); if (accumulation.hasRrPair()) { listener.onTrafficStreamsExpired(status, Collections.unmodifiableList(accumulation.getRrPair().trafficStreamKeysBeingHeld)); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java index d647abb34..fe6b8a00c 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java @@ -190,7 +190,6 @@ public ConnectionReplaySession getCachedSession(ISourceTrafficChannelKey channel var schedule = channelAndFutureWork.schedule; while (channelAndFutureWork.hasWorkRemaining()) { var scheduledItemToKill = schedule.peekFirstItem(); - scheduledItemToKill.getValue(); schedule.removeFirstItem(); } }) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java index df2037eb6..a4d0c5b58 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java @@ -9,6 +9,7 @@ import org.opensearch.migrations.coreutils.MetricsEvent; import org.opensearch.migrations.coreutils.MetricsLogBuilder; import org.opensearch.migrations.coreutils.MetricsLogger; +import org.opensearch.migrations.replay.datatypes.UniqueReplayerRequestKey; import org.opensearch.migrations.replay.datatypes.UniqueSourceRequestKey; import java.time.Duration; @@ -46,7 +47,7 @@ public ParsedHttpMessagesAsDicts(SourceTargetCaptureTuple tuple) { protected ParsedHttpMessagesAsDicts(SourceTargetCaptureTuple tuple, Optional sourcePairOp) { - this(getSourceRequestOp(sourcePairOp), + this(getSourceRequestOp(tuple.uniqueRequestKey, sourcePairOp), getSourceResponseOp(tuple, sourcePairOp), getTargetRequestOp(tuple), getTargetResponseOp(tuple)); @@ -55,28 +56,30 @@ protected ParsedHttpMessagesAsDicts(SourceTargetCaptureTuple tuple, private static Optional> getTargetResponseOp(SourceTargetCaptureTuple tuple) { return Optional.ofNullable(tuple.targetResponseData) .filter(r -> !r.isEmpty()) - .map(d -> convertResponse(d, tuple.targetResponseDuration)); + .map(d -> convertResponse(tuple.uniqueRequestKey, d, tuple.targetResponseDuration)); } private static Optional> getTargetRequestOp(SourceTargetCaptureTuple tuple) { return Optional.ofNullable(tuple.targetRequestData) .map(d -> d.asByteArrayStream()) - .map(d -> convertRequest(d.collect(Collectors.toList()))); + .map(d -> convertRequest(tuple.uniqueRequestKey, d.collect(Collectors.toList()))); } - private static Optional> getSourceResponseOp(SourceTargetCaptureTuple tuple, Optional sourcePairOp) { + private static Optional> getSourceResponseOp(SourceTargetCaptureTuple tuple, + Optional sourcePairOp) { return 
sourcePairOp.flatMap(p -> Optional.ofNullable(p.responseData).flatMap(d -> Optional.ofNullable(d.packetBytes)) - .map(d -> convertResponse(d, + .map(d -> convertResponse(tuple.uniqueRequestKey, d, // TODO: These durations are not measuring the same values! Duration.between(tuple.sourcePair.requestData.getLastPacketTimestamp(), tuple.sourcePair.responseData.getLastPacketTimestamp())))); } - private static Optional> getSourceRequestOp(Optional sourcePairOp) { + private static Optional> getSourceRequestOp(@NonNull UniqueSourceRequestKey diagnosticKey, + Optional sourcePairOp) { return sourcePairOp.flatMap(p -> Optional.ofNullable(p.requestData).flatMap(d -> Optional.ofNullable(d.packetBytes)) - .map(d -> convertRequest(d))); + .map(d -> convertRequest(diagnosticKey, d))); } public ParsedHttpMessagesAsDicts(Optional> sourceRequestOp1, @@ -137,18 +140,22 @@ private static Map fillMap(LinkedHashMap map, return map; } - private static Map makeSafeMap(Callable> c) { + private static Map makeSafeMap(@NonNull UniqueSourceRequestKey diagnosticKey, + Callable> c) { try { return c.call(); } catch (Exception e) { - log.warn("Putting what may be a bogus value in the output because transforming it " + - "into json threw an exception"); + // TODO - this isn't a good design choice. + // We should follow through with the spirit of this class and leave this as empty optional values + log.atWarn().setMessage(()->"Putting what may be a bogus value in the output because transforming it " + + "into json threw an exception for "+diagnosticKey.toString()).setCause(e).log(); return Map.of("Exception", (Object) e.toString()); } } - private static Map convertRequest(@NonNull List data) { - return makeSafeMap(() -> { + private static Map convertRequest(@NonNull UniqueSourceRequestKey diagnosticKey, + @NonNull List data) { + return makeSafeMap(diagnosticKey, () -> { var map = new LinkedHashMap(); var message = HttpByteBufFormatter.parseHttpRequestFromBufs(byteToByteBufStream(data), true); map.put("Request-URI", message.uri()); @@ -158,8 +165,9 @@ private static Map convertRequest(@NonNull List data) { }); } - private static Map convertResponse(@NonNull List data, Duration latency) { - return makeSafeMap(() -> { + private static Map convertResponse(@NonNull UniqueSourceRequestKey diagnosticKey, + @NonNull List data, Duration latency) { + return makeSafeMap(diagnosticKey, () -> { var map = new LinkedHashMap(); var message = HttpByteBufFormatter.parseHttpResponseFromBufs(byteToByteBufStream(data), true); map.put("HTTP-Version", message.protocolVersion()); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java index df6851652..b19cc3f8e 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java @@ -160,7 +160,8 @@ private void scheduleOnConnectionReplaySession(ISourceTrafficChannelKey chan var sf = eventLoop.schedule(kvp.getValue(), getDelayFromNowMs(kvp.getKey()), TimeUnit.MILLISECONDS); sf.addListener(sfp->{ if (!sfp.isSuccess()) { - log.atWarn().setCause(sfp.cause()).setMessage(()->"Scheduled future was not successful").log(); + log.atWarn().setCause(sfp.cause()).setMessage(()->"Scheduled future was not successful for " + + channelInteraction).log(); } }); }); @@ -171,7 
+172,7 @@ private void scheduleOnConnectionReplaySession(ISourceTrafficChannelKey chan eventLoop.schedule(task, getDelayFromNowMs(atTime), TimeUnit.MILLISECONDS); scheduledFuture.addListener(f->{ if (!f.isSuccess()) { - log.atError().setCause(f.cause()).setMessage(()->"Error scheduling task").log(); + log.atError().setCause(f.cause()).setMessage(()->"Error scheduling task for " + channelKey).log(); } else { log.atInfo().setMessage(()->"scheduled future has finished for "+channelInteraction).log(); } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java index c1cc0d8a7..9138b9595 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java @@ -519,7 +519,7 @@ void setupRunAndWaitForReplay(Duration observedPacketConnectionTimeout, } catch (InterruptedException ex) { throw ex; } catch (Exception e) { - log.warn("Terminating runReplay due to", e); + log.atWarn().setCause(e).setMessage("Terminating runReplay due to exception").log(); throw e; } finally { trafficToHttpTransactionAccumulator.close(); @@ -645,7 +645,8 @@ Void handleCompletedTransaction(@NonNull UniqueReplayerRequestKey requestKey, Re commitTrafficStreams(rrPair.trafficStreamKeysBeingHeld, rrPair.completionStatus); return null; } else { - log.atError().setCause(t).setMessage(()->"Throwable passed to handle(). Rethrowing.").log(); + log.atError().setCause(t).setMessage(()->"Throwable passed to handle() for " + requestKey + + ". Rethrowing.").log(); throw Lombok.sneakyThrow(t); } } catch (Error error) { @@ -922,7 +923,7 @@ private static String formatWorkItem(DiagnosticTrackableCompletableFuture shutdown(Error error) { - log.warn("Shutting down "+this+" because of "+error); + log.atWarn().setCause(error).setMessage(()->"Shutting down " + this + " because of error").log(); shutdownReasonRef.compareAndSet(null, error); if (!shutdownFutureRef.compareAndSet(null, new CompletableFuture<>())) { log.atError().setMessage(()->"Shutdown was already signaled by {}. " + diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumer.java index 8d17c8187..1121730df 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumer.java @@ -170,11 +170,7 @@ private static Throwable unwindPossibleCompletionException(Throwable t) { chunks.stream().collect( Utils.foldLeft(DiagnosticTrackableCompletableFuture.Factory. 
completedFuture(null, ()->"Initial value"), - (dcf, bb) -> dcf.thenCompose(v -> { - var rval = packetConsumer.consumeBytes(bb); - log.error("packetConsumer.consumeBytes()="+rval); - return rval; - }, + (dcf, bb) -> dcf.thenCompose(v -> packetConsumer.consumeBytes(bb), ()->"HttpJsonTransformingConsumer.redriveWithoutTransformation collect()"))); DiagnosticTrackableCompletableFuture finalizedFuture = consumptionChainedFuture.thenCompose(v -> packetConsumer.finalizeRequest(), diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonToByteBufHandler.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonToByteBufHandler.java index cf0f3eb07..0195b810b 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonToByteBufHandler.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonToByteBufHandler.java @@ -137,7 +137,8 @@ private void writeHeadersIntoByteBufs(ChannelHandlerContext ctx, return; } } catch (Exception e) { - log.warn("writing headers directly to chunks w/ sizes didn't work: "+e); + log.atWarn().setCause(e).setMessage(()->"writing headers directly to chunks w/ sizes didn't work for " + + httpJson).log(); } try (var baos = new ByteArrayOutputStream()) { diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/PojoTrafficStreamKey.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/PojoTrafficStreamKey.java index da4ce3ee1..564b6dbf5 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/PojoTrafficStreamKey.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/PojoTrafficStreamKey.java @@ -1,11 +1,11 @@ package org.opensearch.migrations.replay.datatypes; +import java.util.StringJoiner; + import lombok.EqualsAndHashCode; -import lombok.ToString; import org.opensearch.migrations.trafficcapture.protos.TrafficStream; import org.opensearch.migrations.trafficcapture.protos.TrafficStreamUtils; -@ToString @EqualsAndHashCode() public class PojoTrafficStreamKey implements ITrafficStreamKey { private final String nodeId; @@ -36,4 +36,13 @@ public String getConnectionId() { public int getTrafficStreamIndex() { return trafficStreamIndex; } + + @Override + public String toString() { + return new StringJoiner(".") + .add(nodeId) + .add(connectionId) + .add(""+trafficStreamIndex) + .toString(); + } } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumer.java new file mode 100644 index 000000000..0c00ef168 --- /dev/null +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumer.java @@ -0,0 +1,297 @@ +package org.opensearch.migrations.replay.kafka; + +import com.google.protobuf.InvalidProtocolBufferException; +import lombok.NonNull; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import 
org.opensearch.migrations.coreutils.MetricsAttributeKey; +import org.opensearch.migrations.coreutils.MetricsEvent; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.opensearch.migrations.coreutils.MetricsLogger; +import org.opensearch.migrations.replay.datatypes.ITrafficStreamKey; +import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamKey; +import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamWithKey; +import org.opensearch.migrations.replay.traffic.source.ISimpleTrafficCaptureSource; +import org.opensearch.migrations.replay.traffic.source.ITrafficStreamWithKey; +import org.opensearch.migrations.trafficcapture.protos.TrafficStream; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.PriorityQueue; +import java.util.Properties; +import java.util.StringJoiner; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +/** + * Adapt a Kafka stream into a TrafficCaptureSource. + * + * Notice that there's a critical gap between how Kafka accepts commits and how the + * BlockingTrafficSource throttles calls to Kafka. The BlockingTrafficSource may + * block calls to readNextTrafficStreamChunk() until some time window elapses. This + * could be a very large window in cases where there were long gaps between recorded + * requests from the capturing proxy. For example, if a TrafficStream is read and if + * that stream is scheduled to be run one hour later, readNextTrafficStreamChunk() + * may not be called for almost an hour. By design, we're not calling Kafka to pull + * any more messages since we know that we don't have work to do for an hour. Shortly + * after the hour of waiting begins, Kakfa will notice that this application is no + * longer calling poll and will kick the consumer out of the client group. Other + * consumers may connect, though they'll also be kicked out of the group shortly. + * + * See + * ... + * + * "Basically if you don't call poll at least as frequently as the configured max interval, + * then the client will proactively leave the group so that another consumer can take + * over its partitions. When this happens, you may see an offset commit failure (as + * indicated by a CommitFailedException thrown from a call to commitSync())." + * + * I believe that this can be mitigated, hopefully fully, by adding a keepAlive/do nothing + * call that the BlockingTrafficSource can use. That can be implemented in a source + * like this with Kafka by polling, then resetting the position on the stream if we + * aren't supposed to be reading new data. 
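One shape that keepAlive could take, sketched under assumed names (touch and KafkaKeepAliveSketch are illustrative and not part of this class): call poll() before max.poll.interval.ms elapses so the client does not proactively leave the group, then seek every assigned partition back to where it was so nothing is consumed ahead of schedule.

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

class KafkaKeepAliveSketch {
    // Assumes the consumer already holds an assignment from an earlier poll.
    static void touch(Consumer<String, byte[]> consumer) {
        // Remember the current read position of every assigned partition.
        Map<TopicPartition, Long> positions = new HashMap<>();
        for (TopicPartition tp : consumer.assignment()) {
            positions.put(tp, consumer.position(tp));
        }
        // Poll so the client stays inside the consumer group...
        consumer.poll(Duration.ofMillis(100));
        // ...then rewind, so any records that poll fetched are re-read by the next real poll.
        positions.forEach(consumer::seek);
    }
}

Because the positions are restored before returning, a later readNextTrafficStreamChunk() would still see the same records; the poll here exists only to keep the group membership alive.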
+ */ +@Slf4j +public class KafkaProtobufConsumer implements ISimpleTrafficCaptureSource { + + static class TrafficStreamKeyWithKafkaRecordId extends PojoTrafficStreamKey { + private final int partition; + private final long offset; + + TrafficStreamKeyWithKafkaRecordId(TrafficStream trafficStream, int partition, long offset) { + super(trafficStream); + this.partition = partition; + this.offset = offset; + } + + @Override + public String toString() { + return new StringJoiner("|") + .add(super.toString()) + .add("partition=" + partition) + .add("offset=" + offset) + .toString(); + } + } + + private static class OffsetLifecycleTracker { + private final PriorityQueue pQueue = new PriorityQueue<>(); + private long cursorHighWatermark; + + private OffsetLifecycleTracker() { + } + + boolean isEmpty() { + return pQueue.isEmpty(); + } + + void add(long offset) { + cursorHighWatermark = offset; + pQueue.add(offset); + } + + Optional removeAndReturnNewHead(TrafficStreamKeyWithKafkaRecordId kafkaRecord) { + var offsetToRemove = kafkaRecord.offset; + var topCursor = pQueue.peek(); + var didRemove = pQueue.remove(offsetToRemove); + assert didRemove : "Expected all live records to have an entry and for them to be removed only once"; + if (topCursor == offsetToRemove) { + topCursor = Optional.ofNullable(pQueue.peek()) + .orElse(cursorHighWatermark+1); // most recent cursor was previously popped + log.atDebug().setMessage("Commit called for {}, and new topCursor={}") + .addArgument(offsetToRemove).addArgument(topCursor).log(); + return Optional.of(topCursor); + } else { + log.atDebug().setMessage("Commit called for {}, but topCursor={}") + .addArgument(offsetToRemove).addArgument(topCursor).log(); + return Optional.empty(); + } + } + } + + private static final MetricsLogger metricsLogger = new MetricsLogger("KafkaProtobufConsumer"); + + public static final Duration CONSUMER_POLL_TIMEOUT = Duration.ofSeconds(1); + + private final Consumer kafkaConsumer; + private final ConcurrentHashMap partitionToOffsetLifecycleTrackerMap; + private final ConcurrentHashMap nextSetOfCommitsMap; + private final Object offsetLifecycleLock = new Object(); + private final String topic; + private final KafkaBehavioralPolicy behavioralPolicy; + private final AtomicInteger trafficStreamsRead; + + public KafkaProtobufConsumer(Consumer kafkaConsumer, String topic) { + this(kafkaConsumer, topic, new KafkaBehavioralPolicy()); + } + + public KafkaProtobufConsumer(Consumer kafkaConsumer, @NonNull String topic, + KafkaBehavioralPolicy behavioralPolicy) { + this.kafkaConsumer = kafkaConsumer; + this.topic = topic; + this.behavioralPolicy = behavioralPolicy; + kafkaConsumer.subscribe(Collections.singleton(topic)); + trafficStreamsRead = new AtomicInteger(); + + partitionToOffsetLifecycleTrackerMap = new ConcurrentHashMap<>(); + nextSetOfCommitsMap = new ConcurrentHashMap<>(); + } + + public static KafkaProtobufConsumer buildKafkaConsumer(@NonNull String brokers, + @NonNull String topic, + @NonNull String groupId, + boolean enableMSKAuth, + String propertyFilePath, + KafkaBehavioralPolicy behavioralPolicy) throws IOException { + var kafkaProps = buildKafkaProperties(brokers, groupId, enableMSKAuth, propertyFilePath); + return new KafkaProtobufConsumer(new KafkaConsumer<>(kafkaProps), topic, behavioralPolicy); + } + + public static Properties buildKafkaProperties(@NonNull String brokers, + @NonNull String groupId, + boolean enableMSKAuth, + String propertyFilePath) throws IOException { + var kafkaProps = new Properties(); + 
kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + if (propertyFilePath != null) { + try (InputStream input = new FileInputStream(propertyFilePath)) { + kafkaProps.load(input); + } catch (IOException ex) { + log.error("Unable to load properties from kafka properties file with path: {}", propertyFilePath); + throw ex; + } + } + // Required for using SASL auth with MSK public endpoint + if (enableMSKAuth) { + kafkaProps.setProperty("security.protocol", "SASL_SSL"); + kafkaProps.setProperty("sasl.mechanism", "AWS_MSK_IAM"); + kafkaProps.setProperty("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;"); + kafkaProps.setProperty("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler"); + } + kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + return kafkaProps; + } + + @Override + @SuppressWarnings("unchecked") + public CompletableFuture> readNextTrafficStreamChunk() { + return CompletableFuture.supplyAsync(this::readNextTrafficStreamSynchronously); + } + + public List readNextTrafficStreamSynchronously() { + try { + ConsumerRecords records; + records = safeCommitAndPollWithSwallowedRuntimeExceptions(); + Stream trafficStream = StreamSupport.stream(records.spliterator(), false) + .map(kafkaRecord -> { + try { + TrafficStream ts = TrafficStream.parseFrom(kafkaRecord.value()); + // Ensure we increment trafficStreamsRead even at a higher log level + metricsLogger.atSuccess(MetricsEvent.PARSED_TRAFFIC_STREAM_FROM_KAFKA) + .setAttribute(MetricsAttributeKey.CONNECTION_ID, ts.getConnectionId()) + .setAttribute(MetricsAttributeKey.TOPIC_NAME, this.topic) + .setAttribute(MetricsAttributeKey.SIZE_IN_BYTES, ts.getSerializedSize()).emit(); + addOffset(kafkaRecord.partition(), kafkaRecord.offset()); + var key = new TrafficStreamKeyWithKafkaRecordId(ts, kafkaRecord.partition(), kafkaRecord.offset()); + log.atTrace().setMessage(()->"Parsed traffic stream #{}: {} {}") + .addArgument(trafficStreamsRead.incrementAndGet()) + .addArgument(key) + .addArgument(ts) + .log(); + return (ITrafficStreamWithKey) new PojoTrafficStreamWithKey(ts, key); + } catch (InvalidProtocolBufferException e) { + RuntimeException recordError = behavioralPolicy.onInvalidKafkaRecord(kafkaRecord, e); + metricsLogger.atError(MetricsEvent.PARSING_TRAFFIC_STREAM_FROM_KAFKA_FAILED, recordError) + .setAttribute(MetricsAttributeKey.TOPIC_NAME, this.topic).emit(); + if (recordError != null) { + throw recordError; + } + return null; + } + }).filter(Objects::nonNull); + return trafficStream.collect(Collectors.toList()); + } catch (Exception e) { + log.atError().setCause(e).setMessage("Terminating Kafka traffic stream").log(); + throw e; + } + } + + private ConsumerRecords safeCommitAndPollWithSwallowedRuntimeExceptions() { + try { + synchronized (offsetLifecycleLock) { + if (!nextSetOfCommitsMap.isEmpty()) { + log.atDebug().setMessage(()->"Committing "+nextSetOfCommitsMap).log(); + kafkaConsumer.commitSync(nextSetOfCommitsMap); + log.atDebug().setMessage(()->"Done committing "+nextSetOfCommitsMap).log(); + nextSetOfCommitsMap.clear(); + } + } + + var records = 
kafkaConsumer.poll(CONSUMER_POLL_TIMEOUT); + log.atInfo().setMessage(()->"Kafka consumer poll has fetched "+records.count()+" records").log(); + log.atDebug().setMessage(()->"All positions: {"+kafkaConsumer.assignment().stream() + .map(tp->tp+": "+kafkaConsumer.position(tp)).collect(Collectors.joining(",")) + "}").log(); + log.atDebug().setMessage(()->"All COMMITTED positions: {"+kafkaConsumer.assignment().stream() + .map(tp->tp+": "+kafkaConsumer.committed(tp)).collect(Collectors.joining(",")) + "}").log(); + return records; + } catch (RuntimeException e) { + log.atWarn().setCause(e).setMessage("Unable to poll the topic: {} with our Kafka consumer. " + + "Swallowing and awaiting next metadata refresh to try again.").addArgument(topic).log(); + return new ConsumerRecords<>(Collections.emptyMap()); + } + } + + private void addOffset(int partition, long offset) { + synchronized (offsetLifecycleLock) { + var offsetTracker = partitionToOffsetLifecycleTrackerMap.computeIfAbsent(partition, p -> + new OffsetLifecycleTracker()); + offsetTracker.add(offset); + } + } + + @Override + public void commitTrafficStream(ITrafficStreamKey trafficStreamKey) { + if (!(trafficStreamKey instanceof TrafficStreamKeyWithKafkaRecordId)) { + throw new IllegalArgumentException("Expected key of type "+TrafficStreamKeyWithKafkaRecordId.class+ + " but received "+trafficStreamKey+" (of type="+trafficStreamKey.getClass()+")"); + } + var kafkaTsk = (TrafficStreamKeyWithKafkaRecordId) trafficStreamKey; + var p = kafkaTsk.partition; + Optional newHeadValue; + synchronized (offsetLifecycleLock) { + var tracker = partitionToOffsetLifecycleTrackerMap.get(p); + newHeadValue = tracker.removeAndReturnNewHead(kafkaTsk); + newHeadValue.ifPresent(o -> { + if (tracker.isEmpty()) { + partitionToOffsetLifecycleTrackerMap.remove(p); + } + nextSetOfCommitsMap.put(new TopicPartition(topic, p), new OffsetAndMetadata(o)); + }); + } + } + + @Override + public void close() throws IOException { + kafkaConsumer.close(); + log.info("Kafka consumer closed successfully."); + } +} diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSource.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSource.java index 4d6de51df..a195d553c 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSource.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSource.java @@ -39,7 +39,7 @@ * BlockingTrafficSource throttles calls to Kafka. The BlockingTrafficSource may * block calls to readNextTrafficStreamChunk() until some time window elapses. This * could be a very large window in cases where there were long gaps between recorded - * requests from the capturing proxy. For example, if a TrafficStream is read and it + * requests from the capturing proxy. For example, if a TrafficStream is read and if * that stream is scheduled to be run one hour later, readNextTrafficStreamChunk() * may not be called for almost an hour. By design, we're not calling Kafka to pull * any more messages since we know that we don't have work to do for an hour. 
Shortly diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrafficStreamKeyWithKafkaRecordId.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrafficStreamKeyWithKafkaRecordId.java index aa86e96c7..4a19942f5 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrafficStreamKeyWithKafkaRecordId.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrafficStreamKeyWithKafkaRecordId.java @@ -6,7 +6,8 @@ import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamKey; import org.opensearch.migrations.trafficcapture.protos.TrafficStream; -@ToString(callSuper = true) +import java.util.StringJoiner; + @EqualsAndHashCode(callSuper = true) @Getter class TrafficStreamKeyWithKafkaRecordId extends PojoTrafficStreamKey implements KafkaCommitOffsetData { @@ -24,4 +25,13 @@ class TrafficStreamKeyWithKafkaRecordId extends PojoTrafficStreamKey implements this.partition = partition; this.offset = offset; } + + @Override + public String toString() { + return new StringJoiner("|") + .add(super.toString()) + .add("partition=" + partition) + .add("offset=" + offset) + .toString(); + } } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDictsTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDictsTest.java index fa5c34445..8025f52e0 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDictsTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDictsTest.java @@ -74,4 +74,12 @@ public void testMetricsAreRightWhenMissing() { Map.of("Status-Code", Integer.valueOf(404)))); Assertions.assertEquals("REQUEST_ID:C.0|SOURCE_HTTP_STATUS:200|TARGET_HTTP_STATUS:404|HTTP_STATUS_MATCH:0", loggedMetrics); } + + @Test + public void testMetricsAreRightWithMissingStatusCode() { + var loggedMetrics = getLoggedMetrics(makeTestData( + Map.of("Sorry", "exception message..."), + Map.of("Status-Code", Integer.valueOf(404)))); + Assertions.assertEquals("REQUEST_ID:C.0|TARGET_HTTP_STATUS:404|HTTP_STATUS_MATCH:0", loggedMetrics); + } } \ No newline at end of file diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java index 794da7768..ce9d066ee 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java @@ -10,6 +10,7 @@ import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamKey; import org.opensearch.migrations.replay.traffic.source.ITrafficStreamWithKey; import org.opensearch.migrations.trafficcapture.protos.ReadObservation; import org.opensearch.migrations.trafficcapture.protos.TrafficObservation; @@ -35,6 +36,17 @@ class KafkaTrafficCaptureSourceTest { public static final int NUM_READ_ITEMS_BOUND = 1000; public static final String TEST_TOPIC_NAME = "TEST_TOPIC_NAME"; + @Test + public void 
testRecordToString() { + var ts = TrafficStream.newBuilder() + .setConnectionId("c") + .setNodeId("n") + .setNumber(7) + .build(); + var tsk = new KafkaProtobufConsumer.TrafficStreamKeyWithKafkaRecordId(ts, 2,123); + Assertions.assertEquals("n.c.7|partition=2|offset=123", tsk.toString()); + } + @Test public void testSupplyTrafficFromSource() { int numTrafficStreams = 10;