From 8b9450cb43975c04a21c35cc6554299900c6070c Mon Sep 17 00:00:00 2001 From: John Gemignani Date: Fri, 5 Apr 2024 15:42:40 -0700 Subject: [PATCH] Fix Issue 1691 - MERGE incorrectly creates multiple vertices (#1718) (#1729) Fixed issue 1691 where MERGE would incorrectly create multiple vertices. This only occurred when MERGE was being driven by a previous clause. NOTE: To be more correct, the issue is with creating duplicate paths. This happened because of the visibility of tuples that were created during the MERGE instance: it is not possible to add them to the scan to be rescanned. Because of this limitation, MERGE needed the ability to know which paths an instance had already created. Added regression tests. Modified the following files to add missing functionality in PG13 and below branches. Specifically, added the function datum_image_hash. modified: src/backend/executor/cypher_utils.c modified: src/include/executor/cypher_utils.h Resolved Conflicts: (due to differences between PG13 & PG12) src/backend/executor/cypher_merge.c Modified datum_image_hash to use hash_any. Changed datum_image_eq to datumIsEqual as the former doesn't exist in PG11. 
--- regress/expected/cypher_merge.out | 171 ++++++++- regress/sql/cypher_merge.sql | 70 +++- src/backend/executor/cypher_merge.c | 541 ++++++++++++++++++++++++++-- src/backend/executor/cypher_utils.c | 63 ++++ src/include/executor/cypher_utils.h | 4 + 5 files changed, 810 insertions(+), 39 deletions(-) diff --git a/regress/expected/cypher_merge.out b/regress/expected/cypher_merge.out index 7690405cf..af65c0555 100644 --- a/regress/expected/cypher_merge.out +++ b/regress/expected/cypher_merge.out @@ -1263,8 +1263,8 @@ $$) as (a agtype); --- (0 rows) ---- ---- Issue 1630 MERGE using array not working in some cases +-- +-- Issue 1630 - MERGE using array not working in some cases -- SELECT * FROM create_graph('issue_1630'); NOTICE: graph "issue_1630" has been created @@ -1387,7 +1387,151 @@ SELECT * FROM cypher('issue_1630', {"id": 844424930131974, "label": "PERSION", "properties": {"last": "snow", "first": "jon"}}::vertex | {"last": "snow", "first": "jon"} (1 row) ---clean up +-- +-- Issue 1691 - MERGE incorrectly creates multiple vertices +-- +SELECT * FROM create_graph('issue_1691'); +NOTICE: graph "issue_1691" has been created + create_graph +-------------- + +(1 row) + +SELECT * FROM cypher('issue_1691', $$ MATCH (u) RETURN (u) $$) AS (u agtype); + u +--- +(0 rows) + +-- should only create 2 distinct rows but return 4, the extra 2 being duplicates +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "bar", "foo", "foo"] as n + MERGE (u {name: n})-[e:knows]->(v) + RETURN u, e, v $$) AS (u agtype, e agtype, v agtype); + u | e | v +-----------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------- + {"id": 281474976710657, "label": "", "properties": {"name": "foo"}}::vertex | {"id": 844424930131969, "label": "knows", "end_id": 281474976710658, "start_id": 
281474976710657, "properties": {}}::edge | {"id": 281474976710658, "label": "", "properties": {}}::vertex + {"id": 281474976710659, "label": "", "properties": {"name": "bar"}}::vertex | {"id": 844424930131970, "label": "knows", "end_id": 281474976710660, "start_id": 281474976710659, "properties": {}}::edge | {"id": 281474976710660, "label": "", "properties": {}}::vertex + {"id": 281474976710657, "label": "", "properties": {"name": "foo"}}::vertex | {"id": 844424930131969, "label": "knows", "end_id": 281474976710658, "start_id": 281474976710657, "properties": {}}::edge | {"id": 281474976710658, "label": "", "properties": {}}::vertex + {"id": 281474976710657, "label": "", "properties": {"name": "foo"}}::vertex | {"id": 844424930131969, "label": "knows", "end_id": 281474976710658, "start_id": 281474976710657, "properties": {}}::edge | {"id": 281474976710658, "label": "", "properties": {}}::vertex +(4 rows) + +-- should only return the same above 4 rows +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "bar", "foo", "foo"] as n + MERGE (u {name: n})-[e:knows]->(v) + RETURN u, e, v $$) AS (u agtype, e agtype, v agtype); + u | e | v +-----------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------- + {"id": 281474976710657, "label": "", "properties": {"name": "foo"}}::vertex | {"id": 844424930131969, "label": "knows", "end_id": 281474976710658, "start_id": 281474976710657, "properties": {}}::edge | {"id": 281474976710658, "label": "", "properties": {}}::vertex + {"id": 281474976710659, "label": "", "properties": {"name": "bar"}}::vertex | {"id": 844424930131970, "label": "knows", "end_id": 281474976710660, "start_id": 281474976710659, "properties": {}}::edge | {"id": 281474976710660, "label": "", "properties": {}}::vertex + {"id": 281474976710657, "label": "", 
"properties": {"name": "foo"}}::vertex | {"id": 844424930131969, "label": "knows", "end_id": 281474976710658, "start_id": 281474976710657, "properties": {}}::edge | {"id": 281474976710658, "label": "", "properties": {}}::vertex + {"id": 281474976710657, "label": "", "properties": {"name": "foo"}}::vertex | {"id": 844424930131969, "label": "knows", "end_id": 281474976710658, "start_id": 281474976710657, "properties": {}}::edge | {"id": 281474976710658, "label": "", "properties": {}}::vertex +(4 rows) + +-- should only return 2 distinct rows from above +SELECT * FROM cypher('issue_1691', $$ MATCH (u)-[e]->(v) + RETURN u, e, v $$) AS (u agtype, e agtype, v agtype); + u | e | v +-----------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------- + {"id": 281474976710657, "label": "", "properties": {"name": "foo"}}::vertex | {"id": 844424930131969, "label": "knows", "end_id": 281474976710658, "start_id": 281474976710657, "properties": {}}::edge | {"id": 281474976710658, "label": "", "properties": {}}::vertex + {"id": 281474976710659, "label": "", "properties": {"name": "bar"}}::vertex | {"id": 844424930131970, "label": "knows", "end_id": 281474976710660, "start_id": 281474976710659, "properties": {}}::edge | {"id": 281474976710660, "label": "", "properties": {}}::vertex +(2 rows) + +SELECT * FROM cypher('issue_1691', $$MATCH ()-[e]->() DELETE e $$) AS (a agtype); + a +--- +(0 rows) + +SELECT * FROM cypher('issue_1691', $$MATCH (u) DELETE u $$) AS (a agtype); + a +--- +(0 rows) + +-- should only create 1 record but return 2, one a dup of the other +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "foo"] AS each + MERGE (v:TEST {name: each}) + RETURN v $$) AS (v agtype); + v +---------------------------------------------------------------------------------- + {"id": 
1125899906842625, "label": "TEST", "properties": {"name": "foo"}}::vertex + {"id": 1125899906842625, "label": "TEST", "properties": {"name": "foo"}}::vertex +(2 rows) + +SELECT * FROM cypher('issue_1691', $$ MATCH (u) RETURN (u) $$) AS (u agtype); + u +---------------------------------------------------------------------------------- + {"id": 1125899906842625, "label": "TEST", "properties": {"name": "foo"}}::vertex +(1 row) + +-- should just return 5 foo records that are all the same one +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "foo", "bar", "foo", "bar"] AS each + MERGE (v:TEST {name: "foo"}) + RETURN v $$) AS (v agtype); + v +---------------------------------------------------------------------------------- + {"id": 1125899906842625, "label": "TEST", "properties": {"name": "foo"}}::vertex + {"id": 1125899906842625, "label": "TEST", "properties": {"name": "foo"}}::vertex + {"id": 1125899906842625, "label": "TEST", "properties": {"name": "foo"}}::vertex + {"id": 1125899906842625, "label": "TEST", "properties": {"name": "foo"}}::vertex + {"id": 1125899906842625, "label": "TEST", "properties": {"name": "foo"}}::vertex +(5 rows) + +SELECT * FROM cypher('issue_1691', $$ MATCH (u) RETURN (u) $$) AS (u agtype); + u +---------------------------------------------------------------------------------- + {"id": 1125899906842625, "label": "TEST", "properties": {"name": "foo"}}::vertex +(1 row) + +-- should just return 5 bar records that are all the same one +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "foo", "bar", "foo", "bar"] AS each + MERGE (v:TEST {name: "bar"}) + RETURN v $$) AS (v agtype); + v +---------------------------------------------------------------------------------- + {"id": 1125899906842626, "label": "TEST", "properties": {"name": "bar"}}::vertex + {"id": 1125899906842626, "label": "TEST", "properties": {"name": "bar"}}::vertex + {"id": 1125899906842626, "label": "TEST", "properties": {"name": "bar"}}::vertex + {"id": 1125899906842626, 
"label": "TEST", "properties": {"name": "bar"}}::vertex + {"id": 1125899906842626, "label": "TEST", "properties": {"name": "bar"}}::vertex +(5 rows) + +SELECT * FROM cypher('issue_1691', $$ MATCH (u) RETURN (u) $$) AS (u agtype); + u +---------------------------------------------------------------------------------- + {"id": 1125899906842625, "label": "TEST", "properties": {"name": "foo"}}::vertex + {"id": 1125899906842626, "label": "TEST", "properties": {"name": "bar"}}::vertex +(2 rows) + +SELECT * FROM cypher('issue_1691', $$MATCH (u) DELETE u $$) AS (a agtype); + a +--- +(0 rows) + +-- should create 2 rows foo->bar and bar->bar and the other 3 are just returning dups +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "bar", "foo", "foo", "bar"] as n + MERGE (u {name: n})-[e1:knows]->(v {name: "bar"})-[e2:knows]->(w) + RETURN u, e1, v, e2, w $$) AS (u agtype, e1 agtype, v agtype, e2 agtype, w agtype); + u | e1 | v | e2 | w +-----------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------- + {"id": 281474976710661, "label": "", "properties": {"name": "foo"}}::vertex | {"id": 844424930131972, "label": "knows", "end_id": 281474976710662, "start_id": 281474976710661, "properties": {}}::edge | {"id": 281474976710662, "label": "", "properties": {"name": "bar"}}::vertex | {"id": 844424930131971, "label": "knows", "end_id": 281474976710663, "start_id": 281474976710662, "properties": {}}::edge | {"id": 281474976710663, "label": "", "properties": {}}::vertex + {"id": 281474976710664, "label": "", "properties": {"name": "bar"}}::vertex | {"id": 844424930131974, "label": 
"knows", "end_id": 281474976710665, "start_id": 281474976710664, "properties": {}}::edge | {"id": 281474976710665, "label": "", "properties": {"name": "bar"}}::vertex | {"id": 844424930131973, "label": "knows", "end_id": 281474976710666, "start_id": 281474976710665, "properties": {}}::edge | {"id": 281474976710666, "label": "", "properties": {}}::vertex + {"id": 281474976710661, "label": "", "properties": {"name": "foo"}}::vertex | {"id": 844424930131972, "label": "knows", "end_id": 281474976710662, "start_id": 281474976710661, "properties": {}}::edge | {"id": 281474976710662, "label": "", "properties": {"name": "bar"}}::vertex | {"id": 844424930131971, "label": "knows", "end_id": 281474976710663, "start_id": 281474976710662, "properties": {}}::edge | {"id": 281474976710663, "label": "", "properties": {}}::vertex + {"id": 281474976710661, "label": "", "properties": {"name": "foo"}}::vertex | {"id": 844424930131972, "label": "knows", "end_id": 281474976710662, "start_id": 281474976710661, "properties": {}}::edge | {"id": 281474976710662, "label": "", "properties": {"name": "bar"}}::vertex | {"id": 844424930131971, "label": "knows", "end_id": 281474976710663, "start_id": 281474976710662, "properties": {}}::edge | {"id": 281474976710663, "label": "", "properties": {}}::vertex + {"id": 281474976710664, "label": "", "properties": {"name": "bar"}}::vertex | {"id": 844424930131974, "label": "knows", "end_id": 281474976710665, "start_id": 281474976710664, "properties": {}}::edge | {"id": 281474976710665, "label": "", "properties": {"name": "bar"}}::vertex | {"id": 844424930131973, "label": "knows", "end_id": 281474976710666, "start_id": 281474976710665, "properties": {}}::edge | {"id": 281474976710666, "label": "", "properties": {}}::vertex +(5 rows) + +-- clean up +SELECT * FROM cypher('issue_1691', $$MATCH ()-[e]->() DELETE e $$) AS (a agtype); + a +--- +(0 rows) + +SELECT * FROM cypher('issue_1691', $$MATCH (u) DELETE u $$) AS (a agtype); + a +--- +(0 rows) + +-- +-- 
clean up graphs +-- SELECT * FROM cypher('cypher_merge', $$MATCH (n) DETACH DELETE n $$) AS (a agtype); a --- @@ -1398,9 +1542,9 @@ SELECT * FROM cypher('issue_1630', $$MATCH (n) DETACH DELETE n $$) AS (a agtype) --- (0 rows) -/* - * Clean up graph - */ +-- +-- delete graphs +-- SELECT drop_graph('cypher_merge', true); NOTICE: drop cascades to 19 other objects DETAIL: drop cascades to table cypher_merge._ag_label_vertex @@ -1439,3 +1583,18 @@ NOTICE: graph "issue_1630" has been dropped (1 row) +SELECT drop_graph('issue_1691', true); +NOTICE: drop cascades to 4 other objects +DETAIL: drop cascades to table issue_1691._ag_label_vertex +drop cascades to table issue_1691._ag_label_edge +drop cascades to table issue_1691.knows +drop cascades to table issue_1691."TEST" +NOTICE: graph "issue_1691" has been dropped + drop_graph +------------ + +(1 row) + +-- +-- End +-- diff --git a/regress/sql/cypher_merge.sql b/regress/sql/cypher_merge.sql index fe9ee8184..555aff6b3 100644 --- a/regress/sql/cypher_merge.sql +++ b/regress/sql/cypher_merge.sql @@ -606,8 +606,8 @@ SELECT * FROM cypher('cypher_merge', $$ CREATE (n), (m) WITH n AS r MERGE (m) $$) as (a agtype); ---- ---- Issue 1630 MERGE using array not working in some cases +-- +-- Issue 1630 - MERGE using array not working in some cases -- SELECT * FROM create_graph('issue_1630'); SELECT * FROM cypher('issue_1630', $$ MATCH (u) RETURN (u) $$) AS (u agtype); @@ -670,13 +670,71 @@ SELECT * FROM cypher('issue_1630', MERGE (v:PERSION {first: cols.first, last: cols.last}) RETURN v, cols $$) AS (v agtype, cols agtype); +-- +-- Issue 1691 - MERGE incorrectly creates multiple vertices +-- +SELECT * FROM create_graph('issue_1691'); +SELECT * FROM cypher('issue_1691', $$ MATCH (u) RETURN (u) $$) AS (u agtype); + +-- should only create 2 distinct rows but return 4, the extra 2 being duplicates +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "bar", "foo", "foo"] as n + MERGE (u {name: n})-[e:knows]->(v) + RETURN u, e, v $$) AS (u 
agtype, e agtype, v agtype); +-- should only return the same above 4 rows +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "bar", "foo", "foo"] as n + MERGE (u {name: n})-[e:knows]->(v) + RETURN u, e, v $$) AS (u agtype, e agtype, v agtype); +-- should only return 2 distinct rows from above +SELECT * FROM cypher('issue_1691', $$ MATCH (u)-[e]->(v) + RETURN u, e, v $$) AS (u agtype, e agtype, v agtype); + +SELECT * FROM cypher('issue_1691', $$MATCH ()-[e]->() DELETE e $$) AS (a agtype); +SELECT * FROM cypher('issue_1691', $$MATCH (u) DELETE u $$) AS (a agtype); + +-- should only create 1 record but return 2, one a dup of the other +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "foo"] AS each + MERGE (v:TEST {name: each}) + RETURN v $$) AS (v agtype); + +SELECT * FROM cypher('issue_1691', $$ MATCH (u) RETURN (u) $$) AS (u agtype); + +-- should just return 5 foo records that are all the same one +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "foo", "bar", "foo", "bar"] AS each + MERGE (v:TEST {name: "foo"}) + RETURN v $$) AS (v agtype); + +SELECT * FROM cypher('issue_1691', $$ MATCH (u) RETURN (u) $$) AS (u agtype); + +-- should just return 5 bar records that are all the same one +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "foo", "bar", "foo", "bar"] AS each + MERGE (v:TEST {name: "bar"}) + RETURN v $$) AS (v agtype); + +SELECT * FROM cypher('issue_1691', $$ MATCH (u) RETURN (u) $$) AS (u agtype); +SELECT * FROM cypher('issue_1691', $$MATCH (u) DELETE u $$) AS (a agtype); + +-- should create 2 rows foo->bar and bar->bar and the other 3 are just returning dups +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "bar", "foo", "foo", "bar"] as n + MERGE (u {name: n})-[e1:knows]->(v {name: "bar"})-[e2:knows]->(w) + RETURN u, e1, v, e2, w $$) AS (u agtype, e1 agtype, v agtype, e2 agtype, w agtype); + +-- clean up +SELECT * FROM cypher('issue_1691', $$MATCH ()-[e]->() DELETE e $$) AS (a agtype); +SELECT * FROM cypher('issue_1691', $$MATCH (u) 
DELETE u $$) AS (a agtype); ---clean up +-- +-- clean up graphs +-- SELECT * FROM cypher('cypher_merge', $$MATCH (n) DETACH DELETE n $$) AS (a agtype); SELECT * FROM cypher('issue_1630', $$MATCH (n) DETACH DELETE n $$) AS (a agtype); -/* - * Clean up graph - */ +-- +-- delete graphs +-- SELECT drop_graph('cypher_merge', true); SELECT drop_graph('issue_1630', true); +SELECT drop_graph('issue_1691', true); + +-- +-- End +-- diff --git a/src/backend/executor/cypher_merge.c b/src/backend/executor/cypher_merge.c index 861c52fd6..d9d0fd961 100644 --- a/src/backend/executor/cypher_merge.c +++ b/src/backend/executor/cypher_merge.c @@ -25,6 +25,40 @@ #include "catalog/ag_label.h" #include "executor/cypher_executor.h" #include "executor/cypher_utils.h" +#include "utils/datum.h" + +/* + * The following structure is used to hold a single vertex or edge component + * of a path. The smallest path is just a single vertex. + * + * Note: This structure is only useful for paths when stored in a dynamic + * array. + */ +typedef struct path_entry +{ + bool actual; /* actual tuple passed in for a vertex */ + cypher_rel_dir direction; /* the direction for the edge */ + graphid id; /* id of the vertex or edge */ + bool id_isNull; /* id isNull */ + graphid start_id; /* edge start id */ + graphid end_id; /* edge end id */ + Oid label; /* label oid */ + Datum prop; /* properties datum */ + bool prop_isNull; /* properties isNull */ + uint32 dih; /* datum_image_hash of properties datum */ +} path_entry; + +/* + * The following structure is used to hold a path_entry in a linked list. + * + * Note: The path_entry is stored as a pointer to a pointer. In this case + * the **path_entry is a dynamic array of path_entry elements. 
+ */ +typedef struct created_path +{ + struct created_path *next; /* next link in linked list of path_entrys */ + struct path_entry **entry; /* path_entry array for this link */ +} created_path; static void begin_cypher_merge(CustomScanState *node, EState *estate, int eflags); @@ -32,14 +66,19 @@ static TupleTableSlot *exec_cypher_merge(CustomScanState *node); static void end_cypher_merge(CustomScanState *node); static void rescan_cypher_merge(CustomScanState *node); static Datum merge_vertex(cypher_merge_custom_scan_state *css, - cypher_target_node *node, ListCell *next); + cypher_target_node *node, ListCell *next, + path_entry **path_array, int path_index, + bool should_insert); static void merge_edge(cypher_merge_custom_scan_state *css, cypher_target_node *node, Datum prev_vertex_id, - ListCell *next); + ListCell *next, + path_entry **path_array, int path_index, + bool should_insert); static void process_simple_merge(CustomScanState *node); static bool check_path(cypher_merge_custom_scan_state *css, TupleTableSlot *slot); -static void process_path(cypher_merge_custom_scan_state *css); +static void process_path(cypher_merge_custom_scan_state *css, + path_entry **path_array, bool should_insert); static void mark_tts_isnull(TupleTableSlot *slot); const CustomExecMethods cypher_merge_exec_methods = {MERGE_SCAN_STATE_NAME, @@ -50,6 +89,13 @@ const CustomExecMethods cypher_merge_exec_methods = {MERGE_SCAN_STATE_NAME, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL}; +static path_entry **prebuild_path(CustomScanState *node); +static bool compare_2_paths(path_entry **lhs, path_entry **rhs, + int path_length); +static path_entry **find_duplicate_path(CustomScanState *node, + path_entry **path_array); +static void free_path_entry_array(path_entry **path_array, int length); + /* * Initializes the MERGE Execution Node at the beginning of the execution * phase. 
@@ -61,6 +107,7 @@ static void begin_cypher_merge(CustomScanState *node, EState *estate, (cypher_merge_custom_scan_state *)node; ListCell *lc = NULL; Plan *subplan = NULL; + css->created_paths_list = NULL; Assert(list_length(css->cs->custom_plans) == 1); @@ -193,7 +240,8 @@ static bool check_path(cypher_merge_custom_scan_state *css, return false; } -static void process_path(cypher_merge_custom_scan_state *css) +static void process_path(cypher_merge_custom_scan_state *css, + path_entry **path_array, bool should_insert) { cypher_create_path *path = css->path; ListCell *lc = list_head(path->target_nodes); @@ -202,8 +250,8 @@ static void process_path(cypher_merge_custom_scan_state *css) * Create the first vertex. The create_vertex function will * create the rest of the path, if necessary. */ - - merge_vertex(css, lfirst(lc), lnext(lc)); + merge_vertex(css, lfirst(lc), lnext(lc), + path_array, 0, should_insert); /* * If this path is a variable, take the list that was accumulated @@ -266,7 +314,7 @@ static void process_simple_merge(CustomScanState *node) econtext->ecxt_scantuple = sss->ss.ss_ScanTupleSlot; econtext->ecxt_scantuple->tts_isempty = false; - process_path(css); + process_path(css, NULL, true); } } @@ -292,6 +340,217 @@ static void mark_tts_isnull(TupleTableSlot *slot) } } +/* helper function to free a path_entry array given its length */ +static void free_path_entry_array(path_entry **path_array, int length) +{ + int index; + + for (index = 0; index < length; index++) + { + pfree(path_array[index]); + } +} + +/* + * Helper function to prebuild a path. The user needs to free the returned + * path_entry when done. + * + * Note: The prebuilt path and its components are not filled out completely by + * this function. merge_vertex and merge_edge will/should fill out the + * rest. This is because the ID fields autoincrement the next available ID + * when evaluated AND the generated prebuilt path might not be used. 
+ */ +static path_entry **prebuild_path(CustomScanState *node) +{ + cypher_merge_custom_scan_state *css = + (cypher_merge_custom_scan_state *)node; + List *nodes = css->path->target_nodes; + int path_length = list_length(nodes); + ListCell *lc = NULL; + ExprContext *econtext = css->css.ss.ps.ps_ExprContext; + int counter = 0; + + path_entry **path_array = NULL; + path_array = palloc0(sizeof(path_entry *) * path_length); + + /* iterate through the path, partially prebuilding it */ + foreach (lc, nodes) + { + /* get the node/edge and allocate the memory needed */ + cypher_target_node *node = lfirst(lc); + path_entry *entry = palloc0(sizeof(path_entry)); + + /* if this isn't an actual passed in tuple */ + if (CYPHER_TARGET_NODE_INSERT_ENTITY(node->flags)) + { + bool isNull = false; + + entry->actual = false; + entry->id = 0; + entry->id_isNull = true; + entry->direction = node->dir; + entry->label = node->relid; + entry->prop = ExecEvalExprSwitchContext(node->prop_expr_state, + econtext, &isNull); + entry->prop_isNull = isNull; + entry->dih = datum_image_hash(entry->prop, false, -1); + } + /* otherwise, it is */ + else + { + EState *estate = css->css.ss.ps.state; + TupleTableSlot *scanTupleSlot = econtext->ecxt_scantuple; + + agtype *agt = NULL; + Datum d; + agtype_value *agtv_vertex = NULL; + agtype_value *agtv_id = NULL; + + /* check that the variable isn't NULL */ + if (scanTupleSlot->tts_isnull[node->tuple_position - 1]) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Existing variable %s cannot be NULL in MERGE clause", + node->variable_name))); + } + + /* get the vertex agtype in the scanTupleSlot */ + d = scanTupleSlot->tts_values[node->tuple_position - 1]; + agt = DATUM_GET_AGTYPE_P(d); + + /* Convert to an agtype value */ + agtv_vertex = get_ith_agtype_value_from_container(&agt->root, 0); + + if (agtv_vertex->type != AGTV_VERTEX) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("agtype must resolve to a 
vertex"))); + } + + /* extract the id agtype field */ + agtv_id = GET_AGTYPE_VALUE_OBJECT_VALUE(agtv_vertex, "id"); + + /* set the necessary entry fields - actual & id */ + entry->actual = true; + entry->id = (graphid) agtv_id->val.int_value; + entry->id_isNull = false; + entry->prop = 0; + entry->prop_isNull = true; + entry->dih = 0; + + if (!SAFE_TO_SKIP_EXISTENCE_CHECK(node->flags)) + { + if (!entity_exists(estate, css->graph_oid, entry->id)) + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("vertex assigned to variable %s was deleted", + node->variable_name))); + } + } + } + + /* save the pointer and move to the next */ + path_array[counter++] = entry; + } + + return path_array; +} + +/* + * Helper function to compare 2 paths. By definition, paths don't know + * specifics, so this comparison is somewhat generic. + */ +static bool compare_2_paths(path_entry **lhs, path_entry **rhs, int path_length) +{ + int i; + + /* iterate through the entire path, returning false for any mismatch */ + for (i = 0; i < path_length; i++) + { + /* if these are actual vertices (passed in from a variable) */ + if (lhs[i]->actual == rhs[i]->actual && + lhs[i]->actual == true) + { + /* just check the IDs */ + if (lhs[i]->id != rhs[i]->id) + { + return false; + } + else + { + continue; + } + } + + /* are the labels the same */ + if (lhs[i]->label != rhs[i]->label) + { + return false; + } + + /* are the directions the same */ + if (lhs[i]->direction != rhs[i]->direction) + { + return false; + } + + /* are the properties datum hashes the same */ + if (lhs[i]->dih != rhs[i]->dih) + { + return false; + } + + /* are the properties datum images the same */ + if (!datumIsEqual(lhs[i]->prop, rhs[i]->prop, false, -1)) + { + return false; + } + } + + /* no mismatches so it must match */ + return true; +} + +/* helper function to find a duplicate path in the created_paths_list */ +static path_entry **find_duplicate_path(CustomScanState *node, + path_entry 
**path_array) +{ + cypher_merge_custom_scan_state *css = + (cypher_merge_custom_scan_state *)node; + int path_length = list_length(css->path->target_nodes); + + /* if the list is NULL just return NULL */ + if (css->created_paths_list == NULL) + { + return NULL; + } + /* otherwise, check to see if the path already exists */ + else + { + /* set to the top of the list */ + created_path *curr_path = css->created_paths_list; + + /* iterate through our list of created paths */ + while (curr_path != NULL) + { + /* if we have found the entry, return it */ + if (compare_2_paths(path_array, curr_path->entry, path_length)) + { + return curr_path->entry; + } + + /* otherwise, get the next path */ + curr_path = curr_path->next; + } + } + + /* if we didn't find it, return NULL */ + return NULL; +} + /* * Function that is called mid-execution. This function will call * its subtree in the execution tree, and depending on the results @@ -353,9 +612,68 @@ static TupleTableSlot *exec_cypher_merge(CustomScanState *node) econtext->ecxt_scantuple = node->ss.ps.lefttree->ps_ProjInfo->pi_exprContext->ecxt_scantuple; + /* + * Check the subtree to see if the lateral join representing the + * MERGE path found results. If not, we need to create the path + */ if (check_path(css, econtext->ecxt_scantuple)) { - process_path(css); + path_entry **prebuilt_path_array = NULL; + path_entry **found_path_array = NULL; + int path_length = list_length(css->path->target_nodes); + + /* + * Prebuild our path and verify that it wasn't already created. + * + * Note: This is currently only needed when there is a previous + * clause. This is due to the fact that MERGE can't see + * what it has just created. This isn't due to transaction + * or command ids, it's due to the join's scan not being + * able to add in the newly inserted tuples and rescan + * with these tuples. + * + * Note: The prebuilt path is purposely generic as it needs to + * only match a path. 
The more specific items will be + * added by merge_vertex and merge_edge if it is inserted. + * + * Note: The IDs are purposely not created here because we may + * need to throw them away if a path was previously + * created. Remember, the IDs are automatically + * incremented when fetched. + */ + prebuilt_path_array = prebuild_path(node); + + found_path_array = find_duplicate_path(node, + prebuilt_path_array); + + /* if found we don't need to insert anything, just reuse it */ + if (found_path_array) + { + /* we don't need our prebuilt path anymore */ + free_path_entry_array(prebuilt_path_array, path_length); + + /* as this path exists, we don't need to insert it */ + process_path(css, found_path_array, false); + } + /* otherwise, we need to insert the new, prebuilt, path */ + else + { + created_path *new_path = NULL; + + /* allocate (exactly once) the next linked list entry for our created_paths */ + new_path = palloc0(sizeof(created_path)); + new_path->next = css->created_paths_list; + new_path->entry = prebuilt_path_array; + + /* we need to push our prebuilt path onto the list */ + css->created_paths_list = new_path; + + /* + * We need to pass in the prebuilt path so that it can get + * filled in with more specific information + */ + process_path(css, prebuilt_path_array, true); + } } } while (terminal); @@ -367,6 +685,7 @@ static TupleTableSlot *exec_cypher_merge(CustomScanState *node) } econtext->ecxt_scantuple = ExecProject(node->ss.ps.lefttree->ps_ProjInfo); + return ExecProject(node->ss.ps.ps_ProjInfo); } @@ -497,7 +816,7 @@ static TupleTableSlot *exec_cypher_merge(CustomScanState *node) econtext->ecxt_scantuple = sss->ss.ss_ScanTupleSlot; // create the path - process_path(css); + process_path(css, NULL, true); // mark the create_new_path flag to true. 
css->created_new_path = true; @@ -548,6 +867,7 @@ static void end_cypher_merge(CustomScanState *node) (cypher_merge_custom_scan_state *)node; cypher_create_path *path = css->path; ListCell *lc = NULL; + int path_length = list_length(path->target_nodes); // increment the command counter CommandCounterIncrement(); @@ -570,6 +890,25 @@ static void end_cypher_merge(CustomScanState *node) heap_close(cypher_node->resultRelInfo->ri_RelationDesc, RowExclusiveLock); } + + /* free up our created paths list */ + while (css->created_paths_list != NULL) + { + created_path *next = css->created_paths_list->next; + path_entry **entry = css->created_paths_list->entry; + + /* free up the path array elements */ + free_path_entry_array(entry, path_length); + + /* free up the array container */ + pfree(entry); + + /* free up the created_path container */ + pfree(css->created_paths_list); + + css->created_paths_list = next; + } + } /* @@ -626,7 +965,9 @@ Node *create_cypher_merge_plan_state(CustomScan *cscan) * the create_edge function. */ static Datum merge_vertex(cypher_merge_custom_scan_state *css, - cypher_target_node *node, ListCell *next) + cypher_target_node *node, ListCell *next, + path_entry **path_array, int path_index, + bool should_insert) { bool isNull; Datum id; @@ -662,13 +1003,70 @@ static Datum merge_vertex(cypher_merge_custom_scan_state *css, ExecClearTuple(elemTupleSlot); - /* get the next graphid for this vertex */ - id = ExecEvalExpr(node->id_expr_state, econtext, &isNull); + /* if we are not going to insert, we need our structure pointers */ + if (should_insert == false && + (path_array == NULL || path_array[path_index] == NULL)) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid input parameter combination"))); + } + + /* + * If we shouldn't insert the vertex, we need to retrieve it from the + * storage structure. 
+ */ + if (should_insert == false && + path_array != NULL && + path_array[path_index] != NULL) + { + id = path_array[path_index]->id; + isNull = path_array[path_index]->id_isNull; + } + /* + * Otherwise, we need to retrieve the vertex normally and store its + * unique values if the storage structure exists. + */ + else if (should_insert == true) + { + /* get the next graphid for this vertex */ + id = ExecEvalExpr(node->id_expr_state, econtext, &isNull); + + if (path_array != NULL && path_array[path_index] != NULL) + { + /* store it */ + path_array[path_index]->id = id; + path_array[path_index]->id_isNull = isNull; + } + } + else + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid input parameter combination"))); + } + + /* put the id values into the tuple slot */ elemTupleSlot->tts_values[vertex_tuple_id] = id; elemTupleSlot->tts_isnull[vertex_tuple_id] = isNull; - /* get the properties for this vertex */ - prop = ExecEvalExpr(node->prop_expr_state, econtext, &isNull); + /* + * Retrieve the properties and isNull values if the storage structure + * exists. + */ + if (path_array != NULL && path_array[path_index] != NULL) + { + prop = path_array[path_index]->prop; + isNull = path_array[path_index]->prop_isNull; + } + /* otherwise, get them normally */ + else + { + /* get the properties for this vertex */ + prop = ExecEvalExpr(node->prop_expr_state, econtext, &isNull); + } + + /* put the prop values into the tuple slot */ elemTupleSlot->tts_values[vertex_tuple_properties] = prop; elemTupleSlot->tts_isnull[vertex_tuple_properties] = isNull; @@ -696,7 +1094,8 @@ static Datum merge_vertex(cypher_merge_custom_scan_state *css, * following command to see the updates generated by this instance of * merge. 
*/ - if (css->base_currentCommandId == GetCurrentCommandId(false)) + if (should_insert && + css->base_currentCommandId == GetCurrentCommandId(false)) { insert_entity_tuple(resultRelInfo, elemTupleSlot, estate); @@ -708,7 +1107,7 @@ static Datum merge_vertex(cypher_merge_custom_scan_state *css, */ CommandCounterIncrement(); } - else + else if (should_insert) { insert_entity_tuple_cid(resultRelInfo, elemTupleSlot, estate, css->base_currentCommandId); @@ -765,6 +1164,26 @@ static Datum merge_vertex(cypher_merge_custom_scan_state *css, } } } + /* + * If we have the storage structure pointers, we have already retrieved the + * ID from the datum in the scan tuple, so just retrieve it from the + * structure. + */ + else if (path_array != NULL && path_array[path_index] != NULL) + { + /* retrieve the id of the vertex */ + id = path_array[path_index]->id; + + /* + * Add the Datum to the list of entities for creating the path variable + */ + if (CYPHER_TARGET_NODE_IN_PATH(node->flags)) + { + Datum vertex = scanTupleSlot->tts_values[node->tuple_position - 1]; + css->path_values = lappend(css->path_values, + DatumGetPointer(vertex)); + } + } else { agtype *a = NULL; @@ -837,7 +1256,8 @@ static Datum merge_vertex(cypher_merge_custom_scan_state *css, /* If the path continues, create the next edge, passing the vertex's id. */ if (next != NULL) { - merge_edge(css, lfirst(next), id, lnext(next)); + merge_edge(css, lfirst(next), id, lnext(next), + path_array, path_index+1, should_insert); } return id; @@ -848,7 +1268,9 @@ static Datum merge_vertex(cypher_merge_custom_scan_state *css, */ static void merge_edge(cypher_merge_custom_scan_state *css, cypher_target_node *node, Datum prev_vertex_id, - ListCell *next) + ListCell *next, + path_entry **path_array, int path_index, + bool should_insert) { bool isNull; EState *estate = css->css.ss.ps.state; @@ -869,7 +1291,8 @@ static void merge_edge(cypher_merge_custom_scan_state *css, * next vertex's id. 
*/ css->path_values = NIL; - next_vertex_id = merge_vertex(css, lfirst(next), lnext(next)); + next_vertex_id = merge_vertex(css, lfirst(next), lnext(next), + path_array, path_index+1, should_insert); /* * Set the start and end vertex ids @@ -907,26 +1330,90 @@ static void merge_edge(cypher_merge_custom_scan_state *css, ExecClearTuple(elemTupleSlot); - // Graph Id for the edge - id = ExecEvalExpr(node->id_expr_state, econtext, &isNull); + /* if we not are going to insert, we need our structure pointers */ + if (should_insert == false && + (path_array == NULL || path_array[path_index] == NULL)) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid input parameter combination"))); + } + + /* + * If we shouldn't insert the edge, we need to retrieve the entire edge from + * the storage structure. + */ + if (should_insert == false && + path_array != NULL && + path_array[path_index] != NULL) + { + id = path_array[path_index]->id; + isNull = path_array[path_index]->id_isNull; + start_id = path_array[path_index]->start_id; + end_id = path_array[path_index]->end_id; + } + /* + * Otherwise, we need to get the edge's ID and store its unique values if + * the storage structure exists + */ + else if (should_insert == true) + { + /* get the next graphid for this edge */ + id = ExecEvalExpr(node->id_expr_state, econtext, &isNull); + + if (path_array != NULL && path_array[path_index] != NULL) + { + /* store it */ + path_array[path_index]->id = id; + path_array[path_index]->id_isNull = isNull; + path_array[path_index]->start_id = start_id; + path_array[path_index]->end_id = end_id; + } + } + else + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid input parameter combination"))); + } + + /* put the id values into the tuple slot */ elemTupleSlot->tts_values[edge_tuple_id] = id; elemTupleSlot->tts_isnull[edge_tuple_id] = isNull; - // Graph id for the starting vertex + /* Graph id for the starting vertex */ 
elemTupleSlot->tts_values[edge_tuple_start_id] = start_id; elemTupleSlot->tts_isnull[edge_tuple_start_id] = false; - // Graph id for the ending vertex + /* Graph id for the ending vertex */ elemTupleSlot->tts_values[edge_tuple_end_id] = end_id; elemTupleSlot->tts_isnull[edge_tuple_end_id] = false; - // Edge's properties map - prop = ExecEvalExpr(node->prop_expr_state, econtext, &isNull); + /* + * Retrieve the properties and isNull values if the storage structure + * exists. + */ + if (path_array != NULL && path_array[path_index] != NULL) + { + prop = path_array[path_index]->prop; + isNull = path_array[path_index]->prop_isNull; + } + /* otherwise, get them normally */ + else + { + /* get the properties for this edge */ + prop = ExecEvalExpr(node->prop_expr_state, econtext, &isNull); + } + + /* store the properties in the tuple slot */ elemTupleSlot->tts_values[edge_tuple_properties] = prop; elemTupleSlot->tts_isnull[edge_tuple_properties] = isNull; - // Insert the new edge - insert_entity_tuple(resultRelInfo, elemTupleSlot, estate); + /* Insert the edge, if it is a new edge */ + if (should_insert) + { + insert_entity_tuple(resultRelInfo, elemTupleSlot, estate); + } /* restore the old result relation info */ estate->es_result_relation_info = old_estate_es_result_relation_info; diff --git a/src/backend/executor/cypher_utils.c b/src/backend/executor/cypher_utils.c index be175f130..44158eb26 100644 --- a/src/backend/executor/cypher_utils.c +++ b/src/backend/executor/cypher_utils.c @@ -24,6 +24,8 @@ #include "postgres.h" +#include "access/hash.h" +#include "access/tuptoaster.h" #include "access/xact.h" #include "utils/rel.h" #include "nodes/makefuncs.h" @@ -34,6 +36,67 @@ #include "executor/cypher_utils.h" #include "utils/ag_cache.h" +/* + * NOTE: This function is copied directly from PG14 src and is needed to provide + * Datum hash functionality to Apache AGE. Only formatting has been adjusted to + * align with our standards. + * NOTE: Changed to use hash_any. 
+ *------------------------------------------------------------------------- + * datum_image_hash + * + * Generate a hash value based on the binary representation of 'value'. Most + * use cases will want to use the hash function specific to the Datum's type, + * however, some corner cases require generating a hash value based on the + * actual bits rather than the logical value. + *------------------------------------------------------------------------- + */ +uint32 datum_image_hash(Datum value, bool typByVal, int typLen) +{ + Size len; + uint32 result; + + if (typByVal) + { + result = hash_any((unsigned char *) &value, sizeof(Datum)); + } + else if (typLen > 0) + { + result = hash_any((unsigned char *) DatumGetPointer(value), typLen); + } + else if (typLen == -1) + { + struct varlena *val; + + len = toast_raw_datum_size(value); + + val = PG_DETOAST_DATUM_PACKED(value); + + result = hash_any((unsigned char *) VARDATA_ANY(val), len - VARHDRSZ); + + /* Only free memory if it's a copy made here. */ + if ((Pointer) val != (Pointer) value) + { + pfree(val); + } + } + else if (typLen == -2) + { + char *s; + + s = DatumGetCString(value); + len = strlen(s) + 1; + + result = hash_any((unsigned char *) s, len); + } + else + { + elog(ERROR, "unexpected typLen: %d", typLen); + result = 0; /* keep compiler quiet */ + } + + return result; +} + /* * Given the graph name and the label name, create a ResultRelInfo for the table * those to variables represent. Open the Indices too. 
diff --git a/src/include/executor/cypher_utils.h b/src/include/executor/cypher_utils.h index eda3a3911..a4c52da77 100644 --- a/src/include/executor/cypher_utils.h +++ b/src/include/executor/cypher_utils.h @@ -104,6 +104,7 @@ typedef struct cypher_merge_custom_scan_state bool created_new_path; bool found_a_path; CommandId base_currentCommandId; + struct created_path *created_paths_list; } cypher_merge_custom_scan_state; TupleTableSlot *populate_vertex_tts(TupleTableSlot *elemTupleSlot, @@ -124,4 +125,7 @@ HeapTuple insert_entity_tuple_cid(ResultRelInfo *resultRelInfo, TupleTableSlot *elemTupleSlot, EState *estate, CommandId cid); +/* This function is copied directly from PG14 src */ +uint32 datum_image_hash(Datum value, bool typByVal, int typLen); + #endif