Fix index inserts during decompression
Since version 2.11.0, decompression would hit a segmentation fault when
an expression or partial index was present on the uncompressed chunk.
This patch fixes the crash by calling ExecInsertIndexTuples to insert
into indexes during chunk decompression, instead of CatalogIndexInsert.
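
For reference, the cause of the crash (my paraphrase of PostgreSQL
internals, not part of this change): CatalogIndexInsert is meant for
system-catalog indexes, which never have expression or partial indexes,
so it forms index datums without an executor state. A hedged sketch of
the relevant path in PostgreSQL's src/backend/catalog/indexing.c:

    /* Paraphrased from PostgreSQL's CatalogIndexInsert(); details are
     * approximate. System catalogs never carry expression or partial
     * indexes, so their absence is merely asserted ... */
    Assert(indexInfo->ii_Expressions == NIL);
    Assert(indexInfo->ii_Predicate == NIL);

    /* ... and index datums are formed with no executor state: */
    FormIndexDatum(indexInfo, slot, NULL /* no estate */, values, isnull);

    /* In a production build the asserts compile away, so an expression
     * index reaches FormIndexDatum, which needs the EState to evaluate
     * the index expressions -- a NULL dereference, hence the segfault.
     * ExecInsertIndexTuples, by contrast, takes the EState that the
     * decompressor now creates and sets up a per-tuple ExprContext. */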

In addition, when enabling compression on a hypertable, we check the
unique indexes defined on it to provide performance-improvement hints
in case the unique index columns are not specified as compression
parameters. However, this check threw an error when the index contained
expression columns, preventing the user from enabling compression. This
patch fixes that by simply ignoring the expression columns in the
index, since we cannot currently segment by an expression.
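
For reference (an illustration, not part of this change): PostgreSQL
records an expression key column as attribute number 0 in
pg_index.indkey, which is what the new attno == 0 check in create.c
below skips. A hypothetical psql session against the regression test's
myidx_unique index:

    -- Hypothetical output; actual values depend on the table's column order.
    SELECT indkey FROM pg_index WHERE indexrelid = 'myidx_unique'::regclass;
    --  indkey
    -- ---------
    --  0 2 5 4   <- lower(col1::text) is recorded as 0, not a column number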

Fixes #6205, #6186
konskov committed Oct 24, 2023
1 parent 7797c41 commit 708c392
Showing 7 changed files with 241 additions and 8 deletions.
1 change: 1 addition & 0 deletions .unreleased/PR_6222
@@ -0,0 +1 @@
Fixes: #6222 Allow enabling compression on hypertable with unique expression index
2 changes: 2 additions & 0 deletions tsl/src/compression/api.c
@@ -1360,6 +1360,8 @@ tsl_recompress_chunk_segmentwise(PG_FUNCTION_ARGS)
RowDecompressor decompressor = build_decompressor(compressed_chunk_rel, uncompressed_chunk_rel);
/* do not need the indexes on the uncompressed chunk as we do not write to it anymore */
ts_catalog_close_indexes(decompressor.indexstate);
/* also do not need estate because we don't insert into indexes */
FreeExecutorState(decompressor.estate);
/********** row compressor *******************/
RowCompressor row_compressor;
row_compressor_init(&row_compressor,
42 changes: 34 additions & 8 deletions tsl/src/compression/compression.c
@@ -1392,6 +1392,7 @@ build_decompressor(Relation in_rel, Relation out_rel)
.per_compressed_row_ctx = AllocSetContextCreate(CurrentMemoryContext,
"decompress chunk per-compressed row",
ALLOCSET_DEFAULT_SIZES),
.estate = CreateExecutorState(),
};

/*
@@ -1445,6 +1446,7 @@ decompress_chunk(Oid in_table, Oid out_table)
FreeBulkInsertState(decompressor.bistate);
MemoryContextDelete(decompressor.per_compressed_row_ctx);
ts_catalog_close_indexes(decompressor.indexstate);
FreeExecutorState(decompressor.estate);

table_close(out_rel, NoLock);
table_close(in_rel, NoLock);
@@ -1576,24 +1578,48 @@ row_decompressor_decompress_row(RowDecompressor *decompressor, Tuplesortstate *t
*/
if (!is_done || !wrote_data)
{
HeapTuple decompressed_tuple = heap_form_tuple(decompressor->out_desc,
decompressor->decompressed_datums,
decompressor->decompressed_is_nulls);
TupleTableSlot *slot = MakeSingleTupleTableSlot(decompressor->out_desc, &TTSOpsVirtual);
decompressor->tuples_decompressed++;

if (tuplesortstate == NULL)
{
HeapTuple decompressed_tuple = heap_form_tuple(decompressor->out_desc,
decompressor->decompressed_datums,
decompressor->decompressed_is_nulls);

heap_insert(decompressor->out_rel,
decompressed_tuple,
decompressor->mycid,
0 /*=options*/,
decompressor->bistate);

ts_catalog_index_insert(decompressor->indexstate, decompressed_tuple);
EState *estate = decompressor->estate;
ExprContext *econtext = GetPerTupleExprContext(estate);

TupleTableSlot *tts_slot =
MakeSingleTupleTableSlot(decompressor->out_desc, &TTSOpsHeapTuple);
ExecStoreHeapTuple(decompressed_tuple, tts_slot, false);

/* Arrange for econtext's scan tuple to be the tuple under test */
econtext->ecxt_scantuple = tts_slot;
#if PG14_LT
estate->es_result_relation_info = decompressor->indexstate;
#endif
ExecInsertIndexTuplesCompat(decompressor->indexstate,
tts_slot,
estate,
false,
false,
NULL,
NIL,
false);

ExecDropSingleTupleTableSlot(tts_slot);
heap_freetuple(decompressed_tuple);
}
else
{
TupleTableSlot *slot =
MakeSingleTupleTableSlot(decompressor->out_desc, &TTSOpsVirtual);
/* create the virtual tuple slot */
ExecClearTuple(slot);
for (int i = 0; i < decompressor->out_desc->natts; i++)
@@ -1607,10 +1633,8 @@ row_decompressor_decompress_row(RowDecompressor *decompressor, Tuplesortstate *t
slot_getallattrs(slot);

tuplesort_puttupleslot(tuplesortstate, slot);
ExecDropSingleTupleTableSlot(slot);
}

ExecDropSingleTupleTableSlot(slot);
heap_freetuple(decompressed_tuple);
wrote_data = true;
}
} while (!is_done);
@@ -2107,6 +2131,7 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo
table_endscan(heapScan);

ts_catalog_close_indexes(decompressor.indexstate);
FreeExecutorState(decompressor.estate);
FreeBulkInsertState(decompressor.bistate);

CommandCounterIncrement();
@@ -3286,6 +3311,7 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
ts_chunk_set_partial(chunk);

ts_catalog_close_indexes(decompressor.indexstate);
FreeExecutorState(decompressor.estate);
FreeBulkInsertState(decompressor.bistate);

table_close(chunk_rel, NoLock);
1 change: 1 addition & 0 deletions tsl/src/compression/compression.h
@@ -139,6 +139,7 @@ typedef struct RowDecompressor
TupleDesc out_desc;
Relation out_rel;
ResultRelInfo *indexstate;
EState *estate;

CommandId mycid;
BulkInsertState bistate;
2 changes: 2 additions & 0 deletions tsl/src/compression/create.c
@@ -956,6 +956,8 @@ validate_existing_indexes(Hypertable *ht, CompressColInfo *colinfo, Bitmapset *i
for (int i = 0; i < index->indnkeyatts; i++)
{
int attno = index->indkey.values[i];
if (attno == 0)
continue; /* skip check for expression column */
Form_hypertable_compression col_def = get_col_info_for_attnum(ht, colinfo, attno);
Ensure(col_def, "missing column definition for unique index");
if (col_def->segmentby_column_index < 1 && col_def->orderby_column_index < 1)
132 changes: 132 additions & 0 deletions tsl/test/expected/compression_ddl.out
@@ -2392,3 +2392,135 @@ EXPLAIN (COSTS OFF) SELECT * FROM space_part ORDER BY time;
-> Seq Scan on _hyper_36_135_chunk
(35 rows)

-- test creation of unique expression index does not interfere with enabling compression
-- github issue 6205
create table mytab (col1 varchar(100) not null, col2 integer not null,
value double precision not null default 0, arrival_ts timestamptz not null, departure_ts timestamptz not null
default current_timestamp);
select create_hypertable('mytab', 'departure_ts');
WARNING: column type "character varying" used for "col1" does not follow best practices
create_hypertable
---------------------
(38,public,mytab,t)
(1 row)

create unique index myidx_unique ON
mytab (lower(col1::text), col2, departure_ts, arrival_ts);
alter table mytab set (timescaledb.compress);
WARNING: column "col2" should be used for segmenting or ordering
WARNING: column "arrival_ts" should be used for segmenting or ordering
-- github issue 6186
-- verify inserting into index works as expected during decompression
insert into mytab (col1, col2, value, arrival_ts, departure_ts)
values ('meter1', 1, 2.3, '2022-01-01'::timestamptz, '2022-01-01'::timestamptz),
('meTEr1', 2, 2.5, '2022-01-01'::timestamptz, '2022-01-01'::timestamptz),
('METEr1', 1, 2.9, '2022-01-01'::timestamptz, '2022-01-01 01:00'::timestamptz);
select compress_chunk(show_chunks('mytab'));
compress_chunk
-------------------------------------------
_timescaledb_internal._hyper_38_138_chunk
(1 row)

REINDEX TABLE mytab; -- should update index
select decompress_chunk(show_chunks('mytab'));
decompress_chunk
-------------------------------------------
_timescaledb_internal._hyper_38_138_chunk
(1 row)

\set EXPLAIN 'EXPLAIN (costs off,timing off,summary off)'
\set EXPLAIN_ANALYZE 'EXPLAIN (analyze,costs off,timing off,summary off)'
-- do index scan on uncompressed, should give correct results
set enable_seqscan = off;
set enable_indexscan = on;
:EXPLAIN_ANALYZE select * from mytab where lower(col1::text) = 'meter1';
QUERY PLAN
--------------------------------------------------------------------------------------------------
Index Scan using _hyper_38_138_chunk_myidx_unique on _hyper_38_138_chunk (actual rows=3 loops=1)
Index Cond: (lower((col1)::text) = 'meter1'::text)
(2 rows)

select * from mytab where lower(col1::text) = 'meter1';
col1 | col2 | value | arrival_ts | departure_ts
--------+------+-------+------------------------------+------------------------------
meter1 | 1 | 2.3 | Sat Jan 01 00:00:00 2022 PST | Sat Jan 01 00:00:00 2022 PST
METEr1 | 1 | 2.9 | Sat Jan 01 00:00:00 2022 PST | Sat Jan 01 01:00:00 2022 PST
meTEr1 | 2 | 2.5 | Sat Jan 01 00:00:00 2022 PST | Sat Jan 01 00:00:00 2022 PST
(3 rows)

-- check predicate index
CREATE INDEX myidx_partial ON mytab (value)
WHERE (value > 2.4 AND value < 3);
select compress_chunk(show_chunks('mytab'));
compress_chunk
-------------------------------------------
_timescaledb_internal._hyper_38_138_chunk
(1 row)

select decompress_chunk(show_chunks('mytab'));
decompress_chunk
-------------------------------------------
_timescaledb_internal._hyper_38_138_chunk
(1 row)

:EXPLAIN_ANALYZE SELECT * FROM mytab WHERE value BETWEEN 2.4 AND 2.8;
QUERY PLAN
---------------------------------------------------------------------------------------
Seq Scan on _hyper_38_138_chunk (actual rows=1 loops=1)
Filter: ((value >= '2.4'::double precision) AND (value <= '2.8'::double precision))
Rows Removed by Filter: 2
(3 rows)

-- check exclusion constraint with index - should not be supported
CREATE TABLE hyper_ex (
time timestamptz,
device_id TEXT NOT NULL,
sensor_1 NUMERIC NULL DEFAULT 1,
canceled boolean DEFAULT false,
EXCLUDE USING btree (
time WITH =, device_id WITH =
) WHERE (not canceled)
);
SELECT * FROM create_hypertable('hyper_ex', 'time', chunk_time_interval=> interval '1 month');
NOTICE: adding not-null constraint to column "time"
hypertable_id | schema_name | table_name | created
---------------+-------------+------------+---------
40 | public | hyper_ex | t
(1 row)

INSERT INTO hyper_ex(time, device_id,sensor_1) VALUES
('2022-01-01', 'dev2', 11),
('2022-01-01 01:00', 'dev2', 12);
\set ON_ERROR_STOP 0
ALTER TABLE hyper_ex SET (timescaledb.compress, timescaledb.compress_segmentby='device_id');
ERROR: constraint hyper_ex_time_device_id_excl is not supported for compression
\set ON_ERROR_STOP 1
-- check deferred uniqueness
CREATE TABLE hyper_unique_deferred (
time BIGINT UNIQUE DEFERRABLE INITIALLY DEFERRED,
device_id TEXT NOT NULL,
sensor_1 NUMERIC NULL DEFAULT 1 CHECK (sensor_1 > 10)
);
SELECT * FROM create_hypertable('hyper_unique_deferred', 'time', chunk_time_interval => 10);
NOTICE: adding not-null constraint to column "time"
hypertable_id | schema_name | table_name | created
---------------+-------------+-----------------------+---------
41 | public | hyper_unique_deferred | t
(1 row)

INSERT INTO hyper_unique_deferred(time, device_id,sensor_1) VALUES (1257987700000000000, 'dev2', 11);
alter table hyper_unique_deferred set (timescaledb.compress);
select compress_chunk(show_chunks('hyper_unique_deferred')); -- also worked fine before 2.11.0
compress_chunk
-------------------------------------------
_timescaledb_internal._hyper_41_142_chunk
(1 row)

select decompress_chunk(show_chunks('hyper_unique_deferred'));
decompress_chunk
-------------------------------------------
_timescaledb_internal._hyper_41_142_chunk
(1 row)

begin; insert INTO hyper_unique_deferred values (1257987700000000000, 'dev1', 1); abort;
ERROR: new row for relation "_hyper_41_142_chunk" violates check constraint "hyper_unique_deferred_sensor_1_check"
69 changes: 69 additions & 0 deletions tsl/test/sql/compression_ddl.sql
@@ -965,3 +965,72 @@ EXPLAIN (COSTS OFF) SELECT * FROM space_part ORDER BY time;
INSERT INTO space_part VALUES
('2022-01-01 00:02', 1, 1, 1);
EXPLAIN (COSTS OFF) SELECT * FROM space_part ORDER BY time;

-- test creation of unique expression index does not interfere with enabling compression
-- github issue 6205
create table mytab (col1 varchar(100) not null, col2 integer not null,
value double precision not null default 0, arrival_ts timestamptz not null, departure_ts timestamptz not null
default current_timestamp);

select create_hypertable('mytab', 'departure_ts');
create unique index myidx_unique ON
mytab (lower(col1::text), col2, departure_ts, arrival_ts);
alter table mytab set (timescaledb.compress);

-- github issue 6186
-- verify inserting into index works as expected during decompression
insert into mytab (col1, col2, value, arrival_ts, departure_ts)
values ('meter1', 1, 2.3, '2022-01-01'::timestamptz, '2022-01-01'::timestamptz),
('meTEr1', 2, 2.5, '2022-01-01'::timestamptz, '2022-01-01'::timestamptz),
('METEr1', 1, 2.9, '2022-01-01'::timestamptz, '2022-01-01 01:00'::timestamptz);
select compress_chunk(show_chunks('mytab'));
REINDEX TABLE mytab; -- should update index
select decompress_chunk(show_chunks('mytab'));
\set EXPLAIN 'EXPLAIN (costs off,timing off,summary off)'
\set EXPLAIN_ANALYZE 'EXPLAIN (analyze,costs off,timing off,summary off)'
-- do index scan on uncompressed, should give correct results
set enable_seqscan = off;
set enable_indexscan = on;
:EXPLAIN_ANALYZE select * from mytab where lower(col1::text) = 'meter1';
select * from mytab where lower(col1::text) = 'meter1';

-- check predicate index
CREATE INDEX myidx_partial ON mytab (value)
WHERE (value > 2.4 AND value < 3);
select compress_chunk(show_chunks('mytab'));
select decompress_chunk(show_chunks('mytab'));
:EXPLAIN_ANALYZE SELECT * FROM mytab WHERE value BETWEEN 2.4 AND 2.8;

-- check exclusion constraint with index - should not be supported
CREATE TABLE hyper_ex (
time timestamptz,
device_id TEXT NOT NULL,
sensor_1 NUMERIC NULL DEFAULT 1,
canceled boolean DEFAULT false,
EXCLUDE USING btree (
time WITH =, device_id WITH =
) WHERE (not canceled)
);

SELECT * FROM create_hypertable('hyper_ex', 'time', chunk_time_interval=> interval '1 month');
INSERT INTO hyper_ex(time, device_id,sensor_1) VALUES
('2022-01-01', 'dev2', 11),
('2022-01-01 01:00', 'dev2', 12);
\set ON_ERROR_STOP 0
ALTER TABLE hyper_ex SET (timescaledb.compress, timescaledb.compress_segmentby='device_id');
\set ON_ERROR_STOP 1

-- check deferred uniqueness
CREATE TABLE hyper_unique_deferred (
time BIGINT UNIQUE DEFERRABLE INITIALLY DEFERRED,
device_id TEXT NOT NULL,
sensor_1 NUMERIC NULL DEFAULT 1 CHECK (sensor_1 > 10)
);
SELECT * FROM create_hypertable('hyper_unique_deferred', 'time', chunk_time_interval => 10);
INSERT INTO hyper_unique_deferred(time, device_id,sensor_1) VALUES (1257987700000000000, 'dev2', 11);
alter table hyper_unique_deferred set (timescaledb.compress);
select compress_chunk(show_chunks('hyper_unique_deferred')); -- also worked fine before 2.11.0
select decompress_chunk(show_chunks('hyper_unique_deferred'));
begin; insert INTO hyper_unique_deferred values (1257987700000000000, 'dev1', 1); abort;
select compress_chunk(show_chunks('hyper_unique_deferred'));
begin; insert INTO hyper_unique_deferred values (1257987700000000000, 'dev1', 1); abort;
