Skip to content

Commit

Permalink
Use creation time in retention/compression policy
Browse files Browse the repository at this point in the history
The retention and compression policies can now use drop_created_before
and compress_created_before arguments respectively to specify chunk
selection using their creation times.

We don't support creation times for CAggs, yet.
  • Loading branch information
nikkhils committed Nov 16, 2023
1 parent e5840e3 commit 4481725
Show file tree
Hide file tree
Showing 37 changed files with 772 additions and 140 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/pgspot.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ jobs:
--proc-without-search-path 'extschema.recompress_chunk(chunk regclass,if_not_compressed boolean)'
--proc-without-search-path '_timescaledb_internal.policy_compression(job_id integer,config jsonb)'
--proc-without-search-path '_timescaledb_functions.policy_compression(job_id integer,config jsonb)'
--proc-without-search-path
'_timescaledb_internal.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean,use_creation_time boolean)'
--proc-without-search-path
'_timescaledb_functions.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean,use_creation_time boolean)'
--proc-without-search-path
'_timescaledb_internal.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean)'
--proc-without-search-path
Expand Down
1 change: 1 addition & 0 deletions .unreleased/feature_6227
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #6227 Use creation time in retention/compression policy
6 changes: 3 additions & 3 deletions sql/compat.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1015,12 +1015,12 @@ END$$
SET search_path TO pg_catalog,pg_temp;


CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean) LANGUAGE PLPGSQL AS $$
CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_compression_execute(job_id integer,htid integer,lag anyelement,maxchunks integer,verbose_log boolean,recompress_enabled boolean,use_creation_time boolean) LANGUAGE PLPGSQL AS $$
BEGIN
IF current_setting('timescaledb.enable_deprecation_warnings', true)::bool THEN
RAISE WARNING 'procedure _timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean) is deprecated and has been moved to _timescaledb_functions schema. this compatibility function will be removed in a future version.';
RAISE WARNING 'procedure _timescaledb_internal.policy_compression_execute(integer,integer,anyelement,integer,boolean,boolean,boolean) is deprecated and has been moved to _timescaledb_functions schema. this compatibility function will be removed in a future version.';
END IF;
CALL _timescaledb_functions.policy_compression_execute($1,$2,$3,$4,$5,$6);
CALL _timescaledb_functions.policy_compression_execute($1,$2,$3,$4,$5,$6,$7);
END$$
SET search_path TO pg_catalog,pg_temp;

Expand Down
14 changes: 9 additions & 5 deletions sql/policy_api.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
-- might be kept, but data within the window will never be deleted.
CREATE OR REPLACE FUNCTION @[email protected]_retention_policy(
relation REGCLASS,
drop_after "any",
drop_after "any" = NULL,
if_not_exists BOOL = false,
schedule_interval INTERVAL = NULL,
initial_start TIMESTAMPTZ = NULL,
timezone TEXT = NULL
timezone TEXT = NULL,
drop_created_before INTERVAL = NULL
)
RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_retention_add'
LANGUAGE C VOLATILE;
Expand Down Expand Up @@ -45,11 +46,13 @@ LANGUAGE C VOLATILE STRICT;

/* compression policy */
CREATE OR REPLACE FUNCTION @[email protected]_compression_policy(
hypertable REGCLASS, compress_after "any",
hypertable REGCLASS,
compress_after "any" = NULL,
if_not_exists BOOL = false,
schedule_interval INTERVAL = NULL,
initial_start TIMESTAMPTZ = NULL,
timezone TEXT = NULL
timezone TEXT = NULL,
compress_created_before INTERVAL = NULL
)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_compression_add'
Expand Down Expand Up @@ -83,6 +86,7 @@ LANGUAGE C VOLATILE;
/* 1 step policies */

