Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce WAL activity by freezing tuples immediately #5890

Merged
merged 1 commit into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .unreleased/feature_5890
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #5890 Reduce WAL activity by freezing compressed tuples immediately
1 change: 1 addition & 0 deletions sql/pre_install/tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ CREATE TABLE _timescaledb_catalog.compression_chunk_size (
compressed_index_size bigint NOT NULL,
numrows_pre_compression bigint,
numrows_post_compression bigint,
numrows_frozen_immediately bigint,
-- table constraints
CONSTRAINT compression_chunk_size_pkey PRIMARY KEY (chunk_id),
CONSTRAINT compression_chunk_size_chunk_id_fkey FOREIGN KEY (chunk_id) REFERENCES _timescaledb_catalog.chunk (id) ON DELETE CASCADE,
Expand Down
50 changes: 50 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,53 @@ DROP TABLE _timescaledb_internal.tmp_chunk_seq_value;
GRANT SELECT ON _timescaledb_catalog.chunk_id_seq TO PUBLIC;
GRANT SELECT ON _timescaledb_catalog.chunk TO PUBLIC;
-- end recreate _timescaledb_catalog.chunk table --

--
-- Rebuild the catalog table `_timescaledb_catalog.compression_chunk_size` to
-- add new column `numrows_frozen_immediately`
--
CREATE TABLE _timescaledb_internal.compression_chunk_size_tmp
AS SELECT * from _timescaledb_catalog.compression_chunk_size;

-- Drop depended views
-- We assume that '_timescaledb_internal.compressed_chunk_stats' was already dropped in this update
-- (see above)

-- Drop table
ALTER EXTENSION timescaledb DROP TABLE _timescaledb_catalog.compression_chunk_size;
DROP TABLE _timescaledb_catalog.compression_chunk_size;

CREATE TABLE _timescaledb_catalog.compression_chunk_size (
chunk_id integer NOT NULL,
compressed_chunk_id integer NOT NULL,
uncompressed_heap_size bigint NOT NULL,
uncompressed_toast_size bigint NOT NULL,
uncompressed_index_size bigint NOT NULL,
compressed_heap_size bigint NOT NULL,
compressed_toast_size bigint NOT NULL,
compressed_index_size bigint NOT NULL,
numrows_pre_compression bigint,
numrows_post_compression bigint,
numrows_frozen_immediately bigint,
-- table constraints
CONSTRAINT compression_chunk_size_pkey PRIMARY KEY (chunk_id),
CONSTRAINT compression_chunk_size_chunk_id_fkey FOREIGN KEY (chunk_id) REFERENCES _timescaledb_catalog.chunk (id) ON DELETE CASCADE,
CONSTRAINT compression_chunk_size_compressed_chunk_id_fkey FOREIGN KEY (compressed_chunk_id) REFERENCES _timescaledb_catalog.chunk (id) ON DELETE CASCADE
);

INSERT INTO _timescaledb_catalog.compression_chunk_size
(chunk_id, compressed_chunk_id, uncompressed_heap_size, uncompressed_toast_size,
uncompressed_index_size, compressed_heap_size, compressed_toast_size,
compressed_index_size, numrows_pre_compression, numrows_post_compression, numrows_frozen_immediately)
SELECT chunk_id, compressed_chunk_id, uncompressed_heap_size, uncompressed_toast_size,
uncompressed_index_size, compressed_heap_size, compressed_toast_size,
compressed_index_size, numrows_pre_compression, numrows_post_compression, 0
FROM _timescaledb_internal.compression_chunk_size_tmp;

DROP TABLE _timescaledb_internal.compression_chunk_size_tmp;

SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.compression_chunk_size', '');

GRANT SELECT ON _timescaledb_catalog.compression_chunk_size TO PUBLIC;

-- End modify `_timescaledb_catalog.compression_chunk_size`
50 changes: 50 additions & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,53 @@ GRANT SELECT ON _timescaledb_catalog.chunk_id_seq TO PUBLIC;
GRANT SELECT ON _timescaledb_catalog.chunk TO PUBLIC;

-- end recreate _timescaledb_catalog.chunk table --


--
-- Rebuild the catalog table `_timescaledb_catalog.compression_chunk_size` to
-- remove column `numrows_frozen_immediately`
--
CREATE TABLE _timescaledb_internal.compression_chunk_size_tmp
AS SELECT * from _timescaledb_catalog.compression_chunk_size;

-- Drop depended views
-- We assume that '_timescaledb_internal.compressed_chunk_stats' was already dropped in this update
-- (see above)

-- Drop table
ALTER EXTENSION timescaledb DROP TABLE _timescaledb_catalog.compression_chunk_size;
DROP TABLE _timescaledb_catalog.compression_chunk_size;

CREATE TABLE _timescaledb_catalog.compression_chunk_size (
chunk_id integer NOT NULL,
compressed_chunk_id integer NOT NULL,
uncompressed_heap_size bigint NOT NULL,
uncompressed_toast_size bigint NOT NULL,
uncompressed_index_size bigint NOT NULL,
compressed_heap_size bigint NOT NULL,
compressed_toast_size bigint NOT NULL,
compressed_index_size bigint NOT NULL,
numrows_pre_compression bigint,
numrows_post_compression bigint,
-- table constraints
CONSTRAINT compression_chunk_size_pkey PRIMARY KEY (chunk_id),
CONSTRAINT compression_chunk_size_chunk_id_fkey FOREIGN KEY (chunk_id) REFERENCES _timescaledb_catalog.chunk (id) ON DELETE CASCADE,
CONSTRAINT compression_chunk_size_compressed_chunk_id_fkey FOREIGN KEY (compressed_chunk_id) REFERENCES _timescaledb_catalog.chunk (id) ON DELETE CASCADE
);

INSERT INTO _timescaledb_catalog.compression_chunk_size
(chunk_id, compressed_chunk_id, uncompressed_heap_size, uncompressed_toast_size,
uncompressed_index_size, compressed_heap_size, compressed_toast_size,
compressed_index_size, numrows_pre_compression, numrows_post_compression)
SELECT chunk_id, compressed_chunk_id, uncompressed_heap_size, uncompressed_toast_size,
uncompressed_index_size, compressed_heap_size, compressed_toast_size,
compressed_index_size, numrows_pre_compression, numrows_post_compression
FROM _timescaledb_internal.compression_chunk_size_tmp;

DROP TABLE _timescaledb_internal.compression_chunk_size_tmp;

SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.compression_chunk_size', '');

GRANT SELECT ON _timescaledb_catalog.compression_chunk_size TO PUBLIC;

-- End modify `_timescaledb_catalog.compression_chunk_size`
1 change: 1 addition & 0 deletions src/telemetry/stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ add_chunk_stats(HyperStats *stats, Form_pg_class class, const Chunk *chunk,
stats->uncompressed_toast_size += fd_compr->uncompressed_toast_size;
stats->uncompressed_row_count += fd_compr->numrows_pre_compression;
stats->compressed_row_count += fd_compr->numrows_post_compression;
stats->compressed_row_frozen_immediately_count += fd_compr->numrows_frozen_immediately;

/* Also add compressed sizes to total number for entire table */
stats->storage.relsize.heap_size += fd_compr->compressed_heap_size;
Expand Down
1 change: 1 addition & 0 deletions src/telemetry/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ typedef struct HyperStats
int64 compressed_indexes_size;
int64 compressed_toast_size;
int64 compressed_row_count;
int64 compressed_row_frozen_immediately_count;
int64 uncompressed_heap_size;
int64 uncompressed_indexes_size;
int64 uncompressed_toast_size;
Expand Down
4 changes: 4 additions & 0 deletions src/telemetry/telemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,7 @@ format_iso8601(Datum value)
#define REQ_RELKIND_COMPRESSED_TOAST_SIZE "compressed_toast_size"
#define REQ_RELKIND_COMPRESSED_INDEXES_SIZE "compressed_indexes_size"
#define REQ_RELKIND_COMPRESSED_ROWCOUNT "compressed_row_count"
#define REQ_RELKIND_COMPRESSED_ROWCOUNT_FROZEN_IMMEDIATELY "compressed_row_count_frozen_immediately"

#define REQ_RELKIND_CAGG_ON_DISTRIBUTED_HYPERTABLE_COUNT "num_caggs_on_distributed_hypertables"
#define REQ_RELKIND_CAGG_USES_REAL_TIME_AGGREGATION_COUNT "num_caggs_using_real_time_aggregation"
Expand Down Expand Up @@ -639,6 +640,9 @@ add_compression_stats_object(JsonbParseState *parse_state, StatsRelType reltype,
ts_jsonb_add_int64(parse_state,
REQ_RELKIND_COMPRESSED_INDEXES_SIZE,
hs->compressed_indexes_size);
ts_jsonb_add_int64(parse_state,
REQ_RELKIND_COMPRESSED_ROWCOUNT_FROZEN_IMMEDIATELY,
hs->compressed_row_frozen_immediately_count);
ts_jsonb_add_int64(parse_state, REQ_RELKIND_UNCOMPRESSED_ROWCOUNT, hs->uncompressed_row_count);
ts_jsonb_add_int64(parse_state, REQ_RELKIND_UNCOMPRESSED_HEAP_SIZE, hs->uncompressed_heap_size);
ts_jsonb_add_int64(parse_state,
Expand Down
2 changes: 2 additions & 0 deletions src/ts_catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -1289,6 +1289,7 @@ typedef enum Anum_compression_chunk_size
Anum_compression_chunk_size_compressed_index_size,
Anum_compression_chunk_size_numrows_pre_compression,
Anum_compression_chunk_size_numrows_post_compression,
Anum_compression_chunk_size_numrows_frozen_immediately,
_Anum_compression_chunk_size_max,
} Anum_compression_chunk_size;

Expand All @@ -1306,6 +1307,7 @@ typedef struct FormData_compression_chunk_size
int64 compressed_index_size;
int64 numrows_pre_compression;
int64 numrows_post_compression;
int64 numrows_frozen_immediately;
} FormData_compression_chunk_size;

typedef FormData_compression_chunk_size *Form_compression_chunk_size;
Expand Down
1 change: 1 addition & 0 deletions tsl/src/chunk_copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ chunk_copy_get_source_compressed_chunk_stats(ChunkCopy *cc)
cc->fd_ccs.compressed_index_size = atoll(PQgetvalue(res, 0, 5));
cc->fd_ccs.numrows_pre_compression = atoll(PQgetvalue(res, 0, 6));
cc->fd_ccs.numrows_post_compression = atoll(PQgetvalue(res, 0, 7));
cc->fd_ccs.numrows_frozen_immediately = 0;

ts_dist_cmd_close_response(dist_res);
}
Expand Down
41 changes: 35 additions & 6 deletions tsl/src/compression/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ typedef struct CompressChunkCxt
static void
compression_chunk_size_catalog_insert(int32 src_chunk_id, const RelationSize *src_size,
int32 compress_chunk_id, const RelationSize *compress_size,
int64 rowcnt_pre_compression, int64 rowcnt_post_compression)
int64 rowcnt_pre_compression, int64 rowcnt_post_compression,
int64 rowcnt_frozen)
{
Catalog *catalog = ts_catalog_get();
Relation rel;
Expand Down Expand Up @@ -93,6 +94,8 @@ compression_chunk_size_catalog_insert(int32 src_chunk_id, const RelationSize *sr
Int64GetDatum(rowcnt_pre_compression);
values[AttrNumberGetAttrOffset(Anum_compression_chunk_size_numrows_post_compression)] =
Int64GetDatum(rowcnt_post_compression);
values[AttrNumberGetAttrOffset(Anum_compression_chunk_size_numrows_frozen_immediately)] =
Int64GetDatum(rowcnt_frozen);

ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
ts_catalog_insert_values(rel, desc, values, nulls);
Expand Down Expand Up @@ -487,6 +490,27 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
compress_ht_chunk = ts_chunk_get_by_id(mergable_chunk->fd.compressed_chunk_id, true);
result_chunk_id = mergable_chunk->table_id;
}

/* Since the compressed relation is created in the same transaction as the tuples that will be
* written by the compressor, we can insert the tuple directly in frozen state. This is the same
* logic as performed in COPY INSERT FROZEN.
*
* Note: Tuples inserted with HEAP_INSERT_FROZEN become immediately visible to all transactions
* (they violate the MVCC pattern). So, this flag can only be used when creating the compressed
* chunk in the same transaction as the compressed tuples are inserted.
*
* If this isn't the case, then tuples can be seen multiple times by parallel readers - once in
* the uncompressed part of the hypertable (since they are not deleted in the transaction) and
* once in the compressed part of the hypertable since the MVCC semantic is violated due to the
* flag.
*
* In contrast, when the compressed chunk part is created in the same transaction as the tuples
* are written, the compressed chunk (i.e., the catalog entry) becomes visible to other
* transactions only after the transaction that performs the compression is commited and
* the uncompressed chunk is truncated.
Comment on lines +507 to +510
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wont this mean that we can rarely benefit from this as we are moving towards precreating chunks and getting rid of truncate due to high locking requirements.

Copy link
Contributor Author

@jnidzwetzki jnidzwetzki Oct 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is correct. If we precreate a chunk together with the compressed counterpart, we cannot benefit from this optimization.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can easily create a policy for pre-creating chunks ... or is the idea doing it in the chunk dispatch code?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh... I guess you're talking about this PR: #5849

*/
int insert_options = new_compressed_chunk ? HEAP_INSERT_FROZEN : 0;

/* convert list to array of pointers for compress_chunk */
colinfo_array = palloc(sizeof(ColumnCompressionInfo *) * htcols_listlen);
foreach (lc, htcols_list)
Expand All @@ -498,7 +522,8 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
cstat = compress_chunk(cxt.srcht_chunk->table_id,
compress_ht_chunk->table_id,
colinfo_array,
htcols_listlen);
htcols_listlen,
insert_options);
Comment on lines 522 to +526
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might want to track the number of times this feature is used by adding it to the telemetry. That way, we can see if the feature is being used and understand if it is useful or if it is just in the way and we should remove it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mkindahl I added the information to the telemetry. Does it provide the information you had in mind?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this provides enough information to see how widely it is used. Thanks for adding it!


