Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: adjust fill behavior of range query #3489

Merged
merged 1 commit into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
324 changes: 254 additions & 70 deletions src/query/src/range_select/plan.rs

Large diffs are not rendered by default.

40 changes: 24 additions & 16 deletions src/query/src/range_select/plan_rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,20 +216,28 @@ impl<'a> TreeNodeRewriter for RangeExprRewriter<'a> {
let mut data_type = range_expr.get_type(self.input_plan.schema())?;
let mut need_cast = false;
let fill = Fill::try_from_str(parse_str_expr(&func.args, 2)?, &data_type)?;
if matches!(fill, Fill::Linear) && data_type.is_integer() {
if matches!(fill, Some(Fill::Linear)) && data_type.is_integer() {
data_type = DataType::Float64;
need_cast = true;
}
inconsistent_check!(self.by, !self.by.is_empty());
inconsistent_check!(self.align, self.align != Duration::default());
inconsistent_check!(self.align_to, self.align_to != 0);
let range_fn = RangeFn {
name: format!(
"{} RANGE {} FILL {}",
range_expr.display_name()?,
parse_expr_to_string(&func.args, 1)?,
fill
),
name: if let Some(fill) = &fill {
format!(
"{} RANGE {} FILL {}",
range_expr.display_name()?,
parse_expr_to_string(&func.args, 1)?,
fill
)
} else {
format!(
"{} RANGE {}",
range_expr.display_name()?,
parse_expr_to_string(&func.args, 1)?,
)
},
data_type,
expr: range_expr,
range,
Expand Down Expand Up @@ -551,7 +559,7 @@ mod test {
async fn range_no_project() {
let query = r#"SELECT timestamp, tag_0, tag_1, avg(field_0 + field_1) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#;
let expected = String::from(
"RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL:Float64;N]\
"RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, AVG(test.field_0 + test.field_1) RANGE 5m:Float64;N]\
\n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
);
query_plan_compare(query, expected).await;
Expand All @@ -561,8 +569,8 @@ mod test {
async fn range_expr_calculation() {
let query = r#"SELECT (avg(field_0 + field_1)/4) RANGE '5m' FROM test ALIGN '1h' by (tag_0,tag_1);"#;
let expected = String::from(
"Projection: AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL / Int64(4) [AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL / Int64(4):Float64;N]\
\n RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
"Projection: AVG(test.field_0 + test.field_1) RANGE 5m / Int64(4) [AVG(test.field_0 + test.field_1) RANGE 5m / Int64(4):Float64;N]\
\n RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [AVG(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
\n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
);
query_plan_compare(query, expected).await;
Expand All @@ -573,8 +581,8 @@ mod test {
let query =
r#"SELECT (covar(field_0 + field_1, field_1)/4) RANGE '5m' FROM test ALIGN '1h';"#;
let expected = String::from(
"Projection: COVARIANCE(test.field_0 + test.field_1,test.field_1) RANGE 5m FILL NULL / Int64(4) [COVARIANCE(test.field_0 + test.field_1,test.field_1) RANGE 5m FILL NULL / Int64(4):Float64;N]\
\n RangeSelect: range_exprs=[COVARIANCE(test.field_0 + test.field_1,test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1, test.tag_2, test.tag_3, test.tag_4], time_index=timestamp [COVARIANCE(test.field_0 + test.field_1,test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8]\
"Projection: COVARIANCE(test.field_0 + test.field_1,test.field_1) RANGE 5m / Int64(4) [COVARIANCE(test.field_0 + test.field_1,test.field_1) RANGE 5m / Int64(4):Float64;N]\
\n RangeSelect: range_exprs=[COVARIANCE(test.field_0 + test.field_1,test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1, test.tag_2, test.tag_3, test.tag_4], time_index=timestamp [COVARIANCE(test.field_0 + test.field_1,test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8]\
\n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
);
query_plan_compare(query, expected).await;
Expand Down Expand Up @@ -621,8 +629,8 @@ mod test {
async fn range_in_expr() {
let query = r#"SELECT sin(avg(field_0 + field_1) RANGE '5m' + 1) FROM test ALIGN '1h' by (tag_0,tag_1);"#;
let expected = String::from(
"Projection: sin(AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL + Int64(1)) [sin(AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL + Int64(1)):Float64;N]\
\n RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
"Projection: sin(AVG(test.field_0 + test.field_1) RANGE 5m + Int64(1)) [sin(AVG(test.field_0 + test.field_1) RANGE 5m + Int64(1)):Float64;N]\
\n RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [AVG(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
\n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
);
query_plan_compare(query, expected).await;
Expand All @@ -643,8 +651,8 @@ mod test {
async fn deep_nest_range_expr() {
let query = r#"SELECT round(sin(avg(field_0 + field_1) RANGE '5m' + 1)) FROM test ALIGN '1h' by (tag_0,tag_1);"#;
let expected = String::from(
"Projection: round(sin(AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL + Int64(1))) [round(sin(AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL + Int64(1))):Float64;N]\
\n RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [AVG(test.field_0 + test.field_1) RANGE 5m FILL NULL:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
"Projection: round(sin(AVG(test.field_0 + test.field_1) RANGE 5m + Int64(1))) [round(sin(AVG(test.field_0 + test.field_1) RANGE 5m + Int64(1))):Float64;N]\
\n RangeSelect: range_exprs=[AVG(test.field_0 + test.field_1) RANGE 5m], align=3600000ms, align_to=0ms, align_by=[test.tag_0, test.tag_1], time_index=timestamp [AVG(test.field_0 + test.field_1) RANGE 5m:Float64;N, timestamp:Timestamp(Millisecond, None), tag_0:Utf8, tag_1:Utf8]\
\n TableScan: test [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, tag_4:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N, field_3:Float64;N, field_4:Float64;N]"
);
query_plan_compare(query, expected).await;
Expand Down
62 changes: 31 additions & 31 deletions tests/cases/standalone/common/range/by.result
Original file line number Diff line number Diff line change
Expand Up @@ -23,47 +23,47 @@ Affected Rows: 10
-- Test by calculate
SELECT ts, length(host), max(val) RANGE '5s' FROM host ALIGN '20s' BY (length(host)) ORDER BY ts;

+---------------------+-----------------------------+----------------------------------+
| ts | character_length(host.host) | MAX(host.val) RANGE 5s FILL NULL |
+---------------------+-----------------------------+----------------------------------+
| 1970-01-01T00:00:00 | 5 | 3 |
| 1970-01-01T00:00:20 | 5 | 5 |
+---------------------+-----------------------------+----------------------------------+
+---------------------+-----------------------------+------------------------+
| ts | character_length(host.host) | MAX(host.val) RANGE 5s |
+---------------------+-----------------------------+------------------------+
| 1970-01-01T00:00:00 | 5 | 3 |
| 1970-01-01T00:00:20 | 5 | 5 |
+---------------------+-----------------------------+------------------------+

SELECT ts, max(val) RANGE '5s' FROM host ALIGN '20s' BY (2) ORDER BY ts;

+---------------------+----------------------------------+
| ts | MAX(host.val) RANGE 5s FILL NULL |
+---------------------+----------------------------------+
| 1970-01-01T00:00:00 | 3 |
| 1970-01-01T00:00:20 | 5 |
+---------------------+----------------------------------+
+---------------------+------------------------+
| ts | MAX(host.val) RANGE 5s |
+---------------------+------------------------+
| 1970-01-01T00:00:00 | 3 |
| 1970-01-01T00:00:20 | 5 |
+---------------------+------------------------+

-- The user explicitly specifies that the aggregation key is empty. In this case, there is no aggregation key. All data will be aggregated into a group.
-- Implement by rewrite `BY()` to `BY(1)` automatically through sqlparser. They are semantically equivalent.
SELECT ts, max(val) RANGE '5s' FROM host ALIGN '20s' BY () ORDER BY ts;

+---------------------+----------------------------------+
| ts | MAX(host.val) RANGE 5s FILL NULL |
+---------------------+----------------------------------+
| 1970-01-01T00:00:00 | 3 |
| 1970-01-01T00:00:20 | 5 |
+---------------------+----------------------------------+
+---------------------+------------------------+
| ts | MAX(host.val) RANGE 5s |
+---------------------+------------------------+
| 1970-01-01T00:00:00 | 3 |
| 1970-01-01T00:00:20 | 5 |
+---------------------+------------------------+

SELECT ts, length(host)::INT64 + 2, max(val) RANGE '5s' FROM host ALIGN '20s' BY (length(host)::INT64 + 2) ORDER BY ts;

+---------------------+----------------------------------------+----------------------------------+
| ts | character_length(host.host) + Int64(2) | MAX(host.val) RANGE 5s FILL NULL |
+---------------------+----------------------------------------+----------------------------------+
| 1970-01-01T00:00:00 | 7 | 3 |
| 1970-01-01T00:00:20 | 7 | 5 |
+---------------------+----------------------------------------+----------------------------------+
+---------------------+----------------------------------------+------------------------+
| ts | character_length(host.host) + Int64(2) | MAX(host.val) RANGE 5s |
+---------------------+----------------------------------------+------------------------+
| 1970-01-01T00:00:00 | 7 | 3 |
| 1970-01-01T00:00:20 | 7 | 5 |
+---------------------+----------------------------------------+------------------------+

-- Test error
-- project non-aggregation key
SELECT ts, host, max(val) RANGE '5s' FROM host ALIGN '20s' BY () ORDER BY ts;

Error: 3001(EngineExecuteQuery), DataFusion error: No field named host.host. Valid fields are "MAX(host.val) RANGE 5s FILL NULL", host.ts, "Int64(1)".
Error: 3001(EngineExecuteQuery), DataFusion error: No field named host.host. Valid fields are "MAX(host.val) RANGE 5s", host.ts, "Int64(1)".

DROP TABLE host;

Expand Down Expand Up @@ -94,12 +94,12 @@ Affected Rows: 10

SELECT ts, max(val) RANGE '5s' FROM host ALIGN '20s' ORDER BY ts;

+---------------------+----------------------------------+
| ts | MAX(host.val) RANGE 5s FILL NULL |
+---------------------+----------------------------------+
| 1970-01-01T00:00:00 | 3 |
| 1970-01-01T00:00:20 | 5 |
+---------------------+----------------------------------+
+---------------------+------------------------+
| ts | MAX(host.val) RANGE 5s |
+---------------------+------------------------+
| 1970-01-01T00:00:00 | 3 |
| 1970-01-01T00:00:20 | 5 |
+---------------------+------------------------+

DROP TABLE host;

Expand Down
Loading