From 73c0ee06794813a4698979462560bdb58dcd8fdd Mon Sep 17 00:00:00 2001 From: Paul Grey Date: Fri, 17 May 2024 20:20:36 -0400 Subject: [PATCH] NIFI-11259 - checkstyle conformance; component property renames --- .../nifi/kafka/processors/AbstractKafkaBaseIT.java | 8 ++++---- .../kafka/processors/ConsumeKafkaDemarcatorIT.java | 12 ++++++------ .../publish/additional/PublishKafkaContentX1IT.java | 8 ++++---- .../publish/additional/PublishKafkaWrapperX1IT.java | 4 ++-- .../publish/additional/PublishKafkaWrapperX2IT.java | 4 ++-- .../publish/additional/PublishKafkaWrapperX3IT.java | 4 ++-- .../publish/additional/PublishKafkaWrapperX4IT.java | 4 ++-- .../publish/additional/PublishKafkaWrapperX5IT.java | 8 ++++---- .../kafka/service/Kafka3ConnectionServiceBaseIT.java | 2 -- .../apache/nifi/kafka/processors/PublishKafka.java | 10 +++++----- 10 files changed, 31 insertions(+), 33 deletions(-) diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java index c20c5ffe6de33..743c9d0b00ef5 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java @@ -69,7 +69,7 @@ protected String addKafkaConnectionService(final TestRunner runner) throws Initi } protected String addRecordReaderService(final TestRunner runner) throws InitializationException { - final String readerId = "record-reader"; + final String readerId = ConsumeKafka.RECORD_READER.getName(); final RecordReaderFactory readerService = new JsonTreeReader(); runner.addControllerService(readerId, readerService); runner.enableControllerService(readerService); @@ -78,7 +78,7 @@ protected String addRecordReaderService(final TestRunner runner) throws Initiali } protected String addRecordWriterService(final TestRunner runner) throws InitializationException { - final String writerId = "record-writer"; + final String writerId = ConsumeKafka.RECORD_WRITER.getName(); final RecordSetWriterFactory writerService = new JsonRecordSetWriter(); runner.addControllerService(writerId, writerService); runner.enableControllerService(writerService); @@ -87,7 +87,7 @@ protected String addRecordWriterService(final TestRunner runner) throws Initiali } protected String addRecordKeyReaderService(final TestRunner runner) throws InitializationException { - final String readerId = "key-record-reader"; + final String readerId = ConsumeKafka.KEY_RECORD_READER.getName(); final RecordReaderFactory readerService = new JsonTreeReader(); runner.addControllerService(readerId, readerService); runner.enableControllerService(readerService); @@ -96,7 +96,7 @@ protected String addRecordKeyReaderService(final TestRunner runner) throws Initi } protected String addRecordKeyWriterService(final TestRunner runner) throws InitializationException { - final String writerId = "record-key-writer"; + final String writerId = PublishKafka.RECORD_KEY_WRITER.getName(); final RecordSetWriterFactory writerService = new JsonRecordSetWriter(); runner.addControllerService(writerId, writerService); runner.enableControllerService(writerService); diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaDemarcatorIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaDemarcatorIT.java index 6d3534d61628f..f81021b84ff85 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaDemarcatorIT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaDemarcatorIT.java @@ -75,8 +75,8 @@ void testConsumeDemarcated() throws InterruptedException, ExecutionException { runner.setProperty(ConsumeKafka.TOPICS, topic); runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.DEMARCATOR); - runner.setProperty("message-demarcator", "|"); - runner.setProperty("separate-by-key", Boolean.FALSE.toString()); + runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "|"); + runner.setProperty(ConsumeKafka.SEPARATE_BY_KEY, Boolean.FALSE.toString()); runner.run(1, false, true); final Collection> records = new ArrayList<>(); @@ -120,8 +120,8 @@ void testConsumeDemarcatedSeparateByKey() throws InterruptedException, Execution final String groupId = topic.substring(0, topic.indexOf("-")); runner.setProperty(ConsumeKafka.GROUP_ID, groupId); runner.setProperty(ConsumeKafka.TOPICS, topic); - runner.setProperty("message-demarcator", "|"); - runner.setProperty("separate-by-key", Boolean.TRUE.toString()); + runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "|"); + runner.setProperty(ConsumeKafka.SEPARATE_BY_KEY, Boolean.TRUE.toString()); runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.DEMARCATOR); runner.run(1, false, true); @@ -174,8 +174,8 @@ void testConsumeDemarcatedSeparateByHeader() throws InterruptedException, Execut final String groupId = topic.substring(0, topic.indexOf("-")); runner.setProperty(ConsumeKafka.GROUP_ID, groupId); runner.setProperty(ConsumeKafka.TOPICS, topic); - runner.setProperty("message-demarcator", "|"); - runner.setProperty("Header Name Pattern", "A.*"); + runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "|"); + runner.setProperty(ConsumeKafka.HEADER_NAME_PATTERN, "A.*"); runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.DEMARCATOR); runner.run(1, false, true); diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaContentX1IT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaContentX1IT.java index ea717897b07ec..bb13694ba2c40 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaContentX1IT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaContentX1IT.java @@ -59,10 +59,10 @@ public void test1ProduceOneFlowFile() throws InitializationException, IOExceptio addRecordWriterService(runner); addRecordKeyWriterService(runner); - runner.setProperty("Topic Name", getClass().getName()); - runner.setProperty("publish-strategy", PublishStrategy.USE_VALUE.name()); - runner.setProperty("message-key-field", "account"); - runner.setProperty("attribute-name-regex", "attribute.*"); + runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName()); + runner.setProperty(PublishKafka.PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.name()); + runner.setProperty(PublishKafka.MESSAGE_KEY_FIELD, "account"); + runner.setProperty(PublishKafka.ATTRIBUTE_NAME_REGEX, "attribute.*"); final Map attributes = new HashMap<>(); attributes.put("attributeA", "valueA"); diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX1IT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX1IT.java index a2f604e742ede..12e697381230c 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX1IT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX1IT.java @@ -58,8 +58,8 @@ public void test1ProduceOneFlowFile() throws InitializationException, IOExceptio addRecordReaderService(runner); addRecordWriterService(runner); - runner.setProperty("Topic Name", getClass().getName()); - runner.setProperty("publish-strategy", PublishStrategy.USE_WRAPPER.name()); + runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName()); + runner.setProperty(PublishKafka.PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.name()); final Map attributes = new HashMap<>(); final byte[] bytesFlowFile = IOUtils.toByteArray(Objects.requireNonNull( diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX2IT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX2IT.java index 93d22ede13ce3..71299901d095e 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX2IT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX2IT.java @@ -57,8 +57,8 @@ public void test1ProduceOneFlowFile() throws InitializationException, IOExceptio addRecordWriterService(runner); addRecordKeyWriterService(runner); - runner.setProperty("Topic Name", getClass().getName()); - runner.setProperty("publish-strategy", PublishStrategy.USE_WRAPPER.name()); + runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName()); + runner.setProperty(PublishKafka.PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.name()); final Map attributes = new HashMap<>(); final byte[] bytesFlowFile = IOUtils.toByteArray(Objects.requireNonNull( diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX3IT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX3IT.java index 3ddfe4cfc3fb1..00fa62341ea26 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX3IT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX3IT.java @@ -58,8 +58,8 @@ public void test1ProduceOneFlowFile() throws InitializationException, IOExceptio addRecordReaderService(runner); addRecordWriterService(runner); - runner.setProperty("Topic Name", getClass().getName()); - runner.setProperty("publish-strategy", PublishStrategy.USE_WRAPPER.name()); + runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName()); + runner.setProperty(PublishKafka.PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.name()); final Map attributes = new HashMap<>(); final byte[] bytesFlowFile = IOUtils.toByteArray(Objects.requireNonNull( diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX4IT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX4IT.java index 905dfaba7928f..d16387d59bc33 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX4IT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX4IT.java @@ -59,8 +59,8 @@ public void test1ProduceOneFlowFile() throws InitializationException, IOExceptio addRecordReaderService(runner); addRecordWriterService(runner); - runner.setProperty("Topic Name", getClass().getName()); - runner.setProperty("publish-strategy", PublishStrategy.USE_WRAPPER.name()); + runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName()); + runner.setProperty(PublishKafka.PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.name()); final Map attributes = new HashMap<>(); final byte[] bytesFlowFile = IOUtils.toByteArray(Objects.requireNonNull( diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX5IT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX5IT.java index aa5db0d3c7e59..3de3da941741b 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX5IT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX5IT.java @@ -96,11 +96,11 @@ public void test1ProduceOneFlowFile() throws InitializationException, IOExceptio addRecordReaderService(runner); addRecordWriterService(runner); - runner.setProperty("Topic Name", TEST_TOPIC); - runner.setProperty("partition", Integer.toString(TEST_PARTITION)); + runner.setProperty(PublishKafka.TOPIC_NAME, TEST_TOPIC); + runner.setProperty(PublishKafka.PARTITION, Integer.toString(TEST_PARTITION)); runner.getLogger().info("partition={}", TEST_PARTITION); - runner.setProperty("publish-strategy", PublishStrategy.USE_WRAPPER.name()); - runner.setProperty("Record Metadata Strategy", RecordMetadataStrategy.FROM_RECORD.getValue()); + runner.setProperty(PublishKafka.PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.name()); + runner.setProperty(PublishKafka.RECORD_METADATA_STRATEGY, RecordMetadataStrategy.FROM_RECORD.getValue()); final Map attributes = new HashMap<>(); final byte[] bytesFlowFileTemplate = IOUtils.toByteArray(Objects.requireNonNull( diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java index 0801ae4590c0d..5435e63b777f7 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java @@ -56,9 +56,7 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java index d40bead2e51e0..ae996f8af106b 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java @@ -129,7 +129,7 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo .expressionLanguageSupported(ExpressionLanguageScope.NONE) .build(); - static final PropertyDescriptor PUBLISH_STRATEGY = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PUBLISH_STRATEGY = new PropertyDescriptor.Builder() .name("Publish Strategy") .description("The format used to publish the incoming FlowFile record to Kafka.") .required(true) @@ -137,7 +137,7 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo .allowableValues(PublishStrategy.class) .build(); - static final PropertyDescriptor MESSAGE_KEY_FIELD = new PropertyDescriptor.Builder() + public static final PropertyDescriptor MESSAGE_KEY_FIELD = new PropertyDescriptor.Builder() .name("Message Key Field") .description("The name of a field in the Input Records that should be used as the Key for the Kafka message.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) @@ -195,7 +195,7 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo .defaultValue("none") .build(); - static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder() + public static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder() .name("Attributes to Send as Headers (Regex)") .description("A Regular Expression that is matched against all FlowFile attribute names. " + "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. " @@ -242,7 +242,7 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.getValue()) .build(); - static final PropertyDescriptor RECORD_METADATA_STRATEGY = new PropertyDescriptor.Builder() + public static final PropertyDescriptor RECORD_METADATA_STRATEGY = new PropertyDescriptor.Builder() .name("Record Metadata Strategy") .description("Specifies whether the Record's metadata (topic and partition) should come from the Record's metadata field or if it should come from the configured " + "Topic Name and Partition / Partitioner class properties") @@ -273,7 +273,7 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo .required(false) .build(); - static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder() .name("partition") .displayName("Partition") .description("Specifies which Partition Records will go to. How this value is interpreted is dictated by the property.")