Skip to content

Commit

Permalink
feat: initial inline and reference buffer set functionality added
Browse files Browse the repository at this point in the history
  • Loading branch information
vibhatha committed Feb 27, 2024
1 parent 089d710 commit 4a1d2fd
Show file tree
Hide file tree
Showing 3 changed files with 362 additions and 1 deletion.
1 change: 1 addition & 0 deletions java/memory/memory-core/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@
requires jsr305;
requires org.immutables.value;
requires org.slf4j;
requires org.checkerframework.checker.qual;
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public abstract class BaseVariableWidthViewVector extends BaseValueVector
private static final int DEFAULT_RECORD_BYTE_COUNT = 8;
private static final int INITIAL_BYTE_COUNT = INITIAL_VALUE_ALLOCATION * DEFAULT_RECORD_BYTE_COUNT;
private static final int MAX_BUFFER_SIZE = (int) Math.min(MAX_ALLOCATION_SIZE, Integer.MAX_VALUE);

/** Byte-length threshold used by ViewBufferFactory: values at or below it become inline views. */
private static final int INLINE_SIZE = 12;
private int lastValueCapacity;
private long lastValueAllocationSizeInBytes;

Expand All @@ -61,6 +63,121 @@ public abstract class BaseVariableWidthViewVector extends BaseValueVector
protected int lastSet;
protected final Field field;

/** Buffers that receive the bytes of values not stored inline; ViewBufferFactory appends to this list. */
protected List<ArrowBuf> dataBuffers;
/** View descriptors, one appended for every value written through setBytes. */
protected List<ViewBuffer> views;

/**
 * Marker type for the entries kept in {@code views}; it declares no members.
 * The implementations in this file are {@code InlineValueBuffer} and {@code ReferenceValueBuffer}.
 */
interface ViewBuffer {
}

/**
 * View whose value bytes live in a buffer dedicated to that single value.
 */
static class InlineValueBuffer implements ViewBuffer {
  /** Number of value bytes described by this view. */
  private final int length;
  /** Buffer carrying the value bytes. */
  private final ArrowBuf valueBuffer;

  /**
   * Creates an inline view.
   *
   * @param buffer buffer carrying the value bytes
   * @param length number of value bytes
   */
  public InlineValueBuffer(ArrowBuf buffer, int length) {
    this.length = length;
    this.valueBuffer = buffer;
  }

  /**
   * Returns the number of value bytes.
   *
   * @return the length given at construction
   */
  public int getLength() {
    return this.length;
  }

  /**
   * Returns the buffer carrying the value bytes.
   *
   * @return the buffer given at construction
   */
  public ArrowBuf getValueBuffer() {
    return this.valueBuffer;
  }
}

/**
 * View that does not carry the value bytes itself: it records the value length, a prefix of
 * the value, and where the bytes are located (buffer id plus offset inside that buffer).
 */
static class ReferenceValueBuffer implements ViewBuffer {
  /** Index of the data buffer that contains the value. */
  private final int bufId;
  /** Position of the value inside the data buffer. */
  private final int offset;
  /** Number of value bytes. */
  private final int length;
  /** Leading bytes of the value; the array is stored and returned without copying. */
  private final byte[] prefix;

  /**
   * Creates a reference view.
   *
   * @param length number of value bytes
   * @param prefix leading bytes of the value
   * @param bufId index of the data buffer that contains the value
   * @param offset position of the value inside that data buffer
   */
  public ReferenceValueBuffer(int length, byte[] prefix, int bufId, int offset) {
    this.bufId = bufId;
    this.offset = offset;
    this.length = length;
    this.prefix = prefix;
  }

  /**
   * Returns the number of value bytes.
   *
   * @return the length given at construction
   */
  public int getLength() {
    return this.length;
  }

  /**
   * Returns the stored prefix array (not a copy).
   *
   * @return the prefix given at construction
   */
  public byte[] getPrefix() {
    return this.prefix;
  }

  /**
   * Returns the index of the data buffer that contains the value.
   *
   * @return the buffer id given at construction
   */
  public int getBufId() {
    return this.bufId;
  }

  /**
   * Returns the position of the value inside its data buffer.
   *
   * @return the offset given at construction
   */
  public int getOffset() {
    return this.offset;
  }
}

/**
 * Creates the {@link ViewBuffer} describing a value and stores the value bytes: short values
 * get a buffer of their own (inline view), longer values are appended to the shared data
 * buffers and described by a reference view.
 */
static class ViewBufferFactory {
  /** Number of leading value bytes copied into a reference view. */
  private static final int PREFIX_LENGTH = 4;

  private static ViewBuffer createInlineValueBuffer(ArrowBuf buffer, int length) {
    return new InlineValueBuffer(buffer, length);
  }

  private static ViewBuffer createReferenceValueBuffer(int length, byte[] prefix, int bufId, int offset) {
    return new ReferenceValueBuffer(length, prefix, bufId, offset);
  }

