Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace SPI_execute by SPI_execute_with_args in materialization #7319

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 81 additions & 58 deletions tsl/src/continuous_aggs/materialize.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,28 +44,28 @@ static char *build_merge_update_clause(List *column_names);
* materialization support *
***************************/
static void spi_update_watermark(Hypertable *mat_ht, SchemaAndName materialization_table,
const NameData *time_column_name, char *materialization_start,
Oid materialization_type, const char *const chunk_condition);
const NameData *time_column_name, TimeRange materialization_range,
const char *const chunk_condition);
static void spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
SchemaAndName partial_view,
SchemaAndName materialization_table,
const NameData *time_column_name,
TimeRange invalidation_range, const int32 chunk_id);
static uint64 spi_delete_materializations(SchemaAndName materialization_table,
const NameData *time_column_name,
char *invalidation_start, char *invalidation_end,
TimeRange materialization_range,
const char *const chunk_condition);
static uint64 spi_insert_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
SchemaAndName partial_view,
SchemaAndName materialization_table,
const NameData *time_column_name,
char *materialization_start, char *materialization_end,
TimeRange materialization_range,
const char *const chunk_condition);
static uint64 spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
SchemaAndName partial_view,
SchemaAndName materialization_table,
const NameData *time_column_name,
char *materialization_start, char *materialization_end);
TimeRange materialization_range);

void
continuous_agg_update_materialization(Hypertable *mat_ht, const ContinuousAgg *cagg,
Expand Down Expand Up @@ -329,32 +329,40 @@ build_merge_update_clause(List *column_names)

static void
spi_update_watermark(Hypertable *mat_ht, SchemaAndName materialization_table,
const NameData *time_column_name, char *materialization_start,
Oid materialization_type, const char *const chunk_condition)
const NameData *time_column_name, TimeRange materialization_range,
const char *const chunk_condition)
{
int res;
StringInfo command = makeStringInfo();
Oid types[] = { materialization_range.type };
Datum values[] = { materialization_range.start };
char nulls[] = { false };

appendStringInfo(command,
"SELECT %s FROM %s.%s AS I "
"WHERE I.%s >= %s %s "
"WHERE I.%s >= $1 %s "
"ORDER BY 1 DESC LIMIT 1;",
quote_identifier(NameStr(*time_column_name)),
quote_identifier(NameStr(*materialization_table.schema)),
quote_identifier(NameStr(*materialization_table.name)),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_start),
chunk_condition);

res = SPI_execute(command->data, false /* read_only */, 0 /*count*/);
res = SPI_execute_with_args(command->data,
1,
types,
values,
nulls,
false /* read_only */,
0 /* count */);

if (res < 0)
elog(ERROR, "could not get the last bucket of the materialized data");

Ensure(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == materialization_type,
Ensure(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == materialization_range.type,
"partition types for result (%d) and dimension (%d) do not match",
SPI_gettypeid(SPI_tuptable->tupdesc, 1),
materialization_type);
materialization_range.type);

if (SPI_processed > 0)
{
Expand All @@ -363,7 +371,7 @@ spi_update_watermark(Hypertable *mat_ht, SchemaAndName materialization_table,

if (!isnull)
{
int64 watermark = ts_time_value_to_internal(maxdat, materialization_type);
int64 watermark = ts_time_value_to_internal(maxdat, materialization_range.type);
ts_cagg_watermark_update(mat_ht, watermark, isnull, false);
}
}
Expand All @@ -375,17 +383,9 @@ spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
const NameData *time_column_name, TimeRange invalidation_range,
const int32 chunk_id)
{
Oid out_fn;
bool type_is_varlena;
char *invalidation_start;
char *invalidation_end;
StringInfo chunk_condition = makeStringInfo();
uint64 rows_processed = 0;

getTypeOutputInfo(invalidation_range.type, &out_fn, &type_is_varlena);
invalidation_start = OidOutputFunctionCall(out_fn, invalidation_range.start);
invalidation_end = OidOutputFunctionCall(out_fn, invalidation_range.end);

/* MERGE statement is available starting on PG15 and we'll support it only in the new format of
* CAggs and for non-compressed hypertables */
if (ts_guc_enable_merge_on_cagg_refresh && PG_VERSION_NUM >= 150000 &&
Expand All @@ -396,8 +396,7 @@ spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
partial_view,
materialization_table,
time_column_name,
invalidation_start,
invalidation_end);
invalidation_range);
}
else
{
Expand All @@ -412,16 +411,14 @@ spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,

rows_processed += spi_delete_materializations(materialization_table,
time_column_name,
invalidation_start,
invalidation_end,
invalidation_range,
chunk_condition->data);
rows_processed += spi_insert_materializations(mat_ht,
cagg,
partial_view,
materialization_table,
time_column_name,
invalidation_start,
invalidation_end,
invalidation_range,
chunk_condition->data);
}

Expand All @@ -431,20 +428,28 @@ spi_update_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
spi_update_watermark(mat_ht,
materialization_table,
time_column_name,
invalidation_start,
invalidation_range.type,
invalidation_range,
chunk_condition->data);
}
}

