Skip to content

Commit

Permalink
Fix DISTINCT ON queries for distributed hyperatbles
Browse files Browse the repository at this point in the history
Previously, we would push DISTINCT ON down to the data nodes even when
the pathkeys of the resulting paths on the data nodes were not
compatible with the given DISTINCT ON columns. This commit disables
pushdown when the sorting is not compatible.

Fixes timescale#3784
  • Loading branch information
akuzm committed Nov 16, 2021
1 parent b78b25d commit 0e99180
Show file tree
Hide file tree
Showing 6 changed files with 315 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ accidentally triggering the load of a previous DB version.**
* #3739 Fix compression policy on tables using INTEGER
* #3766 Fix segfault in ts_hist_sfunc
* #3789 Fix time_bucket comparison transformation
* #3784 Fix DISTINCT ON queries returning incorrect results on distributed hypertables
* #3801 Fail size utility functions when data nodes do not respond

**Thanks**
Expand Down
65 changes: 60 additions & 5 deletions tsl/src/fdw/deparse.c
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ static void printRemoteParam(int paramindex, Oid paramtype, int32 paramtypmod,
deparse_expr_cxt *context);
static void printRemotePlaceholder(Oid paramtype, int32 paramtypmod, deparse_expr_cxt *context);
static void deparseSelectSql(List *tlist, bool is_subquery, List **retrieved_attrs,
deparse_expr_cxt *context);
deparse_expr_cxt *context, List *pathkeys);
static void deparseLockingClause(deparse_expr_cxt *context);
static void appendOrderByClause(List *pathkeys, deparse_expr_cxt *context);
static void appendLimit(deparse_expr_cxt *context, List *pathkeys);
Expand Down Expand Up @@ -771,7 +771,7 @@ deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *rel, List
context.sca = sca;

/* Construct SELECT clause */
deparseSelectSql(tlist, is_subquery, retrieved_attrs, &context);
deparseSelectSql(tlist, is_subquery, retrieved_attrs, &context, pathkeys);

/* Construct FROM and WHERE clauses */
deparseFromExpr(remote_where, &context);
Expand Down Expand Up @@ -822,7 +822,7 @@ deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *rel, List
* deparsing which happens later is good enough
*/
static void
deparseDistinctClause(StringInfo buf, deparse_expr_cxt *context)
deparseDistinctClause(StringInfo buf, deparse_expr_cxt *context, List *pathkeys)
{
PlannerInfo *root = context->root;
Query *query = root->parse;
Expand Down Expand Up @@ -867,6 +867,60 @@ deparseDistinctClause(StringInfo buf, deparse_expr_cxt *context)
return;
}

if (query->hasDistinctOn)
{
/*
* Pushing down DISTINCT ON is more complex than plain DISTINCT.
* The DISTINCT ON columns must be a prefix of the ORDER BY columns.
* Without this, the DISTINCT ON would return an unpredictable row
* each time. There is a diagnostic for the case where the ORDER BY
* clause doesn't match the DISTINCT ON clause, so in this case we
* would get an error on the data node. There is no diagnostic for
* the case where the ORDER BY is absent, so in this case we would
* get a wrong result.
* The remote ORDER BY clause is created from the pathkeys of the
* corresponding relation. If the DISTINCT ON columns are not a prefix
* of these pathkeys, we cannot push it down.
*/
ListCell *distinct_cell, *pathkey_cell;
forboth (distinct_cell, query->distinctClause, pathkey_cell, pathkeys)
{
SortGroupClause *sgc = lfirst_node(SortGroupClause, distinct_cell);
TargetEntry *tle = get_sortgroupclause_tle(sgc, query->targetList);

PathKey *pk = lfirst_node(PathKey, pathkey_cell);
EquivalenceClass *ec = pk->pk_eclass;

/*
* The find_ec_member_matching_expr() has many checks that don't seem
* to be relevant here. Enumerate the pathkey EquivalenceMembers by
* hand and find the one that matches the DISTINCT ON expression.
*/
ListCell *ec_member_cell;
foreach (ec_member_cell, ec->ec_members)
{
EquivalenceMember *ec_member = lfirst_node(EquivalenceMember, ec_member_cell);
if (equal(ec_member->em_expr, tle->expr))
break;
}

if (ec_member_cell == NULL)
{
/*
* Went through all the equivalence class members and didn't
* find a match.
*/
return;
}
}

if (pathkey_cell == NULL && distinct_cell != NULL)
{
/* Ran out of pathkeys before we matched all the DISTINCT ON columns. */
return;
}
}

