Skip to content

Commit

Permalink
Replace SPI_execute by SPI_execute_with_args
Browse files Browse the repository at this point in the history
  • Loading branch information
fabriziomello committed Oct 7, 2024
1 parent a3e6773 commit 8b69290
Showing 1 changed file with 81 additions and 87 deletions.
168 changes: 81 additions & 87 deletions tsl/src/continuous_aggs/materialize.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,28 +44,28 @@ static char *build_merge_update_clause(List *column_names);
* materialization support *
***************************/
static void spi_update_watermark(Hypertable *mat_ht, SchemaAndName materialization_table,
const NameData *time_column_name, char *materialization_start,
Oid materialization_type, const char *const chunk_condition);
const NameData *time_column_name, TimeRange materialization_range,
const char *const chunk_condition);
static void spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
SchemaAndName partial_view,
SchemaAndName materialization_table,
const NameData *time_column_name,
TimeRange invalidation_range, const int32 chunk_id);
static uint64 spi_delete_materializations(SchemaAndName materialization_table,
const NameData *time_column_name,
char *invalidation_start, char *invalidation_end,
TimeRange invalidation_range,
const char *const chunk_condition);
static uint64 spi_insert_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
SchemaAndName partial_view,
SchemaAndName materialization_table,
const NameData *time_column_name,
char *materialization_start, char *materialization_end,
TimeRange materialization_range,
const char *const chunk_condition);
static uint64 spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
SchemaAndName partial_view,
SchemaAndName materialization_table,
const NameData *time_column_name,
char *materialization_start, char *materialization_end);
TimeRange materialization_range);

void
continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousAgg *cagg,
Expand Down Expand Up @@ -329,32 +329,40 @@ build_merge_update_clause(List *column_names)

static void
spi_update_watermark(Hypertable *mat_ht, SchemaAndName materialization_table,
const NameData *time_column_name, char *materialization_start,
Oid materialization_type, const char *const chunk_condition)
const NameData *time_column_name, TimeRange materialization_range,
const char *const chunk_condition)
{
int res;
StringInfo command = makeStringInfo();
Oid types[] = { materialization_range.type };
Datum values[] = { materialization_range.start };
char nulls[] = { false };

appendStringInfo(command,
"SELECT %s FROM %s.%s AS I "
"WHERE I.%s >= %s %s "
"WHERE I.%s >= $1 %s "
"ORDER BY 1 DESC LIMIT 1;",
quote_identifier(NameStr(*time_column_name)),
quote_identifier(NameStr(*materialization_table.schema)),
quote_identifier(NameStr(*materialization_table.name)),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_start),
chunk_condition);

res = SPI_execute(command->data, false /* read_only */, 0 /*count*/);
res = SPI_execute_with_args(command->data,
1,
types,
values,
nulls,
false /* read_only */,
0 /* count */);

if (res < 0)
elog(ERROR, "could not get the last bucket of the materialized data");

Ensure(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == materialization_type,
Ensure(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == materialization_range.type,
"partition types for result (%d) and dimension (%d) do not match",
SPI_gettypeid(SPI_tuptable->tupdesc, 1),
materialization_type);
materialization_range.type);

if (SPI_processed > 0)
{
Expand All @@ -363,58 +371,21 @@ spi_update_watermark(Hypertable *mat_ht, SchemaAndName materialization_table,

if (!isnull)
{
int64 watermark = ts_time_value_to_internal(maxdat, materialization_type);
int64 watermark = ts_time_value_to_internal(maxdat, materialization_range.type);
ts_cagg_watermark_update(mat_ht, watermark, isnull, false);
}
}
}