/* Add policies */
/* Unsupported drop_created_before/compress_created_before in add/alter for caggs */
CREATE OR REPLACE FUNCTION timescaledb_experimental.add_policies(
relation REGCLASS,
if_not_exists BOOL = false,
Expand Down Expand Up @@ -128,4 +132,4 @@ CREATE OR REPLACE FUNCTION timescaledb_experimental.show_policies(
relation REGCLASS)
RETURNS SETOF JSONB
AS '@MODULE_PATHNAME@', 'ts_policies_show'
LANGUAGE C VOLATILE;
LANGUAGE C VOLATILE;
49 changes: 36 additions & 13 deletions sql/policy_internal.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ _timescaledb_functions.policy_compression_execute(
lag ANYELEMENT,
maxchunks INTEGER,
verbose_log BOOLEAN,
recompress_enabled BOOLEAN)
recompress_enabled BOOLEAN,
use_creation_time BOOLEAN)
AS $$
DECLARE
htoid REGCLASS;
Expand All @@ -54,6 +55,7 @@ DECLARE
bit_compressed_unordered int := 2;
bit_frozen int := 4;
bit_compressed_partial int := 8;
creation_lag INTERVAL := NULL;
BEGIN

-- procedures with SET clause cannot execute transaction
Expand All @@ -67,14 +69,26 @@ BEGIN
-- for the integer cases, we have to compute the lag w.r.t
-- the integer_now function and then pass on to show_chunks
IF pg_typeof(lag) IN ('BIGINT'::regtype, 'INTEGER'::regtype, 'SMALLINT'::regtype) THEN
-- cannot have use_creation_time set with this
IF use_creation_time IS TRUE THEN
RAISE EXCEPTION 'job % cannot use creation time with integer_now function', job_id;
END IF;
lag := _timescaledb_functions.subtract_integer_from_now(htoid, lag::BIGINT);
END IF;

-- if use_creation_time has been specified then the lag needs to be used with the
-- "compress_created_before" argument. Otherwise the usual "older_than" argument
-- is good enough
IF use_creation_time IS TRUE THEN
creation_lag := lag;
lag := NULL;
END IF;

FOR chunk_rec IN
SELECT
show.oid, ch.schema_name, ch.table_name, ch.status
FROM
@[email protected]_chunks(htoid, older_than => lag) AS show(oid)
@[email protected]_chunks(htoid, older_than => lag, created_before => creation_lag) AS show(oid)
INNER JOIN pg_class pgc ON pgc.oid = show.oid
INNER JOIN pg_namespace pgns ON pgc.relnamespace = pgns.oid
INNER JOIN _timescaledb_catalog.chunk ch ON ch.table_name = pgc.relname AND ch.schema_name = pgns.nspname AND ch.hypertable_id = htid
Expand Down Expand Up @@ -157,14 +171,17 @@ DECLARE
dimtype REGTYPE;
dimtypeinput REGPROC;
compress_after TEXT;
compress_created_before TEXT;
lag_value TEXT;
lag_bigint_value BIGINT;
htid INTEGER;
htoid REGCLASS;
chunk_rec RECORD;
verbose_log BOOL;
maxchunks INTEGER := 0;
numchunks INTEGER := 1;
recompress_enabled BOOL;
use_creation_time BOOL := FALSE;
BEGIN

-- procedures with SET clause cannot execute transaction
Expand All @@ -183,11 +200,6 @@ BEGIN
verbose_log := COALESCE(jsonb_object_field_text(config, 'verbose_log')::BOOLEAN, FALSE);
maxchunks := COALESCE(jsonb_object_field_text(config, 'maxchunks_to_compress')::INTEGER, 0);
recompress_enabled := COALESCE(jsonb_object_field_text(config, 'recompress')::BOOLEAN, TRUE);
compress_after := jsonb_object_field_text(config, 'compress_after');

IF compress_after IS NULL THEN
RAISE EXCEPTION 'job % config must have compress_after', job_id;
END IF;

-- find primary dimension type --
SELECT dim.column_type INTO dimtype
Expand All @@ -197,29 +209,40 @@ BEGIN
ORDER BY dim.id
LIMIT 1;

lag_value := jsonb_object_field_text(config, 'compress_after');
compress_after := jsonb_object_field_text(config, 'compress_after');
IF compress_after IS NULL THEN
compress_created_before := jsonb_object_field_text(config, 'compress_created_before');
IF compress_created_before IS NULL THEN
RAISE EXCEPTION 'job % config must have compress_after or compress_created_before', job_id;
END IF;
lag_value := compress_created_before;
use_creation_time := true;
dimtype := 'INTERVAL' ::regtype;
ELSE
lag_value := compress_after;
END IF;

