diff --git a/.unreleased/5655_decompression_workers.txt b/.unreleased/5655_decompression_workers.txt
new file mode 100644
index 00000000000..9a7d946ec0a
--- /dev/null
+++ b/.unreleased/5655_decompression_workers.txt
@@ -0,0 +1 @@
+Implements: #5655 Improve the number of parallel workers for decompression
\ No newline at end of file
diff --git a/src/import/allpaths.c b/src/import/allpaths.c
index 3cbabf4563f..c9dcace59ff 100644
--- a/src/import/allpaths.c
+++ b/src/import/allpaths.c
@@ -90,8 +90,8 @@ set_tablesample_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *
 }
 
 /* copied from allpaths.c */
-static void
-create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel)
+void
+ts_create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel)
 {
 	int parallel_workers;
 
@@ -124,7 +124,7 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
 
 	/* If appropriate, consider parallel sequential scan */
 	if (rel->consider_parallel && required_outer == NULL)
-		create_plain_partial_paths(root, rel);
+		ts_create_plain_partial_paths(root, rel);
 
 	/* Consider index scans */
 	create_index_paths(root, rel);
diff --git a/src/import/allpaths.h b/src/import/allpaths.h
index e8ba9c237de..03da063efb2 100644
--- a/src/import/allpaths.h
+++ b/src/import/allpaths.h
@@ -11,6 +11,7 @@
 
 #include "export.h"
 
+extern TSDLLEXPORT void ts_create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel);
 extern void ts_set_rel_size(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry *rte);
 extern void ts_set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti,
 									   RangeTblEntry *rte);
diff --git a/tsl/src/nodes/decompress_chunk/decompress_chunk.c b/tsl/src/nodes/decompress_chunk/decompress_chunk.c
index ce7feeeb362..a8b53fc743a 100644
--- a/tsl/src/nodes/decompress_chunk/decompress_chunk.c
+++ b/tsl/src/nodes/decompress_chunk/decompress_chunk.c
@@ -26,6 +26,7 @@
 #include "debug_assert.h"
 #include "ts_catalog/hypertable_compression.h"
 #include "import/planner.h"
+#include "import/allpaths.h"
 #include "compression/create.h"
 #include "nodes/decompress_chunk/sorted_merge.h"
 #include "nodes/decompress_chunk/decompress_chunk.h"
@@ -59,8 +60,7 @@ typedef enum MergeBatchResult
 
 static RangeTblEntry *decompress_chunk_make_rte(Oid compressed_relid, LOCKMODE lockmode);
 static void create_compressed_scan_paths(PlannerInfo *root, RelOptInfo *compressed_rel,
-										  int parallel_workers, CompressionInfo *info,
-										  SortInfo *sort_info);
+										  CompressionInfo *info, SortInfo *sort_info);
 
 static DecompressChunkPath *decompress_chunk_path_create(PlannerInfo *root, CompressionInfo *info,
 														 int parallel_workers,
@@ -576,12 +576,6 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
 		   (info->chunk_rel->reloptkind == RELOPT_BASEREL &&
 			ts_rte_is_marked_for_expansion(info->chunk_rte)));
 
-	/*
-	 * since we rely on parallel coordination from the scan below
-	 * this node it is probably not beneficial to have more
-	 * than a single worker per chunk
-	 */
-	int parallel_workers = 1;
 	SortInfo sort_info = build_sortinfo(chunk, chunk_rel, info, root->query_pathkeys);
 
 	Assert(chunk->fd.compressed_chunk_id > 0);
@@ -616,11 +610,8 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
 	}
 	chunk_rel->rows = new_row_estimate;
 
-	create_compressed_scan_paths(root,
-								 compressed_rel,
-								 compressed_rel->consider_parallel ? parallel_workers : 0,
-								 info,
-								 &sort_info);
+
+	create_compressed_scan_paths(root, compressed_rel, info, &sort_info);
 
 	/* compute parent relids of the chunk and use it to filter paths*/
 	Relids parent_relids = NULL;
@@ -876,7 +867,8 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
		 * If this is a partially compressed chunk we have to combine data
		 * from compressed and uncompressed chunk.
		 */
-		path = (Path *) decompress_chunk_path_create(root, info, parallel_workers, child_path);
+		path = (Path *)
+			decompress_chunk_path_create(root, info, child_path->parallel_workers, child_path);
 
		if (ts_chunk_is_partial(chunk))
		{
@@ -917,7 +909,8 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
												   list_make2(path, uncompressed_path),
												   NIL /* pathkeys */,
												   req_outer,
-												   parallel_workers,
+												   Max(path->parallel_workers,
+													   uncompressed_path->parallel_workers),
												   false,
												   NIL,
												   path->rows + uncompressed_path->rows);
