Skip to content

Commit

Permalink
Add include_tiered_data option to cagg policy
Browse files Browse the repository at this point in the history
The `include_tiered_data` option allows user to override the value of
`timescaledb.enable_tiered_reads` defined on instance level for a
particular continuous aggregate policy.
  • Loading branch information
zilder committed Jan 15, 2025
1 parent 3e02179 commit 94deb0a
Show file tree
Hide file tree
Showing 14 changed files with 297 additions and 71 deletions.
1 change: 1 addition & 0 deletions .unreleased/pr_7587
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7587 Add `include_tiered_data` parameter to `add_continuous_aggregate_policy` API
3 changes: 2 additions & 1 deletion sql/policy_api.sql
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ CREATE OR REPLACE FUNCTION @[email protected]_continuous_aggregate_policy(
end_offset "any", schedule_interval INTERVAL,
if_not_exists BOOL = false,
initial_start TIMESTAMPTZ = NULL,
timezone TEXT = NULL
timezone TEXT = NULL,
include_tiered_data BOOL = NULL
)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_refresh_cagg_add'
Expand Down
19 changes: 19 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,22 @@ CREATE PROCEDURE @[email protected]_continuous_aggregate(
force BOOLEAN = FALSE
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_update_placeholder';

-- Add `include_tiered_data` argument to `add_continuous_aggregate_policy`
DROP FUNCTION @[email protected]_continuous_aggregate_policy(
continuous_aggregate REGCLASS, start_offset "any",
end_offset "any", schedule_interval INTERVAL,
if_not_exists BOOL,
initial_start TIMESTAMPTZ,
timezone TEXT
);
CREATE FUNCTION @[email protected]_continuous_aggregate_policy(
continuous_aggregate REGCLASS, start_offset "any",
end_offset "any", schedule_interval INTERVAL,
if_not_exists BOOL = false,
initial_start TIMESTAMPTZ = NULL,
timezone TEXT = NULL,
include_tiered_data BOOL = NULL
)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_update_placeholder'
LANGUAGE C VOLATILE;
20 changes: 20 additions & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,23 @@ CREATE PROCEDURE @[email protected]_continuous_aggregate(
window_start "any",
window_end "any"
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_continuous_agg_refresh';

-- Remove `include_tiered_data` argument from `add_continuous_aggregate_policy`
DROP FUNCTION @[email protected]_continuous_aggregate_policy(
continuous_aggregate REGCLASS, start_offset "any",
end_offset "any", schedule_interval INTERVAL,
if_not_exists BOOL,
initial_start TIMESTAMPTZ,
timezone TEXT,
include_tiered_data BOOL
);
CREATE FUNCTION @[email protected]_continuous_aggregate_policy(
continuous_aggregate REGCLASS, start_offset "any",
end_offset "any", schedule_interval INTERVAL,
if_not_exists BOOL = false,
initial_start TIMESTAMPTZ = NULL,
timezone TEXT = NULL
)
RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_refresh_cagg_add'
LANGUAGE C VOLATILE;
2 changes: 1 addition & 1 deletion src/guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ extern bool ts_guc_enable_cagg_reorder_groupby;
extern TSDLLEXPORT int ts_guc_cagg_max_individual_materializations;
extern bool ts_guc_enable_now_constify;
extern bool ts_guc_enable_foreign_key_propagation;
extern bool ts_guc_enable_osm_reads;
extern TSDLLEXPORT bool ts_guc_enable_osm_reads;
#if PG16_GE
extern TSDLLEXPORT bool ts_guc_enable_cagg_sort_pushdown;
#endif
Expand Down
23 changes: 21 additions & 2 deletions tsl/src/bgw_policy/continuous_aggregate_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,16 @@ policy_refresh_cagg_get_refresh_end(const Dimension *dim, const Jsonb *config, b
return res;
}

bool
policy_refresh_cagg_get_include_tiered_data(const Jsonb *config, bool *isnull)
{
bool found;
bool res = ts_jsonb_get_bool_field(config, POL_REFRESH_CONF_KEY_INCLUDE_TIERED_DATA, &found);

*isnull = !found;
return res;
}

/* returns false if a policy could not be found */
bool
policy_refresh_cagg_exists(int32 materialization_id)
Expand Down Expand Up @@ -519,7 +529,8 @@ Datum
policy_refresh_cagg_add_internal(Oid cagg_oid, Oid start_offset_type, NullableDatum start_offset,
Oid end_offset_type, NullableDatum end_offset,
Interval refresh_interval, bool if_not_exists, bool fixed_schedule,
TimestampTz initial_start, const char *timezone)
TimestampTz initial_start, const char *timezone,
NullableDatum include_tiered_data)
{
NameData application_name;
NameData proc_name, proc_schema, check_name, check_schema, owner;
Expand Down Expand Up @@ -630,6 +641,10 @@ policy_refresh_cagg_add_internal(Oid cagg_oid, Oid start_offset_type, NullableDa
policyconf.offset_end.value);
else
ts_jsonb_add_null(parse_state, POL_REFRESH_CONF_KEY_END_OFFSET);
if (!include_tiered_data.isnull)
ts_jsonb_add_bool(parse_state,
POL_REFRESH_CONF_KEY_INCLUDE_TIERED_DATA,
include_tiered_data.value);
JsonbValue *result = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);
Jsonb *config = JsonbValueToJsonb(result);

Expand Down Expand Up @@ -660,6 +675,7 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS)
Interval refresh_interval;
bool if_not_exists;
NullableDatum start_offset, end_offset;
NullableDatum include_tiered_data;