/* Drop all FK constraints on the uncompressed chunk. This is needed to allow
* cascading deleted data in FK-referenced tables, while blocking deleting data
Expand All @@ -514,7 +539,8 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
compress_ht_chunk->fd.id,
&after_size,
cstat.rowcnt_pre_compression,
cstat.rowcnt_post_compression);
cstat.rowcnt_post_compression,
cstat.rowcnt_frozen);

/* Copy chunk constraints (including fkey) to compressed chunk.
* Do this after compressing the chunk to avoid holding strong, unnecessary locks on the
Expand Down Expand Up @@ -811,7 +837,8 @@ tsl_create_compressed_chunk(PG_FUNCTION_ARGS)
compress_ht_chunk->fd.id,
&compressed_size,
numrows_pre_compression,
numrows_post_compression);
numrows_post_compression,
0);

chunk_was_compressed = ts_chunk_is_compressed(cxt.srcht_chunk);
ts_chunk_set_compressed_chunk(cxt.srcht_chunk, compress_ht_chunk->fd.id);
Expand Down Expand Up @@ -1071,7 +1098,8 @@ tsl_get_compressed_chunk_index_for_recompression(PG_FUNCTION_ARGS)
in_column_offsets,
compressed_rel_tupdesc->natts,
true /*need_bistate*/,
true /*reset_sequence*/);
true /*reset_sequence*/,
0 /*insert options*/);

