diff --git a/java-extensions/hive-reader/src/test/java/com/starrocks/hive/reader/TestHiveScanner.java b/java-extensions/hive-reader/src/test/java/com/starrocks/hive/reader/TestHiveScanner.java
index a8e3b2f8d2877..6eb313b74c8bf 100644
--- a/java-extensions/hive-reader/src/test/java/com/starrocks/hive/reader/TestHiveScanner.java
+++ b/java-extensions/hive-reader/src/test/java/com/starrocks/hive/reader/TestHiveScanner.java
@@ -44,6 +44,45 @@ Map<String, String> createScanTestParams() {
         return params;
     }
 
+    Map<String, String> createComplexTypeScanTestParams() {
+        Map<String, String> params = new HashMap<>();
+        URL resource = TestHiveScanner.class.getResource("/test_complex_type");
+        String basePath = resource.getPath().toString();
+        String filePath = basePath + "/complex_type_test.avro";
+        File file = new File(filePath);
+        params.put("data_file_path", filePath);
+        params.put("block_offset", "0");
+        params.put("block_length", String.valueOf(file.length()));
+        params.put("hive_column_names",
+                "id,array_col,map_col,struct_col");
+        params.put("hive_column_types",
+                "string#array<string>#map<string,string>#struct<s1:array<string>,info:map<string,string>>");
+        params.put("input_format", "org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat");
+        params.put("serde", "org.apache.hadoop.hive.serde2.avro.AvroSerDe");
+        params.put("required_fields", "id,array_col,map_col,struct_col");
+        return params;
+    }
+
+    String runComplexTypeScanOnParams(Map<String, String> params) throws Exception {
+        HiveScanner scanner = new HiveScanner(4096, params);
+        System.out.println(scanner.toString());
+        scanner.open();
+        StringBuilder sb = new StringBuilder();
+        while (true) {
+            scanner.getNextOffHeapChunk();
+            OffHeapTable table = scanner.getOffHeapTable();
+            if (table.getNumRows() == 0) {
+                break;
+            }
+            table.show(4096);
+            sb.append(table.dump(4096));
+            table.checkTableMeta(true);
+            table.close();
+        }
+        scanner.close();
+        return sb.toString();
+    }
+
     String runScanOnParams(Map<String, String> params) throws IOException {
         HiveScanner scanner = new HiveScanner(4096, params);
         System.out.println(scanner.toString());
@@ -70,4 +109,11 @@ public void c1DoScanTestOnPrimitiveType() throws IOException {
         Map<String, String> params = createScanTestParams();
         runScanOnParams(params);
     }
+
+    @Test
+    public void complexTypeTest() throws Exception {
+        Map<String, String> params = createComplexTypeScanTestParams();
+        // if the bug is present, a NegativeArraySizeException will be thrown
+        runComplexTypeScanOnParams(params);
+    }
 }
diff --git a/java-extensions/hive-reader/src/test/resources/test_complex_type/complex_type_test.avro b/java-extensions/hive-reader/src/test/resources/test_complex_type/complex_type_test.avro
new file mode 100644
index 0000000000000..c013a43a7b0b9
Binary files /dev/null and b/java-extensions/hive-reader/src/test/resources/test_complex_type/complex_type_test.avro differ
diff --git a/java-extensions/jni-connector/src/main/java/com/starrocks/jni/connector/OffHeapColumnVector.java b/java-extensions/jni-connector/src/main/java/com/starrocks/jni/connector/OffHeapColumnVector.java
index 79cd5da92833b..dcfdec1373132 100644
--- a/java-extensions/jni-connector/src/main/java/com/starrocks/jni/connector/OffHeapColumnVector.java
+++ b/java-extensions/jni-connector/src/main/java/com/starrocks/jni/connector/OffHeapColumnVector.java
@@ -142,18 +142,31 @@ private void reserveInternal(int newCapacity) {
             this.data = Platform.reallocateMemory(data, oldCapacity * typeSize, newCapacity * typeSize);
         } else if (type.isByteStorageType()) {
             this.offsetData = Platform.reallocateMemory(offsetData, oldOffsetSize, newOffsetSize);
-            int childCapacity = newCapacity * DEFAULT_STRING_LENGTH;
-            this.childColumns = new OffHeapColumnVector[1];
-            this.childColumns[0] = new OffHeapColumnVector(childCapacity, new ColumnType(type.name + "#data",
-                    ColumnType.TypeValue.BYTE));
+            // Only create the child column the first time; re-creating it on a later expansion
+            // would discard the data already written. Offsets are recorded continuously, so a
+            // re-created child restarts writing at position 0, new offsets fall below the old
+            // ones, and the computed lengths become negative. The BE reads them as unsigned,
+            // huge values, so the array access goes out of bounds and may crash.
+            //
+            // The child's capacity is not expanded here: appendValue() on the child expands it
+            // automatically when data is added.
+            if (this.childColumns == null) {
+                int childCapacity = newCapacity * DEFAULT_STRING_LENGTH;
+                this.childColumns = new OffHeapColumnVector[1];
+                this.childColumns[0] = new OffHeapColumnVector(childCapacity, new ColumnType(type.name + "#data",
+                        ColumnType.TypeValue.BYTE));
+            }
         } else if (type.isArray() || type.isMap() || type.isStruct()) {
             if (type.isArray() || type.isMap()) {
                 this.offsetData = Platform.reallocateMemory(offsetData, oldOffsetSize, newOffsetSize);
             }
-            int size = type.childTypes.size();
-            this.childColumns = new OffHeapColumnVector[size];
-            for (int i = 0; i < size; i++) {
-                this.childColumns[i] = new OffHeapColumnVector(newCapacity, type.childTypes.get(i));
+            // Same as above: only create the child columns once.
+            if (this.childColumns == null) {
+                int size = type.childTypes.size();
+                this.childColumns = new OffHeapColumnVector[size];
+                for (int i = 0; i < size; i++) {
+                    this.childColumns[i] = new OffHeapColumnVector(newCapacity, type.childTypes.get(i));
+                }
             }
         } else {
             throw new RuntimeException("Unhandled type: " + type);
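
For context on the OffHeapColumnVector fix above, here is a minimal, self-contained sketch of the failure mode the new comment describes. It is illustrative only, not StarRocks code: the class and helper names are invented, and on-heap lists stand in for the off-heap buffers. Re-creating the value buffer on expansion resets its write position to 0 while the previously recorded offsets survive, so a later length computation offsets[i + 1] - offsets[i] goes negative:

    import java.util.ArrayList;
    import java.util.List;

    public class OffsetResetDemo {
        // Offsets into the value buffer, recorded continuously as values are appended.
        static List<Integer> offsets = new ArrayList<>(List.of(0));
        static List<Byte> bytes = new ArrayList<>();

        static void appendString(String s) {
            for (byte b : s.getBytes()) {
                bytes.add(b);
            }
            offsets.add(bytes.size()); // next offset = current end of the value buffer
        }

        public static void main(String[] args) {
            appendString("hello");        // offsets: [0, 5]
            // Buggy expansion: the child buffer is re-created, so its write
            // position restarts at 0 while the recorded offsets are kept.
            bytes = new ArrayList<>();
            appendString("hi");           // offsets: [0, 5, 2] -- no longer monotonic
            int len = offsets.get(2) - offsets.get(1); // 2 - 5 = -3
            byte[] value = new byte[len]; // throws NegativeArraySizeException in Java;
            // native code reading the same offsets as unsigned would instead compute
            // a huge length and read far out of bounds.
        }
    }

Guarding child creation with if (this.childColumns == null), as the patch does, keeps the write position and the recorded offsets consistent across expansions.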