Skip to content

Commit

Permalink
Add include_tiered_data option to cagg policy
Browse files Browse the repository at this point in the history
  • Loading branch information
zilder committed Jan 14, 2025
1 parent 194f010 commit e507251
Show file tree
Hide file tree
Showing 12 changed files with 117 additions and 42 deletions.
1 change: 1 addition & 0 deletions .unreleased/pr_7587
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7587 Add `include_tiered_data` option to cagg policy
3 changes: 2 additions & 1 deletion sql/policy_api.sql
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ CREATE OR REPLACE FUNCTION @extschema@.add_continuous_aggregate_policy(
end_offset "any", schedule_interval INTERVAL,
if_not_exists BOOL = false,
initial_start TIMESTAMPTZ = NULL,
timezone TEXT = NULL
timezone TEXT = NULL,
include_tiered_data BOOL = NULL
)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_refresh_cagg_add'
Expand Down
19 changes: 19 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,22 @@ CREATE PROCEDURE @extschema@.refresh_continuous_aggregate(
force BOOLEAN = FALSE
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_update_placeholder';

-- Add `include_tiered_data` argument to `add_continuous_aggregate_policy`.
--
-- CREATE OR REPLACE FUNCTION cannot change a function's argument list, so the
-- old 7-argument signature is dropped and the 8-argument one is created.
DROP FUNCTION @extschema@.add_continuous_aggregate_policy(
    continuous_aggregate REGCLASS, start_offset "any",
    end_offset "any", schedule_interval INTERVAL,
    if_not_exists BOOL,
    initial_start TIMESTAMPTZ,
    timezone TEXT
);

-- `include_tiered_data` defaults to NULL so existing callers that pass only
-- the first seven arguments keep working unchanged.
-- NOTE(review): the function is bound to `ts_update_placeholder` here rather
-- than `ts_policy_refresh_cagg_add`; presumably the real binding is restored
-- when the regular API scripts are re-run after the update -- confirm.
CREATE FUNCTION @extschema@.add_continuous_aggregate_policy(
    continuous_aggregate REGCLASS, start_offset "any",
    end_offset "any", schedule_interval INTERVAL,
    if_not_exists BOOL = false,
    initial_start TIMESTAMPTZ = NULL,
    timezone TEXT = NULL,
    include_tiered_data BOOL = NULL
)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_update_placeholder'
LANGUAGE C VOLATILE;
20 changes: 20 additions & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,23 @@ CREATE PROCEDURE @extschema@.refresh_continuous_aggregate(
window_start "any",
window_end "any"
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_continuous_agg_refresh';

-- Remove `include_tiered_data` argument from `add_continuous_aggregate_policy`.
--
-- Downgrade path: drop the 8-argument signature and restore the previous
-- 7-argument one, since an argument list cannot be altered in place.
DROP FUNCTION @extschema@.add_continuous_aggregate_policy(
    continuous_aggregate REGCLASS, start_offset "any",
    end_offset "any", schedule_interval INTERVAL,
    if_not_exists BOOL,
    initial_start TIMESTAMPTZ,
    timezone TEXT,
    include_tiered_data BOOL
);

-- Previous signature, bound directly to the `ts_policy_refresh_cagg_add`
-- C entry point.
CREATE FUNCTION @extschema@.add_continuous_aggregate_policy(
    continuous_aggregate REGCLASS, start_offset "any",
    end_offset "any", schedule_interval INTERVAL,
    if_not_exists BOOL = false,
    initial_start TIMESTAMPTZ = NULL,
    timezone TEXT = NULL
)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_refresh_cagg_add'
LANGUAGE C VOLATILE;
15 changes: 12 additions & 3 deletions tsl/src/bgw_policy/continuous_aggregate_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ bool
policy_refresh_cagg_get_include_tiered_data(const Jsonb *config, bool *isnull)
{
bool found;
bool res = ts_jsonb_get_bool_field(config, POL_REFRESH_CONF_KEY_ENABLE_TIERED_READS, &found);
bool res = ts_jsonb_get_bool_field(config, POL_REFRESH_CONF_KEY_INCLUDE_TIERED_DATA, &found);

*isnull = !found;
return res;
Expand Down Expand Up @@ -529,7 +529,8 @@ Datum
policy_refresh_cagg_add_internal(Oid cagg_oid, Oid start_offset_type, NullableDatum start_offset,
Oid end_offset_type, NullableDatum end_offset,
Interval refresh_interval, bool if_not_exists, bool fixed_schedule,
TimestampTz initial_start, const char *timezone)
TimestampTz initial_start, const char *timezone,
NullableDatum include_tiered_data)
{
NameData application_name;
NameData proc_name, proc_schema, check_name, check_schema, owner;
Expand Down Expand Up @@ -640,6 +641,10 @@ policy_refresh_cagg_add_internal(Oid cagg_oid, Oid start_offset_type, NullableDa
policyconf.offset_end.value);
else
ts_jsonb_add_null(parse_state, POL_REFRESH_CONF_KEY_END_OFFSET);
if (!include_tiered_data.isnull)
ts_jsonb_add_bool(parse_state,
POL_REFRESH_CONF_KEY_INCLUDE_TIERED_DATA,
include_tiered_data.value);
JsonbValue *result = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);
Jsonb *config = JsonbValueToJsonb(result);

Expand Down Expand Up @@ -670,6 +675,7 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS)
Interval refresh_interval;
bool if_not_exists;
NullableDatum start_offset, end_offset;
NullableDatum include_tiered_data;

ts_feature_flag_check(FEATURE_POLICY);

Expand All @@ -692,6 +698,8 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS)
bool fixed_schedule = !PG_ARGISNULL(5);
text *timezone = PG_ARGISNULL(6) ? NULL : PG_GETARG_TEXT_PP(6);
char *valid_timezone = NULL;
include_tiered_data.value = PG_GETARG_DATUM(7);
include_tiered_data.isnull = PG_ARGISNULL(7);