/*
* Keep the ExclusiveLock on the compressed chunk. This lock will be requested
Expand Down Expand Up @@ -1370,7 +1398,8 @@ tsl_recompress_chunk_segmentwise(PG_FUNCTION_ARGS)
in_column_offsets,
compressed_rel_tupdesc->natts,
true /*need_bistate*/,
true /*reset_sequence*/);
true /*reset_sequence*/,
0 /*insert options*/);

/* create an array of the segmentby column offsets in the compressed chunk */
int16 *segmentby_column_offsets_compressed =
Expand Down
20 changes: 16 additions & 4 deletions tsl/src/compression/compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <utils/fmgroids.h>
#include <utils/lsyscache.h>
#include <utils/memutils.h>
#include <utils/portal.h>
#include <utils/rel.h>
#include <utils/relcache.h>
#include <utils/snapmgr.h>
Expand All @@ -53,6 +54,7 @@
#include "create.h"
#include "custom_type_cache.h"
#include "arrow_c_data_interface.h"
#include "debug_assert.h"
#include "debug_point.h"
#include "deltadelta.h"
#include "dictionary.h"
Expand Down Expand Up @@ -223,7 +225,7 @@ truncate_relation(Oid table_oid)

CompressionStats
compress_chunk(Oid in_table, Oid out_table, const ColumnCompressionInfo **column_compression_info,
int num_compression_infos)
int num_compression_infos, int insert_options)
{
int n_keys;
ListCell *lc;
Expand Down Expand Up @@ -399,7 +401,8 @@ compress_chunk(Oid in_table, Oid out_table, const ColumnCompressionInfo **column
in_column_offsets,
out_desc->natts,
true /*need_bistate*/,
false /*reset_sequence*/);
false /*reset_sequence*/,
insert_options);

