diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java index 2e74a74bd45b..3b7824fa449e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java @@ -22,14 +22,11 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; -import java.io.ByteArrayInputStream; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.InvalidObjectException; import java.io.ObjectInputStream; import java.io.ObjectStreamException; -import java.io.PushbackInputStream; import java.io.Serializable; import java.nio.ByteBuffer; import java.nio.channels.Channels; @@ -37,14 +34,14 @@ import java.nio.channels.SeekableByteChannel; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Iterator; import java.util.Map; import java.util.WeakHashMap; -import java.util.zip.Inflater; -import java.util.zip.InflaterInputStream; import javax.annotation.concurrent.GuardedBy; import org.apache.avro.Schema; -import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileConstants; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.SeekableInput; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; @@ -63,14 +60,9 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.VarInt; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; -import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; -import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream; -import org.apache.commons.compress.compressors.xz.XZCompressorInputStream; -import org.apache.commons.compress.utils.CountingInputStream; -import org.apache.commons.compress.utils.IOUtils; import org.checkerframework.checker.nullness.qual.Nullable; // CHECKSTYLE.OFF: JavadocStyle @@ -525,7 +517,7 @@ private static synchronized String internSchemaString(String schema) { return schema; } - private static synchronized Schema internOrParseSchemaString(String schemaString) { + static synchronized Schema internOrParseSchemaString(String schemaString) { Schema schema = schemaLogicalReferenceCache.get(schemaString); if (schema != null) { return schema; @@ -561,10 +553,6 @@ private Object readResolve() throws ObjectStreamException { */ @Experimental(Kind.SOURCE_SINK) static class AvroBlock extends Block { - private final Mode mode; - - // The number of records in the block. - private final long numRecords; // The current record in the block. Initialized in readNextRecord. private @Nullable T currentRecord; @@ -572,66 +560,17 @@ static class AvroBlock extends Block { // The index of the current record in the block. private long currentRecordIndex = 0; - // A DatumReader to read records from the block. - private final DatumReader reader; + private final Iterator iterator; - // A BinaryDecoder used by the reader to decode records. - private final BinaryDecoder decoder; + private final SerializableFunction parseFn; - /** - * Decodes a byte array as an InputStream. The byte array may be compressed using some codec. - * Reads from the returned stream will result in decompressed bytes. - * - *

This supports the same codecs as Avro's {@link CodecFactory}, namely those defined in - * {@link DataFileConstants}. - * - *

    - *
  • "snappy" : Google's Snappy compression - *
  • "deflate" : deflate compression - *
  • "bzip2" : Bzip2 compression - *
  • "xz" : xz compression - *
  • "null" (the string, not the value): Uncompressed data - *