  /**
   * Rejects allocation sizes that are negative (overflow) or larger than {@code MAX_BUFFER_SIZE}.
   *
   * @param size requested allocation size in bytes
   * @throws OversizedAllocationException if the size is out of range
   */
  private static void checkDataBufferSize(long size) {
    if (size > MAX_BUFFER_SIZE || size < 0) {
      throw new OversizedAllocationException("Memory required for vector " +
          "is (" + size + "), which is overflow or more than max allowed (" + MAX_BUFFER_SIZE + "). " +
          "You could consider using LargeVarCharVector/LargeVarBinaryVector for large strings/large bytes types");
    }
  }

  /**
   * Allocates a buffer whose capacity is {@code size} rounded up to a power of two.
   *
   * @param allocator allocator to take the memory from
   * @param size minimum number of bytes required
   * @return the newly allocated buffer
   */
  private static ArrowBuf allocateViewDataBuffer(BufferAllocator allocator, int size) {
    final long newAllocationSize = CommonUtil.nextPowerOfTwo(size);
    assert newAllocationSize >= 1;
    checkDataBufferSize(newAllocationSize);
    return allocator.buffer(newAllocationSize);
  }

  /**
   * Allocates a new data buffer, writes the value at its beginning, registers the buffer in
   * {@code dataBuffers} and returns the reference view pointing at it.
   */
  private static ViewBuffer appendToNewDataBuffer(BufferAllocator allocator, int capacity, byte[] value,
      int start, int length, byte[] prefix, List<ArrowBuf> dataBuffers) {
    ArrowBuf newBuffer = allocateViewDataBuffer(allocator, capacity);
    // a fresh buffer is always filled from index 0
    newBuffer.setBytes(0, value, start, length);
    newBuffer.writerIndex(length);
    dataBuffers.add(newBuffer);
    return createReferenceValueBuffer(length, prefix, dataBuffers.size() - 1, 0);
  }

  /**
   * Stores {@code length} bytes of {@code value}, beginning at {@code start}, and returns the
   * view describing them.
   *
   * <p>Values of at most {@code INLINE_SIZE} bytes are copied into a dedicated buffer and
   * returned as an {@link InlineValueBuffer}. Longer values are appended to the last buffer in
   * {@code dataBuffers} when it has room, otherwise to a newly allocated buffer that is added
   * to the list, and are returned as a {@link ReferenceValueBuffer}.
   *
   * @param allocator allocator used for any new buffer
   * @param value array containing the bytes to store
   * @param startOffset offset of the value in the vector-wide layout; kept for signature
   *     compatibility and not used, because every buffer written here has its own positions
   * @param start index of the first byte of the value inside {@code value}
   * @param length number of bytes of the value
   * @param dataBuffers data buffers of the vector; may be extended by this call
   * @return the view describing the stored value
   * @throws OversizedAllocationException if a required buffer would exceed the allowed size
   */
  public static ViewBuffer createValueBuffer(BufferAllocator allocator, byte[] value, int startOffset, int start,
                                             int length,
                                             List<ArrowBuf> dataBuffers) {
    // the slice length, not the backing array length, decides between inline and reference
    if (length <= INLINE_SIZE) {
      ArrowBuf inlineBuffer = allocateViewDataBuffer(allocator, length);
      // the buffer belongs to this value alone, so the bytes start at index 0
      inlineBuffer.setBytes(0, value, start, length);
      return createInlineValueBuffer(inlineBuffer, length);
    }

    // the prefix is taken from the beginning of the slice, not of the backing array
    byte[] prefix = new byte[PREFIX_LENGTH];
    System.arraycopy(value, start, prefix, 0, PREFIX_LENGTH);

    if (dataBuffers.isEmpty()) {
      // first data buffer: make sure it can hold the value even if it exceeds the default size
      return appendToNewDataBuffer(allocator, Math.max(INITIAL_VALUE_ALLOCATION, length), value, start, length,
          prefix, dataBuffers);
    }

    final int lastBufId = dataBuffers.size() - 1;
    final ArrowBuf currentBuf = dataBuffers.get(lastBufId);
    // remember where the value starts before the writer index is advanced
    final long writeOffset = currentBuf.writerIndex();
    if (currentBuf.capacity() - writeOffset >= length) {
      currentBuf.setBytes(writeOffset, value, start, length);
      currentBuf.writerIndex(writeOffset + length);
      return createReferenceValueBuffer(length, prefix, lastBufId, (int) writeOffset);
    }

    // not enough room left in the last buffer: start a new one sized for this value
    return appendToNewDataBuffer(allocator, length, value, start, length, prefix, dataBuffers);
  }
}