ts_feature_flag_check(FEATURE_POLICY);

Expand All @@ -682,6 +698,8 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS)
bool fixed_schedule = !PG_ARGISNULL(5);
text *timezone = PG_ARGISNULL(6) ? NULL : PG_GETARG_TEXT_PP(6);
char *valid_timezone = NULL;
include_tiered_data.value = PG_GETARG_DATUM(7);
include_tiered_data.isnull = PG_ARGISNULL(7);

Datum retval;
/* if users pass in -infinity for initial_start, then use the current_timestamp instead */
Expand All @@ -704,7 +722,8 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS)
if_not_exists,
fixed_schedule,
initial_start,
valid_timezone);
valid_timezone,
include_tiered_data);
if (!TIMESTAMP_NOT_FINITE(initial_start))
{
int32 job_id = DatumGetInt32(retval);
Expand Down
4 changes: 3 additions & 1 deletion tsl/src/bgw_policy/continuous_aggregate_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ int64 policy_refresh_cagg_get_refresh_start(const ContinuousAgg *cagg, const Dim
const Jsonb *config, bool *start_isnull);
int64 policy_refresh_cagg_get_refresh_end(const Dimension *dim, const Jsonb *config,
bool *end_isnull);
bool policy_refresh_cagg_get_include_tiered_data(const Jsonb *config, bool *isnull);
bool policy_refresh_cagg_refresh_start_lt(int32 materialization_id, Oid cmp_type,
Datum cmp_interval);
bool policy_refresh_cagg_exists(int32 materialization_id);
Expand All @@ -28,5 +29,6 @@ Datum policy_refresh_cagg_add_internal(Oid cagg_oid, Oid start_offset_type,
NullableDatum start_offset, Oid end_offset_type,
NullableDatum end_offset, Interval refresh_interval,
bool if_not_exists, bool fixed_schedule,
TimestampTz initial_start, const char *timezone);
TimestampTz initial_start, const char *timezone,
NullableDatum include_tiered_data);
Datum policy_refresh_cagg_remove_internal(Oid cagg_oid, bool if_exists);
30 changes: 30 additions & 0 deletions tsl/src/bgw_policy/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <parser/parser.h>
#include <tcop/pquery.h>
#include <utils/builtins.h>
#include <utils/guc.h>
#include <utils/lsyscache.h>
#include <utils/portal.h>
#include <utils/snapmgr.h>
Expand Down Expand Up @@ -51,6 +52,7 @@
#include "dimension_slice.h"
#include "dimension_vector.h"
#include "errors.h"
#include "guc.h"
#include "job.h"
#include "reorder.h"
#include "utils.h"
Expand Down Expand Up @@ -372,14 +374,36 @@ policy_refresh_cagg_execute(int32 job_id, Jsonb *config)
{
PolicyContinuousAggData policy_data;

StringInfo str = makeStringInfo();
JsonbToCStringIndent(str, &config->root, VARSIZE(config));

policy_refresh_cagg_read_and_validate_config(config, &policy_data);

bool enable_osm_reads_old = ts_guc_enable_osm_reads;

if (!policy_data.include_tiered_data_isnull)
{
SetConfigOption("timescaledb.enable_tiered_reads",
policy_data.include_tiered_data ? "on" : "off",
PGC_USERSET,
PGC_S_SESSION);
}

continuous_agg_refresh_internal(policy_data.cagg,
&policy_data.refresh_window,
CAGG_REFRESH_POLICY,
policy_data.start_is_null,
policy_data.end_is_null,
false);

if (!policy_data.include_tiered_data_isnull)
{
SetConfigOption("timescaledb.enable_tiered_reads",
enable_osm_reads_old ? "on" : "off",
PGC_USERSET,
PGC_S_SESSION);
}

return true;
}