static uint64
spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
SchemaAndName partial_view, SchemaAndName materialization_table,
const NameData *time_column_name, char *materialization_start,
char *materialization_end)
const NameData *time_column_name, TimeRange materialization_range)
{
int res;
StringInfo command = makeStringInfo();
Oid types[] = { materialization_range.type,
materialization_range.type,
materialization_range.type,
materialization_range.type };
Datum values[] = { materialization_range.start,
materialization_range.end,
materialization_range.start,
materialization_range.end };
char nulls[] = { false, false, false, false };

List *grp_colnames = cagg_find_groupingcols((ContinuousAgg *) cagg, mat_ht);
List *agg_colnames = cagg_find_aggref_and_var_cols((ContinuousAgg *) cagg, mat_ht);
List *all_columns = NIL;
Expand Down Expand Up @@ -472,10 +477,10 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
"WITH partial AS ( "
" SELECT * "
" FROM %s.%s "
" WHERE %s >= %s AND %s < %s "
" WHERE %s >= $1 AND %s < $2 "
") "
"MERGE INTO %s.%s M "
"USING partial P ON %s AND M.%s >= %s AND M.%s < %s "
"USING partial P ON %s AND M.%s >= $3 AND M.%s < $4 "
" %s " /* UPDATE */
" WHEN NOT MATCHED THEN "
" INSERT (%s) VALUES (%s) ",
Expand All @@ -486,9 +491,7 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,

/* partial WHERE */
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_start),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_end),

/* materialization hypertable */
quote_identifier(NameStr(*materialization_table.schema)),
Expand All @@ -499,9 +502,7 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,

/* extra MERGE JOIN condition with primary dimension */
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_start),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_end),

/* UPDATE */
merge_update->data,
Expand All @@ -511,7 +512,13 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
build_merge_insert_columns(all_columns, ", ", "P."));

elog(DEBUG2, "%s", command->data);
res = SPI_execute(command->data, false /* read_only */, 0 /*count*/);
res = SPI_execute_with_args(command->data,
4,
types,
values,
nulls,
false /* read_only */,
0 /* count */);

if (res < 0)
elog(ERROR,
Expand All @@ -532,20 +539,18 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
appendStringInfo(command,
"DELETE "
"FROM %s.%s M "
"WHERE M.%s >= %s AND M.%s < %s "
"WHERE M.%s >= $1 AND M.%s < $2 "
"AND NOT EXISTS ("
" SELECT FROM %s.%s P "
" WHERE %s AND P.%s >= %s AND P.%s < %s) ",
" WHERE %s AND P.%s >= $3 AND P.%s < $4) ",

/* materialization hypertable */
quote_identifier(NameStr(*materialization_table.schema)),
quote_identifier(NameStr(*materialization_table.name)),

/* materialization hypertable WHERE */
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_start),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_end),

/* partial VIEW */
quote_identifier(NameStr(*partial_view.schema)),
Expand All @@ -556,11 +561,16 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,

/* partial WHERE */
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_start),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_end));
quote_identifier(NameStr(*time_column_name)));

elog(DEBUG2, "%s", command->data);
res = SPI_execute(command->data, false /* read_only */, 0 /*count*/);
res = SPI_execute_with_args(command->data,
4,
types,
values,
nulls,
false /* read_only */,
0 /* count */);

if (res < 0)
elog(ERROR,
Expand All @@ -581,24 +591,30 @@ spi_merge_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,

static uint64
spi_delete_materializations(SchemaAndName materialization_table, const NameData *time_column_name,
char *invalidation_start, char *invalidation_end,
const char *const chunk_condition)
TimeRange materialization_range, const char *const chunk_condition)
{
int res;
StringInfo command = makeStringInfo();
Oid types[] = { materialization_range.type, materialization_range.type };
Datum values[] = { materialization_range.start, materialization_range.end };
char nulls[] = { false, false };

appendStringInfo(command,
"DELETE FROM %s.%s AS D WHERE "
"D.%s >= %s AND D.%s < %s %s;",
"D.%s >= $1 AND D.%s < $2 %s;",
quote_identifier(NameStr(*materialization_table.schema)),
quote_identifier(NameStr(*materialization_table.name)),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(invalidation_start),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(invalidation_end),
chunk_condition);

res = SPI_execute(command->data, false /* read_only */, 0 /*count*/);
res = SPI_execute_with_args(command->data,
2,
types,
values,
nulls,
false /* read_only */,
0 /* count */);

if (res < 0)
elog(ERROR,
Expand All @@ -618,26 +634,33 @@ spi_delete_materializations(SchemaAndName materialization_table, const NameData
static uint64
spi_insert_materializations(Hypertable *mat_ht, const ContinuousAgg *cagg,
SchemaAndName partial_view, SchemaAndName materialization_table,
const NameData *time_column_name, char *materialization_start,
char *materialization_end, const char *const chunk_condition)
const NameData *time_column_name, TimeRange materialization_range,
const char *const chunk_condition)
{
int res;
StringInfo command = makeStringInfo();
Oid types[] = { materialization_range.type, materialization_range.type };
Datum values[] = { materialization_range.start, materialization_range.end };
char nulls[] = { false, false };

appendStringInfo(command,
"INSERT INTO %s.%s SELECT * FROM %s.%s AS I "
"WHERE I.%s >= %s AND I.%s < %s %s;",
"WHERE I.%s >= $1 AND I.%s < $2 %s;",
quote_identifier(NameStr(*materialization_table.schema)),
quote_identifier(NameStr(*materialization_table.name)),
quote_identifier(NameStr(*partial_view.schema)),
quote_identifier(NameStr(*partial_view.name)),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_start),
quote_identifier(NameStr(*time_column_name)),
quote_literal_cstr(materialization_end),
chunk_condition);

res = SPI_execute(command->data, false /* read_only */, 0 /*count*/);
res = SPI_execute_with_args(command->data,
2,
types,
values,
nulls,
false /* read_only */,
0 /* count */);

if (res < 0)
elog(ERROR,
Expand Down
Loading