Skip to content

Commit

Permalink
MERGE support on hypertables
Browse files Browse the repository at this point in the history
This patch does following:

1. Planner changes to create ChunkDispatch node when MERGE command
   has INSERT action.
2. Changes to map partition attributes from a tuple returned from
   child node of ChunkDispatch against physical targetlist, so that
   ChunkDispatch node can read the correct value from partition column.
3. Fixed issues with MERGE on compressed hypertable.
4. Added more testcases.
5. MERGE in distributed hypertables is not supported.
6. Since there is no Custom Scan (HypertableModify) node for MERGE
   with UPDATE/DELETE on compressed hypertables, we don't support this.

Fixes #5139
  • Loading branch information
sb230132 committed May 21, 2023
1 parent d984932 commit a8cf502
Show file tree
Hide file tree
Showing 24 changed files with 6,752 additions and 188 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ This release includes these noteworthy features:
* #5584 Reduce decompression during constraint checking
* #5530 Optimize compressed chunk resorting
* #5639 Support sending telemetry event reports
* #5150 MERGE support on hypertables

**Bugfixes**
* #5396 Fix SEGMENTBY columns predicates to be pushed down
Expand Down
4 changes: 3 additions & 1 deletion src/dimension.c
Original file line number Diff line number Diff line change
Expand Up @@ -970,10 +970,12 @@ ts_hyperspace_calculate_point(const Hyperspace *hs, TupleTableSlot *slot)
bool isnull;
Oid dimtype;

AttrNumber partition_col_idx = d->column_attno;

if (NULL != d->partitioning)
datum = ts_partitioning_func_apply_slot(d->partitioning, slot, &isnull);
else
datum = slot_getattr(slot, d->column_attno, &isnull);
datum = slot_getattr(slot, partition_col_idx, &isnull);

switch (d->type)
{
Expand Down
28 changes: 25 additions & 3 deletions src/import/ht_hypertable_modify.c
Original file line number Diff line number Diff line change
Expand Up @@ -766,9 +766,31 @@ ht_ExecMergeNotMatched(ModifyTableContext * context, ResultRelInfo * resultRelIn
*/
newslot = ExecProject(action->mas_proj);
context->relaction = action;

(void)
ExecInsert(context, mtstate->rootResultRelInfo, newslot, canSetTag);
if (cds->is_dropped_attr_exists)
{
AttrMap *map;
TupleDesc parenttupdesc, chunktupdesc;
TupleTableSlot *chunk_slot = NULL;

parenttupdesc = RelationGetDescr(resultRelInfo->ri_RelationDesc);
chunktupdesc = RelationGetDescr(cds->rri->ri_RelationDesc);
/* map from parent to chunk */
map = build_attrmap_by_name_if_req(parenttupdesc, chunktupdesc);
if (map != NULL)
chunk_slot =
execute_attr_map_slot(map,
newslot,
MakeSingleTupleTableSlot(chunktupdesc,
&TTSOpsVirtual));
(void) ExecInsert(context,
cds->rri,
(chunk_slot ? chunk_slot : newslot),
canSetTag);
if (chunk_slot)
ExecDropSingleTupleTableSlot(chunk_slot);
}
else
(void) ExecInsert(context, cds->rri, newslot, canSetTag);
mtstate->mt_merge_inserted = 1;
break;
case CMD_NOTHING:
Expand Down
70 changes: 66 additions & 4 deletions src/nodes/chunk_dispatch/chunk_dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
#include <nodes/extensible.h>
#include <nodes/makefuncs.h>
#include <nodes/nodeFuncs.h>
#include <optimizer/pathnode.h>
#include <parser/parsetree.h>
#include <utils/rel.h>
#include <utils/syscache.h>
#include <catalog/pg_type.h>

#include "compat/compat.h"
Expand All @@ -20,6 +22,7 @@
#include "subspace_store.h"
#include "dimension.h"
#include "guc.h"
#include "nodes/hypertable_modify.h"
#include "ts_catalog/chunk_data_node.h"

static Node *chunk_dispatch_state_create(CustomScan *cscan);
Expand Down Expand Up @@ -234,7 +237,6 @@ chunk_dispatch_plan_create(PlannerInfo *root, RelOptInfo *relopt, CustomPath *be
cscan->scan.plan.plan_rows += subplan->plan_rows;
cscan->scan.plan.plan_width += subplan->plan_width;
}

cscan->custom_private = list_make1_oid(cdpath->hypertable_relid);
cscan->methods = &chunk_dispatch_plan_methods;
cscan->custom_plans = custom_plans;
Expand All @@ -243,7 +245,15 @@ chunk_dispatch_plan_create(PlannerInfo *root, RelOptInfo *relopt, CustomPath *be
/* The "input" and "output" target lists should be the same */
cscan->custom_scan_tlist = tlist;
cscan->scan.plan.targetlist = tlist;

#if PG15_GE
if (root->parse->mergeUseOuterJoin)
{
/* replace expressions of ROWID_VAR */
tlist = ts_replace_rowid_vars(root, tlist, relopt->relid);
cscan->scan.plan.targetlist = tlist;
cscan->custom_scan_tlist = tlist;
}
#endif
return &cscan->scan.plan;
}

Expand Down Expand Up @@ -339,8 +349,61 @@ chunk_dispatch_exec(CustomScanState *node)
/* Switch to the executor's per-tuple memory context */
old = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));