Datum retval;
/* if users pass in -infinity for initial_start, then use the current_timestamp instead */
Expand All @@ -714,7 +722,8 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS)
if_not_exists,
fixed_schedule,
initial_start,
valid_timezone);
valid_timezone,
include_tiered_data);
if (!TIMESTAMP_NOT_FINITE(initial_start))
{
int32 job_id = DatumGetInt32(retval);
Expand Down
3 changes: 2 additions & 1 deletion tsl/src/bgw_policy/continuous_aggregate_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ Datum policy_refresh_cagg_add_internal(Oid cagg_oid, Oid start_offset_type,
NullableDatum start_offset, Oid end_offset_type,
NullableDatum end_offset, Interval refresh_interval,
bool if_not_exists, bool fixed_schedule,
TimestampTz initial_start, const char *timezone);
TimestampTz initial_start, const char *timezone,
NullableDatum include_tiered_data);
Datum policy_refresh_cagg_remove_internal(Oid cagg_oid, bool if_exists);
31 changes: 16 additions & 15 deletions tsl/src/bgw_policy/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <parser/parser.h>
#include <tcop/pquery.h>
#include <utils/builtins.h>
#include <utils/guc.h>
#include <utils/lsyscache.h>
#include <utils/portal.h>
#include <utils/snapmgr.h>
Expand Down Expand Up @@ -373,19 +374,19 @@ policy_refresh_cagg_execute(int32 job_id, Jsonb *config)
{
PolicyContinuousAggData policy_data;

StringInfo str = makeStringInfo();
StringInfo str = makeStringInfo();
JsonbToCStringIndent(str, &config->root, VARSIZE(config));

policy_refresh_cagg_read_and_validate_config(config, &policy_data);

bool enable_osm_reads_old = ts_guc_enable_osm_reads;

if (!policy_data.include_tiered_data_isnull) {
SetConfigOption(
"timescaledb.enable_tiered_reads",
policy_data.include_tiered_data ? "on" : "off",
PGC_USERSET,
PGC_S_SESSION);
if (!policy_data.include_tiered_data_isnull)
{
SetConfigOption("timescaledb.enable_tiered_reads",
policy_data.include_tiered_data ? "on" : "off",
PGC_USERSET,
PGC_S_SESSION);
}

continuous_agg_refresh_internal(policy_data.cagg,
Expand All @@ -395,12 +396,12 @@ policy_refresh_cagg_execute(int32 job_id, Jsonb *config)
policy_data.end_is_null,
false);

if (!policy_data.include_tiered_data_isnull) {
SetConfigOption(
"timescaledb.enable_tiered_reads",
enable_osm_reads_old ? "on" : "off",
PGC_USERSET,
PGC_S_SESSION);
if (!policy_data.include_tiered_data_isnull)
{
SetConfigOption("timescaledb.enable_tiered_reads",
enable_osm_reads_old ? "on" : "off",
PGC_USERSET,
PGC_S_SESSION);
}

return true;
Expand Down Expand Up @@ -442,8 +443,8 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
ts_internal_to_time_string(refresh_end, dim_type)),
errhint("The start of the window must be before the end.")));

