Skip to content

Commit

Permalink
Fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 committed Nov 14, 2024
1 parent 27d15ee commit a33eb89
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@
public class KudoHostMergeResult implements AutoCloseable {
private final Schema schema;
private final List<ColumnViewInfo> columnInfoList;
private final HostMemoryBuffer hostBuf;
private HostMemoryBuffer hostBuf;

KudoHostMergeResult(Schema schema, HostMemoryBuffer hostBuf, List<ColumnViewInfo> columnInfoList) {
requireNonNull(schema, "schema is null");
requireNonNull(columnInfoList, "columnOffsets is null");
requireNonNull(columnInfoList, "columnInfoList is null");
ensure(schema.getFlattenedColumnNames().length == columnInfoList.size(), () ->
"Column offsets size does not match flattened schema size, column offsets size: " + columnInfoList.size() +
", flattened schema size: " + schema.getFlattenedColumnNames().length);
Expand All @@ -46,11 +46,14 @@ public class KudoHostMergeResult implements AutoCloseable {

@Override
public void close() throws Exception {
if (hostBuf != null) {
hostBuf.close();
}
hostBuf.close();
hostBuf = null;
}

/**
* Convert the host buffer into a cudf table.
* @return the cudf table
*/
public Table toTable() {
try (DeviceMemoryBuffer deviceMemBuf = DeviceMemoryBuffer.allocate(hostBuf.getLength())) {
if (hostBuf.getLength() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,17 +277,16 @@ public Pair<KudoHostMergeResult, MergeMetrics> mergeOnHost(List<KudoTable> kudoT
* @param kudoTables list of kudo tables. This method does not take ownership of the input tables; the caller is
* responsible for closing them after calling this method.
* @return the merged table, and metrics during merge.
* @throws Exception if any error occurs during merge.
*/
public Pair<Table, MergeMetrics> mergeToTable(List<KudoTable> kudoTables) {
public Pair<Table, MergeMetrics> mergeToTable(List<KudoTable> kudoTables) throws Exception {
Pair<KudoHostMergeResult, MergeMetrics> result = mergeOnHost(kudoTables);
MergeMetrics.Builder builder = MergeMetrics.builder(result.getRight());
try (KudoHostMergeResult children = result.getLeft()) {
Table table = withTime(children::toTable,
builder::convertToTableTime);

return Pair.of(table, builder.build());
} catch (Exception e) {
throw new RuntimeException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private int deserializeValidityBuffer(ColumnOffsetInfo curColOffset) {

startRow += sliceInfo.getRowCount();
}
return toIntExact(nullCountTotal);
return nullCountTotal;
}
} else {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,16 @@ protected MultiKudoTableVisitor(List<KudoTable> inputTables) {
this.currentOffsetOffsets = new long[tables.size()];
this.currentDataOffset = new long[tables.size()];
this.sliceInfoStack = new Deque[tables.size()];
long totalRowCount = 0L;
for (int i = 0; i < tables.size(); i++) {
this.currentValidityOffsets[i] = 0;
KudoTableHeader header = tables.get(i).getHeader();
this.currentOffsetOffsets[i] = header.getValidityBufferLen();
this.currentDataOffset[i] = header.getValidityBufferLen() + header.getOffsetBufferLen();
this.sliceInfoStack[i] = new ArrayDeque<>(16);
this.sliceInfoStack[i].add(new SliceInfo(header.getOffset(), header.getNumRows()));
totalRowCount += header.getNumRows();
}
long totalRowCount = tables.stream().mapToLong(t -> t.getHeader().getNumRows()).sum();
this.totalRowCountStack = new ArrayDeque<>(16);
totalRowCountStack.addLast(toIntExact(totalRowCount));
this.hasNull = true;
Expand All @@ -88,7 +89,12 @@ public R visitTopSchema(Schema schema, List<T> children) {
public T visitStruct(Schema structType, List<T> children) {
updateHasNull();
T t = doVisitStruct(structType, children);
updateOffsets(false, false, false, -1);
updateOffsets(
false, // update offset buffer offset
false, // update data buffer offset
false, // update slice info
-1 // element size in bytes, not used for struct
);
currentIdx += 1;
return t;
}
Expand All @@ -99,7 +105,12 @@ public T visitStruct(Schema structType, List<T> children) {
public P preVisitList(Schema listType) {
updateHasNull();
P t = doPreVisitList(listType);
updateOffsets(true, false, true, Integer.BYTES);
updateOffsets(
true, // update offset buffer offset
false, // update data buffer offset
true, // update slice info
Integer.BYTES // element size in bytes
);
currentIdx += 1;
return t;
}
Expand Down Expand Up @@ -128,9 +139,19 @@ public T visit(Schema primitiveType) {

T t = doVisit(primitiveType);
if (primitiveType.getType().hasOffsets()) {
updateOffsets(true, true, false, -1);
updateOffsets(
true, // update offset buffer offset
true, // update data buffer offset
false, // update slice info
-1 // element size in bytes, not used for string
);
} else {
updateOffsets(false, true, false, primitiveType.getType().getSizeInBytes());
updateOffsets(
false, // update offset buffer offset
true, // update data buffer offset
false, // update slice info
primitiveType.getType().getSizeInBytes() // element size in bytes
);
}
currentIdx += 1;
return t;
Expand Down

0 comments on commit a33eb89

Please sign in to comment.