Skip to content

Commit

Permalink
MERGE support on hypertables
Browse files Browse the repository at this point in the history
This patch does following:

1. Refactor ExecInsert/Delete/Update
   Backported commit 25e777cf8e547d7423d2e1e9da71f98b9414d59e
2. Backport all MERGE related interfaces and its implementations.
   Backported commit 7103ebb7aae8ab8076b7e85f335ceb8fe799097c
3. Planner changes to mark self referencing child tables as dummy.
4. Planner changes to create ChunkDispatch node when MERGE command
   has INSERT action.
5. Changes to map partition attributes from a tuple returned from
   child node of ChunkDispatch against physical targetlist, so that
   ChunkDispatch node can read the correct value from partition column.
6. Fixed issues with MERGE on compressed hypertable.
7. Added more testcases.
8. MERGE in distributed hypertables is not supported.

Pending tasks:
1. More testing.

Fixes #5139
  • Loading branch information
sb230132 committed Feb 1, 2023
1 parent 789bb26 commit f01c812
Show file tree
Hide file tree
Showing 21 changed files with 3,811 additions and 486 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ accidentally triggering the load of a previous DB version.**

## Unreleased

**Features**
* #5150 MERGE support on hypertables

**Features**
* #5241 Allow RETURNING clause when inserting into compressed chunks
* #5245 Mange life-cycle of connections via memory contexts
Expand Down
2 changes: 1 addition & 1 deletion src/copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ copyfrom(CopyChunkState *ccstate, List *range_table, Hypertable *ht, MemoryConte
ExecStoreVirtualTuple(myslot);

/* Calculate the tuple's point in the N-dimensional hyperspace */
point = ts_hyperspace_calculate_point(ht->space, myslot);
point = ts_hyperspace_calculate_point(ht->space, myslot, NULL, NULL, NULL);

/* Find or create the insert state matching the point */
cis = ts_chunk_dispatch_get_chunk_insert_state(dispatch,
Expand Down
117 changes: 114 additions & 3 deletions src/dimension.c
Original file line number Diff line number Diff line change
Expand Up @@ -957,8 +957,110 @@ ts_point_create(int16 num_dimensions)
return p;
}

int
get_partition_idx_from_tlist(Dimension *d, TupleTableSlot *slot, List *colnames_list)
{
AttrNumber partition_col_idx = 0;
bool partition_col_found = false;
if (colnames_list)
{
ListCell *col_list;
foreach (col_list, colnames_list)
{
const char *col_name = lfirst(col_list);
if (namestrcmp(&d->fd.column_name, col_name) == 0)
{
partition_col_found = true;
break;
}
partition_col_idx++;
}
}
if (partition_col_found)
{
TupleDesc tupdesc = slot->tts_tupleDescriptor;
if (tupdesc->attrs[partition_col_idx].atttypid == d->fd.column_type)
return partition_col_idx + 1;
}
return 0;
}

/*
* Helper function to find the right partition value from a tuple,
* in case attributes in tuple are not ordered correctly.
*
* Assume columns in table are in below order:
* [time, temperature, location, value]
* "time" is the partition column based on this columns value
* ChunkDispatch node decides if to create or reuse an existing
* chunk.
*
* Assume received tuple for child node of ChunkDispatch node is like
* [temperature, location, time, value]
*
* In this case this function will return correct attribute no: from
* the input slot.
*/
int
find_partition_column_in_tuple(const Hyperspace *hs, TupleTableSlot *slot, List *insert_targetList,
List *colnames_list, List *dropped_attrList, bool is_space_partition)
{
AttrNumber partition_col_idx = 0;
unsigned int num_dropped_attr_before_partition_col = 0;
Dimension *d;
if (is_space_partition)
d = (Dimension *) &hs->dimensions[1];
else
d = (Dimension *) &hs->dimensions[0];

if (colnames_list)
partition_col_idx = get_partition_idx_from_tlist(d, slot, colnames_list);

if (OidIsValid(partition_col_idx))
return partition_col_idx;

if (dropped_attrList && dropped_attrList->length)
{
ListCell *lc;
foreach (lc, dropped_attrList)
{
int pos = lfirst_int(lc);
if (pos < d->column_attno)
num_dropped_attr_before_partition_col++;
}
}
Oid partition_col_type = InvalidOid;
if (insert_targetList)
{
ListCell *lc;
foreach (lc, insert_targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
Var *new_var = castNode(Var, tle->expr);
if (new_var->varattno == d->column_attno)
{
partition_col_type = new_var->vartype;
break;
}
}
TupleDesc tupdesc = slot->tts_tupleDescriptor;
int natts = tupdesc->natts;
for (int i = 0; i < natts; i++)
{
Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
if (attr->atttypid == partition_col_type)
{
partition_col_idx = i + 1;
break;
}
}
}
return partition_col_idx;
}

TSDLLEXPORT Point *
ts_hyperspace_calculate_point(const Hyperspace *hs, TupleTableSlot *slot)
ts_hyperspace_calculate_point(const Hyperspace *hs, TupleTableSlot *slot, List *insert_targetList,
List *colnames_list, List *dropped_attrList)
{
Point *p = ts_point_create(hs->num_dimensions);
int i;
Expand All @@ -970,10 +1072,19 @@ ts_hyperspace_calculate_point(const Hyperspace *hs, TupleTableSlot *slot)
bool isnull;
Oid dimtype;

AttrNumber partition_col_idx = d->column_attno;
if (insert_targetList)
partition_col_idx = find_partition_column_in_tuple(hs,
slot,
insert_targetList,
colnames_list,
dropped_attrList,
(d->partitioning ? true : false));

if (NULL != d->partitioning)
datum = ts_partitioning_func_apply_slot(d->partitioning, slot, &isnull);
datum = ts_partitioning_func_apply_slot(d->partitioning, slot, &isnull, partition_col_idx);
else
datum = slot_getattr(slot, d->column_attno, &isnull);
datum = slot_getattr(slot, partition_col_idx, &isnull);

switch (d->type)
{
Expand Down
10 changes: 9 additions & 1 deletion src/dimension.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,10 @@ typedef struct DimensionInfo
extern Hyperspace *ts_dimension_scan(int32 hypertable_id, Oid main_table_relid, int16 num_dimension,
MemoryContext mctx);
extern DimensionSlice *ts_dimension_calculate_default_slice(const Dimension *dim, int64 value);
extern TSDLLEXPORT Point *ts_hyperspace_calculate_point(const Hyperspace *h, TupleTableSlot *slot);
extern TSDLLEXPORT Point *ts_hyperspace_calculate_point(const Hyperspace *h, TupleTableSlot *slot,
List *insert_targetList,
List *colnames_list,
List *dropped_attrList);
extern int ts_dimension_get_slice_ordinal(const Dimension *dim, const DimensionSlice *slice);
extern const Dimension *ts_hyperspace_get_dimension_by_id(const Hyperspace *hs, int32 id);
extern TSDLLEXPORT const Dimension *ts_hyperspace_get_dimension(const Hyperspace *hs,
Expand Down Expand Up @@ -155,4 +158,9 @@ extern TSDLLEXPORT bool ts_is_equality_operator(Oid opno, Oid left, Oid right);
#define hyperspace_get_closed_dimension(space, i) \
ts_hyperspace_get_dimension(space, DIMENSION_TYPE_CLOSED, i)

extern int get_partition_idx_from_tlist(Dimension *d, TupleTableSlot *slot,
List *colnames_list);
extern int find_partition_column_in_tuple(const Hyperspace *h, TupleTableSlot *slot,
List *insert_targetList, List *colnames_list,
List *dropped_attrList, bool is_space_partition);
#endif /* TIMESCALEDB_DIMENSION_H */
Loading

0 comments on commit f01c812

Please sign in to comment.