Skip to content

Commit

Permalink
[feature](jni) support complex types
Browse files Browse the repository at this point in the history
  • Loading branch information
AshinGau committed Sep 26, 2023
1 parent a3427cb commit 651ee06
Show file tree
Hide file tree
Showing 10 changed files with 572 additions and 165 deletions.
259 changes: 185 additions & 74 deletions be/src/vec/exec/jni_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include "jni_connector.h"

#include <glog/logging.h>
#include <stdint.h>

#include <sstream>
#include <variant>
Expand All @@ -27,13 +26,17 @@
#include "runtime/decimalv2_value.h"
#include "runtime/runtime_state.h"
#include "util/jni-util.h"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_map.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_struct.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_map.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_struct.h"

namespace doris {
class RuntimeProfile;
Expand Down Expand Up @@ -301,24 +304,101 @@ Status JniConnector::_fill_column(ColumnPtr& doris_column, DataTypePtr& data_typ
[[fallthrough]];
case TypeIndex::FixedString:
return _fill_string_column(data_column, num_rows);
case TypeIndex::Array:
return _fill_array_column(data_column, data_type, num_rows);
case TypeIndex::Map:
return _fill_map_column(data_column, data_type, num_rows);
case TypeIndex::Struct:
return _fill_struct_column(data_column, data_type, num_rows);
default:
return Status::InvalidArgument("Unsupported type {} in jni scanner",
getTypeName(logical_type));
}
return Status::OK();
}

Status JniConnector::_fill_array_column(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_rows) {
ColumnPtr& element_column = static_cast<ColumnArray&>(*doris_column).get_data_ptr();
DataTypePtr& element_type = const_cast<DataTypePtr&>(
(reinterpret_cast<const DataTypeArray*>(remove_nullable(data_type).get()))
->get_nested_type());
ColumnArray::Offsets64& offsets_data = static_cast<ColumnArray&>(*doris_column).get_offsets();

int64* offsets = reinterpret_cast<int64*>(_next_meta_as_ptr());
size_t origin_size = offsets_data.size();
offsets_data.resize(origin_size + num_rows);
size_t start_offset = offsets_data[origin_size - 1];
for (size_t i = 0; i < num_rows; ++i) {
offsets_data[origin_size + i] = offsets[i] + start_offset;
}

// offsets[num_rows - 1] == offsets_data[origin_size + num_rows - 1] - start_offset
// but num_row equals 0 when there are all empty arrays
return _fill_column(element_column, element_type,
offsets_data[origin_size + num_rows - 1] - start_offset);
}

Status JniConnector::_fill_map_column(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_rows) {
auto& map = static_cast<ColumnMap&>(*doris_column);
DataTypePtr& key_type = const_cast<DataTypePtr&>(
reinterpret_cast<const DataTypeMap*>(remove_nullable(data_type).get())->get_key_type());
DataTypePtr& value_type = const_cast<DataTypePtr&>(
reinterpret_cast<const DataTypeMap*>(remove_nullable(data_type).get())
->get_value_type());
ColumnPtr& key_column = map.get_keys_ptr();
ColumnPtr& value_column = map.get_values_ptr();
ColumnArray::Offsets64& map_offsets = map.get_offsets();

int64* offsets = reinterpret_cast<int64*>(_next_meta_as_ptr());
size_t origin_size = map_offsets.size();
map_offsets.resize(origin_size + num_rows);
size_t start_offset = map_offsets[origin_size - 1];
for (size_t i = 0; i < num_rows; ++i) {
map_offsets[origin_size + i] = offsets[i] + start_offset;
}

RETURN_IF_ERROR(_fill_column(key_column, key_type,
map_offsets[origin_size + num_rows - 1] - start_offset));
return _fill_column(value_column, value_type,
map_offsets[origin_size + num_rows - 1] - start_offset);
}

Status JniConnector::_fill_struct_column(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_rows) {
auto& doris_struct = static_cast<ColumnStruct&>(*doris_column);
const DataTypeStruct* doris_struct_type =
reinterpret_cast<const DataTypeStruct*>(remove_nullable(data_type).get());
for (int i = 0; i < doris_struct.tuple_size(); ++i) {
ColumnPtr& struct_field = doris_struct.get_column_ptr(i);
DataTypePtr& field_type = const_cast<DataTypePtr&>(doris_struct_type->get_element(i));
RETURN_IF_ERROR(_fill_column(struct_field, field_type, num_rows));
}
return Status::OK();
}

