Skip to content

Commit

Permalink
Add optional force argument to refresh_continuous_aggregate
Browse files Browse the repository at this point in the history
Add optional `force` parameter to the `refresh_continuous_aggregate`
procedure that would allow user to partially re-materialize cagg within
a time window that has been previously materialized.
  • Loading branch information
zilder committed Jan 14, 2025
1 parent 6623eb6 commit 1718120
Show file tree
Hide file tree
Showing 16 changed files with 376 additions and 150 deletions.
1 change: 1 addition & 0 deletions .unreleased/pr_7521
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7521 Add optional `force` argument to `refresh_continuous_aggregate`
3 changes: 2 additions & 1 deletion sql/ddl_api.sql
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ AS '@MODULE_PATHNAME@', 'ts_tablespace_show' LANGUAGE C VOLATILE STRICT;
CREATE OR REPLACE PROCEDURE @[email protected]_continuous_aggregate(
continuous_aggregate REGCLASS,
window_start "any",
window_end "any"
window_end "any",
force BOOLEAN = FALSE
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_continuous_agg_refresh';

11 changes: 11 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,14 @@ CREATE FUNCTION @[email protected]_columnstore_stats (hypertable REGCLASS)
STABLE STRICT
AS 'SELECT * FROM @[email protected]_compression_stats($1)'
SET search_path TO pg_catalog, pg_temp;

-- Recreate `refresh_continuous_aggregate` procedure to add `force` argument
DROP PROCEDURE IF EXISTS @[email protected]_continuous_aggregate (continuous_aggregate REGCLASS, window_start "any", window_end "any");

CREATE PROCEDURE @[email protected]_continuous_aggregate(
continuous_aggregate REGCLASS,
window_start "any",
window_end "any",
force BOOLEAN = FALSE
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_update_placeholder';

9 changes: 9 additions & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,12 @@ DROP VIEW timescaledb_information.hypertable_columnstore_settings;
DROP VIEW timescaledb_information.chunk_columnstore_settings;

DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_update_watermark(INTEGER);

-- Recreate `refresh_continuous_aggregate` procedure to remove the `force` argument
DROP PROCEDURE IF EXISTS @[email protected]_continuous_aggregate (continuous_aggregate REGCLASS, window_start "any", window_end "any", force BOOLEAN);

CREATE PROCEDURE @[email protected]_continuous_aggregate(
continuous_aggregate REGCLASS,
window_start "any",
window_end "any"
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_continuous_agg_refresh';
3 changes: 2 additions & 1 deletion tsl/src/bgw_policy/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,8 @@ policy_refresh_cagg_execute(int32 job_id, Jsonb *config)
&policy_data.refresh_window,
CAGG_REFRESH_POLICY,
policy_data.start_is_null,
policy_data.end_is_null);
policy_data.end_is_null,
false);

return true;
}
Expand Down
7 changes: 6 additions & 1 deletion tsl/src/continuous_aggs/create.c
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,12 @@ tsl_process_continuous_agg_viewstmt(Node *node, const char *query_string, void *
refresh_window.start = cagg_get_time_min(cagg);
refresh_window.end = ts_time_get_noend_or_max(refresh_window.type);

continuous_agg_refresh_internal(cagg, &refresh_window, CAGG_REFRESH_CREATION, true, true);
continuous_agg_refresh_internal(cagg,
&refresh_window,
CAGG_REFRESH_CREATION,
true,
true,
false);
}