if (matched_index_rel != NULL)
{
Expand Down Expand Up @@ -441,12 +444,19 @@ compress_chunk(Oid in_table, Oid out_table, const ColumnCompressionInfo **column
}

row_compressor_finish(&row_compressor);
DEBUG_WAITPOINT("compression_done_before_truncate_uncompressed");
truncate_relation(in_table);

table_close(out_rel, NoLock);
table_close(in_rel, NoLock);
cstat.rowcnt_pre_compression = row_compressor.rowcnt_pre_compression;
cstat.rowcnt_post_compression = row_compressor.num_compressed_rows;

if ((insert_options & HEAP_INSERT_FROZEN) == HEAP_INSERT_FROZEN)
cstat.rowcnt_frozen = row_compressor.num_compressed_rows;
else
cstat.rowcnt_frozen = 0;

return cstat;
}

Expand Down Expand Up @@ -836,7 +846,8 @@ void
row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_desc,
Relation compressed_table, int num_compression_infos,
const ColumnCompressionInfo **column_compression_info, int16 *in_column_offsets,
int16 num_columns_in_compressed_table, bool need_bistate, bool reset_sequence)
int16 num_columns_in_compressed_table, bool need_bistate, bool reset_sequence,
int insert_options)
{
TupleDesc out_desc = RelationGetDescr(compressed_table);
int col;
Expand Down Expand Up @@ -883,6 +894,7 @@ row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompressed_tuple_
.sequence_num = SEQUENCE_NUM_GAP,
.reset_sequence = reset_sequence,
.first_iteration = true,
.insert_options = insert_options,
};

