Fix index inserts during decompression #6222

Merged: 1 commit, Oct 24, 2023
1 change: 1 addition & 0 deletions .unreleased/PR_6222
@@ -0,0 +1 @@
Fixes: #6222 Allow enabling compression on hypertable with unique expression index
2 changes: 2 additions & 0 deletions tsl/src/compression/api.c
@@ -1360,6 +1360,8 @@ tsl_recompress_chunk_segmentwise(PG_FUNCTION_ARGS)
 	RowDecompressor decompressor = build_decompressor(compressed_chunk_rel, uncompressed_chunk_rel);
 	/* do not need the indexes on the uncompressed chunk as we do not write to it anymore */
 	ts_catalog_close_indexes(decompressor.indexstate);
+	/* also do not need estate because we don't insert into indexes */
+	FreeExecutorState(decompressor.estate);
 	/********** row compressor *******************/
 	RowCompressor row_compressor;
 	row_compressor_init(&row_compressor,
42 changes: 34 additions & 8 deletions tsl/src/compression/compression.c
@@ -1392,6 +1392,7 @@ build_decompressor(Relation in_rel, Relation out_rel)
 		.per_compressed_row_ctx = AllocSetContextCreate(CurrentMemoryContext,
 														"decompress chunk per-compressed row",
 														ALLOCSET_DEFAULT_SIZES),
+		.estate = CreateExecutorState(),
 	};

@@ -1445,6 +1446,7 @@ decompress_chunk(Oid in_table, Oid out_table)
 	FreeBulkInsertState(decompressor.bistate);
 	MemoryContextDelete(decompressor.per_compressed_row_ctx);
 	ts_catalog_close_indexes(decompressor.indexstate);
+	FreeExecutorState(decompressor.estate);

 	table_close(out_rel, NoLock);
 	table_close(in_rel, NoLock);
@@ -1576,24 +1578,48 @@ row_decompressor_decompress_row(RowDecompressor *decompressor, Tuplesortstate *t
 	 */
 	if (!is_done || !wrote_data)
 	{
-		HeapTuple decompressed_tuple = heap_form_tuple(decompressor->out_desc,
-													   decompressor->decompressed_datums,
-													   decompressor->decompressed_is_nulls);
-		TupleTableSlot *slot = MakeSingleTupleTableSlot(decompressor->out_desc, &TTSOpsVirtual);
 		decompressor->tuples_decompressed++;

 		if (tuplesortstate == NULL)
 		{
+			HeapTuple decompressed_tuple = heap_form_tuple(decompressor->out_desc,
+														   decompressor->decompressed_datums,
+														   decompressor->decompressed_is_nulls);
+
 			heap_insert(decompressor->out_rel,
 						decompressed_tuple,
 						decompressor->mycid,
 						0 /*=options*/,
 						decompressor->bistate);

-			ts_catalog_index_insert(decompressor->indexstate, decompressed_tuple);
+			EState *estate = decompressor->estate;
+			ExprContext *econtext = GetPerTupleExprContext(estate);
+
+			TupleTableSlot *tts_slot =
+				MakeSingleTupleTableSlot(decompressor->out_desc, &TTSOpsHeapTuple);
+			ExecStoreHeapTuple(decompressed_tuple, tts_slot, false);
+
+			/* Arrange for econtext's scan tuple to be the tuple under test */
+			econtext->ecxt_scantuple = tts_slot;
+#if PG14_LT
+			estate->es_result_relation_info = decompressor->indexstate;
+#endif
+			ExecInsertIndexTuplesCompat(decompressor->indexstate,
+										tts_slot,
+										estate,
+										false,
+										false,
+										NULL,
+										NIL,
+										false);
+
+			ExecDropSingleTupleTableSlot(tts_slot);
+			heap_freetuple(decompressed_tuple);
 		}
 		else
 		{
+			TupleTableSlot *slot =
+				MakeSingleTupleTableSlot(decompressor->out_desc, &TTSOpsVirtual);
 			/* create the virtual tuple slot */
 			ExecClearTuple(slot);
 			for (int i = 0; i < decompressor->out_desc->natts; i++)
@@ -1607,10 +1633,8 @@ row_decompressor_decompress_row(RowDecompressor *decompressor, Tuplesortstate *t
 			slot_getallattrs(slot);

 			tuplesort_puttupleslot(tuplesortstate, slot);
+			ExecDropSingleTupleTableSlot(slot);
 		}

-		ExecDropSingleTupleTableSlot(slot);
-		heap_freetuple(decompressed_tuple);
 		wrote_data = true;
 	}
 } while (!is_done);
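
The hunk above is the substance of the fix for issue #6186: the old path handed the freshly formed heap tuple to ts_catalog_index_insert, a catalog-style helper that builds index entries from the tuple's plain columns and evidently does not evaluate expression keys such as lower(col1) or partial-index predicates. The new path stores the tuple in a TupleTableSlot, points the per-tuple expression context's scan tuple at that slot, and calls ExecInsertIndexTuplesCompat so the executor computes index expressions and predicates before inserting. Below is a minimal standalone sketch of that standard PostgreSQL pattern, assuming the PG16-style ExecInsertIndexTuples signature (cross-version differences are what the Compat wrapper in this PR absorbs) and a ResultRelInfo whose index relations are already open; the helper name is invented for illustration:

#include "postgres.h"
#include "executor/executor.h"

/*
 * Sketch only: insert index entries for an already heap_insert()ed tuple
 * by going through the executor, so expression keys and partial-index
 * predicates get evaluated. Assumes `rri` has its indexes opened (e.g. via
 * ExecOpenIndices) and `estate` came from CreateExecutorState().
 */
static void
insert_into_indexes_via_executor(Relation rel, HeapTuple tuple,
								 ResultRelInfo *rri, EState *estate)
{
	TupleTableSlot *slot = MakeSingleTupleTableSlot(RelationGetDescr(rel), &TTSOpsHeapTuple);

	ExecStoreHeapTuple(tuple, slot, /* shouldFree = */ false);

	/* Index expressions and predicates are evaluated against the scan tuple */
	GetPerTupleExprContext(estate)->ecxt_scantuple = slot;

	ExecInsertIndexTuples(rri,
						  slot,
						  estate,
						  false, /* update */
						  false, /* noDupErr */
						  NULL,  /* specConflict */
						  NIL,   /* arbiterIndexes */
						  false /* onlySummarizing */);

	ExecDropSingleTupleTableSlot(slot);
}

The sketch discards the list of recheck indexes that ExecInsertIndexTuples returns; the diff above does the same.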
@@ -2107,6 +2131,7 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo
 	table_endscan(heapScan);

 	ts_catalog_close_indexes(decompressor.indexstate);
+	FreeExecutorState(decompressor.estate);
[Contributor review comment on the added line above: "Out of scope for this PR: we should create a decompressor finisher/closer function, since it seems we have some cleanup to manage at the end of its lifecycle."]

 	FreeBulkInsertState(decompressor.bistate);

 	CommandCounterIncrement();
@@ -3286,6 +3311,7 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
 	ts_chunk_set_partial(chunk);

 	ts_catalog_close_indexes(decompressor.indexstate);
+	FreeExecutorState(decompressor.estate);
 	FreeBulkInsertState(decompressor.bistate);

 	table_close(chunk_rel, NoLock);
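
The same teardown sequence (close the catalog indexes, free the executor state added by this PR, free the bulk-insert state) now repeats at every decompressor call site touched above, which is what the contributor's review comment is pointing at. A hypothetical sketch of the suggested finisher; row_decompressor_close() does not exist in the codebase, and its body is simply the cleanup calls visible in this diff collected in one place:

/*
 * Hypothetical helper suggested by the review comment: centralize the
 * teardown this PR repeats at each call site. The name is invented; the
 * calls mirror the cleanup sequences shown in the hunks above.
 */
static void
row_decompressor_close(RowDecompressor *decompressor)
{
	/* Close the index relations opened when the decompressor was built */
	ts_catalog_close_indexes(decompressor->indexstate);
	/* Executor state introduced by this PR for expression/partial index inserts */
	FreeExecutorState(decompressor->estate);
	FreeBulkInsertState(decompressor->bistate);
	MemoryContextDelete(decompressor->per_compressed_row_ctx);
}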
1 change: 1 addition & 0 deletions tsl/src/compression/compression.h
@@ -139,6 +139,7 @@ typedef struct RowDecompressor
 	TupleDesc out_desc;
 	Relation out_rel;
 	ResultRelInfo *indexstate;
+	EState *estate;

 	CommandId mycid;
 	BulkInsertState bistate;
2 changes: 2 additions & 0 deletions tsl/src/compression/create.c
@@ -956,6 +956,8 @@ validate_existing_indexes(Hypertable *ht, CompressColInfo *colinfo, Bitmapset *i
 	for (int i = 0; i < index->indnkeyatts; i++)
 	{
 		int attno = index->indkey.values[i];
+		if (attno == 0)
+			continue; /* skip check for expression column */
 		Form_hypertable_compression col_def = get_col_info_for_attnum(ht, colinfo, attno);
 		Ensure(col_def, "missing column definition for unique index");
 		if (col_def->segmentby_column_index < 1 && col_def->orderby_column_index < 1)
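
Background for the guard above: in pg_index, indkey.values[i] holds the attribute number of a plain column key, while an expression key is recorded as 0 (InvalidAttrNumber) and its expression tree lives in pg_index.indexprs. Without the guard, validate_existing_indexes looked up a column definition for attno 0 and failed, which is why enabling compression on a hypertable with a unique expression index errored out (issue #6205). A small illustration of the convention, paraphrasing the loop in the hunk:

/*
 * For CREATE UNIQUE INDEX myidx_unique ON mytab (lower(col1::text), col2, ...)
 * the index's indkey.values[] is {0, <attno of col2>, ...}: the expression
 * key has no attribute number, so any per-column validation must skip it.
 */
for (int i = 0; i < index->indnkeyatts; i++)
{
	AttrNumber attno = index->indkey.values[i];

	if (attno == InvalidAttrNumber) /* 0: expression key, no plain column */
		continue;
	/* attno > 0: a plain column that can be checked against the
	 * segmentby/orderby configuration, as the hunk above does */
}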
132 changes: 132 additions & 0 deletions tsl/test/expected/compression_ddl.out
@@ -2392,3 +2392,135 @@ EXPLAIN (COSTS OFF) SELECT * FROM space_part ORDER BY time;
-> Seq Scan on _hyper_36_135_chunk
(35 rows)

-- test creation of unique expression index does not interfere with enabling compression
-- github issue 6205
create table mytab (col1 varchar(100) not null, col2 integer not null,
value double precision not null default 0, arrival_ts timestamptz not null, departure_ts timestamptz not null
default current_timestamp);
select create_hypertable('mytab', 'departure_ts');
WARNING: column type "character varying" used for "col1" does not follow best practices
create_hypertable
---------------------
(38,public,mytab,t)
(1 row)

create unique index myidx_unique ON
mytab (lower(col1::text), col2, departure_ts, arrival_ts);
alter table mytab set (timescaledb.compress);
WARNING: column "col2" should be used for segmenting or ordering
WARNING: column "arrival_ts" should be used for segmenting or ordering
-- github issue 6186
-- verify inserting into index works as expected during decompression
insert into mytab (col1, col2, value, arrival_ts, departure_ts)
values ('meter1', 1, 2.3, '2022-01-01'::timestamptz, '2022-01-01'::timestamptz),
('meTEr1', 2, 2.5, '2022-01-01'::timestamptz, '2022-01-01'::timestamptz),
('METEr1', 1, 2.9, '2022-01-01'::timestamptz, '2022-01-01 01:00'::timestamptz);
select compress_chunk(show_chunks('mytab'));
compress_chunk
-------------------------------------------
_timescaledb_internal._hyper_38_138_chunk
(1 row)

REINDEX TABLE mytab; -- should update index
select decompress_chunk(show_chunks('mytab'));
decompress_chunk
-------------------------------------------
_timescaledb_internal._hyper_38_138_chunk
(1 row)

\set EXPLAIN 'EXPLAIN (costs off,timing off,summary off)'
\set EXPLAIN_ANALYZE 'EXPLAIN (analyze,costs off,timing off,summary off)'
-- do index scan on uncompressed, should give correct results
set enable_seqscan = off;
set enable_indexscan = on;
:EXPLAIN_ANALYZE select * from mytab where lower(col1::text) = 'meter1';
QUERY PLAN
--------------------------------------------------------------------------------------------------
Index Scan using _hyper_38_138_chunk_myidx_unique on _hyper_38_138_chunk (actual rows=3 loops=1)
Index Cond: (lower((col1)::text) = 'meter1'::text)
(2 rows)

select * from mytab where lower(col1::text) = 'meter1';
col1 | col2 | value | arrival_ts | departure_ts
--------+------+-------+------------------------------+------------------------------
meter1 | 1 | 2.3 | Sat Jan 01 00:00:00 2022 PST | Sat Jan 01 00:00:00 2022 PST
METEr1 | 1 | 2.9 | Sat Jan 01 00:00:00 2022 PST | Sat Jan 01 01:00:00 2022 PST
meTEr1 | 2 | 2.5 | Sat Jan 01 00:00:00 2022 PST | Sat Jan 01 00:00:00 2022 PST
(3 rows)

-- check predicate index
CREATE INDEX myidx_partial ON mytab (value)
WHERE (value > 2.4 AND value < 3);
select compress_chunk(show_chunks('mytab'));
compress_chunk
-------------------------------------------
_timescaledb_internal._hyper_38_138_chunk
(1 row)

select decompress_chunk(show_chunks('mytab'));
decompress_chunk
-------------------------------------------
_timescaledb_internal._hyper_38_138_chunk
(1 row)

:EXPLAIN_ANALYZE SELECT * FROM mytab WHERE value BETWEEN 2.4 AND 2.8;
QUERY PLAN
---------------------------------------------------------------------------------------
Seq Scan on _hyper_38_138_chunk (actual rows=1 loops=1)
Filter: ((value >= '2.4'::double precision) AND (value <= '2.8'::double precision))
Rows Removed by Filter: 2
(3 rows)

-- check exclusion constraint with index - should not be supported
CREATE TABLE hyper_ex (
time timestamptz,
device_id TEXT NOT NULL,
sensor_1 NUMERIC NULL DEFAULT 1,
canceled boolean DEFAULT false,
EXCLUDE USING btree (
time WITH =, device_id WITH =
) WHERE (not canceled)
);
SELECT * FROM create_hypertable('hyper_ex', 'time', chunk_time_interval=> interval '1 month');
NOTICE: adding not-null constraint to column "time"
hypertable_id | schema_name | table_name | created
---------------+-------------+------------+---------
40 | public | hyper_ex | t
(1 row)

INSERT INTO hyper_ex(time, device_id,sensor_1) VALUES
('2022-01-01', 'dev2', 11),
('2022-01-01 01:00', 'dev2', 12);
\set ON_ERROR_STOP 0
ALTER TABLE hyper_ex SET (timescaledb.compress, timescaledb.compress_segmentby='device_id');
ERROR: constraint hyper_ex_time_device_id_excl is not supported for compression
\set ON_ERROR_STOP 1
-- check deferred uniqueness
CREATE TABLE hyper_unique_deferred (
time BIGINT UNIQUE DEFERRABLE INITIALLY DEFERRED,
device_id TEXT NOT NULL,
sensor_1 NUMERIC NULL DEFAULT 1 CHECK (sensor_1 > 10)
);
SELECT * FROM create_hypertable('hyper_unique_deferred', 'time', chunk_time_interval => 10);
NOTICE: adding not-null constraint to column "time"
hypertable_id | schema_name | table_name | created
---------------+-------------+-----------------------+---------
41 | public | hyper_unique_deferred | t
(1 row)

INSERT INTO hyper_unique_deferred(time, device_id,sensor_1) VALUES (1257987700000000000, 'dev2', 11);
alter table hyper_unique_deferred set (timescaledb.compress);
select compress_chunk(show_chunks('hyper_unique_deferred')); -- also worked fine before 2.11.0
compress_chunk
-------------------------------------------
_timescaledb_internal._hyper_41_142_chunk
(1 row)

select decompress_chunk(show_chunks('hyper_unique_deferred'));
decompress_chunk
-------------------------------------------
_timescaledb_internal._hyper_41_142_chunk
(1 row)

begin; insert INTO hyper_unique_deferred values (1257987700000000000, 'dev1', 1); abort;
ERROR: new row for relation "_hyper_41_142_chunk" violates check constraint "hyper_unique_deferred_sensor_1_check"
69 changes: 69 additions & 0 deletions tsl/test/sql/compression_ddl.sql
@@ -965,3 +965,72 @@ EXPLAIN (COSTS OFF) SELECT * FROM space_part ORDER BY time;
INSERT INTO space_part VALUES
('2022-01-01 00:02', 1, 1, 1);
EXPLAIN (COSTS OFF) SELECT * FROM space_part ORDER BY time;

-- test creation of unique expression index does not interfere with enabling compression
-- github issue 6205
create table mytab (col1 varchar(100) not null, col2 integer not null,
value double precision not null default 0, arrival_ts timestamptz not null, departure_ts timestamptz not null
default current_timestamp);

select create_hypertable('mytab', 'departure_ts');
create unique index myidx_unique ON
mytab (lower(col1::text), col2, departure_ts, arrival_ts);
alter table mytab set (timescaledb.compress);

-- github issue 6186
-- verify inserting into index works as expected during decompression
insert into mytab (col1, col2, value, arrival_ts, departure_ts)
values ('meter1', 1, 2.3, '2022-01-01'::timestamptz, '2022-01-01'::timestamptz),
('meTEr1', 2, 2.5, '2022-01-01'::timestamptz, '2022-01-01'::timestamptz),
('METEr1', 1, 2.9, '2022-01-01'::timestamptz, '2022-01-01 01:00'::timestamptz);
select compress_chunk(show_chunks('mytab'));
REINDEX TABLE mytab; -- should update index
select decompress_chunk(show_chunks('mytab'));
\set EXPLAIN 'EXPLAIN (costs off,timing off,summary off)'
\set EXPLAIN_ANALYZE 'EXPLAIN (analyze,costs off,timing off,summary off)'
-- do index scan on uncompressed, should give correct results
set enable_seqscan = off;
set enable_indexscan = on;
:EXPLAIN_ANALYZE select * from mytab where lower(col1::text) = 'meter1';
select * from mytab where lower(col1::text) = 'meter1';

-- check predicate index
CREATE INDEX myidx_partial ON mytab (value)
WHERE (value > 2.4 AND value < 3);
select compress_chunk(show_chunks('mytab'));
select decompress_chunk(show_chunks('mytab'));
:EXPLAIN_ANALYZE SELECT * FROM mytab WHERE value BETWEEN 2.4 AND 2.8;

-- check exclusion constraint with index - should not be supported
CREATE TABLE hyper_ex (
time timestamptz,
device_id TEXT NOT NULL,
sensor_1 NUMERIC NULL DEFAULT 1,
canceled boolean DEFAULT false,
EXCLUDE USING btree (
time WITH =, device_id WITH =
) WHERE (not canceled)
);

SELECT * FROM create_hypertable('hyper_ex', 'time', chunk_time_interval=> interval '1 month');
INSERT INTO hyper_ex(time, device_id,sensor_1) VALUES
('2022-01-01', 'dev2', 11),
('2022-01-01 01:00', 'dev2', 12);
\set ON_ERROR_STOP 0
ALTER TABLE hyper_ex SET (timescaledb.compress, timescaledb.compress_segmentby='device_id');
\set ON_ERROR_STOP 1

-- check deferred uniqueness
CREATE TABLE hyper_unique_deferred (
time BIGINT UNIQUE DEFERRABLE INITIALLY DEFERRED,
device_id TEXT NOT NULL,
sensor_1 NUMERIC NULL DEFAULT 1 CHECK (sensor_1 > 10)
);
SELECT * FROM create_hypertable('hyper_unique_deferred', 'time', chunk_time_interval => 10);
INSERT INTO hyper_unique_deferred(time, device_id,sensor_1) VALUES (1257987700000000000, 'dev2', 11);
alter table hyper_unique_deferred set (timescaledb.compress);
select compress_chunk(show_chunks('hyper_unique_deferred')); -- also worked fine before 2.11.0
select decompress_chunk(show_chunks('hyper_unique_deferred'));
begin; insert INTO hyper_unique_deferred values (1257987700000000000, 'dev1', 1); abort;
select compress_chunk(show_chunks('hyper_unique_deferred'));
begin; insert INTO hyper_unique_deferred values (1257987700000000000, 'dev1', 1); abort;