#if PG15_GE
TupleTableSlot *newslot = NULL;
if (dispatch->dispatch_state->mtstate->operation == CMD_MERGE)
{
HeapTuple tp;
AttrNumber natts;
AttrNumber attno;

tp = SearchSysCache1(RELOID, ObjectIdGetDatum(ht->main_table_relid));
if (!HeapTupleIsValid(tp))
elog(ERROR, "cache lookup failed for relation %u", ht->main_table_relid);
natts = ((Form_pg_class) GETSTRUCT(tp))->relnatts;
ReleaseSysCache(tp);
for (attno = 1; attno <= natts; attno++)
{
tp = SearchSysCache2(ATTNUM,
ObjectIdGetDatum(ht->main_table_relid),
Int16GetDatum(attno));
if (!HeapTupleIsValid(tp))
continue;
Form_pg_attribute att_tup = (Form_pg_attribute) GETSTRUCT(tp);
ReleaseSysCache(tp);
if (att_tup->attisdropped || att_tup->atthasmissing)
{
state->is_dropped_attr_exists = true;
continue;
}
}
for (int i = 0; i < ht->space->num_dimensions; i++)
{
List *actionStates =
dispatch->dispatch_state->mtstate->resultRelInfo->ri_notMatchedMergeAction;
ListCell *l;
foreach (l, actionStates)
{
MergeActionState *action = (MergeActionState *) lfirst(l);
CmdType commandType = action->mas_action->commandType;
if (commandType == CMD_INSERT)
{
/* fetch full projection list */
action->mas_proj->pi_exprContext->ecxt_innertuple = slot;
newslot = ExecProject(action->mas_proj);
break;
}
}
if (newslot)
break;
}
}
/* Calculate the tuple's point in the N-dimensional hyperspace */
point = ts_hyperspace_calculate_point(ht->space, (newslot ? newslot : slot));

#else
point = ts_hyperspace_calculate_point(ht->space, slot);
#endif

/* Save the main table's (hypertable's) ResultRelInfo */
if (!dispatch->hypertable_result_rel_info)
Expand Down Expand Up @@ -375,9 +438,8 @@ chunk_dispatch_exec(CustomScanState *node)
MemoryContextSwitchTo(old);

/* Convert the tuple to the chunk's rowtype, if necessary */
if (cis->hyper_to_chunk_map != NULL)
if (cis->hyper_to_chunk_map != NULL && state->is_dropped_attr_exists == false)
slot = execute_attr_map_slot(cis->hyper_to_chunk_map->attrMap, slot, cis->slot);

return slot;
}

Expand Down
3 changes: 2 additions & 1 deletion src/nodes/chunk_dispatch/chunk_dispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ typedef struct ChunkDispatchState
*/
ChunkDispatch *dispatch;
ResultRelInfo *rri;
/* flag to represent dropped attributes */
bool is_dropped_attr_exists;
} ChunkDispatchState;

extern TSDLLEXPORT bool ts_is_chunk_dispatch_state(PlanState *state);
Expand All @@ -89,5 +91,4 @@ ts_chunk_dispatch_get_chunk_insert_state(ChunkDispatch *dispatch, Point *p, Tupl

extern TSDLLEXPORT Path *ts_chunk_dispatch_path_create(PlannerInfo *root, ModifyTablePath *mtpath,
Index hypertable_rti, int subpath_index);

#endif /* TIMESCALEDB_NODES_CHUNK_DISPATCH_H */
Loading

0 comments on commit a8cf502

Please sign in to comment.