Skip to content

Commit

Permalink
fix(clickhouse sink)!: make skip_unknown_fields optional
Browse files Browse the repository at this point in the history
Problem: The Clickhouse sink's `skip_unknown_fields` doesn't follow the Clickhouse
server default. It always sends a value for `skip_unknown_fields`;
furthermore, there is also no way to _disable_ `skip_unknown_fields`
setting a "strict" mode for Clickhouse. We really want to permit either
a default value from the Clickhouse server, meaning we shouldn't specify
`skip_unknown_fields` by default. Otherwise, if a user _wants_ to
specify the strict mode for unknown fields, they should then pass either
`true` or `false` for click house.

Solution: Change the `skip_unknown_fields` value to be of an
`Option<bool>` instead of just a `bool`. This permits using the defaults
provided by the Clickhouse server and doesn't send the
`skip_unknown_fields` value to the server if left unspecified.

See #22013 for the original
issue report.

Closes #22013
  • Loading branch information
PriceHiller committed Dec 11, 2024
1 parent 0b801eb commit 2f1edf0
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Allow the `skip_unknown_fields` setting to be optional, thereby allowing use of the defaults provided by the ClickHouse server. Setting it to `true` will permit skipping unknown fields and `false` will make ClickHouse strict on what fields it accepts.

authors: PriceHiller
4 changes: 3 additions & 1 deletion src/sinks/clickhouse/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ pub struct ClickhouseConfig {
pub format: Format,

/// Sets `input_format_skip_unknown_fields`, allowing ClickHouse to discard fields not present in the table schema.
///
/// If left unspecified, use the default provided by the `ClickHouse` server.
#[serde(default)]
pub skip_unknown_fields: bool,
pub skip_unknown_fields: Option<bool>,

/// Sets `date_time_input_format` to `best_effort`, allowing ClickHouse to properly parse RFC3339/ISO 8601.
#[serde(default)]
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/clickhouse/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ async fn skip_unknown_fields() {
let config = ClickhouseConfig {
endpoint: host.parse().unwrap(),
table: table.clone().try_into().unwrap(),
skip_unknown_fields: true,
skip_unknown_fields: Some(true),
compression: Compression::None,
batch,
request: TowerRequestConfig {
Expand Down
37 changes: 29 additions & 8 deletions src/sinks/clickhouse/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl RetryLogic for ClickhouseRetryLogic {
pub(super) struct ClickhouseServiceRequestBuilder {
pub(super) auth: Option<Auth>,
pub(super) endpoint: Uri,
pub(super) skip_unknown_fields: bool,
pub(super) skip_unknown_fields: Option<bool>,
pub(super) date_time_best_effort: bool,
pub(super) insert_random_shard: bool,
pub(super) compression: Compression,
Expand Down Expand Up @@ -112,7 +112,7 @@ fn set_uri_query(
database: &str,
table: &str,
format: Format,
skip_unknown: bool,
skip_unknown: Option<bool>,
date_time_best_effort: bool,
insert_random_shard: bool,
) -> crate::Result<Uri> {
Expand All @@ -135,8 +135,12 @@ fn set_uri_query(
}

uri.push_str("?input_format_import_nested_json=1&");
if skip_unknown {
uri.push_str("input_format_skip_unknown_fields=1&");
if let Some(skip_unknown) = skip_unknown {
if skip_unknown {
uri.push_str("input_format_skip_unknown_fields=1&");
} else {
uri.push_str("input_format_skip_unknown_fields=0&")
}
}
if date_time_best_effort {
uri.push_str("date_time_input_format=best_effort&")
Expand All @@ -162,13 +166,14 @@ mod tests {
"my_database",
"my_table",
Format::JsonEachRow,
false,
Some(false),
true,
false,
)
.unwrap();
assert_eq!(uri.to_string(), "http://localhost:80/?\
input_format_import_nested_json=1&\
input_format_skip_unknown_fields=0&\
date_time_input_format=best_effort&\
query=INSERT+INTO+%22my_database%22.%22my_table%22+FORMAT+JSONEachRow");

Expand All @@ -177,21 +182,22 @@ mod tests {
"my_database",
"my_\"table\"",
Format::JsonEachRow,
false,
Some(false),
false,
false,
)
.unwrap();
assert_eq!(uri.to_string(), "http://localhost:80/?\
input_format_import_nested_json=1&\
input_format_skip_unknown_fields=0&\
query=INSERT+INTO+%22my_database%22.%22my_%5C%22table%5C%22%22+FORMAT+JSONEachRow");

let uri = set_uri_query(
&"http://localhost:80".parse().unwrap(),
"my_database",
"my_\"table\"",
Format::JsonAsObject,
true,
Some(true),
true,
false,
)
Expand All @@ -201,6 +207,21 @@ mod tests {
input_format_skip_unknown_fields=1&\
date_time_input_format=best_effort&\
query=INSERT+INTO+%22my_database%22.%22my_%5C%22table%5C%22%22+FORMAT+JSONAsObject");

let uri = set_uri_query(
&"http://localhost:80".parse().unwrap(),
"my_database",
"my_\"table\"",
Format::JsonAsObject,
None,
true,
false,
)
.unwrap();
assert_eq!(uri.to_string(), "http://localhost:80/?\
input_format_import_nested_json=1&\
date_time_input_format=best_effort&\
query=INSERT+INTO+%22my_database%22.%22my_%5C%22table%5C%22%22+FORMAT+JSONAsObject");
}

#[test]
Expand All @@ -210,7 +231,7 @@ mod tests {
"my_database",
"my_table",
Format::JsonEachRow,
false,
Some(false),
false,
false,
)
Expand Down

0 comments on commit 2f1edf0

Please sign in to comment.