diff --git a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandlerTest.java b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandlerTest.java index 531d206ad..4879ee983 100644 --- a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandlerTest.java +++ b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandlerTest.java @@ -57,7 +57,7 @@ public CodedOutputStreamAndByteBufferWrapper createStream() { cos.flush(); byteBufferAtomicReference.set(osh.getByteBuffer().flip().asReadOnlyBuffer()); - log.error("byteBufferAtomicReference.get="+byteBufferAtomicReference.get()); + log.trace("byteBufferAtomicReference.get="+byteBufferAtomicReference.get()); return CompletableFuture.completedFuture(flushCount.incrementAndGet()); } diff --git a/TrafficCapture/replayerPlugins/jsonMessageTransformers/jsonJoltMessageTransformerProvider/src/test/java/org/opensearch/migrations/replay/JsonTransformerTest.java b/TrafficCapture/replayerPlugins/jsonMessageTransformers/jsonJoltMessageTransformerProvider/src/test/java/org/opensearch/migrations/replay/JsonTransformerTest.java index a8e911e20..9e36ea7a3 100644 --- a/TrafficCapture/replayerPlugins/jsonMessageTransformers/jsonJoltMessageTransformerProvider/src/test/java/org/opensearch/migrations/replay/JsonTransformerTest.java +++ b/TrafficCapture/replayerPlugins/jsonMessageTransformers/jsonJoltMessageTransformerProvider/src/test/java/org/opensearch/migrations/replay/JsonTransformerTest.java @@ -66,7 +66,7 @@ public void testHttpTransform() throws IOException { .build(); var transformedDocument = transformer.transformJson(documentJson); String transformedJsonOutputStr = emitJson(transformedDocument); - log.error("transformed json document: "+transformedJsonOutputStr); + log.info("transformed json document: "+transformedJsonOutputStr); Assertions.assertTrue(transformedJsonOutputStr.contains(DUMMY_HOSTNAME_TEST_STRING)); } } \ No newline at end of file diff --git a/TrafficCapture/trafficCaptureProxyServer/README.md b/TrafficCapture/trafficCaptureProxyServer/README.md index cf93ec03f..4609843d4 100644 --- a/TrafficCapture/trafficCaptureProxyServer/README.md +++ b/TrafficCapture/trafficCaptureProxyServer/README.md @@ -1,81 +1,56 @@ # Capture Proxy -## How to attach a Capture Proxy on a coordinator node. +## Installing Capture Proxy on Coordinator nodes -Follow documentation for [deploying solution](../../deployment/README.md). Then, on a cluster with at least two coordinator nodes, the user can attach a Capture Proxy on a node by following these steps: -Please note that this is one method for installing the Capture Proxy on a node, and that these steps may vary depending on your environment. +Follow documentation for [deploying solution](../../deployment/README.md) to set up the Kafka cluster which the Capture Proxy will send captured traffic to. Then, following the steps below, attach the Capture Proxy on each coordinator node of the source cluster, so that all incoming traffic can be captured. 
If the source cluster has more than one coordinator node, this can be done in a rolling restart fashion where there is no downtime for the source cluster, otherwise a single node cluster should expect downtime as the Elasticsearch/OpenSearch process restarts. - -These are the **prerequisites** to being able to attach the Capture Proxy: - -* **Make sure that your MSK client is accessible by the coordinator nodes in the cluster* - * Add the following IAM policy to the node/EC2 instance so that it’s able to store the captured traffic in Kafka: - * From the AWS Console, go to the EC2 instance page, click on **IAM Role**, click on **Add permissions**, choose **Create inline policy**, click on **JSON VIEW** then add the following policy (replace region and account-id). - - ```json - { - "Version": "2012-10-17", - "Statement": [ - { - "Action": "kafka-cluster:Connect", - "Resource": "arn:aws:kafka:::cluster/migration-msk-cluster-/*", - "Effect": "Allow" - }, - { - "Action": [ - "kafka-cluster:CreateTopic", - "kafka-cluster:DescribeTopic", - "kafka-cluster:WriteData" - ], - "Resource": "arn:aws:kafka:::topic/migration-msk-cluster-/*", - "Effect": "Allow" - } - ] - } - ``` - -* **Verify Java installation is accessible**. - * From linux command line of that EC2 instance, Check that the JAVA_HOME environment variable is set properly `(echo $JAVA_HOME)`, if not, then try running the following command that might help set it correctly: - - `JAVA_HOME=$(dirname "$(dirname "$(type -p java)")")` - * If that doesn’t work, then find the java directory on your node and set it as $JAVA_HOME +**Note**: For AWS deployments, see required instructions for setting up IAM and Security Groups [here](../../deployment/cdk/opensearch-service-migration/README.md#configuring-capture-proxy-iam-and-security-groups)
+**Note**: This is one method for installing the Capture Proxy on a node, and that these steps may vary depending on your environment. ### Follow these steps to attach a Capture Proxy on the node. -1. **Log in to one of the coordinator nodes** for command line access. -2. **Update node’s port setting**. - 1. **Update elasticsearch.yml/opensearch.yml**. Add this line to the node’s config file: - http.port: 19200 -3. **Restart Elasticsearch/OpenSearch process** so that the process will bind to the newly configured port. For example, if systemctl is available on your linux distribution you can run the following (Note: depending on your installation of Elasticsearch, these methods may not work for you) - 1. `sudo systemctl restart elasticsearch.service` - -4. **Verify process is bound to new port**. Run netstat -tapn to see if the new port is being listened on. - If the new port is not there, then there is a chance that Elasticsearch/ OpenSearch is not running, in that case, you must start the process again. (Depending on your setup, restarting/starting the Elasticsearch process may differ) -5. **Test the new port** by sending any kind of traffic or request, e.g; curl https://localhost:19200 or http:// -6. **Download Capture Proxy**: - 1. Go to the Opensearch Migrations latest releases page: https://github.com/opensearch-project/opensearch-migrations/releases/latest - 2. Copy the link for the Capture Proxy tar file, mind your instance’s architecture. - 3. `curl -L0 --output CaptureProxyX64.tar.gz` - 4. Unpack solution tarball: `tar -xvf CaptureProxyX64.tar.gz` - 5. `cd CaptureProxyX64/bin` -7. **Running the Capture Proxy**: - 1. `nohup ./CaptureProxyX64 --kafkaConnection --destinationUri http://localhost:19200 —listenPort 9200 —enableMSKAuth --insecureDestination &` - - **Explanation of parameters** in the command above: - - * **--kafkaConnection**: your MSK client endpoint. - * **--destinationUri**: URI of the server that the Capture Proxy is capturing traffic for. - * **--listenPort**: Exposed port for clients to connect to this proxy. (The original port that the node was listening to) - * **--enableMSKAuth**: Enables SASL Kafka properties required for connecting to MSK with IAM auth. - * **--insecureDestination**: Do not check the destination server’s certificate. - -8. **Test the port** that the Capture Proxy is now listening to. - 1. `curl https://localhost:9200` or `http://` - 2. You should expect the same response when sending a request to either ports (9200, 19200), except that the traffic sent to the port that the Capture Proxy is listening to, will be captured and sent to your MSK Client, also forwarded to the new Elasticsearch port. -9. **Verify requests are sent to Kafka** - * **Verify that a new topic has been created** - 1. Log in to the Migration Console container. - 2. Go the Kafka tools directory - cd kafka-tools/kafka/bin - 3. Run the following command to list the Kafka topics, and confirm that a new topic was created. - `./kafka-topics.sh --bootstrap-server "$MIGRATION_KAFKA_BROKER_ENDPOINTS" --list --command-config ../../aws/msk-iam-auth.properties` +1. Log in to one of the coordinator nodes for command line access. +2. Locate the elasticsearch/opensearch directory + * On EC2 this may look like `/home/ec2-user/elasticsearch` +3. Update node’s port setting in elasticsearch.yml/opensearch.yml, e.g. `/home/ec2-user/elasticsearch/config/elasticsearch.yml`. 
+ * Add this line to the node’s config file: `http.port: 19200` + * This will allow incoming traffic to enter through the Capture proxy at the normal port (typically `9200`) and then be passed to Elasticsearch/OpenSearch at the `19200` port +4. Restart Elasticsearch/OpenSearch process so that the process will bind to the newly configured port. + * Depending on the installation method used, how you should restart Elasticsearch/OpenSearch will differ: + * For **Debian packages with systemctl** (Most common) you can run the following command: `sudo systemctl restart elasticsearch.service` + * For **Debian packages with service** you can run the following command: `sudo -i service elasticsearch restart` + * For **Tarball** you should find the running process (e.g. `ps aux | grep elasticsearch`) and stop it. Then start the process again as you normally do. +5. Verify process is bound to new port. Run `netstat -tapn | grep LISTEN` to see if the new port is being listened on and that the old port is no longer there. + * If the new port is not there, then there is a chance that Elasticsearch/OpenSearch is not running, in that case, you must start the process again and verify that it has started properly. +6. Test the new port by sending any kind of traffic or request, e.g. `curl https://localhost:19200` or if security is not enabled `curl http://localhost:19200` +7. Verify the JAVA_HOME environment variable has been set (`echo $JAVA_HOME`) as this will be necessary for the Capture Proxy to execute. + * If Java is not already available on your node, your elasticsearch/opensearch folder may have a bundled JDK you can use, e.g. `export JAVA_HOME=/home/ec2-user/elasticsearch/jdk` (this can also be added to your shell startup script to be available for all sessions) +8. Download the Capture Proxy tar file + 1. Go to the `opensearch-migrations` repository's latest releases page: https://github.com/opensearch-project/opensearch-migrations/releases/latest + 2. Copy the link for the Capture Proxy tar file, mind your node’s architecture. + 3. Download the tar file in a persistent folder, using the link from the previous step `curl -L0 --output CaptureProxyX64.tar.gz` + 4. Unpack the tar file: `tar -xvf CaptureProxyX64.tar.gz` +9. Start the Capture Proxy: + 1. Access the Capture Proxy shell script directory `cd trafficCaptureProxyServer/bin` + 2. Run the Capture Proxy command + * Depending on how you are running Kafka, the command needed will differ: + * For **default/Docker Kafka** clusters `nohup ./trafficCaptureProxyServer --kafkaConnection --destinationUri http://localhost:19200 --listenPort 9200 --insecureDestination &` + * The `KAFKA_BROKERS` referenced here will vary based on setup, but for a default docker setup would be `kafka:9092` + * For **AWS MSK(Kafka)** clusters `nohup ./trafficCaptureProxyServer --kafkaConnection --destinationUri http://localhost:19200 --listenPort 9200 --enableMSKAuth --insecureDestination &` + * The `KAFKA_BROKERS` referenced here can be obtained from the AWS Console(MSK -> Clusters -> Select Cluster -> View Client Information -> Copy Private endpoint) + * This command will start the Capture Proxy in the background and allow it to continue past the lifetime of the shell session. 
+ * :warning: If the machine running the Elasticsearch/OpenSearch process is restarted, the Capture Proxy will need to be started again + * Explanation of parameters in the command above: + * --kafkaConnection: Your Kafka broker endpoint(s) as a string with multiple brokers delimited by a ',' e.g. `"broker1:9098,broker2:9098"`. + * --destinationUri: URI of the server that the Capture Proxy is capturing traffic for. + * --listenPort: Exposed port for clients to connect to this proxy. (The original port that the node was listening to) + * --enableMSKAuth: Enables SASL Kafka properties required for connecting to Kafka with IAM auth. **Note**: Only valid for AWS MSK setups + * --insecureDestination: Do not check the destination server’s certificate. +10. Test that the original Elasticsearch/OpenSearch port for the node is available again. + * `curl https://localhost:9200` or if security is not enabled `curl http://localhost:9200` + * You should expect the same response when sending a request to either ports (`9200`, `19200`), with the difference being that requests that are sent to the `9200` port now pass through the Capture Proxy whose first priority is to forward the request to the Elasticsearch/OpenSearch process, and whose second priority is to send the captured requests to Kafka. +11. Verify requests are sent to Kafka + 1. Log in to the Migration Console container. More details [here](../../deployment/cdk/opensearch-service-migration/README.md#executing-commands-on-a-deployed-service) for AWS deployments + 2. Run the following command to list the Kafka topics, and confirm that the `logging-traffic-topic` has been created. + * For **default/Docker Kafka** clusters `./kafka-tools/kafka/bin/kafka-topics.sh --bootstrap-server "$MIGRATION_KAFKA_BROKER_ENDPOINTS" --list` + * For **AWS MSK(Kafka)** clusters `./kafka-tools/kafka/bin/kafka-topics.sh --bootstrap-server "$MIGRATION_KAFKA_BROKER_ENDPOINTS" --list --command-config kafka-tools/aws/msk-iam-auth.properties` diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/resources/log4j2.properties b/TrafficCapture/trafficCaptureProxyServer/src/test/resources/log4j2.properties index 9c227b84f..702836711 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/test/resources/log4j2.properties +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/resources/log4j2.properties @@ -6,6 +6,6 @@ appender.console.target = SYSTEM_ERR appender.console.layout.type = PatternLayout appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss,SSS}{UTC} [%t] %c{1} - %msg%equals{ ctx=%mdc}{ ctx=\{\}}{}%n -rootLogger.level = trace +rootLogger.level = debug rootLogger.appenderRefs = stderr rootLogger.appenderRef.stderr.ref = STDERR diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficCaptureSourceFactory.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficCaptureSourceFactory.java index 68ea3b8a1..93d1bb664 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficCaptureSourceFactory.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficCaptureSourceFactory.java @@ -2,13 +2,14 @@ import lombok.extern.slf4j.Slf4j; import org.opensearch.migrations.replay.kafka.KafkaBehavioralPolicy; -import org.opensearch.migrations.replay.kafka.KafkaProtobufConsumer; +import org.opensearch.migrations.replay.kafka.KafkaTrafficCaptureSource; import 
org.opensearch.migrations.replay.traffic.source.BlockingTrafficSource; import org.opensearch.migrations.replay.traffic.source.ISimpleTrafficCaptureSource; import org.opensearch.migrations.replay.traffic.source.InputStreamOfTraffic; import java.io.FileInputStream; import java.io.IOException; +import java.time.Clock; import java.time.Duration; @Slf4j @@ -31,8 +32,9 @@ private TrafficCaptureSourceFactory() {} } if (isKafkaActive) { - return KafkaProtobufConsumer.buildKafkaConsumer(appParams.kafkaTrafficBrokers, appParams.kafkaTrafficTopic, - appParams.kafkaTrafficGroupId, appParams.kafkaTrafficEnableMSKAuth, appParams.kafkaTrafficPropertyFile, new KafkaBehavioralPolicy()); + return KafkaTrafficCaptureSource.buildKafkaConsumer(appParams.kafkaTrafficBrokers, appParams.kafkaTrafficTopic, + appParams.kafkaTrafficGroupId, appParams.kafkaTrafficEnableMSKAuth, appParams.kafkaTrafficPropertyFile, + Clock.systemUTC(), new KafkaBehavioralPolicy()); } else if (isInputFileActive) { return new InputStreamOfTraffic(new FileInputStream(appParams.inputFilename)); } else { diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/PojoTrafficStreamKey.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/PojoTrafficStreamKey.java index ce32ab971..da4ce3ee1 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/PojoTrafficStreamKey.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/PojoTrafficStreamKey.java @@ -1,10 +1,12 @@ package org.opensearch.migrations.replay.datatypes; +import lombok.EqualsAndHashCode; import lombok.ToString; import org.opensearch.migrations.trafficcapture.protos.TrafficStream; import org.opensearch.migrations.trafficcapture.protos.TrafficStreamUtils; @ToString +@EqualsAndHashCode() public class PojoTrafficStreamKey implements ITrafficStreamKey { private final String nodeId; private final String connectionId; diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaCommitOffsetData.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaCommitOffsetData.java new file mode 100644 index 000000000..d110e24c5 --- /dev/null +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaCommitOffsetData.java @@ -0,0 +1,8 @@ +package org.opensearch.migrations.replay.kafka; + +public interface KafkaCommitOffsetData { + int getPartition(); + long getOffset(); + int getGeneration(); + +} diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumer.java deleted file mode 100644 index 2282121fd..000000000 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumer.java +++ /dev/null @@ -1,288 +0,0 @@ -package org.opensearch.migrations.replay.kafka; - -import com.google.protobuf.InvalidProtocolBufferException; -import lombok.NonNull; -import lombok.ToString; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.opensearch.migrations.coreutils.MetricsAttributeKey; -import 
org.opensearch.migrations.coreutils.MetricsEvent; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.TopicPartition; -import org.opensearch.migrations.coreutils.MetricsLogger; -import org.opensearch.migrations.replay.datatypes.ITrafficStreamKey; -import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamKey; -import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamWithKey; -import org.opensearch.migrations.replay.traffic.source.ISimpleTrafficCaptureSource; -import org.opensearch.migrations.replay.traffic.source.ITrafficStreamWithKey; -import org.opensearch.migrations.trafficcapture.protos.TrafficStream; - -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.time.Duration; -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.PriorityQueue; -import java.util.Properties; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; - -/** - * Adapt a Kafka stream into a TrafficCaptureSource. - * - * Notice that there's a critical gap between how Kafka accepts commits and how the - * BlockingTrafficSource throttles calls to Kafka. The BlockingTrafficSource may - * block calls to readNextTrafficStreamChunk() until some time window elapses. This - * could be a very large window in cases where there were long gaps between recorded - * requests from the capturing proxy. For example, if a TrafficStream is read and it - * that stream is scheduled to be run one hour later, readNextTrafficStreamChunk() - * may not be called for almost an hour. By design, we're not calling Kafka to pull - * any more messages since we know that we don't have work to do for an hour. Shortly - * after the hour of waiting begins, Kakfa will notice that this application is no - * longer calling poll and will kick the consumer out of the client group. Other - * consumers may connect, though they'll also be kicked out of the group shortly. - * - * See - * ... - * - * "Basically if you don't call poll at least as frequently as the configured max interval, - * then the client will proactively leave the group so that another consumer can take - * over its partitions. When this happens, you may see an offset commit failure (as - * indicated by a CommitFailedException thrown from a call to commitSync())." - * - * I believe that this can be mitigated, hopefully fully, by adding a keepAlive/do nothing - * call that the BlockingTrafficSource can use. That can be implemented in a source - * like this with Kafka by polling, then resetting the position on the stream if we - * aren't supposed to be reading new data. 
- */ -@Slf4j -public class KafkaProtobufConsumer implements ISimpleTrafficCaptureSource { - - @ToString(callSuper = true) - private static class TrafficStreamKeyWithKafkaRecordId extends PojoTrafficStreamKey { - private final int partition; - private final long offset; - - TrafficStreamKeyWithKafkaRecordId(TrafficStream trafficStream, int partition, long offset) { - super(trafficStream); - this.partition = partition; - this.offset = offset; - } - } - - private static class OffsetLifecycleTracker { - private final PriorityQueue pQueue = new PriorityQueue<>(); - private long cursorHighWatermark; - - private OffsetLifecycleTracker() { - } - - boolean isEmpty() { - return pQueue.isEmpty(); - } - - void add(long offset) { - cursorHighWatermark = offset; - pQueue.add(offset); - } - - Optional removeAndReturnNewHead(TrafficStreamKeyWithKafkaRecordId kafkaRecord) { - var offsetToRemove = kafkaRecord.offset; - var topCursor = pQueue.peek(); - var didRemove = pQueue.remove(offsetToRemove); - assert didRemove : "Expected all live records to have an entry and for them to be removed only once"; - if (topCursor == offsetToRemove) { - topCursor = Optional.ofNullable(pQueue.peek()) - .orElse(cursorHighWatermark+1); // most recent cursor was previously popped - log.atDebug().setMessage("Commit called for {}, and new topCursor={}") - .addArgument(offsetToRemove).addArgument(topCursor).log(); - return Optional.of(topCursor); - } else { - log.atDebug().setMessage("Commit called for {}, but topCursor={}") - .addArgument(offsetToRemove).addArgument(topCursor).log(); - return Optional.empty(); - } - } - } - - private static final MetricsLogger metricsLogger = new MetricsLogger("KafkaProtobufConsumer"); - - public static final Duration CONSUMER_POLL_TIMEOUT = Duration.ofSeconds(1); - - private final Consumer kafkaConsumer; - private final ConcurrentHashMap partitionToOffsetLifecycleTrackerMap; - private final ConcurrentHashMap nextSetOfCommitsMap; - private final Object offsetLifecycleLock = new Object(); - private final String topic; - private final KafkaBehavioralPolicy behavioralPolicy; - private final AtomicInteger trafficStreamsRead; - - public KafkaProtobufConsumer(Consumer kafkaConsumer, String topic) { - this(kafkaConsumer, topic, new KafkaBehavioralPolicy()); - } - - public KafkaProtobufConsumer(Consumer kafkaConsumer, @NonNull String topic, - KafkaBehavioralPolicy behavioralPolicy) { - this.kafkaConsumer = kafkaConsumer; - this.topic = topic; - this.behavioralPolicy = behavioralPolicy; - kafkaConsumer.subscribe(Collections.singleton(topic)); - trafficStreamsRead = new AtomicInteger(); - - partitionToOffsetLifecycleTrackerMap = new ConcurrentHashMap<>(); - nextSetOfCommitsMap = new ConcurrentHashMap<>(); - } - - public static KafkaProtobufConsumer buildKafkaConsumer(@NonNull String brokers, - @NonNull String topic, - @NonNull String groupId, - boolean enableMSKAuth, - String propertyFilePath, - KafkaBehavioralPolicy behavioralPolicy) throws IOException { - var kafkaProps = buildKafkaProperties(brokers, groupId, enableMSKAuth, propertyFilePath); - return new KafkaProtobufConsumer(new KafkaConsumer<>(kafkaProps), topic, behavioralPolicy); - } - - public static Properties buildKafkaProperties(@NonNull String brokers, - @NonNull String groupId, - boolean enableMSKAuth, - String propertyFilePath) throws IOException { - var kafkaProps = new Properties(); - kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - kafkaProps.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer"); - kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - if (propertyFilePath != null) { - try (InputStream input = new FileInputStream(propertyFilePath)) { - kafkaProps.load(input); - } catch (IOException ex) { - log.error("Unable to load properties from kafka properties file with path: {}", propertyFilePath); - throw ex; - } - } - // Required for using SASL auth with MSK public endpoint - if (enableMSKAuth) { - kafkaProps.setProperty("security.protocol", "SASL_SSL"); - kafkaProps.setProperty("sasl.mechanism", "AWS_MSK_IAM"); - kafkaProps.setProperty("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;"); - kafkaProps.setProperty("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler"); - } - kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); - kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); - return kafkaProps; - } - - @Override - @SuppressWarnings("unchecked") - public CompletableFuture> readNextTrafficStreamChunk() { - return CompletableFuture.supplyAsync(this::readNextTrafficStreamSynchronously); - } - - public List readNextTrafficStreamSynchronously() { - try { - ConsumerRecords records; - records = safeCommitAndPollWithSwallowedRuntimeExceptions(); - Stream trafficStream = StreamSupport.stream(records.spliterator(), false) - .map(kafkaRecord -> { - try { - TrafficStream ts = TrafficStream.parseFrom(kafkaRecord.value()); - // Ensure we increment trafficStreamsRead even at a higher log level - metricsLogger.atSuccess(MetricsEvent.PARSED_TRAFFIC_STREAM_FROM_KAFKA) - .setAttribute(MetricsAttributeKey.CONNECTION_ID, ts.getConnectionId()) - .setAttribute(MetricsAttributeKey.TOPIC_NAME, this.topic) - .setAttribute(MetricsAttributeKey.SIZE_IN_BYTES, ts.getSerializedSize()).emit(); - addOffset(kafkaRecord.partition(), kafkaRecord.offset()); - var key = new TrafficStreamKeyWithKafkaRecordId(ts, kafkaRecord.partition(), kafkaRecord.offset()); - log.atTrace().setMessage(()->"Parsed traffic stream #{}: {} {}") - .addArgument(trafficStreamsRead.incrementAndGet()) - .addArgument(key) - .addArgument(ts) - .log(); - return (ITrafficStreamWithKey) new PojoTrafficStreamWithKey(ts, key); - } catch (InvalidProtocolBufferException e) { - RuntimeException recordError = behavioralPolicy.onInvalidKafkaRecord(kafkaRecord, e); - metricsLogger.atError(MetricsEvent.PARSING_TRAFFIC_STREAM_FROM_KAFKA_FAILED, recordError) - .setAttribute(MetricsAttributeKey.TOPIC_NAME, this.topic).emit(); - if (recordError != null) { - throw recordError; - } - return null; - } - }).filter(Objects::nonNull); - return trafficStream.collect(Collectors.toList()); - } catch (Exception e) { - log.error("Terminating Kafka traffic stream"); - throw e; - } - } - - private ConsumerRecords safeCommitAndPollWithSwallowedRuntimeExceptions() { - try { - synchronized (offsetLifecycleLock) { - if (!nextSetOfCommitsMap.isEmpty()) { - log.atDebug().setMessage(()->"Committing "+nextSetOfCommitsMap).log(); - kafkaConsumer.commitSync(nextSetOfCommitsMap); - log.atDebug().setMessage(()->"Done committing "+nextSetOfCommitsMap).log(); - nextSetOfCommitsMap.clear(); - } - } - - var records = kafkaConsumer.poll(CONSUMER_POLL_TIMEOUT); - log.atInfo().setMessage(()->"Kafka consumer poll has fetched "+records.count()+" records").log(); - log.atDebug().setMessage(()->"All positions: 
{"+kafkaConsumer.assignment().stream() - .map(tp->tp+": "+kafkaConsumer.position(tp)).collect(Collectors.joining(",")) + "}").log(); - log.atDebug().setMessage(()->"All COMMITTED positions: {"+kafkaConsumer.assignment().stream() - .map(tp->tp+": "+kafkaConsumer.committed(tp)).collect(Collectors.joining(",")) + "}").log(); - return records; - } catch (RuntimeException e) { - log.atWarn().setCause(e).setMessage("Unable to poll the topic: {} with our Kafka consumer. " + - "Swallowing and awaiting next metadata refresh to try again.").addArgument(topic).log(); - return new ConsumerRecords<>(Collections.emptyMap()); - } - } - - private void addOffset(int partition, long offset) { - synchronized (offsetLifecycleLock) { - var offsetTracker = partitionToOffsetLifecycleTrackerMap.computeIfAbsent(partition, p -> - new OffsetLifecycleTracker()); - offsetTracker.add(offset); - } - } - - @Override - public void commitTrafficStream(ITrafficStreamKey trafficStreamKey) { - if (!(trafficStreamKey instanceof TrafficStreamKeyWithKafkaRecordId)) { - throw new IllegalArgumentException("Expected key of type "+TrafficStreamKeyWithKafkaRecordId.class+ - " but received "+trafficStreamKey+" (of type="+trafficStreamKey.getClass()+")"); - } - var kafkaTsk = (TrafficStreamKeyWithKafkaRecordId) trafficStreamKey; - var p = kafkaTsk.partition; - Optional newHeadValue; - synchronized (offsetLifecycleLock) { - var tracker = partitionToOffsetLifecycleTrackerMap.get(p); - newHeadValue = tracker.removeAndReturnNewHead(kafkaTsk); - newHeadValue.ifPresent(o -> { - if (tracker.isEmpty()) { - partitionToOffsetLifecycleTrackerMap.remove(p); - } - nextSetOfCommitsMap.put(new TopicPartition(topic, p), new OffsetAndMetadata(o)); - }); - } - } - - @Override - public void close() throws IOException { - kafkaConsumer.close(); - log.info("Kafka consumer closed successfully."); - } -} diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSource.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSource.java new file mode 100644 index 000000000..4d6de51df --- /dev/null +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSource.java @@ -0,0 +1,221 @@ +package org.opensearch.migrations.replay.kafka; + +import com.google.protobuf.InvalidProtocolBufferException; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.opensearch.migrations.coreutils.MetricsAttributeKey; +import org.opensearch.migrations.coreutils.MetricsEvent; +import org.opensearch.migrations.coreutils.MetricsLogger; +import org.opensearch.migrations.replay.datatypes.ITrafficStreamKey; +import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamWithKey; +import org.opensearch.migrations.replay.traffic.source.ISimpleTrafficCaptureSource; +import org.opensearch.migrations.replay.traffic.source.ITrafficStreamWithKey; +import org.opensearch.migrations.trafficcapture.protos.TrafficStream; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; +import 
java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +/** + * Adapt a Kafka stream into a TrafficCaptureSource. + * + * Notice that there's a critical gap between how Kafka accepts commits and how the + * BlockingTrafficSource throttles calls to Kafka. The BlockingTrafficSource may + * block calls to readNextTrafficStreamChunk() until some time window elapses. This + * could be a very large window in cases where there were long gaps between recorded + * requests from the capturing proxy. For example, if a TrafficStream is read and it + * that stream is scheduled to be run one hour later, readNextTrafficStreamChunk() + * may not be called for almost an hour. By design, we're not calling Kafka to pull + * any more messages since we know that we don't have work to do for an hour. Shortly + * after the hour of waiting begins, Kakfa will notice that this application is no + * longer calling poll and will kick the consumer out of the client group. + * + * See + * ... + * + * "Basically if you don't call poll at least as frequently as the configured max interval, + * then the client will proactively leave the group so that another consumer can take + * over its partitions. When this happens, you may see an offset commit failure (as + * indicated by a CommitFailedException thrown from a call to commitSync())." + * + * Since the Kafka client requires all calls to be made from the same thread, we can't + * simply run a background job to keep the client warm. We need the caller to touch + * this object periodically to keep the connection alive. + */ +@Slf4j +public class KafkaTrafficCaptureSource implements ISimpleTrafficCaptureSource { + + public static final String MAX_POLL_INTERVAL_KEY = "max.poll.interval.ms"; + // see https://stackoverflow.com/questions/39730126/difference-between-session-timeout-ms-and-max-poll-interval-ms-for-kafka-0-10 + public static final String DEFAULT_POLL_INTERVAL_MS = "60000"; + private static final MetricsLogger metricsLogger = new MetricsLogger("KafkaProtobufConsumer"); + + + final TrackingKafkaConsumer trackingKafkaConsumer; + private final AtomicLong trafficStreamsRead; + private final KafkaBehavioralPolicy behavioralPolicy; + + public KafkaTrafficCaptureSource(Consumer kafkaConsumer, String topic, Duration keepAliveInterval) { + this(kafkaConsumer, topic, keepAliveInterval, Clock.systemUTC(), new KafkaBehavioralPolicy()); + } + + public KafkaTrafficCaptureSource(Consumer kafkaConsumer, + @NonNull String topic, + Duration keepAliveInterval, + Clock clock, + @NonNull KafkaBehavioralPolicy behavioralPolicy) + { + trackingKafkaConsumer = new TrackingKafkaConsumer(kafkaConsumer, topic, keepAliveInterval, clock); + trafficStreamsRead = new AtomicLong(); + this.behavioralPolicy = behavioralPolicy; + kafkaConsumer.subscribe(Collections.singleton(topic), trackingKafkaConsumer); + } + + public static KafkaTrafficCaptureSource buildKafkaConsumer(@NonNull String brokers, + @NonNull String topic, + @NonNull String groupId, + boolean enableMSKAuth, + String propertyFilePath, + @NonNull Clock clock, + @NonNull KafkaBehavioralPolicy behavioralPolicy) + throws IOException + { + var kafkaProps = buildKafkaProperties(brokers, groupId, enableMSKAuth, propertyFilePath); + kafkaProps.putIfAbsent(MAX_POLL_INTERVAL_KEY, DEFAULT_POLL_INTERVAL_MS); + var pollPeriod = Duration.ofMillis(Long.valueOf((String)kafkaProps.get(MAX_POLL_INTERVAL_KEY))); 
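+ // Keep-alive note: the interval below is derived from max.poll.interval.ms (halved in
+ // getKeepAlivePeriodFromPollPeriod) so that touch() is called well before the broker
+ // would consider this consumer to have left the group.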
+ var keepAlivePeriod = getKeepAlivePeriodFromPollPeriod(pollPeriod); + return new KafkaTrafficCaptureSource(new KafkaConsumer<>(kafkaProps), + topic, keepAlivePeriod, clock, behavioralPolicy); + } + + /** + * We'll have to 'maintain' touches more frequently than the poll period, otherwise the + * consumer will fall out of the group, putting all the commits in-flight at risk. Notice + * that this doesn't have a bearing on heartbeats, which themselves are maintained through + * Kafka Consumer poll() calls. When those poll calls stop, so does the heartbeat, which + * is more sensitive, but managed via the 'session.timeout.ms' property. + */ + private static Duration getKeepAlivePeriodFromPollPeriod(Duration pollPeriod) { + return pollPeriod.dividedBy(2); + } + + public static Properties buildKafkaProperties(@NonNull String brokers, + @NonNull String groupId, + boolean enableMSKAuth, + String propertyFilePath) throws IOException { + var kafkaProps = new Properties(); + kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + if (propertyFilePath != null) { + try (InputStream input = new FileInputStream(propertyFilePath)) { + kafkaProps.load(input); + } catch (IOException ex) { + log.error("Unable to load properties from kafka properties file with path: {}", propertyFilePath); + throw ex; + } + } + // Required for using SASL auth with MSK public endpoint + if (enableMSKAuth) { + kafkaProps.setProperty("security.protocol", "SASL_SSL"); + kafkaProps.setProperty("sasl.mechanism", "AWS_MSK_IAM"); + kafkaProps.setProperty("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;"); + kafkaProps.setProperty("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler"); + } + kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + return kafkaProps; + } + + @Override + public void touch() { + trackingKafkaConsumer.touch(); + } + + /** + * If messages are outstanding, we need to keep the connection alive, otherwise, there's no + * reason to. It's OK to fall out of the group and rejoin once ready. 
+ * @return + */ + @Override + public Optional getNextRequiredTouch() { + return trackingKafkaConsumer.getNextRequiredTouch(); + } + + @Override + @SuppressWarnings("unchecked") + public CompletableFuture> readNextTrafficStreamChunk() { + log.atTrace().setMessage("readNextTrafficStreamChunk()").log(); + return CompletableFuture.supplyAsync(() -> { + log.atTrace().setMessage("async...readNextTrafficStreamChunk()").log(); + return readNextTrafficStreamSynchronously(); + }); + } + + public List readNextTrafficStreamSynchronously() { + log.atTrace().setMessage("readNextTrafficStreamSynchronously()").log(); + try { + var records = trackingKafkaConsumer.getNextBatchOfRecords(); + Stream trafficStream = StreamSupport.stream(records.spliterator(), false) + .map(kafkaRecord -> { + try { + TrafficStream ts = TrafficStream.parseFrom(kafkaRecord.value()); + // Ensure we increment trafficStreamsRead even at a higher log level + metricsLogger.atSuccess(MetricsEvent.PARSED_TRAFFIC_STREAM_FROM_KAFKA) + .setAttribute(MetricsAttributeKey.CONNECTION_ID, ts.getConnectionId()) + .setAttribute(MetricsAttributeKey.TOPIC_NAME, trackingKafkaConsumer.topic) + .setAttribute(MetricsAttributeKey.SIZE_IN_BYTES, ts.getSerializedSize()).emit(); + var key = trackingKafkaConsumer.createAndTrackKey(kafkaRecord.partition(), kafkaRecord.offset(), + ck -> new TrafficStreamKeyWithKafkaRecordId(ts, ck)); + var trafficStreamsSoFar = trafficStreamsRead.incrementAndGet(); + log.atTrace().setMessage(()->"Parsed traffic stream #" + trafficStreamsSoFar + + ": " + key + " " + ts).log(); + return (ITrafficStreamWithKey) new PojoTrafficStreamWithKey(ts, key); + } catch (InvalidProtocolBufferException e) { + RuntimeException recordError = behavioralPolicy.onInvalidKafkaRecord(kafkaRecord, e); + metricsLogger.atError(MetricsEvent.PARSING_TRAFFIC_STREAM_FROM_KAFKA_FAILED, recordError) + .setAttribute(MetricsAttributeKey.TOPIC_NAME, trackingKafkaConsumer.topic).emit(); + if (recordError != null) { + throw recordError; + } + return null; + } + }).filter(Objects::nonNull); + return trafficStream.collect(Collectors.toList()); + } catch (Exception e) { + log.atError().setCause(e).setMessage("Terminating Kafka traffic stream due to exception").log(); + throw e; + } + } + + @Override + public void commitTrafficStream(ITrafficStreamKey trafficStreamKey) { + if (!(trafficStreamKey instanceof TrafficStreamKeyWithKafkaRecordId)) { + throw new IllegalArgumentException("Expected key of type "+TrafficStreamKeyWithKafkaRecordId.class+ + " but received "+trafficStreamKey+" (of type="+trafficStreamKey.getClass()+")"); + } + trackingKafkaConsumer.commitKafkaKey((TrafficStreamKeyWithKafkaRecordId) trafficStreamKey); + } + + @Override + public void close() throws IOException { + trackingKafkaConsumer.close(); + } +} diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/OffsetLifecycleTracker.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/OffsetLifecycleTracker.java new file mode 100644 index 000000000..98fc02332 --- /dev/null +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/OffsetLifecycleTracker.java @@ -0,0 +1,49 @@ +package org.opensearch.migrations.replay.kafka; + +import lombok.extern.slf4j.Slf4j; + +import java.util.Optional; +import java.util.PriorityQueue; + +/** + * This uses a PriorityQueue to find the MINIMUM offset that has yet to be 'committed'. 
+ * This class assumes that add() will be called with ascending offsets and that + * removeAndReturnNewHead may be called in any order. removeAndReturnNewHead returns + * the new commit offset for the partition that this object is associated with. + * It's also assumed that callers MUST call removeAndReturnNewHead for every offset + * that was previously added for commit points to be advanced. + */ +@Slf4j +class OffsetLifecycleTracker { + private final PriorityQueue pQueue = new PriorityQueue<>(); + private long cursorHighWatermark; + final int consumerConnectionGeneration; + + OffsetLifecycleTracker(int generation) { + this.consumerConnectionGeneration = generation; + } + + boolean isEmpty() { + return pQueue.isEmpty(); + } + + void add(long offset) { + cursorHighWatermark = offset; + pQueue.add(offset); + } + + Optional removeAndReturnNewHead(long offsetToRemove) { + var topCursor = pQueue.peek(); + var didRemove = pQueue.remove(offsetToRemove); + assert didRemove : "Expected all live records to have an entry and for them to be removed only once"; + if (topCursor == offsetToRemove) { + topCursor = Optional.ofNullable(pQueue.peek()) + .orElse(cursorHighWatermark + 1); // most recent cursor was previously popped + log.atDebug().setMessage("Commit called for " + offsetToRemove + ", and new topCursor=" + topCursor).log(); + return Optional.of(topCursor); + } else { + log.atDebug().setMessage("Commit called for " + offsetToRemove + ", but topCursor=" + topCursor).log(); + return Optional.empty(); + } + } +} diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/PojoKafkaCommitOffsetData.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/PojoKafkaCommitOffsetData.java new file mode 100644 index 000000000..bf3d24a62 --- /dev/null +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/PojoKafkaCommitOffsetData.java @@ -0,0 +1,12 @@ +package org.opensearch.migrations.replay.kafka; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@Getter +@AllArgsConstructor +public class PojoKafkaCommitOffsetData implements KafkaCommitOffsetData { + final int generation; + final int partition; + final long offset; +} diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrackingKafkaConsumer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrackingKafkaConsumer.java new file mode 100644 index 000000000..6379869d0 --- /dev/null +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrackingKafkaConsumer.java @@ -0,0 +1,240 @@ +package org.opensearch.migrations.replay.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.event.Level; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * This is a wrapper around Kafka's Consumer class that provides 
tracking of partitions + * and their current (asynchronously 'committed' by the calling contexts) offsets. It + * manages those offsets and the 'active' set of records that have been rendered by this + * consumer, when to pause a poll loop(), and how to deal with consumer rebalances. + */ +@Slf4j +public class TrackingKafkaConsumer implements ConsumerRebalanceListener { + + /** + * The keep-alive should already be set to a fraction of the max poll timeout for + * the consumer (done outside of this class). The keep-alive tells this class how + * often the caller should be interacting with touch() and poll() calls. As such, + * we want to set up a long enough poll to not overwhelm a broker or client with + * many empty poll() message responses. We also don't want to poll() for so long + * when there aren't messages that there isn't enough time to commit messages, + * which happens after we poll() (on the same thread, as per Consumer requirements). + */ + public static final int POLL_TIMEOUT_KEEP_ALIVE_DIVISOR = 4; + private final Consumer kafkaConsumer; + + final String topic; + private final Clock clock; + /** + * This collection holds the definitive list, as per the rebalance callback, of the partitions + * that are currently assigned to this consumer. The objects are removed when partitions are + * revoked and new objects are only created/inserted when they're assigned. That means that + * the generations of each OffsetLifecycleTracker value may be different. + */ + final Map partitionToOffsetLifecycleTrackerMap; + // loosening visibility so that a unit test can read this + final Map nextSetOfCommitsMap; + private final Duration keepAliveInterval; + private final AtomicReference lastTouchTimeRef; + private int consumerConnectionGeneration; + private boolean hasPendingCommitsToSend; + + public TrackingKafkaConsumer(Consumer kafkaConsumer, String topic, + Duration keepAliveInterval, Clock c) { + this.kafkaConsumer = kafkaConsumer; + this.topic = topic; + this.clock = c; + this.partitionToOffsetLifecycleTrackerMap = new HashMap<>(); + this.nextSetOfCommitsMap = new HashMap<>(); + this.lastTouchTimeRef = new AtomicReference<>(Instant.EPOCH); + this.keepAliveInterval = keepAliveInterval; + } + + @Override + public void onPartitionsRevoked(Collection partitions) { + safeCommit(); + partitions.forEach(p->{ + nextSetOfCommitsMap.remove(new TopicPartition(topic, p.partition())); + partitionToOffsetLifecycleTrackerMap.remove(p.partition()); + }); + if (hasPendingCommitsToSend) { + hasPendingCommitsToSend = partitionToOffsetLifecycleTrackerMap.values().stream() + .anyMatch(olt -> !olt.isEmpty()); + } + log.atWarn().setMessage(()->this+"partitions revoked for "+partitions.stream() + .map(p->p+"").collect(Collectors.joining(","))).log(); + } + + @Override public void onPartitionsAssigned(Collection partitions) { + ++consumerConnectionGeneration; + partitions.forEach(p->partitionToOffsetLifecycleTrackerMap.computeIfAbsent(p.partition(), + x->new OffsetLifecycleTracker(consumerConnectionGeneration))); + log.atWarn().setMessage(()->this+"partitions added for "+partitions.stream() + .map(p->p+"").collect(Collectors.joining(","))).log(); + } + + public void close() { + log.atInfo().setMessage(()->"Kafka consumer closing. " + + "Committing (implicitly by Kafka's consumer): " + nextCommitsToString()).log(); + kafkaConsumer.close(); + } + + public Optional getNextRequiredTouch() { + return hasPendingCommitsToSend ? 
Optional.of(lastTouchTimeRef.get().plus(keepAliveInterval)) : Optional.empty(); + } + + public void touch() { + log.trace("touch() called."); + pause(); + try { + var records = kafkaConsumer.poll(Duration.ZERO); + if (!records.isEmpty()) { + throw new IllegalStateException("Expected no entries once the consumer was paused. " + + "This may have happened because a new assignment slipped into the consumer AFTER pause calls."); + } + } catch (IllegalStateException e) { + throw e; + } catch (RuntimeException e) { + log.atWarn().setCause(e).setMessage("Unable to poll the topic: " + topic + " with our Kafka consumer. " + + "Swallowing and awaiting next metadata refresh to try again.").log(); + } finally { + resume(); + } + safeCommit(); + lastTouchTimeRef.set(clock.instant()); + } + + private void pause() { + var activePartitions = kafkaConsumer.assignment(); + try { + kafkaConsumer.pause(activePartitions); + } catch (IllegalStateException e) { + log.atError().setCause(e).setMessage(()->"Unable to pause the topic partitions: " + topic + ". " + + "The active partitions passed here : " + activePartitions.stream() + .map(x->x.toString()).collect(Collectors.joining(",")) + ". " + + "The active partitions as tracked here are: " + getActivePartitions().stream() + .map(x->x.toString()).collect(Collectors.joining(",")) + ". " + + "The active partitions according to the consumer: " + kafkaConsumer.assignment().stream() + .map(x->x.toString()).collect(Collectors.joining(",")) + ).log(); + } + } + + private void resume() { + var activePartitions = kafkaConsumer.assignment(); + try { + kafkaConsumer.pause(activePartitions); + } catch (IllegalStateException e) { + log.atError().setCause(e).setMessage(()->"Unable to resume the topic partitions: " + topic + ". " + + "This may not be a fatal error for the entire process as the consumer should eventually" + + " rejoin and rebalance. " + + "The active partitions passed here : " + activePartitions.stream() + .map(x->x.toString()).collect(Collectors.joining(",")) + ". " + + "The active partitions as tracked here are: " + getActivePartitions().stream() + .map(x->x.toString()).collect(Collectors.joining(",")) + ". " + + "The active partitions according to the consumer: " + kafkaConsumer.assignment().stream() + .map(x->x.toString()).collect(Collectors.joining(",")) + ).log(); + } + } + + public K createAndTrackKey(int partition, long offset, Function keyFactory) { + var offsetTracker = partitionToOffsetLifecycleTrackerMap.get(partition); + offsetTracker.add(offset); + return keyFactory.apply(new PojoKafkaCommitOffsetData(consumerConnectionGeneration, partition, offset)); + } + + private Collection getActivePartitions() { + return partitionToOffsetLifecycleTrackerMap.keySet().stream() + .map(p->new TopicPartition(topic,p)).collect(Collectors.toList()); + } + + public ConsumerRecords getNextBatchOfRecords() { + var records = safePollWithSwallowedRuntimeExceptions(); + safeCommit(); + return records; + } + + private ConsumerRecords safePollWithSwallowedRuntimeExceptions() { + try { + lastTouchTimeRef.set(clock.instant()); + var records = kafkaConsumer.poll(keepAliveInterval.dividedBy(POLL_TIMEOUT_KEEP_ALIVE_DIVISOR)); + log.atLevel(records.isEmpty()? 
Level.TRACE:Level.INFO) + .setMessage(()->"Kafka consumer poll has fetched "+records.count()+" records").log(); + log.atTrace().setMessage(()->"All positions: {"+kafkaConsumer.assignment().stream() + .map(tp->tp+": "+kafkaConsumer.position(tp)).collect(Collectors.joining(",")) + "}").log(); + log.atTrace().setMessage(()->"All previously COMMITTED positions: {"+kafkaConsumer.assignment().stream() + .map(tp->tp+": "+kafkaConsumer.committed(tp)).collect(Collectors.joining(",")) + "}").log(); + return records; + } catch (RuntimeException e) { + log.atWarn().setCause(e).setMessage("Unable to poll the topic: {} with our Kafka consumer. " + + "Swallowing and awaiting next metadata refresh to try again.").addArgument(topic).log(); + return new ConsumerRecords<>(Collections.emptyMap()); + } + } + + void commitKafkaKey(KafkaCommitOffsetData kafkaTsk) { + if (kafkaTsk.getGeneration() != consumerConnectionGeneration) { + log.atWarn().setMessage(()->"trafficKey's generation (" + kafkaTsk.getGeneration() + ") is not current (" + + consumerConnectionGeneration + "). Dropping this commit request since the record would have " + + "been or will be handled again by a current consumer within this process or another. Full key=" + + kafkaTsk).log(); + return; + } + var p = kafkaTsk.getPartition(); + Optional newHeadValue; + + newHeadValue = partitionToOffsetLifecycleTrackerMap.get(p).removeAndReturnNewHead(kafkaTsk.getOffset()); + newHeadValue.ifPresent(o -> { + hasPendingCommitsToSend = true; + nextSetOfCommitsMap.put(new TopicPartition(topic, p), new OffsetAndMetadata(o)); + }); + } + + private void safeCommit() { + try { + if (hasPendingCommitsToSend) { + log.atDebug().setMessage(() -> "Committing " + nextSetOfCommitsMap).log(); + kafkaConsumer.commitSync(nextSetOfCommitsMap); + log.atDebug().setMessage(() -> "Done committing " + nextSetOfCommitsMap).log(); + nextSetOfCommitsMap.clear(); + } + } catch (RuntimeException e) { + log.atWarn().setCause(e) + .setMessage(() -> "Error while committing. " + + "Another consumer may already be processing messages before these commits. " + + "Commits ARE NOT being discarded here, with the expectation that the revoked callback " + + "(onPartitionsRevoked) will be called. " + + "Within that method, commits for unassigned partitions will be discarded. " + + "After that, touch() or poll() will trigger another commit attempt." + + "Those calls will occur in the near future if assigned partitions have pending commits." 
+ + nextSetOfCommitsMap.entrySet().stream() + .map(kvp -> kvp.getKey() + "->" + kvp.getValue()).collect(Collectors.joining(","))) + .log(); + } + } + + String nextCommitsToString() { + return "nextCommits="+nextSetOfCommitsMap.entrySet().stream() + .map(kvp->kvp.getKey()+"->"+kvp.getValue()).collect(Collectors.joining(",")); + } +} diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrafficStreamKeyWithKafkaRecordId.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrafficStreamKeyWithKafkaRecordId.java new file mode 100644 index 000000000..aa86e96c7 --- /dev/null +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/TrafficStreamKeyWithKafkaRecordId.java @@ -0,0 +1,27 @@ +package org.opensearch.migrations.replay.kafka; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; +import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamKey; +import org.opensearch.migrations.trafficcapture.protos.TrafficStream; + +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@Getter +class TrafficStreamKeyWithKafkaRecordId extends PojoTrafficStreamKey implements KafkaCommitOffsetData { + private final int generation; + private final int partition; + private final long offset; + + TrafficStreamKeyWithKafkaRecordId(TrafficStream trafficStream, KafkaCommitOffsetData ok) { + this(trafficStream, ok.getGeneration(), ok.getPartition(), ok.getOffset()); + } + + TrafficStreamKeyWithKafkaRecordId(TrafficStream trafficStream, int generation, int partition, long offset) { + super(trafficStream); + this.generation = generation; + this.partition = partition; + this.offset = offset; + } +} diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/BlockingTrafficSource.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/BlockingTrafficSource.java index a22ad2549..a84e7b80b 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/BlockingTrafficSource.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/BlockingTrafficSource.java @@ -15,6 +15,7 @@ import java.util.StringJoiner; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** @@ -40,9 +41,9 @@ public class BlockingTrafficSource implements ITrafficCaptureSource, BufferedFlo * Limit the number of readers to one at a time and only if we haven't yet maxed out our time buffer */ private final Semaphore readGate; - private final Duration bufferTimeWindow; + public BlockingTrafficSource(ISimpleTrafficCaptureSource underlying, Duration bufferTimeWindow) { this.underlyingSource = underlying; @@ -89,27 +90,13 @@ public Duration getBufferTimeWindow() { @Override public CompletableFuture> readNextTrafficStreamChunk() { - var trafficStreamListFuture = - CompletableFuture.supplyAsync(() -> { - if (stopReadingAtRef.get().equals(Instant.EPOCH)) { return null; } - while (stopReadingAtRef.get().isBefore(lastTimestampSecondsRef.get())) { - try { - log.atInfo().setMessage( - "blocking until signaled to read the next chunk last={} stop={}") - .addArgument(lastTimestampSecondsRef.get()) - .addArgument(stopReadingAtRef.get()) - .log(); - readGate.acquire(); - } catch (InterruptedException e) { - 
log.atWarn().setCause(e).log("Interrupted while waiting to read more data"); - Thread.currentThread().interrupt(); - break; - } - } - return null; - }, - task -> new Thread(task).start()) - .thenCompose(v->underlyingSource.readNextTrafficStreamChunk()); + log.info("BlockingTrafficSource::readNext"); + var trafficStreamListFuture = CompletableFuture + .supplyAsync(this::blockIfNeeded, task -> new Thread(task).start()) + .thenCompose(v->{ + log.info("BlockingTrafficSource::composing"); + return underlyingSource.readNextTrafficStreamChunk(); + }); return trafficStreamListFuture.whenComplete((v,t)->{ if (t != null) { return; @@ -126,6 +113,42 @@ public Duration getBufferTimeWindow() { }); } + private Void blockIfNeeded() { + if (stopReadingAtRef.get().equals(Instant.EPOCH)) { return null; } + log.atInfo().setMessage(()->"stopReadingAtRef="+stopReadingAtRef+ + " lastTimestampSecondsRef="+lastTimestampSecondsRef).log(); + while (stopReadingAtRef.get().isBefore(lastTimestampSecondsRef.get())) { + try { + log.atInfo().setMessage("blocking until signaled to read the next chunk last={} stop={}") + .addArgument(lastTimestampSecondsRef.get()) + .addArgument(stopReadingAtRef.get()) + .log(); + var nextTouchOp = underlyingSource.getNextRequiredTouch(); + if (nextTouchOp.isEmpty()) { + readGate.acquire(); + } else { + var nextInstant = nextTouchOp.get(); + final var nowTime = Instant.now(); + var waitIntervalMs = Duration.between(nowTime, nextInstant).toMillis(); + log.atDebug().setMessage(()->"Next touch at " + nextInstant + + " ... in " + waitIntervalMs + "ms (now="+nowTime+")").log(); + if (waitIntervalMs <= 0) { + underlyingSource.touch(); + } else { + // if this doesn't succeed, we'll loop around & likely do a touch, then loop around again. + // if it DOES succeed, we'll loop around and make sure that there's not another reason to stop + readGate.tryAcquire(waitIntervalMs, TimeUnit.MILLISECONDS); + } + } + } catch (InterruptedException e) { + log.atWarn().setCause(e).log("Interrupted while waiting to read more data"); + Thread.currentThread().interrupt(); + break; + } + } + return null; + } + @Override public void commitTrafficStream(ITrafficStreamKey trafficStreamKey) throws IOException { underlyingSource.commitTrafficStream(trafficStreamKey); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/ITrafficCaptureSource.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/ITrafficCaptureSource.java index 43838a2a5..625bde671 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/ITrafficCaptureSource.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/ITrafficCaptureSource.java @@ -4,7 +4,10 @@ import java.io.Closeable; import java.io.IOException; +import java.time.Duration; +import java.time.Instant; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; public interface ITrafficCaptureSource extends Closeable { @@ -14,4 +17,17 @@ public interface ITrafficCaptureSource extends Closeable { default void commitTrafficStream(ITrafficStreamKey trafficStreamKey) throws IOException {} default void close() throws IOException {} + + /** + * Keep-alive call to be used by the BlockingTrafficSource to keep this connection alive if + * this is required. 
+ */ + default void touch() {} + + /** + * @return The time that the next call to touch() must be completed for this source to stay + * active. Empty indicates that touch() does not need to be called to keep the + * source active. + */ + default Optional getNextRequiredTouch() { return Optional.empty(); } } diff --git a/TrafficCapture/trafficReplayer/src/main/resources/log4j2.properties b/TrafficCapture/trafficReplayer/src/main/resources/log4j2.properties index 74c01dc2d..76b324f4d 100644 --- a/TrafficCapture/trafficReplayer/src/main/resources/log4j2.properties +++ b/TrafficCapture/trafficReplayer/src/main/resources/log4j2.properties @@ -32,8 +32,8 @@ appender.ReplayerLogFile.strategy.max = 288 appender.OUTPUT_TUPLES.type = RollingFile appender.OUTPUT_TUPLES.name = OUTPUT_TUPLES -appender.OUTPUT_TUPLES.fileName = ${bundle:bundle.base}/${bundle:bundle.version}/${bundle:bundle.name}/logs/${bundle:bundle.name}_app.log -appender.OUTPUT_TUPLES.filePattern = ${bundle:bundle.base}/${bundle:bundle.version}/${bundle:bundle.name}/logs/${bundle:bundle.name}_app-%d{yyyy-MM-dd_HH-mm}.log.gz +appender.OUTPUT_TUPLES.fileName = ${tupleDir}/tuples.log +appender.OUTPUT_TUPLES.filePattern = ${tupleDir}/tuples-%d{yyyy-MM-dd-HH:mm}{UTC}.log appender.OUTPUT_TUPLES.layout.type = JsonLayout appender.OUTPUT_TUPLES.layout.properties = false appender.OUTPUT_TUPLES.layout.complete = false diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullTrafficReplayerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullTrafficReplayerTest.java index a50d4c953..4e0f3765c 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullTrafficReplayerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullTrafficReplayerTest.java @@ -80,7 +80,7 @@ public void testSingleStreamWithCloseIsCommitted() throws Throwable { TrafficReplayerRunner.runReplayerUntilSourceWasExhausted(0, httpServer.localhostEndpoint(), new IndexWatchingReceiverFactory(), trafficSourceSupplier); Assertions.assertEquals(1, trafficSourceSupplier.nextReadCursor.get()); - log.error("done"); + log.info("done"); } @ParameterizedTest @@ -104,7 +104,7 @@ public void fullTest(int testSize, boolean randomize) throws Throwable { TrafficReplayerRunner.runReplayerUntilSourceWasExhausted(numExpectedRequests, httpServer.localhostEndpoint(), new IndexWatchingReceiverFactory(), trafficSourceSupplier); Assertions.assertEquals(trafficSourceSupplier.streams.size(), trafficSourceSupplier.nextReadCursor.get()); - log.error("done"); + log.info("done"); } @Getter @@ -141,7 +141,7 @@ public ArrayCursorTrafficSourceFactory(List streams) { public ISimpleTrafficCaptureSource get() { var rval = new ArrayCursorTrafficCaptureSource(this); - log.error("trafficSource="+rval+" readCursor="+rval.readCursor.get()+" nextReadCursor="+ nextReadCursor.get()); + log.info("trafficSource="+rval+" readCursor="+rval.readCursor.get()+" nextReadCursor="+ nextReadCursor.get()); return rval; } } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaRestartingTrafficReplayerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaRestartingTrafficReplayerTest.java index 34d28db72..8d55a6ffc 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaRestartingTrafficReplayerTest.java +++ 
b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaRestartingTrafficReplayerTest.java @@ -11,10 +11,8 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; -import org.opensearch.migrations.replay.kafka.KafkaProtobufConsumer; +import org.opensearch.migrations.replay.kafka.KafkaTrafficCaptureSource; import org.opensearch.migrations.replay.traffic.source.ISimpleTrafficCaptureSource; import org.opensearch.migrations.replay.traffic.source.ITrafficStreamWithKey; import org.opensearch.migrations.replay.traffic.source.TrafficStreamWithEmbeddedKey; @@ -50,10 +48,11 @@ public class KafkaRestartingTrafficReplayerTest { public static final int PRODUCER_SLEEP_INTERVAL_MS = 100; public static final Duration MAX_WAIT_TIME_FOR_TOPIC = Duration.ofMillis(PRODUCER_SLEEP_INTERVAL_MS*2); + public static final long DEFAULT_POLL_INTERVAL_MS = 5000; @Container // see https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility - private KafkaContainer embeddedKafkaBroker = + private final KafkaContainer embeddedKafkaBroker = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0")); private static class CounterLimitedReceiverFactory implements Supplier> { @@ -66,7 +65,7 @@ public Consumer get() { var counter = new AtomicInteger(); return tuple -> { if (counter.incrementAndGet() > stopPoint) { - log.error("Request received after our ingest threshold. Throwing. Discarding " + + log.warn("Request received after our ingest threshold. Throwing. 
Discarding " + tuple.uniqueRequestKey); var nextStopPoint = stopPoint + new Random(stopPoint).nextInt(stopPoint + 1); nextStopPointRef.compareAndSet(stopPoint, nextStopPoint); @@ -90,7 +89,7 @@ public void fullTest(int testSize, boolean randomize) throws Throwable { response->TestHttpServerContext.makeResponse(random, response)); var streamAndConsumer = TrafficStreamGenerator.generateStreamAndSumOfItsTransactions(testSize, randomize); var trafficStreams = streamAndConsumer.stream.collect(Collectors.toList()); - log.atInfo().setMessage(()->trafficStreams.stream().map(ts-> TrafficStreamUtils.summarizeTrafficStream(ts)) + log.atInfo().setMessage(()->trafficStreams.stream().map(TrafficStreamUtils::summarizeTrafficStream) .collect(Collectors.joining("\n"))).log(); loadStreamsToKafka(buildKafkaConsumer(), @@ -98,15 +97,16 @@ public void fullTest(int testSize, boolean randomize) throws Throwable { TrafficReplayerRunner.runReplayerUntilSourceWasExhausted(streamAndConsumer.numHttpTransactions, httpServer.localhostEndpoint(), new CounterLimitedReceiverFactory(), () -> new SentinelSensingTrafficSource( - new KafkaProtobufConsumer(buildKafkaConsumer(), TEST_TOPIC_NAME, null))); - log.error("done"); + new KafkaTrafficCaptureSource(buildKafkaConsumer(), TEST_TOPIC_NAME, + Duration.ofMillis(DEFAULT_POLL_INTERVAL_MS)))); + log.info("done"); } @SneakyThrows private KafkaConsumer buildKafkaConsumer() { - var kafkaConsumerProps = KafkaProtobufConsumer.buildKafkaProperties(embeddedKafkaBroker.getBootstrapServers(), + var kafkaConsumerProps = KafkaTrafficCaptureSource.buildKafkaProperties(embeddedKafkaBroker.getBootstrapServers(), TEST_GROUP_CONSUMER_ID, false, null); - kafkaConsumerProps.setProperty("max.poll.interval.ms", "5000"); + kafkaConsumerProps.setProperty("max.poll.interval.ms", DEFAULT_POLL_INTERVAL_MS+""); var kafkaConsumer = new KafkaConsumer(kafkaConsumerProps); log.atInfo().setMessage(()->"Just built KafkaConsumer="+kafkaConsumer).log(); return kafkaConsumer; @@ -174,7 +174,8 @@ Producer buildKafkaProducer() { throw Lombok.sneakyThrow(e); } }); - return () -> new KafkaProtobufConsumer(kafkaConsumer, TEST_TOPIC_NAME, null); + return () -> new KafkaTrafficCaptureSource(kafkaConsumer, TEST_TOPIC_NAME, + Duration.ofMillis(DEFAULT_POLL_INTERVAL_MS)); } @SneakyThrows diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/SimpleCapturedTrafficToHttpTransactionAccumulatorTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/SimpleCapturedTrafficToHttpTransactionAccumulatorTest.java index 1ec61749c..53b97d41c 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/SimpleCapturedTrafficToHttpTransactionAccumulatorTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/SimpleCapturedTrafficToHttpTransactionAccumulatorTest.java @@ -236,7 +236,7 @@ public void onConnectionClose(ISourceTrafficChannelKey key, int channelInteracti static void assertReconstructedTransactionsMatchExpectations(List reconstructedTransactions, int[] expectedRequestSizes, int[] expectedResponseSizes) { - log.error("reconstructedTransactions="+ reconstructedTransactions); + log.debug("reconstructedTransactions="+ reconstructedTransactions); Assertions.assertEquals(expectedRequestSizes.length, reconstructedTransactions.size()); for (int i = 0; i< reconstructedTransactions.size(); ++i) { Assertions.assertEquals((long) expectedRequestSizes[i], diff --git 
a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonToByteBufHandlerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonToByteBufHandlerTest.java index 818a53988..9da438ce4 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonToByteBufHandlerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonToByteBufHandlerTest.java @@ -34,7 +34,7 @@ public class NettyJsonToByteBufHandlerTest { @Test public void testThatHttpContentsAreRepackagedToChunkSizeSpec() { for (int i=0; i<10; ++i) { - log.error("Testing w/ random seed="+i); + log.info("Testing w/ random seed="+i); testWithSeed(new Random(i)); System.gc(); System.runFinalization(); @@ -90,7 +90,7 @@ private static List getByteBufSizesFromChannel(EmbeddedChannel channel) static byte nonce = 0; private int writeAndCheck(EmbeddedChannel channel, ArrayList sizesWrittenList, int len) { var bytes = new byte[len]; - log.warn("Writing "+len); + log.debug("Writing "+len); sizesWrittenList.add(len); Arrays.fill(bytes, nonce++); var httpContent = new DefaultHttpContent(Unpooled.wrappedBuffer(bytes)); diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaKeepAliveTests.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaKeepAliveTests.java new file mode 100644 index 000000000..b2319765a --- /dev/null +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaKeepAliveTests.java @@ -0,0 +1,171 @@ +package org.opensearch.migrations.replay.kafka; + +import lombok.Lombok; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.Producer; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.opensearch.migrations.replay.datatypes.ITrafficStreamKey; +import org.opensearch.migrations.replay.traffic.source.BlockingTrafficSource; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@Slf4j +@Testcontainers(disabledWithoutDocker = true) +@Tag("requiresDocker") +public class KafkaKeepAliveTests { + public static final String TEST_GROUP_CONSUMER_ID = "TEST_GROUP_CONSUMER_ID"; + public static final String HEARTBEAT_INTERVAL_MS_KEY = "heartbeat.interval.ms"; + public static final long MAX_POLL_INTERVAL_MS = 1000; + public static final long HEARTBEAT_INTERVAL_MS = 300; + public static final String testTopicName = "TEST_TOPIC"; + + Producer kafkaProducer; + AtomicInteger sendCompleteCount; + Properties kafkaProperties; + BlockingTrafficSource trafficSource; + ArrayList keysReceived; + + @Container + // see https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility + private final KafkaContainer 
embeddedKafkaBroker = + new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0")); + private KafkaTrafficCaptureSource kafkaSource; + + /** + * Set up the test case where we've produced and received 1 message, but have not yet committed it. + * Another message is in the process of being produced. + * The BlockingTrafficSource is blocked on everything after a point before the beginning of the test. + * @throws Exception + */ + @BeforeEach + private void setupTestCase() throws Exception { + kafkaProducer = KafkaTestUtils.buildKafkaProducer(embeddedKafkaBroker.getBootstrapServers()); + this.sendCompleteCount = new AtomicInteger(0); + KafkaTestUtils.produceKafkaRecord(testTopicName, kafkaProducer, 0, sendCompleteCount).get(); + Assertions.assertEquals(1, sendCompleteCount.get()); + + this.kafkaProperties = KafkaTrafficCaptureSource.buildKafkaProperties(embeddedKafkaBroker.getBootstrapServers(), + TEST_GROUP_CONSUMER_ID, false, null); + Assertions.assertNull(kafkaProperties.get(KafkaTrafficCaptureSource.MAX_POLL_INTERVAL_KEY)); + + kafkaProperties.put(KafkaTrafficCaptureSource.MAX_POLL_INTERVAL_KEY, MAX_POLL_INTERVAL_MS+""); + kafkaProperties.put(HEARTBEAT_INTERVAL_MS_KEY, HEARTBEAT_INTERVAL_MS+""); + kafkaProperties.put("max.poll.records", 1); + var kafkaConsumer = new KafkaConsumer(kafkaProperties); + this.kafkaSource = new KafkaTrafficCaptureSource(kafkaConsumer, testTopicName, Duration.ofMillis(MAX_POLL_INTERVAL_MS)); + this.trafficSource = new BlockingTrafficSource(kafkaSource, Duration.ZERO); + this.keysReceived = new ArrayList<>(); + + readNextNStreams(trafficSource, keysReceived, 0, 1); + KafkaTestUtils.produceKafkaRecord(testTopicName, kafkaProducer, 1, sendCompleteCount); + } + + @Test + @Tag("longTest") + public void testTimeoutsDontOccurForSlowPolls() throws Exception { + var pollIntervalMs = Optional.ofNullable(kafkaProperties.get(KafkaTrafficCaptureSource.MAX_POLL_INTERVAL_KEY)) + .map(s->Integer.valueOf((String)s)).orElseThrow(); + var executor = Executors.newSingleThreadScheduledExecutor(); + executor.schedule(()-> { + try { + var k = keysReceived.get(0); + log.info("Calling commit traffic stream for "+k); + trafficSource.commitTrafficStream(k); + log.info("finished committing traffic stream"); + log.info("Stop reads to infinity"); + // this is a way to signal back to the main thread that this thread is done + KafkaTestUtils.produceKafkaRecord(testTopicName, kafkaProducer, 2, sendCompleteCount); + } catch (Exception e) { + throw Lombok.sneakyThrow(e); + } + }, + pollIntervalMs, TimeUnit.MILLISECONDS); + + // wait for 2 messages so that they include the last one produced by the async schedule call previously + readNextNStreams(trafficSource, keysReceived, 1, 2); + Assertions.assertEquals(3, keysReceived.size()); + // At this point, we've read all (3) messages produced , committed the first one + // (all the way through to Kafka), and no commits are in-flight yet for the last two messages. + } + + @Test + @Tag("longTest") + public void testBlockedReadsAndBrokenCommitsDontCauseReordering() throws Exception { + for (int i=0; i<2; ++i) { + KafkaTestUtils.produceKafkaRecord(testTopicName, kafkaProducer, 1 + i, sendCompleteCount).get(); + } + readNextNStreams(trafficSource, keysReceived, 1, 1); + + trafficSource.commitTrafficStream(keysReceived.get(0)); + log.info("Called commitTrafficStream but waiting long enough for the client to leave the group. 
" + + "That will make the previous commit a 'zombie-commit' that should easily be dropped."); + + log.info("1 message was committed, but not synced, 1 message is being processed." + + "wait long enough to fall out of the group before we can commit"); + Thread.sleep(2*MAX_POLL_INTERVAL_MS); + + var keysReceivedUntilDrop1 = keysReceived; + keysReceived = new ArrayList<>(); + + log.info("re-establish a client connection so that the following commit will work"); + log.atInfo().setMessage(()->"1 ..."+renderNextCommitsAsString()).log(); + readNextNStreams(trafficSource, keysReceived, 0, 1); + log.atInfo().setMessage(()->"2 ..."+renderNextCommitsAsString()).log(); + + log.info("wait long enough to fall out of the group again"); + Thread.sleep(2*MAX_POLL_INTERVAL_MS); + + var keysReceivedUntilDrop2 = keysReceived; + keysReceived = new ArrayList<>(); + log.atInfo().setMessage(()->"re-establish... 3 ..."+renderNextCommitsAsString()).log(); + readNextNStreams(trafficSource, keysReceived, 0, 1); + trafficSource.commitTrafficStream(keysReceivedUntilDrop1.get(1)); + log.atInfo().setMessage(()->"re-establish... 4 ..."+renderNextCommitsAsString()).log(); + readNextNStreams(trafficSource, keysReceived, 1, 1); + log.atInfo().setMessage(()->"5 ..."+renderNextCommitsAsString()).log(); + + Thread.sleep(2*MAX_POLL_INTERVAL_MS); + var keysReceivedUntilDrop3 = keysReceived; + keysReceived = new ArrayList<>(); + readNextNStreams(trafficSource, keysReceived, 0, 3); + log.atInfo().setMessage(()->"6 ..."+kafkaSource.trackingKafkaConsumer.nextCommitsToString()).log(); + trafficSource.close(); + } + + private String renderNextCommitsAsString() { + return kafkaSource.trackingKafkaConsumer.nextCommitsToString(); + } + + @SneakyThrows + private static void readNextNStreams(BlockingTrafficSource kafkaSource, List keysReceived, + int from, int count) { + Assertions.assertEquals(from, keysReceived.size()); + for (int i=0; i{ + var tsk = ts.getKey(); + log.atInfo().setMessage(()->"checking for "+tsk).log(); + Assertions.assertFalse(keysReceived.contains(tsk)); + keysReceived.add(tsk); + }); + log.info("Read "+trafficStreams.size()+" traffic streams"); + i += trafficStreams.size(); + } + } +} diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumerLongTermTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumerLongTermTest.java deleted file mode 100644 index e8f478b04..000000000 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumerLongTermTest.java +++ /dev/null @@ -1,156 +0,0 @@ -package org.opensearch.migrations.replay.kafka; - -import com.google.protobuf.ByteString; -import com.google.protobuf.Timestamp; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.jetbrains.annotations.NotNull; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.opensearch.migrations.trafficcapture.protos.ReadObservation; -import org.opensearch.migrations.trafficcapture.protos.TrafficObservation; -import org.opensearch.migrations.trafficcapture.protos.TrafficStream; -import 
org.testcontainers.containers.KafkaContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; - -import java.nio.charset.StandardCharsets; -import java.time.Instant; -import java.util.Properties; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; - -@Slf4j -@Testcontainers(disabledWithoutDocker = true) -@Tag("requiresDocker") -public class KafkaProtobufConsumerLongTermTest { - - public static final String TEST_GROUP_CONSUMER_ID = "TEST_GROUP_CONSUMER_ID"; - public static final String TEST_GROUP_PRODUCER_ID = "TEST_GROUP_PRODUCER_ID"; - public static final int TEST_RECORD_COUNT = 10; - public static final String TEST_NODE_ID = "TestNodeId"; - public static final String TEST_TRAFFIC_STREAM_ID_STRING = "TEST_TRAFFIC_STREAM_ID_STRING"; - private static final String FAKE_READ_PACKET_DATA = "Fake pa"; - public static final int PRODUCER_SLEEP_INTERVAL_MS = 100; - @Container - // see https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility - private KafkaContainer embeddedKafkaBroker = - new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0")); - - - Producer buildKafkaProducer() { - var kafkaProps = new Properties(); - kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - // Property details: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#delivery-timeout-ms - kafkaProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10000); - kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); - kafkaProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000); - kafkaProps.put(ProducerConfig.CLIENT_ID_CONFIG, TEST_GROUP_PRODUCER_ID); - kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBootstrapServers()); - try { - return new KafkaProducer(kafkaProps); - } catch (Exception e) { - log.atError().setCause(e).log(); - System.exit(1); - throw e; - } - } - - TrafficStream makeTestTrafficStream(Instant t, int i) { - var timestamp = Timestamp.newBuilder() - .setSeconds(t.getEpochSecond()) - .setNanos(t.getNano()) - .build(); - var tsb = TrafficStream.newBuilder() - .setNumber(i); - // TODO - add something for setNumberOfThisLastChunk. 
There's no point in doing that now though - // because the code doesn't make any distinction between the very last one and the previous ones - return tsb.setNodeId(TEST_NODE_ID) - .setConnectionId(getConnectionId(i)) - .addSubStream(TrafficObservation.newBuilder().setTs(timestamp) - .setRead(ReadObservation.newBuilder() - .setData(ByteString.copyFrom(FAKE_READ_PACKET_DATA.getBytes(StandardCharsets.UTF_8))) - .build()) - .build()).build(); - - } - - private String getConnectionId(int i) { - return TEST_TRAFFIC_STREAM_ID_STRING + "_" + i; - } - - //@Test - @Tag("longTest") - public void testTrafficCaptureSource() throws Exception { - String testTopicName = "TEST_TOPIC"; - - var kafkaConsumerProps = KafkaProtobufConsumer.buildKafkaProperties(embeddedKafkaBroker.getBootstrapServers(), - TEST_GROUP_CONSUMER_ID, false, null); - kafkaConsumerProps.setProperty("max.poll.interval.ms", "10000"); - var kafkaConsumer = new KafkaConsumer(kafkaConsumerProps); - var kafkaTrafficCaptureSource = new KafkaProtobufConsumer(kafkaConsumer, testTopicName, null); - - var kafkaProducer = buildKafkaProducer(); - var sendCompleteCount = new AtomicInteger(0); - var scheduledIterationsCount = new AtomicInteger(0); - var executor = Executors.newSingleThreadScheduledExecutor(); - executor.scheduleAtFixedRate(()->{ - var i = scheduledIterationsCount.getAndIncrement(); - if (i >= TEST_RECORD_COUNT) { - executor.shutdown(); - } else { - produceKafkaRecord(testTopicName, kafkaProducer, i, sendCompleteCount); - } - }, 0, PRODUCER_SLEEP_INTERVAL_MS, TimeUnit.MILLISECONDS); - - for (int i=0; i { - var rogueChunk = kafkaTrafficCaptureSource.readNextTrafficStreamChunk().get(1, TimeUnit.SECONDS); - if (rogueChunk.isEmpty()) { - // TimeoutExceptions cannot be thrown by the supplier of the CompletableFuture today, BUT we - // could long-poll on the broker for longer than the timeout value supplied in the get() call above - throw new TimeoutException("read actually returned 0 items, but transforming this to a " + - "TimeoutException because either result would be valid."); - } - log.error("rogue chunk: "+ rogueChunk); - }); - } - - private long getSleepAmountMsForProducerRun(int i) { - return 1*1000; - } - - private void produceKafkaRecord(String testTopicName, Producer kafkaProducer, int i, - AtomicInteger sendCompleteCount) { - var trafficStream = makeTestTrafficStream(Instant.now(), i); - var record = new ProducerRecord(testTopicName, makeKey(i), trafficStream.toByteArray()); - var sendFuture = kafkaProducer.send(record, (metadata, exception) -> { - sendCompleteCount.incrementAndGet(); - }); - } - - @NotNull - private static String makeKey(int i) { - return "KEY_" + i; - } -} \ No newline at end of file diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTestUtils.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTestUtils.java new file mode 100644 index 000000000..691439ff1 --- /dev/null +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTestUtils.java @@ -0,0 +1,90 @@ +package org.opensearch.migrations.replay.kafka; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import 
org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Tag; +import org.opensearch.migrations.trafficcapture.protos.ReadObservation; +import org.opensearch.migrations.trafficcapture.protos.TrafficObservation; +import org.opensearch.migrations.trafficcapture.protos.TrafficStream; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.Properties; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +@Slf4j +public class KafkaTestUtils { + + public static final String TEST_GROUP_PRODUCER_ID = "TEST_GROUP_PRODUCER_ID"; + private static final String FAKE_READ_PACKET_DATA = "Fake pa"; + public static final String TEST_NODE_ID = "TestNodeId"; + public static final String TEST_TRAFFIC_STREAM_ID_STRING = "TEST_TRAFFIC_STREAM_ID_STRING"; + + static Producer buildKafkaProducer(String bootstrapServers) { + var kafkaProps = new Properties(); + kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + // Property details: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#delivery-timeout-ms + kafkaProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10000); + kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); + kafkaProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000); + kafkaProps.put(ProducerConfig.CLIENT_ID_CONFIG, TEST_GROUP_PRODUCER_ID); + kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + try { + return new KafkaProducer(kafkaProps); + } catch (Exception e) { + log.atError().setCause(e).log(); + System.exit(1); + throw e; + } + } + + static String getConnectionId(int i) { + return TEST_TRAFFIC_STREAM_ID_STRING + "_" + i; + } + + static TrafficStream makeTestTrafficStream(Instant t, int i) { + var timestamp = Timestamp.newBuilder() + .setSeconds(t.plus(Duration.ofDays(i)).getEpochSecond()) + .setNanos(t.getNano()) + .build(); + var tsb = TrafficStream.newBuilder() + .setNumber(i); + // TODO - add something for setNumberOfThisLastChunk. 
There's no point in doing that now though + // because the code doesn't make any distinction between the very last one and the previous ones + return tsb.setNodeId(TEST_NODE_ID) + .setConnectionId(getConnectionId(i)) + .addSubStream(TrafficObservation.newBuilder().setTs(timestamp) + .setRead(ReadObservation.newBuilder() + .setData(ByteString.copyFrom(FAKE_READ_PACKET_DATA.getBytes(StandardCharsets.UTF_8))) + .build()) + .build()).build(); + + } + + static Future produceKafkaRecord(String testTopicName, Producer kafkaProducer, + int i, AtomicInteger sendCompleteCount) { + var trafficStream = KafkaTestUtils.makeTestTrafficStream(Instant.now(), i); + var record = new ProducerRecord(testTopicName, makeKey(i), trafficStream.toByteArray()); + return kafkaProducer.send(record, (metadata, exception) -> { + sendCompleteCount.incrementAndGet(); + }); + } + + @NotNull + private static String makeKey(int i) { + return "KEY_" + i; + } +} diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceLongTermTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceLongTermTest.java new file mode 100644 index 000000000..40e15c712 --- /dev/null +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceLongTermTest.java @@ -0,0 +1,86 @@ +package org.opensearch.migrations.replay.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.time.Duration; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +@Slf4j +@Testcontainers(disabledWithoutDocker = true) +@Tag("requiresDocker") +public class KafkaTrafficCaptureSourceLongTermTest { + + public static final int TEST_RECORD_COUNT = 10; + public static final String TEST_GROUP_CONSUMER_ID = "TEST_GROUP_CONSUMER_ID"; + public static final int PRODUCER_SLEEP_INTERVAL_MS = 100; + + @Container + // see https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility + private final KafkaContainer embeddedKafkaBroker = + new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0")); + + + @Test + @Tag("longTest") + public void testTrafficCaptureSource() throws Exception { + String testTopicName = "TEST_TOPIC"; + + var kafkaConsumerProps = KafkaTrafficCaptureSource.buildKafkaProperties(embeddedKafkaBroker.getBootstrapServers(), + TEST_GROUP_CONSUMER_ID, false, null); + final long MAX_POLL_MS = 10000; + kafkaConsumerProps.setProperty(KafkaTrafficCaptureSource.MAX_POLL_INTERVAL_KEY, MAX_POLL_MS+""); + var kafkaConsumer = new KafkaConsumer(kafkaConsumerProps); + var kafkaTrafficCaptureSource = new KafkaTrafficCaptureSource(kafkaConsumer, testTopicName, + Duration.ofMillis(MAX_POLL_MS)); + + var kafkaProducer = KafkaTestUtils.buildKafkaProducer(embeddedKafkaBroker.getBootstrapServers()); + var sendCompleteCount = new AtomicInteger(0); + var scheduledIterationsCount = new AtomicInteger(0); + var executor = 
Executors.newSingleThreadScheduledExecutor(); + executor.scheduleAtFixedRate(()->{ + var i = scheduledIterationsCount.getAndIncrement(); + if (i >= TEST_RECORD_COUNT) { + executor.shutdown(); + } else { + KafkaTestUtils.produceKafkaRecord(testTopicName, kafkaProducer, i, sendCompleteCount); + } + }, 0, PRODUCER_SLEEP_INTERVAL_MS, TimeUnit.MILLISECONDS); + + for (int i=0; i { + var rogueChunk = kafkaTrafficCaptureSource.readNextTrafficStreamChunk().get(1, TimeUnit.SECONDS); + if (rogueChunk.isEmpty()) { + // TimeoutExceptions cannot be thrown by the supplier of the CompletableFuture today, BUT we + // could long-poll on the broker for longer than the timeout value supplied in the get() call above + throw new TimeoutException("read actually returned 0 items, but transforming this to a " + + "TimeoutException because either result would be valid."); + } + log.error("rogue chunk: "+ rogueChunk); + }); + } + + private long getSleepAmountMsForProducerRun(int i) { + return 1*1000; + } +} \ No newline at end of file diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java similarity index 94% rename from TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumerTest.java rename to TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java index 8ae5c46d3..794da7768 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaProtobufConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java @@ -29,10 +29,9 @@ import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; -import java.util.stream.Stream; @Slf4j -class KafkaProtobufConsumerTest { +class KafkaTrafficCaptureSourceTest { public static final int NUM_READ_ITEMS_BOUND = 1000; public static final String TEST_TOPIC_NAME = "TEST_TOPIC_NAME"; @@ -40,7 +39,8 @@ class KafkaProtobufConsumerTest { public void testSupplyTrafficFromSource() { int numTrafficStreams = 10; MockConsumer mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); - KafkaProtobufConsumer protobufConsumer = new KafkaProtobufConsumer(mockConsumer, TEST_TOPIC_NAME); + KafkaTrafficCaptureSource protobufConsumer = new KafkaTrafficCaptureSource(mockConsumer, TEST_TOPIC_NAME, + Duration.ofHours(1)); initializeMockConsumerTopic(mockConsumer); List substreamCounts = new ArrayList<>(); @@ -80,7 +80,8 @@ public void testSupplyTrafficFromSource() { public void testSupplyTrafficWithUnformattedMessages() { int numTrafficStreams = 10; MockConsumer mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); - KafkaProtobufConsumer protobufConsumer = new KafkaProtobufConsumer(mockConsumer, TEST_TOPIC_NAME); + KafkaTrafficCaptureSource protobufConsumer = new KafkaTrafficCaptureSource(mockConsumer, TEST_TOPIC_NAME, + Duration.ofHours(1)); initializeMockConsumerTopic(mockConsumer); List substreamCounts = new ArrayList<>(); @@ -129,7 +130,7 @@ public void testSupplyTrafficWithUnformattedMessages() { @Test public void testBuildPropertiesBaseCase() throws IOException { - Properties props = KafkaProtobufConsumer.buildKafkaProperties("brokers", "groupId", false, null); + Properties props = 
KafkaTrafficCaptureSource.buildKafkaProperties("brokers", "groupId", false, null); Assertions.assertEquals("brokers", props.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); Assertions.assertEquals("org.apache.kafka.common.serialization.StringDeserializer", props.get("key.deserializer")); Assertions.assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", props.get("value.deserializer")); @@ -139,7 +140,7 @@ public void testBuildPropertiesBaseCase() throws IOException { @Test public void testBuildPropertiesMSKAuthEnabled() throws IOException { - Properties props = KafkaProtobufConsumer.buildKafkaProperties("brokers", "groupId", true, null); + Properties props = KafkaTrafficCaptureSource.buildKafkaProperties("brokers", "groupId", true, null); Assertions.assertEquals("brokers", props.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); Assertions.assertEquals("org.apache.kafka.common.serialization.StringDeserializer", props.get("key.deserializer")); Assertions.assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", props.get("value.deserializer")); @@ -154,7 +155,7 @@ public void testBuildPropertiesMSKAuthEnabled() throws IOException { @Test public void testBuildPropertiesWithProvidedPropertyFile() throws IOException { File simplePropertiesFile = new File("src/test/resources/kafka/simple-kafka.properties"); - Properties props = KafkaProtobufConsumer.buildKafkaProperties("brokers", "groupId", true, simplePropertiesFile.getPath()); + Properties props = KafkaTrafficCaptureSource.buildKafkaProperties("brokers", "groupId", true, simplePropertiesFile.getPath()); Assertions.assertEquals("brokers", props.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); Assertions.assertEquals("org.apache.kafka.common.serialization.StringDeserializer", props.get("key.deserializer")); Assertions.assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", props.get("value.deserializer")); diff --git a/TrafficCapture/trafficReplayer/src/test/resources/log4j2.properties b/TrafficCapture/trafficReplayer/src/test/resources/log4j2.properties index 52d5190a1..9098da413 100644 --- a/TrafficCapture/trafficReplayer/src/test/resources/log4j2.properties +++ b/TrafficCapture/trafficReplayer/src/test/resources/log4j2.properties @@ -14,3 +14,7 @@ appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS}{UTC} %p %c{1.} [%t # of the logs for tests logger.OutputTupleJsonLogger.name = OutputTupleJsonLogger logger.OutputTupleJsonLogger.level = OFF + +logger.KPC.name = org.opensearch.migrations.replay.kafka.KafkaProtobufConsumer +logger.KPC.level = DEBUG +logger.KPC.appenderRef.stdout.ref = Console diff --git a/deployment/cdk/opensearch-service-migration/README.md b/deployment/cdk/opensearch-service-migration/README.md index 18fe49614..7660b8e17 100644 --- a/deployment/cdk/opensearch-service-migration/README.md +++ b/deployment/cdk/opensearch-service-migration/README.md @@ -200,6 +200,35 @@ Run the `accessAnalyticsDashboard` script, and then open https://localhost:8157/ ./accessAnalyticsDashboard.sh dev us-east-1 ``` +## Configuring Capture Proxy IAM and Security Groups +Although this CDK does not set up the Capture Proxy on source cluster nodes (except in the case of the demo solution), the Capture Proxy instances do need to communicate with resources deployed by this CDK (e.g. 
Kafka), which this section covers. + +#### Capture Proxy on OpenSearch/Elasticsearch nodes +Before [setting up Capture Proxy instances](../../../TrafficCapture/trafficCaptureProxyServer/README.md#installing-capture-proxy-on-coordinator-nodes) on the source cluster, the IAM policies and Security Groups for the nodes should allow access to the Migration tooling: +1. The coordinator nodes should add the `migrationMSKSecurityGroup` security group to allow access to Kafka. +2. The IAM role used by the coordinator nodes should have permissions to publish captured traffic to Kafka. A template policy to use can be seen below. + * This can be added through the AWS Console (IAM Role -> Add permissions -> Create inline policy -> JSON view) +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Action": "kafka-cluster:Connect", + "Resource": "arn:aws:kafka:<region>:<account-id>:cluster/migration-msk-cluster-<stage>/*", + "Effect": "Allow" + }, + { + "Action": [ + "kafka-cluster:CreateTopic", + "kafka-cluster:DescribeTopic", + "kafka-cluster:WriteData" + ], + "Resource": "arn:aws:kafka:<region>:<account-id>:topic/migration-msk-cluster-<stage>/*", + "Effect": "Allow" + } + ] +} +``` ## Tearing down CDK To remove all the CDK stack(s) which get created during a deployment we can execute a command similar to below
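For reference, a full teardown generally reduces to a `cdk destroy` invocation. The sketch below is illustrative only — the stack selector and context options must match whatever was passed to the original `cdk deploy`, and `contextId=demo-deploy` is an assumed example value, not a documented default:

```shell
# Illustrative sketch — substitute the same context values used at deployment time
cdk destroy "*" -c contextId=demo-deploy
```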