Status JniConnector::_fill_string_column(MutableColumnPtr& doris_column, size_t num_rows) {
if (num_rows == 0) {
return Status::OK();
}
auto& string_col = static_cast<const ColumnString&>(*doris_column);
ColumnString::Chars& string_chars = const_cast<ColumnString::Chars&>(string_col.get_chars());
ColumnString::Offsets& string_offsets =
const_cast<ColumnString::Offsets&>(string_col.get_offsets());
int* offsets = reinterpret_cast<int*>(_next_meta_as_ptr());
char* data = reinterpret_cast<char*>(_next_meta_as_ptr());
std::vector<StringRef> string_values;
string_values.reserve(num_rows);
char* chars = reinterpret_cast<char*>(_next_meta_as_ptr());

size_t origin_chars_size = string_chars.size();
string_chars.resize(origin_chars_size + offsets[num_rows - 1]);
memcpy(string_chars.data() + origin_chars_size, chars, offsets[num_rows - 1]);

size_t origin_offsets_size = string_offsets.size();
size_t start_offset = string_offsets[origin_offsets_size - 1];
string_offsets.resize(origin_offsets_size + num_rows);
for (size_t i = 0; i < num_rows; ++i) {
int start_offset = i == 0 ? 0 : offsets[i - 1];
int end_offset = offsets[i];
string_values.emplace_back(data + start_offset, end_offset - start_offset);
string_offsets[origin_offsets_size + i] = offsets[i] + start_offset;
}
doris_column->insert_many_strings(&string_values[0], num_rows);
return Status::OK();
}

Expand Down Expand Up @@ -418,77 +498,108 @@ std::string JniConnector::get_hive_type(const TypeDescriptor& desc) {
}
}