static char *
OidSafeTimestampTzOutputFunctionCall(Oid type, Datum val)
{
char *result;
int SavedDateStyle, SavedDateOrder;
Oid functionId;
bool type_is_varlena;

/* Save DateStyle and DateOrder and force it to use ISO dates and YMD order */
if (type == TIMESTAMPTZOID)
{
SavedDateStyle = DateStyle;
SavedDateOrder = DateOrder;
DateStyle = USE_ISO_DATES;
DateOrder = DATEORDER_YMD;
}

getTypeOutputInfo(type, &functionId, &type_is_varlena);
result = OidOutputFunctionCall(functionId, val);

/* Restore previous date style and order configuration */
if (type == TIMESTAMPTZOID)
{
DateStyle = SavedDateStyle;
DateOrder = SavedDateOrder;
}

return result;
}

static void
spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
SchemaAndName partial_view, SchemaAndName materialization_table,
const NameData *time_column_name, TimeRange invalidation_range,
const int32 chunk_id)
{
char *invalidation_start;
char *invalidation_end;
StringInfo chunk_condition = makeStringInfo();
uint64 rows_processed = 0;

invalidation_start =
OidSafeTimestampTzOutputFunctionCall(invalidation_range.type, invalidation_range.start);
invalidation_end =
OidSafeTimestampTzOutputFunctionCall(invalidation_range.type, invalidation_range.end);

/* MERGE statement is available starting on PG15 and we'll support it only in the new format of
* CAggs and for non-compressed hypertables */
if (ts_guc_enable_merge_on_cagg_refresh && PG_VERSION_NUM >= 150000 &&
Expand All @@ -425,8 +396,7 @@ spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
partial_view,
materialization_table,
time_column_name,
invalidation_start,
invalidation_end);
invalidation_range);
}
else
{
Expand All @@ -441,16 +411,14 @@ spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,

rows_processed += spi_delete_materializations(materialization_table,
time_column_name,
invalidation_start,
invalidation_end,
invalidation_range,
chunk_condition->data);
rows_processed += spi_insert_materializations(mat_ht,
cagg,
partial_view,
materialization_table,
time_column_name,
invalidation_start,
invalidation_end,
invalidation_range,
chunk_condition->data);
}

Expand All @@ -460,20 +428,28 @@ spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
spi_update_watermark(mat_ht,
materialization_table,
time_column_name,
invalidation_start,
invalidation_range.type,
invalidation_range,
chunk_condition->data);
}
}

static uint64
spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
SchemaAndName partial_view, SchemaAndName materialization_table,
const NameData *time_column_name, char *materialization_start,
char *materialization_end)
const NameData *time_column_name, TimeRange materialization_range)
{
int res;
StringInfo command = makeStringInfo();
Oid types[] = { materialization_range.type,
materialization_range.type,
materialization_range.type,
materialization_range.type };
Datum values[] = { materialization_range.start,
materialization_range.end,
materialization_range.start,
materialization_range.end };
char nulls[] = { false, false, false, false };

List *grp_colnames = cagg_find_groupingcols((ContinuousAgg *) cagg, mat_ht);
List *agg_colnames = cagg_find_aggref_and_var_cols((ContinuousAgg *) cagg, mat_ht);
List *all_columns = NIL;
Expand Down Expand Up @@ -501,10 +477,10 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
"WITH partial AS ( "
" SELECT * "
" FROM %s.%s "
" WHERE %s >= %s AND %s < %s "
" WHERE %s >= $1 AND %s < $2 "
") "
"MERGE INTO %s.%s M "
"USING partial P ON %s AND M.%s >= %s AND M.%s < %s "
"USING partial P ON %s AND M.%s >= $3 AND M.%s < $4 "
" %s " /* UPDATE */
" WHEN NOT MATCHED THEN "
" INSERT (%s) VALUES (%s) ",
Expand All @@ -515,9 +491,7 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,

/* partial WHERE */
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_start),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_end),

/* materialization hypertable */
quote_identifier(NameStr(*materialization_table.schema)),
Expand All @@ -528,9 +502,7 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,

/* extra MERGE JOIN condition with primary dimension */
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_start),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_end),

/* UPDATE */
merge_update->data,
Expand All @@ -540,7 +512,13 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
build_merge_insert_columns(all_columns, ", ", "P."));

