Skip to content

Commit

Permalink
Added specific tests for MERGE statement
Browse files Browse the repository at this point in the history
  • Loading branch information
fabriziomello committed Sep 13, 2024
1 parent ecff248 commit 2c3ff7a
Show file tree
Hide file tree
Showing 14 changed files with 4,493 additions and 1,482 deletions.
1 change: 1 addition & 0 deletions .unreleased/pr_7033
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7033 Use MERGE statement on CAgg Refresh
4 changes: 2 additions & 2 deletions src/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ static char *ts_guc_default_orderby_fn = NULL;
TSDLLEXPORT bool ts_guc_enable_job_execution_logging = false;
bool ts_guc_enable_tss_callbacks = true;
TSDLLEXPORT bool ts_guc_enable_delete_after_compression = false;
TSDLLEXPORT bool ts_guc_enable_merge_on_cagg_refresh = true;
TSDLLEXPORT bool ts_guc_enable_merge_on_cagg_refresh = false;

/* default value of ts_guc_max_open_chunks_per_insert and ts_guc_max_cached_chunks_per_hypertable
* will be set as their respective boot-value when the GUC mechanism starts up */
Expand Down Expand Up @@ -577,7 +577,7 @@ _guc_init(void)
"Enable MERGE statement on cagg refresh",
"Enable MERGE statement on cagg refresh",
&ts_guc_enable_merge_on_cagg_refresh,
true,
false,
PGC_USERSET,
0,
NULL,
Expand Down
99 changes: 45 additions & 54 deletions tsl/src/continuous_aggs/materialize.c
Original file line number Diff line number Diff line change
Expand Up @@ -260,80 +260,71 @@ build_merge_insert_columns(List *strings, const char *separator, const char *pre
{
StringInfo ret = makeStringInfo();

if (strings != NIL)
{
ListCell *lc;
foreach (lc, strings)
{
char *grpcol = (char *) lfirst(lc);
if (ret->len > 0)
appendStringInfoString(ret, separator);
Assert(strings != NIL);

if (prefix)
appendStringInfoString(ret, prefix);
appendStringInfoString(ret, quote_identifier(grpcol));
}
ListCell *lc;
foreach (lc, strings)
{
char *grpcol = (char *) lfirst(lc);
if (ret->len > 0)
appendStringInfoString(ret, separator);

elog(DEBUG2, "%s: %s", __func__, ret->data);
return ret->data;
if (prefix)
appendStringInfoString(ret, prefix);
appendStringInfoString(ret, quote_identifier(grpcol));
}

return NULL;
elog(DEBUG2, "%s: %s", __func__, ret->data);
return ret->data;
}

static char *
build_merge_join_clause(List *column_names)
{
StringInfo ret = makeStringInfo();

if (column_names != NIL)
{
ListCell *lc;
foreach (lc, column_names)
{
char *column = (char *) lfirst(lc);
Assert(column_names != NIL);

if (ret->len > 0)
appendStringInfoString(ret, " AND ");
ListCell *lc;
foreach (lc, column_names)
{
char *column = (char *) lfirst(lc);

appendStringInfoString(ret, "P.");
appendStringInfoString(ret, quote_identifier(column));
appendStringInfoString(ret, " IS NOT DISTINCT FROM M.");
appendStringInfoString(ret, quote_identifier(column));
}
if (ret->len > 0)
appendStringInfoString(ret, " AND ");

elog(DEBUG2, "%s: %s", __func__, ret->data);
return ret->data;
appendStringInfoString(ret, "P.");
appendStringInfoString(ret, quote_identifier(column));
appendStringInfoString(ret, " IS NOT DISTINCT FROM M.");
appendStringInfoString(ret, quote_identifier(column));
}

return NULL;
elog(DEBUG2, "%s: %s", __func__, ret->data);
return ret->data;
}

static char *
build_merge_update_clause(List *column_names)
{
StringInfo ret = makeStringInfo();

if (column_names != NIL)
{
ListCell *lc;
foreach (lc, column_names)
{
char *column = (char *) lfirst(lc);
Assert(column_names != NIL);

if (ret->len > 0)
appendStringInfoString(ret, ", ");
ListCell *lc;
foreach (lc, column_names)
{
char *column = (char *) lfirst(lc);

appendStringInfoString(ret, quote_identifier(column));
appendStringInfoString(ret, " = P.");
appendStringInfoString(ret, quote_identifier(column));
}
if (ret->len > 0)
appendStringInfoString(ret, ", ");

elog(DEBUG2, "%s: %s", __func__, ret->data);
return ret->data;
appendStringInfoString(ret, quote_identifier(column));
appendStringInfoString(ret, " = P.");
appendStringInfoString(ret, quote_identifier(column));
}

return NULL;
elog(DEBUG2, "%s: %s", __func__, ret->data);
return ret->data;
}

static void
Expand All @@ -342,9 +333,6 @@ spi_update_watermark(Hypertable *mat_ht, SchemaAndName materialization_table,
Oid materialization_type, const char *const chunk_condition)
{
int res;
int64 watermark;
bool isnull = true;
Datum maxdat;
StringInfo command = makeStringInfo();

appendStringInfo(command,
Expand All @@ -369,12 +357,15 @@ spi_update_watermark(Hypertable *mat_ht, SchemaAndName materialization_table,
materialization_type);

if (SPI_processed > 0)
maxdat = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &isnull);

if (!isnull)
{
watermark = ts_time_value_to_internal(maxdat, materialization_type);
ts_cagg_watermark_update(mat_ht, watermark, isnull, false);
bool isnull;
Datum maxdat = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &isnull);

if (!isnull)
{
int64 watermark = ts_time_value_to_internal(maxdat, materialization_type);
ts_cagg_watermark_update(mat_ht, watermark, isnull, false);
}
}
}

Expand Down
Loading

0 comments on commit 2c3ff7a

Please sign in to comment.