Skip to content

Commit

Permalink
MERGE support on hypertables
Browse files Browse the repository at this point in the history
This patch does following:

1. Planner changes to create ChunkDispatch node when MERGE command
   has INSERT action.
2. Changes to map partition attributes from a tuple returned from
   child node of ChunkDispatch against physical targetlist, so that
   ChunkDispatch node can read the correct value from partition column.
3. Fixed issues with MERGE on compressed hypertable.
4. Added more testcases.
5. MERGE in distributed hypertables is not supported.
6. Since there is no Custom Scan (HypertableModify) node for MERGE
   with UPDATE/DELETE on compressed hypertables, we don't support this.

Fixes #5139
  • Loading branch information
sb230132 committed May 23, 2023
1 parent d984932 commit 90c16bf
Show file tree
Hide file tree
Showing 23 changed files with 6,748 additions and 180 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ This release includes these noteworthy features:
* #5584 Reduce decompression during constraint checking
* #5530 Optimize compressed chunk resorting
* #5639 Support sending telemetry event reports
* #5150 MERGE support on hypertables

**Bugfixes**
* #5396 Fix SEGMENTBY columns predicates to be pushed down
Expand Down
28 changes: 25 additions & 3 deletions src/import/ht_hypertable_modify.c
Original file line number Diff line number Diff line change
Expand Up @@ -766,9 +766,31 @@ ht_ExecMergeNotMatched(ModifyTableContext * context, ResultRelInfo * resultRelIn
*/
newslot = ExecProject(action->mas_proj);
context->relaction = action;

(void)
ExecInsert(context, mtstate->rootResultRelInfo, newslot, canSetTag);
if (cds->is_dropped_attr_exists)
{
AttrMap *map;
TupleDesc parenttupdesc, chunktupdesc;
TupleTableSlot *chunk_slot = NULL;

parenttupdesc = RelationGetDescr(resultRelInfo->ri_RelationDesc);
chunktupdesc = RelationGetDescr(cds->rri->ri_RelationDesc);
/* map from parent to chunk */
map = build_attrmap_by_name_if_req(parenttupdesc, chunktupdesc);
if (map != NULL)
chunk_slot =
execute_attr_map_slot(map,
newslot,
MakeSingleTupleTableSlot(chunktupdesc,
&TTSOpsVirtual));
(void) ExecInsert(context,
cds->rri,
(chunk_slot ? chunk_slot : newslot),
canSetTag);
if (chunk_slot)
ExecDropSingleTupleTableSlot(chunk_slot);
}
else
(void) ExecInsert(context, cds->rri, newslot, canSetTag);
mtstate->mt_merge_inserted = 1;
break;
case CMD_NOTHING:
Expand Down
66 changes: 65 additions & 1 deletion src/nodes/chunk_dispatch/chunk_dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <nodes/nodeFuncs.h>
#include <parser/parsetree.h>
#include <utils/rel.h>
#include <utils/syscache.h>
#include <catalog/pg_type.h>

#include "compat/compat.h"
Expand All @@ -20,6 +21,7 @@
#include "subspace_store.h"
#include "dimension.h"
#include "guc.h"
#include "nodes/hypertable_modify.h"
#include "ts_catalog/chunk_data_node.h"

static Node *chunk_dispatch_state_create(CustomScan *cscan);
Expand Down Expand Up @@ -244,6 +246,15 @@ chunk_dispatch_plan_create(PlannerInfo *root, RelOptInfo *relopt, CustomPath *be
cscan->custom_scan_tlist = tlist;
cscan->scan.plan.targetlist = tlist;

#if PG15_GE
if (root->parse->mergeUseOuterJoin)
{
/* replace expressions of ROWID_VAR */
tlist = ts_replace_rowid_vars(root, tlist, relopt->relid);
cscan->scan.plan.targetlist = tlist;
cscan->custom_scan_tlist = tlist;
}
#endif
return &cscan->scan.plan;
}

Expand Down Expand Up @@ -339,8 +350,61 @@ chunk_dispatch_exec(CustomScanState *node)
/* Switch to the executor's per-tuple memory context */
old = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));

#if PG15_GE
TupleTableSlot *newslot = NULL;
if (dispatch->dispatch_state->mtstate->operation == CMD_MERGE)
{
HeapTuple tp;
AttrNumber natts;
AttrNumber attno;

tp = SearchSysCache1(RELOID, ObjectIdGetDatum(ht->main_table_relid));
if (!HeapTupleIsValid(tp))
elog(ERROR, "cache lookup failed for relation %u", ht->main_table_relid);
natts = ((Form_pg_class) GETSTRUCT(tp))->relnatts;
ReleaseSysCache(tp);
for (attno = 1; attno <= natts; attno++)
{
tp = SearchSysCache2(ATTNUM,
ObjectIdGetDatum(ht->main_table_relid),
Int16GetDatum(attno));
if (!HeapTupleIsValid(tp))
continue;
Form_pg_attribute att_tup = (Form_pg_attribute) GETSTRUCT(tp);
ReleaseSysCache(tp);
if (att_tup->attisdropped || att_tup->atthasmissing)
{
state->is_dropped_attr_exists = true;
continue;
}
}
for (int i = 0; i < ht->space->num_dimensions; i++)
{
List *actionStates =
dispatch->dispatch_state->mtstate->resultRelInfo->ri_notMatchedMergeAction;
ListCell *l;
foreach (l, actionStates)
{
MergeActionState *action = (MergeActionState *) lfirst(l);
CmdType commandType = action->mas_action->commandType;
if (commandType == CMD_INSERT)
{
/* fetch full projection list */
action->mas_proj->pi_exprContext->ecxt_innertuple = slot;
newslot = ExecProject(action->mas_proj);
break;
}
}
if (newslot)
break;
}
}
/* Calculate the tuple's point in the N-dimensional hyperspace */
point = ts_hyperspace_calculate_point(ht->space, (newslot ? newslot : slot));

#else
point = ts_hyperspace_calculate_point(ht->space, slot);
#endif

/* Save the main table's (hypertable's) ResultRelInfo */
if (!dispatch->hypertable_result_rel_info)
Expand Down Expand Up @@ -375,7 +439,7 @@ chunk_dispatch_exec(CustomScanState *node)
MemoryContextSwitchTo(old);

/* Convert the tuple to the chunk's rowtype, if necessary */
if (cis->hyper_to_chunk_map != NULL)
if (cis->hyper_to_chunk_map != NULL && state->is_dropped_attr_exists == false)
slot = execute_attr_map_slot(cis->hyper_to_chunk_map->attrMap, slot, cis->slot);

return slot;
Expand Down
2 changes: 2 additions & 0 deletions src/nodes/chunk_dispatch/chunk_dispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ typedef struct ChunkDispatchState
*/
ChunkDispatch *dispatch;
ResultRelInfo *rri;
/* flag to represent dropped attributes */
bool is_dropped_attr_exists;
} ChunkDispatchState;

extern TSDLLEXPORT bool ts_is_chunk_dispatch_state(PlanState *state);
Expand Down
Loading

0 comments on commit 90c16bf

Please sign in to comment.