Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: protobuf: serde: exception bypass for canonical repeated nested #69556

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 115 additions & 101 deletions src/Formats/ProtobufSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3322,125 +3322,139 @@ namespace
}
}

/// Complex case: one or more columns are serialized as a nested message.
for (const auto & [field_descriptor, suffix] : field_descriptors_with_suffixes)
{
if (!suffix.empty())
if (suffix.empty())
continue;
Comment on lines +3328 to +3329
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This specific change is not required per se, but it arguably brings all relevant code paths into the diff.
If it may help the review, we can get rid of that 🙏


std::vector<size_t> nested_column_indices;
std::vector<std::string_view> nested_column_names;
nested_column_indices.reserve(num_columns - used_column_indices.size());
nested_column_names.reserve(num_columns - used_column_indices.size());
nested_column_indices.push_back(column_idx);
nested_column_names.push_back(suffix);

for (size_t j : collections::range(column_idx + 1, num_columns))
{
/// Complex case: one or more columns are serialized as a nested message.
std::vector<size_t> nested_column_indices;
std::vector<std::string_view> nested_column_names;
nested_column_indices.reserve(num_columns - used_column_indices.size());
nested_column_names.reserve(num_columns - used_column_indices.size());
nested_column_indices.push_back(column_idx);
nested_column_names.push_back(suffix);

for (size_t j : collections::range(column_idx + 1, num_columns))
{
if (used_column_indices_sorted.count(j))
continue;
std::string_view other_suffix;
if (!columnNameStartsWithFieldName(column_names[j], *field_descriptor, other_suffix))
continue;
nested_column_indices.push_back(j);
nested_column_names.push_back(other_suffix);
}
if (used_column_indices_sorted.count(j))
continue;
std::string_view other_suffix;
if (!columnNameStartsWithFieldName(column_names[j], *field_descriptor, other_suffix))
continue;
nested_column_indices.push_back(j);
nested_column_names.push_back(other_suffix);
}

DataTypes nested_data_types;
nested_data_types.reserve(nested_column_indices.size());
for (size_t j : nested_column_indices)
nested_data_types.push_back(data_types[j]);

DataTypes nested_data_types;
nested_data_types.reserve(nested_column_indices.size());
for (size_t j : nested_column_indices)
nested_data_types.push_back(data_types[j]);
/// Now we have up to `nested_message_column_names.size()` columns
/// which can be serialized as one or many nested message(s)

/// Now we have up to `nested_message_column_names.size()` columns
/// which can be serialized as a nested message.
/// If the field is repeated, and ALL matching columns are array, we serialize as an array of nested messages.
/// Otherwise, we first try to serialize those columns as one nested message,
/// then, if failed, as an array of nested messages (on condition if those columns are array).

/// We will try to serialize those columns as one nested message,
/// then, if failed, as an array of nested messages (on condition if those columns are array).
bool has_fallback_to_array_of_nested_messages = false;
if (field_descriptor->is_repeated())
bool repeated_field_matching_nested_columns_are_all_arrays = false;
bool repeated_field_matching_nested_columns_have_some_arrays = false;
if (field_descriptor->is_repeated())
{
repeated_field_matching_nested_columns_are_all_arrays = true;
for (const auto & nested_data_type : nested_data_types)
{
bool has_arrays
= boost::range::find_if(
nested_data_types, [](const DataTypePtr & dt) { return (dt->getTypeId() == TypeIndex::Array); })
!= nested_data_types.end();
if (has_arrays)
has_fallback_to_array_of_nested_messages = true;
if (nested_data_type->getTypeId() == TypeIndex::Array)
repeated_field_matching_nested_columns_have_some_arrays = true;
else
repeated_field_matching_nested_columns_are_all_arrays = false;
}
}

/// Try to serialize those columns as one nested message.
try
{
std::vector<size_t> used_column_indices_in_nested;
auto nested_message_serializer = buildMessageSerializerImpl(
nested_column_names.size(),
nested_column_names.data(),
nested_data_types.data(),
*field_descriptor->message_type(),
/* with_length_delimiter = */ false,
google_wrappers_special_treatment,
field_descriptor,
used_column_indices_in_nested,
/* columns_are_reordered_outside = */ true,
/* check_nested_while_filling_missing_columns = */ false);
std::vector<size_t> used_column_indices_in_nested;
auto attempt_build_serializer = [&](const DataTypes & passed_nested_data_types)
{
return buildMessageSerializerImpl(
nested_column_names.size(),
nested_column_names.data(),
passed_nested_data_types.data(),
*field_descriptor->message_type(),
/* with_length_delimiter = */ false,
google_wrappers_special_treatment,
field_descriptor,
used_column_indices_in_nested,
/* columns_are_reordered_outside = */ true,
/* check_nested_while_filling_missing_columns = */ false);

/// `columns_are_reordered_outside` is true because column indices are
/// going to be transformed and then written to the outer message,
/// see add_field_serializer() below.
/// `columns_are_reordered_outside` is true because column indices are
/// going to be transformed and then written to the outer message,
/// see next calls to add_field_serializer() further below.
};

if (nested_message_serializer)
{
transformColumnIndices(used_column_indices_in_nested, nested_column_indices);
add_field_serializer(
column_name,
std::move(used_column_indices_in_nested),
*field_descriptor,
std::move(nested_message_serializer));
break;
}
}
catch (Exception & e)
auto attempt_unwrap_and_build_array_serializer = [&]()
{
DataTypes unwrapped_nested_data_types;
unwrapped_nested_data_types.reserve(nested_data_types.size());

for (DataTypePtr & dt : nested_data_types)
unwrapped_nested_data_types.push_back(assert_cast<const DataTypeArray &>(*dt).getNestedType());

if (auto serializer = attempt_build_serializer(unwrapped_nested_data_types))
{
if ((e.code() != ErrorCodes::PROTOBUF_FIELD_NOT_REPEATED) || !has_fallback_to_array_of_nested_messages)
throw;
std::vector<std::string_view> column_names_used;
column_names_used.reserve(used_column_indices_in_nested.size());
for (const size_t i : used_column_indices_in_nested)
column_names_used.emplace_back(nested_column_names[i]);

auto array_serializer = std::make_unique<ProtobufSerializerFlattenedNestedAsArrayOfNestedMessages>(
std::move(column_names_used), field_descriptor, std::move(serializer), get_root_desc_function);

transformColumnIndices(used_column_indices_in_nested, nested_column_indices);
add_field_serializer(column_name,std::move(used_column_indices_in_nested), *field_descriptor, std::move(array_serializer));

return true;
}

if (has_fallback_to_array_of_nested_messages)
{
/// Try to serialize those columns as an array of nested messages.
removeNonArrayElements(nested_data_types, nested_column_names, nested_column_indices);
for (DataTypePtr & dt : nested_data_types)
dt = assert_cast<const DataTypeArray &>(*dt).getNestedType();

std::vector<size_t> used_column_indices_in_nested;
auto nested_message_serializer = buildMessageSerializerImpl(
nested_column_names.size(),
nested_column_names.data(),
nested_data_types.data(),
*field_descriptor->message_type(),
/* with_length_delimiter = */ false,
google_wrappers_special_treatment,
field_descriptor,
used_column_indices_in_nested,
/* columns_are_reordered_outside = */ true,
/* check_nested_while_filling_missing_columns = */ false);
return false;
};

/// `columns_are_reordered_outside` is true because column indices are
/// going to be transformed and then written to the outer message,
/// see add_field_serializer() below.
/// if the protobuf field has the repeated label,
/// for ALL matching nested cols, since they are all of type array
/// try as ProtobufSerializerFlattenedNestedAsArrayOfNestedMessages
if (repeated_field_matching_nested_columns_are_all_arrays)
{
if (attempt_unwrap_and_build_array_serializer())
break;
}

if (nested_message_serializer)
{
std::vector<std::string_view> column_names_used;
column_names_used.reserve(used_column_indices_in_nested.size());
for (size_t i : used_column_indices_in_nested)
column_names_used.emplace_back(nested_column_names[i]);
auto field_serializer = std::make_unique<ProtobufSerializerFlattenedNestedAsArrayOfNestedMessages>(
std::move(column_names_used), field_descriptor, std::move(nested_message_serializer), get_root_desc_function);
transformColumnIndices(used_column_indices_in_nested, nested_column_indices);
add_field_serializer(column_name, std::move(used_column_indices_in_nested), *field_descriptor, std::move(field_serializer));
break;
}
/// for ALL matching nested cols
/// try as ProtobufSerializerMessage
try
{
if (auto serializer = attempt_build_serializer(nested_data_types))
{
transformColumnIndices(used_column_indices_in_nested, nested_column_indices);
add_field_serializer(column_name,std::move(used_column_indices_in_nested), *field_descriptor, std::move(serializer));
break;
}
}

catch (Exception & e)
{
if ((e.code() != ErrorCodes::PROTOBUF_FIELD_NOT_REPEATED) || !repeated_field_matching_nested_columns_have_some_arrays)
throw;
}

/// if the protobuf field has the repeated label,
/// only for the SUBSET of matching nested cols that are of type Array,
/// try as ProtobufSerializerFlattenedNestedAsArrayOfNestedMessages
if (repeated_field_matching_nested_columns_have_some_arrays)
{
removeNonArrayElements(nested_data_types, nested_column_names, nested_column_indices);
if (attempt_unwrap_and_build_array_serializer())
break;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"error_not_raised":1}
{"i":1001,"j.k":[2101,2102],"j.l":[2201,2202],"m":3001,"n":[4001,4002,4003,4004],"o.key":[5001,5002],"o.value":[{"p":[{"q":5111,"r":5121}]},{"p":[{"q":5112,"r":5122},{"q":5113,"r":5123}]}]}
{"i":6001,"j.k":[7101],"j.l":[7201],"m":8001,"n":[],"o.key":[9001],"o.value":[{"p":[{"q":10111,"r":10121}]}]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#!/usr/bin/env bash
# Tags: no-fasttest

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh

mkdir -p "${CLICKHOUSE_SCHEMA_FILES}"
SOURCE_SCHEMA_FILE="${CURDIR}/format_schemas/03234_proto_complex_nested_repeated_noexception.proto"
TARGET_SCHEMA_FILE="${CLICKHOUSE_SCHEMA_FILES}/03234_proto_complex_nested_repeated_noexception.proto"
cp "${SOURCE_SCHEMA_FILE}" "${TARGET_SCHEMA_FILE}"

cat <<'EOF' | $CLICKHOUSE_CLIENT -mn

DROP TABLE IF EXISTS exception_counter ;
CREATE TABLE exception_counter (`val` UInt32) ENGINE = Memory ;
INSERT INTO exception_counter SELECT sum(value) FROM system.errors WHERE name = 'PROTOBUF_FIELD_NOT_REPEATED' ;

DROP TABLE IF EXISTS table_file ;
CREATE TABLE table_file (
`i` UInt32,
`j.k` Array(UInt32),
`j.l` Array(UInt32),
`m` UInt32,
`n` Array(UInt32),
`o` Nested(
`key` UInt32,
`value` Tuple(
`p` Nested(
`q` UInt32,
`r` UInt32
)
)
)
) ENGINE File(Protobuf) SETTINGS format_schema = '03234_proto_complex_nested_repeated_noexception.proto:A' ;

INSERT INTO table_file VALUES
( 1001, [2101, 2102], [2201, 2202], 3001, [4001, 4002, 4003, 4004], [5001,5002] , [ ([(5111,5121)]), ([(5112,5122),(5113,5123)]) ] ),
( 6001, [7101], [7201], 8001, [], [9001] , [ ([(10111,10121)]) ] ) ;

INSERT INTO exception_counter SELECT sum(value) FROM system.errors WHERE name = 'PROTOBUF_FIELD_NOT_REPEATED' ;
SELECT min(val) == max(val) as error_not_raised FROM exception_counter FORMAT JSONEachRow ;

SELECT * FROM table_file FORMAT JSONEachRow ;

DROP TABLE exception_counter ;
DROP TABLE table_file ;

EOF

rm -f "${TARGET_SCHEMA_FILE}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{"error_not_raised":1}
{"u":1001,"v.w":[],"v.x":[],"v.y":[],"v.z":[]}
{"u":2002,"v.w":[2102],"v.x":[2202],"v.y":[[2302]],"v.z":[[2402,2403]]}
{"u":3003,"v.w":[3103,3104],"v.x":[3203,3204],"v.y":[[3303],[3304]],"v.z":[[3403,3404],[3405,3406]]}
{"u":4004,"v.w":[4104,4105,4106],"v.x":[4204,4205,4206],"v.y":[[4304],[4305],[4306]],"v.z":[[4304,4305],[4306,4307],[4308,4309]]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/usr/bin/env bash
# Tags: no-fasttest

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh

mkdir -p "${CLICKHOUSE_SCHEMA_FILES}"
SOURCE_SCHEMA_FILE="${CURDIR}/format_schemas/03234_proto_simple_nested_repeated_noexception.proto"
TARGET_SCHEMA_FILE="${CLICKHOUSE_SCHEMA_FILES}/03234_proto_simple_nested_repeated_noexception.proto"
cp "${SOURCE_SCHEMA_FILE}" "${TARGET_SCHEMA_FILE}"

cat <<'EOF' | $CLICKHOUSE_CLIENT -mn

DROP TABLE IF EXISTS exception_counter ;
CREATE TABLE exception_counter (`val` UInt32) ENGINE = Memory ;
INSERT INTO exception_counter SELECT sum(value) FROM system.errors WHERE name = 'PROTOBUF_FIELD_NOT_REPEATED' ;

DROP TABLE IF EXISTS table_file ;
CREATE TABLE table_file (
`u` UInt32,
`v.w` Array(UInt32),
`v.x` Array(UInt32),
`v.y` Array(Array(UInt32)),
`v.z` Array(Array(UInt32))
) ENGINE File(Protobuf) SETTINGS format_schema = '03234_proto_simple_nested_repeated_noexception.proto:M' ;

INSERT INTO table_file VALUES
( 1001, [], [], [], []),
( 2002, [2102], [2202], [[2302]], [[2402, 2403]]),
( 3003, [3103, 3104], [3203, 3204], [[3303], [3304]], [[3403, 3404], [3405, 3406]]),
( 4004, [4104, 4105, 4106], [4204, 4205, 4206], [[4304], [4305], [4306]], [[4304, 4305], [4306, 4307], [4308, 4309]]);


INSERT INTO exception_counter SELECT sum(value) FROM system.errors WHERE name = 'PROTOBUF_FIELD_NOT_REPEATED' ;
SELECT min(val) == max(val) as error_not_raised FROM exception_counter FORMAT JSONEachRow ;

SELECT * FROM table_file FORMAT JSONEachRow ;

DROP TABLE exception_counter ;
DROP TABLE table_file ;

EOF

rm -f "${TARGET_SCHEMA_FILE}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
syntax = "proto3";

message A {
message B {
uint32 k = 2100 ;
uint32 l = 2200 ;
}

uint32 i = 1000 ;
repeated B j = 2000 ;
uint32 m = 3000 ;

repeated uint32 n = 4000 ;

map<uint32, C> o = 5000 ;
}

message D {
uint32 q = 5110 ;
uint32 r = 5120 ;
}

message C {
repeated D p = 5100 ;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
syntax = "proto3";

message M {
uint32 u = 1000 ;
repeated N v = 2000 ;

message N {
uint32 w = 2100 ;
uint32 x = 2200 ;
repeated uint32 y = 2300 ;
repeated uint32 z = 2400 ;
}
}
Loading