Status JniConnector::generate_meta_info(Block* block, std::unique_ptr<long[]>& meta) {
std::vector<long> meta_data;
// insert number of rows
meta_data.emplace_back(block->rows());
for (int i = 0; i < block->columns(); ++i) {
auto& column_with_type_and_name = block->get_by_position(i);
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
TypeIndex logical_type = remove_nullable(column_type)->get_type_id();

// insert null map address
MutableColumnPtr data_column;
if (column_ptr->is_nullable()) {
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
column_ptr->assume_mutable().get());
data_column = nullable_column->get_nested_column_ptr();
NullMap& null_map = nullable_column->get_null_map_data();
meta_data.emplace_back((long)null_map.data());
} else {
meta_data.emplace_back(0);
data_column = column_ptr->assume_mutable();
}

switch (logical_type) {
void JniConnector::_fill_column_meta(ColumnPtr& doris_column, DataTypePtr& data_type,
std::vector<long>& meta_data) {
TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
// insert null map address
MutableColumnPtr data_column;
if (doris_column->is_nullable()) {
auto* nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(doris_column->assume_mutable().get());
data_column = nullable_column->get_nested_column_ptr();
NullMap& null_map = nullable_column->get_null_map_data();
meta_data.emplace_back((long)null_map.data());
} else {
meta_data.emplace_back(0);
data_column = doris_column->assume_mutable();
}
switch (logical_type) {
#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
case NUMERIC_TYPE: { \
meta_data.emplace_back(_get_numeric_data_address<CPP_NUMERIC_TYPE>(data_column)); \
break; \
}
FOR_LOGICAL_NUMERIC_TYPES(DISPATCH)
FOR_LOGICAL_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
case TypeIndex::Decimal128:
[[fallthrough]];
case TypeIndex::Decimal128I: {
meta_data.emplace_back(_get_decimal_data_address<Int128>(data_column));
break;
}
case TypeIndex::Decimal32: {
meta_data.emplace_back(_get_decimal_data_address<Int32>(data_column));
break;
}
case TypeIndex::Decimal64: {
meta_data.emplace_back(_get_decimal_data_address<Int64>(data_column));
break;
}
case TypeIndex::DateV2: {
meta_data.emplace_back(_get_time_data_address<UInt32>(data_column));
break;
}
case TypeIndex::DateTimeV2: {
meta_data.emplace_back(_get_time_data_address<UInt64>(data_column));
break;
}
case TypeIndex::String:
[[fallthrough]];
case TypeIndex::FixedString: {
auto& string_column = static_cast<ColumnString&>(*data_column);
// inert offsets
meta_data.emplace_back((long)string_column.get_offsets().data());
meta_data.emplace_back((long)string_column.get_chars().data());
break;
}
case TypeIndex::Array:
[[fallthrough]];
case TypeIndex::Struct:
[[fallthrough]];
case TypeIndex::Map:
return Status::IOError("Unhandled type {}", getTypeName(logical_type));
default:
return Status::IOError("Unsupported type {}", getTypeName(logical_type));
case TypeIndex::Decimal128:
[[fallthrough]];
case TypeIndex::Decimal128I: {
meta_data.emplace_back(_get_decimal_data_address<Int128>(data_column));
break;
}
case TypeIndex::Decimal32: {
meta_data.emplace_back(_get_decimal_data_address<Int32>(data_column));
break;
}
case TypeIndex::Decimal64: {
meta_data.emplace_back(_get_decimal_data_address<Int64>(data_column));
break;
}
case TypeIndex::DateV2: {
meta_data.emplace_back(_get_time_data_address<UInt32>(data_column));
break;
}
case TypeIndex::DateTimeV2: {
meta_data.emplace_back(_get_time_data_address<UInt64>(data_column));
break;
}
case TypeIndex::String:
[[fallthrough]];
case TypeIndex::FixedString: {
auto& string_column = static_cast<ColumnString&>(*data_column);
// inert offsets
meta_data.emplace_back((long)string_column.get_offsets().data());
meta_data.emplace_back((long)string_column.get_chars().data());
break;
}
case TypeIndex::Array: {
ColumnPtr& element_column = static_cast<ColumnArray&>(*data_column).get_data_ptr();
meta_data.emplace_back((long)static_cast<ColumnArray&>(*data_column).get_offsets().data());
DataTypePtr& element_type = const_cast<DataTypePtr&>(
(reinterpret_cast<const DataTypeArray*>(remove_nullable(data_type).get()))
->get_nested_type());
_fill_column_meta(element_column, element_type, meta_data);
break;
}
case TypeIndex::Struct: {
auto& doris_struct = static_cast<ColumnStruct&>(*data_column);
const DataTypeStruct* doris_struct_type =
reinterpret_cast<const DataTypeStruct*>(remove_nullable(data_type).get());
for (int i = 0; i < doris_struct.tuple_size(); ++i) {
ColumnPtr& struct_field = doris_struct.get_column_ptr(i);
DataTypePtr& field_type = const_cast<DataTypePtr&>(doris_struct_type->get_element(i));
_fill_column_meta(struct_field, field_type, meta_data);
}
break;
}
case TypeIndex::Map: {
auto& map = static_cast<ColumnMap&>(*data_column);
DataTypePtr& key_type = const_cast<DataTypePtr&>(
reinterpret_cast<const DataTypeMap*>(remove_nullable(data_type).get())
->get_key_type());
DataTypePtr& value_type = const_cast<DataTypePtr&>(
reinterpret_cast<const DataTypeMap*>(remove_nullable(data_type).get())
->get_value_type());
ColumnPtr& key_column = map.get_keys_ptr();
ColumnPtr& value_column = map.get_values_ptr();
meta_data.emplace_back((long)map.get_offsets().data());
_fill_column_meta(key_column, key_type, meta_data);
_fill_column_meta(value_column, value_type, meta_data);
break;
}
default:
return;
}
}

Status JniConnector::generate_meta_info(Block* block, std::unique_ptr<long[]>& meta) {
std::vector<long> meta_data;
// insert number of rows
meta_data.emplace_back(block->rows());
for (int i = 0; i < block->columns(); ++i) {
auto& column_with_type_and_name = block->get_by_position(i);
_fill_column_meta(column_with_type_and_name.column, column_with_type_and_name.type,
meta_data);
}

meta.reset(new long[meta_data.size()]);
Expand Down
12 changes: 12 additions & 0 deletions be/src/vec/exec/jni_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,18 @@ class JniConnector {

Status _fill_column(ColumnPtr& doris_column, DataTypePtr& data_type, size_t num_rows);

Status _fill_map_column(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_rows);

Status _fill_array_column(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_rows);

Status _fill_struct_column(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_rows);

static void _fill_column_meta(ColumnPtr& doris_column, DataTypePtr& data_type,
std::vector<long>& meta_data);

template <typename CppType>
Status _fill_numeric_column(MutableColumnPtr& doris_column, CppType* ptr, size_t num_rows) {
auto& column_data = static_cast<ColumnVector<CppType>&>(*doris_column).get_data();
Expand Down
Loading

0 comments on commit 651ee06

Please sign in to comment.