diff --git a/src/test/resources/processor_test_files/S2142_InterruptedException/MirusKafkaMonitor.java b/src/test/resources/processor_test_files/S2142_InterruptedException/MirusKafkaMonitor.java new file mode 100644 index 000000000..e098c840c --- /dev/null +++ b/src/test/resources/processor_test_files/S2142_InterruptedException/MirusKafkaMonitor.java @@ -0,0 +1,389 @@ +// This test resource is a reproduction of issue description of https://github.com/SpoonLabs/sorald/issues/603 + +/* + * Copyright (c) 2018, salesforce.com, inc. + * All rights reserved. + * SPDX-License-Identifier: BSD-3-Clause + * For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause + * + */ + +package com.salesforce.mirus; + +import com.salesforce.mirus.config.SourceConfig; +import com.salesforce.mirus.metrics.MissingPartitionsJmxReporter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.connect.connector.ConnectorContext; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.transforms.RegexRouter; +import org.apache.kafka.connect.transforms.Transformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The KafkaMonitor thread is started by {@link MirusSourceConnector} and polls the source and + * destination clusters to maintain an up-to-date list of partitions eligible for mirroring. When a + * change is detected the thread requests that tasks for this source are reconfigured. Partitions + * that match the configured whitelist are validated to ensure they exist in both source and the + * destination cluster. Validation supports topic re-routing with the RegexRouter Transformation, + * but no other topic re-routing is supported. Validation may be disabled by setting the + * enable.destination.topic.checking config option to false. + * + *

