Allow DELETE on compressed chunks without decompression
When the constraints of a DELETE on a compressed chunk fully cover the
compressed batches, we can optimize the DELETE to operate directly on those
batches and skip the expensive decompression step.
svenklemm committed Jul 29, 2024
1 parent a4a023e commit 73d9558
Showing 7 changed files with 268 additions and 12 deletions.
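
As a quick illustration of the fast path this commit adds (a minimal sketch with hypothetical table and column names; the direct_delete test below exercises the real cases), a DELETE whose WHERE clause compares only segmentby columns against constants can remove whole compressed batches without touching their contents:

CREATE TABLE metrics(time timestamptz NOT NULL, device text, value float);
SELECT create_hypertable('metrics', 'time');
ALTER TABLE metrics SET (timescaledb.compress, timescaledb.compress_segmentby = 'device');
SELECT count(compress_chunk(c)) FROM show_chunks('metrics') c;

-- Qualifies: the predicate addresses whole batches (segmentby column
-- compared against a constant), so compressed batches are deleted directly.
DELETE FROM metrics WHERE device = 'd1';

-- Does not qualify: value is not a segmentby column, so matching batches
-- are decompressed first and rows are deleted individually.
DELETE FROM metrics WHERE value > 10;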
1 change: 1 addition & 0 deletions src/nodes/chunk_dispatch/chunk_dispatch.h
@@ -74,6 +74,7 @@ typedef struct ChunkDispatchState
ResultRelInfo *rri;
/* flag to represent dropped attributes */
bool is_dropped_attr_exists;
int64 batches_deleted;
int64 batches_filtered;
int64 batches_decompressed;
int64 tuples_decompressed;
3 changes: 3 additions & 0 deletions src/nodes/hypertable_modify.c
@@ -240,6 +240,7 @@ hypertable_modify_explain(CustomScanState *node, List *ancestors, ExplainState *
foreach (lc, chunk_dispatch_states)
{
ChunkDispatchState *cds = (ChunkDispatchState *) lfirst(lc);
state->batches_deleted += cds->batches_deleted;
state->batches_filtered += cds->batches_filtered;
state->batches_decompressed += cds->batches_decompressed;
state->tuples_decompressed += cds->tuples_decompressed;
@@ -251,6 +252,8 @@ hypertable_modify_explain(CustomScanState *node, List *ancestors, ExplainState *
ExplainPropertyInteger("Batches decompressed", NULL, state->batches_decompressed, es);
if (state->tuples_decompressed > 0)
ExplainPropertyInteger("Tuples decompressed", NULL, state->tuples_decompressed, es);
if (state->batches_deleted > 0)
ExplainPropertyInteger("Batches deleted", NULL, state->batches_deleted, es);
}

static CustomExecMethods hypertable_modify_state_methods = {
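The new counter surfaces next to the existing decompression counters in EXPLAIN (ANALYZE) output. A hedged sketch of the expected shape, using the hypothetical metrics table from above (the expected test output at the end of this diff shows real plans):

EXPLAIN (ANALYZE, COSTS OFF) DELETE FROM metrics WHERE device = 'd1';
-- Custom Scan (HypertableModify) (actual rows=0 loops=1)
--   Batches deleted: 3
--   -> Delete on metrics ...

Note that "Batches deleted" is only printed when at least one batch was removed without decompression, mirroring the handling of "Batches decompressed" and "Tuples decompressed".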
1 change: 1 addition & 0 deletions src/nodes/hypertable_modify.h
@@ -32,6 +32,7 @@ typedef struct HypertableModifyState
int64 tuples_decompressed;
int64 batches_decompressed;
int64 batches_filtered;
int64 batches_deleted;
} HypertableModifyState;

extern void ts_hypertable_modify_fixup_tlist(Plan *plan);
4 changes: 4 additions & 0 deletions tsl/src/compression/compression.h
@@ -138,6 +138,8 @@ typedef struct RowDecompressor
CommandId mycid;
BulkInsertState bistate;

bool delete_only;

Datum *compressed_datums;
bool *compressed_is_nulls;

@@ -147,6 +149,7 @@ typedef struct RowDecompressor
MemoryContext per_compressed_row_ctx;
int64 batches_decompressed;
int64 tuples_decompressed;
int64 batches_deleted;

TupleTableSlot **decompressed_slots;
int unprocessed_tuples;
@@ -410,6 +413,7 @@ const CompressionAlgorithmDefinition *algorithm_definition(CompressionAlgorithm

struct decompress_batches_stats
{
int64 batches_deleted;
int64 batches_filtered;
int64 batches_decompressed;
int64 tuples_decompressed;
101 changes: 89 additions & 12 deletions tsl/src/compression/compression_dml.c
@@ -35,12 +35,11 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r
ScanKeyData *heap_scankeys, int num_heap_scankeys,
ScanKeyData *mem_scankeys, int num_mem_scankeys,
tuple_filtering_constraints *constraints, bool *skip_current_tuple,
Bitmapset *null_columns, List *is_nulls);
static struct decompress_batches_stats
decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot,
ScanKeyData *scankeys, int num_scankeys, ScanKeyData *mem_scankeys,
int num_mem_scankeys, tuple_filtering_constraints *constraints,
bool *skip_current_tuple, Bitmapset *null_columns, List *is_nulls);
bool delete_only, Bitmapset *null_columns, List *is_nulls);
static struct decompress_batches_stats decompress_batches_seqscan(
Relation in_rel, Relation out_rel, Snapshot snapshot, ScanKeyData *scankeys, int num_scankeys,
ScanKeyData *mem_scankeys, int num_mem_scankeys, tuple_filtering_constraints *constraints,
bool *skip_current_tuple, bool delete_only, Bitmapset *null_columns, List *is_nulls);

static bool batch_matches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num_scankeys,
tuple_filtering_constraints *constraints, bool *skip_current_tuple);
@@ -60,6 +59,9 @@ static void report_error(TM_Result result);

static bool key_column_is_null(tuple_filtering_constraints *constraints, Relation chunk_rel,
Oid ht_relid, TupleTableSlot *slot);
static bool can_delete_without_decompression(HypertableModifyState *ht_state,
CompressionSettings *settings, Chunk *chunk,
List *predicates);

void
decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
@@ -167,6 +169,7 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
num_mem_scankeys,
constraints,
&skip_current_tuple,
false,
NULL, /* no null column check for non-segmentby
columns */
NIL);
@@ -193,6 +196,7 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
num_mem_scankeys,
constraints,
&skip_current_tuple,
false,
null_columns,
NIL);
}
@@ -203,6 +207,7 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
cis->cds->skip_current_tuple = true;
}

cis->cds->batches_deleted += stats.batches_deleted;
cis->cds->batches_filtered += stats.batches_filtered;
cis->cds->batches_decompressed += stats.batches_decompressed;
cis->cds->tuples_decompressed += stats.tuples_decompressed;
@@ -248,6 +253,8 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu

comp_chunk = ts_chunk_get_by_id(chunk->fd.compressed_chunk_id, true);
CompressionSettings *settings = ts_compression_settings_get(comp_chunk->table_id);
bool delete_only = ht_state->mt->operation == CMD_DELETE &&
can_delete_without_decompression(ht_state, settings, chunk, predicates);

process_predicates(chunk,
settings,
@@ -289,6 +296,7 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
num_mem_scankeys,
NULL,
NULL,
delete_only,
null_columns,
is_null);
/* close the selected index */
@@ -305,6 +313,7 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
num_mem_scankeys,
NULL,
NULL,
delete_only,
null_columns,
is_null);
}
@@ -329,6 +338,7 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
filter = lfirst(lc);
pfree(filter);
}
ht_state->batches_deleted += stats.batches_deleted;
ht_state->batches_filtered += stats.batches_filtered;
ht_state->batches_decompressed += stats.batches_decompressed;
ht_state->tuples_decompressed += stats.tuples_decompressed;
@@ -351,7 +361,7 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r
ScanKeyData *heap_scankeys, int num_heap_scankeys,
ScanKeyData *mem_scankeys, int num_mem_scankeys,
tuple_filtering_constraints *constraints, bool *skip_current_tuple,
Bitmapset *null_columns, List *is_nulls)
bool delete_only, Bitmapset *null_columns, List *is_nulls)
{
HeapTuple compressed_tuple;
RowDecompressor decompressor;
@@ -426,6 +436,8 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r
if (!decompressor_initialized)
{
decompressor = build_decompressor(in_rel, out_rel);
decompressor.delete_only = delete_only;

decompressor_initialized = true;
}

@@ -475,8 +487,15 @@ decompress_batches_indexscan(Relation in_rel, Relation out_rel, Relation index_r
report_error(result);
return stats;
}
stats.tuples_decompressed += row_decompressor_decompress_row_to_table(&decompressor);
stats.batches_decompressed++;
if (decompressor.delete_only)
{
stats.batches_deleted++;
}
else
{
stats.tuples_decompressed += row_decompressor_decompress_row_to_table(&decompressor);
stats.batches_decompressed++;
}
write_logical_replication_msg_decompression_end();
}

@@ -515,7 +534,8 @@ static struct decompress_batches_stats
decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot,
ScanKeyData *scankeys, int num_scankeys, ScanKeyData *mem_scankeys,
int num_mem_scankeys, tuple_filtering_constraints *constraints,
bool *skip_current_tuple, Bitmapset *null_columns, List *is_nulls)
bool *skip_current_tuple, bool delete_only, Bitmapset *null_columns,
List *is_nulls)
{
RowDecompressor decompressor;
bool decompressor_initialized = false;
@@ -568,6 +588,7 @@ decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot,
if (!decompressor_initialized)
{
decompressor = build_decompressor(in_rel, out_rel);
decompressor.delete_only = delete_only;
decompressor_initialized = true;
}

@@ -612,8 +633,15 @@ decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot,
table_endscan(scan);
report_error(result);
}
stats.tuples_decompressed += row_decompressor_decompress_row_to_table(&decompressor);
stats.batches_decompressed++;
if (decompressor.delete_only)
{
stats.batches_deleted++;
}
else
{
stats.tuples_decompressed += row_decompressor_decompress_row_to_table(&decompressor);
stats.batches_decompressed++;
}
write_logical_replication_msg_decompression_end();
}
if (scankeys)
Expand Down Expand Up @@ -1415,3 +1443,52 @@ key_column_is_null(tuple_filtering_constraints *constraints, Relation chunk_rel,

return false;
}

static bool
can_delete_without_decompression(HypertableModifyState *ht_state, CompressionSettings *settings,
Chunk *chunk, List *predicates)
{
ListCell *lc;

/*
* If there are any DELETE row triggers on the hypertable we skip the optimization
* to delete compressed batches directly.
*/
ModifyTableState *ps =
linitial_node(ModifyTableState, castNode(CustomScanState, ht_state)->custom_ps);
if (ps->rootResultRelInfo->ri_TrigDesc)
{
TriggerDesc *trigdesc = ps->rootResultRelInfo->ri_TrigDesc;
if (trigdesc->trig_delete_before_row || trigdesc->trig_delete_after_row ||
trigdesc->trig_delete_instead_row)
{
return false;
}
}

foreach (lc, predicates)
{
Node *node = lfirst(lc);
Var *var;
Expr *arg_value;
Oid opno;

if (ts_extract_expr_args((Expr *) node, &var, &arg_value, &opno, NULL))
{
if (!IsA(arg_value, Const))
{
return false;
}
char *column_name = get_attname(chunk->table_id, var->varattno, false);
if (ts_array_is_member(settings->fd.segmentby, column_name))
{
continue;
}
}
return false;
}
return true;
}
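
One case can_delete_without_decompression() explicitly guards against is row-level DELETE triggers (BEFORE, AFTER, or INSTEAD OF row), which must see each deleted row and therefore force the slow path. A hedged SQL sketch (the trigger and function names are hypothetical) of how such a trigger disables direct batch deletion:

-- Any row-level DELETE trigger forces decompression, since each row
-- has to be materialized for the trigger to fire on it.
CREATE FUNCTION audit_delete() RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
RETURN OLD;
END;
$$;

CREATE TRIGGER direct_delete_audit
BEFORE DELETE ON direct_delete
FOR EACH ROW EXECUTE FUNCTION audit_delete();

-- Even though device is a segmentby column, this DELETE now reports
-- "Batches decompressed" rather than "Batches deleted" in EXPLAIN ANALYZE.
DELETE FROM direct_delete WHERE device = 'd1';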
125 changes: 125 additions & 0 deletions tsl/test/shared/expected/compression_dml.out
@@ -415,3 +415,128 @@ QUERY PLAN

RESET timescaledb.enable_dml_decompression_tuple_filtering;
DROP TABLE lazy_decompress;
-- test direct delete on compressed hypertable
CREATE TABLE direct_delete(time timestamptz not null, device text, reading text, value float);
SELECT table_name FROM create_hypertable('direct_delete', 'time');
table_name
direct_delete
(1 row)

ALTER TABLE direct_delete SET (timescaledb.compress, timescaledb.compress_segmentby = 'device, reading');
NOTICE: default order by for hypertable "direct_delete" is set to ""time" DESC"
INSERT INTO direct_delete VALUES
('2021-01-01', 'd1', 'r1', 1.0),
('2021-01-01', 'd1', 'r2', 1.0),
('2021-01-01', 'd1', 'r3', 1.0),
('2021-01-01', 'd2', 'r1', 1.0),
('2021-01-01', 'd2', 'r2', 1.0),
('2021-01-01', 'd2', 'r3', 1.0);
SELECT count(compress_chunk(c)) FROM show_chunks('direct_delete') c;
count
1
(1 row)

BEGIN;
-- should be 3 batches directly deleted
:ANALYZE DELETE FROM direct_delete WHERE device='d1';
QUERY PLAN
Custom Scan (HypertableModify) (actual rows=0 loops=1)
Batches deleted: 3
-> Delete on direct_delete (actual rows=0 loops=1)
Delete on _hyper_X_X_chunk direct_delete_1
-> Seq Scan on _hyper_X_X_chunk direct_delete_1 (actual rows=0 loops=1)
Filter: (device = 'd1'::text)
(6 rows)

-- double-check it's actually deleted
SELECT count(*) FROM direct_delete WHERE device='d1';
count
0
(1 row)

ROLLBACK;
BEGIN;
-- should be 2 batches directly deleted
:ANALYZE DELETE FROM direct_delete WHERE reading='r2';
QUERY PLAN
Custom Scan (HypertableModify) (actual rows=0 loops=1)
Batches deleted: 2
-> Delete on direct_delete (actual rows=0 loops=1)
Delete on _hyper_X_X_chunk direct_delete_1
-> Seq Scan on _hyper_X_X_chunk direct_delete_1 (actual rows=0 loops=1)
Filter: (reading = 'r2'::text)
(6 rows)

-- double-check it's actually deleted
SELECT count(*) FROM direct_delete WHERE reading='r2';
count
0
(1 row)

ROLLBACK;
-- combining constraints on segmentby columns should work
BEGIN;
-- should be 1 batch directly deleted
:ANALYZE DELETE FROM direct_delete WHERE device='d1' AND reading='r2';
QUERY PLAN
Custom Scan (HypertableModify) (actual rows=0 loops=1)
Batches deleted: 1
-> Delete on direct_delete (actual rows=0 loops=1)
Delete on _hyper_X_X_chunk direct_delete_1
-> Seq Scan on _hyper_X_X_chunk direct_delete_1 (actual rows=0 loops=1)
Filter: ((device = 'd1'::text) AND (reading = 'r2'::text))
(6 rows)

-- double-check it's actually deleted
SELECT count(*) FROM direct_delete WHERE device='d1' AND reading='r2';
count
0
(1 row)

ROLLBACK;
-- constraints involving non-segmentby columns should not directly delete
BEGIN; :ANALYZE DELETE FROM direct_delete WHERE value = '1.0'; ROLLBACK;
QUERY PLAN
Custom Scan (HypertableModify) (actual rows=0 loops=1)
Batches decompressed: 6
Tuples decompressed: 6
-> Delete on direct_delete (actual rows=0 loops=1)
Delete on _hyper_X_X_chunk direct_delete_1
-> Seq Scan on _hyper_X_X_chunk direct_delete_1 (actual rows=6 loops=1)
Filter: (value = '1'::double precision)
(7 rows)

BEGIN; :ANALYZE DELETE FROM direct_delete WHERE device = 'd1' AND value = '1.0'; ROLLBACK;
QUERY PLAN
Custom Scan (HypertableModify) (actual rows=0 loops=1)
Batches decompressed: 3
Tuples decompressed: 3
-> Delete on direct_delete (actual rows=0 loops=1)
Delete on _hyper_X_X_chunk direct_delete_1
-> Seq Scan on _hyper_X_X_chunk direct_delete_1 (actual rows=3 loops=1)
Filter: ((device = 'd1'::text) AND (value = '1'::double precision))
(7 rows)

BEGIN; :ANALYZE DELETE FROM direct_delete WHERE reading = 'r1' AND value = '1.0'; ROLLBACK;
QUERY PLAN
Custom Scan (HypertableModify) (actual rows=0 loops=1)
Batches decompressed: 2
Tuples decompressed: 2
-> Delete on direct_delete (actual rows=0 loops=1)
Delete on _hyper_X_X_chunk direct_delete_1
-> Seq Scan on _hyper_X_X_chunk direct_delete_1 (actual rows=2 loops=1)
Filter: ((reading = 'r1'::text) AND (value = '1'::double precision))
(7 rows)

BEGIN; :ANALYZE DELETE FROM direct_delete WHERE device = 'd2' AND reading = 'r3' AND value = '1.0'; ROLLBACK;
QUERY PLAN
Custom Scan (HypertableModify) (actual rows=0 loops=1)
Batches decompressed: 1
Tuples decompressed: 1
-> Delete on direct_delete (actual rows=0 loops=1)
Delete on _hyper_X_X_chunk direct_delete_1
-> Seq Scan on _hyper_X_X_chunk direct_delete_1 (actual rows=1 loops=1)
Filter: ((device = 'd2'::text) AND (reading = 'r3'::text) AND (value = '1'::double precision))
(7 rows)

DROP TABLE direct_delete;