NIFI-6089 Add Parquet record reader and writer #3679
NIFI-5755 Allow PutParquet processor to specify avro write configuration
...ord-utils/src/main/java/org/apache/nifi/record/listen/StandardSocketChannelRecordReader.java
...quet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetReader.java
...ifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java
...bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetUtils.java
...parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiSeekableInputStream.java
...parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiSeekableInputStream.java
...ifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetRecordSetWriter.java
Added new commit to address feedback.
Overall looks good, just added some refactor suggestions.
I also did a manual test, and it seems to work well.
I used this template:
NIFI-6089_test.xml.zip
PutFile(f1ab37c0-016c-1000-75ed-2c5c42ea90c6) writes to: /tmp/NIFI-6089/out
That being said, this is quite a big change, as it introduces the concept of inputLength and with that alters the RecordReaderFactory interface.
I tried to think of a solution that would introduce this in a less invasive manner, but I can't see a way around it.
Basically, when a record reader is attached, we can never know in advance whether it will require this information.
As I see it, this feature does require this large-scale change.
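To illustrate the point about the interface, here is a minimal sketch of why the length has to travel through the factory for every reader. The types below (SimpleRecordReader, SimpleRecordReaderFactory, LINE_READER, LENGTH_AWARE_READER) are hypothetical, simplified stand-ins and not the actual NiFi interfaces; the use of a negative value to mean "length unknown" is also an assumption made for this example.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified stand-ins; NOT the real NiFi RecordReaderFactory API.
class InputLengthSketch {

    /** Simplified reader: returns the next record as text, or null at end of input. */
    interface SimpleRecordReader {
        String nextRecord() throws IOException;
    }

    /** Simplified factory: every caller supplies inputLength, whether or not the reader uses it. */
    interface SimpleRecordReaderFactory {
        SimpleRecordReader createRecordReader(InputStream in, long inputLength) throws IOException;
    }

    /** Streaming, line-based reader: works without knowing the length and ignores it. */
    static final SimpleRecordReaderFactory LINE_READER = (in, inputLength) -> {
        BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        return reader::readLine;
    };

    /** Reader for a format that needs the total length up front (assumption: negative means unknown). */
    static final SimpleRecordReaderFactory LENGTH_AWARE_READER = (in, inputLength) -> {
        if (inputLength < 0) {
            throw new IOException("This reader requires the input length, but it is unknown");
        }
        byte[] content = new byte[(int) inputLength];
        new DataInputStream(in).readFully(content);
        boolean[] consumed = {false};
        // Emits the whole input as a single record, then signals end of input.
        return () -> {
            if (consumed[0]) {
                return null;
            }
            consumed[0] = true;
            return new String(content, StandardCharsets.UTF_8);
        };
    };

    /** Caller code is identical for both factories: it cannot tell which kind was attached. */
    static String firstRecord(SimpleRecordReaderFactory factory, byte[] data, long inputLength) throws IOException {
        return factory.createRecordReader(new ByteArrayInputStream(data), inputLength).nextRecord();
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "a\nb".getBytes(StandardCharsets.UTF_8);
        System.out.println(firstRecord(LINE_READER, data, -1));
        System.out.println(firstRecord(LENGTH_AWARE_READER, data, data.length));
    }
}
```

Because the caller only sees the factory interface, it has to pass the length in all cases, which is why the change touches every call site.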
...bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetUtils.java
...ifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetRecordSetWriter.java
New commit for latest feedback
Looks good!
Reviewing...
+1 LGTM. Tested with ConvertRecord and works like a charm with inferred schema.
NIFI-5755 Allow PutParquet processor to specify avro write configuration Review feedback Additional review feedback This closes apache#3679 Signed-off-by: Mike Thomsen <[email protected]>
NIFI-5755 Allow PutParquet processor to specify avro write configuration
This PR adds ParquetReader and ParquetRecordSetWriter, and also adds some configuration to the PutParquet processor that was needed in the record writer as well.
To implement the ParquetReader, the Parquet API requires knowledge of the inputLength ahead of time, which was not available to the RecordReaderFactory method that doesn't have a FlowFile passed in.
To work around this, I added an optional long inputLength parameter to that method and updated all the corresponding places in our code where it is called. The only place where we really don't know the input length is when reading from an unbounded stream, as in ListenTCPRecord.
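For context on why the length is needed: a Parquet file stores its metadata footer at the end, followed by a 4-byte little-endian footer length and the magic bytes "PAR1", so a reader has to seek relative to the end of the input before it can read anything. The sketch below shows that tail lookup on an in-memory byte array. It is an illustration of the file layout only, not the Parquet library's API or the code in this PR; ParquetTailSketch, footerStart, and fakeFile are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

// Illustrative sketch of the Parquet tail layout; NOT the parquet-mr API.
class ParquetTailSketch {

    static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);
    // The tail is a 4-byte footer length followed by the 4-byte magic.
    static final int TAIL_SIZE = 4 + MAGIC.length;

    /** Returns the offset where the footer metadata starts, located from the total input length. */
    static long footerStart(byte[] file, long inputLength) {
        if (inputLength > file.length || inputLength < MAGIC.length + TAIL_SIZE) {
            throw new IllegalArgumentException("Input length is invalid or too short: " + inputLength);
        }
        int tailStart = (int) (inputLength - TAIL_SIZE);
        // The last four bytes must be the magic, otherwise the length (or the file) is wrong.
        for (int i = 0; i < MAGIC.length; i++) {
            if (file[tailStart + 4 + i] != MAGIC[i]) {
                throw new IllegalArgumentException("No PAR1 magic at the end of the input");
            }
        }
        int footerLength = ByteBuffer.wrap(file, tailStart, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
        long start = (long) tailStart - footerLength;
        if (footerLength < 0 || start < MAGIC.length) {
            throw new IllegalArgumentException("Corrupt footer length: " + footerLength);
        }
        return start;
    }

    /** Builds a fake file with the same outer layout: magic, body, footer, footer length, magic. */
    static byte[] fakeFile(byte[] body, byte[] footer) {
        ByteBuffer buf = ByteBuffer.allocate(MAGIC.length + body.length + footer.length + TAIL_SIZE)
                .order(ByteOrder.LITTLE_ENDIAN);
        buf.put(MAGIC).put(body).put(footer).putInt(footer.length).put(MAGIC);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] file = fakeFile(new byte[10], new byte[5]);
        System.out.println(footerStart(file, file.length));
    }
}
```

With an unbounded stream such as the one in ListenTCPRecord there is no "end" to seek from, which is why that is the one case where the input length cannot be supplied.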