Skip to content

Commit

Permalink
fix: adding variadic size buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
vibhatha committed Jun 9, 2024
1 parent b467b46 commit d1f70d7
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 37 deletions.
29 changes: 28 additions & 1 deletion java/c/src/main/java/org/apache/arrow/c/ArrayExporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.arrow.c.jni.PrivateData;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.BaseVariableWidthViewVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
Expand Down Expand Up @@ -96,8 +97,34 @@ void export(ArrowArray array, FieldVector vector, DictionaryProvider dictionaryP
}

if (buffers != null) {
final long bufferSize = buffers.size();
/*
* For Variadic types, an additional buffer is kept to store
* the size of each variadic buffer since that information
* cannot be retrieved in the import component.
* Here, the dataBufferReqCount is calculated to determine
* the additional number of buffers required.
* Also note that if the bufferSize is greater than 2, it means
* there is one or more data buffers.
* Thus, the dataBufferReqCount is set to 1 to get additional buffer
* for to store variadic size buffer.
* If it is not the case, the dataBuffer is not present.
* According to the spec and C Data interface in C++, there must be
* at least 3 data buffers present at the import component.
* Thus, the dataBufferReqCount is set to 2 to get additional buffer
* for empty dataBuffer and the variadic size buffer.
*/
int dataBufferReqCount = 0;
if (vector instanceof BaseVariableWidthViewVector) {
if (bufferSize > 2) {
dataBufferReqCount = 1;
} else {
dataBufferReqCount = 2;
}
}

data.buffers = new ArrayList<>(buffers.size());
data.buffers_ptrs = allocator.buffer((long) buffers.size() * Long.BYTES);
data.buffers_ptrs = allocator.buffer((bufferSize + dataBufferReqCount) * Long.BYTES);
vector.exportCDataBuffers(data.buffers, data.buffers_ptrs, NULL);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,13 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.util.VisibleForTesting;
import org.apache.arrow.vector.BaseVariableWidthViewVector;
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.DateMilliVector;
import org.apache.arrow.vector.DurationVector;
Expand All @@ -57,7 +54,6 @@
import org.apache.arrow.vector.complex.UnionVector;
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.ArrowType.ListView;
import org.apache.arrow.vector.util.DataSizeRoundingUtil;

/**
Expand Down Expand Up @@ -222,34 +218,23 @@ private List<ArrowBuf> visitVariableWidthView(ArrowType type) {
ArrowBuf maybeValidityBuffer = maybeImportBitmap(type);
buffers.add(maybeValidityBuffer);
buffers.add(view);
final int elementSize = BaseVariableWidthViewVector.ELEMENT_SIZE;
final int lengthWidth = BaseVariableWidthViewVector.LENGTH_WIDTH;
final int prefixWidth = BaseVariableWidthViewVector.PREFIX_WIDTH;
final int lengthPrefixWidth = lengthWidth + prefixWidth;
// Map to store the data buffer index and the total length of data in that buffer
Map<Integer, Long> dataBufferInfo = new HashMap<>();
for (int i = 0; i < fieldNode.getLength(); i++) {
final int length = view.getInt((long) i * elementSize);
if (length > BaseVariableWidthViewVector.INLINE_SIZE) {
checkState(maybeValidityBuffer != null,
"Validity buffer is required for data of type " + type);
if (BitVectorHelper.get(maybeValidityBuffer, i) == 1) {
final int bufferIndex =
view.getInt(((long) i * elementSize) + lengthPrefixWidth);
if (dataBufferInfo.containsKey(bufferIndex)) {
dataBufferInfo.compute(bufferIndex, (key, value) -> value != null ? value + (long) length : 0);
} else {
dataBufferInfo.put(bufferIndex, (long) length);
}
}

final int variadicSizeBufferIndex = this.buffers.length - 1;
final long numOfVariadicBuffers = this.buffers.length - 3;
final long variadicSizeBufferCapacity = numOfVariadicBuffers * Long.BYTES;
// 0th buffer is validity buffer
// 1st buffer is view buffer
// 2nd buffer onwards are variadic buffer
// N-1 (this.buffers.length - 1) buffer is variadic size buffer
final int variadicBufferReadOffset = 2;
try (ArrowBuf variadicSizeBufferPrime = importBuffer(type, variadicSizeBufferIndex,
variadicSizeBufferCapacity)) {
variadicSizeBufferPrime.getReferenceManager().retain();
for (int i = 0; i < numOfVariadicBuffers; i++) {
long size = variadicSizeBufferPrime.getLong((long) i * Long.BYTES);
buffers.add(importBuffer(type, i + variadicBufferReadOffset, size));
}
}
// fixed buffers for Utf8View or BinaryView are the validity buffer and the view buffer.
final int fixedBufferCount = 2;
// import data buffers
for (Map.Entry<Integer, Long> entry : dataBufferInfo.entrySet()) {
buffers.add(importBuffer(type, entry.getKey() + fixedBufferCount, entry.getValue()));
}
return buffers;
}
}
Expand Down Expand Up @@ -375,7 +360,7 @@ public List<ArrowBuf> visit(ArrowType.Duration type) {
}

@Override
public List<ArrowBuf> visit(ListView type) {
public List<ArrowBuf> visit(ArrowType.ListView type) {
throw new UnsupportedOperationException("Importing buffers for view type: " + type + " not supported");
}
}
4 changes: 2 additions & 2 deletions java/c/src/main/java/org/apache/arrow/c/Format.java
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ static String asString(ArrowType arrowType) {
case Utf8View:
return "vu";
case BinaryView:
return "VZ";
return "vz";
case NONE:
throw new IllegalArgumentException("Arrow type ID is NONE");
default:
Expand Down Expand Up @@ -297,7 +297,7 @@ static ArrowType asType(String format, long flags)
return new ArrowType.Map(keysSorted);
case "vu":
return new ArrowType.Utf8View();
case "VZ":
case "vz":
return new ArrowType.BinaryView();
default:
String[] parts = format.split(":", 2);
Expand Down
15 changes: 15 additions & 0 deletions java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,14 @@ public void testVarBinaryVector() {
}
}

private String generateString(String str, int repetition) {
StringBuilder aRepeated = new StringBuilder();
for (int i = 0; i < repetition; i++) {
aRepeated.append(str);
}
return aRepeated.toString();
}

@Test
public void testViewVector() {
// ViewVarCharVector with short strings
Expand Down Expand Up @@ -567,6 +575,13 @@ public void testViewVector() {
setVector(vector, byteArrayList.toArray(new byte[0][]));
assertTrue(roundtrip(vector, ViewVarBinaryVector.class));
}

try (final ViewVarBinaryVector vector = new ViewVarBinaryVector("v4", allocator)) {
setVector(vector, null, generateString("a", 123).getBytes(StandardCharsets.UTF_8),
generateString("bb", 7).getBytes(StandardCharsets.UTF_8),
generateString("cc", 10).getBytes(StandardCharsets.UTF_8));
assertTrue(roundtrip(vector, ViewVarBinaryVector.class));
}
}

@Test
Expand Down
36 changes: 36 additions & 0 deletions java/c/src/test/python/integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ def setup_jvm():
kwargs = {}
# This will be the default behaviour in jpype 0.8+
kwargs['convertStrings'] = False

# For debugging purpose please uncomment the following, and include *jvm_args, before **kwargs
# in startJVM function call
# jvm_args = [
# "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005"
# ]

jpype.startJVM(jpype.getDefaultJVMPath(), "-Djava.class.path=" + jar_path, **kwargs)


Expand Down Expand Up @@ -184,7 +191,36 @@ def test_string_array(self):
self.round_trip_array(lambda: pa.array([None, "a", "bb", "ccc"]))

def test_stringview_array(self):
# with nulls short strings
self.round_trip_array(lambda: pa.array([None, "a", "bb", "c"], type=pa.string_view()))
# with nulls long and strings
self.round_trip_array(lambda: pa.array([None, "a", "bb"*10, "c"*13], type=pa.string_view()))
# without nulls short strings
self.round_trip_array(lambda: pa.array(["a", "bb", "c"], type=pa.string_view()))
# without nulls long and strings
self.round_trip_array(lambda: pa.array(["a", "bb"*10, "c"*13], type=pa.string_view()))
# with multiple data buffers
data = []
for i in range(1, 501):
s = ''.join(str(j) for j in range(i))
data.append(s)
self.round_trip_array(lambda: pa.array(data, type=pa.string_view()))

def test_binaryview_array(self):
# with nulls short strings
self.round_trip_array(lambda: pa.array([None, b"a", b"bb", b"c"], type=pa.binary_view()))
# with nulls long and strings
self.round_trip_array(lambda: pa.array([None, b"a", b"bb"*10, b"c"*13], type=pa.binary_view()))
# without nulls short strings
self.round_trip_array(lambda: pa.array([b"a", b"bb", b"c"], type=pa.binary_view()))
# without nulls long and strings
self.round_trip_array(lambda: pa.array([b"a", b"bb"*10, b"c"*13], type=pa.binary_view()))
# with multiple data buffers
data = []
for i in range(1, 501):
s = bytes(''.join(str(j) for j in range(i)), 'utf-8')
data.append(s)
self.round_trip_array(lambda: pa.array(data, type=pa.binary_view()))

def test_decimal_array(self):
data = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1668,6 +1668,14 @@ public <OUT, IN> OUT accept(VectorVisitor<OUT, IN> visitor, IN value) {
return visitor.visit(this, value);
}

/**
* Get the data buffer of the vector.
* Note that an additional buffer is appended to store
* the size of each variadic buffer's size.
* @param buffers list of buffers to be exported
* @param buffersPtr buffer to store the pointers to the exported buffers
* @param nullValue null value
*/
@Override
public void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long nullValue) {
exportBuffer(validityBuffer, buffers, buffersPtr, nullValue, true);
Expand All @@ -1677,13 +1685,43 @@ public void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long
// We set `retain = false` to explicitly not increase the ref count for the exported buffer.
// The ref count of the newly created buffer (i.e., 1) already represents the usage
// of the imported side.
exportBuffer(allocator.getEmpty(), buffers, buffersPtr, nullValue, false);
ArrowBuf emptyViewBuffer = allocator.buffer(INITIAL_BYTE_COUNT);
emptyViewBuffer.readerIndex(0);
emptyViewBuffer.setZero(0, emptyViewBuffer.capacity());
exportBuffer(emptyViewBuffer, buffers, buffersPtr, nullValue, false);
} else {
exportBuffer(viewBuffer, buffers, buffersPtr, nullValue, true);
}