include_tiered_data = policy_refresh_cagg_get_include_tiered_data(
config, &include_tiered_data_isnull);
include_tiered_data =
policy_refresh_cagg_get_include_tiered_data(config, &include_tiered_data_isnull);

if (policy_data)
{
Expand Down
5 changes: 4 additions & 1 deletion tsl/src/bgw_policy/policies_v2.c
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ validate_and_create_policies(policies_info all_policies, bool if_exists)
/* Create policies as required, delete the old ones if coming from alter */
if (all_policies.refresh && all_policies.refresh->create_policy)
{
NullableDatum include_tiered_data = { .isnull = true };

if (all_policies.is_alter_policy)
policy_refresh_cagg_remove_internal(all_policies.rel_oid, if_exists);
refresh_job_id = policy_refresh_cagg_add_internal(all_policies.rel_oid,
Expand All @@ -217,7 +219,8 @@ validate_and_create_policies(policies_info all_policies, bool if_exists)
false,
false,
DT_NOBEGIN,
NULL);
NULL,
include_tiered_data);
}
if (all_policies.compress && all_policies.compress->create_policy)
{
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/bgw_policy/policies_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#define POL_REFRESH_CONF_KEY_MAT_HYPERTABLE_ID "mat_hypertable_id"
#define POL_REFRESH_CONF_KEY_START_OFFSET "start_offset"
#define POL_REFRESH_CONF_KEY_END_OFFSET "end_offset"
#define POL_REFRESH_CONF_KEY_ENABLE_TIERED_READS "include_tiered_data"
#define POL_REFRESH_CONF_KEY_INCLUDE_TIERED_DATA "include_tiered_data"

#define POLICY_COMPRESSION_PROC_NAME "policy_compression"
#define POLICY_COMPRESSION_CHECK_NAME "policy_compression_check"
Expand Down
37 changes: 28 additions & 9 deletions tsl/test/expected/chunk_utils_internal.out
Original file line number Diff line number Diff line change
Expand Up @@ -822,10 +822,11 @@ NOTICE: drop cascades to table _timescaledb_internal._hyper_6_12_chunk
\c :TEST_DBNAME :ROLE_SUPERUSER
CREATE USER MAPPING FOR :ROLE_SUPERUSER SERVER s3_server
OPTIONS (user :'ROLE_SUPERUSER', password_required 'false');
CREATE FUNCTION create_test_cagg(include_tiered_data JSONB)
CREATE FUNCTION create_test_cagg(include_tiered_data BOOL)
RETURNS INTEGER AS
$$
DECLARE
cfg jsonb;
job_id INTEGER;
BEGIN
CREATE MATERIALIZED VIEW ht_try_weekly
Expand All @@ -839,17 +840,24 @@ BEGIN
'ht_try_weekly',
start_offset => NULL,
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour'
schedule_interval => INTERVAL '1 hour',
include_tiered_data => include_tiered_data
);

UPDATE _timescaledb_config.bgw_job
SET config = jsonb_insert(config, '{include_tiered_data}'::TEXT[], include_tiered_data)
WHERE id = job_id;
cfg := config FROM _timescaledb_config.bgw_job WHERE id = job_id;
RAISE NOTICE 'config: %', jsonb_pretty(cfg);

RETURN job_id;
END
$$ LANGUAGE plpgsql;
SELECT create_test_cagg('true') AS job_id \gset
-- include tiered data
SELECT create_test_cagg(true) AS job_id \gset
NOTICE: config: {
"end_offset": "@ 1 hour",
"start_offset": null,
"mat_hypertable_id": 7,
"include_tiered_data": true
}
CALL run_job(:job_id);
SELECT * FROM ht_try_weekly;
ts_bucket | avg
Expand All @@ -860,7 +868,14 @@ SELECT * FROM ht_try_weekly;