@@ -1547,8 +1540,8 @@ decompress_chunk_path_create(PlannerInfo *root, CompressionInfo *info, int paral
 
 */
 static void
-create_compressed_scan_paths(PlannerInfo *root, RelOptInfo *compressed_rel, int parallel_workers,
-							 CompressionInfo *info, SortInfo *sort_info)
+create_compressed_scan_paths(PlannerInfo *root, RelOptInfo *compressed_rel, CompressionInfo *info,
+							 SortInfo *sort_info)
 {
	Path *compressed_path;
@@ -1563,12 +1556,8 @@ create_compressed_scan_paths(PlannerInfo *root, RelOptInfo *compressed_rel, int
	add_path(compressed_rel, compressed_path);
 
	/* create parallel scan path */
-	if (compressed_rel->consider_parallel && parallel_workers > 0)
-	{
-		compressed_path = create_seqscan_path(root, compressed_rel, NULL, parallel_workers);
-		Assert(compressed_path->parallel_aware);
-		add_partial_path(compressed_rel, compressed_path);
-	}
+	if (compressed_rel->consider_parallel)
+		ts_create_plain_partial_paths(root, compressed_rel);
 
	/*
	 * We set enable_bitmapscan to false here to ensure any pathes with bitmapscan do not
diff --git a/tsl/test/expected/compression.out b/tsl/test/expected/compression.out
index d3a84135d07..346c0cc96b9 100644
--- a/tsl/test/expected/compression.out
+++ b/tsl/test/expected/compression.out
@@ -1711,3 +1711,167 @@ SELECT time, const, numeric,first, avg1, avg2 FROM tidrangescan_expr ORDER BY ti
 RESET timescaledb.enable_chunk_append;
 RESET enable_indexscan;
+-- Test the number of allocated parallel workers for decompression
+-- Test that a parallel plan is generated
+-- with different numbers of parallel workers
+CREATE TABLE f_sensor_data(
+    time timestamptz NOT NULL,
+    sensor_id integer NOT NULL,
+    cpu double precision NULL,
+    temperature double precision NULL
+    );
+SELECT FROM create_hypertable('f_sensor_data','time');
+--
+(1 row)
+
+SELECT set_chunk_time_interval('f_sensor_data', INTERVAL '1 year');
+ set_chunk_time_interval
+-------------------------
+
+(1 row)
+
+-- Create one chunk manually to ensure all data is inserted into one chunk
+SELECT * FROM _timescaledb_internal.create_chunk('f_sensor_data',' {"time": [181900977000000, 515024000000000]}');
+ chunk_id | hypertable_id | schema_name | table_name | relkind | slices | created
+----------+---------------+-----------------------+--------------------+---------+----------------------------------------------+---------
+ 71 | 37 | _timescaledb_internal | _hyper_37_71_chunk | r | {"time": [181900977000000, 515024000000000]} | t
+(1 row)
+
+INSERT INTO f_sensor_data
+SELECT
+    time AS time,
+    sensor_id,
+    100.0,
+    36.6
+FROM
+    generate_series('1980-01-01 00:00'::timestamp, '1980-02-28 12:00', INTERVAL '1 day') AS g1(time),
+    generate_series(1, 1700, 1 ) AS g2(sensor_id)
+ORDER BY
+    time;
+ALTER TABLE f_sensor_data SET (timescaledb.compress, timescaledb.compress_segmentby='sensor_id' ,timescaledb.compress_orderby = 'time DESC');
+SELECT compress_chunk(i) FROM show_chunks('f_sensor_data') i;
+              compress_chunk
+------------------------------------------
+ _timescaledb_internal._hyper_37_71_chunk
+(1 row)
+
+-- Encourage use of parallel plans
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0;
+SET min_parallel_table_scan_size TO '0';
+\set explain 'EXPLAIN (VERBOSE, COSTS OFF)'
+SHOW min_parallel_table_scan_size;
+ min_parallel_table_scan_size
+------------------------------
+ 0
+(1 row)
+
+SHOW max_parallel_workers;
+ max_parallel_workers
+----------------------
+ 8
+(1 row)
+
+SHOW max_parallel_workers_per_gather;
+ max_parallel_workers_per_gather
+---------------------------------
+ 2
+(1 row)
+
+SET max_parallel_workers_per_gather = 4;
+SHOW max_parallel_workers_per_gather;
+ max_parallel_workers_per_gather
+---------------------------------
+ 4
+(1 row)
+
+:explain
+SELECT sum(cpu) FROM f_sensor_data;
+                                                                                QUERY PLAN
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Finalize Aggregate
+   Output: sum(_hyper_37_71_chunk.cpu)
+   ->  Gather
+         Output: (PARTIAL sum(_hyper_37_71_chunk.cpu))
+         Workers Planned: 4
+         ->  Partial Aggregate
+               Output: PARTIAL sum(_hyper_37_71_chunk.cpu)
+               ->  Parallel Append
+                     ->  Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_37_71_chunk
+                           Output: _hyper_37_71_chunk.cpu
+                           ->  Parallel Seq Scan on _timescaledb_internal.compress_hyper_38_72_chunk
+                                 Output: compress_hyper_38_72_chunk."time", compress_hyper_38_72_chunk.sensor_id, compress_hyper_38_72_chunk.cpu, compress_hyper_38_72_chunk.temperature, compress_hyper_38_72_chunk._ts_meta_count, compress_hyper_38_72_chunk._ts_meta_sequence_num, compress_hyper_38_72_chunk._ts_meta_min_1, compress_hyper_38_72_chunk._ts_meta_max_1
+(12 rows)
+
+-- Encourage use of Index Scan
+SET enable_seqscan = false;
+SET enable_indexscan = true;
+SET min_parallel_index_scan_size = 0;
+SET min_parallel_table_scan_size = 0;
+CREATE INDEX ON f_sensor_data (time, sensor_id);
+:explain
+SELECT * FROM f_sensor_data WHERE sensor_id > 100;
+                                                                                QUERY PLAN
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Gather
+   Output: _hyper_37_71_chunk."time", _hyper_37_71_chunk.sensor_id, _hyper_37_71_chunk.cpu, _hyper_37_71_chunk.temperature
+   Workers Planned: 2
+   ->  Parallel Append
+         ->  Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_37_71_chunk
+               Output: _hyper_37_71_chunk."time", _hyper_37_71_chunk.sensor_id, _hyper_37_71_chunk.cpu, _hyper_37_71_chunk.temperature
+               ->  Parallel Index Scan using compress_hyper_38_72_chunk__compressed_hypertable_38_sensor_id_ on _timescaledb_internal.compress_hyper_38_72_chunk
+                     Output: compress_hyper_38_72_chunk."time", compress_hyper_38_72_chunk.sensor_id, compress_hyper_38_72_chunk.cpu, compress_hyper_38_72_chunk.temperature, compress_hyper_38_72_chunk._ts_meta_count, compress_hyper_38_72_chunk._ts_meta_sequence_num, compress_hyper_38_72_chunk._ts_meta_min_1, compress_hyper_38_72_chunk._ts_meta_max_1
+                     Index Cond: (compress_hyper_38_72_chunk.sensor_id > 100)
+(9 rows)
+
+-- Test for partially compressed chunks
+INSERT INTO f_sensor_data
+SELECT
+    time AS time,
+    sensor_id,
+    100.0,
+    36.6
+FROM
+    generate_series('1980-01-01 00:00'::timestamp, '1980-01-30 12:00', INTERVAL '1 day') AS g1(time),
+    generate_series(1700, 1800, 1 ) AS g2(sensor_id)
+ORDER BY
+    time;
+:explain
+SELECT sum(cpu) FROM f_sensor_data;
+                                                                                QUERY PLAN
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Finalize Aggregate
+   Output: sum(_hyper_37_71_chunk.cpu)
+   ->  Gather
+         Output: (PARTIAL sum(_hyper_37_71_chunk.cpu))
+         Workers Planned: 4
+         ->  Partial Aggregate
+               Output: PARTIAL sum(_hyper_37_71_chunk.cpu)
+               ->  Parallel Append
+                     ->  Parallel Seq Scan on _timescaledb_internal._hyper_37_71_chunk
+                           Output: _hyper_37_71_chunk.cpu
+                     ->  Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_37_71_chunk
+                           Output: _hyper_37_71_chunk.cpu
+                           ->  Parallel Seq Scan on _timescaledb_internal.compress_hyper_38_72_chunk
+                                 Output: compress_hyper_38_72_chunk."time", compress_hyper_38_72_chunk.sensor_id, compress_hyper_38_72_chunk.cpu, compress_hyper_38_72_chunk.temperature, compress_hyper_38_72_chunk._ts_meta_count, compress_hyper_38_72_chunk._ts_meta_sequence_num, compress_hyper_38_72_chunk._ts_meta_min_1, compress_hyper_38_72_chunk._ts_meta_max_1
+(14 rows)
+
+:explain
+SELECT * FROM f_sensor_data WHERE sensor_id > 100;
+                                                                                QUERY PLAN
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Gather
+   Output: _hyper_37_71_chunk."time", _hyper_37_71_chunk.sensor_id, _hyper_37_71_chunk.cpu, _hyper_37_71_chunk.temperature
+   Workers Planned: 3
+   ->  Parallel Append
+         ->  Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_37_71_chunk
+               Output: _hyper_37_71_chunk."time", _hyper_37_71_chunk.sensor_id, _hyper_37_71_chunk.cpu, _hyper_37_71_chunk.temperature
+               Filter: (_hyper_37_71_chunk.sensor_id > 100)
+               ->  Parallel Index Scan using compress_hyper_38_72_chunk__compressed_hypertable_38_sensor_id_ on _timescaledb_internal.compress_hyper_38_72_chunk
+                     Output: compress_hyper_38_72_chunk."time", compress_hyper_38_72_chunk.sensor_id, compress_hyper_38_72_chunk.cpu, compress_hyper_38_72_chunk.temperature, compress_hyper_38_72_chunk._ts_meta_count, compress_hyper_38_72_chunk._ts_meta_sequence_num, compress_hyper_38_72_chunk._ts_meta_min_1, compress_hyper_38_72_chunk._ts_meta_max_1
+                     Index Cond: (compress_hyper_38_72_chunk.sensor_id > 100)
+         ->  Parallel Index Scan using _hyper_37_71_chunk_f_sensor_data_time_sensor_id_idx on _timescaledb_internal._hyper_37_71_chunk
+               Output: _hyper_37_71_chunk."time", _hyper_37_71_chunk.sensor_id, _hyper_37_71_chunk.cpu, _hyper_37_71_chunk.temperature
+               Index Cond: (_hyper_37_71_chunk.sensor_id > 100)
+(13 rows)
+
diff --git a/tsl/test/expected/transparent_decompression-12.out b/tsl/test/expected/transparent_decompression-12.out
index ca3f1f57a1b..bf1dd4d14f7 100644
--- a/tsl/test/expected/transparent_decompression-12.out
+++ b/tsl/test/expected/transparent_decompression-12.out
@@ -8382,11 +8382,12 @@ GROUP BY fleet;
 SET max_parallel_workers_per_gather TO 4;
 SET parallel_setup_cost = 0;
 SET parallel_tuple_cost = 0;
+SET min_parallel_table_scan_size TO '0';
 EXPLAIN (costs off) SELECT * FROM metrics ORDER BY time, device_id;
                                QUERY PLAN
 ------------------------------------------------------------------------
  Gather Merge
-   Workers Planned: 2
+   Workers Planned: 3
    ->  Sort
          Sort Key: _hyper_1_1_chunk."time", _hyper_1_1_chunk.device_id
          ->  Parallel Append
@@ -8403,7 +8404,7 @@ EXPLAIN (costs off) SELECT time_bucket('10 minutes', time) bucket, avg(v0) avg_v
  Finalize HashAggregate
    Group Key: (time_bucket('@ 10 mins'::interval, _hyper_1_1_chunk."time"))
    ->  Gather
-         Workers Planned: 2
+         Workers Planned: 3
          ->  Partial HashAggregate
                Group Key: time_bucket('@ 10 mins'::interval, _hyper_1_1_chunk."time")
                ->  Result
@@ -8439,6 +8440,7 @@ EXPLAIN (costs off) SELECT * FROM metrics_space ORDER BY time, device_id;
                ->  Parallel Seq Scan on _hyper_2_12_chunk
 (19 rows)
 
+RESET min_parallel_table_scan_size;
 RESET parallel_setup_cost;
 RESET parallel_tuple_cost;
 SET enable_seqscan TO false;
diff --git a/tsl/test/expected/transparent_decompression-13.out b/tsl/test/expected/transparent_decompression-13.out
index bb55e71c972..25c4001f1ff 100644
--- a/tsl/test/expected/transparent_decompression-13.out
+++ b/tsl/test/expected/transparent_decompression-13.out
@@ -9465,11 +9465,12 @@ GROUP BY fleet;
 SET max_parallel_workers_per_gather TO 4;
 SET parallel_setup_cost = 0;
 SET parallel_tuple_cost = 0;
+SET min_parallel_table_scan_size TO '0';
 EXPLAIN (costs off) SELECT * FROM metrics ORDER BY time, device_id;
                                QUERY PLAN
 ------------------------------------------------------------------------
  Gather Merge
-   Workers Planned: 2
+   Workers Planned: 3
    ->  Sort
          Sort Key: _hyper_1_1_chunk."time", _hyper_1_1_chunk.device_id
          ->  Parallel Append
@@ -9486,7 +9487,7 @@ EXPLAIN (costs off) SELECT time_bucket('10 minutes', time) bucket, avg(v0) avg_v
 Finalize HashAggregate
   Group Key: (time_bucket('@ 10 mins'::interval, _hyper_1_1_chunk."time"))
   ->  Gather
-        Workers Planned: 2
+        Workers Planned: 3
        ->  Partial HashAggregate
              Group Key: time_bucket('@ 10 mins'::interval, _hyper_1_1_chunk."time")
              ->  Result
@@ -9522,6 +9523,7 @@ EXPLAIN (costs off) SELECT * FROM metrics_space ORDER BY time, device_id;
               ->  Parallel Seq Scan on _hyper_2_12_chunk
 (19 rows)
 
+RESET min_parallel_table_scan_size;
 RESET parallel_setup_cost;
 RESET parallel_tuple_cost;
 SET enable_seqscan TO false;
diff --git a/tsl/test/expected/transparent_decompression-14.out b/tsl/test/expected/transparent_decompression-14.out
index e3417e5caa5..0fe9508bcc8 100644
--- a/tsl/test/expected/transparent_decompression-14.out
+++ b/tsl/test/expected/transparent_decompression-14.out
@@ -9679,11 +9679,12 @@ GROUP BY fleet;
 SET max_parallel_workers_per_gather TO 4;
 SET parallel_setup_cost = 0;
 SET parallel_tuple_cost = 0;
+SET min_parallel_table_scan_size TO '0';
 EXPLAIN (costs off) SELECT * FROM metrics ORDER BY time, device_id;
                                QUERY PLAN
 ------------------------------------------------------------------------
  Gather Merge
-   Workers Planned: 2
+   Workers Planned: 3
    ->  Sort
          Sort Key: _hyper_1_1_chunk."time", _hyper_1_1_chunk.device_id
          ->  Parallel Append
@@ -9700,7 +9701,7 @@ EXPLAIN (costs off) SELECT time_bucket('10 minutes', time) bucket, avg(v0) avg_v
 Finalize HashAggregate
   Group Key: (time_bucket('@ 10 mins'::interval, _hyper_1_1_chunk."time"))
   ->  Gather
-        Workers Planned: 2
+        Workers Planned: 3
        ->  Partial HashAggregate
              Group Key: time_bucket('@ 10 mins'::interval, _hyper_1_1_chunk."time")
              ->  Result
@@ -9736,6 +9737,7 @@ EXPLAIN (costs off) SELECT * FROM metrics_space ORDER BY time, device_id;
               ->  Parallel Seq Scan on _hyper_2_12_chunk
 (19 rows)
 
+RESET min_parallel_table_scan_size;
 RESET parallel_setup_cost;
 RESET parallel_tuple_cost;
 SET enable_seqscan TO false;
diff --git a/tsl/test/expected/transparent_decompression-15.out b/tsl/test/expected/transparent_decompression-15.out
index 56e7fe2fac5..a57e3d5ff78 100644
--- a/tsl/test/expected/transparent_decompression-15.out
+++ b/tsl/test/expected/transparent_decompression-15.out
@@ -9682,11 +9682,12 @@ GROUP BY fleet;
 SET max_parallel_workers_per_gather TO 4;
 SET parallel_setup_cost = 0;
 SET parallel_tuple_cost = 0;
+SET min_parallel_table_scan_size TO '0';
 EXPLAIN (costs off) SELECT * FROM metrics ORDER BY time, device_id;
                                QUERY PLAN
 ------------------------------------------------------------------------
  Gather Merge
-   Workers Planned: 2
+   Workers Planned: 3
    ->  Sort
          Sort Key: _hyper_1_1_chunk."time", _hyper_1_1_chunk.device_id
          ->  Parallel Append
@@ -9703,7 +9704,7 @@ EXPLAIN (costs off) SELECT time_bucket('10 minutes', time) bucket, avg(v0) avg_v
 Finalize HashAggregate
   Group Key: (time_bucket('@ 10 mins'::interval, _hyper_1_1_chunk."time"))
   ->  Gather
-        Workers Planned: 2
+        Workers Planned: 3
        ->  Partial HashAggregate
              Group Key: time_bucket('@ 10 mins'::interval, _hyper_1_1_chunk."time")
              ->  Result
@@ -9739,6 +9740,7 @@ EXPLAIN (costs off) SELECT * FROM metrics_space ORDER BY time, device_id;
               ->  Parallel Seq Scan on _hyper_2_12_chunk
 (19 rows)
 
+RESET min_parallel_table_scan_size;
 RESET parallel_setup_cost;
 RESET parallel_tuple_cost;
 SET enable_seqscan TO false;
diff --git a/tsl/test/shared/expected/transparent_decompress_chunk-12.out b/tsl/test/shared/expected/transparent_decompress_chunk-12.out
index 9d0576efa02..7153987661e 100644
--- a/tsl/test/shared/expected/transparent_decompress_chunk-12.out
+++ b/tsl/test/shared/expected/transparent_decompress_chunk-12.out
@@ -657,6 +657,7 @@ QUERY PLAN
 
 DROP VIEW compressed_view;
 SET parallel_leader_participation TO off;
+SET min_parallel_table_scan_size TO '0';
 -- test INNER JOIN
 :PREFIX_NO_VERBOSE
 SELECT *
@@ -709,6 +710,7 @@ QUERY PLAN
          Index Cond: (device_id = 3)
 (16 rows)
 
+RESET min_parallel_table_scan_size;
 :PREFIX_NO_VERBOSE
 SELECT *
 FROM :TEST_TABLE m1
@@ -768,6 +770,7 @@ QUERY PLAN
 (20 rows)
 
 -- test OUTER JOIN
+SET min_parallel_table_scan_size TO '0';
 :PREFIX_NO_VERBOSE
 SELECT *
 FROM :TEST_TABLE m1
@@ -790,6 +793,7 @@ QUERY PLAN
          ->  Seq Scan on compress_hyper_X_X_chunk compress_hyper_X_X_chunk_1
 (11 rows)
 
+RESET min_parallel_table_scan_size;
 :PREFIX_NO_VERBOSE
 SELECT *
 FROM :TEST_TABLE m1
diff --git a/tsl/test/shared/expected/transparent_decompress_chunk-13.out b/tsl/test/shared/expected/transparent_decompress_chunk-13.out
index b0ab0e64653..04c1d8adef5 100644
--- a/tsl/test/shared/expected/transparent_decompress_chunk-13.out
+++ b/tsl/test/shared/expected/transparent_decompress_chunk-13.out
@@ -659,6 +659,7 @@ QUERY PLAN
 
 DROP VIEW compressed_view;
 SET parallel_leader_participation TO off;
+SET min_parallel_table_scan_size TO '0';
 -- test INNER JOIN
 :PREFIX_NO_VERBOSE
 SELECT *
@@ -711,6 +712,7 @@ QUERY PLAN
          Index Cond: (device_id = 3)
 (16 rows)
 
+RESET min_parallel_table_scan_size;
 :PREFIX_NO_VERBOSE
 SELECT *
 FROM :TEST_TABLE m1
@@ -770,6 +772,7 @@ QUERY PLAN
 (20 rows)
 
 -- test OUTER JOIN
+SET min_parallel_table_scan_size TO '0';
 :PREFIX_NO_VERBOSE
 SELECT *
 FROM :TEST_TABLE m1
@@ -792,6 +795,7 @@ QUERY PLAN
          ->  Seq Scan on compress_hyper_X_X_chunk compress_hyper_X_X_chunk_1
 (11 rows)
 
+RESET min_parallel_table_scan_size;
 :PREFIX_NO_VERBOSE
 SELECT *
 FROM :TEST_TABLE m1
diff --git a/tsl/test/shared/expected/transparent_decompress_chunk-14.out b/tsl/test/shared/expected/transparent_decompress_chunk-14.out
index b0ab0e64653..04c1d8adef5 100644
--- a/tsl/test/shared/expected/transparent_decompress_chunk-14.out
+++ b/tsl/test/shared/expected/transparent_decompress_chunk-14.out
@@ -659,6 +659,7 @@ QUERY PLAN
 
 DROP VIEW compressed_view;
 SET parallel_leader_participation TO off;
+SET min_parallel_table_scan_size TO '0';
 -- test INNER JOIN
 :PREFIX_NO_VERBOSE
 SELECT *
@@ -711,6 +712,7 @@ QUERY PLAN
          Index Cond: (device_id = 3)
 (16 rows)
 
+RESET min_parallel_table_scan_size;
 :PREFIX_NO_VERBOSE
 SELECT *
 FROM :TEST_TABLE m1
@@ -770,6 +772,7 @@ QUERY PLAN
 (20 rows)
 
 -- test OUTER JOIN
+SET min_parallel_table_scan_size TO '0';
 :PREFIX_NO_VERBOSE
 SELECT *
 FROM :TEST_TABLE m1
@@ -792,6 +795,7 @@ QUERY PLAN
          ->  Seq Scan on compress_hyper_X_X_chunk compress_hyper_X_X_chunk_1
 (11 rows)
 
+RESET min_parallel_table_scan_size;
 :PREFIX_NO_VERBOSE
 SELECT *
 FROM :TEST_TABLE m1
diff --git a/tsl/test/shared/expected/transparent_decompress_chunk-15.out b/tsl/test/shared/expected/transparent_decompress_chunk-15.out
index 2f22cd63016..97c1ece1cd0 100644
--- a/tsl/test/shared/expected/transparent_decompress_chunk-15.out
+++ b/tsl/test/shared/expected/transparent_decompress_chunk-15.out
@@ -661,6 +661,7 @@ QUERY PLAN
 
 DROP VIEW compressed_view;
 SET parallel_leader_participation TO off;
+SET min_parallel_table_scan_size TO '0';
 -- test INNER JOIN
 :PREFIX_NO_VERBOSE
 SELECT *
@@ -713,6 +714,7 @@ QUERY PLAN
          Index Cond: (device_id = 3)
 (16 rows)
 
+RESET min_parallel_table_scan_size;
 :PREFIX_NO_VERBOSE
 SELECT *
 FROM :TEST_TABLE m1
@@ -772,6 +774,7 @@ QUERY PLAN
 (20 rows)
 
 -- test OUTER JOIN
+SET min_parallel_table_scan_size TO '0';
 :PREFIX_NO_VERBOSE
 SELECT *
 FROM :TEST_TABLE m1
@@ -794,6 +797,7 @@ QUERY PLAN
          ->  Seq Scan on compress_hyper_X_X_chunk compress_hyper_X_X_chunk_1
 (11 rows)
 
+RESET min_parallel_table_scan_size;
 :PREFIX_NO_VERBOSE
 SELECT *
 FROM :TEST_TABLE m1
diff --git a/tsl/test/shared/sql/transparent_decompress_chunk.sql.in b/tsl/test/shared/sql/transparent_decompress_chunk.sql.in
index 954b426adf7..8db2bdc488b 100644
--- a/tsl/test/shared/sql/transparent_decompress_chunk.sql.in
+++ b/tsl/test/shared/sql/transparent_decompress_chunk.sql.in
@@ -188,6 +188,7 @@ CREATE OR REPLACE VIEW compressed_view AS SELECT time, device_id, v1, v2 FROM :T
 DROP VIEW compressed_view;
 
 SET parallel_leader_participation TO off;
+SET min_parallel_table_scan_size TO '0';
 -- test INNER JOIN
 :PREFIX_NO_VERBOSE
 SELECT *
@@ -208,6 +209,7 @@ FROM :TEST_TABLE m1
 ORDER BY m1.time, m1.device_id
 LIMIT 10;
 
+RESET min_parallel_table_scan_size;
 
 :PREFIX_NO_VERBOSE
 SELECT *
@@ -234,6 +236,7 @@ FROM metrics m1
 LIMIT 100;
 
 -- test OUTER JOIN
+SET min_parallel_table_scan_size TO '0';
 :PREFIX_NO_VERBOSE
 SELECT *
 FROM :TEST_TABLE m1
@@ -242,6 +245,7 @@ FROM :TEST_TABLE m1
 ORDER BY m1.time, m1.device_id
 LIMIT 10;
 
+RESET min_parallel_table_scan_size;
 
 :PREFIX_NO_VERBOSE
 SELECT *
diff --git a/tsl/test/sql/compression.sql b/tsl/test/sql/compression.sql
index 5644e6fed43..c4ad3fdfa80 100644
--- a/tsl/test/sql/compression.sql
+++ b/tsl/test/sql/compression.sql
@@ -757,3 +757,84 @@ SET enable_indexscan to off;
 SELECT time, const, numeric,first, avg1, avg2 FROM tidrangescan_expr ORDER BY time LIMIT 5;
 RESET timescaledb.enable_chunk_append;
 RESET enable_indexscan;
+
+
+-- Test the number of allocated parallel workers for decompression
+
+-- Test that a parallel plan is generated
+-- with different numbers of parallel workers
+CREATE TABLE f_sensor_data(
+    time timestamptz NOT NULL,
+    sensor_id integer NOT NULL,
+    cpu double precision NULL,
+    temperature double precision NULL
+    );
+
+SELECT FROM create_hypertable('f_sensor_data','time');
+SELECT set_chunk_time_interval('f_sensor_data', INTERVAL '1 year');
+
+-- Create one chunk manually to ensure all data is inserted into one chunk
+SELECT * FROM _timescaledb_internal.create_chunk('f_sensor_data',' {"time": [181900977000000, 515024000000000]}');
+
+INSERT INTO f_sensor_data
+SELECT
+    time AS time,
+    sensor_id,
+    100.0,
+    36.6
+FROM
+    generate_series('1980-01-01 00:00'::timestamp, '1980-02-28 12:00', INTERVAL '1 day') AS g1(time),
+    generate_series(1, 1700, 1 ) AS g2(sensor_id)
+ORDER BY
+    time;
+
+ALTER TABLE f_sensor_data SET (timescaledb.compress, timescaledb.compress_segmentby='sensor_id' ,timescaledb.compress_orderby = 'time DESC');
+
+SELECT compress_chunk(i) FROM show_chunks('f_sensor_data') i;
+
+-- Encourage use of parallel plans
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0;
+SET min_parallel_table_scan_size TO '0';
+
+\set explain 'EXPLAIN (VERBOSE, COSTS OFF)'
+
+SHOW min_parallel_table_scan_size;
+SHOW max_parallel_workers;
+SHOW max_parallel_workers_per_gather;
+
+SET max_parallel_workers_per_gather = 4;
+SHOW max_parallel_workers_per_gather;
+:explain
+SELECT sum(cpu) FROM f_sensor_data;
+
+-- Encourage use of Index Scan
+
+SET enable_seqscan = false;
+SET enable_indexscan = true;
+SET min_parallel_index_scan_size = 0;
+SET min_parallel_table_scan_size = 0;
+
+CREATE INDEX ON f_sensor_data (time, sensor_id);
+:explain
+SELECT * FROM f_sensor_data WHERE sensor_id > 100;
+
+-- Test for partially compressed chunks
+
+INSERT INTO f_sensor_data
+SELECT
+    time AS time,
+    sensor_id,
+    100.0,
+    36.6
+FROM
+    generate_series('1980-01-01 00:00'::timestamp, '1980-01-30 12:00', INTERVAL '1 day') AS g1(time),
+    generate_series(1700, 1800, 1 ) AS g2(sensor_id)
+ORDER BY
+    time;
+
+:explain
+SELECT sum(cpu) FROM f_sensor_data;
+
+:explain
+SELECT * FROM f_sensor_data WHERE sensor_id > 100;
diff --git a/tsl/test/sql/include/transparent_decompression_undiffed.sql b/tsl/test/sql/include/transparent_decompression_undiffed.sql
index 58fc634595d..25526d4750e 100644
--- a/tsl/test/sql/include/transparent_decompression_undiffed.sql
+++ b/tsl/test/sql/include/transparent_decompression_undiffed.sql
@@ -30,12 +30,14 @@
 SET max_parallel_workers_per_gather TO 4;
 SET parallel_setup_cost = 0;
 SET parallel_tuple_cost = 0;
+SET min_parallel_table_scan_size TO '0';
 
 EXPLAIN (costs off) SELECT * FROM metrics ORDER BY time, device_id;
 
 EXPLAIN (costs off) SELECT time_bucket('10 minutes', time) bucket, avg(v0) avg_v0 FROM metrics GROUP BY bucket;
 
 EXPLAIN (costs off) SELECT * FROM metrics_space ORDER BY time, device_id;
 
+RESET min_parallel_table_scan_size;
 RESET parallel_setup_cost;
 RESET parallel_tuple_cost;