Skip to content

Commit

Permalink
[BugFix] Fix the crash caused by JniScanner (#44903)
Browse files Browse the repository at this point in the history
Signed-off-by: changxin <[email protected]>
  • Loading branch information
MicePilot authored May 24, 2024
1 parent bac2d92 commit b8cbc29
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,45 @@ Map<String, String> createScanTestParams() {
return params;
}

Map<String, String> createComplexTypeScanTestParams() {
Map<String, String> params = new HashMap<>();
URL resource = TestHiveScanner.class.getResource("/test_complex_type");
String basePath = resource.getPath().toString();
String filePath = basePath + "/complex_type_test.avro";
File file = new File(filePath);
params.put("data_file_path", filePath);
params.put("block_offset", "0");
params.put("block_length", String.valueOf(file.length()));
params.put("hive_column_names",
"id,array_col,map_col,struct_col");
params.put("hive_column_types",
"string#array<string>#map<string,string>#struct<uid:string,device_list:array<string>,info:map<string,string>>");
params.put("input_format", "org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat");
params.put("serde", "org.apache.hadoop.hive.serde2.avro.AvroSerDe");
params.put("required_fields", "id,array_col,map_col,struct_col");
return params;
}

String runComplexTypeScanOnParams(Map<String, String> params) throws Exception {
HiveScanner scanner = new HiveScanner(4096, params);
System.out.println(scanner.toString());
scanner.open();
StringBuilder sb = new StringBuilder();
while (true) {
scanner.getNextOffHeapChunk();
OffHeapTable table = scanner.getOffHeapTable();
if (table.getNumRows() == 0) {
break;
}
table.show(4096);
sb.append(table.dump(4096));
table.checkTableMeta(true);
table.close();
}
scanner.close();
return sb.toString();
}

String runScanOnParams(Map<String, String> params) throws IOException {
HiveScanner scanner = new HiveScanner(4096, params);
System.out.println(scanner.toString());
Expand All @@ -70,4 +109,11 @@ public void c1DoScanTestOnPrimitiveType() throws IOException {
Map<String, String> params = createScanTestParams();
runScanOnParams(params);
}

@Test
public void complexTypeTest() throws Exception {
Map<String, String> params = createComplexTypeScanTestParams();
// if error, NegativeArraySizeException will be throw
runComplexTypeScanOnParams(params);
}
}
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -142,18 +142,31 @@ private void reserveInternal(int newCapacity) {
this.data = Platform.reallocateMemory(data, oldCapacity * typeSize, newCapacity * typeSize);
} else if (type.isByteStorageType()) {
this.offsetData = Platform.reallocateMemory(offsetData, oldOffsetSize, newOffsetSize);
int childCapacity = newCapacity * DEFAULT_STRING_LENGTH;
this.childColumns = new OffHeapColumnVector[1];
this.childColumns[0] = new OffHeapColumnVector(childCapacity, new ColumnType(type.name + "#data",
ColumnType.TypeValue.BYTE));
// Just create a new object at the first time, otherwise the data will be lost during expansion,
// and because the OFFSET record is continuous, the new offset address starts from 0 during the
// expansion, which will cause the offset records to be negatively numbered. After being passed
// to BE, it becomes a non-sign number. This is a very huge number. When processing, the array
// will cross the boundary, which may cause crash.
//
// The child's capacity is not expanded here because the child's appendValue function is used
// to add data to it will automatically expand.
if (this.childColumns == null) {
int childCapacity = newCapacity * DEFAULT_STRING_LENGTH;
this.childColumns = new OffHeapColumnVector[1];
this.childColumns[0] = new OffHeapColumnVector(childCapacity, new ColumnType(type.name + "#data",
ColumnType.TypeValue.BYTE));
}
} else if (type.isArray() || type.isMap() || type.isStruct()) {
if (type.isArray() || type.isMap()) {
this.offsetData = Platform.reallocateMemory(offsetData, oldOffsetSize, newOffsetSize);
}
int size = type.childTypes.size();
this.childColumns = new OffHeapColumnVector[size];
for (int i = 0; i < size; i++) {
this.childColumns[i] = new OffHeapColumnVector(newCapacity, type.childTypes.get(i));
// Same as the above
if (this.childColumns == null) {
int size = type.childTypes.size();
this.childColumns = new OffHeapColumnVector[size];
for (int i = 0; i < size; i++) {
this.childColumns[i] = new OffHeapColumnVector(newCapacity, type.childTypes.get(i));
}
}
} else {
throw new RuntimeException("Unhandled type: " + type);
Expand Down

0 comments on commit b8cbc29

Please sign in to comment.