Expand All @@ -392,6 +416,7 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
Oid dim_type;
int64 refresh_start, refresh_end;
bool start_isnull, end_isnull;
bool include_tiered_data, include_tiered_data_isnull;

materialization_id = policy_continuous_aggregate_get_mat_hypertable_id(config);
mat_ht = ts_hypertable_get_by_id(materialization_id);
Expand All @@ -418,6 +443,9 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
ts_internal_to_time_string(refresh_end, dim_type)),
errhint("The start of the window must be before the end.")));

include_tiered_data =
policy_refresh_cagg_get_include_tiered_data(config, &include_tiered_data_isnull);

if (policy_data)
{
policy_data->refresh_window.type = dim_type;
Expand All @@ -426,6 +454,8 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
policy_data->cagg = cagg;
policy_data->start_is_null = start_isnull;
policy_data->end_is_null = end_isnull;
policy_data->include_tiered_data = include_tiered_data;
policy_data->include_tiered_data_isnull = include_tiered_data_isnull;
}
}

Expand Down
3 changes: 2 additions & 1 deletion tsl/src/bgw_policy/job.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ typedef struct PolicyContinuousAggData
{
InternalTimeRange refresh_window;
ContinuousAgg *cagg;
bool start_is_null, end_is_null;
bool include_tiered_data;
bool start_is_null, end_is_null, include_tiered_data_isnull;
} PolicyContinuousAggData;

typedef struct PolicyCompressionData
Expand Down
5 changes: 4 additions & 1 deletion tsl/src/bgw_policy/policies_v2.c
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ validate_and_create_policies(policies_info all_policies, bool if_exists)
/* Create policies as required, delete the old ones if coming from alter */
if (all_policies.refresh && all_policies.refresh->create_policy)
{
NullableDatum include_tiered_data = { .isnull = true };

if (all_policies.is_alter_policy)
policy_refresh_cagg_remove_internal(all_policies.rel_oid, if_exists);
refresh_job_id = policy_refresh_cagg_add_internal(all_policies.rel_oid,
Expand All @@ -217,7 +219,8 @@ validate_and_create_policies(policies_info all_policies, bool if_exists)
false,
false,
DT_NOBEGIN,
NULL);
NULL,
include_tiered_data);
}
if (all_policies.compress && all_policies.compress->create_policy)
{
Expand Down
1 change: 1 addition & 0 deletions tsl/src/bgw_policy/policies_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#define POL_REFRESH_CONF_KEY_MAT_HYPERTABLE_ID "mat_hypertable_id"
#define POL_REFRESH_CONF_KEY_START_OFFSET "start_offset"
#define POL_REFRESH_CONF_KEY_END_OFFSET "end_offset"
#define POL_REFRESH_CONF_KEY_INCLUDE_TIERED_DATA "include_tiered_data"

#define POLICY_COMPRESSION_PROC_NAME "policy_compression"
#define POLICY_COMPRESSION_CHECK_NAME "policy_compression_check"
Expand Down
Loading

0 comments on commit 94deb0a

Please sign in to comment.