-- execute the properly type casts for the lag value
CASE dimtype
WHEN 'TIMESTAMP'::regtype, 'TIMESTAMPTZ'::regtype, 'DATE'::regtype THEN
WHEN 'TIMESTAMP'::regtype, 'TIMESTAMPTZ'::regtype, 'DATE'::regtype, 'INTERVAL' ::regtype THEN
CALL _timescaledb_functions.policy_compression_execute(
job_id, htid, lag_value::INTERVAL,
maxchunks, verbose_log, recompress_enabled
maxchunks, verbose_log, recompress_enabled, use_creation_time
);
WHEN 'BIGINT'::regtype THEN
CALL _timescaledb_functions.policy_compression_execute(
job_id, htid, lag_value::BIGINT,
maxchunks, verbose_log, recompress_enabled
maxchunks, verbose_log, recompress_enabled, use_creation_time
);
WHEN 'INTEGER'::regtype THEN
CALL _timescaledb_functions.policy_compression_execute(
job_id, htid, lag_value::INTEGER,
maxchunks, verbose_log, recompress_enabled
maxchunks, verbose_log, recompress_enabled, use_creation_time
);
WHEN 'SMALLINT'::regtype THEN
CALL _timescaledb_functions.policy_compression_execute(
job_id, htid, lag_value::SMALLINT,
maxchunks, verbose_log, recompress_enabled
maxchunks, verbose_log, recompress_enabled, use_creation_time
);
END CASE;
END;
Expand Down
153 changes: 153 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,156 @@ CREATE FUNCTION @[email protected]_chunks(
created_after "any" = NULL
) RETURNS SETOF REGCLASS AS '@MODULE_PATHNAME@', 'ts_chunk_show_chunks'
LANGUAGE C STABLE PARALLEL SAFE;

DROP FUNCTION @[email protected]_retention_policy(REGCLASS, "any", BOOL, INTERVAL, TIMESTAMPTZ, TEXT);
CREATE FUNCTION @[email protected]_retention_policy(
relation REGCLASS,
drop_after "any" = NULL,
if_not_exists BOOL = false,
schedule_interval INTERVAL = NULL,
initial_start TIMESTAMPTZ = NULL,
timezone TEXT = NULL,
drop_created_before INTERVAL = NULL
)
RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_retention_add'
LANGUAGE C VOLATILE;

DROP FUNCTION @[email protected]_compression_policy(REGCLASS, "any", BOOL, INTERVAL, TIMESTAMPTZ, TEXT);
CREATE FUNCTION @[email protected]_compression_policy(
hypertable REGCLASS,
compress_after "any" = NULL,
if_not_exists BOOL = false,
schedule_interval INTERVAL = NULL,
initial_start TIMESTAMPTZ = NULL,
timezone TEXT = NULL,
compress_created_before INTERVAL = NULL
)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_compression_add'
LANGUAGE C VOLATILE;

DROP PROCEDURE IF EXISTS _timescaledb_functions.policy_compression_execute(INTEGER, INTEGER, ANYELEMENT, INTEGER, BOOLEAN, BOOLEAN);
DROP PROCEDURE IF EXISTS _timescaledb_internal.policy_compression_execute(INTEGER, INTEGER, ANYELEMENT, INTEGER, BOOLEAN, BOOLEAN);
CREATE PROCEDURE
_timescaledb_functions.policy_compression_execute(
job_id INTEGER,
htid INTEGER,
lag ANYELEMENT,
maxchunks INTEGER,
verbose_log BOOLEAN,
recompress_enabled BOOLEAN,
use_creation_time BOOLEAN)
AS $$
DECLARE
htoid REGCLASS;
chunk_rec RECORD;
numchunks INTEGER := 1;
_message text;
_detail text;
-- chunk status bits:
bit_compressed int := 1;
bit_compressed_unordered int := 2;
bit_frozen int := 4;
bit_compressed_partial int := 8;
creation_lag INTERVAL := NULL;
BEGIN

