NIFI-6089 Add Parquet record reader and writer #3679

Closed · wants to merge 3 commits
@@ -48,6 +48,7 @@
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
@@ -74,7 +75,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
 
 import static org.apache.nifi.util.StringUtils.isEmpty;

@@ -454,9 +454,9 @@ private void writeRecords(PartitionContext context, Iterable<EventData> messages
 
         try (final OutputStream out = session.write(flowFile)) {
             for (final EventData eventData : messages) {
-
-                try (final InputStream in = new ByteArrayInputStream(eventData.getBytes())) {
-                    final RecordReader reader = readerFactory.createRecordReader(schemaRetrievalVariables, in, logger);
+                final byte[] eventDataBytes = eventData.getBytes();
+                try (final InputStream in = new ByteArrayInputStream(eventDataBytes)) {
+                    final RecordReader reader = readerFactory.createRecordReader(schemaRetrievalVariables, in, eventDataBytes.length, logger);
 
                     Record record;
                     while ((record = reader.nextRecord()) != null) {
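These first hunks show the heart of the change: RecordReaderFactory.createRecordReader now receives the length of its input, with callers that know the exact byte count (here, the event body) passing it through, and -1 used in later hunks when no length is available. A minimal sketch of a factory honoring the new contract (the LengthAwareReaderFactory class and its abstract hook are illustrative, not part of this PR):

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;

// Illustrative sketch only: demonstrates the updated signature and the -1 "length unknown" convention.
public abstract class LengthAwareReaderFactory extends AbstractControllerService implements RecordReaderFactory {

    @Override
    public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in,
            final long inputLength, final ComponentLog logger)
            throws MalformedRecordException, IOException, SchemaNotFoundException {
        if (inputLength < 0) {
            // Callers such as socket-based readers pass -1 because the stream has no known end;
            // a format that must seek, like Parquet with its trailing footer, cannot accept that.
            throw new IOException("This reader requires a known input length");
        }
        return createReader(variables, in, inputLength, logger);
    }

    // Hypothetical hook for the concrete, length-aware parsing logic.
    protected abstract RecordReader createReader(Map<String, String> variables, InputStream in,
            long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException;
}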
@@ -59,6 +59,7 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -204,7 +205,7 @@ private void setupRecordReader(List<EventData> eventDataList, int throwException
         final RecordReaderFactory readerFactory = mock(RecordReaderFactory.class);
         processor.setReaderFactory(readerFactory);
         final RecordReader reader = mock(RecordReader.class);
-        when(readerFactory.createRecordReader(anyMap(), any(), any())).thenReturn(reader);
+        when(readerFactory.createRecordReader(anyMap(), any(), anyLong(), any())).thenReturn(reader);
         final List<Record> recordList = eventDataList.stream()
                 .map(eventData -> toRecord(new String(eventData.getBytes())))
                 .collect(Collectors.toList());
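The test-side counterpart is mechanical: the stubbed factory call gains one anyLong() matcher for the new parameter. A self-contained sketch of that stubbing pattern (the RecordReaderMockSupport helper is illustrative):

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;

class RecordReaderMockSupport {
    // Returns a factory stub that matches any variables map, any stream,
    // any input length (the new long parameter), and any logger.
    static RecordReaderFactory mockReaderFactory(final RecordReader reader) throws Exception {
        final RecordReaderFactory factory = mock(RecordReaderFactory.class);
        when(factory.createRecordReader(anyMap(), any(), anyLong(), any())).thenReturn(reader);
        return factory;
    }
}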
@@ -114,7 +114,7 @@ public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFai
                 recordReaderVariables.put(k, value.toString());
             }
         });
-        return new Tuple<>(null, readerFactory.createRecordReader(recordReaderVariables, in, getLogger()));
+        return new Tuple<>(null, readerFactory.createRecordReader(recordReaderVariables, in, -1, getLogger()));
     } catch (Exception e) {
         return new Tuple<>(e, null);
     }
@@ -49,6 +49,10 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-schema-registry-service-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-record</artifactId>
@@ -36,7 +36,7 @@ public ArrayListRecordReader(final RecordSchema schema) {
     }
 
     @Override
-    public ArrayListReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) {
+    public ArrayListReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger) {
         return new ArrayListReader(records, schema);
     }
 
@@ -17,6 +17,14 @@
 
 package org.apache.nifi.serialization.record;
 
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
@@ -26,14 +34,6 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.MalformedRecordException;
-import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RecordReaderFactory;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-
 public class CommaSeparatedRecordReader extends AbstractControllerService implements RecordReaderFactory {
     private int failAfterN;
     private int recordCount = 0;
@@ -51,7 +51,7 @@ public void failAfter(final int failAfterN) {
     }
 
     @Override
-    public RecordReader createRecordReader(Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
+    public RecordReader createRecordReader(Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException, SchemaNotFoundException {
         final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
 
         final List<RecordField> fields = new ArrayList<>();
@@ -17,14 +17,6 @@
 
 package org.apache.nifi.serialization.record;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
@@ -33,6 +25,14 @@
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
 public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory {
     private final List<Object[]> records = new ArrayList<>();
     private final List<RecordField> fields = new ArrayList<>();
@@ -67,7 +67,7 @@ public void addRecord(Object... values) {
     }
 
     @Override
-    public RecordReader createRecordReader(Map<String, String> variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
+    public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException {
         final Iterator<Object[]> itr = records.iterator();
 
         return new RecordReader() {
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.record.listen;
 
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
 import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
@@ -30,6 +29,7 @@
 import java.io.InputStream;
 import java.net.InetAddress;
 import java.nio.channels.SocketChannel;
+import java.util.Collections;
 
 /**
  * Encapsulates an SSLSocketChannel and a RecordReader created for the given channel.
@@ -54,14 +54,14 @@ public SSLSocketChannelRecordReader(final SocketChannel socketChannel,
     }
 
     @Override
-    public RecordReader createRecordReader(final FlowFile flowFile, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
+    public RecordReader createRecordReader(final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
         if (recordReader != null) {
             throw new IllegalStateException("Cannot create RecordReader because already created");
         }
 
         final InputStream socketIn = new SSLSocketChannelInputStream(sslSocketChannel);
         final InputStream in = new BufferedInputStream(socketIn);
-        recordReader = readerFactory.createRecordReader(flowFile, in, logger);
+        recordReader = readerFactory.createRecordReader(Collections.emptyMap(), in, -1, logger);
         return recordReader;
     }
 
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.record.listen;
 
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
@@ -32,18 +31,14 @@
 public interface SocketChannelRecordReader extends Closeable {
 
     /**
-     * Currently a RecordReader can only be created with a FlowFile. Since we won't have a FlowFile at the time
-     * a connection is accepted, this method will be used to lazily create the RecordReader later. Eventually this
-     * method should be removed and the reader should be passed in through the constructor.
+     * Lazily creates the RecordReader.
      *
-     *
-     * @param flowFile the flow file we are creating the reader for
      * @param logger the logger of the component creating the reader
      * @return a RecordReader
      *
     * @throws IllegalStateException if create is called after a reader has already been created
      */
-    RecordReader createRecordReader(final FlowFile flowFile, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException;
+    RecordReader createRecordReader(ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException;
 
     /**
      * @return the RecordReader created by calling createRecordReader, or null if one has not been created yet
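Dropping the FlowFile parameter lets a listening processor create its reader as soon as a connection delivers data, before any FlowFile exists. A sketch of a call site against the revised interface (the ConnectionDrainer helper and its processing loop are illustrative):

import java.io.IOException;

import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.record.listen.SocketChannelRecordReader;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.Record;

final class ConnectionDrainer {
    // Reads every record from one accepted connection; no FlowFile is required up front.
    static void drain(final SocketChannelRecordReader socketReader, final ComponentLog logger)
            throws IOException, MalformedRecordException, SchemaNotFoundException {
        final RecordReader reader = socketReader.createRecordReader(logger);
        Record record;
        while ((record = reader.nextRecord()) != null) {
            // hand each record off, e.g. to a RecordSetWriter tied to a FlowFile created later
        }
    }
}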
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.record.listen;
 
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
@@ -27,6 +26,7 @@
 import java.io.InputStream;
 import java.net.InetAddress;
 import java.nio.channels.SocketChannel;
+import java.util.Collections;
 
 /**
  * Encapsulates a SocketChannel and a RecordReader created for the given channel.
@@ -48,13 +48,13 @@ public StandardSocketChannelRecordReader(final SocketChannel socketChannel,
     }
 
     @Override
-    public RecordReader createRecordReader(final FlowFile flowFile, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
+    public RecordReader createRecordReader(final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
         if (recordReader != null) {
             throw new IllegalStateException("Cannot create RecordReader because already created");
         }
 
         final InputStream in = socketChannel.socket().getInputStream();
-        recordReader = readerFactory.createRecordReader(flowFile, in, logger);
+        recordReader = readerFactory.createRecordReader(Collections.emptyMap(), in, -1, logger);
         return recordReader;
     }
 
@@ -171,11 +171,11 @@ private void configure(final PutHive3Streaming processor, final int numUsers, fi
         runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
         MockRecordParser readerFactory = new MockRecordParser() {
             @Override
-            public RecordReader createRecordReader(Map<String, String> variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
+            public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException {
                 if (failOnCreateReader) {
                     throw new SchemaNotFoundException("test");
                 }
-                return super.createRecordReader(variables, in, logger);
+                return super.createRecordReader(variables, in, inputLength, logger);
             }
         };
         final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
@@ -16,29 +16,6 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
 
-import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.REL_PARSE_FAILURE;
-import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.REL_SUCCESS;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
-
-import java.io.ByteArrayInputStream;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiConsumer;
-
-import javax.xml.bind.DatatypeConverter;
-
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -60,6 +37,28 @@
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 
+import javax.xml.bind.DatatypeConverter;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.REL_PARSE_FAILURE;
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.REL_SUCCESS;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
+
 /**
  * This class represents a lease to access a Kafka Consumer object. The lease is
  * intended to be obtained from a ConsumerPool. The lease is closeable to allow
@@ -471,7 +470,7 @@ private void writeRecordData(final ProcessSession session, final List<ConsumerRe
             final Record firstRecord;
 
             try {
-                reader = readerFactory.createRecordReader(Collections.emptyMap(), in, logger);
+                reader = readerFactory.createRecordReader(Collections.emptyMap(), in, recordBytes.length, logger);
                 firstRecord = reader.nextRecord();
             } catch (final Exception e) {
                 handleParseFailure.accept(consumerRecord, e);
@@ -330,7 +330,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             @Override
             public void process(final InputStream in) throws IOException {
                 try {
-                    final RecordReader reader = readerFactory.createRecordReader(attributes, in, getLogger());
+                    final RecordReader reader = readerFactory.createRecordReader(attributes, in, flowFile.getSize(), getLogger());
                     final RecordSet recordSet = reader.createRecordSet();
 
                     final RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema());
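Callers that do hold a FlowFile can supply its exact size, as the hunk above does. A fragmentary sketch of that pattern using a lambda-style session read callback (the surrounding readerFactory, attributes, and flowFile names are assumed from context, not defined here):

// Illustrative fragment of an onTrigger body: flowFile.getSize() supplies inputLength.
session.read(flowFile, in -> {
    try {
        final RecordReader reader = readerFactory.createRecordReader(attributes, in, flowFile.getSize(), getLogger());
        final RecordSet recordSet = reader.createRecordSet();
        // ... hand recordSet to the configured writer, as in the surrounding processor ...
    } catch (final MalformedRecordException | SchemaNotFoundException e) {
        throw new ProcessException("Failed to create RecordReader", e);
    }
});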
Expand Up @@ -17,22 +17,6 @@

package org.apache.nifi.processors.kafka.pubsub;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand All @@ -57,6 +41,22 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;


public class TestPublisherLease {
private ComponentLog logger;
Expand Down Expand Up @@ -270,7 +270,7 @@ public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException,
readerService.addSchemaField("name", RecordFieldType.STRING);
readerService.addSchemaField("age", RecordFieldType.INT);

final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), logger);
final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), -1, logger);
final RecordSet recordSet = reader.createRecordSet();
final RecordSchema schema = reader.getSchema();

Expand Down