diff --git a/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/coreutils/MetricsLogBuilder.java b/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/coreutils/MetricsLogBuilder.java
index 0e18e6218..d5b2e8480 100644
--- a/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/coreutils/MetricsLogBuilder.java
+++ b/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/coreutils/MetricsLogBuilder.java
@@ -17,7 +17,7 @@ public MetricsLogBuilder(Logger logger) {
     }
 
     public MetricsLogBuilder setAttribute(MetricsAttributeKey key, Object value) {
-        loggingEventBuilder.addKeyValue(key.getKeyName(), value);
+        loggingEventBuilder = loggingEventBuilder.addKeyValue(key.getKeyName(), value);
         return this;
     }
 
diff --git a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandler.java b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandler.java
index 7b5161ed6..e155593f0 100644
--- a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandler.java
+++ b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandler.java
@@ -28,7 +28,7 @@ protected void channelFinishedReadingAnHttpMessage(ChannelHandlerContext ctx, Ob
                     // This is a spot where we would benefit from having a behavioral policy that different users
                     // could set as needed. Some users may be fine with just logging a failed offloading of a request
                     // where other users may want to stop entirely. JIRA here: https://opensearch.atlassian.net/browse/MIGRATIONS-1276
-                    log.warn("Got error: " + t.getMessage());
+                    log.atWarn().setCause(t).setMessage("Got error").log();
                     ReferenceCountUtil.release(msg);
                 } else {
                     try {
diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/FrontsideHandler.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/FrontsideHandler.java
index 14416838b..da0f8564a 100644
--- a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/FrontsideHandler.java
+++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/FrontsideHandler.java
@@ -57,7 +57,7 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) {
                 }
             });
             outboundChannel.config().setAutoRead(true);
-        } else { // if the outbound channel has died, so be it... let this frontside finish with it's caller naturally
+        } else { // if the outbound channel has died, so be it... let this frontside finish with its caller naturally
            log.warn("Output channel (" + outboundChannel + ") is NOT active");
            ReferenceCountUtil.release(msg);
        }
diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator.java
index dc5aca221..22ee2fb0a 100644
--- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator.java
+++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator.java
@@ -185,7 +185,8 @@ public CONNECTION_STATUS addObservationToAccumulation(@NonNull Accumulation accu
                 .or(() -> handleObservationForReadState(accum, observation, trafficStreamKey, timestamp))
                 .or(() -> handleObservationForWriteState(accum, observation, trafficStreamKey, timestamp))
                 .orElseGet(() -> {
-                    log.atWarn().setMessage(()->"unaccounted for observation type " + observation).log();
+                    log.atWarn().setMessage(()->"unaccounted for observation type " + observation +
+                            " for " + accum.trafficChannelKey).log();
                     return CONNECTION_STATUS.ALIVE;
                 });
     }
@@ -380,8 +381,9 @@ private void fireAccumulationsCallbacksAndClose(Accumulation accumulation,
             // It might be advantageous to replicate these to provide stress to the target server, but
             // it's a difficult decision and one to be managed with a policy.
             // TODO - add Jira/github issue here.
-            log.warn("Terminating a TrafficStream reconstruction w/out an accumulated value, " +
-                    "assuming an empty server interaction and NOT reproducing this to the target cluster.");
+            log.atWarn().setMessage("Terminating a TrafficStream reconstruction before data was accumulated " +
+                    "for " + accumulation.trafficChannelKey + " assuming an empty server interaction and NOT " +
+                    "reproducing this to the target cluster.").log();
             if (accumulation.hasRrPair()) {
                 listener.onTrafficStreamsExpired(status,
                         Collections.unmodifiableList(accumulation.getRrPair().trafficStreamKeysBeingHeld));
diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java
index d647abb34..fe6b8a00c 100644
--- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java
+++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java
@@ -190,7 +190,6 @@ public ConnectionReplaySession getCachedSession(ISourceTrafficChannelKey channel
                     var schedule = channelAndFutureWork.schedule;
                     while (channelAndFutureWork.hasWorkRemaining()) {
                         var scheduledItemToKill = schedule.peekFirstItem();
-                        scheduledItemToKill.getValue();
                         schedule.removeFirstItem();
                     }
                 })
diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java
index df2037eb6..a4d0c5b58 100644
--- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java
+++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java
@@ -9,6 +9,7 @@ import org.opensearch.migrations.coreutils.MetricsEvent;
 import
org.opensearch.migrations.coreutils.MetricsLogBuilder; import org.opensearch.migrations.coreutils.MetricsLogger; +import org.opensearch.migrations.replay.datatypes.UniqueReplayerRequestKey; import org.opensearch.migrations.replay.datatypes.UniqueSourceRequestKey; import java.time.Duration; @@ -46,7 +47,7 @@ public ParsedHttpMessagesAsDicts(SourceTargetCaptureTuple tuple) { protected ParsedHttpMessagesAsDicts(SourceTargetCaptureTuple tuple, Optional sourcePairOp) { - this(getSourceRequestOp(sourcePairOp), + this(getSourceRequestOp(tuple.uniqueRequestKey, sourcePairOp), getSourceResponseOp(tuple, sourcePairOp), getTargetRequestOp(tuple), getTargetResponseOp(tuple)); @@ -55,28 +56,30 @@ protected ParsedHttpMessagesAsDicts(SourceTargetCaptureTuple tuple, private static Optional> getTargetResponseOp(SourceTargetCaptureTuple tuple) { return Optional.ofNullable(tuple.targetResponseData) .filter(r -> !r.isEmpty()) - .map(d -> convertResponse(d, tuple.targetResponseDuration)); + .map(d -> convertResponse(tuple.uniqueRequestKey, d, tuple.targetResponseDuration)); } private static Optional> getTargetRequestOp(SourceTargetCaptureTuple tuple) { return Optional.ofNullable(tuple.targetRequestData) .map(d -> d.asByteArrayStream()) - .map(d -> convertRequest(d.collect(Collectors.toList()))); + .map(d -> convertRequest(tuple.uniqueRequestKey, d.collect(Collectors.toList()))); } - private static Optional> getSourceResponseOp(SourceTargetCaptureTuple tuple, Optional sourcePairOp) { + private static Optional> getSourceResponseOp(SourceTargetCaptureTuple tuple, + Optional sourcePairOp) { return sourcePairOp.flatMap(p -> Optional.ofNullable(p.responseData).flatMap(d -> Optional.ofNullable(d.packetBytes)) - .map(d -> convertResponse(d, + .map(d -> convertResponse(tuple.uniqueRequestKey, d, // TODO: These durations are not measuring the same values! Duration.between(tuple.sourcePair.requestData.getLastPacketTimestamp(), tuple.sourcePair.responseData.getLastPacketTimestamp())))); } - private static Optional> getSourceRequestOp(Optional sourcePairOp) { + private static Optional> getSourceRequestOp(@NonNull UniqueSourceRequestKey diagnosticKey, + Optional sourcePairOp) { return sourcePairOp.flatMap(p -> Optional.ofNullable(p.requestData).flatMap(d -> Optional.ofNullable(d.packetBytes)) - .map(d -> convertRequest(d))); + .map(d -> convertRequest(diagnosticKey, d))); } public ParsedHttpMessagesAsDicts(Optional> sourceRequestOp1, @@ -137,18 +140,22 @@ private static Map fillMap(LinkedHashMap map, return map; } - private static Map makeSafeMap(Callable> c) { + private static Map makeSafeMap(@NonNull UniqueSourceRequestKey diagnosticKey, + Callable> c) { try { return c.call(); } catch (Exception e) { - log.warn("Putting what may be a bogus value in the output because transforming it " + - "into json threw an exception"); + // TODO - this isn't a good design choice. 
+ // We should follow through with the spirit of this class and leave this as empty optional values + log.atWarn().setMessage(()->"Putting what may be a bogus value in the output because transforming it " + + "into json threw an exception for "+diagnosticKey.toString()).setCause(e).log(); return Map.of("Exception", (Object) e.toString()); } } - private static Map convertRequest(@NonNull List data) { - return makeSafeMap(() -> { + private static Map convertRequest(@NonNull UniqueSourceRequestKey diagnosticKey, + @NonNull List data) { + return makeSafeMap(diagnosticKey, () -> { var map = new LinkedHashMap(); var message = HttpByteBufFormatter.parseHttpRequestFromBufs(byteToByteBufStream(data), true); map.put("Request-URI", message.uri()); @@ -158,8 +165,9 @@ private static Map convertRequest(@NonNull List data) { }); } - private static Map convertResponse(@NonNull List data, Duration latency) { - return makeSafeMap(() -> { + private static Map convertResponse(@NonNull UniqueSourceRequestKey diagnosticKey, + @NonNull List data, Duration latency) { + return makeSafeMap(diagnosticKey, () -> { var map = new LinkedHashMap(); var message = HttpByteBufFormatter.parseHttpResponseFromBufs(byteToByteBufStream(data), true); map.put("HTTP-Version", message.protocolVersion()); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java index df6851652..b19cc3f8e 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java @@ -160,7 +160,8 @@ private void scheduleOnConnectionReplaySession(ISourceTrafficChannelKey chan var sf = eventLoop.schedule(kvp.getValue(), getDelayFromNowMs(kvp.getKey()), TimeUnit.MILLISECONDS); sf.addListener(sfp->{ if (!sfp.isSuccess()) { - log.atWarn().setCause(sfp.cause()).setMessage(()->"Scheduled future was not successful").log(); + log.atWarn().setCause(sfp.cause()).setMessage(()->"Scheduled future was not successful for " + + channelInteraction).log(); } }); }); @@ -171,7 +172,7 @@ private void scheduleOnConnectionReplaySession(ISourceTrafficChannelKey chan eventLoop.schedule(task, getDelayFromNowMs(atTime), TimeUnit.MILLISECONDS); scheduledFuture.addListener(f->{ if (!f.isSuccess()) { - log.atError().setCause(f.cause()).setMessage(()->"Error scheduling task").log(); + log.atError().setCause(f.cause()).setMessage(()->"Error scheduling task for " + channelKey).log(); } else { log.atInfo().setMessage(()->"scheduled future has finished for "+channelInteraction).log(); } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java index c1cc0d8a7..9138b9595 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java @@ -519,7 +519,7 @@ void setupRunAndWaitForReplay(Duration observedPacketConnectionTimeout, } catch (InterruptedException ex) { throw ex; } catch (Exception e) { - log.warn("Terminating runReplay due to", e); + log.atWarn().setCause(e).setMessage("Terminating runReplay due to exception").log(); throw e; } finally { 
trafficToHttpTransactionAccumulator.close(); @@ -645,7 +645,8 @@ Void handleCompletedTransaction(@NonNull UniqueReplayerRequestKey requestKey, Re commitTrafficStreams(rrPair.trafficStreamKeysBeingHeld, rrPair.completionStatus); return null; } else { - log.atError().setCause(t).setMessage(()->"Throwable passed to handle(). Rethrowing.").log(); + log.atError().setCause(t).setMessage(()->"Throwable passed to handle() for " + requestKey + + ". Rethrowing.").log(); throw Lombok.sneakyThrow(t); } } catch (Error error) { @@ -922,7 +923,7 @@ private static String formatWorkItem(DiagnosticTrackableCompletableFuture shutdown(Error error) { - log.warn("Shutting down "+this+" because of "+error); + log.atWarn().setCause(error).setMessage(()->"Shutting down " + this + " because of error").log(); shutdownReasonRef.compareAndSet(null, error); if (!shutdownFutureRef.compareAndSet(null, new CompletableFuture<>())) { log.atError().setMessage(()->"Shutdown was already signaled by {}. " + diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumer.java index 8d17c8187..1121730df 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumer.java @@ -170,11 +170,7 @@ private static Throwable unwindPossibleCompletionException(Throwable t) { chunks.stream().collect( Utils.foldLeft(DiagnosticTrackableCompletableFuture.Factory. completedFuture(null, ()->"Initial value"), - (dcf, bb) -> dcf.thenCompose(v -> { - var rval = packetConsumer.consumeBytes(bb); - log.error("packetConsumer.consumeBytes()="+rval); - return rval; - }, + (dcf, bb) -> dcf.thenCompose(v -> packetConsumer.consumeBytes(bb), ()->"HttpJsonTransformingConsumer.redriveWithoutTransformation collect()"))); DiagnosticTrackableCompletableFuture finalizedFuture = consumptionChainedFuture.thenCompose(v -> packetConsumer.finalizeRequest(), diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonToByteBufHandler.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonToByteBufHandler.java index cf0f3eb07..0195b810b 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonToByteBufHandler.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonToByteBufHandler.java @@ -137,7 +137,8 @@ private void writeHeadersIntoByteBufs(ChannelHandlerContext ctx, return; } } catch (Exception e) { - log.warn("writing headers directly to chunks w/ sizes didn't work: "+e); + log.atWarn().setCause(e).setMessage(()->"writing headers directly to chunks w/ sizes didn't work for " + + httpJson).log(); } try (var baos = new ByteArrayOutputStream()) { diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/PojoTrafficStreamKey.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/PojoTrafficStreamKey.java index da4ce3ee1..564b6dbf5 100644 --- 
a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/PojoTrafficStreamKey.java
+++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/PojoTrafficStreamKey.java
@@ -1,11 +1,11 @@
 package org.opensearch.migrations.replay.datatypes;
 
+import java.util.StringJoiner;
+
 import lombok.EqualsAndHashCode;
-import lombok.ToString;
 import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
 import org.opensearch.migrations.trafficcapture.protos.TrafficStreamUtils;
 
-@ToString
 @EqualsAndHashCode()
 public class PojoTrafficStreamKey implements ITrafficStreamKey {
     private final String nodeId;
@@ -36,4 +36,13 @@ public String getConnectionId() {
     public int getTrafficStreamIndex() {
         return trafficStreamIndex;
     }
+
+    @Override
+    public String toString() {
+        return new StringJoiner(".")
+                .add(nodeId)
+                .add(connectionId)
+                .add(""+trafficStreamIndex)
+                .toString();
+    }
 }
diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumer.java
new file mode 100644
index 000000000..0c00ef168
--- /dev/null
+++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumer.java
@@ -0,0 +1,297 @@
+package org.opensearch.migrations.replay.kafka;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import lombok.NonNull;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.opensearch.migrations.coreutils.MetricsAttributeKey;
+import org.opensearch.migrations.coreutils.MetricsEvent;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.opensearch.migrations.coreutils.MetricsLogger;
+import org.opensearch.migrations.replay.datatypes.ITrafficStreamKey;
+import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamKey;
+import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamWithKey;
+import org.opensearch.migrations.replay.traffic.source.ISimpleTrafficCaptureSource;
+import org.opensearch.migrations.replay.traffic.source.ITrafficStreamWithKey;
+import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Properties;
+import java.util.StringJoiner;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * Adapt a Kafka stream into a TrafficCaptureSource.
+ *
+ * Notice that there's a critical gap between how Kafka accepts commits and how the
+ * BlockingTrafficSource throttles calls to Kafka. The BlockingTrafficSource may
+ * block calls to readNextTrafficStreamChunk() until some time window elapses. This
+ * could be a very large window in cases where there were long gaps between recorded
+ * requests from the capturing proxy. For example, if a TrafficStream is read and if
+ * that stream is scheduled to be run one hour later, readNextTrafficStreamChunk()
+ * may not be called for almost an hour. By design, we're not calling Kafka to pull
+ * any more messages since we know that we don't have work to do for an hour. Shortly
+ * after the hour of waiting begins, Kafka will notice that this application is no
+ * longer calling poll and will kick the consumer out of the client group. Other
+ * consumers may connect, though they'll also be kicked out of the group shortly.
+ *
+ * See
+ * ...
+ *
+ * "Basically if you don't call poll at least as frequently as the configured max interval,
+ * then the client will proactively leave the group so that another consumer can take
+ * over its partitions. When this happens, you may see an offset commit failure (as
+ * indicated by a CommitFailedException thrown from a call to commitSync())."
+ *
+ * I believe that this can be mitigated, hopefully fully, by adding a keepAlive/do nothing
+ * call that the BlockingTrafficSource can use. That can be implemented in a source
+ * like this with Kafka by polling, then resetting the position on the stream if we
+ * aren't supposed to be reading new data.
+ */
+@Slf4j
+public class KafkaProtobufConsumer implements ISimpleTrafficCaptureSource {
+
+    static class TrafficStreamKeyWithKafkaRecordId extends PojoTrafficStreamKey {
+        private final int partition;
+        private final long offset;
+
+        TrafficStreamKeyWithKafkaRecordId(TrafficStream trafficStream, int partition, long offset) {
+            super(trafficStream);
+            this.partition = partition;
+            this.offset = offset;
+        }
+
+        @Override
+        public String toString() {
+            return new StringJoiner("|")
+                    .add(super.toString())
+                    .add("partition=" + partition)
+                    .add("offset=" + offset)
+                    .toString();
+        }
+    }
+
+    private static class OffsetLifecycleTracker {
+        private final PriorityQueue<Long> pQueue = new PriorityQueue<>();
+        private long cursorHighWatermark;
+
+        private OffsetLifecycleTracker() {
+        }
+
+        boolean isEmpty() {
+            return pQueue.isEmpty();
+        }
+
+        void add(long offset) {
+            cursorHighWatermark = offset;
+            pQueue.add(offset);
+        }
+
+        Optional<Long> removeAndReturnNewHead(TrafficStreamKeyWithKafkaRecordId kafkaRecord) {
+            var offsetToRemove = kafkaRecord.offset;
+            var topCursor = pQueue.peek();
+            var didRemove = pQueue.remove(offsetToRemove);
+            assert didRemove : "Expected all live records to have an entry and for them to be removed only once";
+            if (topCursor == offsetToRemove) {
+                topCursor = Optional.ofNullable(pQueue.peek())
+                        .orElse(cursorHighWatermark+1); // most recent cursor was previously popped
+                log.atDebug().setMessage("Commit called for {}, and new topCursor={}")
+                        .addArgument(offsetToRemove).addArgument(topCursor).log();
+                return Optional.of(topCursor);
+            } else {
+                log.atDebug().setMessage("Commit called for {}, but topCursor={}")
+                        .addArgument(offsetToRemove).addArgument(topCursor).log();
+                return Optional.empty();
+            }
+        }
+    }
+
+    private static final MetricsLogger metricsLogger = new MetricsLogger("KafkaProtobufConsumer");
+
+    public static final Duration CONSUMER_POLL_TIMEOUT = Duration.ofSeconds(1);
+
+    private final Consumer<String, byte[]> kafkaConsumer;
+    private final ConcurrentHashMap<Integer, OffsetLifecycleTracker> partitionToOffsetLifecycleTrackerMap;
+    private final ConcurrentHashMap<TopicPartition, OffsetAndMetadata> nextSetOfCommitsMap;
+    private final Object offsetLifecycleLock = new Object();
+    private final String topic;
+
private final KafkaBehavioralPolicy behavioralPolicy; + private final AtomicInteger trafficStreamsRead; + + public KafkaProtobufConsumer(Consumer kafkaConsumer, String topic) { + this(kafkaConsumer, topic, new KafkaBehavioralPolicy()); + } + + public KafkaProtobufConsumer(Consumer kafkaConsumer, @NonNull String topic, + KafkaBehavioralPolicy behavioralPolicy) { + this.kafkaConsumer = kafkaConsumer; + this.topic = topic; + this.behavioralPolicy = behavioralPolicy; + kafkaConsumer.subscribe(Collections.singleton(topic)); + trafficStreamsRead = new AtomicInteger(); + + partitionToOffsetLifecycleTrackerMap = new ConcurrentHashMap<>(); + nextSetOfCommitsMap = new ConcurrentHashMap<>(); + } + + public static KafkaProtobufConsumer buildKafkaConsumer(@NonNull String brokers, + @NonNull String topic, + @NonNull String groupId, + boolean enableMSKAuth, + String propertyFilePath, + KafkaBehavioralPolicy behavioralPolicy) throws IOException { + var kafkaProps = buildKafkaProperties(brokers, groupId, enableMSKAuth, propertyFilePath); + return new KafkaProtobufConsumer(new KafkaConsumer<>(kafkaProps), topic, behavioralPolicy); + } + + public static Properties buildKafkaProperties(@NonNull String brokers, + @NonNull String groupId, + boolean enableMSKAuth, + String propertyFilePath) throws IOException { + var kafkaProps = new Properties(); + kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + if (propertyFilePath != null) { + try (InputStream input = new FileInputStream(propertyFilePath)) { + kafkaProps.load(input); + } catch (IOException ex) { + log.error("Unable to load properties from kafka properties file with path: {}", propertyFilePath); + throw ex; + } + } + // Required for using SASL auth with MSK public endpoint + if (enableMSKAuth) { + kafkaProps.setProperty("security.protocol", "SASL_SSL"); + kafkaProps.setProperty("sasl.mechanism", "AWS_MSK_IAM"); + kafkaProps.setProperty("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;"); + kafkaProps.setProperty("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler"); + } + kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + return kafkaProps; + } + + @Override + @SuppressWarnings("unchecked") + public CompletableFuture> readNextTrafficStreamChunk() { + return CompletableFuture.supplyAsync(this::readNextTrafficStreamSynchronously); + } + + public List readNextTrafficStreamSynchronously() { + try { + ConsumerRecords records; + records = safeCommitAndPollWithSwallowedRuntimeExceptions(); + Stream trafficStream = StreamSupport.stream(records.spliterator(), false) + .map(kafkaRecord -> { + try { + TrafficStream ts = TrafficStream.parseFrom(kafkaRecord.value()); + // Ensure we increment trafficStreamsRead even at a higher log level + metricsLogger.atSuccess(MetricsEvent.PARSED_TRAFFIC_STREAM_FROM_KAFKA) + .setAttribute(MetricsAttributeKey.CONNECTION_ID, ts.getConnectionId()) + .setAttribute(MetricsAttributeKey.TOPIC_NAME, this.topic) + .setAttribute(MetricsAttributeKey.SIZE_IN_BYTES, ts.getSerializedSize()).emit(); + addOffset(kafkaRecord.partition(), kafkaRecord.offset()); + var key = new 
TrafficStreamKeyWithKafkaRecordId(ts, kafkaRecord.partition(), kafkaRecord.offset()); + log.atTrace().setMessage(()->"Parsed traffic stream #{}: {} {}") + .addArgument(trafficStreamsRead.incrementAndGet()) + .addArgument(key) + .addArgument(ts) + .log(); + return (ITrafficStreamWithKey) new PojoTrafficStreamWithKey(ts, key); + } catch (InvalidProtocolBufferException e) { + RuntimeException recordError = behavioralPolicy.onInvalidKafkaRecord(kafkaRecord, e); + metricsLogger.atError(MetricsEvent.PARSING_TRAFFIC_STREAM_FROM_KAFKA_FAILED, recordError) + .setAttribute(MetricsAttributeKey.TOPIC_NAME, this.topic).emit(); + if (recordError != null) { + throw recordError; + } + return null; + } + }).filter(Objects::nonNull); + return trafficStream.collect(Collectors.toList()); + } catch (Exception e) { + log.atError().setCause(e).setMessage("Terminating Kafka traffic stream").log(); + throw e; + } + } + + private ConsumerRecords safeCommitAndPollWithSwallowedRuntimeExceptions() { + try { + synchronized (offsetLifecycleLock) { + if (!nextSetOfCommitsMap.isEmpty()) { + log.atDebug().setMessage(()->"Committing "+nextSetOfCommitsMap).log(); + kafkaConsumer.commitSync(nextSetOfCommitsMap); + log.atDebug().setMessage(()->"Done committing "+nextSetOfCommitsMap).log(); + nextSetOfCommitsMap.clear(); + } + } + + var records = kafkaConsumer.poll(CONSUMER_POLL_TIMEOUT); + log.atInfo().setMessage(()->"Kafka consumer poll has fetched "+records.count()+" records").log(); + log.atDebug().setMessage(()->"All positions: {"+kafkaConsumer.assignment().stream() + .map(tp->tp+": "+kafkaConsumer.position(tp)).collect(Collectors.joining(",")) + "}").log(); + log.atDebug().setMessage(()->"All COMMITTED positions: {"+kafkaConsumer.assignment().stream() + .map(tp->tp+": "+kafkaConsumer.committed(tp)).collect(Collectors.joining(",")) + "}").log(); + return records; + } catch (RuntimeException e) { + log.atWarn().setCause(e).setMessage("Unable to poll the topic: {} with our Kafka consumer. 
" + + "Swallowing and awaiting next metadata refresh to try again.").addArgument(topic).log(); + return new ConsumerRecords<>(Collections.emptyMap()); + } + } + + private void addOffset(int partition, long offset) { + synchronized (offsetLifecycleLock) { + var offsetTracker = partitionToOffsetLifecycleTrackerMap.computeIfAbsent(partition, p -> + new OffsetLifecycleTracker()); + offsetTracker.add(offset); + } + } + + @Override + public void commitTrafficStream(ITrafficStreamKey trafficStreamKey) { + if (!(trafficStreamKey instanceof TrafficStreamKeyWithKafkaRecordId)) { + throw new IllegalArgumentException("Expected key of type "+TrafficStreamKeyWithKafkaRecordId.class+ + " but received "+trafficStreamKey+" (of type="+trafficStreamKey.getClass()+")"); + } + var kafkaTsk = (TrafficStreamKeyWithKafkaRecordId) trafficStreamKey; + var p = kafkaTsk.partition; + Optional newHeadValue; + synchronized (offsetLifecycleLock) { + var tracker = partitionToOffsetLifecycleTrackerMap.get(p); + newHeadValue = tracker.removeAndReturnNewHead(kafkaTsk); + newHeadValue.ifPresent(o -> { + if (tracker.isEmpty()) { + partitionToOffsetLifecycleTrackerMap.remove(p); + } + nextSetOfCommitsMap.put(new TopicPartition(topic, p), new OffsetAndMetadata(o)); + }); + } + } + + @Override + public void close() throws IOException { + kafkaConsumer.close(); + log.info("Kafka consumer closed successfully."); + } +} diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSource.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSource.java index 4d6de51df..a195d553c 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSource.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSource.java @@ -39,7 +39,7 @@ * BlockingTrafficSource throttles calls to Kafka. The BlockingTrafficSource may * block calls to readNextTrafficStreamChunk() until some time window elapses. This * could be a very large window in cases where there were long gaps between recorded - * requests from the capturing proxy. For example, if a TrafficStream is read and it + * requests from the capturing proxy. For example, if a TrafficStream is read and if * that stream is scheduled to be run one hour later, readNextTrafficStreamChunk() * may not be called for almost an hour. By design, we're not calling Kafka to pull * any more messages since we know that we don't have work to do for an hour. 
Shortly diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrafficStreamKeyWithKafkaRecordId.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrafficStreamKeyWithKafkaRecordId.java index aa86e96c7..4a19942f5 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrafficStreamKeyWithKafkaRecordId.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrafficStreamKeyWithKafkaRecordId.java @@ -6,7 +6,8 @@ import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamKey; import org.opensearch.migrations.trafficcapture.protos.TrafficStream; -@ToString(callSuper = true) +import java.util.StringJoiner; + @EqualsAndHashCode(callSuper = true) @Getter class TrafficStreamKeyWithKafkaRecordId extends PojoTrafficStreamKey implements KafkaCommitOffsetData { @@ -24,4 +25,13 @@ class TrafficStreamKeyWithKafkaRecordId extends PojoTrafficStreamKey implements this.partition = partition; this.offset = offset; } + + @Override + public String toString() { + return new StringJoiner("|") + .add(super.toString()) + .add("partition=" + partition) + .add("offset=" + offset) + .toString(); + } } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDictsTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDictsTest.java index fa5c34445..8025f52e0 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDictsTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDictsTest.java @@ -74,4 +74,12 @@ public void testMetricsAreRightWhenMissing() { Map.of("Status-Code", Integer.valueOf(404)))); Assertions.assertEquals("REQUEST_ID:C.0|SOURCE_HTTP_STATUS:200|TARGET_HTTP_STATUS:404|HTTP_STATUS_MATCH:0", loggedMetrics); } + + @Test + public void testMetricsAreRightWithMissingStatusCode() { + var loggedMetrics = getLoggedMetrics(makeTestData( + Map.of("Sorry", "exception message..."), + Map.of("Status-Code", Integer.valueOf(404)))); + Assertions.assertEquals("REQUEST_ID:C.0|TARGET_HTTP_STATUS:404|HTTP_STATUS_MATCH:0", loggedMetrics); + } } \ No newline at end of file diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java index 794da7768..ce9d066ee 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java @@ -10,6 +10,7 @@ import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamKey; import org.opensearch.migrations.replay.traffic.source.ITrafficStreamWithKey; import org.opensearch.migrations.trafficcapture.protos.ReadObservation; import org.opensearch.migrations.trafficcapture.protos.TrafficObservation; @@ -35,6 +36,17 @@ class KafkaTrafficCaptureSourceTest { public static final int NUM_READ_ITEMS_BOUND = 1000; public static final String TEST_TOPIC_NAME = "TEST_TOPIC_NAME"; + @Test + public void 
testRecordToString() { + var ts = TrafficStream.newBuilder() + .setConnectionId("c") + .setNodeId("n") + .setNumber(7) + .build(); + var tsk = new KafkaProtobufConsumer.TrafficStreamKeyWithKafkaRecordId(ts, 2,123); + Assertions.assertEquals("n.c.7|partition=2|offset=123", tsk.toString()); + } + @Test public void testSupplyTrafficFromSource() { int numTrafficStreams = 10;
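
Note: the KafkaProtobufConsumer Javadoc above proposes a keepAlive/do-nothing call that polls and then resets the consumer's position, so the client stays in its consumer group without consuming records ahead of schedule. Below is a minimal sketch of that idea, assuming a hypothetical touch() method placed next to the existing kafkaConsumer field; nothing in it is part of the change set itself.

    // Hypothetical keepAlive sketch, not part of this diff: poll often enough to stay in the
    // consumer group, then seek back so nothing is actually consumed ahead of schedule.
    private void touch() {
        var savedPositions = kafkaConsumer.assignment().stream()          // remember current positions
                .collect(Collectors.toMap(tp -> tp, kafkaConsumer::position));
        var polled = kafkaConsumer.poll(Duration.ZERO);                   // resets the max.poll.interval timer
        savedPositions.forEach(kafkaConsumer::seek);                      // rewind whatever was just fetched
        log.atDebug().setMessage(()->"keepAlive poll returned " + polled.count() +
                " records; positions restored").log();
    }

A BlockingTrafficSource could invoke such a method on its idle path, between real calls to readNextTrafficStreamChunk().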