-
Notifications
You must be signed in to change notification settings - Fork 2.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
NIFI-11259 - Kafka processor refactor #8463
Conversation
...undle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
Outdated
Show resolved
Hide resolved
...undle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
Outdated
Show resolved
Hide resolved
...undle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
Outdated
Show resolved
Hide resolved
...ifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
Outdated
Show resolved
Hide resolved
...undle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the extensive work on putting together these new Kafka components @greyp9!
The number of integration tests provide a solid foundation for verifying the functionality.
I plan to continue going through the changes, but I noted a few general things on an initial review, such as trace logs, property naming, and the Kafka 3 client version.
...-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaWrapperRecordIT.java
Outdated
Show resolved
Hide resolved
...a-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/readme.txt
Outdated
Show resolved
Hide resolved
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/README.md
Outdated
Show resolved
Hide resolved
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/pom.xml
Outdated
Show resolved
Hide resolved
...ifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
Outdated
Show resolved
Hide resolved
...undle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
Outdated
Show resolved
Hide resolved
...undle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
Outdated
Show resolved
Hide resolved
...undle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
Outdated
Show resolved
Hide resolved
...undle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
Outdated
Show resolved
Hide resolved
...undle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
Outdated
Show resolved
Hide resolved
Can we just drop the old Kafka processors and go all in on the latest release? At this point the client libraries have stabilized greatly and us supporting the older stuff feels far less important and worthwhile now. |
It was difficult to tease out all of the business logic from the existing processor set. Some logic is there because of actual user need, and some is there because of the way Kafka and NiFi evolved alongside each other. It seems like the happy path works well for the new components, but not sure about all of the possible runtime permutations. In my mind, we’re only talking about the 2.6 processor set; the previous implementations aren’t available in the development executable. It does seem like a good idea to stop investing in enhancements to the 2.6 processors. The 2.6 work in this PR is only scaffolding, intended to better understand runtime processor behavior so it can be reproduced in the new components. The concern for me is that if we drop the 2.6 set as part of this PR, we’ll make things harder for users during the transition period, in case some important edge case is missed. It would also complicate this PR. I’d like to pitch the idea of a transition period where both sets of components exist, to facilitate transition, and to provide a simple fallback if needed. Longer term, it makes a lot of sense for 2.6 to go the way of 2.0. |
@greyp9 Yeah that is fair. Dropping the 2.x stuff is certainly fair in a diff PR. More relevant for this PR though would be the naming for the Kafka3 components and breaking the cycle of users having to change processors with Kafka changes. We had to do this during the 0x/1x/2x transitions as the client behaviors just weren't stable enough. But I suspect now they are (just my view -others may have different info). |
Agree with your thoughts on the 0x/1x/2x naming. The expectation is that, with 3+, the processor will not change. It is written to interact with Kafka through a stable controller service API. The controller service implementation will abstract away any changes needed to conform to future versions of Kafka. The controller service in this PR does use the Kafka major version suffix, but that could change. |
I see what you mean. I'm suggesting even needing to swap out the controller service seems problematic. I like the controller service model and I recall why this direction was taken. But we should be able to call it Kafka and not Kafka3 even there. IF a later client changes behavior we can call that by specific version. But in any case...I dont feel that strongly. Fine as is just sharing my thoughts. |
The jump from 2.6 to 3.0 involved * slight * changes in the NiFi component code (IIRC compatible API parameter types and additional method arguments). Less than an hour to remediate. It is a good point. Aside from practical considerations (Maven dependencies declared at compile time), it is hard to anticipate the scope of future changes to the client library API. It'll be interesting to get additional perspectives here.
And thanks for that! :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates @greyp9. I noted a number of minor recommendations, mostly related to property naming.
I realize the property naming has gone in different directions over time. In general, it seems best that when a NiFi property mirrors a Kafka property, the Kafka property name can be used as the NiFi property name, and then use a display name. If the property is strictly for NiFi, like a Record Reader or Writer, then there is no need to have a different property name versus display name. There are a couple places where some names were carried over, so this is a good opportunity to make adjustments. I think the majority of the property names work as defined in the PR, but there are a handful that should be adjusted.
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/pom.xml
Outdated
Show resolved
Hide resolved
...ifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
Outdated
Show resolved
Hide resolved
...ifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
Outdated
Show resolved
Hide resolved
...ka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java
Outdated
Show resolved
Hide resolved
...i-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/ProducerCallback.java
Outdated
Show resolved
Hide resolved
...rs/src/main/java/org/apache/nifi/kafka/processors/producer/wrapper/RecordFieldConverter.java
Outdated
Show resolved
Hide resolved
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/pom.xml
Outdated
Show resolved
Hide resolved
...service-api/src/main/java/org/apache/nifi/kafka/service/api/common/ServiceConfiguration.java
Outdated
Show resolved
Hide resolved
...fka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/PollingContext.java
Outdated
Show resolved
Hide resolved
...vice-api/src/main/java/org/apache/nifi/kafka/service/api/producer/ProducerConfiguration.java
Outdated
Show resolved
Hide resolved
@greyp9 It looks like the latest changes are failing on integration tests due to property name changes. |
Thanks; pushed an update. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your patience on this @greyp9. I think it is close to completion for an initial version.
I noted a couple more property and message related recommendations, particularly about the use of reflection in a test class. If you are able to address those comments, This should be ready for merging soon.
...ration/src/test/java/org/apache/nifi/kafka/processors/consume/ConsumeKafkaConfigureTest.java
Outdated
Show resolved
Hide resolved
...ka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java
Outdated
Show resolved
Hide resolved
...ka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java
Outdated
Show resolved
Hide resolved
...undle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
Outdated
Show resolved
Hide resolved
...undle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
Outdated
Show resolved
Hide resolved
...java/org/apache/nifi/kafka/processors/producer/convert/RecordStreamKafkaRecordConverter.java
Outdated
Show resolved
Hide resolved
...g/apache/nifi/kafka/processors/producer/convert/RecordWrapperStreamKafkaRecordConverter.java
Show resolved
Hide resolved
...g/apache/nifi/kafka/processors/producer/convert/RecordWrapperStreamKafkaRecordConverter.java
Outdated
Show resolved
Hide resolved
...rs/src/main/java/org/apache/nifi/kafka/processors/producer/wrapper/RecordFieldConverter.java
Outdated
Show resolved
Hide resolved
...3-service/src/main/java/org/apache/nifi/kafka/service/producer/txn/KafkaProducerWrapper.java
Outdated
Show resolved
Hide resolved
- strings to constants - Kafka3ConnectionService.TRANSACTION_ISOLATION_LEVEL - checkstyle issues - conform to project standard MD format - sensible renames of component properties (use Kafka names where appropriate) - update kafka docker image used for integration tests - optimize POMs to account for nifi-bom - refactor log statements (remove) - refactor development comments (remove) - refactor PropertyDescriptor descriptions (HTML interoperability) - NiFi component annotations (DynamicProperty, CapabilityDescription, Tags, WritesAttributes, SeeAlso) - notes for followup issues - dependency updates - rename jacoco module to nifi-kafka-code-coverage - additional messaging on runtime exceptions
@greyp9 I ran through some integration and runtime tests, evaluating the configuration experience in particular. I adjusted the property ordering in the Processors and Controller Service to follow a logical group of required and then optional, and also keeping groups of dependent properties together. I made one substantive change to the Transaction Isolation Level property, changing it to a described value instead of a Boolean, which is what I had intended in a previous comment. This way, it reflects the Kafka property value directly, and has corresponding description information. With these changes, I am planning to merge pending successful automated builds. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks again for the work on these new components @greyp9! As you noted in the summary, there will be some additional work involved, but this moves things forward and provides an opportunity for additional iterations, as well as subsequent migration away from the 2.6 versions. +1 merging
Background
Previous iterations of support for Kafka client versions in NiFi (1.0, 2.0, 2.6) duplicated code from existing Kafka processors into new Maven modules, adjusted Kafka client library dependencies for the new modules, and adjusted for API differences as needed. The original JIRA associated with NiFi support for Kafka 3 (
NIFI-9330
), followed this same approach. After discussion, a new approach was chosen, and a new JIRA (NIFI-11259
) created.In particular, the new processor module should have no Kafka dependencies. This is expected to ease the transition to future Kafka versions.
PublishKafka
andConsumeKafka
processors would need minimal / no changes to enable interactivity with the new controller service.Other refactoring activities have been undertaken at the same time:
PublishKafka
processor is intended as a replacement for the existingPublishKafka_2_6
andPublishKafkaRecord_2_6
processor pair. It provides FlowFile-based or record-based data handling modes, controlled via a per-processor property. Similarly,ConsumeKafka
replaces bothConsumeKafka_2_6
andConsumeKafkaRecord_2_6
. This design adjustment reduces the code duplication that was present in the 2.6 processor set.New Modules
nifi-kafka-service-api
- API contract forKafkaConnectionService
, which exposes access to instances ofKafkaConsumerService
andKafkaProducerService
in a manner agnostic to a particular version of KafkaKafkaProducerService
- intermediary logical service brokering interactions with the producer APIs of the Kafka client librariesKafkaConsumerService
- intermediary logical service brokering interactions with the producer APIs of the Kafka client librariesnifi-kafka-service-api-nar
- NiFi NAR wrapper for theKafkaConnectionService
API contractnifi-kafka-3-service
- home forKafka3ConnectionService
, which abstracts Kafka dependencies away from the new Kafka processors, and manages runtime connections to a remote Kafka 3 service instancenifi-kafka-3-service-nar
- NiFi NAR wrapper forKafka3ConnectionService
nifi-kafka-processors
- home forPublishKafka
andConsumeKafka
processors, which allow interactivity with remote Kafka service instances agnostic to a particular Kafka versionnifi-kafka-nar
- NiFi NAR wrapper for thePublishKafka
andConsumeKafka
processorsnifi-kafka-2-6-integration
- test bed to establish runtime behavior (testcontainers/kafka) of Kafka 2.6 processors for certain conditions, in order to better replicate those behaviorsnifi-kafka-3-integration
- testing infrastructure to test new processors / controller service while communicating with an actual (testcontainers) Kafka instancenifi-kafka-jacoco
- module home for configuration to instrument executions ofnifi-kafka-bundle
unit tests / integration tests, in order to assess test coverageNotes
testcontainers
.simplelogger.properties
have been useful during development, but these would be removed before PR merge.nifi-kafka-shared
module; I’ve avoided that during development to ease the process of rebasing tonifi/main
.PLAINTEXT
,SSL
,SASL_PLAINTEXT
, andSASL_SSL
. Support for additional authentication strategies could be handled via follow on JIRAs.KafkaRecordSink
form factor could be handled via follow on JIRAs.Issue Tracking
Pull Request Tracking
NIFI-00000
NIFI-00000
Pull Request Formatting
main
branchVerification
Please indicate the verification steps performed prior to pull request creation.
Build
mvn clean install -P contrib-check
Licensing
LICENSE
andNOTICE
filesDocumentation