/**
* Constructs a new instance.
*
Expand All @@ -78,6 +195,8 @@ public BaseVariableWidthViewVector(Field field, final BufferAllocator allocator)
offsetBuffer = allocator.getEmpty();
validityBuffer = allocator.getEmpty();
valueBuffer = allocator.getEmpty();
views = new ArrayList<>();
dataBuffers = new ArrayList<>();
}

@Override
Expand Down Expand Up @@ -271,10 +390,37 @@ public void clear() {
validityBuffer = releaseBuffer(validityBuffer);
valueBuffer = releaseBuffer(valueBuffer);
offsetBuffer = releaseBuffer(offsetBuffer);
clearDataBuffers();
clearViews();
lastSet = -1;
valueCount = 0;
}

/**
 * Releases the buffer owned by each inline view, then empties the list of views.
 * Views that are not {@code InlineValueBuffer} are only dropped from the list.
 */
public void clearViews() {
  for (ViewBuffer current : views) {
    if (!(current instanceof InlineValueBuffer)) {
      continue;
    }
    final ArrowBuf inlineBuf = ((InlineValueBuffer) current).valueBuffer;
    if (inlineBuf == null) {
      continue;
    }
    inlineBuf.getReferenceManager().release();
  }
  views.clear();
}

/**
 * Releases every buffer held in {@code dataBuffers} and then empties the list.
 */
public void clearDataBuffers() {
  dataBuffers.forEach(dataBuf -> dataBuf.getReferenceManager().release());
  dataBuffers.clear();
}

/**
* Get the inner vectors.
*
Expand Down Expand Up @@ -375,6 +521,34 @@ private void setReaderAndWriterIndex() {
}
}

/**
 * Updates reader/writer indices of the buffers behind the views.
 *
 * <p>With a value count of zero, the buffer of every inline view is reset to reader and
 * writer index 0, and, if at least one non-inline view exists, every data buffer is reset
 * as well. With a positive value count, the writer index is set to the start offset at
 * {@code valueCount}: on the last view's own buffer when that view is inline, otherwise on
 * every data buffer.
 *
 * <p>Does nothing when no view has been recorded, so a positive value count without any
 * written value no longer fails on the lookup of the last view.
 */
private void setReaderAndWriteIndexForViews() {
  if (views.isEmpty()) {
    // nothing was written through a view, so there is no buffer to update
    return;
  }

  if (valueCount == 0) {
    boolean hasReferenceView = false;
    for (ViewBuffer view : views) {
      if (view instanceof InlineValueBuffer) {
        final ArrowBuf inlineBuffer = ((InlineValueBuffer) view).getValueBuffer();
        inlineBuffer.readerIndex(0);
        inlineBuffer.writerIndex(0);
      } else {
        hasReferenceView = true;
      }
    }
    // resetting is idempotent, so do it once instead of once per reference view
    if (hasReferenceView) {
      for (ArrowBuf refBuffer : dataBuffers) {
        refBuffer.readerIndex(0);
        refBuffer.writerIndex(0);
      }
    }
    return;
  }

  final int lastDataOffset = getStartOffset(valueCount);
  final ViewBuffer lastBuffer = views.get(views.size() - 1);
  if (lastBuffer instanceof InlineValueBuffer) {
    // NOTE(review): each inline value has its own buffer, so a vector-wide offset as
    // writer index looks questionable here; behavior kept as is, confirm the intent
    ((InlineValueBuffer) lastBuffer).getValueBuffer().writerIndex(lastDataOffset);
  } else {
    for (ArrowBuf refBuffer : dataBuffers) {
      refBuffer.writerIndex(lastDataOffset);
    }
  }
}

/**
* Same as {@link #allocateNewSafe()}.
*/
Expand Down Expand Up @@ -956,6 +1130,7 @@ public void setValueCount(int valueCount) {
fillHoles(valueCount);
lastSet = valueCount - 1;
setReaderAndWriterIndex();
setReaderAndWriteIndexForViews();
}

/**
Expand Down Expand Up @@ -1279,7 +1454,18 @@ protected final void setBytes(int index, byte[] value, int start, int length) {
/* set new end offset */
offsetBuffer.setInt((long) (index + 1) * OFFSET_WIDTH, startOffset + length);
/* store the var length data in value buffer */
valueBuffer.setBytes(startOffset, value, start, length);
/*check whether the buffer is inline or reference buffer*/
ViewBuffer viewBuffer = ViewBufferFactory.createValueBuffer(allocator, value, startOffset, start, length,
dataBuffers);
if (viewBuffer instanceof InlineValueBuffer) {
lastValueAllocationSizeInBytes = ((InlineValueBuffer) viewBuffer).getValueBuffer().capacity();
} else {
ReferenceValueBuffer referenceValueBuffer = (ReferenceValueBuffer) viewBuffer;
lastValueAllocationSizeInBytes = dataBuffers.get(referenceValueBuffer.getBufId()).capacity();
}

views.add(viewBuffer);
// valueBuffer.setBytes(startOffset, value, start, length);
}

public final int getStartOffset(int index) {
Expand Down
Loading

0 comments on commit 4a1d2fd

Please sign in to comment.