Skip to content

Commit

Permalink
Update to restore Block behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
nongli committed Jan 20, 2017
1 parent 27b3909 commit 7c6f7ef
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 46 deletions.
8 changes: 7 additions & 1 deletion format/File.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,15 @@ table Footer {

struct Block {

/// Index to the start of the RecordBlock (note this is past the Message header)
offset: long;

length: int;
/// Length of the metadata
metaDataLength: int;

/// Length of the data (this is aligned so there can be a gap between this and
/// the metatdata).
bodyLength: int;
}

root_type Footer;
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,39 @@
public class ArrowBlock implements FBSerializable {

private final long offset;
private final int length;
private final int metadataLength;
private final int bodyLength;

public ArrowBlock(long offset, int length) {
public ArrowBlock(long offset, int metadataLength, int bodyLength) {
super();
this.offset = offset;
this.length = length;
this.metadataLength = metadataLength;
this.bodyLength = bodyLength;
}

public long getOffset() {
return offset;
}

public int getLength() {
return length;
public int getMetadataLength() {
return metadataLength;
}

public int getBodyLength() {
return bodyLength;
}

@Override
public int writeTo(FlatBufferBuilder builder) {
return Block.createBlock(builder, offset, length);
return Block.createBlock(builder, offset, metadataLength, bodyLength);
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (length ^ (length >>> 32));
result = prime * result + bodyLength;
result = prime * result + metadataLength;
result = prime * result + (int) (offset ^ (offset >>> 32));
return result;
}
Expand All @@ -64,7 +71,9 @@ public boolean equals(Object obj) {
if (getClass() != obj.getClass())
return false;
ArrowBlock other = (ArrowBlock) obj;
if (length != other.length)
if (bodyLength != other.bodyLength)
return false;
if (metadataLength != other.metadataLength)
return false;
if (offset != other.offset)
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,10 @@ public ArrowFooter(Footer footer) {
private static List<ArrowBlock> recordBatches(Footer footer) {
List<ArrowBlock> recordBatches = new ArrayList<>();
Block tempBlock = new Block();

int recordBatchesLength = footer.recordBatchesLength();
for (int i = 0; i < recordBatchesLength; i++) {
Block block = footer.recordBatches(tempBlock, i);
recordBatches.add(new ArrowBlock(block.offset(), block.length()));
recordBatches.add(new ArrowBlock(block.offset(), block.metaDataLength(), block.bodyLength()));
}
return recordBatches;
}
Expand All @@ -71,7 +70,7 @@ private static List<ArrowBlock> dictionaries(Footer footer) {
int dictionariesLength = footer.dictionariesLength();
for (int i = 0; i < dictionariesLength; i++) {
Block block = footer.dictionaries(tempBlock, i);
dictionaries.add(new ArrowBlock(block.offset(), block.length()));
dictionaries.add(new ArrowBlock(block.offset(), block.metaDataLength(), block.bodyLength()));
}
return dictionaries;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,12 @@ public ArrowFooter readFooter() throws IOException {
// TODO: read dictionaries

public ArrowRecordBatch readRecordBatch(ArrowBlock block) throws IOException {
LOGGER.debug(String.format("RecordBatch at offset %d len: %d",
block.getOffset(), block.getLength()));
LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d",
block.getOffset(), block.getMetadataLength(),
block.getBodyLength()));
in.position(block.getOffset());
ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(
new ReadChannel(in, block.getOffset()), (int)block.getLength(), allocator);
new ReadChannel(in, block.getOffset()), block, allocator);
if (batch == null) {
throw new IOException("Invalid file. No batch at offset: " + block.getOffset());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ private void start() throws IOException {
public void writeRecordBatch(ArrowRecordBatch recordBatch) throws IOException {
checkStarted();
ArrowBlock batchDesc = MessageSerializer.serialize(out, recordBatch);
LOGGER.debug(String.format("RecordBatch at offset: %d len: %d",
batchDesc.getOffset(), batchDesc.getLength()));
LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d",
batchDesc.getOffset(), batchDesc.getMetadataLength(), batchDesc.getBodyLength()));

// add metadata to footer
recordBatches.add(batchDesc);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,21 @@ public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch)

// Write batch header. with the 4 byte little endian prefix
out.writeIntLittleEndian(metadata.remaining());
int metadataSize = metadata.remaining();
long batchStart = out.getCurrentPosition();
out.write(metadata);

// Align the output to 8 byte boundary.
out.align();

long offset = out.getCurrentPosition();
long bufferStart = out.getCurrentPosition();
List<ArrowBuf> buffers = batch.getBuffers();
List<ArrowBuffer> buffersLayout = batch.getBuffersLayout();

for (int i = 0; i < buffers.size(); i++) {
ArrowBuf buffer = buffers.get(i);
ArrowBuffer layout = buffersLayout.get(i);
long startPosition = offset + layout.getOffset();
long startPosition = bufferStart + layout.getOffset();
if (startPosition != out.getCurrentPosition()) {
out.writeZeros((int)(startPosition - out.getCurrentPosition()));
}
Expand All @@ -152,7 +154,7 @@ public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch)
" != " + startPosition + layout.getSize());
}
}
return new ArrowBlock(start, (int)(out.getCurrentPosition() - start));
return new ArrowBlock(batchStart, metadataSize, (int)(out.getCurrentPosition() - bufferStart));
}

/**
Expand All @@ -170,48 +172,45 @@ public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in,
if (in.readFully(buffer, messageLen) != messageLen) {
throw new IOException("Unexpected end of input trying to read batch.");
}
return deserializeRecordBatch(buffer, readPosition, messageLen);

// Read the length of the metadata.
int metadataLen = buffer.readInt();
buffer = buffer.slice(4, messageLen - 4);
readPosition += 4;
messageLen -= 4;
return deserializeRecordBatch(buffer, readPosition, metadataLen, messageLen);
}

/**
* Deserializes a RecordBatch knowing the size of the entire message up front. This
* minimizes the number of reads to the underlying stream.
*/
public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, int messageLen,
public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, ArrowBlock block,
BufferAllocator alloc) throws IOException {
ArrowBuf buffer = alloc.buffer(messageLen);
long readPosition = in.getCurrentPositiion();
if (in.readFully(buffer, messageLen) != messageLen) {
throw new IOException("Unexpected end of input trying to read batch.");
int totalLen = block.getMetadataLength() + block.getBodyLength();
if ((readPosition + block.getMetadataLength()) % 8 != 0) {
// Compute padded size.
totalLen += (8 - (readPosition + block.getMetadataLength()) % 8);
}

byte[] headerLenBytes = new byte[4];
buffer.getBytes(0, headerLenBytes);
int headerLen = bytesToInt(headerLenBytes);
buffer = buffer.slice(4, messageLen - 4);
messageLen -=4;
readPosition += 4;

Message header = Message.getRootAsMessage(buffer.nioBuffer());
if (header.headerType() != MessageHeader.RecordBatch) {
throw new IOException("Invalid message: expecting " + MessageHeader.RecordBatch +
". Message contained: " + header.headerType());
ArrowBuf buffer = alloc.buffer(totalLen);
if (in.readFully(buffer, totalLen) != totalLen) {
throw new IOException("Unexpected end of input trying to read batch.");
}

buffer = buffer.slice(headerLen, messageLen - headerLen);
messageLen -= headerLen;
readPosition += headerLen;
return deserializeRecordBatch(buffer, readPosition, messageLen);
return deserializeRecordBatch(buffer, readPosition, block.getMetadataLength(), totalLen);
}

// Deserializes a record batch. Buffer should start at the RecordBatch and include
// all the bytes for the metadata and then data buffers.
private static ArrowRecordBatch deserializeRecordBatch(
ArrowBuf buffer, long readPosition, int bufferLen) {
// Read the metadata. It starts with the 4 byte size of the metadata.
int metadataSize = buffer.readInt();
ArrowBuf buffer, long readPosition, int metadataLen, int bufferLen) {
// Read the metadata.
RecordBatch recordBatchFB =
RecordBatch.getRootAsRecordBatch(buffer.nioBuffer().asReadOnlyBuffer());

int bufferOffset = 4 + metadataSize;
int bufferOffset = metadataLen;
readPosition += bufferOffset;
if (readPosition % 8 != 0) {
bufferOffset += (int)(8 - readPosition % 8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public void test() {
assertEquals(footer, newFooter);

List<ArrowBlock> ids = new ArrayList<>();
ids.add(new ArrowBlock(0, 1));
ids.add(new ArrowBlock(4, 5));
ids.add(new ArrowBlock(0, 1, 2));
ids.add(new ArrowBlock(4, 5, 6));
footer = new ArrowFooter(schema, ids, ids);
assertEquals(footer, roundTrip(footer));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.util.Collections;
import java.util.List;

import org.apache.arrow.flatbuf.FieldNode;
import org.apache.arrow.flatbuf.RecordBatch;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.schema.ArrowFieldNode;
Expand Down Expand Up @@ -96,6 +99,17 @@ public void test() throws IOException {
assertArrayEquals(validity, array(buffers.get(0)));
assertArrayEquals(values, array(buffers.get(1)));

// Read just the header. This demonstrates being able to read without need to
// deserialize the buffer.
ByteBuffer headerBuffer = ByteBuffer.allocate(recordBatches.get(0).getMetadataLength());
headerBuffer.put(byteArray, (int)recordBatches.get(0).getOffset(), headerBuffer.capacity());
headerBuffer.rewind();
RecordBatch recordBatchFB = RecordBatch.getRootAsRecordBatch(headerBuffer);
assertEquals(2, recordBatchFB.buffersLength());
assertEquals(1, recordBatchFB.nodesLength());
FieldNode nodeFB = recordBatchFB.nodes(0);
assertEquals(16, nodeFB.length());
assertEquals(8, nodeFB.nullCount());
}
}

Expand Down

0 comments on commit 7c6f7ef

Please sign in to comment.