- */ - private static InputStream decodeAsInputStream(byte[] data, String codec) throws IOException { - ByteArrayInputStream byteStream = new ByteArrayInputStream(data); - switch (codec) { - case DataFileConstants.SNAPPY_CODEC: - return new SnappyCompressorInputStream(byteStream, 1 << 16 /* Avro uses 64KB blocks */); - case DataFileConstants.DEFLATE_CODEC: - // nowrap == true: Do not expect ZLIB header or checksum, as Avro does not write them. - Inflater inflater = new Inflater(true); - return new InflaterInputStream(byteStream, inflater); - case DataFileConstants.XZ_CODEC: - return new XZCompressorInputStream(byteStream); - case DataFileConstants.BZIP2_CODEC: - return new BZip2CompressorInputStream(byteStream); - case DataFileConstants.NULL_CODEC: - return byteStream; - default: - throw new IllegalArgumentException("Unsupported codec: " + codec); - } - } + private final long numRecordsInBlock; - AvroBlock(byte[] data, long numRecords, Mode mode, String writerSchemaString, String codec) - throws IOException { - this.mode = mode; - this.numRecords = numRecords; - checkNotNull(writerSchemaString, "writerSchemaString"); - Schema writerSchema = internOrParseSchemaString(writerSchemaString); - Schema readerSchema = - internOrParseSchemaString( - MoreObjects.firstNonNull(mode.readerSchemaString, writerSchemaString)); - - this.reader = mode.createReader(writerSchema, readerSchema); - - if (codec.equals(DataFileConstants.NULL_CODEC)) { - // Avro can read from a byte[] using a more efficient implementation. If the input is not - // compressed, pass the data in directly. - this.decoder = DecoderFactory.get().binaryDecoder(data, null); - } else { - this.decoder = DecoderFactory.get().binaryDecoder(decodeAsInputStream(data, codec), null); - } + AvroBlock( + Iterator iter, SerializableFunction parseFn, long numRecordsInBlock) { + this.iterator = iter; + this.parseFn = parseFn; + this.numRecordsInBlock = numRecordsInBlock; } @Override @@ -640,20 +579,20 @@ public T getCurrentRecord() { } @Override - public boolean readNextRecord() throws IOException { - if (currentRecordIndex >= numRecords) { + public boolean readNextRecord() { + if (currentRecordIndex >= numRecordsInBlock) { return false; } - Object record = reader.read(null, decoder); - currentRecord = - (mode.parseFn == null) ? ((T) record) : mode.parseFn.apply((GenericRecord) record); + + Object record = iterator.next(); + currentRecord = (parseFn == null) ? ((T) record) : parseFn.apply((GenericRecord) record); currentRecordIndex++; return true; } @Override public double getFractionOfBlockConsumed() { - return ((double) currentRecordIndex) / numRecords; + return ((double) currentRecordIndex) / numRecordsInBlock; } } @@ -669,13 +608,49 @@ public double getFractionOfBlockConsumed() { */ @Experimental(Kind.SOURCE_SINK) public static class AvroReader extends BlockBasedReader { - // Initialized in startReading. - private @Nullable AvroMetadata metadata; + + private static class SeekableChannelInput implements SeekableInput { + + private final SeekableByteChannel channel; + private final InputStream input; + + SeekableChannelInput(SeekableByteChannel channel) { + this.channel = channel; + this.input = Channels.newInputStream(channel); + } + + @Override + public void seek(long p) throws IOException { + channel.position(p); + } + + @Override + public long tell() throws IOException { + return channel.position(); + } + + @Override + public long length() throws IOException { + return channel.size(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return input.read(b, off, len); + } + + @Override + public void close() throws IOException { + channel.close(); + } + } // The current block. // Initialized in readNextRecord. private @Nullable AvroBlock currentBlock; + private @Nullable DataFileReader dataFileReader; + // A lock used to synchronize block offsets for getRemainingParallelism private final Object progressLock = new Object(); @@ -687,20 +662,6 @@ public static class AvroReader extends BlockBasedReader { @GuardedBy("progressLock") private long currentBlockSizeBytes = 0; - // Stream used to read from the underlying file. - // A pushback stream is used to restore bytes buffered during seeking. - // Initialized in startReading. - private @Nullable PushbackInputStream stream; - - // Counts the number of bytes read. Used only to tell how many bytes are taken up in - // a block's variable-length header. - // Initialized in startReading. - private @Nullable CountingInputStream countStream; - - // Caches the Avro DirectBinaryDecoder used to decode binary-encoded values from the buffer. - // Initialized in readNextBlock. - private @Nullable BinaryDecoder decoder; - /** Reads Avro records of type {@code T} from the specified source. */ public AvroReader(AvroSource source) { super(source); @@ -717,71 +678,25 @@ public synchronized AvroSource getCurrentSource() { // // Postcondition: same as above, but for the new current (formerly next) block. @Override - public boolean readNextBlock() throws IOException { - long startOfNextBlock; - synchronized (progressLock) { - startOfNextBlock = currentBlockOffset + currentBlockSizeBytes; - } - - // Before reading the variable-sized block header, record the current number of bytes read. - long preHeaderCount = countStream.getBytesRead(); - decoder = DecoderFactory.get().directBinaryDecoder(countStream, decoder); - long numRecords; - try { - numRecords = decoder.readLong(); - } catch (EOFException e) { - // Expected for the last block, at which the start position is the EOF. The way to detect - // stream ending is to try reading from it. + public boolean readNextBlock() { + if (!dataFileReader.hasNext()) { return false; } - long blockSize = decoder.readLong(); - - // Mark header size as the change in the number of bytes read. - long headerSize = countStream.getBytesRead() - preHeaderCount; - - // Create the current block by reading blockSize bytes. Block sizes permitted by the Avro - // specification are [32, 2^30], so the cast is safe. - byte[] data = new byte[(int) blockSize]; - int bytesRead = IOUtils.readFully(stream, data); - checkState( - blockSize == bytesRead, - "Only able to read %s/%s bytes in the block before EOF reached.", - bytesRead, - blockSize); + + long headerLength = + (long) VarInt.getLength(dataFileReader.getBlockCount()) + + VarInt.getLength(dataFileReader.getBlockSize()) + + DataFileConstants.SYNC_SIZE; + currentBlock = new AvroBlock<>( - data, - numRecords, - getCurrentSource().mode, - metadata.getSchemaString(), - metadata.getCodec()); - - // Read the end of this block, which MUST be a sync marker for correctness. - byte[] syncMarker = metadata.getSyncMarker(); - byte[] readSyncMarker = new byte[syncMarker.length]; - long syncMarkerOffset = startOfNextBlock + headerSize + blockSize; - bytesRead = IOUtils.readFully(stream, readSyncMarker); - checkState( - bytesRead == syncMarker.length, - "Only able to read %s/%s bytes of Avro sync marker at position %s before EOF reached.", - bytesRead, - syncMarker.length, - syncMarkerOffset); - if (!Arrays.equals(syncMarker, readSyncMarker)) { - throw new IllegalStateException( - String.format( - "Expected the bytes [%d,%d) in file %s to be a sync marker, but found %s", - syncMarkerOffset, - syncMarkerOffset + syncMarker.length, - getCurrentSource().getFileOrPatternSpec(), - Arrays.toString(readSyncMarker))); - } + dataFileReader, getCurrentSource().mode.parseFn, dataFileReader.getBlockCount()); // Atomically update both the position and offset of the new block. synchronized (progressLock) { - currentBlockOffset = startOfNextBlock; + currentBlockOffset = dataFileReader.previousSync(); // Total block size includes the header, block content, and trailing sync marker. - currentBlockSizeBytes = headerSize + blockSize + syncMarker.length; + currentBlockSizeBytes = dataFileReader.getBlockSize() + headerLength; } return true; @@ -820,124 +735,38 @@ public long getSplitPointsRemaining() { return super.getSplitPointsRemaining(); } - /** - * Creates a {@link PushbackInputStream} that has a large enough pushback buffer to be able to - * push back the syncBuffer. - */ - private PushbackInputStream createStream(ReadableByteChannel channel) { - return new PushbackInputStream( - Channels.newInputStream(channel), metadata.getSyncMarker().length); - } - // Postcondition: the stream is positioned at the beginning of the first block after the start // of the current source, and currentBlockOffset is that position. Additionally, // currentBlockSizeBytes will be set to 0 indicating that the previous block was empty. @Override protected void startReading(ReadableByteChannel channel) throws IOException { - try { - metadata = readMetadataFromFile(getCurrentSource().getSingleFileMetadata().resourceId()); - } catch (IOException e) { - throw new RuntimeException( - "Error reading metadata from file " + getCurrentSource().getSingleFileMetadata(), e); + SeekableChannelInput seekableChannelInput = + new SeekableChannelInput((SeekableByteChannel) channel); + // the channel needs to be at the beginning of the file in order for the DataFileReader to + // read the header, etc, we'll seek it back to where it should be after creating the DFR. + seekableChannelInput.seek(0); + + Schema readerSchema = null; + String readerSchemaString = this.getCurrentSource().getReaderSchemaString(); + if (readerSchemaString != null) { + readerSchema = AvroSource.internOrParseSchemaString(readerSchemaString); } + // the DataFileReader will call setSchema with the writer schema when created. + DatumReader reader = this.getCurrentSource().mode.createReader(readerSchema, readerSchema); - long startOffset = getCurrentSource().getStartOffset(); - byte[] syncMarker = metadata.getSyncMarker(); - long syncMarkerLength = syncMarker.length; + dataFileReader = new DataFileReader<>(seekableChannelInput, reader); + long startOffset = getCurrentSource().getStartOffset(); if (startOffset != 0) { - // Rewind order to find the sync marker ending the previous block. - long position = Math.max(0, startOffset - syncMarkerLength); - ((SeekableByteChannel) channel).position(position); - startOffset = position; + // the start offset may be in the middle of a sync marker, by rewinding SYNC_SIZE bytes we + // ensure that we won't miss the block if so. + dataFileReader.sync(Math.max(0, startOffset - DataFileConstants.SYNC_SIZE)); } - // Satisfy the post condition. - stream = createStream(channel); - countStream = new CountingInputStream(stream); synchronized (progressLock) { - currentBlockOffset = startOffset + advancePastNextSyncMarker(stream, syncMarker); + currentBlockOffset = dataFileReader.previousSync(); currentBlockSizeBytes = 0; } } - - /** - * Advances to the first byte after the next occurrence of the sync marker in the stream when - * reading from the current offset. Returns the number of bytes consumed from the stream. Note - * that this method requires a PushbackInputStream with a buffer at least as big as the marker - * it is seeking for. - */ - static long advancePastNextSyncMarker(PushbackInputStream stream, byte[] syncMarker) - throws IOException { - Seeker seeker = new Seeker(syncMarker); - byte[] syncBuffer = new byte[syncMarker.length]; - long totalBytesConsumed = 0; - // Seek until either a sync marker is found or we reach the end of the file. - int mark = -1; // Position of the last byte in the sync marker. - int read; // Number of bytes read. - do { - read = stream.read(syncBuffer); - if (read >= 0) { - mark = seeker.find(syncBuffer, read); - // Update the currentOffset with the number of bytes read. - totalBytesConsumed += read; - } - } while (mark < 0 && read > 0); - - // If the sync marker was found, unread block data and update the current offsets. - if (mark >= 0) { - // The current offset after this call should be just past the sync marker, so we should - // unread the remaining buffer contents and update the currentOffset accordingly. - stream.unread(syncBuffer, mark + 1, read - (mark + 1)); - totalBytesConsumed = totalBytesConsumed - (read - (mark + 1)); - } - return totalBytesConsumed; - } - - /** - * A {@link Seeker} looks for a given marker within a byte buffer. Uses naive string matching - * with a sliding window, as sync markers are small and random. - */ - static class Seeker { - // The marker to search for. - private byte[] marker; - - // Buffer used for the sliding window. - private byte[] searchBuffer; - - // Number of bytes available to be matched in the buffer. - private int available = 0; - - /** Create a {@link Seeker} that looks for the given marker. */ - public Seeker(byte[] marker) { - this.marker = marker; - this.searchBuffer = new byte[marker.length]; - } - - /** - * Find the marker in the byte buffer. Returns the index of the end of the marker in the - * buffer. If the marker is not found, returns -1. - * - *

State is maintained between calls. If the marker was partially matched, a subsequent - * call to find will resume matching the marker. - * - * @param buffer - * @return the index of the end of the marker within the buffer, or -1 if the buffer was not - * found. - */ - public int find(byte[] buffer, int length) { - for (int i = 0; i < length; i++) { - System.arraycopy(searchBuffer, 1, searchBuffer, 0, searchBuffer.length - 1); - searchBuffer[searchBuffer.length - 1] = buffer[i]; - available = Math.min(available + 1, searchBuffer.length); - if (ByteBuffer.wrap(searchBuffer, searchBuffer.length - available, available) - .equals(ByteBuffer.wrap(marker))) { - available = 0; - return i; - } - } - return -1; - } - } } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java index c58872ea13eb..c9afb4713b3f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java @@ -25,11 +25,9 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; -import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; -import java.io.PushbackInputStream; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -52,8 +50,6 @@ import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.io.AvroSource.AvroMetadata; -import org.apache.beam.sdk.io.AvroSource.AvroReader; -import org.apache.beam.sdk.io.AvroSource.AvroReader.Seeker; import org.apache.beam.sdk.io.BlockBasedSource.BlockBasedReader; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; @@ -592,168 +588,6 @@ private void assertEqualsWithGeneric(List expected, List ac } } - /** - * Creates a haystack byte array of the give size with a needle that starts at the given position. - */ - private byte[] createHaystack(byte[] needle, int position, int size) { - byte[] haystack = new byte[size]; - for (int i = position, j = 0; i < size && j < needle.length; i++, j++) { - haystack[i] = needle[j]; - } - return haystack; - } - - /** - * Asserts that advancePastNextSyncMarker advances an input stream past a sync marker and - * correctly returns the number of bytes consumed from the stream. Creates a haystack of size - * bytes and places a 16-byte sync marker at the position specified. - */ - private void testAdvancePastNextSyncMarkerAt(int position, int size) throws IOException { - byte sentinel = (byte) 0xFF; - byte[] marker = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6}; - byte[] haystack = createHaystack(marker, position, size); - PushbackInputStream stream = - new PushbackInputStream(new ByteArrayInputStream(haystack), marker.length); - if (position + marker.length < size) { - haystack[position + marker.length] = sentinel; - assertEquals(position + marker.length, AvroReader.advancePastNextSyncMarker(stream, marker)); - assertEquals(sentinel, (byte) stream.read()); - } else { - assertEquals(size, AvroReader.advancePastNextSyncMarker(stream, marker)); - assertEquals(-1, stream.read()); - } - } - - @Test - public void testAdvancePastNextSyncMarker() throws IOException { - // Test placing the sync marker at different locations at the start and in the middle of the - // buffer. - for (int i = 0; i <= 16; i++) { - testAdvancePastNextSyncMarkerAt(i, 1000); - testAdvancePastNextSyncMarkerAt(160 + i, 1000); - } - // Test placing the sync marker at the end of the buffer. - testAdvancePastNextSyncMarkerAt(983, 1000); - // Test placing the sync marker so that it begins at the end of the buffer. - testAdvancePastNextSyncMarkerAt(984, 1000); - testAdvancePastNextSyncMarkerAt(985, 1000); - testAdvancePastNextSyncMarkerAt(999, 1000); - // Test with no sync marker. - testAdvancePastNextSyncMarkerAt(1000, 1000); - } - - // Tests for Seeker. - @Test - public void testSeekerFind() { - byte[] marker = {0, 1, 2, 3}; - byte[] buffer; - Seeker s; - s = new Seeker(marker); - - buffer = new byte[] {0, 1, 2, 3, 4, 5, 6, 7}; - assertEquals(3, s.find(buffer, buffer.length)); - - buffer = new byte[] {0, 0, 0, 0, 0, 1, 2, 3}; - assertEquals(7, s.find(buffer, buffer.length)); - - buffer = new byte[] {0, 1, 2, 0, 0, 1, 2, 3}; - assertEquals(7, s.find(buffer, buffer.length)); - - buffer = new byte[] {0, 1, 2, 3}; - assertEquals(3, s.find(buffer, buffer.length)); - } - - @Test - public void testSeekerFindResume() { - byte[] marker = {0, 1, 2, 3}; - byte[] buffer; - Seeker s; - s = new Seeker(marker); - - buffer = new byte[] {0, 0, 0, 0, 0, 0, 0, 0}; - assertEquals(-1, s.find(buffer, buffer.length)); - buffer = new byte[] {1, 2, 3, 0, 0, 0, 0, 0}; - assertEquals(2, s.find(buffer, buffer.length)); - - buffer = new byte[] {0, 0, 0, 0, 0, 0, 1, 2}; - assertEquals(-1, s.find(buffer, buffer.length)); - buffer = new byte[] {3, 0, 1, 2, 3, 0, 1, 2}; - assertEquals(0, s.find(buffer, buffer.length)); - - buffer = new byte[] {0}; - assertEquals(-1, s.find(buffer, buffer.length)); - buffer = new byte[] {1}; - assertEquals(-1, s.find(buffer, buffer.length)); - buffer = new byte[] {2}; - assertEquals(-1, s.find(buffer, buffer.length)); - buffer = new byte[] {3}; - assertEquals(0, s.find(buffer, buffer.length)); - } - - @Test - public void testSeekerUsesBufferLength() { - byte[] marker = {0, 0, 1}; - byte[] buffer; - Seeker s; - s = new Seeker(marker); - - buffer = new byte[] {0, 0, 0, 1}; - assertEquals(-1, s.find(buffer, 3)); - - s = new Seeker(marker); - buffer = new byte[] {0, 0}; - assertEquals(-1, s.find(buffer, 1)); - buffer = new byte[] {1, 0}; - assertEquals(-1, s.find(buffer, 1)); - - s = new Seeker(marker); - buffer = new byte[] {0, 2}; - assertEquals(-1, s.find(buffer, 1)); - buffer = new byte[] {0, 2}; - assertEquals(-1, s.find(buffer, 1)); - buffer = new byte[] {1, 2}; - assertEquals(0, s.find(buffer, 1)); - } - - @Test - public void testSeekerFindPartial() { - byte[] marker = {0, 0, 1}; - byte[] buffer; - Seeker s; - s = new Seeker(marker); - - buffer = new byte[] {0, 0, 0, 1}; - assertEquals(3, s.find(buffer, buffer.length)); - - marker = new byte[] {1, 1, 1, 2}; - s = new Seeker(marker); - - buffer = new byte[] {1, 1, 1, 1, 1}; - assertEquals(-1, s.find(buffer, buffer.length)); - buffer = new byte[] {1, 1, 2}; - assertEquals(2, s.find(buffer, buffer.length)); - - buffer = new byte[] {1, 1, 1, 1, 1}; - assertEquals(-1, s.find(buffer, buffer.length)); - buffer = new byte[] {2, 1, 1, 1, 2}; - assertEquals(0, s.find(buffer, buffer.length)); - } - - @Test - public void testSeekerFindAllLocations() { - byte[] marker = {1, 1, 2}; - byte[] allOnes = new byte[] {1, 1, 1, 1}; - byte[] findIn = new byte[] {1, 1, 1, 1}; - Seeker s = new Seeker(marker); - - for (int i = 0; i < findIn.length; i++) { - assertEquals(-1, s.find(allOnes, allOnes.length)); - findIn[i] = 2; - assertEquals(i, s.find(findIn, findIn.length)); - findIn[i] = 1; - } - } - @Test public void testDisplayData() { AvroSource source =