Skip to content

Commit

Permalink
WIP: override the instance-wide tiering GUC value for caggs
Browse files Browse the repository at this point in the history
  • Loading branch information
zilder committed Jan 14, 2025
1 parent 1718120 commit 194f010
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 65 deletions.
2 changes: 1 addition & 1 deletion src/guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ extern bool ts_guc_enable_cagg_reorder_groupby;
extern TSDLLEXPORT int ts_guc_cagg_max_individual_materializations;
extern bool ts_guc_enable_now_constify;
extern bool ts_guc_enable_foreign_key_propagation;
extern bool ts_guc_enable_osm_reads;
extern TSDLLEXPORT bool ts_guc_enable_osm_reads;
#if PG16_GE
extern TSDLLEXPORT bool ts_guc_enable_cagg_sort_pushdown;
#endif
Expand Down
10 changes: 10 additions & 0 deletions tsl/src/bgw_policy/continuous_aggregate_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,16 @@ policy_refresh_cagg_get_refresh_end(const Dimension *dim, const Jsonb *config, b
return res;
}

bool
policy_refresh_cagg_get_include_tiered_data(const Jsonb *config, bool *isnull)
{
bool found;
bool res = ts_jsonb_get_bool_field(config, POL_REFRESH_CONF_KEY_ENABLE_TIERED_READS, &found);

*isnull = !found;
return res;
}

/* returns false if a policy could not be found */
bool
policy_refresh_cagg_exists(int32 materialization_id)
Expand Down
1 change: 1 addition & 0 deletions tsl/src/bgw_policy/continuous_aggregate_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ int64 policy_refresh_cagg_get_refresh_start(const ContinuousAgg *cagg, const Dim
const Jsonb *config, bool *start_isnull);
int64 policy_refresh_cagg_get_refresh_end(const Dimension *dim, const Jsonb *config,
bool *end_isnull);
bool policy_refresh_cagg_get_include_tiered_data(const Jsonb *config, bool *isnull);
bool policy_refresh_cagg_refresh_start_lt(int32 materialization_id, Oid cmp_type,
Datum cmp_interval);
bool policy_refresh_cagg_exists(int32 materialization_id);
Expand Down
29 changes: 29 additions & 0 deletions tsl/src/bgw_policy/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#include "dimension_slice.h"
#include "dimension_vector.h"
#include "errors.h"
#include "guc.h"
#include "job.h"
#include "reorder.h"
#include "utils.h"
Expand Down Expand Up @@ -372,14 +373,36 @@ policy_refresh_cagg_execute(int32 job_id, Jsonb *config)
{
PolicyContinuousAggData policy_data;

StringInfo str = makeStringInfo();
JsonbToCStringIndent(str, &config->root, VARSIZE(config));

policy_refresh_cagg_read_and_validate_config(config, &policy_data);

bool enable_osm_reads_old = ts_guc_enable_osm_reads;

if (!policy_data.include_tiered_data_isnull) {
SetConfigOption(
"timescaledb.enable_tiered_reads",
policy_data.include_tiered_data ? "on" : "off",
PGC_USERSET,
PGC_S_SESSION);
}

continuous_agg_refresh_internal(policy_data.cagg,
&policy_data.refresh_window,
CAGG_REFRESH_POLICY,
policy_data.start_is_null,
policy_data.end_is_null,
false);

if (!policy_data.include_tiered_data_isnull) {
SetConfigOption(
"timescaledb.enable_tiered_reads",
enable_osm_reads_old ? "on" : "off",
PGC_USERSET,
PGC_S_SESSION);
}

return true;
}

Expand All @@ -392,6 +415,7 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
Oid dim_type;
int64 refresh_start, refresh_end;
bool start_isnull, end_isnull;
bool include_tiered_data, include_tiered_data_isnull;

materialization_id = policy_continuous_aggregate_get_mat_hypertable_id(config);
mat_ht = ts_hypertable_get_by_id(materialization_id);
Expand All @@ -418,6 +442,9 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
ts_internal_to_time_string(refresh_end, dim_type)),
errhint("The start of the window must be before the end.")));

include_tiered_data = policy_refresh_cagg_get_include_tiered_data(
config, &include_tiered_data_isnull);

if (policy_data)
{
policy_data->refresh_window.type = dim_type;
Expand All @@ -426,6 +453,8 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
policy_data->cagg = cagg;
policy_data->start_is_null = start_isnull;
policy_data->end_is_null = end_isnull;
policy_data->include_tiered_data = include_tiered_data;
policy_data->include_tiered_data_isnull = include_tiered_data_isnull;
}
}

Expand Down
3 changes: 2 additions & 1 deletion tsl/src/bgw_policy/job.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ typedef struct PolicyContinuousAggData
{
InternalTimeRange refresh_window;
ContinuousAgg *cagg;
bool start_is_null, end_is_null;
bool include_tiered_data;
bool start_is_null, end_is_null, include_tiered_data_isnull;
} PolicyContinuousAggData;

typedef struct PolicyCompressionData
Expand Down
1 change: 1 addition & 0 deletions tsl/src/bgw_policy/policies_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#define POL_REFRESH_CONF_KEY_MAT_HYPERTABLE_ID "mat_hypertable_id"
#define POL_REFRESH_CONF_KEY_START_OFFSET "start_offset"
#define POL_REFRESH_CONF_KEY_END_OFFSET "end_offset"
#define POL_REFRESH_CONF_KEY_ENABLE_TIERED_READS "include_tiered_data"

#define POLICY_COMPRESSION_PROC_NAME "policy_compression"
#define POLICY_COMPRESSION_CHECK_NAME "policy_compression_check"
Expand Down
Loading

0 comments on commit 194f010

Please sign in to comment.