-- procedures with SET clause cannot execute transaction
-- control so we adjust search_path in procedure body
SET LOCAL search_path TO pg_catalog, pg_temp;

SELECT format('%I.%I', schema_name, table_name) INTO htoid
FROM _timescaledb_catalog.hypertable
WHERE id = htid;

-- for the integer cases, we have to compute the lag w.r.t
-- the integer_now function and then pass on to show_chunks
IF pg_typeof(lag) IN ('BIGINT'::regtype, 'INTEGER'::regtype, 'SMALLINT'::regtype) THEN
-- cannot have use_creation_time set with this
IF use_creation_time IS TRUE THEN
RAISE EXCEPTION 'job % cannot use creation time with integer_now function', job_id;
END IF;
lag := _timescaledb_functions.subtract_integer_from_now(htoid, lag::BIGINT);
END IF;

-- if use_creation_time has been specified then the lag needs to be used with the
-- "compress_created_before" argument. Otherwise the usual "older_than" argument
-- is good enough
IF use_creation_time IS TRUE THEN
creation_lag := lag;
lag := NULL;
END IF;

FOR chunk_rec IN
SELECT
show.oid, ch.schema_name, ch.table_name, ch.status
FROM
@[email protected]_chunks(htoid, older_than => lag, created_before => creation_lag) AS show(oid)
INNER JOIN pg_class pgc ON pgc.oid = show.oid
INNER JOIN pg_namespace pgns ON pgc.relnamespace = pgns.oid
INNER JOIN _timescaledb_catalog.chunk ch ON ch.table_name = pgc.relname AND ch.schema_name = pgns.nspname AND ch.hypertable_id = htid
WHERE
ch.dropped IS FALSE
AND (
ch.status = 0 OR
(
ch.status & bit_compressed > 0 AND (
ch.status & bit_compressed_unordered > 0 OR
ch.status & bit_compressed_partial > 0
)
)
)
LOOP
IF chunk_rec.status = 0 THEN
BEGIN
PERFORM @[email protected]_chunk( chunk_rec.oid );
EXCEPTION WHEN OTHERS THEN
GET STACKED DIAGNOSTICS
_message = MESSAGE_TEXT,
_detail = PG_EXCEPTION_DETAIL;
RAISE WARNING 'compressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text
USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail),
ERRCODE = sqlstate;
END;
ELSIF
(
chunk_rec.status & bit_compressed > 0 AND (
chunk_rec.status & bit_compressed_unordered > 0 OR
chunk_rec.status & bit_compressed_partial > 0
)
) AND recompress_enabled IS TRUE THEN
BEGIN
PERFORM @[email protected]_chunk(chunk_rec.oid, if_compressed => true);
EXCEPTION WHEN OTHERS THEN
RAISE WARNING 'decompressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text
USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail),
ERRCODE = sqlstate;
END;
-- SET LOCAL is only active until end of transaction.
-- While we could use SET at the start of the function we do not
-- want to bleed out search_path to caller, so we do SET LOCAL
-- again after COMMIT
BEGIN
PERFORM @[email protected]_chunk(chunk_rec.oid);
EXCEPTION WHEN OTHERS THEN
RAISE WARNING 'compressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text
USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail),
ERRCODE = sqlstate;
END;
END IF;
COMMIT;
-- SET LOCAL is only active until end of transaction.
-- While we could use SET at the start of the function we do not
-- want to bleed out search_path to caller, so we do SET LOCAL
-- again after COMMIT
SET LOCAL search_path TO pg_catalog, pg_temp;
IF verbose_log THEN
RAISE LOG 'job % completed processing chunk %.%', job_id, chunk_rec.schema_name, chunk_rec.table_name;
END IF;
numchunks := numchunks + 1;
IF maxchunks > 0 AND numchunks >= maxchunks THEN
EXIT;
END IF;
END LOOP;
END;
$$ LANGUAGE PLPGSQL;
Loading

0 comments on commit 4481725

Please sign in to comment.