From 389f0fc894a5f109944752990833ea6f6aaf15ec Mon Sep 17 00:00:00 2001 From: Bharathy Date: Tue, 17 Jan 2023 12:16:09 +0530 Subject: [PATCH] MERGE support on hypertables This patch does following: 1. Refactor ExecInsert/Delete/Update Backported commit 25e777cf8e547d7423d2e1e9da71f98b9414d59e 2. Backport all MERGE related interfaces and its implementations. Backported commit 7103ebb7aae8ab8076b7e85f335ceb8fe799097c 3. Planner changes to mark self referencing child tables as dummy. 4. Planner changes to create ChunkDispatch node when MERGE command has INSERT action. 5. Changes to map partition attributes from a tuple returned from child node of ChunkDispatch against physical targetlist, so that ChunkDispatch node can read the correct value from partition column. 6. Fixed issues with MERGE on compressed hypertable. 7. Added testcases. 8. MERGE in distributed hypertables is not supported. Pending tasks: 1. More testing. Fixes #5139 --- src/copy.c | 2 +- src/dimension.c | 60 +- src/dimension.h | 5 +- src/nodes/chunk_dispatch/chunk_dispatch.c | 81 +- src/nodes/chunk_dispatch/chunk_dispatch.h | 4 + src/nodes/hypertable_modify.c | 1497 +++++++++++++++++---- src/planner/planner.c | 65 +- test/expected/merge.out | 585 ++++++-- test/sql/merge.sql | 499 ++++++- tsl/test/expected/merge_compress.out | 111 ++ tsl/test/sql/CMakeLists.txt | 4 + tsl/test/sql/merge_compress.sql | 92 ++ 12 files changed, 2532 insertions(+), 473 deletions(-) create mode 100644 tsl/test/expected/merge_compress.out create mode 100644 tsl/test/sql/merge_compress.sql diff --git a/src/copy.c b/src/copy.c index 36cae576425..87babbef38b 100644 --- a/src/copy.c +++ b/src/copy.c @@ -943,7 +943,7 @@ copyfrom(CopyChunkState *ccstate, List *range_table, Hypertable *ht, MemoryConte ExecStoreVirtualTuple(myslot); /* Calculate the tuple's point in the N-dimensional hyperspace */ - point = ts_hyperspace_calculate_point(ht->space, myslot); + point = ts_hyperspace_calculate_point(ht->space, myslot, NULL); /* Find or create the insert state matching the point */ cis = ts_chunk_dispatch_get_chunk_insert_state(dispatch, diff --git a/src/dimension.c b/src/dimension.c index c4f4b489af5..f235dede946 100644 --- a/src/dimension.c +++ b/src/dimension.c @@ -957,8 +957,60 @@ ts_point_create(int16 num_dimensions) return p; } +/* + * Helper function to find the right partition value from a tuple, + * in case attributes in tuple are not ordered correctly. + * + * Assume columns in table are in below order: + * [time, temperature, location, value] + * "time" is the partition column based on this columns value + * ChunkDispatch node decides if to create or reuse an existing + * chunk. + * + * Assume received tuple for child node of ChunkDispatch node is like + * [temperature, location, time, value] + * + * In this case this function will return correct attribute no: from + * the received slot. 
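+ *
+ * Purely for illustration (hypothetical table and column names), a
+ * statement such as the following makes the ChunkDispatch subplan emit
+ * columns in an order that differs from the hypertable definition:
+ *
+ *   MERGE INTO metrics m
+ *   USING source s ON m.time = s.time
+ *   WHEN NOT MATCHED THEN
+ *     INSERT (temperature, location, time, value)
+ *     VALUES (s.temperature, s.location, s.time, s.value);
+ *
+ * Returns 0 (InvalidAttrNumber) when the partition column cannot be
+ * located in the slot; the caller then falls back to projecting the
+ * INSERT action's tuple (see chunk_dispatch_exec()).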
+ */ +int +find_partition_column_in_tuple(const Hyperspace *hs, TupleTableSlot *slot, List *insert_targetList) +{ + AttrNumber partition_col_idx = 0; + for (int i = 0; i < hs->num_dimensions; i++) + { + const Dimension *d = &hs->dimensions[i]; + Oid partition_col_type = InvalidOid; + if (insert_targetList) + { + ListCell *lc; + foreach (lc, insert_targetList) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + Var *new_var = castNode(Var, tle->expr); + if (new_var->varattno == d->column_attno) + { + partition_col_type = new_var->vartype; + break; + } + } + TupleDesc tupdesc = slot->tts_tupleDescriptor; + int natts = tupdesc->natts; + for (int i = 0; i < natts; i++) + { + if (tupdesc->attrs[i].atttypid == partition_col_type) + { + partition_col_idx = d->column_attno + i; + break; + } + } + } + } + return partition_col_idx; +} + TSDLLEXPORT Point * -ts_hyperspace_calculate_point(const Hyperspace *hs, TupleTableSlot *slot) +ts_hyperspace_calculate_point(const Hyperspace *hs, TupleTableSlot *slot, List *insert_targetList) { Point *p = ts_point_create(hs->num_dimensions); int i; @@ -970,10 +1022,14 @@ ts_hyperspace_calculate_point(const Hyperspace *hs, TupleTableSlot *slot) bool isnull; Oid dimtype; + AttrNumber partition_col_idx = d->column_attno; + if (insert_targetList) + partition_col_idx = find_partition_column_in_tuple(hs, slot, insert_targetList); + if (NULL != d->partitioning) datum = ts_partitioning_func_apply_slot(d->partitioning, slot, &isnull); else - datum = slot_getattr(slot, d->column_attno, &isnull); + datum = slot_getattr(slot, partition_col_idx, &isnull); switch (d->type) { diff --git a/src/dimension.h b/src/dimension.h index 9c32c53dbd2..354c0f4ab78 100644 --- a/src/dimension.h +++ b/src/dimension.h @@ -108,7 +108,8 @@ typedef struct DimensionInfo extern Hyperspace *ts_dimension_scan(int32 hypertable_id, Oid main_table_relid, int16 num_dimension, MemoryContext mctx); extern DimensionSlice *ts_dimension_calculate_default_slice(const Dimension *dim, int64 value); -extern TSDLLEXPORT Point *ts_hyperspace_calculate_point(const Hyperspace *h, TupleTableSlot *slot); +extern TSDLLEXPORT Point *ts_hyperspace_calculate_point(const Hyperspace *h, TupleTableSlot *slot, + List *insert_targetList); extern int ts_dimension_get_slice_ordinal(const Dimension *dim, const DimensionSlice *slice); extern const Dimension *ts_hyperspace_get_dimension_by_id(const Hyperspace *hs, int32 id); extern TSDLLEXPORT const Dimension *ts_hyperspace_get_dimension(const Hyperspace *hs, @@ -154,4 +155,6 @@ extern TSDLLEXPORT Point *ts_point_create(int16 num_dimensions); #define hyperspace_get_closed_dimension(space, i) \ ts_hyperspace_get_dimension(space, DIMENSION_TYPE_CLOSED, i) +extern int find_partition_column_in_tuple(const Hyperspace *h, TupleTableSlot *slot, + List *insert_targetList); #endif /* TIMESCALEDB_DIMENSION_H */ diff --git a/src/nodes/chunk_dispatch/chunk_dispatch.c b/src/nodes/chunk_dispatch/chunk_dispatch.c index b84bf92d7b2..500e7ac7cb2 100644 --- a/src/nodes/chunk_dispatch/chunk_dispatch.c +++ b/src/nodes/chunk_dispatch/chunk_dispatch.c @@ -8,6 +8,8 @@ #include #include #include +#include +#include #include #include #include @@ -187,8 +189,7 @@ chunk_dispatch_plan_create(PlannerInfo *root, RelOptInfo *relopt, CustomPath *be cscan->scan.plan.plan_rows += subplan->plan_rows; cscan->scan.plan.plan_width += subplan->plan_width; } - - cscan->custom_private = list_make1_oid(cdpath->hypertable_relid); + cscan->custom_private = lappend(cscan->custom_private, 
makeInteger(cdpath->hypertable_relid)); cscan->methods = &chunk_dispatch_plan_methods; cscan->custom_plans = custom_plans; cscan->scan.scanrelid = 0; /* Indicate this is not a real relation we are @@ -196,7 +197,36 @@ chunk_dispatch_plan_create(PlannerInfo *root, RelOptInfo *relopt, CustomPath *be /* The "input" and "output" target lists should be the same */ cscan->custom_scan_tlist = tlist; cscan->scan.plan.targetlist = tlist; - + /* + tlist holds child node target list, whose order can be different from + hypertables column order. During INSERT into hypertable, ChunkDispatch node + checks if current row needs to be inserted into existing chunk or should it + create a new chunk. In case a new chunk needs to be created, attributes in + current row tuple should be ordered such a way that partition column is at + right place. This helps ChunkDispatch node to fetch the correct value and + then create a new range for the chunk to be created. + + MERGE command with INSERT action can cause child node of ChunkDispatch to + return a tuple in completely different order. In such case ChunkDispatch + node can end up reading partition column wrongly. + To overcome this problem we save physical targetList of the hypertable + so that this can be used by ChunkDispatch node to fetch the right value + from received tuple. Thus we save targetlist in CustomScan node private + space. This list will be used to map the right partition column + during execution of ChunkDispatch node. + + Please refer find_partition_column_in_tuple() for more details. + */ +#if PG15_GE + if (root->parse->mergeUseOuterJoin) + { + /* Save actual ordering of hypertable columns only + * if MERGE command has INSERT action. */ + RelOptInfo *baserel = find_base_rel(root, 1); + List *orig_list = build_physical_tlist(root, baserel); + cscan->custom_private = lappend(cscan->custom_private, orig_list); + } +#endif return &cscan->scan.plan; } @@ -292,8 +322,40 @@ chunk_dispatch_exec(CustomScanState *node) /* Switch to the executor's per-tuple memory context */ old = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); +#if PG15_GE + Oid paritition_col_idx = InvalidOid; + TupleTableSlot *newslot = NULL; + if (dispatch->dispatch_state->mtstate->operation == CMD_MERGE) + { + paritition_col_idx = + find_partition_column_in_tuple(ht->space, slot, state->insert_targetList); + /* Row returned by child node does not have paritition col */ + if (!OidIsValid(paritition_col_idx)) + { + List *actionStates = + dispatch->dispatch_state->mtstate->resultRelInfo->ri_notMatchedMergeAction; + ListCell *l; + foreach (l, actionStates) + { + MergeActionState *action = (MergeActionState *) lfirst(l); + CmdType commandType = action->mas_action->commandType; + if (commandType == CMD_INSERT) + { + /* in case received tuple has only junk attributes, then + * fetch the actual values. 
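+					 * The INSERT action's projection (mas_proj) rebuilds the
+					 * tuple in the hypertable's physical column order, so the
+					 * partition column can be read from the projected slot at
+					 * its natural position.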
*/ + newslot = ExecProject(action->mas_proj); + } + } + } + } /* Calculate the tuple's point in the N-dimensional hyperspace */ - point = ts_hyperspace_calculate_point(ht->space, slot); + if (!OidIsValid(paritition_col_idx) && newslot) + point = ts_hyperspace_calculate_point(ht->space, newslot, state->insert_targetList); + else + point = ts_hyperspace_calculate_point(ht->space, slot, state->insert_targetList); +#else + point = ts_hyperspace_calculate_point(ht->space, slot, state->insert_targetList); +#endif /* Save the main table's (hypertable's) ResultRelInfo */ if (!dispatch->hypertable_result_rel_info) @@ -304,6 +366,13 @@ chunk_dispatch_exec(CustomScanState *node) dispatch->hypertable_result_rel_info = estate->es_result_relation_info; #else dispatch->hypertable_result_rel_info = dispatch->dispatch_state->mtstate->resultRelInfo; +#if 0 +#if PG15_GE + if (dispatch->dispatch_state->mtstate->operation == CMD_MERGE) + dispatch->hypertable_result_rel_info = + dispatch->dispatch_state->mtstate->rootResultRelInfo; +#endif +#endif #endif } @@ -369,13 +438,15 @@ static Node * chunk_dispatch_state_create(CustomScan *cscan) { ChunkDispatchState *state; - Oid hypertable_relid = linitial_oid(cscan->custom_private); + Oid hypertable_relid = intVal(list_nth(cscan->custom_private, 0)); state = (ChunkDispatchState *) newNode(sizeof(ChunkDispatchState), T_CustomScanState); state->hypertable_relid = hypertable_relid; Assert(list_length(cscan->custom_plans) == 1); state->subplan = linitial(cscan->custom_plans); state->cscan_state.methods = &chunk_dispatch_state_methods; + if (cscan->custom_private && cscan->custom_private->length > 1) + state->insert_targetList = lsecond(cscan->custom_private); return (Node *) state; } diff --git a/src/nodes/chunk_dispatch/chunk_dispatch.h b/src/nodes/chunk_dispatch/chunk_dispatch.h index 372390eb0c6..ec2b22b46af 100644 --- a/src/nodes/chunk_dispatch/chunk_dispatch.h +++ b/src/nodes/chunk_dispatch/chunk_dispatch.h @@ -72,6 +72,10 @@ typedef struct ChunkDispatchState */ ChunkDispatch *dispatch; ResultRelInfo *rri; + /* + * Save physical targetList for INSERT action in MERGE + */ + List *insert_targetList; } ChunkDispatchState; extern TSDLLEXPORT bool ts_is_chunk_dispatch_state(PlanState *state); diff --git a/src/nodes/hypertable_modify.c b/src/nodes/hypertable_modify.c index aef24fde57d..b3ccce13b25 100644 --- a/src/nodes/hypertable_modify.c +++ b/src/nodes/hypertable_modify.c @@ -32,6 +32,70 @@ #include "ts_catalog/hypertable_data_node.h" #if PG14_GE +/* + * Context struct for a ModifyTable operation, containing basic execution + * state and some output variables populated by ExecUpdateAct() and + * ExecDeleteAct() to report the result of their actions to callers. + */ +typedef struct ModifyTableContext +{ + /* Operation state */ + ModifyTableState *mtstate; + EPQState *epqstate; + EState *estate; + + /* + * Slot containing tuple obtained from ModifyTable's subplan. Used to + * access "junk" columns that are not going to be stored. 
+ */ + TupleTableSlot *planSlot; + + /* + * During EvalPlanQual, project and return the new version of the new + * tuple + */ +#if PG15_GE + TupleTableSlot *(*GetUpdateNewTuple)(ResultRelInfo *resultRelInfo, TupleTableSlot *epqslot, + TupleTableSlot *oldSlot, MergeActionState *relaction); + + /* MERGE specific */ + MergeActionState *relaction; /* MERGE action in progress */ +#endif + /* + * Information about the changes that were made concurrently to a tuple + * being updated or deleted + */ + TM_FailureData tmfd; + + /* + * The tuple produced by EvalPlanQual to retry from, if a cross-partition + * UPDATE requires it + */ + TupleTableSlot *cpUpdateRetrySlot; + + /* + * The tuple projected by the INSERT's RETURNING clause, when doing a + * cross-partition UPDATE + */ + TupleTableSlot *cpUpdateReturningSlot; + + /* + * Lock mode to acquire on the latest tuple version before performing + * EvalPlanQual on it + */ + LockTupleMode lockmode; +} ModifyTableContext; + +/* + * Context struct containing output data specific to UPDATE operations. + */ +typedef struct UpdateContext +{ + bool updated; /* did UPDATE actually occur? */ + bool updateIndexes; /* index update required? */ + bool crossPartUpdate; /* was it a cross-partition update? */ +} UpdateContext; + static void fireASTriggers(ModifyTableState *node); static void fireBSTriggers(ModifyTableState *node); static TupleTableSlot *ExecModifyTable(PlanState *pstate); @@ -41,30 +105,38 @@ static void ExecInitInsertProjection(ModifyTableState *mtstate, ResultRelInfo *r static void ExecInitUpdateProjection(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo); static void ExecCheckPlanOutput(Relation resultRel, List *targetList); static TupleTableSlot *ExecGetInsertNewTuple(ResultRelInfo *relinfo, TupleTableSlot *planSlot); -static TupleTableSlot *ExecInsert(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, - TupleTableSlot *slot, TupleTableSlot *planSlot, EState *estate, - bool canSetTag); +static TupleTableSlot *ExecInsert(ModifyTableContext *context, ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, bool canSetTag); static void ExecBatchInsert(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, TupleTableSlot **slots, TupleTableSlot **planSlots, int numSlots, EState *estate, bool canSetTag); -static TupleTableSlot *ExecDelete(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, - ItemPointer tupleid, HeapTuple oldtuple, TupleTableSlot *planSlot, - EPQState *epqstate, EState *estate, bool processReturning, +static TupleTableSlot *ExecDelete(ModifyTableContext *context, ResultRelInfo *resultRelInfo, + ItemPointer tupleid, HeapTuple oldtuple, bool processReturning, bool canSetTag, bool changingPart, bool *tupleDeleted, TupleTableSlot **epqreturnslot); -static TupleTableSlot *ExecUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, +static TupleTableSlot *ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo, ItemPointer tupleid, HeapTuple oldtuple, TupleTableSlot *slot, - TupleTableSlot *planSlot, EPQState *epqstate, EState *estate, bool canSetTag); -static bool ExecOnConflictUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, - ItemPointer conflictTid, TupleTableSlot *planSlot, - TupleTableSlot *excludedSlot, EState *estate, bool canSetTag, - TupleTableSlot **returning); +static bool ExecOnConflictUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo, + ItemPointer conflictTid, TupleTableSlot *excludedSlot, + bool canSetTag, TupleTableSlot **returning); static void 
ExecCheckTupleVisible(EState *estate, Relation rel, TupleTableSlot *slot); static void ExecCheckTIDVisible(EState *estate, ResultRelInfo *relinfo, ItemPointer tid, TupleTableSlot *tempSlot); #endif +#if PG15_GE +/* MERGE specific */ +static TupleTableSlot *ExecMerge(ModifyTableContext *context, ResultRelInfo *resultRelInfo, + ResultRelInfo *cds_rri, ItemPointer tupleid, bool canSetTag); +static bool ExecMergeMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo, + ItemPointer tupleid, bool canSetTag); +static void ExecMergeNotMatched(ModifyTableContext *context, ResultRelInfo *cds_rri, + bool canSetTag); +static TupleTableSlot *mergeGetUpdateNewTuple(ResultRelInfo *relinfo, TupleTableSlot *planSlot, + TupleTableSlot *oldSlot, MergeActionState *relaction); +#endif + static List * get_chunk_dispatch_states(PlanState *substate) { @@ -126,7 +198,13 @@ hypertable_modify_begin(CustomScanState *node, EState *estate, int eflags) * we need to set the hypertable as the rootRelation otherwise * statement trigger defined only on the hypertable will not fire. */ - if (mt->operation == CMD_DELETE || mt->operation == CMD_UPDATE) + if (mt->operation == CMD_DELETE || mt->operation == CMD_UPDATE +#if 0 +#if PG15_GE + || mt->operation == CMD_MERGE +#endif +#endif + ) mt->rootRelation = mt->nominalRelation; ps = ExecInitNode(&mt->plan, estate, eflags); @@ -157,7 +235,11 @@ hypertable_modify_begin(CustomScanState *node, EState *estate, int eflags) PlanState *subplan = outerPlanState(mtstate); #endif - if (mtstate->operation == CMD_INSERT) + if (mtstate->operation == CMD_INSERT +#if PG15_GE + || mtstate->operation == CMD_MERGE +#endif + ) { /* setup chunk dispatch state only for INSERTs */ chunk_dispatch_states = get_chunk_dispatch_states(subplan); @@ -217,7 +299,13 @@ hypertable_modify_explain(CustomScanState *node, List *ancestors, ExplainState * mtstate->ps.plan->lefttree->targetlist = NULL; ((CustomScan *) mtstate->ps.plan->lefttree)->custom_scan_tlist = NULL; } - +#if PG15_GE + if (((ModifyTable *) mtstate->ps.plan)->operation == CMD_MERGE && es->verbose) + { + mtstate->ps.plan->lefttree->targetlist = NULL; + ((CustomScan *) mtstate->ps.plan->lefttree)->custom_scan_tlist = NULL; + } +#endif /* * Since we hijack the ModifyTable node instrumentation on ModifyTable will * be missing so we set it to instrumentation of HypertableModify node. @@ -538,9 +626,9 @@ hypertable_modify_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *be cscan->scan.plan.targetlist = copyObject(root->processed_tlist); /* - * For UPDATE/DELETE processed_tlist will have ROWID_VAR. We need to remove - * those because set_customscan_references will bail if it sees - * ROWID_VAR entries in the targetlist. + * For UPDATE/DELETE/MERGE processed_tlist will have ROWID_VAR. We + * need to remove those because set_customscan_references will bail + * if it sees ROWID_VAR entries in the targetlist. */ #if PG14_GE if (mt->operation == CMD_UPDATE || mt->operation == CMD_DELETE) @@ -616,7 +704,11 @@ ts_hypertable_modify_path_create(PlannerInfo *root, ModifyTablePath *mtpath, Hyp Index rti = mtpath->nominalRelation; - if (mtpath->operation == CMD_INSERT) + if (mtpath->operation == CMD_INSERT +#if PG15_GE + || mtpath->operation == CMD_MERGE +#endif + ) { if (hypertable_is_distributed(ht) && ts_guc_max_insert_batch_size > 0) { @@ -653,7 +745,28 @@ ts_hypertable_modify_path_create(PlannerInfo *root, ModifyTablePath *mtpath, Hyp return path; } +/* + * Callback for ModifyTableState->GetUpdateNewTuple for use by regular UPDATE. 
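+ * MERGE installs mergeGetUpdateNewTuple() instead, so that the action's
+ * own projection is applied when a new version of the tuple has to be
+ * built (for example during EvalPlanQual rechecks).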
+ */ #if PG14_GE +static TupleTableSlot * +internalGetUpdateNewTuple(ResultRelInfo *relinfo, TupleTableSlot *planSlot, TupleTableSlot *oldSlot, +#if PG15_GE + MergeActionState *relaction +#else + void *temp +#endif +) +{ + ProjectionInfo *newProj = relinfo->ri_projectNew; + ExprContext *econtext; + + econtext = newProj->pi_exprContext; + econtext->ecxt_outertuple = planSlot; + econtext->ecxt_scantuple = oldSlot; + return ExecProject(newProj); +} + /* ---------------------------------------------------------------- * ExecModifyTable * @@ -667,12 +780,12 @@ static TupleTableSlot * ExecModifyTable(PlanState *pstate) { ModifyTableState *node = castNode(ModifyTableState, pstate); + ModifyTableContext context; EState *estate = node->ps.state; CmdType operation = node->operation; ResultRelInfo *resultRelInfo; PlanState *subplanstate; TupleTableSlot *slot; - TupleTableSlot *planSlot; TupleTableSlot *oldSlot; ItemPointer tupleid; ItemPointerData tuple_ctid; @@ -718,7 +831,11 @@ ExecModifyTable(PlanState *pstate) resultRelInfo = node->resultRelInfo + node->mt_lastResultIndex; subplanstate = outerPlanState(node); - if (operation == CMD_INSERT) + if (operation == CMD_INSERT +#if PG15_GE + || operation == CMD_MERGE +#endif + ) { if (ts_is_chunk_dispatch_state(subplanstate)) { @@ -730,6 +847,10 @@ ExecModifyTable(PlanState *pstate) cds = linitial(get_chunk_dispatch_states(subplanstate)); } } + /* Set global context */ + context.mtstate = node; + context.epqstate = &node->mt_epqstate; + context.estate = estate; /* * Fetch rows from subplan, and execute the required table modification @@ -753,12 +874,17 @@ ExecModifyTable(PlanState *pstate) if (pstate->ps_ExprContext) ResetExprContext(pstate->ps_ExprContext); - planSlot = ExecProcNode(subplanstate); + context.planSlot = ExecProcNode(subplanstate); /* No more tuples to process? */ - if (TupIsNull(planSlot)) + if (TupIsNull(context.planSlot)) break; +#if PG15_GE + /* copy INSERT merge action list to result relation info of corresponding chunk */ + if (cds && cds->rri && operation == CMD_MERGE) + cds->rri->ri_notMatchedMergeAction = resultRelInfo->ri_notMatchedMergeAction; +#endif /* * When there are multiple result relations, each tuple contains a * junk column that gives the OID of the rel from which it came. @@ -770,9 +896,24 @@ ExecModifyTable(PlanState *pstate) bool isNull; Oid resultoid; - datum = ExecGetJunkAttribute(planSlot, node->mt_resultOidAttno, &isNull); + datum = ExecGetJunkAttribute(context.planSlot, node->mt_resultOidAttno, &isNull); if (isNull) + { +#if PG15_GE + if (operation == CMD_MERGE) + { + EvalPlanQualSetSlot(&node->mt_epqstate, context.planSlot); + + ExecMerge(&context, + node->resultRelInfo, + (cds ? cds->rri : NULL), + NULL, + node->canSetTag); + continue; /* no RETURNING support yet */ + } +#endif elog(ERROR, "tableoid is NULL"); + } resultoid = DatumGetObjectId(datum); /* If it's not the same as last time, we need to locate the rel */ @@ -794,13 +935,13 @@ ExecModifyTable(PlanState *pstate) * ExecProcessReturning by IterateDirectModify, so no need to * provide it here. 
*/ - slot = ExecProcessReturning(resultRelInfo, NULL, planSlot); + slot = ExecProcessReturning(resultRelInfo, NULL, context.planSlot); return slot; } - EvalPlanQualSetSlot(&node->mt_epqstate, planSlot); - slot = planSlot; + EvalPlanQualSetSlot(&node->mt_epqstate, context.planSlot); + slot = context.planSlot; tupleid = NULL; oldtuple = NULL; @@ -812,7 +953,11 @@ ExecModifyTable(PlanState *pstate) * Keep this in step with the part of ExecInitModifyTable that sets up * ri_RowIdAttNo. */ - if (operation == CMD_UPDATE || operation == CMD_DELETE) + if (operation == CMD_UPDATE || operation == CMD_DELETE +#if PG15_GE + || operation == CMD_MERGE +#endif + ) { char relkind; Datum datum; @@ -833,7 +978,22 @@ ExecModifyTable(PlanState *pstate) datum = ExecGetJunkAttribute(slot, resultRelInfo->ri_RowIdAttNo, &isNull); /* shouldn't ever get a null result... */ if (isNull) + { +#if PG15_GE + if (operation == CMD_MERGE) + { + EvalPlanQualSetSlot(&node->mt_epqstate, context.planSlot); + + ExecMerge(&context, + node->resultRelInfo, + (cds ? cds->rri : NULL), + NULL, + node->canSetTag); + continue; /* no RETURNING support yet */ + } +#endif elog(ERROR, "ctid is NULL"); + } tupleid = (ItemPointer) DatumGetPointer(datum); tuple_ctid = *tupleid; /* be sure we don't free ctid!! */ @@ -886,8 +1046,8 @@ ExecModifyTable(PlanState *pstate) /* Initialize projection info if first time for this table */ if (unlikely(!resultRelInfo->ri_projectNewInfoValid)) ExecInitInsertProjection(node, resultRelInfo); - slot = ExecGetInsertNewTuple(resultRelInfo, planSlot); - slot = ExecInsert(node, cds->rri, slot, planSlot, estate, node->canSetTag); + slot = ExecGetInsertNewTuple(resultRelInfo, context.planSlot); + slot = ExecInsert(&context, cds->rri, slot, node->canSetTag); break; case CMD_UPDATE: /* Initialize projection info if first time for this table */ @@ -909,37 +1069,42 @@ ExecModifyTable(PlanState *pstate) /* Fetch the most recent version of old tuple. */ Relation relation = resultRelInfo->ri_RelationDesc; - Assert(tupleid != NULL); if (!table_tuple_fetch_row_version(relation, tupleid, SnapshotAny, oldSlot)) elog(ERROR, "failed to fetch tuple being updated"); } +#if PG14_LT slot = ExecGetUpdateNewTuple(resultRelInfo, planSlot, oldSlot); - +#else + slot = internalGetUpdateNewTuple(resultRelInfo, context.planSlot, oldSlot, NULL); +#endif +#if PG15_GE + context.GetUpdateNewTuple = internalGetUpdateNewTuple; + context.relaction = NULL; +#endif /* Now apply the update. */ - slot = ExecUpdate(node, - resultRelInfo, - tupleid, - oldtuple, - slot, - planSlot, - &node->mt_epqstate, - estate, - node->canSetTag); + slot = + ExecUpdate(&context, resultRelInfo, tupleid, oldtuple, slot, node->canSetTag); break; case CMD_DELETE: - slot = ExecDelete(node, + slot = ExecDelete(&context, resultRelInfo, tupleid, oldtuple, - planSlot, - &node->mt_epqstate, - estate, - true, /* processReturning */ + true, + false, node->canSetTag, - false, /* changingPart */ NULL, NULL); break; +#if PG15_GE + case CMD_MERGE: + slot = ExecMerge(&context, + resultRelInfo, + (cds ? 
cds->rri : NULL), + tupleid, + node->canSetTag); + break; +#endif default: elog(ERROR, "unknown operation"); break; @@ -1005,6 +1170,16 @@ fireBSTriggers(ModifyTableState *node) case CMD_DELETE: ExecBSDeleteTriggers(node->ps.state, resultRelInfo); break; +#if PG15_GE + case CMD_MERGE: + if (node->mt_merge_subcommands & MERGE_INSERT) + ExecBSInsertTriggers(node->ps.state, resultRelInfo); + if (node->mt_merge_subcommands & MERGE_UPDATE) + ExecBSUpdateTriggers(node->ps.state, resultRelInfo); + if (node->mt_merge_subcommands & MERGE_DELETE) + ExecBSDeleteTriggers(node->ps.state, resultRelInfo); + break; +#endif default: elog(ERROR, "unknown operation"); break; @@ -1035,6 +1210,16 @@ fireASTriggers(ModifyTableState *node) case CMD_DELETE: ExecASDeleteTriggers(node->ps.state, resultRelInfo, node->mt_transition_capture); break; +#if PG15_GE + case CMD_MERGE: + if (node->mt_merge_subcommands & MERGE_INSERT) + ExecASInsertTriggers(node->ps.state, resultRelInfo, node->mt_transition_capture); + if (node->mt_merge_subcommands & MERGE_UPDATE) + ExecASUpdateTriggers(node->ps.state, resultRelInfo, node->mt_transition_capture); + if (node->mt_merge_subcommands & MERGE_DELETE) + ExecASDeleteTriggers(node->ps.state, resultRelInfo, node->mt_transition_capture); + break; +#endif default: elog(ERROR, "unknown operation"); break; @@ -1322,6 +1507,26 @@ ExecGetInsertNewTuple(ResultRelInfo *relinfo, TupleTableSlot *planSlot) return ExecProject(newProj); } +/* + * ExecGetUpdateNewTuple + * This prepares a "new" tuple by combining an UPDATE subplan's output + * tuple (which contains values of changed columns) with unchanged + * columns taken from the old tuple. + * + * The subplan tuple might also contain junk columns, which are ignored. + * Note that the projection also ensures we have a slot of the right type. + */ +TupleTableSlot * +ExecGetUpdateNewTuple(ResultRelInfo *relinfo, TupleTableSlot *planSlot, TupleTableSlot *oldSlot) +{ + /* Use a few extra Asserts to protect against outside callers */ + Assert(relinfo->ri_projectNewInfoValid); + Assert(planSlot != NULL && !TTS_EMPTY(planSlot)); + Assert(oldSlot != NULL && !TTS_EMPTY(oldSlot)); + + return internalGetUpdateNewTuple(relinfo, planSlot, oldSlot, NULL); +} + /* ---------------------------------------------------------------- * ExecInsert * @@ -1343,11 +1548,14 @@ ExecGetInsertNewTuple(ResultRelInfo *relinfo, TupleTableSlot *planSlot) * copied and modified version of ExecInsert from executor/nodeModifyTable.c */ static TupleTableSlot * -ExecInsert(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, - TupleTableSlot *planSlot, EState *estate, bool canSetTag) +ExecInsert(ModifyTableContext *context, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, + bool canSetTag) { + ModifyTableState *mtstate = context->mtstate; + EState *estate = context->estate; Relation resultRelationDesc; List *recheckIndexes = NIL; + TupleTableSlot *planSlot = context->planSlot; TupleTableSlot *result = NULL; TransitionCaptureState *ar_insert_trig_tcs; ModifyTable *node = (ModifyTable *) mtstate->ps.plan; @@ -1511,8 +1719,20 @@ ExecInsert(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, TupleTableSl * partition, we should instead check UPDATE policies, because we are * executing policies defined on the target table, and not those * defined on the child partitions. + * + * If we're running MERGE, we refer to the action that we're executing + * to know if we're doing an INSERT or UPDATE to a partition table. 
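+	 * (context->relaction is the MERGE action in progress; see
+	 * ModifyTableContext above.)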
*/ - wco_kind = (mtstate->operation == CMD_UPDATE) ? WCO_RLS_UPDATE_CHECK : WCO_RLS_INSERT_CHECK; + if (mtstate->operation == CMD_UPDATE) + wco_kind = WCO_RLS_UPDATE_CHECK; +#if PG15_GE + else if (mtstate->operation == CMD_MERGE) + wco_kind = (context->relaction->mas_action->commandType == CMD_UPDATE) ? + WCO_RLS_UPDATE_CHECK : + WCO_RLS_INSERT_CHECK; +#endif + else + wco_kind = WCO_RLS_INSERT_CHECK; /* * ExecWithCheckOptions() will skip any WCOs which are not of the kind @@ -1578,12 +1798,10 @@ ExecInsert(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, TupleTableSl */ TupleTableSlot *returning = NULL; - if (ExecOnConflictUpdate(mtstate, + if (ExecOnConflictUpdate(context, resultRelInfo, &conflictTid, - planSlot, slot, - estate, canSetTag, &returning)) { @@ -1791,6 +2009,212 @@ ExecBatchInsert(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, TupleTa estate->es_processed += numInserted; } +/* + * ExecUpdatePrologue -- subroutine for ExecUpdate + * + * Prepare executor state for UPDATE. This includes running BEFORE ROW + * triggers. We return false if one of them makes the update a no-op; + * otherwise, return true. + */ +static bool +ExecUpdatePrologue(ModifyTableContext *context, ResultRelInfo *resultRelInfo, ItemPointer tupleid, + HeapTuple oldtuple, TupleTableSlot *slot) +{ + Relation resultRelationDesc = resultRelInfo->ri_RelationDesc; + + ExecMaterializeSlot(slot); + + /* + * Open the table's indexes, if we have not done so already, so that we + * can add new index entries for the updated tuple. + */ + if (resultRelationDesc->rd_rel->relhasindex && resultRelInfo->ri_IndexRelationDescs == NULL) + ExecOpenIndices(resultRelInfo, false); + + /* BEFORE ROW UPDATE triggers */ + if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_update_before_row) + return ExecBRUpdateTriggersCompat(context->estate, + context->epqstate, + resultRelInfo, + tupleid, + oldtuple, + slot, + &context->tmfd); + + return true; +} + +/* + * ExecUpdatePrepareSlot -- subroutine for ExecUpdate + * + * Apply the final modifications to the tuple slot before the update. + */ +static void +ExecUpdatePrepareSlot(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate) +{ + Relation resultRelationDesc = resultRelInfo->ri_RelationDesc; + + /* + * Constraints and GENERATED expressions might reference the tableoid + * column, so (re-)initialize tts_tableOid before evaluating them. + */ + slot->tts_tableOid = RelationGetRelid(resultRelationDesc); + + /* + * Compute stored generated columns + */ + if (resultRelationDesc->rd_att->constr && + resultRelationDesc->rd_att->constr->has_generated_stored) + ExecComputeStoredGenerated(resultRelInfo, estate, slot, CMD_UPDATE); +} + +/* + * ExecUpdateAct -- subroutine for ExecUpdate + * + * Actually update the tuple, when operating on a plain table. If the + * table is a partition, and the command was called referencing an ancestor + * partitioned table, this routine migrates the resulting tuple to another + * partition. + * + * The caller is in charge of keeping indexes current as necessary. The + * caller is also in charge of doing EvalPlanQual if the tuple is found to + * be concurrently updated. However, in case of a cross-partition update, + * this routine does it. + * + * Caller is in charge of doing EvalPlanQual as necessary, and of keeping + * indexes current for the update. 
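+ *
+ * Unlike upstream PostgreSQL, a failed partition constraint check here
+ * (i.e. an update that would move the row to another chunk) raises an
+ * error, since cross-chunk updates are not supported.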
+ */ +static TM_Result +ExecUpdateAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo, ItemPointer tupleid, + HeapTuple oldtuple, TupleTableSlot *slot, bool canSetTag, UpdateContext *updateCxt) +{ + EState *estate = context->estate; + Relation resultRelationDesc = resultRelInfo->ri_RelationDesc; + bool partition_constraint_failed; + TM_Result result; + + updateCxt->crossPartUpdate = false; + + /* + * If we generate a new candidate tuple after EvalPlanQual testing, we + * must loop back here and recheck any RLS policies and constraints. (We + * don't need to redo triggers, however. If there are any BEFORE triggers + * then trigger.c will have done table_tuple_lock to lock the correct + * tuple, so there's no need to do them again.) + */ + + /* ensure slot is independent, consider e.g. EPQ */ + ExecMaterializeSlot(slot); + + /* + * If partition constraint fails, this row might get moved to another + * partition, in which case we should check the RLS CHECK policy just + * before inserting into the new partition, rather than doing it here. + * This is because a trigger on that partition might again change the row. + * So skip the WCO checks if the partition constraint fails. + */ + partition_constraint_failed = resultRelationDesc->rd_rel->relispartition && + !ExecPartitionCheck(resultRelInfo, slot, estate, false); + + /* Check any RLS UPDATE WITH CHECK policies */ + if (!partition_constraint_failed && resultRelInfo->ri_WithCheckOptions != NIL) + { + /* + * ExecWithCheckOptions() will skip any WCOs which are not of the kind + * we are looking for at this point. + */ + ExecWithCheckOptions(WCO_RLS_UPDATE_CHECK, resultRelInfo, slot, estate); + } + + /* + * If a partition check failed, try to move the row into the right + * partition. + */ + if (partition_constraint_failed) + { + elog(ERROR, "cross chunk updates not supported"); + } + + /* + * Check the constraints of the tuple. We've already checked the + * partition constraint above; however, we must still ensure the tuple + * passes all other constraints, so we will call ExecConstraints() and + * have it validate all remaining checks. + */ + if (resultRelationDesc->rd_att->constr) + ExecConstraints(resultRelInfo, slot, estate); + + /* + * replace the heap tuple + * + * Note: if es_crosscheck_snapshot isn't InvalidSnapshot, we check that + * the row to be updated is visible to that snapshot, and throw a + * can't-serialize error if not. This is a special-case behavior needed + * for referential integrity updates in transaction-snapshot mode + * transactions. + */ + result = table_tuple_update(resultRelationDesc, + tupleid, + slot, + estate->es_output_cid, + estate->es_snapshot, + estate->es_crosscheck_snapshot, + true /* wait for commit */, + &context->tmfd, + &context->lockmode, + &updateCxt->updateIndexes); + if (result == TM_Ok) + updateCxt->updated = true; + + return result; +} + +/* + * ExecUpdateEpilogue -- subroutine for ExecUpdate + * + * Closing steps of updating a tuple. Must be called if ExecUpdateAct + * returns indicating that the tuple was updated. 
+ */ +static void +ExecUpdateEpilogue(ModifyTableContext *context, UpdateContext *updateCxt, + ResultRelInfo *resultRelInfo, ItemPointer tupleid, HeapTuple oldtuple, + TupleTableSlot *slot, List *recheckIndexes) +{ + ModifyTableState *mtstate = context->mtstate; + + /* insert index entries for tuple if necessary */ + if (resultRelInfo->ri_NumIndices > 0 && updateCxt->updateIndexes) + recheckIndexes = + ExecInsertIndexTuples(resultRelInfo, slot, context->estate, true, false, NULL, NIL); + + /* AFTER ROW UPDATE Triggers */ + ExecARUpdateTriggersCompat(context->estate, + resultRelInfo, + NULL, + NULL, + tupleid, + oldtuple, + slot, + recheckIndexes, + mtstate->operation == CMD_INSERT ? + mtstate->mt_oc_transition_capture : + mtstate->mt_transition_capture, + false /* is_crosspart_update */ + ); + + /* + * Check any WITH CHECK OPTION constraints from parent views. We are + * required to do this after testing all constraints and uniqueness + * violations per the SQL spec, so we do it after actually updating the + * record in the heap and all indexes. + * + * ExecWithCheckOptions() will skip any WCOs which are not of the kind we + * are looking for at this point. + */ + if (resultRelInfo->ri_WithCheckOptions != NIL) + ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo, slot, context->estate); +} + /* ---------------------------------------------------------------- * ExecUpdate * @@ -1821,36 +2245,21 @@ ExecBatchInsert(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, TupleTa * copied and modified version of ExecUpdate from executor/nodeModifyTable.c */ static TupleTableSlot * -ExecUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer tupleid, - HeapTuple oldtuple, TupleTableSlot *slot, TupleTableSlot *planSlot, EPQState *epqstate, - EState *estate, bool canSetTag) +ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo, ItemPointer tupleid, + HeapTuple oldtuple, TupleTableSlot *slot, bool canSetTag) { + EState *estate = context->estate; Relation resultRelationDesc = resultRelInfo->ri_RelationDesc; TM_Result result; - TM_FailureData tmfd; List *recheckIndexes = NIL; - - ExecMaterializeSlot(slot); + UpdateContext updateCxt = { 0 }; /* - * Open the table's indexes, if we have not done so already, so that we - * can add new index entries for the updated tuple. + * Prepare for the update. This includes BEFORE ROW triggers, so we're + * done if it says we are. */ - if (resultRelationDesc->rd_rel->relhasindex && resultRelInfo->ri_IndexRelationDescs == NULL) - ExecOpenIndices(resultRelInfo, false); - - /* BEFORE ROW UPDATE Triggers */ - if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_update_before_row) - { - if (!ExecBRUpdateTriggersCompat(estate, - epqstate, - resultRelInfo, - tupleid, - oldtuple, - slot, - NULL)) - return NULL; /* "do nothing" */ - } + if (!ExecUpdatePrologue(context, resultRelInfo, tupleid, oldtuple, slot)) + return NULL; /* INSTEAD OF ROW UPDATE Triggers */ if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_update_instead_row) @@ -1860,24 +2269,15 @@ ExecUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer } else if (resultRelInfo->ri_FdwRoutine) { - /* - * GENERATED expressions might reference the tableoid column, so - * (re-)initialize tts_tableOid before evaluating them. 
- */ - slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); - - /* - * Compute stored generated columns - */ - if (resultRelationDesc->rd_att->constr && - resultRelationDesc->rd_att->constr->has_generated_stored) - ExecComputeStoredGenerated(resultRelInfo, estate, slot, CMD_UPDATE); + ExecUpdatePrepareSlot(resultRelInfo, slot, estate); /* * update in foreign table: let the FDW do it */ - slot = - resultRelInfo->ri_FdwRoutine->ExecForeignUpdate(estate, resultRelInfo, slot, planSlot); + slot = resultRelInfo->ri_FdwRoutine->ExecForeignUpdate(estate, + resultRelInfo, + slot, + context->planSlot); if (slot == NULL) /* "do nothing" */ return NULL; @@ -1891,95 +2291,22 @@ ExecUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer } else { - LockTupleMode lockmode; - bool partition_constraint_failed; - bool update_indexes; - - /* - * Constraints and GENERATED expressions might reference the tableoid - * column, so (re-)initialize tts_tableOid before evaluating them. - */ - slot->tts_tableOid = RelationGetRelid(resultRelationDesc); - - /* - * Compute stored generated columns - */ - if (resultRelationDesc->rd_att->constr && - resultRelationDesc->rd_att->constr->has_generated_stored) - ExecComputeStoredGenerated(resultRelInfo, estate, slot, CMD_UPDATE); + /* Fill in the slot appropriately */ + ExecUpdatePrepareSlot(resultRelInfo, slot, estate); - /* - * Check any RLS UPDATE WITH CHECK policies - * - * If we generate a new candidate tuple after EvalPlanQual testing, we - * must loop back here and recheck any RLS policies and constraints. - * (We don't need to redo triggers, however. If there are any BEFORE - * triggers then trigger.c will have done table_tuple_lock to lock the - * correct tuple, so there's no need to do them again.) - */ - lreplace:; - - /* ensure slot is independent, consider e.g. EPQ */ - ExecMaterializeSlot(slot); + redo_act: + result = + ExecUpdateAct(context, resultRelInfo, tupleid, oldtuple, slot, canSetTag, &updateCxt); /* - * If partition constraint fails, this row might get moved to another - * partition, in which case we should check the RLS CHECK policy just - * before inserting into the new partition, rather than doing it here. - * This is because a trigger on that partition might again change the - * row. So skip the WCO checks if the partition constraint fails. + * If ExecUpdateAct reports that a cross-partition update was done, + * then the returning tuple has been projected and there's nothing + * else for us to do. */ - partition_constraint_failed = resultRelationDesc->rd_rel->relispartition && - !ExecPartitionCheck(resultRelInfo, slot, estate, false); + if (updateCxt.crossPartUpdate) + return context->cpUpdateReturningSlot; - if (!partition_constraint_failed && resultRelInfo->ri_WithCheckOptions != NIL) - { - /* - * ExecWithCheckOptions() will skip any WCOs which are not of the - * kind we are looking for at this point. - */ - ExecWithCheckOptions(WCO_RLS_UPDATE_CHECK, resultRelInfo, slot, estate); - } - - /* - * If a partition check failed, try to move the row into the right - * partition. - */ - if (partition_constraint_failed) - { - elog(ERROR, "cross chunk updates not supported"); - } - - /* - * Check the constraints of the tuple. We've already checked the - * partition constraint above; however, we must still ensure the tuple - * passes all other constraints, so we will call ExecConstraints() and - * have it validate all remaining checks. 
- */ - if (resultRelationDesc->rd_att->constr) - ExecConstraints(resultRelInfo, slot, estate); - - /* - * replace the heap tuple - * - * Note: if es_crosscheck_snapshot isn't InvalidSnapshot, we check - * that the row to be updated is visible to that snapshot, and throw a - * can't-serialize error if not. This is a special-case behavior - * needed for referential integrity updates in transaction-snapshot - * mode transactions. - */ - result = table_tuple_update(resultRelationDesc, - tupleid, - slot, - estate->es_output_cid, - estate->es_snapshot, - estate->es_crosscheck_snapshot, - true /* wait for commit */, - &tmfd, - &lockmode, - &update_indexes); - - switch (result) + switch (result) { case TM_SelfModified: @@ -2006,7 +2333,7 @@ ExecUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer * can re-execute the UPDATE (assuming it can figure out how) * and then return NULL to cancel the outer update. */ - if (tmfd.cmax != estate->es_output_cid) + if (context->tmfd.cmax != estate->es_output_cid) ereport(ERROR, (errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION), errmsg("tuple to be updated was already modified by an operation " @@ -2035,7 +2362,7 @@ ExecUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer * Already know that we're going to need to do EPQ, so * fetch tuple directly into the right slot. */ - inputslot = EvalPlanQualSlot(epqstate, + inputslot = EvalPlanQualSlot(context->epqstate, resultRelationDesc, resultRelInfo->ri_RangeTableIndex); @@ -2044,17 +2371,17 @@ ExecUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer estate->es_snapshot, inputslot, estate->es_output_cid, - lockmode, + context->lockmode, LockWaitBlock, TUPLE_LOCK_FLAG_FIND_LAST_VERSION, - &tmfd); + &context->tmfd); switch (result) { case TM_Ok: - Assert(tmfd.traversed); + Assert(context->tmfd.traversed); - epqslot = EvalPlanQual(epqstate, + epqslot = EvalPlanQual(context->epqstate, resultRelationDesc, resultRelInfo->ri_RangeTableIndex, inputslot); @@ -2064,7 +2391,7 @@ ExecUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer /* Make sure ri_oldTupleSlot is initialized. */ if (unlikely(!resultRelInfo->ri_projectNewInfoValid)) - ExecInitUpdateProjection(mtstate, resultRelInfo); + ExecInitUpdateProjection(context->mtstate, resultRelInfo); /* Fetch the most recent version of old tuple. */ oldSlot = resultRelInfo->ri_oldTupleSlot; @@ -2074,7 +2401,7 @@ ExecUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer oldSlot)) elog(ERROR, "failed to fetch tuple being updated"); slot = ExecGetUpdateNewTuple(resultRelInfo, epqslot, oldSlot); - goto lreplace; + goto redo_act; case TM_Deleted: /* tuple already deleted; nothing to do */ @@ -2093,7 +2420,7 @@ ExecUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer * See also TM_SelfModified response to * table_tuple_update() above. 
*/ - if (tmfd.cmax != estate->es_output_cid) + if (context->tmfd.cmax != estate->es_output_cid) ereport(ERROR, (errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION), errmsg("tuple to be updated was already modified by an " @@ -2123,48 +2450,19 @@ ExecUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer elog(ERROR, "unrecognized table_tuple_update status: %u", result); return NULL; } - - /* insert index entries for tuple if necessary */ - if (resultRelInfo->ri_NumIndices > 0 && update_indexes) - recheckIndexes = - ExecInsertIndexTuples(resultRelInfo, slot, estate, true, false, NULL, NIL); } if (canSetTag) (estate->es_processed)++; /* AFTER ROW UPDATE Triggers */ - ExecARUpdateTriggersCompat(estate, - resultRelInfo, - NULL, - NULL, - tupleid, - oldtuple, - slot, - recheckIndexes, - mtstate->operation == CMD_INSERT ? - mtstate->mt_oc_transition_capture : - mtstate->mt_transition_capture, - false /* is_crosspart_update */ - ); + ExecUpdateEpilogue(context, &updateCxt, resultRelInfo, tupleid, oldtuple, slot, recheckIndexes); list_free(recheckIndexes); - /* - * Check any WITH CHECK OPTION constraints from parent views. We are - * required to do this after testing all constraints and uniqueness - * violations per the SQL spec, so we do it after actually updating the - * record in the heap and all indexes. - * - * ExecWithCheckOptions() will skip any WCOs which are not of the kind we - * are looking for at this point. - */ - if (resultRelInfo->ri_WithCheckOptions != NIL) - ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo, slot, estate); - /* Process RETURNING if present */ if (resultRelInfo->ri_projectReturning) - return ExecProcessReturning(resultRelInfo, slot, planSlot); + return ExecProcessReturning(resultRelInfo, slot, context->planSlot); return NULL; } @@ -2183,11 +2481,11 @@ ExecUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer * copied verbatim from executor/nodeModifyTable.c */ static bool -ExecOnConflictUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, - ItemPointer conflictTid, TupleTableSlot *planSlot, - TupleTableSlot *excludedSlot, EState *estate, bool canSetTag, +ExecOnConflictUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo, + ItemPointer conflictTid, TupleTableSlot *excludedSlot, bool canSetTag, TupleTableSlot **returning) { + ModifyTableState *mtstate = context->mtstate; ExprContext *econtext = mtstate->ps.ps_ExprContext; Relation relation = resultRelInfo->ri_RelationDesc; ExprState *onConflictSetWhere = resultRelInfo->ri_onConflict->oc_WhereClause; @@ -2200,7 +2498,7 @@ ExecOnConflictUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, bool isnull; /* Determine lock mode to use */ - lockmode = ExecUpdateLockMode(estate, resultRelInfo); + lockmode = ExecUpdateLockMode(context->estate, resultRelInfo); /* * Lock tuple for update. Don't follow updates when tuple cannot be @@ -2210,9 +2508,9 @@ ExecOnConflictUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, */ test = table_tuple_lock(relation, conflictTid, - estate->es_snapshot, + context->estate->es_snapshot, existing, - estate->es_output_cid, + context->estate->es_output_cid, lockmode, LockWaitBlock, 0, @@ -2320,7 +2618,7 @@ ExecOnConflictUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, * snapshot. This is in line with the way UPDATE deals with newer tuple * versions. 
*/ - ExecCheckTupleVisible(estate, relation, existing); + ExecCheckTupleVisible(context->estate, relation, existing); /* * Make tuple and any needed join variables available to ExecQual and @@ -2373,14 +2671,11 @@ ExecOnConflictUpdate(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, */ /* Execute UPDATE with projection */ - *returning = ExecUpdate(mtstate, + *returning = ExecUpdate(context, resultRelInfo, conflictTid, NULL, resultRelInfo->ri_onConflict->oc_ProjSlot, - planSlot, - &mtstate->mt_epqstate, - mtstate->ps.state, canSetTag); /* @@ -2452,6 +2747,99 @@ ExecCheckTIDVisible(EState *estate, ResultRelInfo *relinfo, ItemPointer tid, ExecClearTuple(tempSlot); } +/* + * ExecDeletePrologue -- subroutine for ExecDelete + * + * Prepare executor state for DELETE. Actually, the only thing we have to do + * here is execute BEFORE ROW triggers. We return false if one of them makes + * the delete a no-op; otherwise, return true. + */ +static bool +ExecDeletePrologue(ModifyTableContext *context, ResultRelInfo *resultRelInfo, ItemPointer tupleid, + HeapTuple oldtuple, TupleTableSlot **epqreturnslot) +{ + /* BEFORE ROW DELETE triggers */ + if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_delete_before_row) + return ExecBRDeleteTriggers(context->estate, + context->epqstate, + resultRelInfo, + tupleid, + oldtuple, + epqreturnslot); + + return true; +} + +/* + * ExecDeleteAct -- subroutine for ExecDelete + * + * Actually delete the tuple from a plain table. + * + * Caller is in charge of doing EvalPlanQual as necessary + */ +static TM_Result +ExecDeleteAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo, ItemPointer tupleid, + bool changingPart) +{ + EState *estate = context->estate; + + return table_tuple_delete(resultRelInfo->ri_RelationDesc, + tupleid, + estate->es_output_cid, + estate->es_snapshot, + estate->es_crosscheck_snapshot, + true /* wait for commit */, + &context->tmfd, + changingPart); +} + +/* + * ExecDeleteEpilogue -- subroutine for ExecDelete + * + * Closing steps of tuple deletion; this invokes AFTER FOR EACH ROW triggers, + * including the UPDATE triggers if the deletion is being done as part of a + * cross-partition tuple move. + */ +static void +ExecDeleteEpilogue(ModifyTableContext *context, ResultRelInfo *resultRelInfo, ItemPointer tupleid, + HeapTuple oldtuple) +{ + ModifyTableState *mtstate = context->mtstate; + EState *estate = context->estate; + TransitionCaptureState *ar_delete_trig_tcs; + + /* + * If this delete is the result of a partition key update that moved the + * tuple to a new partition, put this row into the transition OLD TABLE, + * if there is one. We need to do this separately for DELETE and INSERT + * because they happen on different tables. + */ + ar_delete_trig_tcs = mtstate->mt_transition_capture; + if (mtstate->operation == CMD_UPDATE && mtstate->mt_transition_capture && + mtstate->mt_transition_capture->tcs_update_old_table) + { + ExecARUpdateTriggersCompat(estate, + resultRelInfo, + NULL, + NULL, + tupleid, + oldtuple, + NULL, + NULL, + mtstate->mt_transition_capture, + false); + + /* + * We've already captured the NEW TABLE row, so make sure any AR + * DELETE trigger fired below doesn't capture it again. 
+ */ + ar_delete_trig_tcs = NULL; + } + + /* AFTER ROW DELETE Triggers */ + ExecARDeleteTriggersCompat(estate, resultRelInfo, tupleid, oldtuple, ar_delete_trig_tcs, false); +} + /* ---------------------------------------------------------------- * ExecDelete * @@ -2477,31 +2865,24 @@ ExecCheckTIDVisible(EState *estate, ResultRelInfo *relinfo, ItemPointer tid, * copied from executor/nodeModifyTable.c */ static TupleTableSlot * -ExecDelete(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer tupleid, - HeapTuple oldtuple, TupleTableSlot *planSlot, EPQState *epqstate, EState *estate, - bool processReturning, bool canSetTag, bool changingPart, bool *tupleDeleted, - TupleTableSlot **epqreturnslot) +ExecDelete(ModifyTableContext *context, ResultRelInfo *resultRelInfo, ItemPointer tupleid, + HeapTuple oldtuple, bool processReturning, bool canSetTag, bool changingPart, + bool *tupleDeleted, TupleTableSlot **epqreturnslot) { + EState *estate = context->estate; Relation resultRelationDesc = resultRelInfo->ri_RelationDesc; TM_Result result; - TM_FailureData tmfd; TupleTableSlot *slot = NULL; - TransitionCaptureState *ar_delete_trig_tcs; if (tupleDeleted) *tupleDeleted = false; - /* BEFORE ROW DELETE Triggers */ - if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_delete_before_row) - { - bool dodelete; - - dodelete = - ExecBRDeleteTriggers(estate, epqstate, resultRelInfo, tupleid, oldtuple, epqreturnslot); - - if (!dodelete) /* "do nothing" */ - return NULL; - } + /* + * Prepare for the delete. This includes BEFORE ROW triggers, so we're + * done if it says we are. + */ + if (!ExecDeletePrologue(context, resultRelInfo, tupleid, oldtuple, epqreturnslot)) + return NULL; /* INSTEAD OF ROW DELETE Triggers */ if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_delete_instead_row) @@ -2523,8 +2904,10 @@ ExecDelete(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer * although the FDW can return some other slot if it wants. */ slot = ExecGetReturningSlot(estate, resultRelInfo); - slot = - resultRelInfo->ri_FdwRoutine->ExecForeignDelete(estate, resultRelInfo, slot, planSlot); + slot = resultRelInfo->ri_FdwRoutine->ExecForeignDelete(estate, + resultRelInfo, + slot, + context->planSlot); if (slot == NULL) /* "do nothing" */ return NULL; @@ -2543,11 +2926,11 @@ ExecDelete(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer /* * delete the tuple * - * Note: if es_crosscheck_snapshot isn't InvalidSnapshot, we check - * that the row to be deleted is visible to that snapshot, and throw a - * can't-serialize error if not. This is a special-case behavior - * needed for referential integrity updates in transaction-snapshot - * mode transactions. + * Note: if context->estate->es_crosscheck_snapshot isn't + * InvalidSnapshot, we check that the row to be deleted is visible to + * that snapshot, and throw a can't-serialize error if not. This is a + * special-case behavior needed for referential integrity updates in + * transaction-snapshot mode transactions. 
*/ ldelete:; if (!ItemPointerIsValid(tupleid)) @@ -2556,14 +2939,7 @@ ExecDelete(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer "cannot update/delete rows from chunk \"%s\" as it is compressed", NameStr(resultRelationDesc->rd_rel->relname)); } - result = table_tuple_delete(resultRelationDesc, - tupleid, - estate->es_output_cid, - estate->es_snapshot, - estate->es_crosscheck_snapshot, - true /* wait for commit */, - &tmfd, - changingPart); + result = ExecDeleteAct(context, resultRelInfo, tupleid, changingPart); switch (result) { @@ -2593,7 +2969,7 @@ ExecDelete(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer * can re-execute the DELETE and then return NULL to cancel * the outer delete. */ - if (tmfd.cmax != estate->es_output_cid) + if (context->tmfd.cmax != estate->es_output_cid) ereport(ERROR, (errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION), errmsg("tuple to be deleted was already modified by an operation " @@ -2621,8 +2997,8 @@ ExecDelete(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer * Already know that we're going to need to do EPQ, so * fetch tuple directly into the right slot. */ - EvalPlanQualBegin(epqstate); - inputslot = EvalPlanQualSlot(epqstate, + EvalPlanQualBegin(context->epqstate); + inputslot = EvalPlanQualSlot(context->epqstate, resultRelationDesc, resultRelInfo->ri_RangeTableIndex); @@ -2634,13 +3010,13 @@ ExecDelete(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer LockTupleExclusive, LockWaitBlock, TUPLE_LOCK_FLAG_FIND_LAST_VERSION, - &tmfd); + &context->tmfd); switch (result) { case TM_Ok: - Assert(tmfd.traversed); - epqslot = EvalPlanQual(epqstate, + Assert(context->tmfd.traversed); + epqslot = EvalPlanQual(context->epqstate, resultRelationDesc, resultRelInfo->ri_RangeTableIndex, inputslot); @@ -2673,7 +3049,7 @@ ExecDelete(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer * See also TM_SelfModified response to * table_tuple_delete() above. */ - if (tmfd.cmax != estate->es_output_cid) + if (context->tmfd.cmax != estate->es_output_cid) ereport(ERROR, (errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION), errmsg("tuple to be deleted was already modified by an " @@ -2711,7 +3087,12 @@ ExecDelete(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("could not serialize access due to concurrent delete"))); - /* tuple already deleted; nothing to do */ + /* + * tuple already deleted; nothing to do. But MERGE might want + * to handle it differently. We've already filled-in + * actionInfo with sufficient information for MERGE to look + * at. + */ return NULL; default: @@ -2736,36 +3117,7 @@ ExecDelete(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer if (tupleDeleted) *tupleDeleted = true; - /* - * If this delete is the result of a partition key update that moved the - * tuple to a new partition, put this row into the transition OLD TABLE, - * if there is one. We need to do this separately for DELETE and INSERT - * because they happen on different tables. 
- */ - ar_delete_trig_tcs = mtstate->mt_transition_capture; - if (mtstate->operation == CMD_UPDATE && mtstate->mt_transition_capture && - mtstate->mt_transition_capture->tcs_update_old_table) - { - ExecARUpdateTriggersCompat(estate, - resultRelInfo, - NULL, - NULL, - tupleid, - oldtuple, - NULL, - NULL, - mtstate->mt_transition_capture, - false); - - /* - * We've already captured the NEW TABLE row, so make sure any AR - * DELETE trigger fired below doesn't capture it again. - */ - ar_delete_trig_tcs = NULL; - } - - /* AFTER ROW DELETE Triggers */ - ExecARDeleteTriggersCompat(estate, resultRelInfo, tupleid, oldtuple, ar_delete_trig_tcs, false); + ExecDeleteEpilogue(context, resultRelInfo, tupleid, oldtuple); /* Process RETURNING if present and if requested */ if (processReturning && resultRelInfo->ri_projectReturning) @@ -2795,7 +3147,7 @@ ExecDelete(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer } } - rslot = ExecProcessReturning(resultRelInfo, slot, planSlot); + rslot = ExecProcessReturning(resultRelInfo, slot, context->planSlot); /* * Before releasing the target tuple again, make sure rslot has a @@ -2810,5 +3162,558 @@ ExecDelete(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, ItemPointer return NULL; } +#endif + +/* + * Check and execute the first qualifying MATCHED action. The current target + * tuple is identified by tupleid. + * + * We start from the first WHEN MATCHED action and check if the WHEN quals + * pass, if any. If the WHEN quals for the first action do not pass, we + * check the second, then the third and so on. If we reach to the end, no + * action is taken and we return true, indicating that no further action is + * required for this tuple. + * + * If we do find a qualifying action, then we attempt to execute the action. + * + * If the tuple is concurrently updated, EvalPlanQual is run with the updated + * tuple to recheck the join quals. Note that the additional quals associated + * with individual actions are evaluated by this routine via ExecQual, while + * EvalPlanQual checks for the join quals. If EvalPlanQual tells us that the + * updated tuple still passes the join quals, then we restart from the first + * action to look for a qualifying action. Otherwise, we return false -- + * meaning that a NOT MATCHED action must now be executed for the current + * source tuple. + */ +#if PG15_GE +static bool +ExecMergeMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo, ItemPointer tupleid, + bool canSetTag) +{ + ModifyTableState *mtstate = context->mtstate; + TupleTableSlot *newslot; + EState *estate = context->estate; + ExprContext *econtext = mtstate->ps.ps_ExprContext; + bool isNull; + EPQState *epqstate = &mtstate->mt_epqstate; + ListCell *l; + + /* + * If there are no WHEN MATCHED actions, we are done. + */ + if (resultRelInfo->ri_matchedMergeAction == NIL) + return true; + + /* + * Make tuple and any needed join variables available to ExecQual and + * ExecProject. The target's existing tuple is installed in the scantuple. + * Again, this target relation's slot is required only in the case of a + * MATCHED tuple and UPDATE/DELETE actions. + */ + econtext->ecxt_scantuple = resultRelInfo->ri_oldTupleSlot; + econtext->ecxt_innertuple = context->planSlot; + econtext->ecxt_outertuple = NULL; + +lmerge_matched:; + + /* + * This routine is only invoked for matched rows, and we must have found + * the tupleid of the target row in that case; fetch that tuple. 
+ * + * We use SnapshotAny for this because we might get called again after + * EvalPlanQual returns us a new tuple, which may not be visible to our + * MVCC snapshot. + */ + + if (!table_tuple_fetch_row_version(resultRelInfo->ri_RelationDesc, + tupleid, + SnapshotAny, + resultRelInfo->ri_oldTupleSlot)) + elog(ERROR, "failed to fetch the target tuple"); + + foreach (l, resultRelInfo->ri_matchedMergeAction) + { + MergeActionState *relaction = (MergeActionState *) lfirst(l); + CmdType commandType = relaction->mas_action->commandType; + List *recheckIndexes = NIL; + TM_Result result; + UpdateContext updateCxt = { 0 }; + + /* + * Test condition, if any. + * + * In the absence of any condition, we perform the action + * unconditionally (no need to check separately since ExecQual() will + * return true if there are no conditions to evaluate). + */ + if (!ExecQual(relaction->mas_whenqual, econtext)) + continue; + + /* + * Check if the existing target tuple meets the USING checks of + * UPDATE/DELETE RLS policies. If those checks fail, we throw an + * error. + * + * The WITH CHECK quals are applied in ExecUpdate() and hence we need + * not do anything special to handle them. + * + * NOTE: We must do this after WHEN quals are evaluated, so that we + * check policies only when they matter. + */ + if (resultRelInfo->ri_WithCheckOptions) + { + ExecWithCheckOptions(commandType == CMD_UPDATE ? WCO_RLS_MERGE_UPDATE_CHECK : + WCO_RLS_MERGE_DELETE_CHECK, + resultRelInfo, + resultRelInfo->ri_oldTupleSlot, + context->mtstate->ps.state); + } + + /* Perform stated action */ + switch (commandType) + { + case CMD_UPDATE: + + /* + * Project the output tuple, and use that to update the table. + * We don't need to filter out junk attributes, because the + * UPDATE action's targetlist doesn't have any. + */ + newslot = ExecProject(relaction->mas_proj); + + context->relaction = relaction; + context->GetUpdateNewTuple = mergeGetUpdateNewTuple; + context->cpUpdateRetrySlot = NULL; + + if (!ExecUpdatePrologue(context, resultRelInfo, tupleid, NULL, newslot)) + { + result = TM_Ok; + break; + } + ExecUpdatePrepareSlot(resultRelInfo, newslot, context->estate); + result = ExecUpdateAct(context, + resultRelInfo, + tupleid, + NULL, + newslot, + mtstate->canSetTag, + &updateCxt); + if (result == TM_Ok && updateCxt.updated) + { + ExecUpdateEpilogue(context, + &updateCxt, + resultRelInfo, + tupleid, + NULL, + newslot, + recheckIndexes); + mtstate->mt_merge_updated = 1; + } + + break; + + case CMD_DELETE: + context->relaction = relaction; + if (!ExecDeletePrologue(context, resultRelInfo, tupleid, NULL, NULL)) + { + result = TM_Ok; + break; + } + result = ExecDeleteAct(context, resultRelInfo, tupleid, false); + if (result == TM_Ok) + { + ExecDeleteEpilogue(context, resultRelInfo, tupleid, NULL); + mtstate->mt_merge_deleted = 1; + } + break; + + case CMD_NOTHING: + /* Doing nothing is always OK */ + result = TM_Ok; + break; + + default: + elog(ERROR, "unknown action in MERGE WHEN MATCHED clause"); + } + + switch (result) + { + case TM_Ok: + /* all good; perform final actions */ + if (canSetTag) + (estate->es_processed)++; + + break; + + case TM_SelfModified: + + /* + * The SQL standard disallows this for MERGE. 
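+ *
+ * For illustration (table and column names here are invented), in
+ *
+ *   MERGE INTO t USING (VALUES (1), (1)) AS s(id)
+ *   ON t.id = s.id
+ *   WHEN MATCHED THEN UPDATE SET val = s.id;
+ *
+ * both source rows match the same target row, so the second WHEN
+ * MATCHED action would modify a row already modified by this MERGE
+ * and is rejected with the cardinality violation below.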
+ */ + if (TransactionIdIsCurrentTransactionId(context->tmfd.xmax)) + ereport(ERROR, + (errcode(ERRCODE_CARDINALITY_VIOLATION), + /* translator: %s is a SQL command name */ + errmsg("%s command cannot affect row a second time", "MERGE"), + errhint("Ensure that not more than one source row matches any one " + "target row."))); + /* This shouldn't happen */ + elog(ERROR, "attempted to update or delete invisible tuple"); + break; + + case TM_Deleted: + if (IsolationUsesXactSnapshot()) + ereport(ERROR, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("could not serialize access due to concurrent delete"))); + + /* + * If the tuple was already deleted, return to let caller + * handle it under NOT MATCHED clauses. + */ + return false; + + case TM_Updated: + { + Relation resultRelationDesc; + TupleTableSlot *epqslot, *inputslot; + LockTupleMode lockmode; + + /* + * The target tuple was concurrently updated by some other + * transaction. + */ + + /* + * If cpUpdateRetrySlot is set, ExecCrossPartitionUpdate() + * must have detected that the tuple was concurrently + * updated, so we restart the search for an appropriate + * WHEN MATCHED clause to process the updated tuple. + * + * In this case, ExecDelete() would already have performed + * EvalPlanQual() on the latest version of the tuple, + * which in turn would already have been loaded into + * ri_oldTupleSlot, so no need to do either of those + * things. + * + * XXX why do we not check the WHEN NOT MATCHED list in + * this case? + */ + if (!TupIsNull(context->cpUpdateRetrySlot)) + goto lmerge_matched; + + /* + * Otherwise, we run the EvalPlanQual() with the new + * version of the tuple. If EvalPlanQual() does not return + * a tuple, then we switch to the NOT MATCHED list of + * actions. If it does return a tuple and the join qual is + * still satisfied, then we just need to recheck the + * MATCHED actions, starting from the top, and execute the + * first qualifying action. + */ + resultRelationDesc = resultRelInfo->ri_RelationDesc; + lockmode = ExecUpdateLockMode(estate, resultRelInfo); + + inputslot = EvalPlanQualSlot(epqstate, + resultRelationDesc, + resultRelInfo->ri_RangeTableIndex); + + result = table_tuple_lock(resultRelationDesc, + tupleid, + estate->es_snapshot, + inputslot, + estate->es_output_cid, + lockmode, + LockWaitBlock, + TUPLE_LOCK_FLAG_FIND_LAST_VERSION, + &context->tmfd); + switch (result) + { + case TM_Ok: + epqslot = EvalPlanQual(epqstate, + resultRelationDesc, + resultRelInfo->ri_RangeTableIndex, + inputslot); + + /* + * If we got no tuple, or the tuple we get has a + * NULL ctid, go back to caller: this one is not a + * MATCHED tuple anymore, so they can retry with + * NOT MATCHED actions. + */ + if (TupIsNull(epqslot)) + return false; + + (void) ExecGetJunkAttribute(epqslot, resultRelInfo->ri_RowIdAttNo, &isNull); + if (isNull) + return false; + + /* + * When a tuple was updated and migrated to + * another partition concurrently, the current + * MERGE implementation can't follow. There's + * probably a better way to handle this case, but + * it'd require recognizing the relation to which + * the tuple moved, and setting our current + * resultRelInfo to that. + */ + if (ItemPointerIndicatesMovedPartitions(&context->tmfd.ctid)) + ereport(ERROR, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("tuple to be deleted was already moved to another " + "partition due to concurrent update"))); + + /* + * A non-NULL ctid means that we are still dealing + * with MATCHED case. 
Restart the loop so that we + * apply all the MATCHED rules again, to ensure + * that the first qualifying WHEN MATCHED action + * is executed. + * + * Update tupleid to that of the new tuple, for + * the refetch we do at the top. + */ + ItemPointerCopy(&context->tmfd.ctid, tupleid); + goto lmerge_matched; + + case TM_Deleted: + + /* + * tuple already deleted; tell caller to run NOT + * MATCHED actions + */ + return false; + + case TM_SelfModified: + + /* + * This can be reached when following an update + * chain from a tuple updated by another session, + * reaching a tuple that was already updated in + * this transaction. If previously modified by + * this command, ignore the redundant update, + * otherwise error out. + * + * See also response to TM_SelfModified in + * ExecUpdate(). + */ + if (context->tmfd.cmax != estate->es_output_cid) + ereport(ERROR, + (errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION), + errmsg("tuple to be updated or deleted was already modified " + "by an operation triggered by the current command"), + errhint("Consider using an AFTER trigger instead of a BEFORE " + "trigger to propagate changes to other rows."))); + return false; + + default: + /* see table_tuple_lock call in ExecDelete() */ + elog(ERROR, "unexpected table_tuple_lock status: %u", result); + return false; + } + } + + case TM_Invisible: + case TM_WouldBlock: + case TM_BeingModified: + /* these should not occur */ + elog(ERROR, "unexpected tuple operation result: %d", result); + break; + } + + /* + * We've activated one of the WHEN clauses, so we don't search + * further. This is required behaviour, not an optimization. + */ + break; + } + + /* + * Successfully executed an action or no qualifying action was found. + */ + return true; +} + +/* + * Execute the first qualifying NOT MATCHED action. + */ +static void +ExecMergeNotMatched(ModifyTableContext *context, ResultRelInfo *cds_rri, bool canSetTag) +{ + ModifyTableState *mtstate = context->mtstate; + ExprContext *econtext = mtstate->ps.ps_ExprContext; + List *actionStates = NIL; + ListCell *l; + + /* + * For INSERT actions, the root relation's merge action is OK since the + * INSERT's targetlist and the WHEN conditions can only refer to the + * source relation and hence it does not matter which result relation we + * work with. + * + * XXX does this mean that we can avoid creating copies of actionStates on + * partitioned tables, for not-matched actions? + */ + actionStates = cds_rri->ri_notMatchedMergeAction; + + /* + * Make source tuple available to ExecQual and ExecProject. We don't need + * the target tuple, since the WHEN quals and targetlist can't refer to + * the target columns. + */ + econtext->ecxt_scantuple = NULL; + econtext->ecxt_innertuple = context->planSlot; + econtext->ecxt_outertuple = NULL; + + foreach (l, actionStates) + { + MergeActionState *action = (MergeActionState *) lfirst(l); + CmdType commandType = action->mas_action->commandType; + TupleTableSlot *newslot; + + /* + * Test condition, if any. + * + * In the absence of any condition, we perform the action + * unconditionally (no need to check separately since ExecQual() will + * return true if there are no conditions to evaluate). + */ + if (!ExecQual(action->mas_whenqual, econtext)) + continue; + + /* Perform stated action */ + switch (commandType) + { + case CMD_INSERT: + + /* + * Project the tuple. In case of a partitioned table, the + * projection was already built to use the root's descriptor, + * so we don't need to map the tuple here. 
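+ *
+ * For a hypertable target this is the path taken by a clause such as
+ * (columns as in test/sql/merge.sql)
+ *
+ *   WHEN NOT MATCHED THEN
+ *     INSERT (time, location, temperature)
+ *     VALUES (s.time, s.location, s.temperature)
+ *
+ * The projected row is handed to ExecInsert() below with the result
+ * relation passed in as cds_rri (per this patch, the one set up for
+ * the ChunkDispatch state), so the new row can be routed to the
+ * appropriate chunk.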
+ */ + newslot = ExecProject(action->mas_proj); + context->relaction = action; + + (void) ExecInsert(context, cds_rri, newslot, canSetTag); + mtstate->mt_merge_inserted = 1; + break; + case CMD_NOTHING: + /* Do nothing */ + break; + default: + elog(ERROR, "unknown action in MERGE WHEN NOT MATCHED clause"); + } + + /* + * We've activated one of the WHEN clauses, so we don't search + * further. This is required behaviour, not an optimization. + */ + break; + } +} + +/* + * Initializes the tuple slots in a ResultRelInfo for any MERGE action. + * + * We mark 'projectNewInfoValid' even though the projections themselves + * are not initialized here. + */ +void +ExecInitMergeTupleSlots(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo) +{ + EState *estate = mtstate->ps.state; + + Assert(!resultRelInfo->ri_projectNewInfoValid); + + resultRelInfo->ri_oldTupleSlot = + table_slot_create(resultRelInfo->ri_RelationDesc, &estate->es_tupleTable); + resultRelInfo->ri_newTupleSlot = + table_slot_create(resultRelInfo->ri_RelationDesc, &estate->es_tupleTable); + resultRelInfo->ri_projectNewInfoValid = true; +} + +/* + * Perform MERGE. + */ +static TupleTableSlot * +ExecMerge(ModifyTableContext *context, ResultRelInfo *resultRelInfo, ResultRelInfo *cds_rri, + ItemPointer tupleid, bool canSetTag) +{ + bool matched; + + /*----- + * If we are dealing with a WHEN MATCHED case (tupleid is valid), we + * execute the first action for which the additional WHEN MATCHED AND + * quals pass. If an action without quals is found, that action is + * executed. + * + * Similarly, if we are dealing with WHEN NOT MATCHED case, we look at + * the given WHEN NOT MATCHED actions in sequence until one passes. + * + * Things get interesting in case of concurrent update/delete of the + * target tuple. Such concurrent update/delete is detected while we are + * executing a WHEN MATCHED action. + * + * A concurrent update can: + * + * 1. modify the target tuple so that it no longer satisfies the + * additional quals attached to the current WHEN MATCHED action + * + * In this case, we are still dealing with a WHEN MATCHED case. + * We recheck the list of WHEN MATCHED actions from the start and + * choose the first one that satisfies the new target tuple. + * + * 2. modify the target tuple so that the join quals no longer pass and + * hence the source tuple no longer has a match. + * + * In this case, the source tuple no longer matches the target tuple, + * so we now instead find a qualifying WHEN NOT MATCHED action to + * execute. + * + * XXX Hmmm, what if the updated tuple would now match one that was + * considered NOT MATCHED so far? + * + * A concurrent delete changes a WHEN MATCHED case to WHEN NOT MATCHED. + * + * ExecMergeMatched takes care of following the update chain and + * re-finding the qualifying WHEN MATCHED action, as long as the updated + * target tuple still satisfies the join quals, i.e., it remains a WHEN + * MATCHED case. If the tuple gets deleted or the join quals fail, it + * returns and we try ExecMergeNotMatched. Given that ExecMergeMatched + * always make progress by following the update chain and we never switch + * from ExecMergeNotMatched to ExecMergeMatched, there is no risk of a + * livelock. + */ + matched = tupleid != NULL; + if (matched) + matched = ExecMergeMatched(context, resultRelInfo, tupleid, canSetTag); + + /* + * Either we were dealing with a NOT MATCHED tuple or ExecMergeMatched() + * returned "false", indicating the previously MATCHED tuple no longer + * matches. 
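+ *
+ * Taking a simplified form of the statement exercised in
+ * test/sql/merge.sql as an example,
+ *
+ *   MERGE INTO target t
+ *   USING source s
+ *   ON t.time = s.time AND t.location = s.location
+ *   WHEN MATCHED THEN
+ *     UPDATE SET temperature = (t.temperature + s.temperature)/2
+ *   WHEN NOT MATCHED THEN
+ *     INSERT (time, location, temperature)
+ *     VALUES (s.time, s.location, s.temperature);
+ *
+ * source rows that join to a target row take the UPDATE via
+ * ExecMergeMatched() above, while the remaining source rows fall
+ * through to ExecMergeNotMatched() below.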
+ */ + if (!matched) + ExecMergeNotMatched(context, cds_rri, canSetTag); + + /* No RETURNING support yet */ + return NULL; +} + +/* + * Callback for ModifyTableContext->GetUpdateNewTuple for use by MERGE. It + * computes the updated tuple by projecting from the current merge action's + * projection. + */ +static TupleTableSlot * +mergeGetUpdateNewTuple(ResultRelInfo *relinfo, TupleTableSlot *planSlot, TupleTableSlot *oldSlot, + MergeActionState *relaction) +{ + ExprContext *econtext = relaction->mas_proj->pi_exprContext; + + econtext->ecxt_scantuple = oldSlot; + econtext->ecxt_innertuple = planSlot; + return ExecProject(relaction->mas_proj); +} #endif diff --git a/src/planner/planner.c b/src/planner/planner.c index 8bc19fefbab..a4eb6aeab20 100644 --- a/src/planner/planner.c +++ b/src/planner/planner.c @@ -283,8 +283,19 @@ ts_rte_is_hypertable(const RangeTblEntry *rte, bool *isdistributed) return ht != NULL; } -#define IS_UPDL_CMD(parse) \ - ((parse)->commandType == CMD_UPDATE || (parse)->commandType == CMD_DELETE) +static bool +is_update_del_merge_dml(Query *parse) +{ + if ((parse)->commandType == CMD_UPDATE || + (parse)->commandType == CMD_DELETE +#if PG15_GE + /* MERGE without INSERT action */ + || ((parse)->commandType == CMD_MERGE && !(parse)->mergeUseOuterJoin) +#endif + ) + return true; + return false; +} typedef struct { @@ -383,8 +394,8 @@ preprocess_query(Node *node, PreprocessQueryContext *context) { /* Mark hypertable RTEs we'd like to expand ourselves */ if (ts_guc_enable_optimizations && ts_guc_enable_constraint_exclusion && - !IS_UPDL_CMD(context->rootquery) && query->resultRelation == 0 && - query->rowMarks == NIL && rte->inh) + !is_update_del_merge_dml(context->rootquery) && + query->resultRelation == 0 && query->rowMarks == NIL && rte->inh) rte_mark_for_expansion(rte); if (TS_HYPERTABLE_HAS_COMPRESSION_TABLE(ht)) @@ -1217,14 +1228,14 @@ timescaledb_set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, Rang switch (reltype) { case TS_REL_HYPERTABLE_CHILD: - if (ts_guc_enable_optimizations && IS_UPDL_CMD(root->parse)) + if (ts_guc_enable_optimizations && is_update_del_merge_dml(root->parse)) ts_planner_constraint_cleanup(root, rel); break; case TS_REL_CHUNK_STANDALONE: case TS_REL_CHUNK_CHILD: /* Check for UPDATE/DELETE (DML) on compressed chunks */ - if (IS_UPDL_CMD(root->parse) && dml_involves_hypertable(root, ht, rti)) + if (is_update_del_merge_dml(root->parse) && dml_involves_hypertable(root, ht, rti)) { if (ts_cm_functions->set_rel_pathlist_dml != NULL) ts_cm_functions->set_rel_pathlist_dml(root, rel, rti, rte, ht); @@ -1276,8 +1287,9 @@ timescaledb_get_relation_info_hook(PlannerInfo *root, Oid relation_objectid, boo * condition that rte->requiredPerms is not requiring UPDATE/DELETE on this rel. */ if (ts_guc_enable_optimizations && ts_guc_enable_constraint_exclusion && inhparent && - rte->ctename == NULL && !IS_UPDL_CMD(query) && query->resultRelation == 0 && - query->rowMarks == NIL && (rte->requiredPerms & (ACL_UPDATE | ACL_DELETE)) == 0) + rte->ctename == NULL && !is_update_del_merge_dml(query) && + query->resultRelation == 0 && query->rowMarks == NIL && + (rte->requiredPerms & (ACL_UPDATE | ACL_DELETE)) == 0) { rte_mark_for_expansion(rte); } @@ -1339,7 +1351,7 @@ timescaledb_get_relation_info_hook(PlannerInfo *root, Oid relation_objectid, boo * the trigger behaviour on access nodes, which would otherwise * no longer fire. 
*/ - if (IS_UPDL_CMD(root->parse) && !hypertable_is_distributed(ht)) + if (is_update_del_merge_dml(root->parse) && !hypertable_is_distributed(ht)) mark_dummy_rel(rel); break; case TS_REL_OTHER: @@ -1442,35 +1454,38 @@ replace_hypertable_modify_paths(PlannerInfo *root, List *pathlist, RelOptInfo *i if (IsA(path, ModifyTablePath)) { ModifyTablePath *mt = castNode(ModifyTablePath, path); - + RangeTblEntry *rte = planner_rt_fetch(mt->nominalRelation, root); + Hypertable *ht = ts_planner_get_hypertable(rte->relid, CACHE_FLAG_CHECK); if ( #if PG14_GE /* We only route UPDATE/DELETE through our CustomNode for PG 14+ because * the codepath for earlier versions is different. */ mt->operation == CMD_UPDATE || mt->operation == CMD_DELETE || -#endif -#if PG15_GE - mt->operation == CMD_MERGE || #endif mt->operation == CMD_INSERT) { - RangeTblEntry *rte = planner_rt_fetch(mt->nominalRelation, root); - Hypertable *ht = ts_planner_get_hypertable(rte->relid, CACHE_FLAG_CHECK); - -#if PG15_GE - if (ht && mt->operation == CMD_MERGE) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("The MERGE command does not support hypertables in this " - "version"), - errhint("Check https://github.com/timescale/timescaledb/issues/4929 " - "for more information and current status"))); -#endif if (ht && (mt->operation == CMD_INSERT || !hypertable_is_distributed(ht))) { path = ts_hypertable_modify_path_create(root, mt, ht, input_rel); } } +#if PG15_GE + if (ht && mt->operation == CMD_MERGE) + { + List *firstMergeActionList = linitial(mt->mergeActionLists); + ListCell *l; + /* iterate over merge action to check if there is an INSERT sql */ + foreach (l, firstMergeActionList) + { + MergeAction *action = (MergeAction *) lfirst(l); + if (action->commandType == CMD_INSERT) + { + path = ts_hypertable_modify_path_create(root, mt, ht, input_rel); + break; + } + } + } +#endif } new_pathlist = lappend(new_pathlist, path); diff --git a/test/expected/merge.out b/test/expected/merge.out index b6bcb0a92eb..a860e2d222e 100644 --- a/test/expected/merge.out +++ b/test/expected/merge.out @@ -1,22 +1,24 @@ -- This file and its contents are licensed under the Apache License 2.0. -- Please see the included NOTICE for copyright information and -- LICENSE-APACHE for a copy of the license. 
--- Create conditions table with location and temperature -CREATE TABLE conditions ( +\c :TEST_DBNAME :ROLE_SUPERUSER +-- Create target table with location and temperature +CREATE TABLE target ( time TIMESTAMPTZ NOT NULL, location SMALLINT NOT NULL, - temperature DOUBLE PRECISION NULL + temperature DOUBLE PRECISION NULL, + val text default 'string -' ); SELECT create_hypertable( - 'conditions', + 'target', 'time', chunk_time_interval => INTERVAL '5 seconds'); - create_hypertable -------------------------- - (1,public,conditions,t) + create_hypertable +--------------------- + (1,public,target,t) (1 row) -INSERT INTO conditions +INSERT INTO target SELECT time, location, 14 as temperature FROM generate_series( '2021-01-01 00:00:00', @@ -24,23 +26,23 @@ FROM generate_series( INTERVAL '5 seconds' ) as time, generate_series(1,4) as location; --- Create conditions_updated table with location and temperature -CREATE TABLE conditions_updated ( +-- Create source table with location and temperature +CREATE TABLE source ( time TIMESTAMPTZ NOT NULL, location SMALLINT NOT NULL, temperature DOUBLE PRECISION NULL ); SELECT create_hypertable( - 'conditions_updated', + 'source', 'time', chunk_time_interval => INTERVAL '5 seconds'); - create_hypertable ---------------------------------- - (2,public,conditions_updated,t) + create_hypertable +--------------------- + (2,public,source,t) (1 row) --- Generate data that overlaps with conditions table -INSERT INTO conditions_updated +-- Generate data that overlaps with target table +INSERT INTO source SELECT time, location, 80 as temperature FROM generate_series( '2021-01-01 00:00:05', @@ -49,20 +51,20 @@ FROM generate_series( ) as time, generate_series(1,4) as location; -- Print table/rows/num of chunks -select * from conditions order by time, location asc; - time | location | temperature -------------------------------+----------+------------- - Fri Jan 01 00:00:00 2021 PST | 1 | 14 - Fri Jan 01 00:00:00 2021 PST | 2 | 14 - Fri Jan 01 00:00:00 2021 PST | 3 | 14 - Fri Jan 01 00:00:00 2021 PST | 4 | 14 - Fri Jan 01 00:00:05 2021 PST | 1 | 14 - Fri Jan 01 00:00:05 2021 PST | 2 | 14 - Fri Jan 01 00:00:05 2021 PST | 3 | 14 - Fri Jan 01 00:00:05 2021 PST | 4 | 14 +select * from target order by time, location asc; + time | location | temperature | val +------------------------------+----------+-------------+---------- + Fri Jan 01 00:00:00 2021 PST | 1 | 14 | string - + Fri Jan 01 00:00:00 2021 PST | 2 | 14 | string - + Fri Jan 01 00:00:00 2021 PST | 3 | 14 | string - + Fri Jan 01 00:00:00 2021 PST | 4 | 14 | string - + Fri Jan 01 00:00:05 2021 PST | 1 | 14 | string - + Fri Jan 01 00:00:05 2021 PST | 2 | 14 | string - + Fri Jan 01 00:00:05 2021 PST | 3 | 14 | string - + Fri Jan 01 00:00:05 2021 PST | 4 | 14 | string - (8 rows) -select * from conditions_updated order by time, location asc; +select * from source order by time, location asc; time | location | temperature ------------------------------+----------+------------- Fri Jan 01 00:00:05 2021 PST | 1 | 80 @@ -75,85 +77,464 @@ select * from conditions_updated order by time, location asc; Fri Jan 01 00:00:10 2021 PST | 4 | 80 (8 rows) -select hypertable_name, count(*) as num_of_chunks from timescaledb_information.chunks group by hypertable_name; - hypertable_name | num_of_chunks ---------------------+--------------- - conditions | 2 - conditions_updated | 2 -(2 rows) - --- Print expected values in the conditions table once conditions_updated is merged into it --- If a key exists in both tables, we take average of the 
temperature measured --- average logic here is a mess but it works -SELECT COALESCE(c.time, cu.time) as time, - COALESCE(c.location, cu.location) as location, - (COALESCE(c.temperature, cu.temperature) + COALESCE(cu.temperature, c.temperature))/2 as temperature -FROM conditions AS c FULL JOIN conditions_updated AS cu -ON c.time = cu.time AND c.location = cu.location; - time | location | temperature -------------------------------+----------+------------- - Fri Jan 01 00:00:00 2021 PST | 1 | 14 - Fri Jan 01 00:00:00 2021 PST | 2 | 14 - Fri Jan 01 00:00:00 2021 PST | 3 | 14 - Fri Jan 01 00:00:00 2021 PST | 4 | 14 - Fri Jan 01 00:00:05 2021 PST | 1 | 47 - Fri Jan 01 00:00:05 2021 PST | 2 | 47 - Fri Jan 01 00:00:05 2021 PST | 3 | 47 - Fri Jan 01 00:00:05 2021 PST | 4 | 47 - Fri Jan 01 00:00:10 2021 PST | 1 | 80 - Fri Jan 01 00:00:10 2021 PST | 2 | 80 - Fri Jan 01 00:00:10 2021 PST | 3 | 80 - Fri Jan 01 00:00:10 2021 PST | 4 | 80 -(12 rows) +-- CREATE normal PostgreSQL tables +CREATE TABLE target_pg AS SELECT * FROM target; +CREATE TABLE source_pg AS SELECT * FROM source; +-- Merge UPDATE matched rows for normal PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN +UPDATE SET temperature = (t.temperature + s.temperature)/2, val = val || ' UPDATED BY MERGE'; +-- Merge UPDATE matched rows for hypertables +MERGE INTO target t +USING source s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN +UPDATE SET temperature = (t.temperature + s.temperature)/2, val = val || ' UPDATED BY MERGE'; +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + result +-------- + same +(1 row) + +-- Merge DELETE matched rows for normal PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN +DELETE; +-- Merge DELETE matched rows for hypertables +MERGE INTO target t +USING source s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN +DELETE; +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + result +-------- + same +(1 row) + +-- clean up tables +DELETE FROM target_pg; +DELETE FROM target; +DELETE FROM source_pg; +DELETE FROM source; +INSERT INTO target +SELECT time, location, 14 as temperature +FROM generate_series( + '2021-01-01 00:00:00', + '2021-01-01 00:00:09', + INTERVAL '5 seconds' + ) as time, +generate_series(1,4) as location; +INSERT INTO source +SELECT time, location, 80 as temperature +FROM generate_series( + '2021-01-01 00:00:05', + '2021-01-01 00:00:14', + INTERVAL '5 seconds' + ) as time, +generate_series(1,4) as location; +INSERT INTO target_pg SELECT * FROM target; +INSERT INTO source_pg SELECT * FROM source; +-- Merge UPDATE matched rows and INSERT new row for unmatched rows for normal PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN +UPDATE SET temperature = (t.temperature + s.temperature)/2, val = val || ' UPDATED BY MERGE' +WHEN NOT MATCHED THEN +INSERT (time, location, temperature, val) VALUES (s.time, s.location, s.temperature, 'string - INSERTED BY MERGE'); +-- Merge UPDATE matched rows and INSERT new row for unmatched rows for hypertables +MERGE 
INTO target t +USING source s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN +UPDATE SET temperature = (t.temperature + s.temperature)/2, val = val || ' UPDATED BY MERGE' +WHEN NOT MATCHED THEN +INSERT (time, location, temperature, val) VALUES (s.time, s.location, s.temperature, 'string - INSERTED BY MERGE'); +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + result +-------- + same +(1 row) --- Test that normal PostgreSQL tables can merge without exceptions -CREATE TABLE conditions_pg AS SELECT * FROM conditions; -CREATE TABLE conditions_updated_pg AS SELECT * FROM conditions_updated; -MERGE INTO conditions_pg c -USING conditions_updated_pg cu -ON c.time = cu.time AND c.location = cu.location +-- Merge INSERT with constant literals for normal PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.location = 1234 +WHEN NOT MATCHED THEN +INSERT VALUES ('2021-11-01 00:00:05'::timestamp with time zone, 5, 210, 'string - INSERTED BY MERGE'); +-- Merge INSERT with constant literals for hypertables +MERGE INTO target t +USING source s +ON t.location = 1234 +WHEN NOT MATCHED THEN +INSERT VALUES ('2021-11-01 00:00:05'::timestamp with time zone, 5, 210, 'string - INSERTED BY MERGE'); +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + result +-------- + same +(1 row) + +-- Merge with INSERT/DELETE/UPDATE on PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN + UPDATE SET temperature = (t.temperature + s.temperature) * 2, val = val || ' UPDATED BY MERGE' WHEN MATCHED THEN -UPDATE SET temperature = (c.temperature + cu.temperature)/2 + DELETE WHEN NOT MATCHED THEN -INSERT (time, location, temperature) VALUES (cu.time, cu.location, cu.temperature); -SELECT * FROM conditions_pg ORDER BY time, location ASC; - time | location | temperature -------------------------------+----------+------------- - Fri Jan 01 00:00:00 2021 PST | 1 | 14 - Fri Jan 01 00:00:00 2021 PST | 2 | 14 - Fri Jan 01 00:00:00 2021 PST | 3 | 14 - Fri Jan 01 00:00:00 2021 PST | 4 | 14 - Fri Jan 01 00:00:05 2021 PST | 1 | 47 - Fri Jan 01 00:00:05 2021 PST | 2 | 47 - Fri Jan 01 00:00:05 2021 PST | 3 | 47 - Fri Jan 01 00:00:05 2021 PST | 4 | 47 - Fri Jan 01 00:00:10 2021 PST | 1 | 80 - Fri Jan 01 00:00:10 2021 PST | 2 | 80 - Fri Jan 01 00:00:10 2021 PST | 3 | 80 - Fri Jan 01 00:00:10 2021 PST | 4 | 80 -(12 rows) + INSERT (time, location, temperature, val) VALUES (s.time, s.location, s.temperature, 'string - INSERTED BY MERGE'); +-- Merge with INSERT/DELETE/UPDATE on hypertables +MERGE INTO target t +USING source s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN + UPDATE SET temperature = (t.temperature + s.temperature) * 2, val = val || ' UPDATED BY MERGE' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT (time, location, temperature, val) VALUES (s.time, s.location, s.temperature, 'string - INSERTED BY MERGE'); +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + result +-------- + same +(1 row) --- Merge conditions_updated into 
conditions -\set ON_ERROR_STOP 0 -MERGE INTO conditions c -USING conditions_updated cu -ON c.time = cu.time AND c.location = cu.location +-- Merge with Subqueries on PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.time = s.time AND t.location > (SELECT count(*) FROM source_pg) +WHEN MATCHED THEN + UPDATE SET temperature = (SELECT count(*) FROM target_pg) * 2, val = val || ' UPDATED BY MERGE' WHEN MATCHED THEN -UPDATE SET temperature = (c.temperature + cu.temperature)/2 + DELETE WHEN NOT MATCHED THEN -INSERT (time, location, temperature) VALUES (cu.time, cu.location, cu.temperature); -ERROR: The MERGE command does not support hypertables in this version -SELECT * FROM conditions ORDER BY time, location ASC; - time | location | temperature -------------------------------+----------+------------- - Fri Jan 01 00:00:00 2021 PST | 1 | 14 - Fri Jan 01 00:00:00 2021 PST | 2 | 14 - Fri Jan 01 00:00:00 2021 PST | 3 | 14 - Fri Jan 01 00:00:00 2021 PST | 4 | 14 - Fri Jan 01 00:00:05 2021 PST | 1 | 14 - Fri Jan 01 00:00:05 2021 PST | 2 | 14 - Fri Jan 01 00:00:05 2021 PST | 3 | 14 - Fri Jan 01 00:00:05 2021 PST | 4 | 14 -(8 rows) + INSERT (time, location, temperature, val) VALUES (s.time, s.location, s.temperature, 'SUBQUERY string - INSERTED BY MERGE'); +-- Merge with Subqueries on hypertables +MERGE INTO target t +USING source s +ON t.time = s.time AND t.location > (SELECT count(*) FROM source) +WHEN MATCHED THEN + UPDATE SET temperature = (SELECT count(*) FROM target) * 2, val = val || ' UPDATED BY MERGE' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT (time, location, temperature, val) VALUES (s.time, s.location, s.temperature, 'SUBQUERY string - INSERTED BY MERGE'); +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + result +-------- + same +(1 row) + +-- clean up tables +DELETE FROM target_pg; +DELETE FROM target; +DELETE FROM source_pg; +DELETE FROM source; +-- TEST with target as hypertable and source as normal PG table +INSERT INTO target +SELECT time, location, 14 as temperature +FROM generate_series( + '2021-01-01 00:00:00', + '2021-01-01 00:00:09', + INTERVAL '5 seconds' + ) as time, +generate_series(1,4) as location; +INSERT INTO source +SELECT time, location, 80 as temperature +FROM generate_series( + '2021-01-01 00:00:05', + '2021-01-01 00:00:14', + INTERVAL '5 seconds' + ) as time, +generate_series(1,4) as location; +INSERT INTO target_pg SELECT * FROM target; +INSERT INTO source_pg SELECT * FROM source; +-- Merge UPDATE matched rows for normal PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN +UPDATE SET temperature = (t.temperature + s.temperature)/2, val = val || ' UPDATED BY MERGE'; +-- Merge UPDATE with target as hypertables and source as normal PG tables +MERGE INTO target t +USING source_pg s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN +UPDATE SET temperature = (t.temperature + s.temperature)/2, val = val || ' UPDATED BY MERGE'; +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + result +-------- + same +(1 row) + +-- Merge DELETE matched rows for normal PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.time = s.time AND t.location = 
s.location +WHEN MATCHED THEN +DELETE; +-- Merge DELETE with target as hypertables and source as normal PG tables +MERGE INTO target t +USING source_pg s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN +DELETE; +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + result +-------- + same +(1 row) + +-- Merge INSERT with constant literals for normal PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.location = 1234 +WHEN NOT MATCHED THEN +INSERT VALUES ('2021-11-01 00:00:05'::timestamp with time zone, 5, 210, 'string - INSERTED BY MERGE'); +-- Merge INSERT with constant literals for target as hypertables and source as normal PG tables +MERGE INTO target t +USING source s +ON t.location = 1234 +WHEN NOT MATCHED THEN +INSERT VALUES ('2021-11-01 00:00:05'::timestamp with time zone, 5, 210, 'string - INSERTED BY MERGE'); +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + result +-------- + same +(1 row) + +-- Merge with INSERT/DELETE/UPDATE on PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN + UPDATE SET temperature = (t.temperature + s.temperature) * 2, val = val || ' UPDATED BY MERGE' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT (time, location, temperature, val) VALUES (s.time, s.location, s.temperature, 'string - INSERTED BY MERGE'); +-- Merge with INSERT/DELETE/UPDATE on target as hypertables and source as normal PG tables +MERGE INTO target t +USING source s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN + UPDATE SET temperature = (t.temperature + s.temperature) * 2, val = val || ' UPDATED BY MERGE' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT (time, location, temperature, val) VALUES (s.time, s.location, s.temperature, 'string - INSERTED BY MERGE'); +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + result +-------- + same +(1 row) +DROP TABLE target_pg CASCADE; +DROP TABLE target CASCADE; +DROP TABLE source_pg CASCADE; +DROP TABLE source CASCADE; +-- test MERGE with source being a PARTITION table +CREATE TABLE source_pg( + id INT NOT NULL, + dev INT NOT NULL, + value INT, + CONSTRAINT cstr_source_pky PRIMARY KEY (id) +) PARTITION BY LIST (id); +CREATE TABLE source_1_2_3_4 PARTITION OF source_pg FOR VALUES IN (1,2,3,4); +CREATE TABLE source_5_6_7_8 PARTITION OF source_pg FOR VALUES IN (5,6,7,8); +INSERT INTO source_pg SELECT generate_series(1,8), 44,55; +CREATE TABLE target ( + ts TIMESTAMP WITH TIME ZONE NOT NULL, + id INT NOT NULL, + dev INT NOT NULL, + FOREIGN KEY (id) REFERENCES source_pg(id) ON DELETE CASCADE +); +SELECT create_hypertable( + relation => 'target', + time_column_name => 'ts' +); + create_hypertable +--------------------- + (3,public,target,t) +(1 row) + +insert into target values ('2023-01-12 00:00:05'::timestamp with time zone, 1,2); +insert into target values ('2023-01-12 00:00:10'::timestamp with time zone, 2,2); +insert into target values ('2023-01-12 00:00:15'::timestamp with time zone, 3,2); +insert into target values ('2023-01-12 
00:00:20'::timestamp with time zone, 4,2); +insert into target values ('2023-01-14 00:00:25'::timestamp with time zone, 5,2); +insert into target values ('2023-01-14 00:00:30'::timestamp with time zone, 6,2); +insert into target values ('2023-01-14 00:00:35'::timestamp with time zone, 7,2); +insert into target values ('2023-01-14 00:00:40'::timestamp with time zone, 8,2); +CREATE TABLE target_pg AS SELECT * FROM target; +-- Merge UPDATE matched rows for normal PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.id = s.id +WHEN MATCHED THEN +UPDATE SET dev = (t.dev + s.dev)/2; +-- Merge UPDATE matched rows for hypertables +MERGE INTO target t +USING source_pg s +ON t.id = s.id +WHEN MATCHED THEN +UPDATE SET dev = (t.dev + s.dev)/2; +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + result +-------- + same +(1 row) + +-- Merge DELETE matched rows for normal PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.id = s.id +WHEN MATCHED THEN +DELETE; +-- Merge DELETE matched rows for hypertables +MERGE INTO target t +USING source_pg s +ON t.id = s.id +WHEN MATCHED THEN +DELETE; +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + result +-------- + same +(1 row) + +-- clean up tables +DROP TABLE target_pg CASCADE; +DROP TABLE target CASCADE; +DROP TABLE source_pg CASCADE; +-- TEST for PERMISSIONS +CREATE USER priv_user; +CREATE USER non_priv_user; +CREATE TABLE target ( + value DOUBLE PRECISION NOT NULL, + time TIMESTAMPTZ NOT NULL +); +SELECT table_name FROM create_hypertable( + 'target'::regclass, + 'time'::name, chunk_time_interval=>interval '8 hours', + create_default_indexes=> false); + table_name +------------ + target +(1 row) + +SELECT '2022-10-10 14:33:44.1234+05:30' as start_date \gset +INSERT INTO target (value, time) + SELECT 1,t from generate_series(:'start_date'::timestamptz, :'start_date'::timestamptz + interval '1 day', '5m') t cross join + generate_series(1,3) s; +CREATE TABLE source ( + time TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL, + value DOUBLE PRECISION NOT NULL + ); +SELECT table_name FROM create_hypertable( + 'source'::regclass, + 'time'::name, chunk_time_interval=>interval '6 hours', + create_default_indexes=> false); + table_name +------------ + source +(1 row) + +ALTER TABLE target OWNER TO priv_user; +ALTER TABLE source OWNER TO priv_user; +GRANT SELECT ON source TO non_priv_user; +SET SESSION AUTHORIZATION non_priv_user; +\set ON_ERROR_STOP 0 +-- non_priv_user does not have UPDATE privilege on target table +MERGE INTO target +USING source +ON target.time = source.time +WHEN MATCHED THEN + UPDATE SET value = 0; +ERROR: permission denied for table target +-- non_priv_user does not have DELETE privilege on target table +MERGE INTO target +USING source +ON target.time = source.time +WHEN MATCHED THEN + DELETE; +ERROR: permission denied for table target +-- non_priv_user does not have INSERT privilege on target table +MERGE INTO target +USING source +ON target.time = source.time +WHEN NOT MATCHED THEN + INSERT VALUES (10, '2023-01-15 00:00:10'::timestamp with time zone); +ERROR: permission denied for table target \set ON_ERROR_STOP 1 +RESET SESSION AUTHORIZATION; +DROP TABLE target; +DROP TABLE source; +DROP USER priv_user; +DROP 
USER non_priv_user; diff --git a/test/sql/merge.sql b/test/sql/merge.sql index b9c3207932c..13ac39495e6 100644 --- a/test/sql/merge.sql +++ b/test/sql/merge.sql @@ -2,20 +2,22 @@ -- Please see the included NOTICE for copyright information and -- LICENSE-APACHE for a copy of the license. --- Create conditions table with location and temperature -CREATE TABLE conditions ( +\c :TEST_DBNAME :ROLE_SUPERUSER + +-- Create target table with location and temperature +CREATE TABLE target ( time TIMESTAMPTZ NOT NULL, location SMALLINT NOT NULL, - temperature DOUBLE PRECISION NULL + temperature DOUBLE PRECISION NULL, + val text default 'string -' ); SELECT create_hypertable( - 'conditions', + 'target', 'time', chunk_time_interval => INTERVAL '5 seconds'); - -INSERT INTO conditions +INSERT INTO target SELECT time, location, 14 as temperature FROM generate_series( '2021-01-01 00:00:00', @@ -24,20 +26,20 @@ FROM generate_series( ) as time, generate_series(1,4) as location; --- Create conditions_updated table with location and temperature -CREATE TABLE conditions_updated ( +-- Create source table with location and temperature +CREATE TABLE source ( time TIMESTAMPTZ NOT NULL, location SMALLINT NOT NULL, temperature DOUBLE PRECISION NULL ); SELECT create_hypertable( - 'conditions_updated', + 'source', 'time', chunk_time_interval => INTERVAL '5 seconds'); --- Generate data that overlaps with conditions table -INSERT INTO conditions_updated +-- Generate data that overlaps with target table +INSERT INTO source SELECT time, location, 80 as temperature FROM generate_series( '2021-01-01 00:00:05', @@ -47,40 +49,455 @@ FROM generate_series( generate_series(1,4) as location; -- Print table/rows/num of chunks -select * from conditions order by time, location asc; -select * from conditions_updated order by time, location asc; -select hypertable_name, count(*) as num_of_chunks from timescaledb_information.chunks group by hypertable_name; - --- Print expected values in the conditions table once conditions_updated is merged into it --- If a key exists in both tables, we take average of the temperature measured --- average logic here is a mess but it works -SELECT COALESCE(c.time, cu.time) as time, - COALESCE(c.location, cu.location) as location, - (COALESCE(c.temperature, cu.temperature) + COALESCE(cu.temperature, c.temperature))/2 as temperature -FROM conditions AS c FULL JOIN conditions_updated AS cu -ON c.time = cu.time AND c.location = cu.location; - --- Test that normal PostgreSQL tables can merge without exceptions -CREATE TABLE conditions_pg AS SELECT * FROM conditions; -CREATE TABLE conditions_updated_pg AS SELECT * FROM conditions_updated; -MERGE INTO conditions_pg c -USING conditions_updated_pg cu -ON c.time = cu.time AND c.location = cu.location -WHEN MATCHED THEN -UPDATE SET temperature = (c.temperature + cu.temperature)/2 +select * from target order by time, location asc; +select * from source order by time, location asc; + +-- CREATE normal PostgreSQL tables +CREATE TABLE target_pg AS SELECT * FROM target; +CREATE TABLE source_pg AS SELECT * FROM source; + +-- Merge UPDATE matched rows for normal PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN +UPDATE SET temperature = (t.temperature + s.temperature)/2, val = val || ' UPDATED BY MERGE'; + +-- Merge UPDATE matched rows for hypertables +MERGE INTO target t +USING source s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN +UPDATE SET temperature = (t.temperature + 
s.temperature)/2, val = val || ' UPDATED BY MERGE'; + +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + +-- Merge DELETE matched rows for normal PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN +DELETE; + +-- Merge DELETE matched rows for hypertables +MERGE INTO target t +USING source s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN +DELETE; + +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + +-- clean up tables +DELETE FROM target_pg; +DELETE FROM target; +DELETE FROM source_pg; +DELETE FROM source; + +INSERT INTO target +SELECT time, location, 14 as temperature +FROM generate_series( + '2021-01-01 00:00:00', + '2021-01-01 00:00:09', + INTERVAL '5 seconds' + ) as time, +generate_series(1,4) as location; + +INSERT INTO source +SELECT time, location, 80 as temperature +FROM generate_series( + '2021-01-01 00:00:05', + '2021-01-01 00:00:14', + INTERVAL '5 seconds' + ) as time, +generate_series(1,4) as location; + +INSERT INTO target_pg SELECT * FROM target; +INSERT INTO source_pg SELECT * FROM source; + +-- Merge UPDATE matched rows and INSERT new row for unmatched rows for normal PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN +UPDATE SET temperature = (t.temperature + s.temperature)/2, val = val || ' UPDATED BY MERGE' +WHEN NOT MATCHED THEN +INSERT (time, location, temperature, val) VALUES (s.time, s.location, s.temperature, 'string - INSERTED BY MERGE'); + +-- Merge UPDATE matched rows and INSERT new row for unmatched rows for hypertables +MERGE INTO target t +USING source s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN +UPDATE SET temperature = (t.temperature + s.temperature)/2, val = val || ' UPDATED BY MERGE' +WHEN NOT MATCHED THEN +INSERT (time, location, temperature, val) VALUES (s.time, s.location, s.temperature, 'string - INSERTED BY MERGE'); + +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + +-- Merge INSERT with constant literals for normal PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.location = 1234 +WHEN NOT MATCHED THEN +INSERT VALUES ('2021-11-01 00:00:05'::timestamp with time zone, 5, 210, 'string - INSERTED BY MERGE'); + +-- Merge INSERT with constant literals for hypertables +MERGE INTO target t +USING source s +ON t.location = 1234 +WHEN NOT MATCHED THEN +INSERT VALUES ('2021-11-01 00:00:05'::timestamp with time zone, 5, 210, 'string - INSERTED BY MERGE'); + +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + +-- Merge with INSERT/DELETE/UPDATE on PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN + UPDATE SET temperature = (t.temperature + s.temperature) * 2, val = val || ' UPDATED BY MERGE' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT (time, 
location, temperature, val) VALUES (s.time, s.location, s.temperature, 'string - INSERTED BY MERGE'); + +-- Merge with INSERT/DELETE/UPDATE on hypertables +MERGE INTO target t +USING source s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN + UPDATE SET temperature = (t.temperature + s.temperature) * 2, val = val || ' UPDATED BY MERGE' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT (time, location, temperature, val) VALUES (s.time, s.location, s.temperature, 'string - INSERTED BY MERGE'); + +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + +-- Merge with Subqueries on PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.time = s.time AND t.location > (SELECT count(*) FROM source_pg) +WHEN MATCHED THEN + UPDATE SET temperature = (SELECT count(*) FROM target_pg) * 2, val = val || ' UPDATED BY MERGE' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT (time, location, temperature, val) VALUES (s.time, s.location, s.temperature, 'SUBQUERY string - INSERTED BY MERGE'); + +-- Merge with Subqueries on hypertables +MERGE INTO target t +USING source s +ON t.time = s.time AND t.location > (SELECT count(*) FROM source) +WHEN MATCHED THEN + UPDATE SET temperature = (SELECT count(*) FROM target) * 2, val = val || ' UPDATED BY MERGE' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT (time, location, temperature, val) VALUES (s.time, s.location, s.temperature, 'SUBQUERY string - INSERTED BY MERGE'); + +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + +-- clean up tables +DELETE FROM target_pg; +DELETE FROM target; +DELETE FROM source_pg; +DELETE FROM source; + +-- TEST with target as hypertable and source as normal PG table +INSERT INTO target +SELECT time, location, 14 as temperature +FROM generate_series( + '2021-01-01 00:00:00', + '2021-01-01 00:00:09', + INTERVAL '5 seconds' + ) as time, +generate_series(1,4) as location; + +INSERT INTO source +SELECT time, location, 80 as temperature +FROM generate_series( + '2021-01-01 00:00:05', + '2021-01-01 00:00:14', + INTERVAL '5 seconds' + ) as time, +generate_series(1,4) as location; + +INSERT INTO target_pg SELECT * FROM target; +INSERT INTO source_pg SELECT * FROM source; + +-- Merge UPDATE matched rows for normal PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN +UPDATE SET temperature = (t.temperature + s.temperature)/2, val = val || ' UPDATED BY MERGE'; + +-- Merge UPDATE with target as hypertables and source as normal PG tables +MERGE INTO target t +USING source_pg s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN +UPDATE SET temperature = (t.temperature + s.temperature)/2, val = val || ' UPDATED BY MERGE'; + +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + +-- Merge DELETE matched rows for normal PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN +DELETE; + +-- Merge DELETE with target as hypertables and source as normal PG tables +MERGE INTO target t 
+USING source_pg s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN +DELETE; + +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + +-- Merge INSERT with constant literals for normal PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.location = 1234 WHEN NOT MATCHED THEN -INSERT (time, location, temperature) VALUES (cu.time, cu.location, cu.temperature); -SELECT * FROM conditions_pg ORDER BY time, location ASC; +INSERT VALUES ('2021-11-01 00:00:05'::timestamp with time zone, 5, 210, 'string - INSERTED BY MERGE'); + +-- Merge INSERT with constant literals for target as hypertables and source as normal PG tables +MERGE INTO target t +USING source s +ON t.location = 1234 +WHEN NOT MATCHED THEN +INSERT VALUES ('2021-11-01 00:00:05'::timestamp with time zone, 5, 210, 'string - INSERTED BY MERGE'); + +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + +-- Merge with INSERT/DELETE/UPDATE on PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN + UPDATE SET temperature = (t.temperature + s.temperature) * 2, val = val || ' UPDATED BY MERGE' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT (time, location, temperature, val) VALUES (s.time, s.location, s.temperature, 'string - INSERTED BY MERGE'); + +-- Merge with INSERT/DELETE/UPDATE on target as hypertables and source as normal PG tables +MERGE INTO target t +USING source s +ON t.time = s.time AND t.location = s.location +WHEN MATCHED THEN + UPDATE SET temperature = (t.temperature + s.temperature) * 2, val = val || ' UPDATED BY MERGE' +WHEN MATCHED THEN + DELETE +WHEN NOT MATCHED THEN + INSERT (time, location, temperature, val) VALUES (s.time, s.location, s.temperature, 'string - INSERTED BY MERGE'); + +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + +DROP TABLE target_pg CASCADE; +DROP TABLE target CASCADE; +DROP TABLE source_pg CASCADE; +DROP TABLE source CASCADE; + +-- test MERGE with source being a PARTITION table +CREATE TABLE source_pg( + id INT NOT NULL, + dev INT NOT NULL, + value INT, + CONSTRAINT cstr_source_pky PRIMARY KEY (id) +) PARTITION BY LIST (id); + +CREATE TABLE source_1_2_3_4 PARTITION OF source_pg FOR VALUES IN (1,2,3,4); +CREATE TABLE source_5_6_7_8 PARTITION OF source_pg FOR VALUES IN (5,6,7,8); + +INSERT INTO source_pg SELECT generate_series(1,8), 44,55; + +CREATE TABLE target ( + ts TIMESTAMP WITH TIME ZONE NOT NULL, + id INT NOT NULL, + dev INT NOT NULL, + FOREIGN KEY (id) REFERENCES source_pg(id) ON DELETE CASCADE +); + +SELECT create_hypertable( + relation => 'target', + time_column_name => 'ts' +); + +insert into target values ('2023-01-12 00:00:05'::timestamp with time zone, 1,2); +insert into target values ('2023-01-12 00:00:10'::timestamp with time zone, 2,2); +insert into target values ('2023-01-12 00:00:15'::timestamp with time zone, 3,2); +insert into target values ('2023-01-12 00:00:20'::timestamp with time zone, 4,2); +insert into target values ('2023-01-14 00:00:25'::timestamp with time zone, 5,2); +insert into target values 
('2023-01-14 00:00:30'::timestamp with time zone, 6,2); +insert into target values ('2023-01-14 00:00:35'::timestamp with time zone, 7,2); +insert into target values ('2023-01-14 00:00:40'::timestamp with time zone, 8,2); + +CREATE TABLE target_pg AS SELECT * FROM target; + +-- Merge UPDATE matched rows for normal PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.id = s.id +WHEN MATCHED THEN +UPDATE SET dev = (t.dev + s.dev)/2; + +-- Merge UPDATE matched rows for hypertables +MERGE INTO target t +USING source_pg s +ON t.id = s.id +WHEN MATCHED THEN +UPDATE SET dev = (t.dev + s.dev)/2; + +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + +-- Merge DELETE matched rows for normal PG tables +MERGE INTO target_pg t +USING source_pg s +ON t.id = s.id +WHEN MATCHED THEN +DELETE; + +-- Merge DELETE matched rows for hypertables +MERGE INTO target t +USING source_pg s +ON t.id = s.id +WHEN MATCHED THEN +DELETE; + +-- ensure TARGET PG table and hypertable are same +SELECT CASE WHEN EXISTS (TABLE target EXCEPT TABLE target_pg) + OR EXISTS (TABLE target_pg EXCEPT TABLE target) + THEN 'different' + ELSE 'same' + END AS result; + +-- clean up tables +DROP TABLE target_pg CASCADE; +DROP TABLE target CASCADE; +DROP TABLE source_pg CASCADE; + +-- TEST for PERMISSIONS +CREATE USER priv_user; +CREATE USER non_priv_user; + +CREATE TABLE target ( + value DOUBLE PRECISION NOT NULL, + time TIMESTAMPTZ NOT NULL +); + +SELECT table_name FROM create_hypertable( + 'target'::regclass, + 'time'::name, chunk_time_interval=>interval '8 hours', + create_default_indexes=> false); + +SELECT '2022-10-10 14:33:44.1234+05:30' as start_date \gset +INSERT INTO target (value, time) + SELECT 1,t from generate_series(:'start_date'::timestamptz, :'start_date'::timestamptz + interval '1 day', '5m') t cross join + generate_series(1,3) s; + +CREATE TABLE source ( + time TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL, + value DOUBLE PRECISION NOT NULL + ); + +SELECT table_name FROM create_hypertable( + 'source'::regclass, + 'time'::name, chunk_time_interval=>interval '6 hours', + create_default_indexes=> false); + +ALTER TABLE target OWNER TO priv_user; +ALTER TABLE source OWNER TO priv_user; + +GRANT SELECT ON source TO non_priv_user; +SET SESSION AUTHORIZATION non_priv_user; --- Merge conditions_updated into conditions \set ON_ERROR_STOP 0 -MERGE INTO conditions c -USING conditions_updated cu -ON c.time = cu.time AND c.location = cu.location +-- non_priv_user does not have UPDATE privilege on target table +MERGE INTO target +USING source +ON target.time = source.time WHEN MATCHED THEN -UPDATE SET temperature = (c.temperature + cu.temperature)/2 + UPDATE SET value = 0; + +-- non_priv_user does not have DELETE privilege on target table +MERGE INTO target +USING source +ON target.time = source.time +WHEN MATCHED THEN + DELETE; + +-- non_priv_user does not have INSERT privilege on target table +MERGE INTO target +USING source +ON target.time = source.time WHEN NOT MATCHED THEN -INSERT (time, location, temperature) VALUES (cu.time, cu.location, cu.temperature); + INSERT VALUES (10, '2023-01-15 00:00:10'::timestamp with time zone); -SELECT * FROM conditions ORDER BY time, location ASC; \set ON_ERROR_STOP 1 + +RESET SESSION AUTHORIZATION; +DROP TABLE target; +DROP TABLE source; +DROP USER priv_user; +DROP USER non_priv_user; diff --git 
a/tsl/test/expected/merge_compress.out b/tsl/test/expected/merge_compress.out new file mode 100644 index 00000000000..9bd4c61df9b --- /dev/null +++ b/tsl/test/expected/merge_compress.out @@ -0,0 +1,111 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +CREATE TABLE target ( + time TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL, + value DOUBLE PRECISION NOT NULL, + series_id BIGINT NOT NULL, + partition_column TIMESTAMPTZ NOT NULL +); +SELECT table_name FROM create_hypertable( + 'target'::regclass, + 'partition_column'::name, chunk_time_interval=>interval '8 hours', + create_default_indexes=> false); + table_name +------------ + target +(1 row) + +-- enable compression +ALTER TABLE target SET ( + timescaledb.compress, + timescaledb.compress_segmentby = 'series_id', + timescaledb.compress_orderby = 'partition_column, value' +); +SELECT '2022-10-10 14:33:44.1234+05:30' as start_date \gset +INSERT INTO target (series_id, value, partition_column) + SELECT s,1,t from generate_series(:'start_date'::timestamptz, :'start_date'::timestamptz + interval '1 day', '5m') t cross join + generate_series(1,3, 1) s; +-- compress chunks +SELECT count(compress_chunk(c.schema_name|| '.' || c.table_name)) + FROM _timescaledb_catalog.chunk c, _timescaledb_catalog.hypertable ht where + c.hypertable_id = ht.id and ht.table_name = 'target' and c.compressed_chunk_id IS NULL; + count +------- + 4 +(1 row) + +CREATE TABLE source ( + time TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL, + value DOUBLE PRECISION NOT NULL, + series_id BIGINT NOT NULL + ); +SELECT table_name FROM create_hypertable( + 'source'::regclass, + 'time'::name, chunk_time_interval=>interval '6 hours', + create_default_indexes=> false); + table_name +------------ + source +(1 row) + +-- enable compression +ALTER TABLE source SET ( + timescaledb.compress, + timescaledb.compress_segmentby = 'series_id', + timescaledb.compress_orderby = 'time, value' +); +SELECT '2022-10-10 10:00:00.0123+05:30' as start_date \gset +INSERT INTO source (time, series_id, value) + SELECT t, s,1 from generate_series(:'start_date'::timestamptz, :'start_date'::timestamptz + interval '1 day', '5m') t cross join + generate_series(1,2, 1) s; +-- compress chunks +SELECT count(compress_chunk(c.schema_name|| '.' 
|| c.table_name)) + FROM _timescaledb_catalog.chunk c, _timescaledb_catalog.hypertable ht where + c.hypertable_id = ht.id and ht.table_name = 'source' and c.compressed_chunk_id IS NULL; + count +------- + 5 +(1 row) + +-- Merge UPDATE on compressed hypertables should report error +\set ON_ERROR_STOP 0 +MERGE INTO target t + USING source s + ON t.value = s.value AND t.series_id = s.series_id + WHEN MATCHED THEN + UPDATE SET series_id = (t.series_id * 0.123); +ERROR: cannot update/delete rows from chunk "_hyper_1_1_chunk" as it is compressed +-- Merge DELETE on compressed hypertables should report error +MERGE INTO target t + USING source s + ON t.value = s.value AND t.series_id = s.series_id + WHEN MATCHED THEN + DELETE; +ERROR: cannot update/delete rows from chunk "_hyper_1_1_chunk" as it is compressed +\set ON_ERROR_STOP 1 +-- total compressed chunks +SELECT count(*) AS "total compressed_chunks", is_compressed FROM timescaledb_information.chunks WHERE + hypertable_name = 'target' GROUP BY is_compressed; + total compressed_chunks | is_compressed +-------------------------+--------------- + 4 | t +(1 row) + +-- Merge INSERT on compressed hypertables should work +MERGE INTO target t + USING source s + ON t.partition_column = s.time AND t.value = s.value + WHEN NOT MATCHED THEN + INSERT VALUES ('2021-11-01 00:00:05'::timestamp with time zone, 5, 210, '2021-11-01 00:00:05'::timestamp with time zone); +-- you should notice 1 uncompressed chunk +SELECT count(*) AS "total compressed_chunks", is_compressed FROM timescaledb_information.chunks WHERE + hypertable_name = 'target' GROUP BY is_compressed; + total compressed_chunks | is_compressed +-------------------------+--------------- + 1 | f + 4 | t +(2 rows) + +DROP TABLE target; +DROP TABLE source; diff --git a/tsl/test/sql/CMakeLists.txt b/tsl/test/sql/CMakeLists.txt index 1e5ee521c0b..da57248446d 100644 --- a/tsl/test/sql/CMakeLists.txt +++ b/tsl/test/sql/CMakeLists.txt @@ -103,6 +103,10 @@ if((${PG_VERSION_MAJOR} GREATER_EQUAL "14")) endif() endif() +if((${PG_VERSION_MAJOR} GREATER_EQUAL "15")) + list(APPEND TEST_FILES merge_compress.sql) +endif() + set(SOLO_TESTS # dist_hypertable needs a lot of memory when the Sanitizer is active dist_hypertable-${PG_VERSION_MAJOR} diff --git a/tsl/test/sql/merge_compress.sql b/tsl/test/sql/merge_compress.sql new file mode 100644 index 00000000000..c671dddef53 --- /dev/null +++ b/tsl/test/sql/merge_compress.sql @@ -0,0 +1,92 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. + +CREATE TABLE target ( + time TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL, + value DOUBLE PRECISION NOT NULL, + series_id BIGINT NOT NULL, + partition_column TIMESTAMPTZ NOT NULL +); + +SELECT table_name FROM create_hypertable( + 'target'::regclass, + 'partition_column'::name, chunk_time_interval=>interval '8 hours', + create_default_indexes=> false); + +-- enable compression +ALTER TABLE target SET ( + timescaledb.compress, + timescaledb.compress_segmentby = 'series_id', + timescaledb.compress_orderby = 'partition_column, value' +); + +SELECT '2022-10-10 14:33:44.1234+05:30' as start_date \gset +INSERT INTO target (series_id, value, partition_column) + SELECT s,1,t from generate_series(:'start_date'::timestamptz, :'start_date'::timestamptz + interval '1 day', '5m') t cross join + generate_series(1,3, 1) s; + +-- compress chunks +SELECT count(compress_chunk(c.schema_name|| '.' 
|| c.table_name)) + FROM _timescaledb_catalog.chunk c, _timescaledb_catalog.hypertable ht where + c.hypertable_id = ht.id and ht.table_name = 'target' and c.compressed_chunk_id IS NULL; + +CREATE TABLE source ( + time TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL, + value DOUBLE PRECISION NOT NULL, + series_id BIGINT NOT NULL + ); +SELECT table_name FROM create_hypertable( + 'source'::regclass, + 'time'::name, chunk_time_interval=>interval '6 hours', + create_default_indexes=> false); +-- enable compression +ALTER TABLE source SET ( + timescaledb.compress, + timescaledb.compress_segmentby = 'series_id', + timescaledb.compress_orderby = 'time, value' +); + +SELECT '2022-10-10 10:00:00.0123+05:30' as start_date \gset +INSERT INTO source (time, series_id, value) + SELECT t, s,1 from generate_series(:'start_date'::timestamptz, :'start_date'::timestamptz + interval '1 day', '5m') t cross join + generate_series(1,2, 1) s; + +-- compress chunks +SELECT count(compress_chunk(c.schema_name|| '.' || c.table_name)) + FROM _timescaledb_catalog.chunk c, _timescaledb_catalog.hypertable ht where + c.hypertable_id = ht.id and ht.table_name = 'source' and c.compressed_chunk_id IS NULL; + +-- Merge UPDATE on compressed hypertables should report error +\set ON_ERROR_STOP 0 +MERGE INTO target t + USING source s + ON t.value = s.value AND t.series_id = s.series_id + WHEN MATCHED THEN + UPDATE SET series_id = (t.series_id * 0.123); + +-- Merge DELETE on compressed hypertables should report error +MERGE INTO target t + USING source s + ON t.value = s.value AND t.series_id = s.series_id + WHEN MATCHED THEN + DELETE; +\set ON_ERROR_STOP 1 + +-- total compressed chunks +SELECT count(*) AS "total compressed_chunks", is_compressed FROM timescaledb_information.chunks WHERE + hypertable_name = 'target' GROUP BY is_compressed; + +-- Merge INSERT on compressed hypertables should work +MERGE INTO target t + USING source s + ON t.partition_column = s.time AND t.value = s.value + WHEN NOT MATCHED THEN + INSERT VALUES ('2021-11-01 00:00:05'::timestamp with time zone, 5, 210, '2021-11-01 00:00:05'::timestamp with time zone); + +-- you should notice 1 uncompressed chunk +SELECT count(*) AS "total compressed_chunks", is_compressed FROM timescaledb_information.chunks WHERE + hypertable_name = 'target' GROUP BY is_compressed; + +DROP TABLE target; +DROP TABLE source;