Skip to content

Commit

Permalink
NIFI-11259 - checkstyle conformance; component property renames
Browse files Browse the repository at this point in the history
  • Loading branch information
greyp9 committed May 21, 2024
1 parent f47ad48 commit 73c0ee0
Show file tree
Hide file tree
Showing 10 changed files with 31 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ protected String addKafkaConnectionService(final TestRunner runner) throws Initi
}

protected String addRecordReaderService(final TestRunner runner) throws InitializationException {
final String readerId = "record-reader";
final String readerId = ConsumeKafka.RECORD_READER.getName();
final RecordReaderFactory readerService = new JsonTreeReader();
runner.addControllerService(readerId, readerService);
runner.enableControllerService(readerService);
Expand All @@ -78,7 +78,7 @@ protected String addRecordReaderService(final TestRunner runner) throws Initiali
}

protected String addRecordWriterService(final TestRunner runner) throws InitializationException {
final String writerId = "record-writer";
final String writerId = ConsumeKafka.RECORD_WRITER.getName();
final RecordSetWriterFactory writerService = new JsonRecordSetWriter();
runner.addControllerService(writerId, writerService);
runner.enableControllerService(writerService);
Expand All @@ -87,7 +87,7 @@ protected String addRecordWriterService(final TestRunner runner) throws Initiali
}

protected String addRecordKeyReaderService(final TestRunner runner) throws InitializationException {
final String readerId = "key-record-reader";
final String readerId = ConsumeKafka.KEY_RECORD_READER.getName();
final RecordReaderFactory readerService = new JsonTreeReader();
runner.addControllerService(readerId, readerService);
runner.enableControllerService(readerService);
Expand All @@ -96,7 +96,7 @@ protected String addRecordKeyReaderService(final TestRunner runner) throws Initi
}

protected String addRecordKeyWriterService(final TestRunner runner) throws InitializationException {
final String writerId = "record-key-writer";
final String writerId = PublishKafka.RECORD_KEY_WRITER.getName();
final RecordSetWriterFactory writerService = new JsonRecordSetWriter();
runner.addControllerService(writerId, writerService);
runner.enableControllerService(writerService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ void testConsumeDemarcated() throws InterruptedException, ExecutionException {
runner.setProperty(ConsumeKafka.TOPICS, topic);
runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.DEMARCATOR);

runner.setProperty("message-demarcator", "|");
runner.setProperty("separate-by-key", Boolean.FALSE.toString());
runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "|");
runner.setProperty(ConsumeKafka.SEPARATE_BY_KEY, Boolean.FALSE.toString());
runner.run(1, false, true);

final Collection<ProducerRecord<String, String>> records = new ArrayList<>();
Expand Down Expand Up @@ -120,8 +120,8 @@ void testConsumeDemarcatedSeparateByKey() throws InterruptedException, Execution
final String groupId = topic.substring(0, topic.indexOf("-"));
runner.setProperty(ConsumeKafka.GROUP_ID, groupId);
runner.setProperty(ConsumeKafka.TOPICS, topic);
runner.setProperty("message-demarcator", "|");
runner.setProperty("separate-by-key", Boolean.TRUE.toString());
runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "|");
runner.setProperty(ConsumeKafka.SEPARATE_BY_KEY, Boolean.TRUE.toString());
runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.DEMARCATOR);
runner.run(1, false, true);

Expand Down Expand Up @@ -174,8 +174,8 @@ void testConsumeDemarcatedSeparateByHeader() throws InterruptedException, Execut
final String groupId = topic.substring(0, topic.indexOf("-"));
runner.setProperty(ConsumeKafka.GROUP_ID, groupId);
runner.setProperty(ConsumeKafka.TOPICS, topic);
runner.setProperty("message-demarcator", "|");
runner.setProperty("Header Name Pattern", "A.*");
runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "|");
runner.setProperty(ConsumeKafka.HEADER_NAME_PATTERN, "A.*");

runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.DEMARCATOR);
runner.run(1, false, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ public void test1ProduceOneFlowFile() throws InitializationException, IOExceptio
addRecordWriterService(runner);
addRecordKeyWriterService(runner);

runner.setProperty("Topic Name", getClass().getName());
runner.setProperty("publish-strategy", PublishStrategy.USE_VALUE.name());
runner.setProperty("message-key-field", "account");
runner.setProperty("attribute-name-regex", "attribute.*");
runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
runner.setProperty(PublishKafka.PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.name());
runner.setProperty(PublishKafka.MESSAGE_KEY_FIELD, "account");
runner.setProperty(PublishKafka.ATTRIBUTE_NAME_REGEX, "attribute.*");

final Map<String, String> attributes = new HashMap<>();
attributes.put("attributeA", "valueA");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public void test1ProduceOneFlowFile() throws InitializationException, IOExceptio
addRecordReaderService(runner);
addRecordWriterService(runner);

runner.setProperty("Topic Name", getClass().getName());
runner.setProperty("publish-strategy", PublishStrategy.USE_WRAPPER.name());
runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
runner.setProperty(PublishKafka.PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.name());

final Map<String, String> attributes = new HashMap<>();
final byte[] bytesFlowFile = IOUtils.toByteArray(Objects.requireNonNull(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ public void test1ProduceOneFlowFile() throws InitializationException, IOExceptio
addRecordWriterService(runner);
addRecordKeyWriterService(runner);

runner.setProperty("Topic Name", getClass().getName());
runner.setProperty("publish-strategy", PublishStrategy.USE_WRAPPER.name());
runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
runner.setProperty(PublishKafka.PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.name());

final Map<String, String> attributes = new HashMap<>();
final byte[] bytesFlowFile = IOUtils.toByteArray(Objects.requireNonNull(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public void test1ProduceOneFlowFile() throws InitializationException, IOExceptio
addRecordReaderService(runner);
addRecordWriterService(runner);

runner.setProperty("Topic Name", getClass().getName());
runner.setProperty("publish-strategy", PublishStrategy.USE_WRAPPER.name());
runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
runner.setProperty(PublishKafka.PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.name());

final Map<String, String> attributes = new HashMap<>();
final byte[] bytesFlowFile = IOUtils.toByteArray(Objects.requireNonNull(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ public void test1ProduceOneFlowFile() throws InitializationException, IOExceptio
addRecordReaderService(runner);
addRecordWriterService(runner);

runner.setProperty("Topic Name", getClass().getName());
runner.setProperty("publish-strategy", PublishStrategy.USE_WRAPPER.name());
runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
runner.setProperty(PublishKafka.PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.name());

final Map<String, String> attributes = new HashMap<>();
final byte[] bytesFlowFile = IOUtils.toByteArray(Objects.requireNonNull(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ public void test1ProduceOneFlowFile() throws InitializationException, IOExceptio
addRecordReaderService(runner);
addRecordWriterService(runner);

runner.setProperty("Topic Name", TEST_TOPIC);
runner.setProperty("partition", Integer.toString(TEST_PARTITION));
runner.setProperty(PublishKafka.TOPIC_NAME, TEST_TOPIC);
runner.setProperty(PublishKafka.PARTITION, Integer.toString(TEST_PARTITION));
runner.getLogger().info("partition={}", TEST_PARTITION);
runner.setProperty("publish-strategy", PublishStrategy.USE_WRAPPER.name());
runner.setProperty("Record Metadata Strategy", RecordMetadataStrategy.FROM_RECORD.getValue());
runner.setProperty(PublishKafka.PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.name());
runner.setProperty(PublishKafka.RECORD_METADATA_STRATEGY, RecordMetadataStrategy.FROM_RECORD.getValue());

final Map<String, String> attributes = new HashMap<>();
final byte[] bytesFlowFileTemplate = IOUtils.toByteArray(Objects.requireNonNull(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,15 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();

static final PropertyDescriptor PUBLISH_STRATEGY = new PropertyDescriptor.Builder()
public static final PropertyDescriptor PUBLISH_STRATEGY = new PropertyDescriptor.Builder()
.name("Publish Strategy")
.description("The format used to publish the incoming FlowFile record to Kafka.")
.required(true)
.defaultValue(PublishStrategy.USE_VALUE.getValue())
.allowableValues(PublishStrategy.class)
.build();

static final PropertyDescriptor MESSAGE_KEY_FIELD = new PropertyDescriptor.Builder()
public static final PropertyDescriptor MESSAGE_KEY_FIELD = new PropertyDescriptor.Builder()
.name("Message Key Field")
.description("The name of a field in the Input Records that should be used as the Key for the Kafka message.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
Expand Down Expand Up @@ -195,7 +195,7 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo
.defaultValue("none")
.build();

static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder()
public static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder()
.name("Attributes to Send as Headers (Regex)")
.description("A Regular Expression that is matched against all FlowFile attribute names. "
+ "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. "
Expand Down Expand Up @@ -242,7 +242,7 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo
.dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.getValue())
.build();

static final PropertyDescriptor RECORD_METADATA_STRATEGY = new PropertyDescriptor.Builder()
public static final PropertyDescriptor RECORD_METADATA_STRATEGY = new PropertyDescriptor.Builder()
.name("Record Metadata Strategy")
.description("Specifies whether the Record's metadata (topic and partition) should come from the Record's metadata field or if it should come from the configured " +
"Topic Name and Partition / Partitioner class properties")
Expand Down Expand Up @@ -273,7 +273,7 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo
.required(false)
.build();

static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder()
public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder()
.name("partition")
.displayName("Partition")
.description("Specifies which Partition Records will go to. How this value is interpreted is dictated by the <Partitioner class> property.")
Expand Down

0 comments on commit 73c0ee0

Please sign in to comment.