From 20b8ab6d9a82d78fd0fd15446feb99235dccf553 Mon Sep 17 00:00:00 2001 From: Aman Sharma Date: Thu, 9 Dec 2021 09:58:21 +0100 Subject: [PATCH 1/6] Upgrade spoon to 10.0.1-beta-2 --- pom.xml | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/pom.xml b/pom.xml index 3697606ca..4440234ea 100644 --- a/pom.xml +++ b/pom.xml @@ -95,7 +95,7 @@ fr.inria.gforge.spoon spoon-core - 9.1.0-beta-11 + 10.0.1-beta-2 @@ -355,18 +355,6 @@ - - - spoonSnapshot - - - ow2.org-snapshot - Maven Repository for Spoon Snapshots - https://repository.ow2.org/nexus/content/repositories/snapshots/ - true - - - From f3b0280cccb39e744acc2315a2e261e2e39b0117 Mon Sep 17 00:00:00 2001 From: Aman Sharma Date: Thu, 9 Dec 2021 09:59:39 +0100 Subject: [PATCH 2/6] Revert "fix: disable custom security manager for tests which indirectly look up the path of mvn executable" This reverts commit 2f7125eef3e7a5bb6576b2250de617ac0b5cee2b. --- src/test/java/sorald/ClasspathModeTest.java | 3 --- src/test/java/sorald/MavenLauncherTest.java | 6 ------ src/test/java/sorald/miner/WarningMinerTest.java | 3 --- 3 files changed, 12 deletions(-) diff --git a/src/test/java/sorald/ClasspathModeTest.java b/src/test/java/sorald/ClasspathModeTest.java index 561e4ca06..773b1ab39 100644 --- a/src/test/java/sorald/ClasspathModeTest.java +++ b/src/test/java/sorald/ClasspathModeTest.java @@ -19,9 +19,6 @@ class ClasspathModeTest { @Test void resolveClasspathFrom_enablesRepairOfViolation_thatRequiresClasspathToDetect( @TempDir File workdir) throws IOException { - // use the default security manager - System.setSecurityManager(null); - // arrange org.apache.commons.io.FileUtils.copyDirectory( TestHelper.PATH_TO_RESOURCES_FOLDER diff --git a/src/test/java/sorald/MavenLauncherTest.java b/src/test/java/sorald/MavenLauncherTest.java index 7c70e249e..dde534078 100644 --- a/src/test/java/sorald/MavenLauncherTest.java +++ b/src/test/java/sorald/MavenLauncherTest.java @@ -22,9 +22,6 @@ public class MavenLauncherTest { @Test public void sorald_repairsProductionAndTestCode_inMavenProject(@TempDir File workdir) throws IOException { - // use the default security manager - System.setSecurityManager(null); - // arrange org.apache.commons.io.FileUtils.copyDirectory( TestHelper.PATH_TO_RESOURCES_FOLDER @@ -67,9 +64,6 @@ public void sorald_repairsProductionAndTestCode_inMavenProject(@TempDir File wor @Test void sorald_repairsRuleViolation_thatRequiresClasspathToDetect(@TempDir File workdir) throws IOException { - // use the default security manager - System.setSecurityManager(null); - // arrange org.apache.commons.io.FileUtils.copyDirectory( TestHelper.PATH_TO_RESOURCES_FOLDER diff --git a/src/test/java/sorald/miner/WarningMinerTest.java b/src/test/java/sorald/miner/WarningMinerTest.java index 1e7daef52..4ab85915d 100644 --- a/src/test/java/sorald/miner/WarningMinerTest.java +++ b/src/test/java/sorald/miner/WarningMinerTest.java @@ -194,9 +194,6 @@ public void extractWarnings_statsOutput_containsExpectedAttributes() throws Exce @Test void canDetectRuleViolation_thatRequiresClasspath_whenResolvingClasspathInMavenProject( @TempDir File tempdir) throws Exception { - // use the default security manager - System.setSecurityManager(null); - Path statsFile = tempdir.toPath().resolve("stats.json"); Path projectRoot = TestHelper.PATH_TO_RESOURCES_FOLDER From 38e17db39d5f92cb028656a28081b0726a55d6a6 Mon Sep 17 00:00:00 2001 From: Aman Sharma Date: Fri, 10 Dec 2021 17:28:17 +0100 Subject: [PATCH 3/6] Add test case for #600 --- 
.../RepairAndTryWithResourcePrint.java | 250 ++++++++++++++++++ .../RepairAndTryWithResourcePrint.java.exact | 12 + 2 files changed, 262 insertions(+) create mode 100644 src/test/resources/processor_test_files/S2142_InterruptedException/RepairAndTryWithResourcePrint.java create mode 100644 src/test/resources/processor_test_files/S2142_InterruptedException/RepairAndTryWithResourcePrint.java.exact diff --git a/src/test/resources/processor_test_files/S2142_InterruptedException/RepairAndTryWithResourcePrint.java b/src/test/resources/processor_test_files/S2142_InterruptedException/RepairAndTryWithResourcePrint.java new file mode 100644 index 000000000..bf11a61ab --- /dev/null +++ b/src/test/resources/processor_test_files/S2142_InterruptedException/RepairAndTryWithResourcePrint.java @@ -0,0 +1,250 @@ +/* + * Copyright 2020 Matthias Bläsing. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.mitre.dsmiley.httpproxy; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.http.MalformedChunkCodingException; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.eclipse.jetty.server.Handler; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.gzip.GzipHandler; +import org.eclipse.jetty.servlet.ServletHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertTrue; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class ChunkedTransferTest { + @Parameters + public static List data() { + return Arrays.asList(new Object[][] { + {false, false}, + {false, true}, + {true, false}, + {true, true} + }); + } + + private Server server; + private ServletHandler servletHandler; + private int serverPort; + private boolean supportBackendCompression; + private boolean handleCompressionApacheClient; + + public ChunkedTransferTest(boolean supportBackendCompression, boolean handleCompressionApacheClient) { + this.supportBackendCompression = supportBackendCompression; + this.handleCompressionApacheClient = handleCompressionApacheClient; + } + + @Before + public void setUp() throws Exception { + server = new Server(0); + servletHandler = new ServletHandler(); + 
Handler serverHandler = servletHandler; + if(supportBackendCompression) { + GzipHandler gzipHandler = new GzipHandler(); + gzipHandler.setHandler(serverHandler); + gzipHandler.setSyncFlush(true); + serverHandler = gzipHandler; + } else { + serverHandler = servletHandler; + } + server.setHandler(serverHandler); + server.start(); + + serverPort = ((ServerConnector) server.getConnectors()[0]).getLocalPort(); + } + + @After + public void tearDown() throws Exception { + server.stop(); + serverPort = -1; + } + + @Test + public void testChunkedTransfer() throws Exception { + /* + Check that proxy requests are not buffered in the ProxyServlet, but + immediately flushed. The test works by creating a servlet that writes + the first message and flushes the output stream; further processing + is blocked by a count-down latch. + + The client now reads the first message. The message must be completely + received and further data must not be present. + + After the first message is consumed, the CountDownLatch is released and + the second message is expected. This, in turn, must also be read completely. + + If the CountDownLatch is not released, it will time out and the second + message will not be sent. + */ + + final CountDownLatch guardForSecondRead = new CountDownLatch(1); + final byte[] data1 = "event: message\ndata: Dummy Data1\n\n".getBytes(StandardCharsets.UTF_8); + final byte[] data2 = "event: message\ndata: Dummy Data2\n\n".getBytes(StandardCharsets.UTF_8); + + ServletHolder servletHolder = servletHandler.addServletWithMapping(ProxyServlet.class, "/chatProxied/*"); + servletHolder.setInitParameter(ProxyServlet.P_LOG, "true"); + servletHolder.setInitParameter(ProxyServlet.P_TARGET_URI, String.format("http://localhost:%d/chat/", serverPort)); + servletHolder.setInitParameter(ProxyServlet.P_HANDLECOMPRESSION, Boolean.toString(handleCompressionApacheClient)); + + ServletHolder dummyBackend = new ServletHolder(new HttpServlet() { + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + resp.setContentType("text/event-stream"); + OutputStream os = resp.getOutputStream(); + // Write first message for client and flush it out + os.write(data1); + os.flush(); + try { + // Wait for client to request the second message by counting down the + // latch - if the latch times out, the second message will not be + // sent and the corresponding assert will fail + if (guardForSecondRead.await(10, TimeUnit.SECONDS)) { + os.write(data2); + os.flush(); + } + } catch (InterruptedException ex) { + throw new IOException(ex); + } + } + }); + servletHandler.addServletWithMapping(dummyBackend, "/chat/*"); + + HttpGet url = new HttpGet(String.format("http://localhost:%d/chatProxied/test", serverPort)); + + try (CloseableHttpClient chc = HttpClientBuilder.create().build(); + CloseableHttpResponse chr = chc.execute(url)) { + try (InputStream is = chr.getEntity().getContent()) { + byte[] readData = readBlock(is); + assertTrue("No data received (message1)", readData.length > 0); + assertArrayEquals("Received data: '" + toString(readData) + "' (message1)", data1, readData); + guardForSecondRead.countDown(); + readData = readBlock(is); + assertTrue("No data received (message2)", readData.length > 0); + assertArrayEquals("Received data: '" + toString(readData) + "' (message2)", data2, readData); + } + } + } + + @Test + public void testChunkedTransferClosing() throws Exception { + /* + This test ensures that the chunk-encoded backing connection is closed when the
closing of the proxy (frontend) connection is detected. + + The idea is that, in the servlet, the closing of the backing connection can + be detected because the output stream is closed and an IOException is + raised. If no exception is raised when writing multiple chunks, it must + be assumed that the backing connection is not closed. + */ + final CountDownLatch guardForSecondRead = new CountDownLatch(1); + final CountDownLatch guardForEnd = new CountDownLatch(1); + final byte[] data1 = "event: message\ndata: Dummy Data1\n\n".getBytes(StandardCharsets.UTF_8); + final byte[] data2 = "event: message\ndata: Dummy Data2\n\n".getBytes(StandardCharsets.UTF_8); + + ServletHolder servletHolder = servletHandler.addServletWithMapping(ProxyServlet.class, "/chatProxied/*"); + servletHolder.setInitParameter(ProxyServlet.P_LOG, "true"); + servletHolder.setInitParameter(ProxyServlet.P_TARGET_URI, String.format("http://localhost:%d/chat/", serverPort)); + servletHolder.setInitParameter(ProxyServlet.P_HANDLECOMPRESSION, Boolean.toString(handleCompressionApacheClient)); + + ServletHolder dummyBackend = new ServletHolder(new HttpServlet() { + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + resp.setContentType("text/event-stream"); + OutputStream os = resp.getOutputStream(); + // Write first message for client and flush it out + os.write(data1); + os.flush(); + try { + // Wait for client to request the second message by counting down the + // latch - if the latch times out, the second message will not be + // sent and the corresponding assert will fail + if (! guardForSecondRead.await(10, TimeUnit.SECONDS)) { + throw new IOException("Wait timed out"); + } + try { + for(int i = 0; i < 100; i++) { + os.write(data2); + os.flush(); + Thread.sleep(100); + } + } catch (IOException ex) { + // This point is reached when the output stream is closed - the + // count-down latch is counted down to indicate success + guardForEnd.countDown(); + } + } catch (InterruptedException ex) { + throw new IOException(ex); + } + } + }); + servletHandler.addServletWithMapping(dummyBackend, "/chat/*"); + + HttpGet url = new HttpGet(String.format("http://localhost:%d/chatProxied/test", serverPort)); + + try (CloseableHttpClient chc = HttpClientBuilder.create().build()) { + CloseableHttpResponse chr = chc.execute(url); + try (InputStream is = chr.getEntity().getContent()) { + byte[] readData = readBlock(is); + assertTrue("No data received (message1)", readData.length > 0); + assertArrayEquals("Received data: '" + toString(readData) + "' (message1)", data1, readData); + chr.close(); + } catch (MalformedChunkCodingException ex) { + // this is expected + } finally { + chr.close(); + } + } + // Release sending of further messages + guardForSecondRead.countDown(); + // Wait for the reporting of the closed connection + assertTrue(guardForEnd.await(10, TimeUnit.SECONDS)); + } + + private static String toString(byte[] data) { + return new String(data, StandardCharsets.UTF_8); + } + + private static byte[] readBlock(InputStream is) throws IOException { + byte[] buffer = new byte[10 * 1024]; + int read = is.read(buffer); + return Arrays.copyOfRange(buffer, 0, read); + } +} diff --git a/src/test/resources/processor_test_files/S2142_InterruptedException/RepairAndTryWithResourcePrint.java.exact b/src/test/resources/processor_test_files/S2142_InterruptedException/RepairAndTryWithResourcePrint.java.exact new file mode 100644 index 000000000..fc21d1026 --- /dev/null +++
b/src/test/resources/processor_test_files/S2142_InterruptedException/RepairAndTryWithResourcePrint.java.exact @@ -0,0 +1,12 @@ + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IOException(ex); + } + } + }); + servletHandler.addServletWithMapping(dummyBackend, "/chat/*"); + + HttpGet url = new HttpGet(String.format("http://localhost:%d/chatProxied/test", serverPort)); + + try (CloseableHttpClient chc = HttpClientBuilder.create().build(); + CloseableHttpResponse chr = chc.execute(url)) { \ No newline at end of file From e326cee8c72eea2899b8606862c8eb5074ba6adf Mon Sep 17 00:00:00 2001 From: Aman Sharma Date: Fri, 10 Dec 2021 17:59:36 +0100 Subject: [PATCH 4/6] Add test case for #600 --- ... => NOCOMPILE_RepairAndTryWithResourcePrint.java} | 2 ++ ...OCOMPILE_RepairAndTryWithResourcePrint.java.exact | 12 ++++++++++++ .../RepairAndTryWithResourcePrint.java.exact | 12 ------------ 3 files changed, 14 insertions(+), 12 deletions(-) rename src/test/resources/processor_test_files/S2142_InterruptedException/{RepairAndTryWithResourcePrint.java => NOCOMPILE_RepairAndTryWithResourcePrint.java} (99%) create mode 100644 src/test/resources/processor_test_files/S2142_InterruptedException/NOCOMPILE_RepairAndTryWithResourcePrint.java.exact delete mode 100644 src/test/resources/processor_test_files/S2142_InterruptedException/RepairAndTryWithResourcePrint.java.exact diff --git a/src/test/resources/processor_test_files/S2142_InterruptedException/RepairAndTryWithResourcePrint.java b/src/test/resources/processor_test_files/S2142_InterruptedException/NOCOMPILE_RepairAndTryWithResourcePrint.java similarity index 99% rename from src/test/resources/processor_test_files/S2142_InterruptedException/RepairAndTryWithResourcePrint.java rename to src/test/resources/processor_test_files/S2142_InterruptedException/NOCOMPILE_RepairAndTryWithResourcePrint.java index bf11a61ab..c327a4b13 100644 --- a/src/test/resources/processor_test_files/S2142_InterruptedException/RepairAndTryWithResourcePrint.java +++ b/src/test/resources/processor_test_files/S2142_InterruptedException/NOCOMPILE_RepairAndTryWithResourcePrint.java @@ -1,3 +1,5 @@ +// This test resource is a reproduction of https://github.com/SpoonLabs/sorald/issues/600 + /* * Copyright 2020 Matthias Bläsing. 
* diff --git a/src/test/resources/processor_test_files/S2142_InterruptedException/NOCOMPILE_RepairAndTryWithResourcePrint.java.exact b/src/test/resources/processor_test_files/S2142_InterruptedException/NOCOMPILE_RepairAndTryWithResourcePrint.java.exact new file mode 100644 index 000000000..15cc6d42f --- /dev/null +++ b/src/test/resources/processor_test_files/S2142_InterruptedException/NOCOMPILE_RepairAndTryWithResourcePrint.java.exact @@ -0,0 +1,12 @@ + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IOException(ex); + } + } + }); + servletHandler.addServletWithMapping(dummyBackend, "/chat/*"); + + HttpGet url = new HttpGet(String.format("http://localhost:%d/chatProxied/test", serverPort)); + + try (CloseableHttpClient chc = HttpClientBuilder.create().build(); + CloseableHttpResponse chr = chc.execute(url)) { \ No newline at end of file diff --git a/src/test/resources/processor_test_files/S2142_InterruptedException/RepairAndTryWithResourcePrint.java.exact b/src/test/resources/processor_test_files/S2142_InterruptedException/RepairAndTryWithResourcePrint.java.exact deleted file mode 100644 index fc21d1026..000000000 --- a/src/test/resources/processor_test_files/S2142_InterruptedException/RepairAndTryWithResourcePrint.java.exact +++ /dev/null @@ -1,12 +0,0 @@ - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new IOException(ex); - } - } - }); - servletHandler.addServletWithMapping(dummyBackend, "/chat/*"); - - HttpGet url = new HttpGet(String.format("http://localhost:%d/chatProxied/test", serverPort)); - - try (CloseableHttpClient chc = HttpClientBuilder.create().build(); - CloseableHttpResponse chr = chc.execute(url)) { \ No newline at end of file From 9992719c1ea4aca6105c83d99f0c7fd369bb9994 Mon Sep 17 00:00:00 2001 From: Aman Sharma Date: Fri, 10 Dec 2021 18:07:23 +0100 Subject: [PATCH 5/6] Add test case for #603 --- .../NOCOMPILE_RepairAndFieldTypePrint.java | 386 ++++++++++++++++++ ...COMPILE_RepairAndFieldTypePrint.java.exact | 162 ++++++++ 2 files changed, 548 insertions(+) create mode 100644 src/test/resources/processor_test_files/S2142_InterruptedException/NOCOMPILE_RepairAndFieldTypePrint.java create mode 100644 src/test/resources/processor_test_files/S2142_InterruptedException/NOCOMPILE_RepairAndFieldTypePrint.java.exact diff --git a/src/test/resources/processor_test_files/S2142_InterruptedException/NOCOMPILE_RepairAndFieldTypePrint.java b/src/test/resources/processor_test_files/S2142_InterruptedException/NOCOMPILE_RepairAndFieldTypePrint.java new file mode 100644 index 000000000..7c430f290 --- /dev/null +++ b/src/test/resources/processor_test_files/S2142_InterruptedException/NOCOMPILE_RepairAndFieldTypePrint.java @@ -0,0 +1,386 @@ +// This test resource is a reproduction of the bug described in the issue +// https://github.com/SpoonLabs/sorald/issues/603. + +/* + * Copyright (c) 2018, salesforce.com, inc. + * All rights reserved.
+ * SPDX-License-Identifier: BSD-3-Clause + * For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause + * + */ + +package com.salesforce.mirus; + +import com.salesforce.mirus.config.SourceConfig; +import com.salesforce.mirus.metrics.MissingPartitionsJmxReporter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.connect.connector.ConnectorContext; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.transforms.RegexRouter; +import org.apache.kafka.connect.transforms.Transformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The KafkaMonitor thread is started by {@link MirusSourceConnector} and polls the source and + * destination clusters to maintain an up-to-date list of partitions eligible for mirroring. When a + * change is detected the thread requests that tasks for this source are reconfigured. Partitions + * that match the configured whitelist are validated to ensure they exist in both source and the + * destination cluster. Validation supports topic re-routing with the RegexRouter Transformation, + * but no other topic re-routing is supported. Validation may be disabled by setting the + * enable.destination.topic.checking config option to false. + * + *

MirusSourceConnector also uses KafkaMonitor for task assignment. A round-robin style algorithm + * is used to assign partitions to SourceTask instances. + */ +class KafkaMonitor implements Runnable { + + private static final Logger logger = LoggerFactory.getLogger(KafkaMonitor.class); + private static final int[] BACKOFF_WAIT_SECONDS = {0, 1, 2, 4, 8, 16, 32, 64}; + + private final ConnectorContext context; + private final List topicsWhitelist; + private final Pattern topicsRegexPattern; + private final List topicsRegexList; + private final CountDownLatch shutDownLatch = new CountDownLatch(1); + private final Consumer sourceConsumer; + private final Consumer destinationConsumer; + private final Long monitorPollWaitMs; + private final TaskConfigBuilder taskConfigBuilder; + private final SourcePartitionValidator.MatchingStrategy validationStrategy; + private final MissingPartitionsJmxReporter missingPartsJmxReporter = + new MissingPartitionsJmxReporter(); + private final List> routers; + private final boolean topicCheckingEnabled; + + // The current list of partitions to replicate. + private volatile List topicPartitionList; + + KafkaMonitor(ConnectorContext context, SourceConfig config, TaskConfigBuilder taskConfigBuilder) { + this( + context, + config, + newSourceConsumer(config), + newDestinationConsumer(config), + taskConfigBuilder); + } + + KafkaMonitor( + ConnectorContext context, + SourceConfig config, + Consumer sourceConsumer, + Consumer destinationConsumer, + TaskConfigBuilder taskConfigBuilder) { + this.context = context; + this.topicsWhitelist = config.getTopicsWhitelist(); + this.monitorPollWaitMs = config.getMonitorPollWaitMs(); + this.topicsRegexPattern = Pattern.compile(config.getTopicsRegex()); + this.topicsRegexList = config.getTopicsRegexList(); + this.sourceConsumer = sourceConsumer; + this.destinationConsumer = destinationConsumer; + if (topicsWhitelist.isEmpty() + && config.getTopicsRegex().isEmpty() + && config.getTopicsRegexList().isEmpty()) { + logger.warn("No whitelist configured"); + } + this.taskConfigBuilder = taskConfigBuilder; + this.validationStrategy = + config.getEnablePartitionMatching() + ? SourcePartitionValidator.MatchingStrategy.PARTITION + : SourcePartitionValidator.MatchingStrategy.TOPIC; + this.topicCheckingEnabled = config.getTopicCheckingEnabled(); + this.routers = this.validateTransformations(config.transformations()); + } + + private List> validateTransformations( + List> transformations) { + List> regexRouters = new ArrayList<>(); + + // No need to validate transforms if we're not checking destination partitions + if (this.topicCheckingEnabled) { + for (Transformation transform : transformations) { + String transformName = transform.getClass().getSimpleName(); + if (transform instanceof RegexRouter) { + regexRouters.add(transform); + // Slightly awkward check to see if any other routing transforms are configured + } else if (transformName.contains("Router")) { + throw new IllegalArgumentException( + String.format( + "Unsupported Router Transformation %s found." 
+ + " To use it, please disable destination topic checking by setting 'enable.destination.topic.checking' to false.", + transformName)); + } else { + logger.debug("Ignoring non-routing Transformation {}", transformName); + } + } + } + return regexRouters; + } + + private String applyRoutersToTopic(String topic) { + TopicPartition topicPartition = new TopicPartition(topic, 0); + Map sourcePartition = TopicPartitionSerDe.asMap(topicPartition); + SourceRecord record = + new SourceRecord( + sourcePartition, + null, + topicPartition.topic(), + topicPartition.partition(), + Schema.BYTES_SCHEMA, + null, + Schema.OPTIONAL_BYTES_SCHEMA, + null); + for (Transformation transform : this.routers) { + record = transform.apply(record); + } + return record.topic(); + } + + private static Consumer newSourceConsumer(SourceConfig config) { + Map consumerProperties = config.getConsumerProperties(); + + // The "monitor1" client id suffix is used to keep JMX bean names distinct + consumerProperties.computeIfPresent( + CommonClientConfigs.CLIENT_ID_CONFIG, (k, v) -> v + "monitor1"); + return new KafkaConsumer<>(consumerProperties); + } + + /** + * * Reconciles the default consumer properties with the destination-consumer properties. The + * destination-consumer properties have higher precedence. + * + * @param config config of the source connector + * @return map that includes the consumer configs + */ + static Map getReconciledDestConsumerConfigs(SourceConfig config) { + // handle destination.bootstrap.server separately + // keeping this config for backward compatibility + String destBootstrap = config.getDestinationBootstrapServers(); + Map destConsumerProps = config.getDestinationConsumerProperties(); + + if (!destConsumerProps.containsKey("bootstrap.servers")) { + destConsumerProps.put("bootstrap.servers", destBootstrap); + } + Map reconciledConsumerConfigs = config.getConsumerProperties(); + // use destination.consumer properties to override default consumer properties + destConsumerProps.forEach((k, v) -> reconciledConsumerConfigs.put(k, v)); + return reconciledConsumerConfigs; + } + + private static Consumer newDestinationConsumer(SourceConfig config) { + Map consumerProperties = getReconciledDestConsumerConfigs(config); + // The "monitor2" client id suffix is used to keep JMX bean names distinct + consumerProperties.computeIfPresent( + CommonClientConfigs.CLIENT_ID_CONFIG, (k, v) -> v + "monitor2"); + return new KafkaConsumer<>(consumerProperties); + } + + @Override + public void run() { + int consecutiveRetriableErrors = 0; + while (true) { + try { + // Do a fast shutdown check first thing in case we're in an exponential backoff retry loop, + // which will never hit the poll wait below + if (shutDownLatch.await(0, TimeUnit.MILLISECONDS)) { + logger.debug("Exiting KafkaMonitor"); + return; + } + if (this.topicPartitionList == null) { + // Need to initialize here to prevent the constructor hanging on startup if the + // source cluster is unavailable. + this.topicPartitionList = fetchTopicPartitionList(); + } + + if (partitionsChanged()) { + logger.info("Source partition change detected. Requesting task reconfiguration."); + this.context.requestTaskReconfiguration(); + } + + if (shutDownLatch.await(monitorPollWaitMs, TimeUnit.MILLISECONDS)) { + logger.debug("Exiting KafkaMonitor"); + return; + } + consecutiveRetriableErrors = 0; + } catch (WakeupException | InterruptedException e) { + // Assume we've been woken or interrupted to shutdown, so continue on to checking the + // shutDownLatch next iteration. 
+ logger.debug("KafkaMonitor woken up, checking if shutdown requested..."); + } catch (RetriableException e) { + consecutiveRetriableErrors += 1; + logger.warn( + "Retriable exception encountered ({} consecutive), continuing processing...", + consecutiveRetriableErrors, + e); + exponentialBackoffWait(consecutiveRetriableErrors); + } catch (Exception e) { + logger.error("Raising exception to connect runtime", e); + context.raiseError(e); + } + } + } + + private void exponentialBackoffWait(int numErrors) { + int secondsToWait = + BACKOFF_WAIT_SECONDS[Math.min(Math.max(numErrors - 1, 0), BACKOFF_WAIT_SECONDS.length - 1)]; + if (secondsToWait > 0) { + try { + TimeUnit.SECONDS.sleep(secondsToWait); + } catch (InterruptedException e) { + logger.debug("Interrupted while sleeping due to retriable errors, resuming..."); + } + } + } + + /** Returns true if the subscribed partition list has changed and we need a rebalance. */ + boolean partitionsChanged() { + List oldPartitionInfoList = topicPartitionList; + topicPartitionList = fetchTopicPartitionList(); + logger.trace("Detected {} matching partitions", topicPartitionList.size()); + + if (oldPartitionInfoList == null) { + // List not initialized yet, so don't trigger rebalance + return false; + } + + // Note: These two lists are already sorted + boolean partitionsChanged = !topicPartitionList.equals(oldPartitionInfoList); + if (partitionsChanged) { + logger.debug( + "Replicated partition set change. Triggering re-balance with {} partitions", + topicPartitionList.size()); + } else { + logger.debug("No change to replicated partition set"); + } + return partitionsChanged; + } + + private List fetchMatchingPartitions(Consumer consumer) { + return consumer + .listTopics() + .entrySet() + .stream() + .filter( + e -> + topicsWhitelist.contains(e.getKey()) + || topicsRegexPattern.matcher(e.getKey()).matches() + || topicsRegexList.stream().anyMatch(r -> r.matcher(e.getKey()).matches())) + .flatMap(e -> e.getValue().stream()) + .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition())) + .collect(Collectors.toList()); + } + + private List fetchTopicPartitionList() { + logger.trace( + "Fetching partition details for topicsWhitelist: {}, topicsRegex: {}", + topicsWhitelist, + topicsRegexPattern.toString()); + + List sourcePartitionList; + synchronized (sourceConsumer) { + sourcePartitionList = fetchMatchingPartitions(sourceConsumer); + } + + List result; + if (this.topicCheckingEnabled) { + result = getDestinationAvailablePartitions(sourcePartitionList); + } else { + result = sourcePartitionList; + } + + // Sort the result for order-independent comparison + result.sort(Comparator.comparing(tp -> tp.topic() + tp.partition())); + return result; + } + + private List getDestinationAvailablePartitions( + List sourcePartitionList) { + SourcePartitionValidator sourcePartitionValidator = + new SourcePartitionValidator( + destinationConsumer, validationStrategy, this::applyRoutersToTopic); + + // Split the source partition list into those contained in the destination, and those + // missing. Using toCollection(ArrayList::new) to guarantee we can sort successfully. + Map> partitionedSourceIds = + sourcePartitionList + .stream() + .collect( + Collectors.partitioningBy( + sourcePartitionValidator::isHealthy, Collectors.toCollection(ArrayList::new))); + + List missingPartitions = partitionedSourceIds.get(false); + if (!missingPartitions.isEmpty()) { + logger.warn( + "Missing target partitions {}. 
Topics may need to be added to the target cluster.", + missingPartitions); + } + + missingPartsJmxReporter.recordMetric(missingPartitions.size()); + + List result = partitionedSourceIds.get(true); + return result; + } + + /** + * Determines the MirusSourceConnector task configuration, including partition assignment. This is + * called from the main SourceConnector thread. + * + * @param maxTasks The maximum number of tasks this SourceConnector is configured to run in + * parallel across the cluster. + */ + List> taskConfigs(int maxTasks) { + logger.debug("Called taskConfigs maxTasks {}", maxTasks); + + // No need to synchronize while waiting for initialization + long tStart = System.currentTimeMillis(); + while (this.topicPartitionList == null && (System.currentTimeMillis() - tStart) < 60000) { + try { + Thread.sleep(3000); + // List will normally be initialized within this time + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + synchronized (this) { + if (topicPartitionList == null) { + // Timed out waiting for initialization. Initialize with empty list so we can detect + // changes. + logger.error("Topic partition list not initialized. Falling back on empty list."); + this.topicPartitionList = new ArrayList<>(); + } + if (topicPartitionList.isEmpty()) { + return Collections.emptyList(); + } + return taskConfigBuilder.fromPartitionList(maxTasks, topicPartitionList); + } + } + + void stop() { + missingPartsJmxReporter.recordMetric(0); + shutDownLatch.countDown(); + sourceConsumer.wakeup(); + destinationConsumer.wakeup(); + synchronized (sourceConsumer) { + sourceConsumer.close(); + } + synchronized (destinationConsumer) { + destinationConsumer.close(); + } + } +} diff --git a/src/test/resources/processor_test_files/S2142_InterruptedException/NOCOMPILE_RepairAndFieldTypePrint.java.exact b/src/test/resources/processor_test_files/S2142_InterruptedException/NOCOMPILE_RepairAndFieldTypePrint.java.exact new file mode 100644 index 000000000..86df0d48c --- /dev/null +++ b/src/test/resources/processor_test_files/S2142_InterruptedException/NOCOMPILE_RepairAndFieldTypePrint.java.exact @@ -0,0 +1,162 @@ + private final SourcePartitionValidator.MatchingStrategy validationStrategy; + private final MissingPartitionsJmxReporter missingPartsJmxReporter = + new MissingPartitionsJmxReporter(); + private final List> routers; + private final boolean topicCheckingEnabled; + + // The current list of partitions to replicate. + private volatile List topicPartitionList; + + KafkaMonitor(ConnectorContext context, SourceConfig config, TaskConfigBuilder taskConfigBuilder) { + this( + context, + config, + newSourceConsumer(config), + newDestinationConsumer(config), + taskConfigBuilder); + } + + KafkaMonitor( + ConnectorContext context, + SourceConfig config, + Consumer sourceConsumer, + Consumer destinationConsumer, + TaskConfigBuilder taskConfigBuilder) { + this.context = context; + this.topicsWhitelist = config.getTopicsWhitelist(); + this.monitorPollWaitMs = config.getMonitorPollWaitMs(); + this.topicsRegexPattern = Pattern.compile(config.getTopicsRegex()); + this.topicsRegexList = config.getTopicsRegexList(); + this.sourceConsumer = sourceConsumer; + this.destinationConsumer = destinationConsumer; + if (topicsWhitelist.isEmpty() + && config.getTopicsRegex().isEmpty() + && config.getTopicsRegexList().isEmpty()) { + logger.warn("No whitelist configured"); + } + this.taskConfigBuilder = taskConfigBuilder; + this.validationStrategy = + config.getEnablePartitionMatching() + ? 
SourcePartitionValidator.MatchingStrategy.PARTITION + : SourcePartitionValidator.MatchingStrategy.TOPIC; + this.topicCheckingEnabled = config.getTopicCheckingEnabled(); + this.routers = this.validateTransformations(config.transformations()); + } + + private List> validateTransformations( + List> transformations) { + List> regexRouters = new ArrayList<>(); + + // No need to validate transforms if we're not checking destination partitions + if (this.topicCheckingEnabled) { + for (Transformation transform : transformations) { + String transformName = transform.getClass().getSimpleName(); + if (transform instanceof RegexRouter) { + regexRouters.add(transform); + // Slightly awkward check to see if any other routing transforms are configured + } else if (transformName.contains("Router")) { + throw new IllegalArgumentException( + String.format( + "Unsupported Router Transformation %s found." + + " To use it, please disable destination topic checking by setting 'enable.destination.topic.checking' to false.", + transformName)); + } else { + logger.debug("Ignoring non-routing Transformation {}", transformName); + } + } + } + return regexRouters; + } + + private String applyRoutersToTopic(String topic) { + TopicPartition topicPartition = new TopicPartition(topic, 0); + Map sourcePartition = TopicPartitionSerDe.asMap(topicPartition); + SourceRecord record = + new SourceRecord( + sourcePartition, + null, + topicPartition.topic(), + topicPartition.partition(), + Schema.BYTES_SCHEMA, + null, + Schema.OPTIONAL_BYTES_SCHEMA, + null); + for (Transformation transform : this.routers) { + record = transform.apply(record); + } + return record.topic(); + } + + private static Consumer newSourceConsumer(SourceConfig config) { + Map consumerProperties = config.getConsumerProperties(); + + // The "monitor1" client id suffix is used to keep JMX bean names distinct + consumerProperties.computeIfPresent( + CommonClientConfigs.CLIENT_ID_CONFIG, (k, v) -> v + "monitor1"); + return new KafkaConsumer<>(consumerProperties); + } + + /** + * * Reconciles the default consumer properties with the destination-consumer properties. The + * destination-consumer properties have higher precedence. 
+ * + * @param config config of the source connector + * @return map that includes the consumer configs + */ + static Map getReconciledDestConsumerConfigs(SourceConfig config) { + // handle destination.bootstrap.server separately + // keeping this config for backward compatibility + String destBootstrap = config.getDestinationBootstrapServers(); + Map destConsumerProps = config.getDestinationConsumerProperties(); + + if (!destConsumerProps.containsKey("bootstrap.servers")) { + destConsumerProps.put("bootstrap.servers", destBootstrap); + } + Map reconciledConsumerConfigs = config.getConsumerProperties(); + // use destination.consumer properties to override default consumer properties + destConsumerProps.forEach((k, v) -> reconciledConsumerConfigs.put(k, v)); + return reconciledConsumerConfigs; + } + + private static Consumer newDestinationConsumer(SourceConfig config) { + Map consumerProperties = getReconciledDestConsumerConfigs(config); + // The "monitor2" client id suffix is used to keep JMX bean names distinct + consumerProperties.computeIfPresent( + CommonClientConfigs.CLIENT_ID_CONFIG, (k, v) -> v + "monitor2"); + return new KafkaConsumer<>(consumerProperties); + } + + @Override + public void run() { + int consecutiveRetriableErrors = 0; + while (true) { + try { + // Do a fast shutdown check first thing in case we're in an exponential backoff retry loop, + // which will never hit the poll wait below + if (shutDownLatch.await(0, TimeUnit.MILLISECONDS)) { + logger.debug("Exiting KafkaMonitor"); + return; + } + if (this.topicPartitionList == null) { + // Need to initialize here to prevent the constructor hanging on startup if the + // source cluster is unavailable. + this.topicPartitionList = fetchTopicPartitionList(); + } + + if (partitionsChanged()) { + logger.info("Source partition change detected. Requesting task reconfiguration."); + this.context.requestTaskReconfiguration(); + } + + if (shutDownLatch.await(monitorPollWaitMs, TimeUnit.MILLISECONDS)) { + logger.debug("Exiting KafkaMonitor"); + return; + } + consecutiveRetriableErrors = 0; + } catch (WakeupException | InterruptedException e) { + // Assume we've been woken or interrupted to shutdown, so continue on to checking the + // shutDownLatch next iteration. 
+ logger.debug("KafkaMonitor woken up, checking if shutdown requested..."); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } \ No newline at end of file From 76c1e5b516f9184a07417d130cf070bf5e5de7b2 Mon Sep 17 00:00:00 2001 From: Aman Sharma Date: Fri, 10 Dec 2021 18:15:58 +0100 Subject: [PATCH 6/6] Add test case for #602 --- ...NOCOMPILE_RepairAndDoNotRemoveImports.java | 110 ++++++++++++++++++ ...ILE_RepairAndDoNotRemoveImports.java.exact | 37 ++++++ 2 files changed, 147 insertions(+) create mode 100644 src/test/resources/processor_test_files/S2184_CastArithmeticOperand/NOCOMPILE_RepairAndDoNotRemoveImports.java create mode 100644 src/test/resources/processor_test_files/S2184_CastArithmeticOperand/NOCOMPILE_RepairAndDoNotRemoveImports.java.exact diff --git a/src/test/resources/processor_test_files/S2184_CastArithmeticOperand/NOCOMPILE_RepairAndDoNotRemoveImports.java b/src/test/resources/processor_test_files/S2184_CastArithmeticOperand/NOCOMPILE_RepairAndDoNotRemoveImports.java new file mode 100644 index 000000000..f8be4d18c --- /dev/null +++ b/src/test/resources/processor_test_files/S2184_CastArithmeticOperand/NOCOMPILE_RepairAndDoNotRemoveImports.java @@ -0,0 +1,110 @@ +// This test resource is a reproduction of https://github.com/SpoonLabs/sorald/issues/602 + +/* + * This file is licensed under the MIT License (MIT). + * + * Copyright (c) 2014 Daniel Ennis + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. 
+ */ +package co.aikar.timings; + +import static co.aikar.timings.TimingIdentifier.DEFAULT_GROUP; +import static co.aikar.timings.TimingsManager.*; + +public class FullServerTickTiming extends Timing { + private static final TimingIdentifier IDENTIFIER = new TimingIdentifier(DEFAULT_GROUP.name, "Full Server Tick", null); + final TimingData minuteData; + double avgFreeMemory = -1D; + double avgUsedMemory = -1D; + + FullServerTickTiming() { + super(IDENTIFIER); + this.minuteData = new TimingData(this.id); + + TIMING_MAP.put(IDENTIFIER, this); + } + + @Override + public Timing startTiming() { + if (TimingsManager.needsFullReset) { + TimingsManager.resetTimings(); + } else if (TimingsManager.needsRecheckEnabled) { + TimingsManager.recheckEnabled(); + } + super.startTiming(); + return this; + } + + @Override + public void stopTiming() { + super.stopTiming(); + if (!this.enabled) { + return; + } + + if (TimingsHistory.timedTicks % 20 == 0) { + final Runtime runtime = Runtime.getRuntime(); + double usedMemory = runtime.totalMemory() - runtime.freeMemory(); + double freeMemory = runtime.maxMemory() - usedMemory; + + if (this.avgFreeMemory == -1) { + this.avgFreeMemory = freeMemory; + } else { + this.avgFreeMemory = (this.avgFreeMemory * (59 / 60D)) + (freeMemory * (1 / 60D)); + } + + if (this.avgUsedMemory == -1) { + this.avgUsedMemory = usedMemory; + } else { + this.avgUsedMemory = (this.avgUsedMemory * (59 / 60D)) + (usedMemory * (1 / 60D)); + } + } + + long start = System.nanoTime(); + TimingsManager.tick(); + long diff = System.nanoTime() - start; + + CURRENT = Timings.timingsTickTimer; + Timings.timingsTickTimer.addDiff(diff); + //addDiff for timingsTickTimer incremented this, bring it back down to 1 per tick. + this.record.curTickCount--; + this.minuteData.curTickTotal = this.record.curTickTotal; + this.minuteData.curTickCount = 1; + boolean violated = isViolated(); + this.minuteData.tick(violated); + Timings.timingsTickTimer.tick(violated); + tick(violated); + + if (TimingsHistory.timedTicks % 1200 == 0) { + MINUTE_REPORTS.add(new TimingsHistory.MinuteReport()); + TimingsHistory.resetTicks(false); + this.minuteData.reset(); + } + + if (TimingsHistory.timedTicks % Timings.getHistoryInterval() == 0) { + TimingsManager.HISTORY.add(new TimingsHistory()); + TimingsManager.resetTimings(); + } + } + + boolean isViolated() { + return this.record.curTickTotal > 50000000; + } +} diff --git a/src/test/resources/processor_test_files/S2184_CastArithmeticOperand/NOCOMPILE_RepairAndDoNotRemoveImports.java.exact b/src/test/resources/processor_test_files/S2184_CastArithmeticOperand/NOCOMPILE_RepairAndDoNotRemoveImports.java.exact new file mode 100644 index 000000000..6bd253b72 --- /dev/null +++ b/src/test/resources/processor_test_files/S2184_CastArithmeticOperand/NOCOMPILE_RepairAndDoNotRemoveImports.java.exact @@ -0,0 +1,37 @@ +import static co.aikar.timings.TimingIdentifier.DEFAULT_GROUP; +import static co.aikar.timings.TimingsManager.*; + +public class FullServerTickTiming extends Timing { + private static final TimingIdentifier IDENTIFIER = new TimingIdentifier(DEFAULT_GROUP.name, "Full Server Tick", null); + final TimingData minuteData; + double avgFreeMemory = -1D; + double avgUsedMemory = -1D; + + FullServerTickTiming() { + super(IDENTIFIER); + this.minuteData = new TimingData(this.id); + + TIMING_MAP.put(IDENTIFIER, this); + } + + @Override + public Timing startTiming() { + if (TimingsManager.needsFullReset) { + TimingsManager.resetTimings(); + } else if (TimingsManager.needsRecheckEnabled) { + 
TimingsManager.recheckEnabled(); + } + super.startTiming(); + return this; + } + + @Override + public void stopTiming() { + super.stopTiming(); + if (!this.enabled) { + return; + } + + if (TimingsHistory.timedTicks % 20 == 0) { + final Runtime runtime = Runtime.getRuntime(); + double usedMemory = (double) runtime.totalMemory() - runtime.freeMemory(); \ No newline at end of file
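For reference, the two Sorald repairs exercised by the fixtures in this series reduce to the transformations sketched below. This is a minimal illustration distilled from the .exact hunks above, not an additional hunk; the identifiers (guardForSecondRead, runtime, usedMemory) are reused from those fixtures.

    // S2142 (InterruptedException): re-interrupt the current thread before wrapping and rethrowing
    try {
        guardForSecondRead.await(10, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt(); // line inserted by the repair
        throw new IOException(ex);
    }

    // S2184 (CastArithmeticOperand): cast one operand so the subtraction is performed in double
    double usedMemory = (double) runtime.totalMemory() - runtime.freeMemory(); // cast inserted by the repair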