MirusSourceConnector also uses KafkaMonitor for task assignment. A round-robin style algorithm + * is used to assign partitions to SourceTask instances. + */ +class KafkaMonitor implements Runnable { + + private static final Logger logger = LoggerFactory.getLogger(KafkaMonitor.class); + private static final int[] BACKOFF_WAIT_SECONDS = {0, 1, 2, 4, 8, 16, 32, 64}; + + private final ConnectorContext context; + private final List topicsWhitelist; + private final Pattern topicsRegexPattern; + private final List topicsRegexList; + private final CountDownLatch shutDownLatch = new CountDownLatch(1); + private final Consumer sourceConsumer; + private final Consumer destinationConsumer; + private final Long monitorPollWaitMs; + private final TaskConfigBuilder taskConfigBuilder; + private final SourcePartitionValidator.MatchingStrategy validationStrategy; + private final MissingPartitionsJmxReporter missingPartsJmxReporter = + new MissingPartitionsJmxReporter(); + private final List> routers; + private final boolean topicCheckingEnabled; + + // The current list of partitions to replicate. + private volatile List topicPartitionList; + + KafkaMonitor(ConnectorContext context, SourceConfig config, TaskConfigBuilder taskConfigBuilder) { + this( + context, + config, + newSourceConsumer(config), + newDestinationConsumer(config), + taskConfigBuilder); + } + + KafkaMonitor( + ConnectorContext context, + SourceConfig config, + Consumer sourceConsumer, + Consumer destinationConsumer, + TaskConfigBuilder taskConfigBuilder) { + this.context = context; + this.topicsWhitelist = config.getTopicsWhitelist(); + this.monitorPollWaitMs = config.getMonitorPollWaitMs(); + this.topicsRegexPattern = Pattern.compile(config.getTopicsRegex()); + this.topicsRegexList = config.getTopicsRegexList(); + this.sourceConsumer = sourceConsumer; + this.destinationConsumer = destinationConsumer; + if (topicsWhitelist.isEmpty() + && config.getTopicsRegex().isEmpty() + && config.getTopicsRegexList().isEmpty()) { + logger.warn("No whitelist configured"); + } + this.taskConfigBuilder = taskConfigBuilder; + this.validationStrategy = + config.getEnablePartitionMatching() + ? SourcePartitionValidator.MatchingStrategy.PARTITION + : SourcePartitionValidator.MatchingStrategy.TOPIC; + this.topicCheckingEnabled = config.getTopicCheckingEnabled(); + this.routers = this.validateTransformations(config.transformations()); + } + + private List> validateTransformations( + List> transformations) { + List> regexRouters = new ArrayList<>(); + + // No need to validate transforms if we're not checking destination partitions + if (this.topicCheckingEnabled) { + for (Transformation transform : transformations) { + String transformName = transform.getClass().getSimpleName(); + if (transform instanceof RegexRouter) { + regexRouters.add(transform); + // Slightly awkward check to see if any other routing transforms are configured + } else if (transformName.contains("Router")) { + throw new IllegalArgumentException( + String.format( + "Unsupported Router Transformation %s found." + + " To use it, please disable destination topic checking by setting 'enable.destination.topic.checking' to false.", + transformName)); + } else { + logger.debug("Ignoring non-routing Transformation {}", transformName); + } + } + } + return regexRouters; + } + + private String applyRoutersToTopic(String topic) { + TopicPartition topicPartition = new TopicPartition(topic, 0); + Map sourcePartition = TopicPartitionSerDe.asMap(topicPartition); + SourceRecord record = + new SourceRecord( + sourcePartition, + null, + topicPartition.topic(), + topicPartition.partition(), + Schema.BYTES_SCHEMA, + null, + Schema.OPTIONAL_BYTES_SCHEMA, + null); + for (Transformation transform : this.routers) { + record = transform.apply(record); + } + return record.topic(); + } + + private static Consumer newSourceConsumer(SourceConfig config) { + Map consumerProperties = config.getConsumerProperties(); + + // The "monitor1" client id suffix is used to keep JMX bean names distinct + consumerProperties.computeIfPresent( + CommonClientConfigs.CLIENT_ID_CONFIG, (k, v) -> v + "monitor1"); + return new KafkaConsumer<>(consumerProperties); + } + + /** + * * Reconciles the default consumer properties with the destination-consumer properties. The + * destination-consumer properties have higher precedence. + * + * @param config config of the source connector + * @return map that includes the consumer configs + */ + static Map getReconciledDestConsumerConfigs(SourceConfig config) { + // handle destination.bootstrap.server separately + // keeping this config for backward compatibility + String destBootstrap = config.getDestinationBootstrapServers(); + Map destConsumerProps = config.getDestinationConsumerProperties(); + + if (!destConsumerProps.containsKey("bootstrap.servers")) { + destConsumerProps.put("bootstrap.servers", destBootstrap); + } + Map reconciledConsumerConfigs = config.getConsumerProperties(); + // use destination.consumer properties to override default consumer properties + destConsumerProps.forEach((k, v) -> reconciledConsumerConfigs.put(k, v)); + return reconciledConsumerConfigs; + } + + private static Consumer newDestinationConsumer(SourceConfig config) { + Map consumerProperties = getReconciledDestConsumerConfigs(config); + // The "monitor2" client id suffix is used to keep JMX bean names distinct + consumerProperties.computeIfPresent( + CommonClientConfigs.CLIENT_ID_CONFIG, (k, v) -> v + "monitor2"); + return new KafkaConsumer<>(consumerProperties); + } + + @Override + public void run() { + int consecutiveRetriableErrors = 0; + while (true) { + try { + // Do a fast shutdown check first thing in case we're in an exponential backoff retry loop, + // which will never hit the poll wait below + if (shutDownLatch.await(0, TimeUnit.MILLISECONDS)) { + logger.debug("Exiting KafkaMonitor"); + return; + } + if (this.topicPartitionList == null) { + // Need to initialize here to prevent the constructor hanging on startup if the + // source cluster is unavailable. + this.topicPartitionList = fetchTopicPartitionList(); + } + + if (partitionsChanged()) { + logger.info("Source partition change detected. Requesting task reconfiguration."); + this.context.requestTaskReconfiguration(); + } + + if (shutDownLatch.await(monitorPollWaitMs, TimeUnit.MILLISECONDS)) { + logger.debug("Exiting KafkaMonitor"); + return; + } + consecutiveRetriableErrors = 0; + } catch (WakeupException | InterruptedException e) { + // Assume we've been woken or interrupted to shutdown, so continue on to checking the + // shutDownLatch next iteration. + logger.debug("KafkaMonitor woken up, checking if shutdown requested..."); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + } catch (RetriableException e) { + consecutiveRetriableErrors += 1; + logger.warn( + "Retriable exception encountered ({} consecutive), continuing processing...", + consecutiveRetriableErrors, + e); + exponentialBackoffWait(consecutiveRetriableErrors); + } catch (Exception e) { + logger.error("Raising exception to connect runtime", e); + context.raiseError(e); + } + } + } + + private void exponentialBackoffWait(int numErrors) { + int secondsToWait = + BACKOFF_WAIT_SECONDS[Math.min(Math.max(numErrors - 1, 0), BACKOFF_WAIT_SECONDS.length - 1)]; + if (secondsToWait > 0) { + try { + TimeUnit.SECONDS.sleep(secondsToWait); + } catch (InterruptedException e) { + logger.debug("Interrupted while sleeping due to retriable errors, resuming..."); + Thread.currentThread().interrupt(); + } + } + } + + /** Returns true if the subscribed partition list has changed and we need a rebalance. */ + boolean partitionsChanged() { + List oldPartitionInfoList = topicPartitionList; + topicPartitionList = fetchTopicPartitionList(); + logger.trace("Detected {} matching partitions", topicPartitionList.size()); + + if (oldPartitionInfoList == null) { + // List not initialized yet, so don't trigger rebalance + return false; + } + + // Note: These two lists are already sorted + boolean partitionsChanged = !topicPartitionList.equals(oldPartitionInfoList); + if (partitionsChanged) { + logger.debug( + "Replicated partition set change. Triggering re-balance with {} partitions", + topicPartitionList.size()); + } else { + logger.debug("No change to replicated partition set"); + } + return partitionsChanged; + } + + private List fetchMatchingPartitions(Consumer consumer) { + return consumer + .listTopics() + .entrySet() + .stream() + .filter( + e -> + topicsWhitelist.contains(e.getKey()) + || topicsRegexPattern.matcher(e.getKey()).matches() + || topicsRegexList.stream().anyMatch(r -> r.matcher(e.getKey()).matches())) + .flatMap(e -> e.getValue().stream()) + .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition())) + .collect(Collectors.toList()); + } + + private List fetchTopicPartitionList() { + logger.trace( + "Fetching partition details for topicsWhitelist: {}, topicsRegex: {}", + topicsWhitelist, + topicsRegexPattern.toString()); + + List sourcePartitionList; + synchronized (sourceConsumer) { + sourcePartitionList = fetchMatchingPartitions(sourceConsumer); + } + + List result; + if (this.topicCheckingEnabled) { + result = getDestinationAvailablePartitions(sourcePartitionList); + } else { + result = sourcePartitionList; + } + + // Sort the result for order-independent comparison + result.sort(Comparator.comparing(tp -> tp.topic() + tp.partition())); + return result; + } + + private List getDestinationAvailablePartitions( + List sourcePartitionList) { + SourcePartitionValidator sourcePartitionValidator = + new SourcePartitionValidator( + destinationConsumer, validationStrategy, this::applyRoutersToTopic); + + // Split the source partition list into those contained in the destination, and those + // missing. Using toCollection(ArrayList::new) to guarantee we can sort successfully. + Map> partitionedSourceIds = + sourcePartitionList + .stream() + .collect( + Collectors.partitioningBy( + sourcePartitionValidator::isHealthy, Collectors.toCollection(ArrayList::new))); + + List missingPartitions = partitionedSourceIds.get(false); + if (!missingPartitions.isEmpty()) { + logger.warn( + "Missing target partitions {}. Topics may need to be added to the target cluster.", + missingPartitions); + } + + missingPartsJmxReporter.recordMetric(missingPartitions.size()); + + List result = partitionedSourceIds.get(true); + return result; + } + + /** + * Determines the MirusSourceConnector task configuration, including partition assignment. This is + * called from the main SourceConnector thread. + * + * @param maxTasks The maximum number of tasks this SourceConnector is configured to run in + * parallel across the cluster. + */ + List> taskConfigs(int maxTasks) { + logger.debug("Called taskConfigs maxTasks {}", maxTasks); + + // No need to synchronize while waiting for initialization + long tStart = System.currentTimeMillis(); + while (this.topicPartitionList == null && (System.currentTimeMillis() - tStart) < 60000) { + try { + Thread.sleep(3000); + // List will normally be initialized within this time + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + synchronized (this) { + if (topicPartitionList == null) { + // Timed out waiting for initialization. Initialize with empty list so we can detect + // changes. + logger.error("Topic partition list not initialized. Falling back on empty list."); + this.topicPartitionList = new ArrayList<>(); + } + if (topicPartitionList.isEmpty()) { + return Collections.emptyList(); + } + return taskConfigBuilder.fromPartitionList(maxTasks, topicPartitionList); + } + } + + void stop() { + missingPartsJmxReporter.recordMetric(0); + shutDownLatch.countDown(); + sourceConsumer.wakeup(); + destinationConsumer.wakeup(); + synchronized (sourceConsumer) { + sourceConsumer.close(); + } + synchronized (destinationConsumer) { + destinationConsumer.close(); + } + } +} diff --git a/src/test/resources/processor_test_files/S2142_InterruptedException/MirusKafkaMonitor.java.exact b/src/test/resources/processor_test_files/S2142_InterruptedException/MirusKafkaMonitor.java.exact new file mode 100644 index 000000000..29cb55ca1 --- /dev/null +++ b/src/test/resources/processor_test_files/S2142_InterruptedException/MirusKafkaMonitor.java.exact @@ -0,0 +1,162 @@ + private finalcom.salesforce.mirus.SourcePartitionValidator.MatchingStrategy validationStrategy; + private final MissingPartitionsJmxReporter missingPartsJmxReporter = + new MissingPartitionsJmxReporter(); + private final List> routers; + private final boolean topicCheckingEnabled; + + // The current list of partitions to replicate. + private volatile List topicPartitionList; + + KafkaMonitor(ConnectorContext context, SourceConfig config, TaskConfigBuilder taskConfigBuilder) { + this( + context, + config, + newSourceConsumer(config), + newDestinationConsumer(config), + taskConfigBuilder); + } + + KafkaMonitor( + ConnectorContext context, + SourceConfig config, + Consumer sourceConsumer, + Consumer destinationConsumer, + TaskConfigBuilder taskConfigBuilder) { + this.context = context; + this.topicsWhitelist = config.getTopicsWhitelist(); + this.monitorPollWaitMs = config.getMonitorPollWaitMs(); + this.topicsRegexPattern = Pattern.compile(config.getTopicsRegex()); + this.topicsRegexList = config.getTopicsRegexList(); + this.sourceConsumer = sourceConsumer; + this.destinationConsumer = destinationConsumer; + if (topicsWhitelist.isEmpty() + && config.getTopicsRegex().isEmpty() + && config.getTopicsRegexList().isEmpty()) { + logger.warn("No whitelist configured"); + } + this.taskConfigBuilder = taskConfigBuilder; + this.validationStrategy = + config.getEnablePartitionMatching() + ? SourcePartitionValidator.MatchingStrategy.PARTITION + : SourcePartitionValidator.MatchingStrategy.TOPIC; + this.topicCheckingEnabled = config.getTopicCheckingEnabled(); + this.routers = this.validateTransformations(config.transformations()); + } + + private List> validateTransformations( + List> transformations) { + List> regexRouters = new ArrayList<>(); + + // No need to validate transforms if we're not checking destination partitions + if (this.topicCheckingEnabled) { + for (Transformation transform : transformations) { + String transformName = transform.getClass().getSimpleName(); + if (transform instanceof RegexRouter) { + regexRouters.add(transform); + // Slightly awkward check to see if any other routing transforms are configured + } else if (transformName.contains("Router")) { + throw new IllegalArgumentException( + String.format( + "Unsupported Router Transformation %s found." + + " To use it, please disable destination topic checking by setting 'enable.destination.topic.checking' to false.", + transformName)); + } else { + logger.debug("Ignoring non-routing Transformation {}", transformName); + } + } + } + return regexRouters; + } + + private String applyRoutersToTopic(String topic) { + TopicPartition topicPartition = new TopicPartition(topic, 0); + Map sourcePartition = TopicPartitionSerDe.asMap(topicPartition); + SourceRecord record = + new SourceRecord( + sourcePartition, + null, + topicPartition.topic(), + topicPartition.partition(), + Schema.BYTES_SCHEMA, + null, + Schema.OPTIONAL_BYTES_SCHEMA, + null); + for (Transformation transform : this.routers) { + record = transform.apply(record); + } + return record.topic(); + } + + private static Consumer newSourceConsumer(SourceConfig config) { + Map consumerProperties = config.getConsumerProperties(); + + // The "monitor1" client id suffix is used to keep JMX bean names distinct + consumerProperties.computeIfPresent( + CommonClientConfigs.CLIENT_ID_CONFIG, (k, v) -> v + "monitor1"); + return new KafkaConsumer<>(consumerProperties); + } + + /** + * * Reconciles the default consumer properties with the destination-consumer properties. The + * destination-consumer properties have higher precedence. + * + * @param config config of the source connector + * @return map that includes the consumer configs + */ + static Map getReconciledDestConsumerConfigs(SourceConfig config) { + // handle destination.bootstrap.server separately + // keeping this config for backward compatibility + String destBootstrap = config.getDestinationBootstrapServers(); + Map destConsumerProps = config.getDestinationConsumerProperties(); + + if (!destConsumerProps.containsKey("bootstrap.servers")) { + destConsumerProps.put("bootstrap.servers", destBootstrap); + } + Map reconciledConsumerConfigs = config.getConsumerProperties(); + // use destination.consumer properties to override default consumer properties + destConsumerProps.forEach((k, v) -> reconciledConsumerConfigs.put(k, v)); + return reconciledConsumerConfigs; + } + + private static Consumer newDestinationConsumer(SourceConfig config) { + Map consumerProperties = getReconciledDestConsumerConfigs(config); + // The "monitor2" client id suffix is used to keep JMX bean names distinct + consumerProperties.computeIfPresent( + CommonClientConfigs.CLIENT_ID_CONFIG, (k, v) -> v + "monitor2"); + return new KafkaConsumer<>(consumerProperties); + } + + @Override + public void run() { + int consecutiveRetriableErrors = 0; + while (true) { + try { + // Do a fast shutdown check first thing in case we're in an exponential backoff retry loop, + // which will never hit the poll wait below + if (shutDownLatch.await(0, TimeUnit.MILLISECONDS)) { + logger.debug("Exiting KafkaMonitor"); + return; + } + if (this.topicPartitionList == null) { + // Need to initialize here to prevent the constructor hanging on startup if the + // source cluster is unavailable. + this.topicPartitionList = fetchTopicPartitionList(); + } + + if (partitionsChanged()) { + logger.info("Source partition change detected. Requesting task reconfiguration."); + this.context.requestTaskReconfiguration(); + } + + if (shutDownLatch.await(monitorPollWaitMs, TimeUnit.MILLISECONDS)) { + logger.debug("Exiting KafkaMonitor"); + return; + } + consecutiveRetriableErrors = 0; + } catch (WakeupException | InterruptedException e) { + // Assume we've been woken or interrupted to shutdown, so continue on to checking the + // shutDownLatch next iteration. + logger.debug("KafkaMonitor woken up, checking if shutdown requested..."); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } \ No newline at end of file