Improve parallel workers for decompression
So far, we have set the number of desired workers for decompression to
1. If a query touches only one chunk, we end up with a single worker in
the parallel plan; PostgreSQL spins up multiple workers only when the
query touches multiple chunks. Those workers could then be used to
process the data of one chunk.

This patch removes our custom worker calculation and relies on
PostgreSQL's own logic to calculate the desired degree of parallelism.
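For context, the PostgreSQL logic in question sizes a parallel scan from the relation's heap pages: the threshold starts at min_parallel_table_scan_size, every additional worker requires roughly a tripling of the table size, and the result is capped by max_parallel_workers_per_gather. A small standalone sketch of that calculation, paraphrased from upstream compute_parallel_worker() — the estimate_parallel_workers helper and the sample numbers below are illustrative and not part of this patch:

#include <stdio.h>

/*
 * Sketch of PostgreSQL's heap-size based worker estimate (paraphrased from
 * compute_parallel_worker() in optimizer/path/allpaths.c; details such as
 * index pages and per-relation parallel_workers overrides are omitted).
 */
static int
estimate_parallel_workers(long heap_pages, long min_parallel_table_scan_size,
                          int max_parallel_workers_per_gather)
{
    long threshold = min_parallel_table_scan_size > 0 ? min_parallel_table_scan_size : 1;
    int workers;

    if (heap_pages < threshold)
        return 0;                  /* relation too small for a parallel scan */

    workers = 1;
    while (heap_pages >= threshold * 3)
    {
        workers++;                 /* one more worker per ~3x of table size */
        threshold *= 3;
    }

    return workers < max_parallel_workers_per_gather ? workers
                                                     : max_parallel_workers_per_gather;
}

int
main(void)
{
    /* a 10000-page chunk with the default 8MB (1024-page) threshold */
    printf("%d\n", estimate_parallel_workers(10000, 1024, 4));   /* prints 3 */
    /* with min_parallel_table_scan_size = 0, only the per-gather cap applies */
    printf("%d\n", estimate_parallel_workers(10000, 0, 4));      /* prints 4 */
    return 0;
}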

Co-authored-by: Jan Kristof Nidzwetzki <[email protected]>
sotirissl and jnidzwetzki committed Jun 2, 2023
1 parent 10cab43 commit 086acc3
Showing 16 changed files with 300 additions and 34 deletions.
1 change: 1 addition & 0 deletions .unreleased/5655_decompression_workers.txt
@@ -0,0 +1 @@
+Implements: #5655 Improve the number of parallel workers for decompression
6 changes: 3 additions & 3 deletions src/import/allpaths.c
@@ -90,8 +90,8 @@ set_tablesample_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *
}

/* copied from allpaths.c */
-static void
-create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel)
+void
+ts_create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel)
{
int parallel_workers;

@@ -124,7 +124,7 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)

/* If appropriate, consider parallel sequential scan */
if (rel->consider_parallel && required_outer == NULL)
-create_plain_partial_paths(root, rel);
+ts_create_plain_partial_paths(root, rel);

/* Consider index scans */
create_index_paths(root, rel);
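The body of the renamed function is unchanged by this hunk; for context, upstream create_plain_partial_paths() asks compute_parallel_worker() for a worker estimate and, if it is positive, adds a parallel-aware sequential scan as a partial path. A hedged sketch of what the exported ts_create_plain_partial_paths() therefore does — simplified, assuming the usual PostgreSQL planner headers; the actual copy tracks the upstream source for the supported PostgreSQL versions:

#include "postgres.h"
#include "optimizer/cost.h"      /* max_parallel_workers_per_gather */
#include "optimizer/pathnode.h"  /* create_seqscan_path, add_partial_path */
#include "optimizer/paths.h"     /* compute_parallel_worker */

/* Sketch: simplified from PostgreSQL's create_plain_partial_paths(). */
void
ts_create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel)
{
	int parallel_workers;

	/* Let PostgreSQL size the worker count from the relation's pages. */
	parallel_workers =
		compute_parallel_worker(rel, rel->pages, -1, max_parallel_workers_per_gather);

	/* Zero means the relation is too small or parallelism is disabled. */
	if (parallel_workers <= 0)
		return;

	/* Add an unordered partial path based on a parallel sequential scan. */
	add_partial_path(rel, create_seqscan_path(root, rel, NULL, parallel_workers));
}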
1 change: 1 addition & 0 deletions src/import/allpaths.h
@@ -11,6 +11,7 @@

#include "export.h"

+extern TSDLLEXPORT void ts_create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel);
extern void ts_set_rel_size(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry *rte);
extern void ts_set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti,
RangeTblEntry *rte);
35 changes: 12 additions & 23 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.c
@@ -26,6 +26,7 @@
#include "debug_assert.h"
#include "ts_catalog/hypertable_compression.h"
#include "import/planner.h"
#include "import/allpaths.h"
#include "compression/create.h"
#include "nodes/decompress_chunk/sorted_merge.h"
#include "nodes/decompress_chunk/decompress_chunk.h"
@@ -59,8 +60,7 @@ typedef enum MergeBatchResult

static RangeTblEntry *decompress_chunk_make_rte(Oid compressed_relid, LOCKMODE lockmode);
static void create_compressed_scan_paths(PlannerInfo *root, RelOptInfo *compressed_rel,
-int parallel_workers, CompressionInfo *info,
-SortInfo *sort_info);
+CompressionInfo *info, SortInfo *sort_info);

static DecompressChunkPath *decompress_chunk_path_create(PlannerInfo *root, CompressionInfo *info,
int parallel_workers,
@@ -576,12 +576,6 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
(info->chunk_rel->reloptkind == RELOPT_BASEREL &&
ts_rte_is_marked_for_expansion(info->chunk_rte)));

-/*
- * since we rely on parallel coordination from the scan below
- * this node it is probably not beneficial to have more
- * than a single worker per chunk
- */
-int parallel_workers = 1;
SortInfo sort_info = build_sortinfo(chunk, chunk_rel, info, root->query_pathkeys);

Assert(chunk->fd.compressed_chunk_id > 0);
@@ -616,11 +610,8 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
}

chunk_rel->rows = new_row_estimate;
-create_compressed_scan_paths(root,
-compressed_rel,
-compressed_rel->consider_parallel ? parallel_workers : 0,
-info,
-&sort_info);
+
+create_compressed_scan_paths(root, compressed_rel, info, &sort_info);

/* compute parent relids of the chunk and use it to filter paths*/
Relids parent_relids = NULL;
@@ -876,7 +867,8 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
* If this is a partially compressed chunk we have to combine data
* from compressed and uncompressed chunk.
*/
-path = (Path *) decompress_chunk_path_create(root, info, parallel_workers, child_path);
+path = (Path *)
+decompress_chunk_path_create(root, info, child_path->parallel_workers, child_path);

if (ts_chunk_is_partial(chunk))
{
@@ -917,7 +909,8 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
list_make2(path, uncompressed_path),
NIL /* pathkeys */,
req_outer,
-parallel_workers,
+Max(path->parallel_workers,
+    uncompressed_path->parallel_workers),
false,
NIL,
path->rows + uncompressed_path->rows);
@@ -1547,8 +1540,8 @@ decompress_chunk_path_create(PlannerInfo *root, CompressionInfo *info, int paral
*/

static void
-create_compressed_scan_paths(PlannerInfo *root, RelOptInfo *compressed_rel, int parallel_workers,
-CompressionInfo *info, SortInfo *sort_info)
+create_compressed_scan_paths(PlannerInfo *root, RelOptInfo *compressed_rel, CompressionInfo *info,
+SortInfo *sort_info)
{
Path *compressed_path;

@@ -1563,12 +1556,8 @@ create_compressed_scan_paths(PlannerInfo *root, RelOptInfo *compressed_rel, int
add_path(compressed_rel, compressed_path);

/* create parallel scan path */
-if (compressed_rel->consider_parallel && parallel_workers > 0)
-{
-compressed_path = create_seqscan_path(root, compressed_rel, NULL, parallel_workers);
-Assert(compressed_path->parallel_aware);
-add_partial_path(compressed_rel, compressed_path);
-}
+if (compressed_rel->consider_parallel)
+ts_create_plain_partial_paths(root, compressed_rel);

/*
* We set enable_bitmapscan to false here to ensure any pathes with bitmapscan do not
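Taken together, these hunks change where the worker count comes from: partial paths for the compressed relation are now sized by PostgreSQL itself, the DecompressChunk path inherits the worker count of whichever child path it wraps, and the append path for a partially compressed chunk advertises the larger of its two children's counts (the children share one worker pool, so summing would over-ask). A condensed, hypothetical sketch of that flow — not the literal function bodies above, and the helper name is made up for illustration:

/* Condensed sketch only; assumes the TimescaleDB planner context of this file. */
static void
sketch_decompress_parallelism(PlannerInfo *root, RelOptInfo *compressed_rel,
                              CompressionInfo *info, Path *child_path,
                              Path *uncompressed_path)
{
	/* 1. Partial paths for the compressed relation are sized by PostgreSQL. */
	if (compressed_rel->consider_parallel)
		ts_create_plain_partial_paths(root, compressed_rel);

	/* 2. The DecompressChunk path mirrors the worker count of the path it wraps. */
	Path *path = (Path *) decompress_chunk_path_create(root, info,
	                                                   child_path->parallel_workers,
	                                                   child_path);

	/* 3. A partially compressed chunk appends compressed and uncompressed data;
	 *    the append's children share one worker pool, so it needs only the
	 *    larger of the two requirements, not their sum. */
	if (uncompressed_path != NULL)
	{
		int append_workers = Max(path->parallel_workers,
		                         uncompressed_path->parallel_workers);
		(void) append_workers;   /* passed to create_append_path() in the real code */
	}
}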
164 changes: 164 additions & 0 deletions tsl/test/expected/compression.out
@@ -1711,3 +1711,167 @@ SELECT time, const, numeric,first, avg1, avg2 FROM tidrangescan_expr ORDER BY ti

RESET timescaledb.enable_chunk_append;
RESET enable_indexscan;
-- Test the number of allocated parallel workers for decompression
-- Test that a parallel plan is generated
-- with different number of parallel workers
CREATE TABLE f_sensor_data(
time timestamptz NOT NULL,
sensor_id integer NOT NULL,
cpu double precision NULL,
temperature double precision NULL
);
SELECT FROM create_hypertable('f_sensor_data','time');
--
(1 row)

SELECT set_chunk_time_interval('f_sensor_data', INTERVAL '1 year');
set_chunk_time_interval
-------------------------

(1 row)

-- Create one chunk manually to ensure all data is inserted into one chunk
SELECT * FROM _timescaledb_internal.create_chunk('f_sensor_data',' {"time": [181900977000000, 515024000000000]}');
chunk_id | hypertable_id | schema_name | table_name | relkind | slices | created
----------+---------------+-----------------------+--------------------+---------+----------------------------------------------+---------
71 | 37 | _timescaledb_internal | _hyper_37_71_chunk | r | {"time": [181900977000000, 515024000000000]} | t
(1 row)

INSERT INTO f_sensor_data
SELECT
time AS time,
sensor_id,
100.0,
36.6
FROM
generate_series('1980-01-01 00:00'::timestamp, '1980-02-28 12:00', INTERVAL '1 day') AS g1(time),
generate_series(1, 1700, 1 ) AS g2(sensor_id)
ORDER BY
time;
ALTER TABLE f_sensor_data SET (timescaledb.compress, timescaledb.compress_segmentby='sensor_id' ,timescaledb.compress_orderby = 'time DESC');
SELECT compress_chunk(i) FROM show_chunks('f_sensor_data') i;
compress_chunk
------------------------------------------
_timescaledb_internal._hyper_37_71_chunk
(1 row)

-- Encourage use of parallel plans
SET parallel_setup_cost = 0;
SET parallel_tuple_cost = 0;
SET min_parallel_table_scan_size TO '0';
\set explain 'EXPLAIN (VERBOSE, COSTS OFF)'
SHOW min_parallel_table_scan_size;
min_parallel_table_scan_size
------------------------------
0
(1 row)

SHOW max_parallel_workers;
max_parallel_workers
----------------------
8
(1 row)

SHOW max_parallel_workers_per_gather;
max_parallel_workers_per_gather
---------------------------------
2
(1 row)

SET max_parallel_workers_per_gather = 4;
SHOW max_parallel_workers_per_gather;
max_parallel_workers_per_gather
---------------------------------
4
(1 row)

:explain
SELECT sum(cpu) FROM f_sensor_data;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate
Output: sum(_hyper_37_71_chunk.cpu)
-> Gather
Output: (PARTIAL sum(_hyper_37_71_chunk.cpu))
Workers Planned: 4
-> Partial Aggregate
Output: PARTIAL sum(_hyper_37_71_chunk.cpu)
-> Parallel Append
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_37_71_chunk
Output: _hyper_37_71_chunk.cpu
-> Parallel Seq Scan on _timescaledb_internal.compress_hyper_38_72_chunk
Output: compress_hyper_38_72_chunk."time", compress_hyper_38_72_chunk.sensor_id, compress_hyper_38_72_chunk.cpu, compress_hyper_38_72_chunk.temperature, compress_hyper_38_72_chunk._ts_meta_count, compress_hyper_38_72_chunk._ts_meta_sequence_num, compress_hyper_38_72_chunk._ts_meta_min_1, compress_hyper_38_72_chunk._ts_meta_max_1
(12 rows)

-- Encourage use of Index Scan
SET enable_seqscan = false;
SET enable_indexscan = true;
SET min_parallel_index_scan_size = 0;
SET min_parallel_table_scan_size = 0;
CREATE INDEX ON f_sensor_data (time, sensor_id);
:explain
SELECT * FROM f_sensor_data WHERE sensor_id > 100;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Gather
Output: _hyper_37_71_chunk."time", _hyper_37_71_chunk.sensor_id, _hyper_37_71_chunk.cpu, _hyper_37_71_chunk.temperature
Workers Planned: 2
-> Parallel Append
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_37_71_chunk
Output: _hyper_37_71_chunk."time", _hyper_37_71_chunk.sensor_id, _hyper_37_71_chunk.cpu, _hyper_37_71_chunk.temperature
-> Parallel Index Scan using compress_hyper_38_72_chunk__compressed_hypertable_38_sensor_id_ on _timescaledb_internal.compress_hyper_38_72_chunk
Output: compress_hyper_38_72_chunk."time", compress_hyper_38_72_chunk.sensor_id, compress_hyper_38_72_chunk.cpu, compress_hyper_38_72_chunk.temperature, compress_hyper_38_72_chunk._ts_meta_count, compress_hyper_38_72_chunk._ts_meta_sequence_num, compress_hyper_38_72_chunk._ts_meta_min_1, compress_hyper_38_72_chunk._ts_meta_max_1
Index Cond: (compress_hyper_38_72_chunk.sensor_id > 100)
(9 rows)

-- Test for partially compressed chunks
INSERT INTO f_sensor_data
SELECT
time AS time,
sensor_id,
100.0,
36.6
FROM
generate_series('1980-01-01 00:00'::timestamp, '1980-01-30 12:00', INTERVAL '1 day') AS g1(time),
generate_series(1700, 1800, 1 ) AS g2(sensor_id)
ORDER BY
time;
:explain
SELECT sum(cpu) FROM f_sensor_data;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate
Output: sum(_hyper_37_71_chunk.cpu)
-> Gather
Output: (PARTIAL sum(_hyper_37_71_chunk.cpu))
Workers Planned: 4
-> Partial Aggregate
Output: PARTIAL sum(_hyper_37_71_chunk.cpu)
-> Parallel Append
-> Parallel Seq Scan on _timescaledb_internal._hyper_37_71_chunk
Output: _hyper_37_71_chunk.cpu
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_37_71_chunk
Output: _hyper_37_71_chunk.cpu
-> Parallel Seq Scan on _timescaledb_internal.compress_hyper_38_72_chunk
Output: compress_hyper_38_72_chunk."time", compress_hyper_38_72_chunk.sensor_id, compress_hyper_38_72_chunk.cpu, compress_hyper_38_72_chunk.temperature, compress_hyper_38_72_chunk._ts_meta_count, compress_hyper_38_72_chunk._ts_meta_sequence_num, compress_hyper_38_72_chunk._ts_meta_min_1, compress_hyper_38_72_chunk._ts_meta_max_1
(14 rows)

:explain
SELECT * FROM f_sensor_data WHERE sensor_id > 100;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Gather
Output: _hyper_37_71_chunk."time", _hyper_37_71_chunk.sensor_id, _hyper_37_71_chunk.cpu, _hyper_37_71_chunk.temperature
Workers Planned: 3
-> Parallel Append
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_37_71_chunk
Output: _hyper_37_71_chunk."time", _hyper_37_71_chunk.sensor_id, _hyper_37_71_chunk.cpu, _hyper_37_71_chunk.temperature
Filter: (_hyper_37_71_chunk.sensor_id > 100)
-> Parallel Index Scan using compress_hyper_38_72_chunk__compressed_hypertable_38_sensor_id_ on _timescaledb_internal.compress_hyper_38_72_chunk
Output: compress_hyper_38_72_chunk."time", compress_hyper_38_72_chunk.sensor_id, compress_hyper_38_72_chunk.cpu, compress_hyper_38_72_chunk.temperature, compress_hyper_38_72_chunk._ts_meta_count, compress_hyper_38_72_chunk._ts_meta_sequence_num, compress_hyper_38_72_chunk._ts_meta_min_1, compress_hyper_38_72_chunk._ts_meta_max_1
Index Cond: (compress_hyper_38_72_chunk.sensor_id > 100)
-> Parallel Index Scan using _hyper_37_71_chunk_f_sensor_data_time_sensor_id_idx on _timescaledb_internal._hyper_37_71_chunk
Output: _hyper_37_71_chunk."time", _hyper_37_71_chunk.sensor_id, _hyper_37_71_chunk.cpu, _hyper_37_71_chunk.temperature
Index Cond: (_hyper_37_71_chunk.sensor_id > 100)
(13 rows)

6 changes: 4 additions & 2 deletions tsl/test/expected/transparent_decompression-12.out
@@ -8382,11 +8382,12 @@ GROUP BY fleet;
SET max_parallel_workers_per_gather TO 4;
SET parallel_setup_cost = 0;
SET parallel_tuple_cost = 0;
+SET min_parallel_table_scan_size TO '0';
EXPLAIN (costs off) SELECT * FROM metrics ORDER BY time, device_id;
QUERY PLAN
------------------------------------------------------------------------
Gather Merge
-Workers Planned: 2
+Workers Planned: 3
-> Sort
Sort Key: _hyper_1_1_chunk."time", _hyper_1_1_chunk.device_id
-> Parallel Append
@@ -8403,7 +8404,7 @@ EXPLAIN (costs off) SELECT time_bucket('10 minutes', time) bucket, avg(v0) avg_v
Finalize HashAggregate
Group Key: (time_bucket('@ 10 mins'::interval, _hyper_1_1_chunk."time"))
-> Gather
-Workers Planned: 2
+Workers Planned: 3
-> Partial HashAggregate
Group Key: time_bucket('@ 10 mins'::interval, _hyper_1_1_chunk."time")
-> Result
@@ -8439,6 +8440,7 @@ EXPLAIN (costs off) SELECT * FROM metrics_space ORDER BY time, device_id;
-> Parallel Seq Scan on _hyper_2_12_chunk
(19 rows)

+RESET min_parallel_table_scan_size;
RESET parallel_setup_cost;
RESET parallel_tuple_cost;
SET enable_seqscan TO false;
6 changes: 4 additions & 2 deletions tsl/test/expected/transparent_decompression-13.out
@@ -9465,11 +9465,12 @@ GROUP BY fleet;
SET max_parallel_workers_per_gather TO 4;
SET parallel_setup_cost = 0;
SET parallel_tuple_cost = 0;
+SET min_parallel_table_scan_size TO '0';
EXPLAIN (costs off) SELECT * FROM metrics ORDER BY time, device_id;
QUERY PLAN
------------------------------------------------------------------------
Gather Merge
-Workers Planned: 2
+Workers Planned: 3
-> Sort
Sort Key: _hyper_1_1_chunk."time", _hyper_1_1_chunk.device_id
-> Parallel Append
@@ -9486,7 +9487,7 @@ EXPLAIN (costs off) SELECT time_bucket('10 minutes', time) bucket, avg(v0) avg_v
Finalize HashAggregate
Group Key: (time_bucket('@ 10 mins'::interval, _hyper_1_1_chunk."time"))
-> Gather
-Workers Planned: 2
+Workers Planned: 3
-> Partial HashAggregate
Group Key: time_bucket('@ 10 mins'::interval, _hyper_1_1_chunk."time")
-> Result
@@ -9522,6 +9523,7 @@ EXPLAIN (costs off) SELECT * FROM metrics_space ORDER BY time, device_id;
-> Parallel Seq Scan on _hyper_2_12_chunk
(19 rows)

+RESET min_parallel_table_scan_size;
RESET parallel_setup_cost;
RESET parallel_tuple_cost;
SET enable_seqscan TO false;
6 changes: 4 additions & 2 deletions tsl/test/expected/transparent_decompression-14.out
@@ -9679,11 +9679,12 @@ GROUP BY fleet;
SET max_parallel_workers_per_gather TO 4;
SET parallel_setup_cost = 0;
SET parallel_tuple_cost = 0;
+SET min_parallel_table_scan_size TO '0';
EXPLAIN (costs off) SELECT * FROM metrics ORDER BY time, device_id;
QUERY PLAN
------------------------------------------------------------------------
Gather Merge
-Workers Planned: 2
+Workers Planned: 3
-> Sort
Sort Key: _hyper_1_1_chunk."time", _hyper_1_1_chunk.device_id
-> Parallel Append
@@ -9700,7 +9701,7 @@ EXPLAIN (costs off) SELECT time_bucket('10 minutes', time) bucket, avg(v0) avg_v
Finalize HashAggregate
Group Key: (time_bucket('@ 10 mins'::interval, _hyper_1_1_chunk."time"))
-> Gather
-Workers Planned: 2
+Workers Planned: 3
-> Partial HashAggregate
Group Key: time_bucket('@ 10 mins'::interval, _hyper_1_1_chunk."time")
-> Result
@@ -9736,6 +9737,7 @@ EXPLAIN (costs off) SELECT * FROM metrics_space ORDER BY time, device_id;
-> Parallel Seq Scan on _hyper_2_12_chunk
(19 rows)

+RESET min_parallel_table_scan_size;
RESET parallel_setup_cost;
RESET parallel_tuple_cost;
SET enable_seqscan TO false;