Skip to content

Commit

Permalink
[SPARK-20537][CORE] Fixing OffHeapColumnVector reallocation
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

As #17773 revealed `OnHeapColumnVector` may copy a part of the original storage.

`OffHeapColumnVector` reallocation also copies to the new storage data up to 'elementsAppended'. This variable is only updated when using the `ColumnVector.appendX` API, while `ColumnVector.putX` is more commonly used.
This PR copies the new storage data up to the previously-allocated size in`OffHeapColumnVector`.

## How was this patch tested?

Existing test suites

Author: Kazuaki Ishizaki <[email protected]>

Closes #17811 from kiszk/SPARK-20537.
  • Loading branch information
kiszk authored and cloud-fan committed May 2, 2017
1 parent 90d77e9 commit afb21bf
Showing 1 changed file with 9 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -436,28 +436,29 @@ public void loadBytes(ColumnVector.Array array) {
// Split out the slow path.
@Override
protected void reserveInternal(int newCapacity) {
int oldCapacity = (this.data == 0L) ? 0 : capacity;
if (this.resultArray != null) {
this.lengthData =
Platform.reallocateMemory(lengthData, elementsAppended * 4, newCapacity * 4);
Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4);
this.offsetData =
Platform.reallocateMemory(offsetData, elementsAppended * 4, newCapacity * 4);
Platform.reallocateMemory(offsetData, oldCapacity * 4, newCapacity * 4);
} else if (type instanceof ByteType || type instanceof BooleanType) {
this.data = Platform.reallocateMemory(data, elementsAppended, newCapacity);
this.data = Platform.reallocateMemory(data, oldCapacity, newCapacity);
} else if (type instanceof ShortType) {
this.data = Platform.reallocateMemory(data, elementsAppended * 2, newCapacity * 2);
this.data = Platform.reallocateMemory(data, oldCapacity * 2, newCapacity * 2);
} else if (type instanceof IntegerType || type instanceof FloatType ||
type instanceof DateType || DecimalType.is32BitDecimalType(type)) {
this.data = Platform.reallocateMemory(data, elementsAppended * 4, newCapacity * 4);
this.data = Platform.reallocateMemory(data, oldCapacity * 4, newCapacity * 4);
} else if (type instanceof LongType || type instanceof DoubleType ||
DecimalType.is64BitDecimalType(type) || type instanceof TimestampType) {
this.data = Platform.reallocateMemory(data, elementsAppended * 8, newCapacity * 8);
this.data = Platform.reallocateMemory(data, oldCapacity * 8, newCapacity * 8);
} else if (resultStruct != null) {
// Nothing to store.
} else {
throw new RuntimeException("Unhandled " + type);
}
this.nulls = Platform.reallocateMemory(nulls, elementsAppended, newCapacity);
Platform.setMemory(nulls + elementsAppended, (byte)0, newCapacity - elementsAppended);
this.nulls = Platform.reallocateMemory(nulls, oldCapacity, newCapacity);
Platform.setMemory(nulls + oldCapacity, (byte)0, newCapacity - oldCapacity);
capacity = newCapacity;
}
}

0 comments on commit afb21bf

Please sign in to comment.