NIFI-6089 Add Parquet record reader and writer
NIFI-5755 Allow PutParquet processor to specify avro write configuration
Review feedback
Additional review feedback

This closes apache#3679

Signed-off-by: Mike Thomsen <[email protected]>
bbende authored and patricker committed Jan 22, 2020
1 parent 6ffe634 commit 552892a
Showing 69 changed files with 1,339 additions and 314 deletions.
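The cross-cutting API change in this commit, repeated across the hunks below, is to RecordReaderFactory#createRecordReader: it gains a long inputLength argument — the byte length of the input, with -1 meaning unknown — so that length-aware formats such as the new Parquet reader can be read as records, and the socket-channel readers switch from the FlowFile-based overload to the variables-map form. A minimal sketch of the new call shape; the helper class and method names are illustrative assumptions, not code from this commit:

    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.serialization.RecordReader;
    import org.apache.nifi.serialization.RecordReaderFactory;
    import org.apache.nifi.serialization.record.Record;

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.util.Collections;

    // Hypothetical helper illustrating the revised factory signature.
    public class RecordReaderExample {
        public static long countRecords(final RecordReaderFactory readerFactory,
                                        final byte[] content,
                                        final ComponentLog logger) throws Exception {
            long count = 0;
            // The new third argument is the input length in bytes; call sites
            // that cannot know it up front (a lookup service, a socket) pass -1.
            try (final InputStream in = new ByteArrayInputStream(content);
                 final RecordReader reader = readerFactory.createRecordReader(
                         Collections.emptyMap(), in, content.length, logger)) {
                Record record;
                while ((record = reader.nextRecord()) != null) {
                    count++;
                }
            }
            return count;
        }
    }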
@@ -48,6 +48,7 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
@@ -74,7 +75,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;

import static org.apache.nifi.util.StringUtils.isEmpty;

@@ -454,9 +454,9 @@ private void writeRecords(PartitionContext context, Iterable<EventData> messages

try (final OutputStream out = session.write(flowFile)) {
for (final EventData eventData : messages) {

-try (final InputStream in = new ByteArrayInputStream(eventData.getBytes())) {
-    final RecordReader reader = readerFactory.createRecordReader(schemaRetrievalVariables, in, logger);
+final byte[] eventDataBytes = eventData.getBytes();
+try (final InputStream in = new ByteArrayInputStream(eventDataBytes)) {
+    final RecordReader reader = readerFactory.createRecordReader(schemaRetrievalVariables, in, eventDataBytes.length, logger);

Record record;
while ((record = reader.nextRecord()) != null) {
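In the hunk above, eventData.getBytes() is hoisted into a local variable so the same byte array can supply both the stream contents and the new length argument, instead of being fetched twice.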
@@ -59,6 +59,7 @@

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -204,7 +205,7 @@ private void setupRecordReader(List<EventData> eventDataList, int throwException
final RecordReaderFactory readerFactory = mock(RecordReaderFactory.class);
processor.setReaderFactory(readerFactory);
final RecordReader reader = mock(RecordReader.class);
-when(readerFactory.createRecordReader(anyMap(), any(), any())).thenReturn(reader);
+when(readerFactory.createRecordReader(anyMap(), any(), anyLong(), any())).thenReturn(reader);
final List<Record> recordList = eventDataList.stream()
.map(eventData -> toRecord(new String(eventData.getBytes())))
.collect(Collectors.toList());
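A note on the stubbing change above: Mockito requires a stubbed invocation to use either all raw values or all argument matchers, so the new long parameter has to be matched explicitly, hence the added anyLong() alongside anyMap() and any().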
@@ -114,7 +114,7 @@ public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFai
recordReaderVariables.put(k, value.toString());
}
});
-return new Tuple<>(null, readerFactory.createRecordReader(recordReaderVariables, in, getLogger()));
+return new Tuple<>(null, readerFactory.createRecordReader(recordReaderVariables, in, -1, getLogger()));
} catch (Exception e) {
return new Tuple<>(e, null);
}
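The lookup service above reads from a stream whose total length it cannot know, so it passes -1 for the new length argument — the convention this commit uses for "unknown length", as the socket-channel readers further down do as well.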
@@ -49,6 +49,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
</dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
@@ -36,7 +36,7 @@ public ArrayListRecordReader(final RecordSchema schema) {
}

@Override
-public ArrayListReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) {
+public ArrayListReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger) {
return new ArrayListReader(records, schema);
}

@@ -17,6 +17,14 @@

package org.apache.nifi.serialization.record;

+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@@ -26,14 +34,6 @@
import java.util.List;
import java.util.Map;

-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.MalformedRecordException;
-import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RecordReaderFactory;
-import org.apache.nifi.serialization.SimpleRecordSchema;

public class CommaSeparatedRecordReader extends AbstractControllerService implements RecordReaderFactory {
private int failAfterN;
private int recordCount = 0;
@@ -51,7 +51,7 @@ public void failAfter(final int failAfterN) {
}

@Override
-public RecordReader createRecordReader(Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
+public RecordReader createRecordReader(Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException, SchemaNotFoundException {
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));

final List<RecordField> fields = new ArrayList<>();
@@ -17,14 +17,6 @@

package org.apache.nifi.serialization.record;

-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;

import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
@@ -33,6 +25,14 @@
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;

+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;

public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory {
private final List<Object[]> records = new ArrayList<>();
private final List<RecordField> fields = new ArrayList<>();
@@ -67,7 +67,7 @@ public void addRecord(Object... values) {
}

@Override
-public RecordReader createRecordReader(Map<String, String> variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
+public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException {
final Iterator<Object[]> itr = records.iterator();

return new RecordReader() {
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.record.listen;

-import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
@@ -30,6 +29,7 @@
import java.io.InputStream;
import java.net.InetAddress;
import java.nio.channels.SocketChannel;
+import java.util.Collections;

/**
* Encapsulates an SSLSocketChannel and a RecordReader created for the given channel.
@@ -54,14 +54,14 @@ public SSLSocketChannelRecordReader(final SocketChannel socketChannel,
}

@Override
-public RecordReader createRecordReader(final FlowFile flowFile, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
+public RecordReader createRecordReader(final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
if (recordReader != null) {
throw new IllegalStateException("Cannot create RecordReader because already created");
}

final InputStream socketIn = new SSLSocketChannelInputStream(sslSocketChannel);
final InputStream in = new BufferedInputStream(socketIn);
-recordReader = readerFactory.createRecordReader(flowFile, in, logger);
+recordReader = readerFactory.createRecordReader(Collections.emptyMap(), in, -1, logger);
return recordReader;
}

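With the FlowFile parameter removed, the socket-channel readers pass Collections.emptyMap() in place of FlowFile attributes and -1 for the input length, since an open socket stream has neither attributes nor a known size.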
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.record.listen;

-import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
@@ -32,18 +31,14 @@
public interface SocketChannelRecordReader extends Closeable {

/**
- * Currently a RecordReader can only be created with a FlowFile. Since we won't have a FlowFile at the time
- * a connection is accepted, this method will be used to lazily create the RecordReader later. Eventually this
- * method should be removed and the reader should be passed in through the constructor.
+ * Lazily creates the RecordReader.
  *
- * @param flowFile the flow file we are creating the reader for
* @param logger the logger of the component creating the reader
* @return a RecordReader
*
* @throws IllegalStateException if create is called after a reader has already been created
*/
-RecordReader createRecordReader(final FlowFile flowFile, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException;
+RecordReader createRecordReader(ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException;

/**
* @return the RecordReader created by calling createRecordReader, or null if one has not been created yet
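A short sketch of how a caller might consume the trimmed-down interface; the helper class and its loop are illustrative assumptions, not code from this commit:

    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.record.listen.SocketChannelRecordReader;
    import org.apache.nifi.serialization.RecordReader;
    import org.apache.nifi.serialization.record.Record;

    // Hypothetical consumer of SocketChannelRecordReader.
    public class SocketReaderExample {
        public static void drain(final SocketChannelRecordReader channelReader,
                                 final ComponentLog logger) throws Exception {
            // The reader is now created lazily from the connection alone; no
            // FlowFile needs to exist yet when the connection is accepted.
            // A second call would throw IllegalStateException, per the javadoc.
            final RecordReader reader = channelReader.createRecordReader(logger);
            Record record;
            while ((record = reader.nextRecord()) != null) {
                // hand each record off for processing
            }
        }
    }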
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.record.listen;

-import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
@@ -27,6 +26,7 @@
import java.io.InputStream;
import java.net.InetAddress;
import java.nio.channels.SocketChannel;
+import java.util.Collections;

/**
* Encapsulates a SocketChannel and a RecordReader created for the given channel.
@@ -48,13 +48,13 @@ public StandardSocketChannelRecordReader(final SocketChannel socketChannel,
}

@Override
-public RecordReader createRecordReader(final FlowFile flowFile, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
+public RecordReader createRecordReader(final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
if (recordReader != null) {
throw new IllegalStateException("Cannot create RecordReader because already created");
}

final InputStream in = socketChannel.socket().getInputStream();
-recordReader = readerFactory.createRecordReader(flowFile, in, logger);
+recordReader = readerFactory.createRecordReader(Collections.emptyMap(), in, -1, logger);
return recordReader;
}

@@ -171,11 +171,11 @@ private void configure(final PutHive3Streaming processor, final int numUsers, fi
runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
MockRecordParser readerFactory = new MockRecordParser() {
@Override
-public RecordReader createRecordReader(Map<String, String> variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
+public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException {
if (failOnCreateReader) {
throw new SchemaNotFoundException("test");
}
-return super.createRecordReader(variables, in, logger);
+return super.createRecordReader(variables, in, inputLength, logger);
}
};
final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
@@ -16,29 +16,6 @@
*/
package org.apache.nifi.processors.kafka.pubsub;

-import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.REL_PARSE_FAILURE;
-import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.REL_SUCCESS;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;

-import java.io.ByteArrayInputStream;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiConsumer;

-import javax.xml.bind.DatatypeConverter;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -60,6 +37,28 @@
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

+import javax.xml.bind.DatatypeConverter;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;

+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.REL_PARSE_FAILURE;
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.REL_SUCCESS;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;

/**
* This class represents a lease to access a Kafka Consumer object. The lease is
* intended to be obtained from a ConsumerPool. The lease is closeable to allow
@@ -471,7 +470,7 @@ private void writeRecordData(final ProcessSession session, final List<ConsumerRe
final Record firstRecord;

try {
-reader = readerFactory.createRecordReader(Collections.emptyMap(), in, logger);
+reader = readerFactory.createRecordReader(Collections.emptyMap(), in, recordBytes.length, logger);
firstRecord = reader.nextRecord();
} catch (final Exception e) {
handleParseFailure.accept(consumerRecord, e);
@@ -330,7 +330,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
@Override
public void process(final InputStream in) throws IOException {
try {
-final RecordReader reader = readerFactory.createRecordReader(attributes, in, getLogger());
+final RecordReader reader = readerFactory.createRecordReader(attributes, in, flowFile.getSize(), getLogger());
final RecordSet recordSet = reader.createRecordSet();

final RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema());
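The hunk above shows the typical processor-side call after this change: the FlowFile's own size is passed as the input length. A self-contained sketch of that pattern; the helper method and its error handling are illustrative assumptions, not code from this commit:

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.exception.ProcessException;
    import org.apache.nifi.schema.access.SchemaNotFoundException;
    import org.apache.nifi.serialization.MalformedRecordException;
    import org.apache.nifi.serialization.RecordReader;
    import org.apache.nifi.serialization.RecordReaderFactory;
    import org.apache.nifi.serialization.record.Record;

    // Hypothetical helper showing the processor-side pattern.
    public class FlowFileRecordExample {
        public static void readRecords(final ProcessSession session, final FlowFile flowFile,
                                       final RecordReaderFactory readerFactory, final ComponentLog logger) {
            session.read(flowFile, in -> {
                // The FlowFile's size is passed explicitly as the input length.
                try (final RecordReader reader = readerFactory.createRecordReader(
                        flowFile.getAttributes(), in, flowFile.getSize(), logger)) {
                    Record record;
                    while ((record = reader.nextRecord()) != null) {
                        // transform or route each record here
                    }
                } catch (final SchemaNotFoundException | MalformedRecordException e) {
                    throw new ProcessException("Failed to parse records", e);
                }
            });
        }
    }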
@@ -17,22 +17,6 @@

package org.apache.nifi.processors.kafka.pubsub;

-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;

-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -57,6 +41,22 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;

+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;


public class TestPublisherLease {
private ComponentLog logger;
@@ -270,7 +270,7 @@ public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException,
readerService.addSchemaField("name", RecordFieldType.STRING);
readerService.addSchemaField("age", RecordFieldType.INT);

-final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), logger);
+final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), -1, logger);
final RecordSet recordSet = reader.createRecordSet();
final RecordSchema schema = reader.getSchema();