elog(DEBUG2, "%s", command->data);
res = SPI_execute(command->data, false /* read_only */, 0 /*count*/);
res = SPI_execute_with_args(command->data,
4,
types,
values,
nulls,
false /* read_only */,
0 /* count */);

if (res < 0)
elog(ERROR,
Expand All @@ -561,20 +539,18 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
appendStringInfo(command,
"DELETE "
"FROM %s.%s M "
"WHERE M.%s >= %s AND M.%s < %s "
"WHERE M.%s >= $1 AND M.%s < $2 "
"AND NOT EXISTS ("
" SELECT FROM %s.%s P "
" WHERE %s AND P.%s >= %s AND P.%s < %s) ",
" WHERE %s AND P.%s >= $3 AND P.%s < $4) ",

/* materialization hypertable */
quote_identifier(NameStr(*materialization_table.schema)),
quote_identifier(NameStr(*materialization_table.name)),

/* materialization hypertable WHERE */
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_start),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_end),

/* partial VIEW */
quote_identifier(NameStr(*partial_view.schema)),
Expand All @@ -585,11 +561,16 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,

/* partial WHERE */
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_start),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_end));
quote_identifier(NameStr(*time_column_name)));

elog(DEBUG2, "%s", command->data);
res = SPI_execute(command->data, false /* read_only */, 0 /*count*/);
res = SPI_execute_with_args(command->data,
4,
types,
values,
nulls,
false /* read_only */,
0 /* count */);

if (res < 0)
elog(ERROR,
Expand All @@ -610,24 +591,30 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,

static uint64
spi_delete_materializations(SchemaAndName materialization_table, const NameData *time_column_name,
char *invalidation_start, char *invalidation_end,
const char *const chunk_condition)
TimeRange materialization_range, const char *const chunk_condition)
{
int res;
StringInfo command = makeStringInfo();
Oid types[] = { materialization_range.type, materialization_range.type };
Datum values[] = { materialization_range.start, materialization_range.end };
char nulls[] = { false, false };

appendStringInfo(command,
"DELETE FROM %s.%s AS D WHERE "
"D.%s >= %s AND D.%s < %s %s;",
"D.%s >= $1 AND D.%s < $2 %s;",
quote_identifier(NameStr(*materialization_table.schema)),
quote_identifier(NameStr(*materialization_table.name)),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(invalidation_start),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(invalidation_end),
chunk_condition);

res = SPI_execute(command->data, false /* read_only */, 0 /*count*/);
res = SPI_execute_with_args(command->data,
2,
types,
values,
nulls,
false /* read_only */,
0 /* count */);

if (res < 0)
elog(ERROR,
Expand All @@ -647,26 +634,33 @@ spi_delete_materializations(SchemaAndName materialization_table, const NameData
static uint64
spi_insert_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
SchemaAndName partial_view, SchemaAndName materialization_table,
const NameData *time_column_name, char *materialization_start,
char *materialization_end, const char *const chunk_condition)
const NameData *time_column_name, TimeRange materialization_range,
const char *const chunk_condition)
{
int res;
StringInfo command = makeStringInfo();
Oid types[] = { materialization_range.type, materialization_range.type };
Datum values[] = { materialization_range.start, materialization_range.end };
char nulls[] = { false, false };

appendStringInfo(command,
"INSERT INTO %s.%s SELECT * FROM %s.%s AS I "
"WHERE I.%s >= %s AND I.%s < %s %s;",
"WHERE I.%s >= $1 AND I.%s < $2 %s;",
quote_identifier(NameStr(*materialization_table.schema)),
quote_identifier(NameStr(*materialization_table.name)),
quote_identifier(NameStr(*partial_view.schema)),
quote_identifier(NameStr(*partial_view.name)),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_start),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_end),
chunk_condition);

res = SPI_execute(command->data, false /* read_only */, 0 /*count*/);
res = SPI_execute_with_args(command->data,
2,
types,
values,
nulls,
false /* read_only */,
0 /* count */);

if (res < 0)
elog(ERROR,
Expand Down

0 comments on commit 8b69290

Please sign in to comment.