return DDL_DONE;
Expand Down
23 changes: 19 additions & 4 deletions tsl/src/continuous_aggs/invalidation.c
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ static Invalidation cut_cagg_invalidation_and_compute_remainder(
const CaggInvalidationState *state, const InternalTimeRange *refresh_window,
const Invalidation *mergedentry, const Invalidation *current_remainder);
static void clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state,
const InternalTimeRange *refresh_window);
const InternalTimeRange *refresh_window,
bool force);
static void invalidation_state_init(CaggInvalidationState *state, const ContinuousAgg *cagg,
Oid dimtype, const CaggsInfo *all_caggs);
static void invalidation_state_cleanup(const CaggInvalidationState *state);
Expand Down Expand Up @@ -878,7 +879,7 @@ cut_cagg_invalidation_and_compute_remainder(const CaggInvalidationState *state,
*/
static void
clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state,
const InternalTimeRange *refresh_window)
const InternalTimeRange *refresh_window, bool force)
{
ScanIterator iterator;
int32 cagg_hyper_id = state->mat_hypertable_id;
Expand All @@ -892,6 +893,20 @@ clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state,

MemoryContextReset(state->per_tuple_mctx);

/* Force refresh within the entire window */
if (force)
{
Invalidation logentry;

logentry.hyper_id = cagg_hyper_id;
logentry.lowest_modified_value = refresh_window->start;
logentry.greatest_modified_value = refresh_window->end;
logentry.is_modified = false;
ItemPointerSet(&logentry.tid, InvalidBlockNumber, 0);

save_invalidation_for_refresh(state, &logentry);
}

/* Process all invalidations for the continuous aggregate */
ts_scanner_foreach(&iterator)
{
Expand Down Expand Up @@ -981,7 +996,7 @@ InvalidationStore *
invalidation_process_cagg_log(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window,
const CaggsInfo *all_caggs_info, const long max_materializations,
bool *do_merged_refresh, InternalTimeRange *ret_merged_refresh_window,
const CaggRefreshCallContext callctx)
const CaggRefreshCallContext callctx, bool force)
{
CaggInvalidationState state;
InvalidationStore *store = NULL;
Expand All @@ -991,7 +1006,7 @@ invalidation_process_cagg_log(const ContinuousAgg *cagg, const InternalTimeRange

invalidation_state_init(&state, cagg, refresh_window->type, all_caggs_info);
state.invalidations = tuplestore_begin_heap(false, false, work_mem);
clear_cagg_invalidations_for_refresh(&state, refresh_window);
clear_cagg_invalidations_for_refresh(&state, refresh_window, force);
count = tuplestore_tuple_count(state.invalidations);

if (count == 0)
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/continuous_aggs/invalidation.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ extern InvalidationStore *
invalidation_process_cagg_log(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window,
const CaggsInfo *all_caggs_info, const long max_materializations,
bool *do_merged_refresh, InternalTimeRange *ret_merged_refresh_window,
const CaggRefreshCallContext callctx);
const CaggRefreshCallContext callctx, bool force);

extern void invalidation_store_free(InvalidationStore *store);
20 changes: 14 additions & 6 deletions tsl/src/continuous_aggs/refresh.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ static void emit_up_to_date_notice(const ContinuousAgg *cagg, const CaggRefreshC
static bool process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const CaggRefreshCallContext callctx,
int32 chunk_id);
int32 chunk_id, bool force);
static void fill_bucket_offset_origin(const ContinuousAgg *cagg,
const InternalTimeRange *const refresh_window,
NullableDatum *offset, NullableDatum *origin);
Expand Down Expand Up @@ -628,6 +628,7 @@ Datum
continuous_agg_refresh(PG_FUNCTION_ARGS)
{
Oid cagg_relid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
bool force = PG_ARGISNULL(3) ? false : PG_GETARG_BOOL(3);
ContinuousAgg *cagg;
InternalTimeRange refresh_window = {
.type = InvalidOid,
Expand Down Expand Up @@ -659,7 +660,8 @@ continuous_agg_refresh(PG_FUNCTION_ARGS)
&refresh_window,
CAGG_REFRESH_WINDOW,
PG_ARGISNULL(1),
PG_ARGISNULL(2));
PG_ARGISNULL(2),
force);

PG_RETURN_VOID();
}
Expand Down Expand Up @@ -703,7 +705,8 @@ continuous_agg_calculate_merged_refresh_window(const ContinuousAgg *cagg,
static bool
process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const CaggRefreshCallContext callctx, int32 chunk_id)
const CaggRefreshCallContext callctx, int32 chunk_id,
bool force)
{
InvalidationStore *invalidations;
Oid hyper_relid = ts_hypertable_id_to_relid(cagg->data.mat_hypertable_id, false);
Expand All @@ -727,7 +730,8 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
ts_guc_cagg_max_individual_materializations,
&do_merged_refresh,
&merged_refresh_window,
callctx);
callctx,
force);

if (invalidations != NULL || do_merged_refresh)
{
Expand Down Expand Up @@ -759,7 +763,7 @@ void
continuous_agg_refresh_internal(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window_arg,
const CaggRefreshCallContext callctx, const bool start_isnull,
const bool end_isnull)
const bool end_isnull, bool force)
{
int32 mat_id = cagg->data.mat_hypertable_id;
InternalTimeRange refresh_window = *refresh_window_arg;
Expand Down Expand Up @@ -881,7 +885,11 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,

cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_id, false);

if (!process_cagg_invalidations_and_refresh(cagg, &refresh_window, callctx, INVALID_CHUNK_ID))
if (!process_cagg_invalidations_and_refresh(cagg,
&refresh_window,
callctx,
INVALID_CHUNK_ID,
force))
emit_up_to_date_notice(cagg, callctx);

/* Restore search_path */
Expand Down
3 changes: 2 additions & 1 deletion tsl/src/continuous_aggs/refresh.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ extern void continuous_agg_calculate_merged_refresh_window(
extern void continuous_agg_refresh_internal(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const CaggRefreshCallContext callctx,
const bool start_isnull, const bool end_isnull);
const bool start_isnull, const bool end_isnull,
bool force);
Loading

0 comments on commit 1718120

Please sign in to comment.