DROP MATERIALIZED VIEW ht_try_weekly;
NOTICE: drop cascades to 2 other objects
SELECT create_test_cagg('false') AS job_id \gset
-- exclude tiered data
SELECT create_test_cagg(false) AS job_id \gset
NOTICE: config: {
"end_offset": "@ 1 hour",
"start_offset": null,
"mat_hypertable_id": 8,
"include_tiered_data": false
}
CALL run_job(:job_id);
SELECT * FROM ht_try_weekly;
ts_bucket | avg
Expand All @@ -871,7 +886,12 @@ SELECT * FROM ht_try_weekly;
DROP MATERIALIZED VIEW ht_try_weekly;
NOTICE: drop cascades to table _timescaledb_internal._hyper_8_15_chunk
-- default behavior: use instance-wide GUC value
SELECT create_test_cagg('null') AS job_id \gset
SELECT create_test_cagg(null) AS job_id \gset
NOTICE: config: {
"end_offset": "@ 1 hour",
"start_offset": null,
"mat_hypertable_id": 9
}
CALL run_job(:job_id);
SELECT * FROM ht_try_weekly;
ts_bucket | avg
Expand All @@ -882,7 +902,6 @@ SELECT * FROM ht_try_weekly;

DROP MATERIALIZED VIEW ht_try_weekly;
NOTICE: drop cascades to 2 other objects
\c :TEST_DBNAME :ROLE_4;
-- This test verifies that a bugfix regarding the way `ROWID_VAR`s are adjusted
-- in the chunks' targetlists on DELETE/UPDATE works (including partially
-- compressed chunks)
Expand Down
2 changes: 1 addition & 1 deletion tsl/test/shared/expected/extension.out
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text
ts_now_mock()
add_columnstore_policy(regclass,"any",boolean,interval,timestamp with time zone,text,interval,boolean)
add_compression_policy(regclass,"any",boolean,interval,timestamp with time zone,text,interval,boolean)
add_continuous_aggregate_policy(regclass,"any","any",interval,boolean,timestamp with time zone,text)
add_continuous_aggregate_policy(regclass,"any","any",interval,boolean,timestamp with time zone,text,boolean)
add_dimension(regclass,_timescaledb_internal.dimension_info,boolean)
add_dimension(regclass,name,integer,anyelement,regproc,boolean)
add_job(regproc,interval,jsonb,timestamp with time zone,boolean,regproc,boolean,text)
Expand Down
21 changes: 11 additions & 10 deletions tsl/test/sql/chunk_utils_internal.sql
Original file line number Diff line number Diff line change
Expand Up @@ -444,10 +444,11 @@ DROP MATERIALIZED VIEW ht_try_weekly;
CREATE USER MAPPING FOR :ROLE_SUPERUSER SERVER s3_server
OPTIONS (user :'ROLE_SUPERUSER', password_required 'false');

CREATE FUNCTION create_test_cagg(include_tiered_data JSONB)
CREATE FUNCTION create_test_cagg(include_tiered_data BOOL)
RETURNS INTEGER AS
$$
DECLARE
cfg jsonb;
job_id INTEGER;
BEGIN
CREATE MATERIALIZED VIEW ht_try_weekly
Expand All @@ -461,35 +462,35 @@ BEGIN
'ht_try_weekly',
start_offset => NULL,
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour'
schedule_interval => INTERVAL '1 hour',
include_tiered_data => include_tiered_data
);

UPDATE _timescaledb_config.bgw_job
SET config = jsonb_insert(config, '{include_tiered_data}'::TEXT[], include_tiered_data)
WHERE id = job_id;
cfg := config FROM _timescaledb_config.bgw_job WHERE id = job_id;
RAISE NOTICE 'config: %', jsonb_pretty(cfg);

RETURN job_id;
END
$$ LANGUAGE plpgsql;

SELECT create_test_cagg('true') AS job_id \gset
-- include tiered data
SELECT create_test_cagg(true) AS job_id \gset
CALL run_job(:job_id);
SELECT * FROM ht_try_weekly;
DROP MATERIALIZED VIEW ht_try_weekly;

SELECT create_test_cagg('false') AS job_id \gset
-- exclude tiered data
SELECT create_test_cagg(false) AS job_id \gset
CALL run_job(:job_id);
SELECT * FROM ht_try_weekly;
DROP MATERIALIZED VIEW ht_try_weekly;

-- default behavior: use instance-wide GUC value
SELECT create_test_cagg('null') AS job_id \gset
SELECT create_test_cagg(null) AS job_id \gset
CALL run_job(:job_id);
SELECT * FROM ht_try_weekly;
DROP MATERIALIZED VIEW ht_try_weekly;

\c :TEST_DBNAME :ROLE_4;

-- This test verifies that a bugfix regarding the way `ROWID_VAR`s are adjusted
-- in the chunks' targetlists on DELETE/UPDATE works (including partially
-- compressed chunks)
Expand Down

0 comments on commit e507251

Please sign in to comment.