/* If there are no varno entries in the distinctClause, we are done */
if (!varno_assigned)
return;
Expand Down Expand Up @@ -937,7 +991,8 @@ deparseDistinctClause(StringInfo buf, deparse_expr_cxt *context)
* Read prologue of deparseSelectStmtForRel() for details.
*/
static void
deparseSelectSql(List *tlist, bool is_subquery, List **retrieved_attrs, deparse_expr_cxt *context)
deparseSelectSql(List *tlist, bool is_subquery, List **retrieved_attrs, deparse_expr_cxt *context,
List *pathkeys)
{
StringInfo buf = context->buf;
RelOptInfo *foreignrel = context->foreignrel;
Expand Down Expand Up @@ -981,7 +1036,7 @@ deparseSelectSql(List *tlist, bool is_subquery, List **retrieved_attrs, deparse_
Relation rel = table_open(rte->relid, NoLock);

if (root->parse->distinctClause != NIL)
deparseDistinctClause(buf, context);
deparseDistinctClause(buf, context, pathkeys);

deparseTargetList(buf,
rte,
Expand Down
178 changes: 178 additions & 0 deletions tsl/test/shared/expected/dist_distinct_pushdown.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
-- Test DISTINCT ON pushdown.
-- The case with LIMIT serves as a reference.
select ts, id from distinct_on_hypertable order by id, ts desc limit 1;
ts | id
--------------------------+----
Fri Jan 01 03:47:41 2021 | 0
(1 row)

select ts, id from distinct_on_distributed order by id, ts desc limit 1;
ts | id
--------------------------+----
Fri Jan 01 03:47:41 2021 | 0
(1 row)

-- DISTINCT ON should match the above LIMIT for the first id.
select distinct on (id) ts, id from distinct_on_hypertable order by id, ts desc;
ts | id
--------------------------+----
Fri Jan 01 03:47:41 2021 | 0
Fri Jan 01 03:47:38 2021 | 1
Fri Jan 01 03:47:39 2021 | 2
Fri Jan 01 03:47:40 2021 | 3
(4 rows)

select distinct on (id) ts, id from distinct_on_distributed order by id, ts desc;
ts | id
--------------------------+----
Fri Jan 01 03:47:41 2021 | 0
Fri Jan 01 03:47:38 2021 | 1
Fri Jan 01 03:47:39 2021 | 2
Fri Jan 01 03:47:40 2021 | 3
(4 rows)

explain (costs off, verbose)
select distinct on (id) ts, id from distinct_on_distributed order by id, ts desc;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Unique
Output: distinct_on_distributed.ts, distinct_on_distributed.id
-> Custom Scan (DataNodeScan) on public.distinct_on_distributed
Output: distinct_on_distributed.ts, distinct_on_distributed.id
Data node: data_node_1
Chunks: _dist_hyper_X_X_chunk
Remote SQL: SELECT DISTINCT ON (id) ts, id FROM public.distinct_on_distributed WHERE _timescaledb_internal.chunks_in(public.distinct_on_distributed.*, ARRAY[27]) ORDER BY id ASC NULLS LAST, ts DESC NULLS FIRST
(7 rows)

-- A case where we have a filter on the DISTINCT ON column.
select distinct on (id) ts, id from distinct_on_distributed where id in ('0', '1') order by id, ts desc;
ts | id
--------------------------+----
Fri Jan 01 03:47:41 2021 | 0
Fri Jan 01 03:47:38 2021 | 1
(2 rows)

explain (costs off, verbose)
select distinct on (id) ts, id from distinct_on_distributed where id in ('0', '1') order by id, ts desc;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Unique
Output: distinct_on_distributed.ts, distinct_on_distributed.id
-> Custom Scan (DataNodeScan) on public.distinct_on_distributed
Output: distinct_on_distributed.ts, distinct_on_distributed.id
Data node: data_node_1
Chunks: _dist_hyper_X_X_chunk
Remote SQL: SELECT DISTINCT ON (id) ts, id FROM public.distinct_on_distributed WHERE _timescaledb_internal.chunks_in(public.distinct_on_distributed.*, ARRAY[27]) AND ((id = ANY ('{0,1}'::integer[]))) ORDER BY id ASC NULLS LAST, ts DESC NULLS FIRST
(7 rows)

-- A somewhat dumb case where the DISTINCT ON column is deduced to be constant
-- and not added to pathkeys.
select distinct on (id) ts, id from distinct_on_distributed where id in ('0') order by id, ts desc;
ts | id
--------------------------+----
Fri Jan 01 03:47:41 2021 | 0
(1 row)

explain (costs off, verbose)
select distinct on (id) ts, id from distinct_on_distributed where id in ('0') order by id, ts desc;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Unique
Output: distinct_on_distributed.ts, distinct_on_distributed.id
-> Sort
Output: distinct_on_distributed.ts, distinct_on_distributed.id
Sort Key: distinct_on_distributed.ts DESC
-> Custom Scan (DataNodeScan) on public.distinct_on_distributed
Output: distinct_on_distributed.ts, distinct_on_distributed.id
Data node: data_node_1
Chunks: _dist_hyper_X_X_chunk
Remote SQL: SELECT ts, id FROM public.distinct_on_distributed WHERE _timescaledb_internal.chunks_in(public.distinct_on_distributed.*, ARRAY[27]) AND ((id = 0))
(10 rows)

-- All above but with disabled local sort, to try to force more interesting plans where the sort
-- is pushed down.
set enable_sort = 0;
select ts, id from distinct_on_distributed order by id, ts desc limit 1;
ts | id
--------------------------+----
Fri Jan 01 03:47:41 2021 | 0
(1 row)

explain (costs off, verbose)
select ts, id from distinct_on_distributed order by id, ts desc limit 1;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit
Output: distinct_on_distributed.ts, distinct_on_distributed.id
-> Custom Scan (DataNodeScan) on public.distinct_on_distributed
Output: distinct_on_distributed.ts, distinct_on_distributed.id
Data node: data_node_1
Chunks: _dist_hyper_X_X_chunk
Remote SQL: SELECT ts, id FROM public.distinct_on_distributed WHERE _timescaledb_internal.chunks_in(public.distinct_on_distributed.*, ARRAY[27]) ORDER BY id ASC NULLS LAST, ts DESC NULLS FIRST LIMIT 1
(7 rows)

select distinct on (id) ts, id from distinct_on_distributed order by id, ts desc;
ts | id
--------------------------+----
Fri Jan 01 03:47:41 2021 | 0
Fri Jan 01 03:47:38 2021 | 1
Fri Jan 01 03:47:39 2021 | 2
Fri Jan 01 03:47:40 2021 | 3
(4 rows)

explain (costs off, verbose)
select distinct on (id) ts, id from distinct_on_distributed order by id, ts desc;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Unique
Output: distinct_on_distributed.ts, distinct_on_distributed.id
-> Custom Scan (DataNodeScan) on public.distinct_on_distributed
Output: distinct_on_distributed.ts, distinct_on_distributed.id
Data node: data_node_1
Chunks: _dist_hyper_X_X_chunk
Remote SQL: SELECT DISTINCT ON (id) ts, id FROM public.distinct_on_distributed WHERE _timescaledb_internal.chunks_in(public.distinct_on_distributed.*, ARRAY[27]) ORDER BY id ASC NULLS LAST, ts DESC NULLS FIRST
(7 rows)

select distinct on (id) ts, id from distinct_on_distributed where id in ('0', '1') order by id, ts desc;
ts | id
--------------------------+----
Fri Jan 01 03:47:41 2021 | 0
Fri Jan 01 03:47:38 2021 | 1
(2 rows)

explain (costs off, verbose)
select distinct on (id) ts, id from distinct_on_distributed where id in ('0', '1') order by id, ts desc;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Unique
Output: distinct_on_distributed.ts, distinct_on_distributed.id
-> Custom Scan (DataNodeScan) on public.distinct_on_distributed
Output: distinct_on_distributed.ts, distinct_on_distributed.id
Data node: data_node_1
Chunks: _dist_hyper_X_X_chunk
Remote SQL: SELECT DISTINCT ON (id) ts, id FROM public.distinct_on_distributed WHERE _timescaledb_internal.chunks_in(public.distinct_on_distributed.*, ARRAY[27]) AND ((id = ANY ('{0,1}'::integer[]))) ORDER BY id ASC NULLS LAST, ts DESC NULLS FIRST
(7 rows)

select distinct on (id) ts, id from distinct_on_distributed where id in ('0') order by id, ts desc;
ts | id
--------------------------+----
Fri Jan 01 03:47:41 2021 | 0
(1 row)

explain (costs off, verbose)
select distinct on (id) ts, id from distinct_on_distributed where id in ('0') order by id, ts desc;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Unique
Output: distinct_on_distributed.ts, distinct_on_distributed.id
-> Custom Scan (DataNodeScan) on public.distinct_on_distributed
Output: distinct_on_distributed.ts, distinct_on_distributed.id
Data node: data_node_1
Chunks: _dist_hyper_X_X_chunk
Remote SQL: SELECT ts, id FROM public.distinct_on_distributed WHERE _timescaledb_internal.chunks_in(public.distinct_on_distributed.*, ARRAY[27]) AND ((id = 0)) ORDER BY ts DESC NULLS FIRST
(7 rows)

reset enable_sort;
3 changes: 2 additions & 1 deletion tsl/test/shared/sql/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ set(TEST_FILES_SHARED
dist_chunk.sql
dist_gapfill.sql
dist_insert.sql
dist_distinct.sql)
dist_distinct.sql
dist_distinct_pushdown.sql)

if((${PG_VERSION_MAJOR} GREATER_EQUAL "14"))
list(APPEND TEST_FILES_SHARED memoize.sql)
Expand Down
61 changes: 61 additions & 0 deletions tsl/test/shared/sql/dist_distinct_pushdown.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.

-- Test DISTINCT ON pushdown.


-- The case with LIMIT serves as a reference.
select ts, id from distinct_on_hypertable order by id, ts desc limit 1;
select ts, id from distinct_on_distributed order by id, ts desc limit 1;


-- DISTINCT ON should match the above LIMIT for the first id.
select distinct on (id) ts, id from distinct_on_hypertable order by id, ts desc;

select distinct on (id) ts, id from distinct_on_distributed order by id, ts desc;

explain (costs off, verbose)
select distinct on (id) ts, id from distinct_on_distributed order by id, ts desc;


-- A case where we have a filter on the DISTINCT ON column.
select distinct on (id) ts, id from distinct_on_distributed where id in ('0', '1') order by id, ts desc;

explain (costs off, verbose)
select distinct on (id) ts, id from distinct_on_distributed where id in ('0', '1') order by id, ts desc;


-- A somewhat dumb case where the DISTINCT ON column is deduced to be constant
-- and not added to pathkeys.
select distinct on (id) ts, id from distinct_on_distributed where id in ('0') order by id, ts desc;

explain (costs off, verbose)
select distinct on (id) ts, id from distinct_on_distributed where id in ('0') order by id, ts desc;


-- All above but with disabled local sort, to try to force more interesting plans where the sort
-- is pushed down.
set enable_sort = 0;

select ts, id from distinct_on_distributed order by id, ts desc limit 1;

explain (costs off, verbose)
select ts, id from distinct_on_distributed order by id, ts desc limit 1;

select distinct on (id) ts, id from distinct_on_distributed order by id, ts desc;

explain (costs off, verbose)
select distinct on (id) ts, id from distinct_on_distributed order by id, ts desc;

select distinct on (id) ts, id from distinct_on_distributed where id in ('0', '1') order by id, ts desc;

explain (costs off, verbose)
select distinct on (id) ts, id from distinct_on_distributed where id in ('0', '1') order by id, ts desc;

select distinct on (id) ts, id from distinct_on_distributed where id in ('0') order by id, ts desc;

explain (costs off, verbose)
select distinct on (id) ts, id from distinct_on_distributed where id in ('0') order by id, ts desc;

reset enable_sort;
13 changes: 13 additions & 0 deletions tsl/test/shared/sql/include/shared_setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,16 @@ SELECT table_name FROM create_distributed_hypertable('mvcp_hyper', 'time',
ALTER TABLE mvcp_hyper SET (timescaledb.compress, timescaledb.compress_orderby='time DESC');

INSERT INTO mvcp_hyper SELECT g, g FROM generate_series(0,1000) g;


-- Tables for the DISTINCT ON pushdown test
create table distinct_on_hypertable(ts timestamp, id int, val numeric);
select create_hypertable('distinct_on_hypertable', 'ts');
insert into distinct_on_hypertable select '2021-01-01 01:01:01'::timestamp + x * interval '1 second' ts,
mod(x, 4) id, r val
from (select random() r, x from generate_series(1, 10000) x) tt
order by r;

create table distinct_on_distributed(ts timestamp, id int, val numeric);
select create_distributed_hypertable('distinct_on_distributed', 'ts');
insert into distinct_on_distributed select * from distinct_on_hypertable;

0 comments on commit 0e99180

Please sign in to comment.