From b685f2f0103c8b49e38f6a5e8d5d996ac13e4e06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabr=C3=ADzio=20de=20Royes=20Mello?= Date: Mon, 7 Oct 2024 10:17:57 -0300 Subject: [PATCH] Replace SPI_execute by SPI_execute_with_args in materialization To prevent issues on TimestampTz output used in SQL statements dynamically generated for the Continuous Aggregate materialization code we replaced the `SPI_execute` by `SPI_execute_with_args` so now it will not convert timestamp to string and parse it back anymore. This popped up due to some tzdata 2024b changes in timezone PST8PDT that also lead to some Postgres changes on PG17: https://github.com/postgres/postgres/commit/b8ea0f67 --- tsl/src/continuous_aggs/materialize.c | 139 +++++++++++++++----------- 1 file changed, 81 insertions(+), 58 deletions(-) diff --git a/tsl/src/continuous_aggs/materialize.c b/tsl/src/continuous_aggs/materialize.c index 9bee509adff..0022b4c0adb 100644 --- a/tsl/src/continuous_aggs/materialize.c +++ b/tsl/src/continuous_aggs/materialize.c @@ -44,8 +44,8 @@ static char *build_merge_update_clause(List *column_names); * materialization support * ***************************/ static void spi_update_watermark(Hypertable *mat_ht, SchemaAndName materialization_table, - const NameData *time_column_name, char *materialization_start, - Oid materialization_type, const char *const chunk_condition); + const NameData *time_column_name, TimeRange materialization_range, + const char *const chunk_condition); static void spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, SchemaAndName partial_view, SchemaAndName materialization_table, @@ -53,19 +53,19 @@ static void spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg TimeRange invalidation_range, const int32 chunk_id); static uint64 spi_delete_materializations(SchemaAndName materialization_table, const NameData *time_column_name, - char *invalidation_start, char *invalidation_end, + TimeRange materialization_range, const char *const chunk_condition); static uint64 spi_insert_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, SchemaAndName partial_view, SchemaAndName materialization_table, const NameData *time_column_name, - char *materialization_start, char *materialization_end, + TimeRange materialization_range, const char *const chunk_condition); static uint64 spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, SchemaAndName partial_view, SchemaAndName materialization_table, const NameData *time_column_name, - char *materialization_start, char *materialization_end); + TimeRange materialization_range); void continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousAgg *cagg, @@ -329,32 +329,40 @@ build_merge_update_clause(List *column_names) static void spi_update_watermark(Hypertable *mat_ht, SchemaAndName materialization_table, - const NameData *time_column_name, char *materialization_start, - Oid materialization_type, const char *const chunk_condition) + const NameData *time_column_name, TimeRange materialization_range, + const char *const chunk_condition) { int res; StringInfo command = makeStringInfo(); + Oid types[] = { materialization_range.type }; + Datum values[] = { materialization_range.start }; + char nulls[] = { false }; appendStringInfo(command, "SELECT %s FROM %s.%s AS I " - "WHERE I.%s >= %s %s " + "WHERE I.%s >= $1 %s " "ORDER BY 1 DESC LIMIT 1;", quote_identifier(NameStr(*time_column_name)), quote_identifier(NameStr(*materialization_table.schema)), quote_identifier(NameStr(*materialization_table.name)), quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(materialization_start), chunk_condition); - res = SPI_execute(command->data, false /* read_only */, 0 /*count*/); + res = SPI_execute_with_args(command->data, + 1, + types, + values, + nulls, + false /* read_only */, + 0 /* count */); if (res < 0) elog(ERROR, "could not get the last bucket of the materialized data"); - Ensure(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == materialization_type, + Ensure(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == materialization_range.type, "partition types for result (%d) and dimension (%d) do not match", SPI_gettypeid(SPI_tuptable->tupdesc, 1), - materialization_type); + materialization_range.type); if (SPI_processed > 0) { @@ -363,7 +371,7 @@ spi_update_watermark(Hypertable *mat_ht, SchemaAndName materialization_table, if (!isnull) { - int64 watermark = ts_time_value_to_internal(maxdat, materialization_type); + int64 watermark = ts_time_value_to_internal(maxdat, materialization_range.type); ts_cagg_watermark_update(mat_ht, watermark, isnull, false); } } @@ -375,17 +383,9 @@ spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, const NameData *time_column_name, TimeRange invalidation_range, const int32 chunk_id) { - Oid out_fn; - bool type_is_varlena; - char *invalidation_start; - char *invalidation_end; StringInfo chunk_condition = makeStringInfo(); uint64 rows_processed = 0; - getTypeOutputInfo(invalidation_range.type, &out_fn, &type_is_varlena); - invalidation_start = OidOutputFunctionCall(out_fn, invalidation_range.start); - invalidation_end = OidOutputFunctionCall(out_fn, invalidation_range.end); - /* MERGE statement is available starting on PG15 and we'll support it only in the new format of * CAggs and for non-compressed hypertables */ if (ts_guc_enable_merge_on_cagg_refresh && PG_VERSION_NUM >= 150000 && @@ -396,8 +396,7 @@ spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, partial_view, materialization_table, time_column_name, - invalidation_start, - invalidation_end); + invalidation_range); } else { @@ -412,16 +411,14 @@ spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, rows_processed += spi_delete_materializations(materialization_table, time_column_name, - invalidation_start, - invalidation_end, + invalidation_range, chunk_condition->data); rows_processed += spi_insert_materializations(mat_ht, cagg, partial_view, materialization_table, time_column_name, - invalidation_start, - invalidation_end, + invalidation_range, chunk_condition->data); } @@ -431,8 +428,7 @@ spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, spi_update_watermark(mat_ht, materialization_table, time_column_name, - invalidation_start, - invalidation_range.type, + invalidation_range, chunk_condition->data); } } @@ -440,11 +436,20 @@ spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, static uint64 spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, SchemaAndName partial_view, SchemaAndName materialization_table, - const NameData *time_column_name, char *materialization_start, - char *materialization_end) + const NameData *time_column_name, TimeRange materialization_range) { int res; StringInfo command = makeStringInfo(); + Oid types[] = { materialization_range.type, + materialization_range.type, + materialization_range.type, + materialization_range.type }; + Datum values[] = { materialization_range.start, + materialization_range.end, + materialization_range.start, + materialization_range.end }; + char nulls[] = { false, false, false, false }; + List *grp_colnames = cagg_find_groupingcols((ContinuousAgg *) cagg, mat_ht); List *agg_colnames = cagg_find_aggref_and_var_cols((ContinuousAgg *) cagg, mat_ht); List *all_columns = NIL; @@ -472,10 +477,10 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, "WITH partial AS ( " " SELECT * " " FROM %s.%s " - " WHERE %s >= %s AND %s < %s " + " WHERE %s >= $1 AND %s < $2 " ") " "MERGE INTO %s.%s M " - "USING partial P ON %s AND M.%s >= %s AND M.%s < %s " + "USING partial P ON %s AND M.%s >= $3 AND M.%s < $4 " " %s " /* UPDATE */ " WHEN NOT MATCHED THEN " " INSERT (%s) VALUES (%s) ", @@ -486,9 +491,7 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, /* partial WHERE */ quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(materialization_start), quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(materialization_end), /* materialization hypertable */ quote_identifier(NameStr(*materialization_table.schema)), @@ -499,9 +502,7 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, /* extra MERGE JOIN condition with primary dimension */ quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(materialization_start), quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(materialization_end), /* UPDATE */ merge_update->data, @@ -511,7 +512,13 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, build_merge_insert_columns(all_columns, ", ", "P.")); elog(DEBUG2, "%s", command->data); - res = SPI_execute(command->data, false /* read_only */, 0 /*count*/); + res = SPI_execute_with_args(command->data, + 4, + types, + values, + nulls, + false /* read_only */, + 0 /* count */); if (res < 0) elog(ERROR, @@ -532,10 +539,10 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, appendStringInfo(command, "DELETE " "FROM %s.%s M " - "WHERE M.%s >= %s AND M.%s < %s " + "WHERE M.%s >= $1 AND M.%s < $2 " "AND NOT EXISTS (" " SELECT FROM %s.%s P " - " WHERE %s AND P.%s >= %s AND P.%s < %s) ", + " WHERE %s AND P.%s >= $3 AND P.%s < $4) ", /* materialization hypertable */ quote_identifier(NameStr(*materialization_table.schema)), @@ -543,9 +550,7 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, /* materialization hypertable WHERE */ quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(materialization_start), quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(materialization_end), /* partial VIEW */ quote_identifier(NameStr(*partial_view.schema)), @@ -556,11 +561,16 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, /* partial WHERE */ quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(materialization_start), - quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(materialization_end)); + quote_identifier(NameStr(*time_column_name))); + elog(DEBUG2, "%s", command->data); - res = SPI_execute(command->data, false /* read_only */, 0 /*count*/); + res = SPI_execute_with_args(command->data, + 4, + types, + values, + nulls, + false /* read_only */, + 0 /* count */); if (res < 0) elog(ERROR, @@ -581,24 +591,30 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, static uint64 spi_delete_materializations(SchemaAndName materialization_table, const NameData *time_column_name, - char *invalidation_start, char *invalidation_end, - const char *const chunk_condition) + TimeRange materialization_range, const char *const chunk_condition) { int res; StringInfo command = makeStringInfo(); + Oid types[] = { materialization_range.type, materialization_range.type }; + Datum values[] = { materialization_range.start, materialization_range.end }; + char nulls[] = { false, false }; appendStringInfo(command, "DELETE FROM %s.%s AS D WHERE " - "D.%s >= %s AND D.%s < %s %s;", + "D.%s >= $1 AND D.%s < $2 %s;", quote_identifier(NameStr(*materialization_table.schema)), quote_identifier(NameStr(*materialization_table.name)), quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(invalidation_start), quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(invalidation_end), chunk_condition); - res = SPI_execute(command->data, false /* read_only */, 0 /*count*/); + res = SPI_execute_with_args(command->data, + 2, + types, + values, + nulls, + false /* read_only */, + 0 /* count */); if (res < 0) elog(ERROR, @@ -618,26 +634,33 @@ spi_delete_materializations(SchemaAndName materialization_table, const NameData static uint64 spi_insert_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg, SchemaAndName partial_view, SchemaAndName materialization_table, - const NameData *time_column_name, char *materialization_start, - char *materialization_end, const char *const chunk_condition) + const NameData *time_column_name, TimeRange materialization_range, + const char *const chunk_condition) { int res; StringInfo command = makeStringInfo(); + Oid types[] = { materialization_range.type, materialization_range.type }; + Datum values[] = { materialization_range.start, materialization_range.end }; + char nulls[] = { false, false }; appendStringInfo(command, "INSERT INTO %s.%s SELECT * FROM %s.%s AS I " - "WHERE I.%s >= %s AND I.%s < %s %s;", + "WHERE I.%s >= $1 AND I.%s < $2 %s;", quote_identifier(NameStr(*materialization_table.schema)), quote_identifier(NameStr(*materialization_table.name)), quote_identifier(NameStr(*partial_view.schema)), quote_identifier(NameStr(*partial_view.name)), quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(materialization_start), quote_identifier(NameStr(*time_column_name)), - quote_literal_cstr(materialization_end), chunk_condition); - res = SPI_execute(command->data, false /* read_only */, 0 /*count*/); + res = SPI_execute_with_args(command->data, + 2, + types, + values, + nulls, + false /* read_only */, + 0 /* count */); if (res < 0) elog(ERROR,