diff --git a/regress/expected/cypher_merge.out b/regress/expected/cypher_merge.out index 7690405cf..af65c0555 100644 --- a/regress/expected/cypher_merge.out +++ b/regress/expected/cypher_merge.out @@ -1263,8 +1263,8 @@ $$) as (a agtype); --- (0 rows) ---- ---- Issue 1630 MERGE using array not working in some cases +-- +-- Issue 1630 - MERGE using array not working in some cases -- SELECT * FROM create_graph('issue_1630'); NOTICE: graph "issue_1630" has been created @@ -1387,7 +1387,151 @@ SELECT * FROM cypher('issue_1630', {"id": 844424930131974, "label": "PERSION", "properties": {"last": "snow", "first": "jon"}}::vertex | {"last": "snow", "first": "jon"} (1 row) ---clean up +-- +-- Issue 1691 - MERGE incorrectly creates multiple vertices +-- +SELECT * FROM create_graph('issue_1691'); +NOTICE: graph "issue_1691" has been created + create_graph +-------------- + +(1 row) + +SELECT * FROM cypher('issue_1691', $$ MATCH (u) RETURN (u) $$) AS (u agtype); + u +--- +(0 rows) + +-- should only create 2 distinct rows but return 4, the extra 2 being duplicates +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "bar", "foo", "foo"] as n + MERGE (u {name: n})-[e:knows]->(v) + RETURN u, e, v $$) AS (u agtype, e agtype, v agtype); + u | e | v +-----------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------- + {"id": 281474976710657, "label": "", "properties": {"name": "foo"}}::vertex | {"id": 844424930131969, "label": "knows", "end_id": 281474976710658, "start_id": 281474976710657, "properties": {}}::edge | {"id": 281474976710658, "label": "", "properties": {}}::vertex + {"id": 281474976710659, "label": "", "properties": {"name": "bar"}}::vertex | {"id": 844424930131970, "label": "knows", "end_id": 281474976710660, "start_id": 281474976710659, "properties": {}}::edge | {"id": 281474976710660, "label": "", "properties": {}}::vertex + {"id": 281474976710657, "label": "", "properties": {"name": "foo"}}::vertex | {"id": 844424930131969, "label": "knows", "end_id": 281474976710658, "start_id": 281474976710657, "properties": {}}::edge | {"id": 281474976710658, "label": "", "properties": {}}::vertex + {"id": 281474976710657, "label": "", "properties": {"name": "foo"}}::vertex | {"id": 844424930131969, "label": "knows", "end_id": 281474976710658, "start_id": 281474976710657, "properties": {}}::edge | {"id": 281474976710658, "label": "", "properties": {}}::vertex +(4 rows) + +-- should only return the same above 4 rows +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "bar", "foo", "foo"] as n + MERGE (u {name: n})-[e:knows]->(v) + RETURN u, e, v $$) AS (u agtype, e agtype, v agtype); + u | e | v +-----------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------- + {"id": 281474976710657, "label": "", "properties": {"name": "foo"}}::vertex | {"id": 844424930131969, "label": "knows", "end_id": 281474976710658, "start_id": 281474976710657, "properties": {}}::edge | {"id": 281474976710658, "label": "", "properties": {}}::vertex + {"id": 281474976710659, "label": "", "properties": {"name": "bar"}}::vertex | {"id": 844424930131970, "label": "knows", "end_id": 281474976710660, "start_id": 281474976710659, "properties": {}}::edge | {"id": 281474976710660, "label": "", "properties": {}}::vertex + {"id": 281474976710657, "label": "", "properties": {"name": "foo"}}::vertex | {"id": 844424930131969, "label": "knows", "end_id": 281474976710658, "start_id": 281474976710657, "properties": {}}::edge | {"id": 281474976710658, "label": "", "properties": {}}::vertex + {"id": 281474976710657, "label": "", "properties": {"name": "foo"}}::vertex | {"id": 844424930131969, "label": "knows", "end_id": 281474976710658, "start_id": 281474976710657, "properties": {}}::edge | {"id": 281474976710658, "label": "", "properties": {}}::vertex +(4 rows) + +-- should only return 2 distinct rows from above +SELECT * FROM cypher('issue_1691', $$ MATCH (u)-[e]->(v) + RETURN u, e, v $$) AS (u agtype, e agtype, v agtype); + u | e | v +-----------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------- + {"id": 281474976710657, "label": "", "properties": {"name": "foo"}}::vertex | {"id": 844424930131969, "label": "knows", "end_id": 281474976710658, "start_id": 281474976710657, "properties": {}}::edge | {"id": 281474976710658, "label": "", "properties": {}}::vertex + {"id": 281474976710659, "label": "", "properties": {"name": "bar"}}::vertex | {"id": 844424930131970, "label": "knows", "end_id": 281474976710660, "start_id": 281474976710659, "properties": {}}::edge | {"id": 281474976710660, "label": "", "properties": {}}::vertex +(2 rows) + +SELECT * FROM cypher('issue_1691', $$MATCH ()-[e]->() DELETE e $$) AS (a agtype); + a +--- +(0 rows) + +SELECT * FROM cypher('issue_1691', $$MATCH (u) DELETE u $$) AS (a agtype); + a +--- +(0 rows) + +-- should only create 1 record but return 2, one a dup of the other +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "foo"] AS each + MERGE (v:TEST {name: each}) + RETURN v $$) AS (v agtype); + v +---------------------------------------------------------------------------------- + {"id": 1125899906842625, "label": "TEST", "properties": {"name": "foo"}}::vertex + {"id": 1125899906842625, "label": "TEST", "properties": {"name": "foo"}}::vertex +(2 rows) + +SELECT * FROM cypher('issue_1691', $$ MATCH (u) RETURN (u) $$) AS (u agtype); + u +---------------------------------------------------------------------------------- + {"id": 1125899906842625, "label": "TEST", "properties": {"name": "foo"}}::vertex +(1 row) + +-- should just return 5 foo records that are all the same one +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "foo", "bar", "foo", "bar"] AS each + MERGE (v:TEST {name: "foo"}) + RETURN v $$) AS (v agtype); + v +---------------------------------------------------------------------------------- + {"id": 1125899906842625, "label": "TEST", "properties": {"name": "foo"}}::vertex + {"id": 1125899906842625, "label": "TEST", "properties": {"name": "foo"}}::vertex + {"id": 1125899906842625, "label": "TEST", "properties": {"name": "foo"}}::vertex + {"id": 1125899906842625, "label": "TEST", "properties": {"name": "foo"}}::vertex + {"id": 1125899906842625, "label": "TEST", "properties": {"name": "foo"}}::vertex +(5 rows) + +SELECT * FROM cypher('issue_1691', $$ MATCH (u) RETURN (u) $$) AS (u agtype); + u +---------------------------------------------------------------------------------- + {"id": 1125899906842625, "label": "TEST", "properties": {"name": "foo"}}::vertex +(1 row) + +-- should just return 5 bar records that are all the same one +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "foo", "bar", "foo", "bar"] AS each + MERGE (v:TEST {name: "bar"}) + RETURN v $$) AS (v agtype); + v +---------------------------------------------------------------------------------- + {"id": 1125899906842626, "label": "TEST", "properties": {"name": "bar"}}::vertex + {"id": 1125899906842626, "label": "TEST", "properties": {"name": "bar"}}::vertex + {"id": 1125899906842626, "label": "TEST", "properties": {"name": "bar"}}::vertex + {"id": 1125899906842626, "label": "TEST", "properties": {"name": "bar"}}::vertex + {"id": 1125899906842626, "label": "TEST", "properties": {"name": "bar"}}::vertex +(5 rows) + +SELECT * FROM cypher('issue_1691', $$ MATCH (u) RETURN (u) $$) AS (u agtype); + u +---------------------------------------------------------------------------------- + {"id": 1125899906842625, "label": "TEST", "properties": {"name": "foo"}}::vertex + {"id": 1125899906842626, "label": "TEST", "properties": {"name": "bar"}}::vertex +(2 rows) + +SELECT * FROM cypher('issue_1691', $$MATCH (u) DELETE u $$) AS (a agtype); + a +--- +(0 rows) + +-- should create 2 rows foo->bar and bar->bar and the other 3 are just returning dups +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "bar", "foo", "foo", "bar"] as n + MERGE (u {name: n})-[e1:knows]->(v {name: "bar"})-[e2:knows]->(w) + RETURN u, e1, v, e2, w $$) AS (u agtype, e1 agtype, v agtype, e2 agtype, w agtype); + u | e1 | v | e2 | w +-----------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------- + {"id": 281474976710661, "label": "", "properties": {"name": "foo"}}::vertex | {"id": 844424930131972, "label": "knows", "end_id": 281474976710662, "start_id": 281474976710661, "properties": {}}::edge | {"id": 281474976710662, "label": "", "properties": {"name": "bar"}}::vertex | {"id": 844424930131971, "label": "knows", "end_id": 281474976710663, "start_id": 281474976710662, "properties": {}}::edge | {"id": 281474976710663, "label": "", "properties": {}}::vertex + {"id": 281474976710664, "label": "", "properties": {"name": "bar"}}::vertex | {"id": 844424930131974, "label": "knows", "end_id": 281474976710665, "start_id": 281474976710664, "properties": {}}::edge | {"id": 281474976710665, "label": "", "properties": {"name": "bar"}}::vertex | {"id": 844424930131973, "label": "knows", "end_id": 281474976710666, "start_id": 281474976710665, "properties": {}}::edge | {"id": 281474976710666, "label": "", "properties": {}}::vertex + {"id": 281474976710661, "label": "", "properties": {"name": "foo"}}::vertex | {"id": 844424930131972, "label": "knows", "end_id": 281474976710662, "start_id": 281474976710661, "properties": {}}::edge | {"id": 281474976710662, "label": "", "properties": {"name": "bar"}}::vertex | {"id": 844424930131971, "label": "knows", "end_id": 281474976710663, "start_id": 281474976710662, "properties": {}}::edge | {"id": 281474976710663, "label": "", "properties": {}}::vertex + {"id": 281474976710661, "label": "", "properties": {"name": "foo"}}::vertex | {"id": 844424930131972, "label": "knows", "end_id": 281474976710662, "start_id": 281474976710661, "properties": {}}::edge | {"id": 281474976710662, "label": "", "properties": {"name": "bar"}}::vertex | {"id": 844424930131971, "label": "knows", "end_id": 281474976710663, "start_id": 281474976710662, "properties": {}}::edge | {"id": 281474976710663, "label": "", "properties": {}}::vertex + {"id": 281474976710664, "label": "", "properties": {"name": "bar"}}::vertex | {"id": 844424930131974, "label": "knows", "end_id": 281474976710665, "start_id": 281474976710664, "properties": {}}::edge | {"id": 281474976710665, "label": "", "properties": {"name": "bar"}}::vertex | {"id": 844424930131973, "label": "knows", "end_id": 281474976710666, "start_id": 281474976710665, "properties": {}}::edge | {"id": 281474976710666, "label": "", "properties": {}}::vertex +(5 rows) + +-- clean up +SELECT * FROM cypher('issue_1691', $$MATCH ()-[e]->() DELETE e $$) AS (a agtype); + a +--- +(0 rows) + +SELECT * FROM cypher('issue_1691', $$MATCH (u) DELETE u $$) AS (a agtype); + a +--- +(0 rows) + +-- +-- clean up graphs +-- SELECT * FROM cypher('cypher_merge', $$MATCH (n) DETACH DELETE n $$) AS (a agtype); a --- @@ -1398,9 +1542,9 @@ SELECT * FROM cypher('issue_1630', $$MATCH (n) DETACH DELETE n $$) AS (a agtype) --- (0 rows) -/* - * Clean up graph - */ +-- +-- delete graphs +-- SELECT drop_graph('cypher_merge', true); NOTICE: drop cascades to 19 other objects DETAIL: drop cascades to table cypher_merge._ag_label_vertex @@ -1439,3 +1583,18 @@ NOTICE: graph "issue_1630" has been dropped (1 row) +SELECT drop_graph('issue_1691', true); +NOTICE: drop cascades to 4 other objects +DETAIL: drop cascades to table issue_1691._ag_label_vertex +drop cascades to table issue_1691._ag_label_edge +drop cascades to table issue_1691.knows +drop cascades to table issue_1691."TEST" +NOTICE: graph "issue_1691" has been dropped + drop_graph +------------ + +(1 row) + +-- +-- End +-- diff --git a/regress/sql/cypher_merge.sql b/regress/sql/cypher_merge.sql index fe9ee8184..555aff6b3 100644 --- a/regress/sql/cypher_merge.sql +++ b/regress/sql/cypher_merge.sql @@ -606,8 +606,8 @@ SELECT * FROM cypher('cypher_merge', $$ CREATE (n), (m) WITH n AS r MERGE (m) $$) as (a agtype); ---- ---- Issue 1630 MERGE using array not working in some cases +-- +-- Issue 1630 - MERGE using array not working in some cases -- SELECT * FROM create_graph('issue_1630'); SELECT * FROM cypher('issue_1630', $$ MATCH (u) RETURN (u) $$) AS (u agtype); @@ -670,13 +670,71 @@ SELECT * FROM cypher('issue_1630', MERGE (v:PERSION {first: cols.first, last: cols.last}) RETURN v, cols $$) AS (v agtype, cols agtype); +-- +-- Issue 1691 - MERGE incorrectly creates multiple vertices +-- +SELECT * FROM create_graph('issue_1691'); +SELECT * FROM cypher('issue_1691', $$ MATCH (u) RETURN (u) $$) AS (u agtype); + +-- should only create 2 distinct rows but return 4, the extra 2 being duplicates +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "bar", "foo", "foo"] as n + MERGE (u {name: n})-[e:knows]->(v) + RETURN u, e, v $$) AS (u agtype, e agtype, v agtype); +-- should only return the same above 4 rows +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "bar", "foo", "foo"] as n + MERGE (u {name: n})-[e:knows]->(v) + RETURN u, e, v $$) AS (u agtype, e agtype, v agtype); +-- should only return 2 distinct rows from above +SELECT * FROM cypher('issue_1691', $$ MATCH (u)-[e]->(v) + RETURN u, e, v $$) AS (u agtype, e agtype, v agtype); + +SELECT * FROM cypher('issue_1691', $$MATCH ()-[e]->() DELETE e $$) AS (a agtype); +SELECT * FROM cypher('issue_1691', $$MATCH (u) DELETE u $$) AS (a agtype); + +-- should only create 1 record but return 2, one a dup of the other +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "foo"] AS each + MERGE (v:TEST {name: each}) + RETURN v $$) AS (v agtype); + +SELECT * FROM cypher('issue_1691', $$ MATCH (u) RETURN (u) $$) AS (u agtype); + +-- should just return 5 foo records that are all the same one +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "foo", "bar", "foo", "bar"] AS each + MERGE (v:TEST {name: "foo"}) + RETURN v $$) AS (v agtype); + +SELECT * FROM cypher('issue_1691', $$ MATCH (u) RETURN (u) $$) AS (u agtype); + +-- should just return 5 bar records that are all the same one +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "foo", "bar", "foo", "bar"] AS each + MERGE (v:TEST {name: "bar"}) + RETURN v $$) AS (v agtype); + +SELECT * FROM cypher('issue_1691', $$ MATCH (u) RETURN (u) $$) AS (u agtype); +SELECT * FROM cypher('issue_1691', $$MATCH (u) DELETE u $$) AS (a agtype); + +-- should create 2 rows foo->bar and bar->bar and the other 3 are just returning dups +SELECT * FROM cypher('issue_1691', $$ UNWIND ["foo", "bar", "foo", "foo", "bar"] as n + MERGE (u {name: n})-[e1:knows]->(v {name: "bar"})-[e2:knows]->(w) + RETURN u, e1, v, e2, w $$) AS (u agtype, e1 agtype, v agtype, e2 agtype, w agtype); + +-- clean up +SELECT * FROM cypher('issue_1691', $$MATCH ()-[e]->() DELETE e $$) AS (a agtype); +SELECT * FROM cypher('issue_1691', $$MATCH (u) DELETE u $$) AS (a agtype); ---clean up +-- +-- clean up graphs +-- SELECT * FROM cypher('cypher_merge', $$MATCH (n) DETACH DELETE n $$) AS (a agtype); SELECT * FROM cypher('issue_1630', $$MATCH (n) DETACH DELETE n $$) AS (a agtype); -/* - * Clean up graph - */ +-- +-- delete graphs +-- SELECT drop_graph('cypher_merge', true); SELECT drop_graph('issue_1630', true); +SELECT drop_graph('issue_1691', true); + +-- +-- End +-- diff --git a/src/backend/executor/cypher_merge.c b/src/backend/executor/cypher_merge.c index 7faa3f9a2..3cb2a15af 100644 --- a/src/backend/executor/cypher_merge.c +++ b/src/backend/executor/cypher_merge.c @@ -22,6 +22,40 @@ #include "catalog/ag_label.h" #include "executor/cypher_executor.h" #include "executor/cypher_utils.h" +#include "utils/datum.h" + +/* + * The following structure is used to hold a single vertex or edge component + * of a path. The smallest path is just a single vertex. + * + * Note: This structure is only useful for paths when stored in a dynamic + * array. + */ +typedef struct path_entry +{ + bool actual; /* actual tuple passed in for a vertex */ + cypher_rel_dir direction; /* the direction for the edge */ + graphid id; /* id of the vertex or edge */ + bool id_isNull; /* id isNull */ + graphid start_id; /* edge start id */ + graphid end_id; /* edge end id */ + Oid label; /* label oid */ + Datum prop; /* properties datum */ + bool prop_isNull; /* properties isNull */ + uint32 dih; /* datum_image_hash of properties datum */ +} path_entry; + +/* + * The following structure is used to hold a path_entry in a linked list. + * + * Note: The path_entry is stored as a pointer to a pointer. In this case + * the **path_entry is a dynamic array of path_entry elements. + */ +typedef struct created_path +{ + struct created_path *next; /* next link in linked list of path_entrys */ + struct path_entry **entry; /* path_entry array for this link */ +} created_path; static void begin_cypher_merge(CustomScanState *node, EState *estate, int eflags); @@ -29,14 +63,19 @@ static TupleTableSlot *exec_cypher_merge(CustomScanState *node); static void end_cypher_merge(CustomScanState *node); static void rescan_cypher_merge(CustomScanState *node); static Datum merge_vertex(cypher_merge_custom_scan_state *css, - cypher_target_node *node, ListCell *next, List* list); + cypher_target_node *node, ListCell *next, List* list, + path_entry **path_array, int path_index, + bool should_insert); static void merge_edge(cypher_merge_custom_scan_state *css, cypher_target_node *node, Datum prev_vertex_id, - ListCell *next, List *list); + ListCell *next, List *list, + path_entry **path_array, int path_index, + bool should_insert); static void process_simple_merge(CustomScanState *node); static bool check_path(cypher_merge_custom_scan_state *css, TupleTableSlot *slot); -static void process_path(cypher_merge_custom_scan_state *css); +static void process_path(cypher_merge_custom_scan_state *css, + path_entry **path_array, bool should_insert); static void mark_tts_isnull(TupleTableSlot *slot); const CustomExecMethods cypher_merge_exec_methods = {MERGE_SCAN_STATE_NAME, @@ -47,6 +86,13 @@ const CustomExecMethods cypher_merge_exec_methods = {MERGE_SCAN_STATE_NAME, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL}; +static path_entry **prebuild_path(CustomScanState *node); +static bool compare_2_paths(path_entry **lhs, path_entry **rhs, + int path_length); +static path_entry **find_duplicate_path(CustomScanState *node, + path_entry **path_array); +static void free_path_entry_array(path_entry **path_array, int length); + /* * Initializes the MERGE Execution Node at the beginning of the execution * phase. @@ -58,6 +104,7 @@ static void begin_cypher_merge(CustomScanState *node, EState *estate, (cypher_merge_custom_scan_state *)node; ListCell *lc = NULL; Plan *subplan = NULL; + css->created_paths_list = NULL; Assert(list_length(css->cs->custom_plans) == 1); @@ -192,7 +239,8 @@ static bool check_path(cypher_merge_custom_scan_state *css, return false; } -static void process_path(cypher_merge_custom_scan_state *css) +static void process_path(cypher_merge_custom_scan_state *css, + path_entry **path_array, bool should_insert) { cypher_create_path *path = css->path; List *list = path->target_nodes; @@ -202,7 +250,8 @@ static void process_path(cypher_merge_custom_scan_state *css) * Create the first vertex. The create_vertex function will * create the rest of the path, if necessary. */ - merge_vertex(css, lfirst(lc), lnext(list, lc), list); + merge_vertex(css, lfirst(lc), lnext(list, lc), list, + path_array, 0, should_insert); /* * If this path is a variable, take the list that was accumulated @@ -264,7 +313,7 @@ static void process_simple_merge(CustomScanState *node) /* setup the scantuple that the process_path needs */ econtext->ecxt_scantuple = sss->ss.ss_ScanTupleSlot; - process_path(css); + process_path(css, NULL, true); } } @@ -290,6 +339,217 @@ static void mark_tts_isnull(TupleTableSlot *slot) } } +/* helper function to free a path_entry array given its length */ +static void free_path_entry_array(path_entry **path_array, int length) +{ + int index; + + for (index = 0; index < length; index++) + { + pfree(path_array[index]); + } +} + +/* + * Helper function to prebuild a path. The user needs to free the returned + * path_entry when done. + * + * Note: The prebuilt path and its components are not filled out completely by + * this function. merge_vertex and merge_edge will/should fill out the + * rest. This is because the ID fields autoincrement the next available ID + * when evaluated AND the generated prebuilt path might not be used. + */ +static path_entry **prebuild_path(CustomScanState *node) +{ + cypher_merge_custom_scan_state *css = + (cypher_merge_custom_scan_state *)node; + List *nodes = css->path->target_nodes; + int path_length = list_length(nodes); + ListCell *lc = NULL; + ExprContext *econtext = css->css.ss.ps.ps_ExprContext; + int counter = 0; + + path_entry **path_array = NULL; + path_array = palloc0(sizeof(path_entry *) * path_length); + + /* iterate through the path, partially prebuilding it */ + foreach (lc, nodes) + { + /* get the node/edge and allocate the memory needed */ + cypher_target_node *node = lfirst(lc); + path_entry *entry = palloc0(sizeof(path_entry)); + + /* if this isn't an actual passed in tuple */ + if (CYPHER_TARGET_NODE_INSERT_ENTITY(node->flags)) + { + bool isNull = false; + + entry->actual = false; + entry->id = 0; + entry->id_isNull = true; + entry->direction = node->dir; + entry->label = node->relid; + entry->prop = ExecEvalExprSwitchContext(node->prop_expr_state, + econtext, &isNull); + entry->prop_isNull = isNull; + entry->dih = datum_image_hash(entry->prop, false, -1); + } + /* otherwise, it is */ + else + { + EState *estate = css->css.ss.ps.state; + TupleTableSlot *scanTupleSlot = econtext->ecxt_scantuple; + + agtype *agt = NULL; + Datum d; + agtype_value *agtv_vertex = NULL; + agtype_value *agtv_id = NULL; + + /* check that the variable isn't NULL */ + if (scanTupleSlot->tts_isnull[node->tuple_position - 1]) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Existing variable %s cannot be NULL in MERGE clause", + node->variable_name))); + } + + /* get the vertex agtype in the scanTupleSlot */ + d = scanTupleSlot->tts_values[node->tuple_position - 1]; + agt = DATUM_GET_AGTYPE_P(d); + + /* Convert to an agtype value */ + agtv_vertex = get_ith_agtype_value_from_container(&agt->root, 0); + + if (agtv_vertex->type != AGTV_VERTEX) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("agtype must resolve to a vertex"))); + } + + /* extract the id agtype field */ + agtv_id = GET_AGTYPE_VALUE_OBJECT_VALUE(agtv_vertex, "id"); + + /* set the necessary entry fields - actual & id */ + entry->actual = true; + entry->id = (graphid) agtv_id->val.int_value; + entry->id_isNull = false; + entry->prop = 0; + entry->prop_isNull = true; + entry->dih = 0; + + if (!SAFE_TO_SKIP_EXISTENCE_CHECK(node->flags)) + { + if (!entity_exists(estate, css->graph_oid, entry->id)) + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("vertex assigned to variable %s was deleted", + node->variable_name))); + } + } + } + + /* save the pointer and move to the next */ + path_array[counter++] = entry; + } + + return path_array; +} + +/* + * Helper function to compare 2 paths. By definition, paths don't know + * specifics, so this comparison is somewhat generic. + */ +static bool compare_2_paths(path_entry **lhs, path_entry **rhs, int path_length) +{ + int i; + + /* iterate through the entire path, returning false for any mismatch */ + for (i = 0; i < path_length; i++) + { + /* if these are actual vertices (passed in from a variable) */ + if (lhs[i]->actual == rhs[i]->actual && + lhs[i]->actual == true) + { + /* just check the IDs */ + if (lhs[i]->id != rhs[i]->id) + { + return false; + } + else + { + continue; + } + } + + /* are the labels the same */ + if (lhs[i]->label != rhs[i]->label) + { + return false; + } + + /* are the directions the same */ + if (lhs[i]->direction != rhs[i]->direction) + { + return false; + } + + /* are the properties datum hashes the same */ + if (lhs[i]->dih != rhs[i]->dih) + { + return false; + } + + /* are the properties datum images the same */ + if (!datum_image_eq(lhs[i]->prop, rhs[i]->prop, false, -1)) + { + return false; + } + } + + /* no mismatches so it must match */ + return true; +} + +/* helper function to find a duplicate path in the created_paths_list */ +static path_entry **find_duplicate_path(CustomScanState *node, + path_entry **path_array) +{ + cypher_merge_custom_scan_state *css = + (cypher_merge_custom_scan_state *)node; + int path_length = list_length(css->path->target_nodes); + + /* if the list is NULL just return NULL */ + if (css->created_paths_list == NULL) + { + return NULL; + } + /* otherwise, check to see if the path already exists */ + else + { + /* set to the top of the list */ + created_path *curr_path = css->created_paths_list; + + /* iterate through our list of created paths */ + while (curr_path != NULL) + { + /* if we have found the entry, return it */ + if (compare_2_paths(path_array, curr_path->entry, path_length)) + { + return curr_path->entry; + } + + /* otherwise, get the next path */ + curr_path = curr_path->next; + } + } + + /* if we didn't find it, return NULL */ + return NULL; +} + /* * Function that is called mid-execution. This function will call * its subtree in the execution tree, and depending on the results @@ -351,9 +611,68 @@ static TupleTableSlot *exec_cypher_merge(CustomScanState *node) econtext->ecxt_scantuple = node->ss.ps.lefttree->ps_ProjInfo->pi_exprContext->ecxt_scantuple; + /* + * Check the subtree to see if the lateral join representing the + * MERGE path found results. If not, we need to create the path + */ if (check_path(css, econtext->ecxt_scantuple)) { - process_path(css); + path_entry **prebuilt_path_array = NULL; + path_entry **found_path_array = NULL; + int path_length = list_length(css->path->target_nodes); + + /* + * Prebuild our path and verify that it wasn't already created. + * + * Note: This is currently only needed when there is a previous + * clause. This is due to the fact that MERGE can't see + * what it has just created. This isn't due to transaction + * or command ids, it's due to the join's scan not being + * able to add in the newly inserted tuples and rescan + * with these tuples. + * + * Note: The prebuilt path is purposely generic as it needs to + * only match a path. The more specific items will be + * added by merge_vertex and merge_edge if it is inserted. + * + * Note: The IDs are purposely not created here because we may + * need to throw them away if a path was previously + * created. Remember, the IDs are automatically + * incremented when fetched. + */ + prebuilt_path_array = prebuild_path(node); + + found_path_array = find_duplicate_path(node, + prebuilt_path_array); + + /* if found we don't need to insert anything, just reuse it */ + if (found_path_array) + { + /* we don't need our prebuilt path anymore */ + free_path_entry_array(prebuilt_path_array, path_length); + + /* as this path exists, we don't need to insert it */ + process_path(css, found_path_array, false); + } + /* otherwise, we need to insert the new, prebuilt, path */ + else + { + created_path *new_path = palloc0(sizeof(created_path)); + + /* build the next linked list entry for our created_paths */ + new_path = palloc0(sizeof(created_path)); + new_path->next = css->created_paths_list; + new_path->entry = prebuilt_path_array; + + /* we need to push our prebuilt path onto the list */ + css->created_paths_list = new_path; + + /* + * We need to pass in the prebuilt path so that it can get + * filled in with more specific information + */ + process_path(css, prebuilt_path_array, true); + } } } while (terminal); @@ -365,6 +684,7 @@ static TupleTableSlot *exec_cypher_merge(CustomScanState *node) } econtext->ecxt_scantuple = ExecProject(node->ss.ps.lefttree->ps_ProjInfo); + return ExecProject(node->ss.ps.ps_ProjInfo); } @@ -494,7 +814,7 @@ static TupleTableSlot *exec_cypher_merge(CustomScanState *node) econtext->ecxt_scantuple = sss->ss.ss_ScanTupleSlot; // create the path - process_path(css); + process_path(css, NULL, true); // mark the create_new_path flag to true. css->created_new_path = true; @@ -538,6 +858,7 @@ static void end_cypher_merge(CustomScanState *node) (cypher_merge_custom_scan_state *)node; cypher_create_path *path = css->path; ListCell *lc = NULL; + int path_length = list_length(path->target_nodes); // increment the command counter CommandCounterIncrement(); @@ -560,6 +881,25 @@ static void end_cypher_merge(CustomScanState *node) table_close(cypher_node->resultRelInfo->ri_RelationDesc, RowExclusiveLock); } + + /* free up our created paths lists */ + while (css->created_paths_list != NULL) + { + created_path *next = css->created_paths_list->next; + path_entry **entry = css->created_paths_list->entry; + + /* free up the path array elements */ + free_path_entry_array(entry, path_length); + + /* free up the array container */ + pfree(entry); + + /* free up the created_path container */ + pfree(css->created_paths_list); + + css->created_paths_list = next; + } + } /* @@ -616,7 +956,9 @@ Node *create_cypher_merge_plan_state(CustomScan *cscan) * the create_edge function. */ static Datum merge_vertex(cypher_merge_custom_scan_state *css, - cypher_target_node *node, ListCell *next, List *list) + cypher_target_node *node, ListCell *next, List *list, + path_entry **path_array, int path_index, + bool should_insert) { bool isNull; Datum id; @@ -652,13 +994,70 @@ static Datum merge_vertex(cypher_merge_custom_scan_state *css, ExecClearTuple(elemTupleSlot); - /* get the next graphid for this vertex */ - id = ExecEvalExpr(node->id_expr_state, econtext, &isNull); + /* if we not are going to insert, we need our structure pointers */ + if (should_insert == false && + (path_array == NULL || path_array[path_index] == NULL)) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid input parameter combination"))); + } + + /* + * If we shouldn't insert the vertex, we need to retrieve it from the + * storage structure. + */ + if (should_insert == false && + path_array != NULL && + path_array[path_index] != NULL) + { + id = path_array[path_index]->id; + isNull = path_array[path_index]->id_isNull; + } + /* + * Otherwise, we need to retrieve the vertex normally and store its + * unique values if the storage structure exists. + */ + else if (should_insert == true) + { + /* get the next graphid for this vertex */ + id = ExecEvalExpr(node->id_expr_state, econtext, &isNull); + + if (path_array != NULL && path_array[path_index] != NULL) + { + /* store it */ + path_array[path_index]->id = id; + path_array[path_index]->id_isNull = isNull; + } + } + else + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid input parameter combination"))); + } + + /* put the id values into the tuple slot */ elemTupleSlot->tts_values[vertex_tuple_id] = id; elemTupleSlot->tts_isnull[vertex_tuple_id] = isNull; - /* get the properties for this vertex */ - prop = ExecEvalExpr(node->prop_expr_state, econtext, &isNull); + /* + * Retrieve the properties and isNull values if the storage structure + * exists. + */ + if (path_array != NULL && path_array[path_index] != NULL) + { + prop = path_array[path_index]->prop; + isNull = path_array[path_index]->prop_isNull; + } + /* otherwise, get them normally */ + else + { + /* get the properties for this vertex */ + prop = ExecEvalExpr(node->prop_expr_state, econtext, &isNull); + } + + /* put the prop values into the tuple slot */ elemTupleSlot->tts_values[vertex_tuple_properties] = prop; elemTupleSlot->tts_isnull[vertex_tuple_properties] = isNull; @@ -686,7 +1085,8 @@ static Datum merge_vertex(cypher_merge_custom_scan_state *css, * following command to see the updates generated by this instance of * merge. */ - if (css->base_currentCommandId == GetCurrentCommandId(false)) + if (should_insert && + css->base_currentCommandId == GetCurrentCommandId(false)) { insert_entity_tuple(resultRelInfo, elemTupleSlot, estate); @@ -698,7 +1098,7 @@ static Datum merge_vertex(cypher_merge_custom_scan_state *css, */ CommandCounterIncrement(); } - else + else if (should_insert) { insert_entity_tuple_cid(resultRelInfo, elemTupleSlot, estate, css->base_currentCommandId); @@ -755,6 +1155,26 @@ static Datum merge_vertex(cypher_merge_custom_scan_state *css, } } } + /* + * If we have the storage structure pointers, we have already retrieved the + * ID from the datum in the scan tuple, so just retrieve it from the + * structure. + */ + else if (path_array != NULL && path_array[path_index] != NULL) + { + /* retrieve the id of the vertex */ + id = path_array[path_index]->id; + + /* + * Add the Datum to the list of entities for creating the path variable + */ + if (CYPHER_TARGET_NODE_IN_PATH(node->flags)) + { + Datum vertex = scanTupleSlot->tts_values[node->tuple_position - 1]; + css->path_values = lappend(css->path_values, + DatumGetPointer(vertex)); + } + } else { agtype *a = NULL; @@ -827,7 +1247,8 @@ static Datum merge_vertex(cypher_merge_custom_scan_state *css, /* If the path continues, create the next edge, passing the vertex's id. */ if (next != NULL) { - merge_edge(css, lfirst(next), id, lnext(list, next), list); + merge_edge(css, lfirst(next), id, lnext(list, next), list, + path_array, path_index+1, should_insert); } return id; @@ -838,7 +1259,9 @@ static Datum merge_vertex(cypher_merge_custom_scan_state *css, */ static void merge_edge(cypher_merge_custom_scan_state *css, cypher_target_node *node, Datum prev_vertex_id, - ListCell *next, List *list) + ListCell *next, List *list, + path_entry **path_array, int path_index, + bool should_insert) { bool isNull; EState *estate = css->css.ss.ps.state; @@ -859,7 +1282,8 @@ static void merge_edge(cypher_merge_custom_scan_state *css, * next vertex's id. */ css->path_values = NIL; - next_vertex_id = merge_vertex(css, lfirst(next), lnext(list, next), list); + next_vertex_id = merge_vertex(css, lfirst(next), lnext(list, next), list, + path_array, path_index+1, should_insert); /* * Set the start and end vertex ids @@ -897,26 +1321,90 @@ static void merge_edge(cypher_merge_custom_scan_state *css, ExecClearTuple(elemTupleSlot); - // Graph Id for the edge - id = ExecEvalExpr(node->id_expr_state, econtext, &isNull); + /* if we not are going to insert, we need our structure pointers */ + if (should_insert == false && + (path_array == NULL || path_array[path_index] == NULL)) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid input parameter combination"))); + } + + /* + * If we shouldn't insert the edge, we need to retrieve the entire edge from + * the storage structure. + */ + if (should_insert == false && + path_array != NULL && + path_array[path_index] != NULL) + { + id = path_array[path_index]->id; + isNull = path_array[path_index]->id_isNull; + start_id = path_array[path_index]->start_id; + end_id = path_array[path_index]->end_id; + } + /* + * Otherwise, we need to get the edge's ID and store its unique values if + * the storage structure exists + */ + else if (should_insert == true) + { + /* get the next graphid for this edge */ + id = ExecEvalExpr(node->id_expr_state, econtext, &isNull); + + if (path_array != NULL && path_array[path_index] != NULL) + { + /* store it */ + path_array[path_index]->id = id; + path_array[path_index]->id_isNull = isNull; + path_array[path_index]->start_id = start_id; + path_array[path_index]->end_id = end_id; + } + } + else + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid input parameter combination"))); + } + + /* put the id values into the tuple slot */ elemTupleSlot->tts_values[edge_tuple_id] = id; elemTupleSlot->tts_isnull[edge_tuple_id] = isNull; - // Graph id for the starting vertex + /* Graph id for the starting vertex */ elemTupleSlot->tts_values[edge_tuple_start_id] = start_id; elemTupleSlot->tts_isnull[edge_tuple_start_id] = false; - // Graph id for the ending vertex + /* Graph id for the ending vertex */ elemTupleSlot->tts_values[edge_tuple_end_id] = end_id; elemTupleSlot->tts_isnull[edge_tuple_end_id] = false; - // Edge's properties map - prop = ExecEvalExpr(node->prop_expr_state, econtext, &isNull); + /* + * Retrieve the properties and isNull values if the storage structure + * exists. + */ + if (path_array != NULL && path_array[path_index] != NULL) + { + prop = path_array[path_index]->prop; + isNull = path_array[path_index]->prop_isNull; + } + /* otherwise, get them normally */ + else + { + /* get the properties for this edge */ + prop = ExecEvalExpr(node->prop_expr_state, econtext, &isNull); + } + + /* store the properties in the tuple slot */ elemTupleSlot->tts_values[edge_tuple_properties] = prop; elemTupleSlot->tts_isnull[edge_tuple_properties] = isNull; - // Insert the new edge - insert_entity_tuple(resultRelInfo, elemTupleSlot, estate); + /* Insert the edge, if it is a new edge */ + if (should_insert) + { + insert_entity_tuple(resultRelInfo, elemTupleSlot, estate); + } /* restore the old result relation info */ estate->es_result_relations = old_estate_es_result_relations; diff --git a/src/include/executor/cypher_utils.h b/src/include/executor/cypher_utils.h index f758ca481..5a8812ab0 100644 --- a/src/include/executor/cypher_utils.h +++ b/src/include/executor/cypher_utils.h @@ -106,6 +106,7 @@ typedef struct cypher_merge_custom_scan_state bool created_new_path; bool found_a_path; CommandId base_currentCommandId; + struct created_path *created_paths_list; } cypher_merge_custom_scan_state; TupleTableSlot *populate_vertex_tts(TupleTableSlot *elemTupleSlot,