for (int i = 0; i < dataBuffers.size(); i++) {
exportBuffer(dataBuffers.get(i), buffers, buffersPtr, nullValue, true);
if (dataBuffers.isEmpty()) {
ArrowBuf emptyViewBuffer = allocator.buffer(INITIAL_BYTE_COUNT);
emptyViewBuffer.readerIndex(0);
exportBuffer(emptyViewBuffer, buffers, buffersPtr, nullValue, false);
ArrowBuf variadicSizeBuffer = allocator.buffer(Long.BYTES);
// since no data buffers are present, the size of variadic size buffer is 0
variadicSizeBuffer.setLong(0, 0);
exportBuffer(variadicSizeBuffer, buffers, buffersPtr, nullValue, false);
} else {
// allocating additional space to keep the number of variadic buffers
ArrowBuf variadicSizeBuffer = allocator.buffer((long) Long.BYTES * dataBuffers.size());
variadicSizeBuffer.setZero(0, variadicSizeBuffer.capacity());
// export data buffers
for (int i = 0; i < dataBuffers.size(); i++) {
ArrowBuf dataBuf = dataBuffers.get(i);
exportBuffer(dataBuf, buffers, buffersPtr, nullValue, true);
}
// calculate sizes for variadic size buffer
for (int i = 0; i < valueCount; i++) {
int length = getValueLength(i);
if (length > 12) {
final int bufIndex = viewBuffer.getInt(((long) i * ELEMENT_SIZE) + LENGTH_WIDTH + PREFIX_WIDTH);
long variadicSizeBufIndex = (long) bufIndex * Long.BYTES;
long currentBufLength = variadicSizeBuffer.getLong(variadicSizeBufIndex);
variadicSizeBuffer.setLong(variadicSizeBufIndex, currentBufLength + length);
}
}
// export variadic size buffer
exportBuffer(variadicSizeBuffer, buffers, buffersPtr, nullValue, false);
}
}
}

0 comments on commit d1f70d7

Please sign in to comment.