Skip to content

Commit

Permalink
Merge pull request duckdb#10850 from Maxxen/array-all-types
Browse files Browse the repository at this point in the history
Add ARRAY to test_all_types + IO and some clients
  • Loading branch information
Mytherin authored Feb 29, 2024
2 parents 403485d + f2da889 commit 68e4f93
Show file tree
Hide file tree
Showing 47 changed files with 732 additions and 91 deletions.
6 changes: 6 additions & 0 deletions extension/json/json_functions/json_create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ static LogicalType GetJSONType(StructNames &const_struct_names, const LogicalTyp
// The nested types need to conform as well
case LogicalTypeId::LIST:
return LogicalType::LIST(GetJSONType(const_struct_names, ListType::GetChildType(type)));
case LogicalTypeId::ARRAY:
return LogicalType::ARRAY(GetJSONType(const_struct_names, ArrayType::GetChildType(type)),
ArrayType::GetSize(type));
// Struct and MAP are treated as JSON values
case LogicalTypeId::STRUCT: {
child_list_t<LogicalType> child_types;
Expand Down Expand Up @@ -435,6 +438,9 @@ static void CreateValuesList(const StructNames &names, yyjson_mut_doc *doc, yyjs

static void CreateValuesArray(const StructNames &names, yyjson_mut_doc *doc, yyjson_mut_val *vals[], Vector &value_v,
idx_t count) {

value_v.Flatten(count);

// Initialize array for the nested values
auto &child_v = ArrayVector::GetEntry(value_v);
auto array_size = ArrayType::GetSize(value_v.GetType());
Expand Down
111 changes: 105 additions & 6 deletions extension/parquet/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1826,9 +1826,102 @@ void ListColumnWriter::FinalizeWrite(ColumnWriterState &state_p) {
child_writer->FinalizeWrite(*state.child_state);
}

//===--------------------------------------------------------------------===//
// Array Column Writer
//===--------------------------------------------------------------------===//
class ArrayColumnWriter : public ListColumnWriter {
public:
ArrayColumnWriter(ParquetWriter &writer, idx_t schema_idx, vector<string> schema_path_p, idx_t max_repeat,
idx_t max_define, unique_ptr<ColumnWriter> child_writer_p, bool can_have_nulls)
: ListColumnWriter(writer, schema_idx, std::move(schema_path_p), max_repeat, max_define,
std::move(child_writer_p), can_have_nulls) {
}
~ArrayColumnWriter() override = default;

public:
void Analyze(ColumnWriterState &state, ColumnWriterState *parent, Vector &vector, idx_t count) override;
void Prepare(ColumnWriterState &state, ColumnWriterState *parent, Vector &vector, idx_t count) override;
void Write(ColumnWriterState &state, Vector &vector, idx_t count) override;
};

void ArrayColumnWriter::Analyze(ColumnWriterState &state_p, ColumnWriterState *parent, Vector &vector, idx_t count) {
auto &state = state_p.Cast<ListColumnWriterState>();
auto &array_child = ArrayVector::GetEntry(vector);
auto array_size = ArrayType::GetSize(vector.GetType());
child_writer->Analyze(*state.child_state, &state_p, array_child, array_size * count);
}

void ArrayColumnWriter::Prepare(ColumnWriterState &state_p, ColumnWriterState *parent, Vector &vector, idx_t count) {
auto &state = state_p.Cast<ListColumnWriterState>();

auto array_size = ArrayType::GetSize(vector.GetType());
auto &validity = FlatVector::Validity(vector);

// write definition levels and repeats
// the main difference between this and ListColumnWriter::Prepare is that we need to make sure to write out
// repetition levels and definitions for the child elements of the array even if the array itself is NULL.
idx_t start = 0;
idx_t vcount = parent ? parent->definition_levels.size() - state.parent_index : count;
idx_t vector_index = 0;
for (idx_t i = start; i < vcount; i++) {
idx_t parent_index = state.parent_index + i;
if (parent && !parent->is_empty.empty() && parent->is_empty[parent_index]) {
state.definition_levels.push_back(parent->definition_levels[parent_index]);
state.repetition_levels.push_back(parent->repetition_levels[parent_index]);
state.is_empty.push_back(true);
continue;
}
auto first_repeat_level =
parent && !parent->repetition_levels.empty() ? parent->repetition_levels[parent_index] : max_repeat;
if (parent && parent->definition_levels[parent_index] != PARQUET_DEFINE_VALID) {
state.definition_levels.push_back(parent->definition_levels[parent_index]);
state.repetition_levels.push_back(first_repeat_level);
state.is_empty.push_back(false);
for (idx_t k = 1; k < array_size; k++) {
state.repetition_levels.push_back(max_repeat + 1);
state.definition_levels.push_back(parent->definition_levels[parent_index]);
state.is_empty.push_back(false);
}
} else if (validity.RowIsValid(vector_index)) {
// push the repetition levels
state.definition_levels.push_back(PARQUET_DEFINE_VALID);
state.is_empty.push_back(false);

state.repetition_levels.push_back(first_repeat_level);
for (idx_t k = 1; k < array_size; k++) {
state.repetition_levels.push_back(max_repeat + 1);
state.definition_levels.push_back(PARQUET_DEFINE_VALID);
state.is_empty.push_back(false);
}
} else {
state.definition_levels.push_back(max_define - 1);
state.repetition_levels.push_back(first_repeat_level);
state.is_empty.push_back(false);
for (idx_t k = 1; k < array_size; k++) {
state.repetition_levels.push_back(max_repeat + 1);
state.definition_levels.push_back(max_define - 1);
state.is_empty.push_back(false);
}
}
vector_index++;
}
state.parent_index += vcount;

auto &array_child = ArrayVector::GetEntry(vector);
child_writer->Prepare(*state.child_state, &state_p, array_child, count * array_size);
}

void ArrayColumnWriter::Write(ColumnWriterState &state_p, Vector &vector, idx_t count) {
auto &state = state_p.Cast<ListColumnWriterState>();
auto array_size = ArrayType::GetSize(vector.GetType());
auto &array_child = ArrayVector::GetEntry(vector);
child_writer->Write(*state.child_state, array_child, count * array_size);
}

//===--------------------------------------------------------------------===//
// Create Column Writer
//===--------------------------------------------------------------------===//

unique_ptr<ColumnWriter> ColumnWriter::CreateWriterRecursive(vector<duckdb_parquet::format::SchemaElement> &schemas,
ParquetWriter &writer, const LogicalType &type,
const string &name, vector<string> schema_path,
Expand Down Expand Up @@ -1877,8 +1970,9 @@ unique_ptr<ColumnWriter> ColumnWriter::CreateWriterRecursive(vector<duckdb_parqu
return make_uniq<StructColumnWriter>(writer, schema_idx, std::move(schema_path), max_repeat, max_define,
std::move(child_writers), can_have_nulls);
}
if (type.id() == LogicalTypeId::LIST) {
auto &child_type = ListType::GetChildType(type);
if (type.id() == LogicalTypeId::LIST || type.id() == LogicalTypeId::ARRAY) {
auto is_list = type.id() == LogicalTypeId::LIST;
auto &child_type = is_list ? ListType::GetChildType(type) : ArrayType::GetChildType(type);
// set up the two schema elements for the list
// for some reason we only set the converted type in the OPTIONAL element
// first an OPTIONAL element
Expand All @@ -1905,14 +1999,19 @@ unique_ptr<ColumnWriter> ColumnWriter::CreateWriterRecursive(vector<duckdb_parqu
repeated_element.__isset.num_children = true;
repeated_element.__isset.type = false;
repeated_element.__isset.repetition_type = true;
repeated_element.name = "list";
repeated_element.name = is_list ? "list" : "array";
schemas.push_back(std::move(repeated_element));
schema_path.emplace_back("list");
schema_path.emplace_back(is_list ? "list" : "array");

auto child_writer = CreateWriterRecursive(schemas, writer, child_type, "element", schema_path, child_field_ids,
max_repeat + 1, max_define + 2);
return make_uniq<ListColumnWriter>(writer, schema_idx, std::move(schema_path), max_repeat, max_define,
std::move(child_writer), can_have_nulls);
if (is_list) {
return make_uniq<ListColumnWriter>(writer, schema_idx, std::move(schema_path), max_repeat, max_define,
std::move(child_writer), can_have_nulls);
} else {
return make_uniq<ArrayColumnWriter>(writer, schema_idx, std::move(schema_path), max_repeat, max_define,
std::move(child_writer), can_have_nulls);
}
}
if (type.id() == LogicalTypeId::MAP) {
// map type
Expand Down
4 changes: 4 additions & 0 deletions extension/parquet/parquet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ CopyTypeSupport ParquetWriter::TypeIsSupported(const LogicalType &type) {
auto &child_type = ListType::GetChildType(type);
return TypeIsSupported(child_type);
}
if (id == LogicalTypeId::ARRAY) {
auto &child_type = ArrayType::GetChildType(type);
return TypeIsSupported(child_type);
}
if (id == LogicalTypeId::UNION) {
auto count = UnionType::GetMemberCount(type);
for (idx_t i = 0; i < count; i++) {
Expand Down
1 change: 1 addition & 0 deletions src/catalog/catalog_entry/table_catalog_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ static void BindExtraColumns(TableCatalogEntry &table, LogicalGet &get, LogicalP
static bool TypeSupportsRegularUpdate(const LogicalType &type) {
switch (type.id()) {
case LogicalTypeId::LIST:
case LogicalTypeId::ARRAY:
case LogicalTypeId::MAP:
case LogicalTypeId::UNION:
// lists and maps and unions don't support updates directly
Expand Down
2 changes: 1 addition & 1 deletion src/common/arrow/appender/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
add_library_unity(duckdb_common_arrow_appender OBJECT bool_data.cpp
struct_data.cpp union_data.cpp)
struct_data.cpp union_data.cpp fixed_size_list_data.cpp)
set(ALL_OBJECT_FILES
${ALL_OBJECT_FILES} $<TARGET_OBJECTS:duckdb_common_arrow_appender>
PARENT_SCOPE)
39 changes: 39 additions & 0 deletions src/common/arrow/appender/fixed_size_list_data.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#include "duckdb/common/arrow/arrow_appender.hpp"
#include "duckdb/common/arrow/appender/fixed_size_list_data.hpp"

namespace duckdb {

//===--------------------------------------------------------------------===//
// Arrays
//===--------------------------------------------------------------------===//
void ArrowFixedSizeListData::Initialize(ArrowAppendData &result, const LogicalType &type, idx_t capacity) {
auto &child_type = ArrayType::GetChildType(type);
auto array_size = ArrayType::GetSize(type);
auto child_buffer = ArrowAppender::InitializeChild(child_type, capacity * array_size, result.options);
result.child_data.push_back(std::move(child_buffer));
}

void ArrowFixedSizeListData::Append(ArrowAppendData &append_data, Vector &input, idx_t from, idx_t to,
idx_t input_size) {
UnifiedVectorFormat format;
input.ToUnifiedFormat(input_size, format);
idx_t size = to - from;
AppendValidity(append_data, format, from, to);

auto array_size = ArrayType::GetSize(input.GetType());
auto &child_vector = ArrayVector::GetEntry(input);
auto &child_data = *append_data.child_data[0];
child_data.append_vector(child_data, child_vector, from * array_size, to * array_size, size * array_size);
append_data.row_count += size;
}

void ArrowFixedSizeListData::Finalize(ArrowAppendData &append_data, const LogicalType &type, ArrowArray *result) {
result->n_buffers = 1;
auto &child_type = ArrayType::GetChildType(type);
ArrowAppender::AddChildren(append_data, 1);
result->children = append_data.child_pointers.data();
result->n_children = 1;
append_data.child_arrays[0] = *ArrowAppender::FinalizeChild(child_type, std::move(append_data.child_data[0]));
}

} // namespace duckdb
3 changes: 3 additions & 0 deletions src/common/arrow/arrow_appender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ static void InitializeFunctionPointers(ArrowAppendData &append_data, const Logic
case LogicalTypeId::STRUCT:
InitializeAppenderForType<ArrowStructData>(append_data);
break;
case LogicalTypeId::ARRAY:
InitializeAppenderForType<ArrowFixedSizeListData>(append_data);
break;
case LogicalTypeId::LIST: {
if (append_data.options.arrow_offset_size == ArrowOffsetSize::LARGE) {
InitializeAppenderForType<ArrowListData<int64_t>>(append_data);
Expand Down
17 changes: 17 additions & 0 deletions src/common/arrow/arrow_converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,23 @@ void SetArrowFormat(DuckDBArrowSchemaHolder &root_holder, ArrowSchema &child, co
}
break;
}
case LogicalTypeId::ARRAY: {
auto array_size = ArrayType::GetSize(type);
auto child_type = ArrayType::GetChildType(type);
auto format = "+w:" + to_string(array_size);
root_holder.owned_type_names.push_back(AddName(format));
child.format = root_holder.owned_type_names.back().get();

child.n_children = 1;
root_holder.nested_children.emplace_back();
root_holder.nested_children.back().resize(1);
root_holder.nested_children_ptr.emplace_back();
root_holder.nested_children_ptr.back().push_back(&root_holder.nested_children.back()[0]);
InitializeChild(root_holder.nested_children.back()[0], root_holder);
child.children = &root_holder.nested_children_ptr.back()[0];
SetArrowFormat(root_holder, **child.children, child_type, options);
break;
}
case LogicalTypeId::MAP: {
SetArrowMapFormat(root_holder, child, type, options);
break;
Expand Down
12 changes: 7 additions & 5 deletions src/common/types/vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -889,14 +889,14 @@ void Vector::Flatten(idx_t count) {
break;
}
case PhysicalType::ARRAY: {
auto &child = ArrayVector::GetEntry(*this);
auto &original_child = ArrayVector::GetEntry(*this);
auto array_size = ArrayType::GetSize(GetType());

auto flattened_buffer = make_uniq<VectorArrayBuffer>(GetType(), count);
auto &new_child = flattened_buffer->GetChild();

// Make sure to initialize a validity mask for the new child vector with the correct size
if (!child.validity.AllValid()) {
if (!original_child.validity.AllValid()) {
new_child.validity.Initialize(array_size * count);
}

Expand All @@ -909,23 +909,24 @@ void Vector::Flatten(idx_t count) {
// | 2 |
// ...

child.Flatten(count * array_size);
auto child_vec = make_uniq<Vector>(original_child);
child_vec->Flatten(count * array_size);

// Create a selection vector
SelectionVector sel(count * array_size);
for (idx_t array_idx = 0; array_idx < count; array_idx++) {
for (idx_t elem_idx = 0; elem_idx < array_size; elem_idx++) {
auto position = array_idx * array_size + elem_idx;
// Broadcast the validity
if (FlatVector::IsNull(child, elem_idx)) {
if (FlatVector::IsNull(*child_vec, elem_idx)) {
FlatVector::SetNull(new_child, position, true);
}
sel.set_index(position, elem_idx);
}
}

// Copy over the data to the new buffer
VectorOperations::Copy(child, new_child, sel, count * array_size, 0, 0);
VectorOperations::Copy(*child_vec, new_child, sel, count * array_size, 0, 0);
auxiliary = shared_ptr<VectorBuffer>(flattened_buffer.release());

} break;
Expand Down Expand Up @@ -1067,6 +1068,7 @@ void Vector::Sequence(int64_t start, int64_t increment, idx_t count) {
auxiliary.reset();
}

// FIXME: This should ideally be const
void Vector::Serialize(Serializer &serializer, idx_t count) {
auto &logical_type = GetType();

Expand Down
4 changes: 2 additions & 2 deletions src/common/vector_operations/vector_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,11 @@ void VectorOperations::Copy(const Vector &source_p, Vector &target, const Select
auto array_size = ArrayType::GetSize(source->GetType());

// Create a selection vector for the child elements
SelectionVector child_sel(copy_count * array_size);
SelectionVector child_sel(source_count * array_size);
for (idx_t i = 0; i < copy_count; i++) {
auto source_idx = sel->get_index(source_offset + i);
for (idx_t j = 0; j < array_size; j++) {
child_sel.set_index(i * array_size + j, source_idx * array_size + j);
child_sel.set_index((source_offset * array_size) + (i * array_size + j), source_idx * array_size + j);
}
}
VectorOperations::Copy(source_child, target_child, child_sel, source_count * array_size,
Expand Down
2 changes: 1 addition & 1 deletion src/execution/expression_executor/execute_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ void ExpressionExecutor::Execute(const BoundFunctionExpression &expr, Expression
}
#endif
}
arguments.Verify();
}
arguments.SetCardinality(count);
arguments.Verify();

state->profiler.BeginSample();
D_ASSERT(expr.function.function);
Expand Down
2 changes: 1 addition & 1 deletion src/function/table/arrow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ static unique_ptr<ArrowType> GetArrowLogicalTypeNoDictionary(ArrowSchema &schema
std::string parameters = format.substr(format.find(':') + 1);
idx_t fixed_size = std::stoi(parameters);
auto child_type = ArrowTableFunction::GetArrowLogicalType(*schema.children[0]);
auto list_type = make_uniq<ArrowType>(LogicalType::LIST(child_type->GetDuckType()), fixed_size);
auto list_type = make_uniq<ArrowType>(LogicalType::ARRAY(child_type->GetDuckType(), fixed_size), fixed_size);
list_type->AddChild(std::move(child_type));
return list_type;
} else if (format == "+s") {
Expand Down
Loading

0 comments on commit 68e4f93

Please sign in to comment.