Skip to content

Commit

Permalink
MERGE support on hypertables
Browse files Browse the repository at this point in the history
This patch does following:

1. Refactor ExecInsert/Delete/Update
   Backported commit 25e777cf8e547d7423d2e1e9da71f98b9414d59e
2. Backport all MERGE related interfaces and its implementations.
   Backported commit 7103ebb7aae8ab8076b7e85f335ceb8fe799097c
3. Planner changes to mark self referencing child tables as dummy.
4. Planner changes to create ChunkDispatch node when MERGE command
   has INSERT action.
5. Changes to map partition attributes from a tuple returned from
   child node of ChunkDispatch against physical targetlist, so that
   ChunkDispatch node can read the correct value from partition column.
6. Fixed issues with MERGE on compressed hypertable.
7. Added testcases.

Pending tasks:
1. More testing.
2. Support of MERGE in distributed hypertables.

Fixes #5139
  • Loading branch information
sb230132 committed Jan 12, 2023
1 parent f36db10 commit f4473cd
Show file tree
Hide file tree
Showing 12 changed files with 2,530 additions and 470 deletions.
2 changes: 1 addition & 1 deletion src/copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ copyfrom(CopyChunkState *ccstate, List *range_table, Hypertable *ht, MemoryConte
ExecStoreVirtualTuple(myslot);

/* Calculate the tuple's point in the N-dimensional hyperspace */
point = ts_hyperspace_calculate_point(ht->space, myslot);
point = ts_hyperspace_calculate_point(ht->space, myslot, NULL);

/* Find or create the insert state matching the point */
cis = ts_chunk_dispatch_get_chunk_insert_state(dispatch,
Expand Down
60 changes: 58 additions & 2 deletions src/dimension.c
Original file line number Diff line number Diff line change
Expand Up @@ -957,8 +957,60 @@ ts_point_create(int16 num_dimensions)
return p;
}

/*
* Helper function to find the right partition value from a tuple,
* in case attributes in tuple are not ordered correctly.
*
* Assume columns in table are in below order:
* [time, temperature, location, value]
* "time" is the partition column based on this columns value
* ChunkDispatch node decides if to create or reuse an existing
* chunk.
*
* Assume received tuple for child node of ChunkDispatch node is like
* [temperature, location, time, value]
*
* In this case this function will return correct attribute no: from
* the received slot.
*/
int
find_partition_column_in_tuple(const Hyperspace *hs, TupleTableSlot *slot, List *insert_targetList)
{
AttrNumber partition_col_idx = 0;
for (int i = 0; i < hs->num_dimensions; i++)
{
const Dimension *d = &hs->dimensions[i];
Oid partition_col_type = InvalidOid;
if (insert_targetList)
{
ListCell *lc;
foreach (lc, insert_targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
Var *new_var = castNode(Var, tle->expr);
if (new_var->varattno == d->column_attno)
{
partition_col_type = new_var->vartype;
break;
}
}
TupleDesc tupdesc = slot->tts_tupleDescriptor;
int natts = tupdesc->natts;
for (int i = 0; i < natts; i++)
{
if (tupdesc->attrs[i].atttypid == partition_col_type)
{
partition_col_idx = d->column_attno + i;
break;
}
}
}
}
return partition_col_idx;
}

TSDLLEXPORT Point *
ts_hyperspace_calculate_point(const Hyperspace *hs, TupleTableSlot *slot)
ts_hyperspace_calculate_point(const Hyperspace *hs, TupleTableSlot *slot, List *insert_targetList)
{
Point *p = ts_point_create(hs->num_dimensions);
int i;
Expand All @@ -970,10 +1022,14 @@ ts_hyperspace_calculate_point(const Hyperspace *hs, TupleTableSlot *slot)
bool isnull;
Oid dimtype;

AttrNumber partition_col_idx = d->column_attno;
if (insert_targetList)
partition_col_idx = find_partition_column_in_tuple(hs, slot, insert_targetList);

if (NULL != d->partitioning)
datum = ts_partitioning_func_apply_slot(d->partitioning, slot, &isnull);
else
datum = slot_getattr(slot, d->column_attno, &isnull);
datum = slot_getattr(slot, partition_col_idx, &isnull);

switch (d->type)
{
Expand Down
5 changes: 4 additions & 1 deletion src/dimension.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ typedef struct DimensionInfo
extern Hyperspace *ts_dimension_scan(int32 hypertable_id, Oid main_table_relid, int16 num_dimension,
MemoryContext mctx);
extern DimensionSlice *ts_dimension_calculate_default_slice(const Dimension *dim, int64 value);
extern TSDLLEXPORT Point *ts_hyperspace_calculate_point(const Hyperspace *h, TupleTableSlot *slot);
extern TSDLLEXPORT Point *ts_hyperspace_calculate_point(const Hyperspace *h, TupleTableSlot *slot,
List *insert_targetList);
extern int ts_dimension_get_slice_ordinal(const Dimension *dim, const DimensionSlice *slice);
extern const Dimension *ts_hyperspace_get_dimension_by_id(const Hyperspace *hs, int32 id);
extern TSDLLEXPORT const Dimension *ts_hyperspace_get_dimension(const Hyperspace *hs,
Expand Down Expand Up @@ -154,4 +155,6 @@ extern TSDLLEXPORT Point *ts_point_create(int16 num_dimensions);
#define hyperspace_get_closed_dimension(space, i) \
ts_hyperspace_get_dimension(space, DIMENSION_TYPE_CLOSED, i)

extern int find_partition_column_in_tuple(const Hyperspace *h, TupleTableSlot *slot,
List *insert_targetList);
#endif /* TIMESCALEDB_DIMENSION_H */
76 changes: 74 additions & 2 deletions src/nodes/chunk_dispatch/chunk_dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <nodes/extensible.h>
#include <nodes/makefuncs.h>
#include <nodes/nodeFuncs.h>
#include <optimizer/pathnode.h>
#include <optimizer/plancat.h>
#include <parser/parsetree.h>
#include <utils/rel.h>
#include <catalog/pg_type.h>
Expand Down Expand Up @@ -196,7 +198,36 @@ chunk_dispatch_plan_create(PlannerInfo *root, RelOptInfo *relopt, CustomPath *be
/* The "input" and "output" target lists should be the same */
cscan->custom_scan_tlist = tlist;
cscan->scan.plan.targetlist = tlist;

/*
tlist holds child node target list, whose order can be different from
hypertables column order. During INSERT into hypertable, ChunkDispatch node
checks if current row needs to be inserted into existing chunk or should it
create a new chunk. In case a new chunk needs to be created, attributes in
current row tuple should be ordered such a way that partition column is at
right place. This helps ChunkDispatch node to fetch the correct value and
then create a new range for the chunk to be created.
MERGE command with INSERT action can cause child node of ChunkDispatch to
return a tuple in completely different order. In such case ChunkDispatch
node can end up reading partition column wrongly.
To overcome this problem we save physical targetList of the hypertable
so that this can be used by ChunkDispatch node to fetch the right value
from received tuple. Thus we save targetlist in CustomScan node private
space. This list will be used to map the right partition column
during execution of ChunkDispatch node.
Please refer find_partition_column_in_tuple() for more details.
*/
#if PG15_GE
if (root->parse->mergeUseOuterJoin)
{
/* Save actual ordering of hypertable columns only
* if MERGE command has INSERT action. */
RelOptInfo *baserel = find_base_rel(root, 1);
List *orig_list = build_physical_tlist(root, baserel);
cscan->custom_private = lappend(cscan->custom_private, orig_list);
}
#endif
return &cscan->scan.plan;
}

Expand Down Expand Up @@ -292,8 +323,40 @@ chunk_dispatch_exec(CustomScanState *node)
/* Switch to the executor's per-tuple memory context */
old = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));

#if PG15_GE
Oid paritition_col_idx = InvalidOid;
TupleTableSlot *newslot = NULL;
if (dispatch->dispatch_state->mtstate->operation == CMD_MERGE)
{
paritition_col_idx =
find_partition_column_in_tuple(ht->space, slot, state->insert_targetList);
/* Row returned by child node does not have paritition col */
if (!OidIsValid(paritition_col_idx))
{
List *actionStates =
dispatch->dispatch_state->mtstate->resultRelInfo->ri_notMatchedMergeAction;
ListCell *l;
foreach (l, actionStates)
{
MergeActionState *action = (MergeActionState *) lfirst(l);
CmdType commandType = action->mas_action->commandType;
if (commandType == CMD_INSERT)
{
/* in case received tuple has only junk attributes, then
* fetch the actual values. */
newslot = ExecProject(action->mas_proj);
}
}
}
}
/* Calculate the tuple's point in the N-dimensional hyperspace */
point = ts_hyperspace_calculate_point(ht->space, slot);
if (!OidIsValid(paritition_col_idx) && newslot)
point = ts_hyperspace_calculate_point(ht->space, newslot, state->insert_targetList);
else
point = ts_hyperspace_calculate_point(ht->space, slot, state->insert_targetList);
#else
point = ts_hyperspace_calculate_point(ht->space, slot, state->insert_targetList);
#endif

/* Save the main table's (hypertable's) ResultRelInfo */
if (!dispatch->hypertable_result_rel_info)
Expand All @@ -304,6 +367,13 @@ chunk_dispatch_exec(CustomScanState *node)
dispatch->hypertable_result_rel_info = estate->es_result_relation_info;
#else
dispatch->hypertable_result_rel_info = dispatch->dispatch_state->mtstate->resultRelInfo;
#if 0
#if PG15_GE
if (dispatch->dispatch_state->mtstate->operation == CMD_MERGE)
dispatch->hypertable_result_rel_info =
dispatch->dispatch_state->mtstate->rootResultRelInfo;
#endif
#endif
#endif
}

Expand Down Expand Up @@ -376,6 +446,8 @@ chunk_dispatch_state_create(CustomScan *cscan)
Assert(list_length(cscan->custom_plans) == 1);
state->subplan = linitial(cscan->custom_plans);
state->cscan_state.methods = &chunk_dispatch_state_methods;
if (cscan->custom_private && cscan->custom_private->length > 1)
state->insert_targetList = lsecond(cscan->custom_private);
return (Node *) state;
}

Expand Down
4 changes: 4 additions & 0 deletions src/nodes/chunk_dispatch/chunk_dispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ typedef struct ChunkDispatchState
*/
ChunkDispatch *dispatch;
ResultRelInfo *rri;
/*
* Save physical targetList for INSERT action in MERGE
*/
List *insert_targetList;
} ChunkDispatchState;

extern TSDLLEXPORT bool ts_is_chunk_dispatch_state(PlanState *state);
Expand Down
Loading

0 comments on commit f4473cd

Please sign in to comment.