Improve parallel workers for decompression
The planner may now also choose parallel plans when decompressing chunks.
Decompression can use as many workers per chunk as the planner chooses,
rather than being limited to a single worker.
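
The per-chunk worker count now comes from PostgreSQL's regular worker
sizing instead of a hard-coded 1. As a rough C sketch (simplified from
PostgreSQL's compute_parallel_worker(); the real function also considers
index pages and the per-table parallel_workers reloption, so treat this
as an approximation):

static int
sketch_heap_scan_workers(BlockNumber heap_pages)
{
	/* Tables below min_parallel_table_scan_size get no workers at all. */
	BlockNumber threshold = Max(min_parallel_table_scan_size, 1);

	if (heap_pages < threshold)
		return 0;

	/* Each tripling of the table size past the threshold adds a worker. */
	int workers = 1;
	while (heap_pages >= threshold * 3)
	{
		workers++;
		threshold *= 3;
	}

	return Min(workers, max_parallel_workers_per_gather);
}

This is also why the tests below set min_parallel_table_scan_size to 0:
it pushes even small compressed chunks over the threshold so that several
workers get planned.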

Co-authored-by: Jan Kristof Nidzwetzki <[email protected]>
sotirissl and jnidzwetzki committed May 25, 2023
1 parent 8e69a99 commit 7f37842
Showing 15 changed files with 230 additions and 34 deletions.
6 changes: 3 additions & 3 deletions src/import/allpaths.c
@@ -90,8 +90,8 @@ set_tablesample_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *
}

/* copied from allpaths.c */
static void
create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel)
void
ts_create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel)
{
int parallel_workers;

@@ -124,7 +124,7 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)

/* If appropriate, consider parallel sequential scan */
if (rel->consider_parallel && required_outer == NULL)
create_plain_partial_paths(root, rel);
ts_create_plain_partial_paths(root, rel);

/* Consider index scans */
create_index_paths(root, rel);
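
The hunk above shows only the rename and the change of linkage; for
reference, the function body mirrors PostgreSQL's
create_plain_partial_paths. A minimal sketch, assuming the copy tracks
upstream:

void
ts_create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel)
{
	int parallel_workers;

	/* Size the worker count from the relation instead of hard-coding it. */
	parallel_workers = compute_parallel_worker(rel, rel->pages, -1,
											   max_parallel_workers_per_gather);

	/* If any limit was set to zero, the user doesn't want a parallel scan. */
	if (parallel_workers <= 0)
		return;

	/* Add an unordered partial path based on a parallel sequential scan. */
	add_partial_path(rel, create_seqscan_path(root, rel, NULL, parallel_workers));
}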
1 change: 1 addition & 0 deletions src/import/allpaths.h
@@ -11,6 +11,7 @@

#include "export.h"

extern TSDLLEXPORT void ts_create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel);
extern void ts_set_rel_size(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry *rte);
extern void ts_set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti,
RangeTblEntry *rte);
35 changes: 12 additions & 23 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.c
@@ -26,6 +26,7 @@
#include "debug_assert.h"
#include "ts_catalog/hypertable_compression.h"
#include "import/planner.h"
#include "import/allpaths.h"
#include "compression/create.h"
#include "nodes/decompress_chunk/sorted_merge.h"
#include "nodes/decompress_chunk/decompress_chunk.h"
@@ -59,8 +60,7 @@ typedef enum MergeBatchResult

static RangeTblEntry *decompress_chunk_make_rte(Oid compressed_relid, LOCKMODE lockmode);
static void create_compressed_scan_paths(PlannerInfo *root, RelOptInfo *compressed_rel,
int parallel_workers, CompressionInfo *info,
SortInfo *sort_info);
CompressionInfo *info, SortInfo *sort_info);

static DecompressChunkPath *decompress_chunk_path_create(PlannerInfo *root, CompressionInfo *info,
int parallel_workers,
@@ -576,12 +576,6 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
(info->chunk_rel->reloptkind == RELOPT_BASEREL &&
ts_rte_is_marked_for_expansion(info->chunk_rte)));

/*
* since we rely on parallel coordination from the scan below
* this node it is probably not beneficial to have more
* than a single worker per chunk
*/
int parallel_workers = 1;
SortInfo sort_info = build_sortinfo(chunk, chunk_rel, info, root->query_pathkeys);

Assert(chunk->fd.compressed_chunk_id > 0);
@@ -616,11 +610,8 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
}

chunk_rel->rows = new_row_estimate;
create_compressed_scan_paths(root,
compressed_rel,
compressed_rel->consider_parallel ? parallel_workers : 0,
info,
&sort_info);

create_compressed_scan_paths(root, compressed_rel, info, &sort_info);

/* compute parent relids of the chunk and use it to filter paths*/
Relids parent_relids = NULL;
@@ -838,7 +829,8 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
* If this is a partially compressed chunk we have to combine data
* from compressed and uncompressed chunk.
*/
path = (Path *) decompress_chunk_path_create(root, info, parallel_workers, child_path);
path = (Path *)
decompress_chunk_path_create(root, info, child_path->parallel_workers, child_path);

if (ts_chunk_is_partial(chunk))
{
@@ -879,7 +871,8 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
list_make2(path, uncompressed_path),
NIL /* pathkeys */,
req_outer,
parallel_workers,
Max(path->parallel_workers,
uncompressed_path->parallel_workers),
false,
NIL,
path->rows + uncompressed_path->rows);
@@ -1509,8 +1502,8 @@ decompress_chunk_path_create(PlannerInfo *root, CompressionInfo *info, int paral
*/

static void
create_compressed_scan_paths(PlannerInfo *root, RelOptInfo *compressed_rel, int parallel_workers,
CompressionInfo *info, SortInfo *sort_info)
create_compressed_scan_paths(PlannerInfo *root, RelOptInfo *compressed_rel, CompressionInfo *info,
SortInfo *sort_info)
{
Path *compressed_path;

@@ -1525,12 +1518,8 @@ create_compressed_scan_paths(PlannerInfo *root, RelOptInfo *compressed_rel, int
add_path(compressed_rel, compressed_path);

/* create parallel scan path */
if (compressed_rel->consider_parallel && parallel_workers > 0)
{
compressed_path = create_seqscan_path(root, compressed_rel, NULL, parallel_workers);
Assert(compressed_path->parallel_aware);
add_partial_path(compressed_rel, compressed_path);
}
if (compressed_rel->consider_parallel)
ts_create_plain_partial_paths(root, compressed_rel);

/*
* We set enable_bitmapscan to false here to ensure any paths with bitmapscan do not
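
Taken together, the decompress_chunk.c changes let the worker count flow
bottom-up from the compressed scan. A condensed sketch of the new path
construction (names follow the diff above; surrounding code elided):

/* 1. Let the planner generate partial paths, and thus pick a worker
 *    count, for the compressed relation itself: */
if (compressed_rel->consider_parallel)
	ts_create_plain_partial_paths(root, compressed_rel);

/* 2. A DecompressChunk path inherits whatever parallelism its child
 *    scan was planned with, instead of a hard-coded single worker: */
path = (Path *) decompress_chunk_path_create(root, info,
											 child_path->parallel_workers,
											 child_path);

/* 3. For a partially compressed chunk, the append of compressed and
 *    uncompressed data uses the larger of the two worker counts: */
parallel_workers = Max(path->parallel_workers,
					   uncompressed_path->parallel_workers);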
112 changes: 112 additions & 0 deletions tsl/test/expected/compression.out
@@ -1711,3 +1711,115 @@ SELECT time, const, numeric,first, avg1, avg2 FROM tidrangescan_expr ORDER BY ti

RESET timescaledb.enable_chunk_append;
RESET enable_indexscan;
-- Improve the number of parallel workers for decompression
-- Test that parallel plans are generated
-- with different numbers of parallel workers
CREATE TABLE f_sensor_data(
time timestamptz not null,
sensor_id integer not null,
cpu double precision null,
temperature double precision null
);
SELECT FROM create_hypertable('f_sensor_data','time');
--
(1 row)

SELECT set_chunk_time_interval('f_sensor_data', INTERVAL '1 year');
set_chunk_time_interval
-------------------------

(1 row)

SELECT * FROM _timescaledb_internal.create_chunk('f_sensor_data',' {"time": [181900977000000, 515024000000000]}');
chunk_id | hypertable_id | schema_name | table_name | relkind | slices | created
----------+---------------+-----------------------+--------------------+---------+----------------------------------------------+---------
71 | 37 | _timescaledb_internal | _hyper_37_71_chunk | r | {"time": [181900977000000, 515024000000000]} | t
(1 row)

INSERT INTO f_sensor_data
SELECT
time AS time,
sensor_id,
100.0,
36.6
FROM
generate_series('1980-01-01 00:00'::timestamp, '1980-02-28 12:00', INTERVAL '1 day') AS g1(time),
generate_series(1, 1700, 1 ) AS g2(sensor_id)
ORDER BY
time;
ALTER TABLE f_sensor_data SET (timescaledb.compress, timescaledb.compress_segmentby='sensor_id' ,timescaledb.compress_orderby = 'time DESC');
SELECT compress_chunk(i) FROM show_chunks('f_sensor_data') i;
compress_chunk
------------------------------------------
_timescaledb_internal._hyper_37_71_chunk
(1 row)

-- Encourage use of parallel plans
SET parallel_setup_cost = 0;
SET parallel_tuple_cost = 0;
SET min_parallel_table_scan_size TO '0';
\set explain 'EXPLAIN (VERBOSE, COSTS OFF)'
SHOW min_parallel_table_scan_size;
min_parallel_table_scan_size
------------------------------
0
(1 row)

SHOW max_parallel_workers;
max_parallel_workers
----------------------
8
(1 row)

SHOW max_parallel_workers_per_gather;
max_parallel_workers_per_gather
---------------------------------
2
(1 row)

SET max_parallel_workers_per_gather = 4;
SHOW max_parallel_workers_per_gather;
max_parallel_workers_per_gather
---------------------------------
4
(1 row)

:explain
SELECT sum(cpu) FROM f_sensor_data;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate
Output: sum(_hyper_37_71_chunk.cpu)
-> Gather
Output: (PARTIAL sum(_hyper_37_71_chunk.cpu))
Workers Planned: 4
-> Partial Aggregate
Output: PARTIAL sum(_hyper_37_71_chunk.cpu)
-> Parallel Append
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_37_71_chunk
Output: _hyper_37_71_chunk.cpu
-> Parallel Seq Scan on _timescaledb_internal.compress_hyper_38_72_chunk
Output: compress_hyper_38_72_chunk."time", compress_hyper_38_72_chunk.sensor_id, compress_hyper_38_72_chunk.cpu, compress_hyper_38_72_chunk.temperature, compress_hyper_38_72_chunk._ts_meta_count, compress_hyper_38_72_chunk._ts_meta_sequence_num, compress_hyper_38_72_chunk._ts_meta_min_1, compress_hyper_38_72_chunk._ts_meta_max_1
(12 rows)

-- Encourage use of Index Scan
SET enable_seqscan = false;
SET enable_indexscan = true;
SET min_parallel_index_scan_size = 0;
SET min_parallel_table_scan_size = 0;
CREATE INDEX ON f_sensor_data (time, sensor_id);
:explain
SELECT * FROM f_sensor_data WHERE sensor_id > 100;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Gather
Output: _hyper_37_71_chunk."time", _hyper_37_71_chunk.sensor_id, _hyper_37_71_chunk.cpu, _hyper_37_71_chunk.temperature
Workers Planned: 2
-> Parallel Append
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_37_71_chunk
Output: _hyper_37_71_chunk."time", _hyper_37_71_chunk.sensor_id, _hyper_37_71_chunk.cpu, _hyper_37_71_chunk.temperature
-> Parallel Index Scan using compress_hyper_38_72_chunk__compressed_hypertable_38_sensor_id_ on _timescaledb_internal.compress_hyper_38_72_chunk
Output: compress_hyper_38_72_chunk."time", compress_hyper_38_72_chunk.sensor_id, compress_hyper_38_72_chunk.cpu, compress_hyper_38_72_chunk.temperature, compress_hyper_38_72_chunk._ts_meta_count, compress_hyper_38_72_chunk._ts_meta_sequence_num, compress_hyper_38_72_chunk._ts_meta_min_1, compress_hyper_38_72_chunk._ts_meta_max_1
Index Cond: (compress_hyper_38_72_chunk.sensor_id > 100)
(9 rows)

6 changes: 4 additions & 2 deletions tsl/test/expected/transparent_decompression-12.out
@@ -8382,11 +8382,12 @@ GROUP BY fleet;
SET max_parallel_workers_per_gather TO 4;
SET parallel_setup_cost = 0;
SET parallel_tuple_cost = 0;
SET min_parallel_table_scan_size TO '0';
EXPLAIN (costs off) SELECT * FROM metrics ORDER BY time, device_id;
QUERY PLAN
------------------------------------------------------------------------
Gather Merge
Workers Planned: 2
Workers Planned: 3
-> Sort
Sort Key: _hyper_1_1_chunk."time", _hyper_1_1_chunk.device_id
-> Parallel Append
@@ -8403,7 +8404,7 @@ EXPLAIN (costs off) SELECT time_bucket('10 minutes', time) bucket, avg(v0) avg_v
Finalize HashAggregate
Group Key: (time_bucket('@ 10 mins'::interval, _hyper_1_1_chunk."time"))
-> Gather
Workers Planned: 2
Workers Planned: 3
-> Partial HashAggregate
Group Key: time_bucket('@ 10 mins'::interval, _hyper_1_1_chunk."time")
-> Result
@@ -8439,6 +8440,7 @@ EXPLAIN (costs off) SELECT * FROM metrics_space ORDER BY time, device_id;
-> Parallel Seq Scan on _hyper_2_12_chunk
(19 rows)

RESET min_parallel_table_scan_size;
RESET parallel_setup_cost;
RESET parallel_tuple_cost;
SET enable_seqscan TO false;
6 changes: 4 additions & 2 deletions tsl/test/expected/transparent_decompression-13.out
@@ -9465,11 +9465,12 @@ GROUP BY fleet;
SET max_parallel_workers_per_gather TO 4;
SET parallel_setup_cost = 0;
SET parallel_tuple_cost = 0;
SET min_parallel_table_scan_size TO '0';
EXPLAIN (costs off) SELECT * FROM metrics ORDER BY time, device_id;
QUERY PLAN
------------------------------------------------------------------------
Gather Merge
Workers Planned: 2
Workers Planned: 3
-> Sort
Sort Key: _hyper_1_1_chunk."time", _hyper_1_1_chunk.device_id
-> Parallel Append
@@ -9486,7 +9487,7 @@ EXPLAIN (costs off) SELECT time_bucket('10 minutes', time) bucket, avg(v0) avg_v
Finalize HashAggregate
Group Key: (time_bucket('@ 10 mins'::interval, _hyper_1_1_chunk."time"))
-> Gather
Workers Planned: 2
Workers Planned: 3
-> Partial HashAggregate
Group Key: time_bucket('@ 10 mins'::interval, _hyper_1_1_chunk."time")
-> Result
@@ -9522,6 +9523,7 @@ EXPLAIN (costs off) SELECT * FROM metrics_space ORDER BY time, device_id;
-> Parallel Seq Scan on _hyper_2_12_chunk
(19 rows)

RESET min_parallel_table_scan_size;
RESET parallel_setup_cost;
RESET parallel_tuple_cost;
SET enable_seqscan TO false;
6 changes: 4 additions & 2 deletions tsl/test/expected/transparent_decompression-14.out
@@ -9679,11 +9679,12 @@ GROUP BY fleet;
SET max_parallel_workers_per_gather TO 4;
SET parallel_setup_cost = 0;
SET parallel_tuple_cost = 0;
SET min_parallel_table_scan_size TO '0';
EXPLAIN (costs off) SELECT * FROM metrics ORDER BY time, device_id;
QUERY PLAN
------------------------------------------------------------------------
Gather Merge
Workers Planned: 2
Workers Planned: 3
-> Sort
Sort Key: _hyper_1_1_chunk."time", _hyper_1_1_chunk.device_id
-> Parallel Append
@@ -9700,7 +9701,7 @@ EXPLAIN (costs off) SELECT time_bucket('10 minutes', time) bucket, avg(v0) avg_v
Finalize HashAggregate
Group Key: (time_bucket('@ 10 mins'::interval, _hyper_1_1_chunk."time"))
-> Gather
Workers Planned: 2
Workers Planned: 3
-> Partial HashAggregate
Group Key: time_bucket('@ 10 mins'::interval, _hyper_1_1_chunk."time")
-> Result
@@ -9736,6 +9737,7 @@ EXPLAIN (costs off) SELECT * FROM metrics_space ORDER BY time, device_id;
-> Parallel Seq Scan on _hyper_2_12_chunk
(19 rows)

RESET min_parallel_table_scan_size;
RESET parallel_setup_cost;
RESET parallel_tuple_cost;
SET enable_seqscan TO false;
6 changes: 4 additions & 2 deletions tsl/test/expected/transparent_decompression-15.out
@@ -9682,11 +9682,12 @@ GROUP BY fleet;
SET max_parallel_workers_per_gather TO 4;
SET parallel_setup_cost = 0;
SET parallel_tuple_cost = 0;
SET min_parallel_table_scan_size TO '0';
EXPLAIN (costs off) SELECT * FROM metrics ORDER BY time, device_id;
QUERY PLAN
------------------------------------------------------------------------
Gather Merge
Workers Planned: 2
Workers Planned: 3
-> Sort
Sort Key: _hyper_1_1_chunk."time", _hyper_1_1_chunk.device_id
-> Parallel Append
@@ -9703,7 +9704,7 @@ EXPLAIN (costs off) SELECT time_bucket('10 minutes', time) bucket, avg(v0) avg_v
Finalize HashAggregate
Group Key: (time_bucket('@ 10 mins'::interval, _hyper_1_1_chunk."time"))
-> Gather
Workers Planned: 2
Workers Planned: 3
-> Partial HashAggregate
Group Key: time_bucket('@ 10 mins'::interval, _hyper_1_1_chunk."time")
-> Result
@@ -9739,6 +9740,7 @@ EXPLAIN (costs off) SELECT * FROM metrics_space ORDER BY time, device_id;
-> Parallel Seq Scan on _hyper_2_12_chunk
(19 rows)

RESET min_parallel_table_scan_size;
RESET parallel_setup_cost;
RESET parallel_tuple_cost;
SET enable_seqscan TO false;
4 changes: 4 additions & 0 deletions tsl/test/shared/expected/transparent_decompress_chunk-12.out
@@ -657,6 +657,7 @@ QUERY PLAN

DROP VIEW compressed_view;
SET parallel_leader_participation TO off;
SET min_parallel_table_scan_size TO '0';
-- test INNER JOIN
:PREFIX_NO_VERBOSE
SELECT *
@@ -709,6 +710,7 @@ QUERY PLAN
Index Cond: (device_id = 3)
(16 rows)

RESET min_parallel_table_scan_size;
:PREFIX_NO_VERBOSE
SELECT *
FROM :TEST_TABLE m1
@@ -768,6 +770,7 @@ QUERY PLAN
(20 rows)

-- test OUTER JOIN
SET min_parallel_table_scan_size TO '0';
:PREFIX_NO_VERBOSE
SELECT *
FROM :TEST_TABLE m1
@@ -790,6 +793,7 @@ QUERY PLAN
-> Seq Scan on compress_hyper_X_X_chunk compress_hyper_X_X_chunk_1
(11 rows)

RESET min_parallel_table_scan_size;
:PREFIX_NO_VERBOSE
SELECT *
FROM :TEST_TABLE m1