memset(row_compressor->compressed_is_null, 1, sizeof(bool) * num_columns_in_compressed_table);
Expand Down Expand Up @@ -1214,7 +1226,7 @@ row_compressor_flush(RowCompressor *row_compressor, CommandId mycid, bool change
heap_insert(row_compressor->compressed_table,
compressed_tuple,
mycid,
0 /*=options*/,
row_compressor->insert_options /*=options*/,
row_compressor->bistate);
if (row_compressor->resultRelInfo->ri_NumIndices > 0)
{
Expand Down
7 changes: 5 additions & 2 deletions tsl/src/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ typedef struct CompressionStats
{
int64 rowcnt_pre_compression;
int64 rowcnt_post_compression;
int64 rowcnt_frozen;
} CompressionStats;

typedef struct PerColumn
Expand Down Expand Up @@ -264,6 +265,8 @@ typedef struct RowCompressor
bool reset_sequence;
/* flag for checking if we are working on the first tuple */
bool first_iteration;
/* the heap insert options */
int insert_options;
} RowCompressor;

/* SegmentFilter is used for filtering segments based on qualifiers */
Expand Down Expand Up @@ -312,7 +315,7 @@ pg_attribute_unused() assert_num_compression_algorithms_sane(void)
extern CompressionStorage compression_get_toast_storage(CompressionAlgorithms algo);
extern CompressionStats compress_chunk(Oid in_table, Oid out_table,
const ColumnCompressionInfo **column_compression_info,
int num_compression_infos);
int num_compression_infos, int insert_options);
extern void decompress_chunk(Oid in_table, Oid out_table);

extern DecompressionIterator *(*tsl_get_decompression_iterator_init(
Expand Down Expand Up @@ -354,7 +357,7 @@ extern void row_compressor_init(RowCompressor *row_compressor, TupleDesc uncompr
Relation compressed_table, int num_compression_infos,
const ColumnCompressionInfo **column_compression_info,
int16 *column_offsets, int16 num_columns_in_compressed_table,
bool need_bistate, bool reset_sequence);
bool need_bistate, bool reset_sequence, int insert_options);
extern void row_compressor_finish(RowCompressor *row_compressor);
extern void populate_per_compressed_columns_from_data(PerCompressedColumn *per_compressed_cols,
int16 num_cols, Datum *compressed_datums,
Expand Down
File renamed without changes.
Loading