From 9be7e2ffc591e78074234ef38a336db9f5db2f57 Mon Sep 17 00:00:00 2001 From: Muhammad Taha Naveed Date: Wed, 14 Aug 2024 23:34:04 +0500 Subject: [PATCH] Revamp age csv loader (#2044) * Allow 0 as entry_id - No regression test were impacted by this change. * Use batch inserts to improve performance - Changed heap_insert to heap_multi_insert since it is faster than calling heap_insert() in a loop. When multiple tuples can be inserted on a single page, just a single WAL record covering all of them, and only need to lock/unlock the page once. - BATCH_SIZE is set to 1000, which is the number of tuples to insert in a single batch. This number was chosen after some experimentation. - Change some of the field names to avoid confusion. * Use sequence for generating ids for edge and vertex - Sequence is not used if the id_field_exists is true in load_labels_from_file function, since the entry id is present in the csv. * Add function to create temporary table for ids - Created a temporary table and populate it with already generated vertex ids. A unique index is created on id column to ensure that new ids generated (using entry id from csv) are unique. * Insert generated ids in the temporary table to enforce uniqueness - Insert ids in the temporary table and also update index to enforce uniqueness. - If the entry id provided in the CSV is greater than the current sequence value, the sequence value is updated to match the entry ID. For example: Suppose the current sequence value is 1, and the CSV entry ID is 2. If we use 2 but not update the sequence to 2, next time the CREATE clause is used, 2 will be returned by sequence as an entry id, resulting in duplicate. - Update batch functions * Add functions to create graph and label automatically - These functions will check existence of graph and label, and create them if they don't exist. * Add regression tests --- regress/expected/age_load.out | 144 +++++++++--- regress/sql/age_load.sql | 77 +++++- src/backend/catalog/ag_label.c | 5 + src/backend/commands/graph_commands.c | 32 ++- src/backend/utils/load/ag_load_edges.c | 133 +++++++++-- src/backend/utils/load/ag_load_labels.c | 297 ++++++++++++++++++++++-- src/backend/utils/load/age_load.c | 126 +++++++++- src/include/catalog/ag_label.h | 2 + src/include/commands/graph_commands.h | 1 + src/include/utils/graphid.h | 2 +- src/include/utils/load/ag_load_edges.h | 12 +- src/include/utils/load/ag_load_labels.h | 11 +- src/include/utils/load/age_load.h | 17 ++ 13 files changed, 749 insertions(+), 110 deletions(-) diff --git a/regress/expected/age_load.out b/regress/expected/age_load.out index 8635a499b..b638e636b 100644 --- a/regress/expected/age_load.out +++ b/regress/expected/age_load.out @@ -19,6 +19,7 @@ \! cp -r regress/age_load/data regress/instance/data/age_load LOAD 'age'; SET search_path TO ag_catalog; +-- Create a country using CREATE clause SELECT create_graph('agload_test_graph'); NOTICE: graph "agload_test_graph" has been created create_graph @@ -26,34 +27,79 @@ NOTICE: graph "agload_test_graph" has been created (1 row) -SELECT create_vlabel('agload_test_graph','Country'); -NOTICE: VLabel "Country" has been created - create_vlabel ---------------- - +SELECT * FROM cypher('agload_test_graph', $$CREATE (n:Country {__id__:1}) RETURN n$$) as (n agtype); + n +---------------------------------------------------------------------------------- + {"id": 844424930131969, "label": "Country", "properties": {"__id__": 1}}::vertex (1 row) +-- +-- Load countries with id +-- SELECT load_labels_from_file('agload_test_graph', 'Country', - 'age_load/countries.csv'); + 'age_load/countries.csv', true); load_labels_from_file ----------------------- (1 row) -SELECT create_vlabel('agload_test_graph','City'); -NOTICE: VLabel "City" has been created - create_vlabel ---------------- - +-- A temporary table should have been created with 54 ids; 1 from CREATE and 53 from file +SELECT COUNT(*)=54 FROM "_agload_test_graph_ag_vertex_ids"; + ?column? +---------- + t +(1 row) + +-- Sequence should be equal to max entry id i.e. 248 +SELECT currval('agload_test_graph."Country_id_seq"')=248; + ?column? +---------- + t (1 row) +-- Should error out on loading the same file again due to duplicate id +SELECT load_labels_from_file('agload_test_graph', 'Country', + 'age_load/countries.csv', true); +ERROR: Cannot insert duplicate vertex id: 844424930131970 +HINT: Entry id 2 is already used +-- +-- Load cities with id +-- +-- Should create City label automatically and load cities SELECT load_labels_from_file('agload_test_graph', 'City', - 'age_load/cities.csv'); + 'age_load/cities.csv', true); +NOTICE: VLabel "City" has been created load_labels_from_file ----------------------- (1 row) +-- Temporary table should have 54+72485 rows now +SELECT COUNT(*)=54+72485 FROM "_agload_test_graph_ag_vertex_ids"; + ?column? +---------- + t +(1 row) + +-- Sequence should be equal to max entry id i.e. 146941 +SELECT currval('agload_test_graph."City_id_seq"')=146941; + ?column? +---------- + t +(1 row) + +-- Should error out on loading the same file again due to duplicate id +SELECT load_labels_from_file('agload_test_graph', 'City', + 'age_load/cities.csv', true); +ERROR: Cannot insert duplicate vertex id: 1125899906842777 +HINT: Entry id 153 is already used +-- +-- Load edges -- Connects cities to countries +-- +-- Should error out for using vertex label +SELECT load_edges_from_file('agload_test_graph', 'Country', + 'age_load/edges.csv'); +ERROR: label "Country" already exists as edge label SELECT create_elabel('agload_test_graph','has_city'); NOTICE: ELabel "has_city" has been created create_elabel @@ -68,6 +114,17 @@ SELECT load_edges_from_file('agload_test_graph', 'has_city', (1 row) +-- Sequence should be equal to number of edges loaded i.e. 72485 +SELECT currval('agload_test_graph."has_city_id_seq"')=72485; + ?column? +---------- + t +(1 row) + +-- Should error out for using edge label +SELECT load_labels_from_file('agload_test_graph', 'has_city', + 'age_load/cities.csv'); +ERROR: label "has_city" already exists as vertex label SELECT table_catalog, table_schema, lower(table_name) as table_name, table_type FROM information_schema.tables WHERE table_schema = 'agload_test_graph' ORDER BY table_name ASC; @@ -83,7 +140,7 @@ WHERE table_schema = 'agload_test_graph' ORDER BY table_name ASC; SELECT COUNT(*) FROM agload_test_graph."Country"; count ------- - 53 + 54 (1 row) SELECT COUNT(*) FROM agload_test_graph."City"; @@ -101,7 +158,7 @@ SELECT COUNT(*) FROM agload_test_graph."has_city"; SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH(n) RETURN n$$) as (n agtype); count ------- - 72538 + 72539 (1 row) SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH (a)-[e]->(b) RETURN e$$) as (n agtype); @@ -110,6 +167,17 @@ SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH (a)-[e]->(b) RETURN e$$ 72485 (1 row) +-- +-- Load countries and cities without id +-- +-- Should load countries in Country label without error since it should use sequence now +SELECT load_labels_from_file('agload_test_graph', 'Country', + 'age_load/countries.csv', false); + load_labels_from_file +----------------------- + +(1 row) + SELECT create_vlabel('agload_test_graph','Country2'); NOTICE: VLabel "Country2" has been created create_vlabel @@ -153,6 +221,7 @@ SELECT COUNT(*) FROM agload_test_graph."City2"; SELECT id FROM agload_test_graph."Country" LIMIT 10; id ----------------- + 844424930131969 844424930131970 844424930131971 844424930131974 @@ -162,7 +231,6 @@ SELECT id FROM agload_test_graph."Country" LIMIT 10; 844424930131996 844424930132002 844424930132023 - 844424930132025 (10 rows) SELECT id FROM agload_test_graph."Country2" LIMIT 10; @@ -180,13 +248,16 @@ SELECT id FROM agload_test_graph."Country2" LIMIT 10; 1688849860263946 (10 rows) +-- Should return 2 rows for Country with same properties, but different ids SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country {iso2 : 'BE'}) RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, "n.iso2" agtype); id(n) | n.name | n.iso2 -----------------+-----------+-------- 844424930131990 | "Belgium" | "BE" -(1 row) + 844424930132223 | "Belgium" | "BE" +(2 rows) +-- Should return 1 row SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'BE'}) RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, "n.iso2" agtype); id(n) | n.name | n.iso2 @@ -194,13 +265,16 @@ SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'BE'}) 1688849860263942 | "Belgium" | "BE" (1 row) +-- Should return 2 rows for Country with same properties, but different ids SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country {iso2 : 'AT'}) RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, "n.iso2" agtype); id(n) | n.name | n.iso2 -----------------+-----------+-------- 844424930131983 | "Austria" | "AT" -(1 row) + 844424930132221 | "Austria" | "AT" +(2 rows) +-- Should return 1 row SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'AT'}) RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, "n.iso2" agtype); id(n) | n.name | n.iso2 @@ -208,14 +282,23 @@ SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'AT'}) 1688849860263940 | "Austria" | "AT" (1 row) +-- Should return 2 rows for Country with same properties, but different ids SELECT * FROM cypher('agload_test_graph', $$ MATCH (u:Country {region : "Europe"}) WHERE u.name =~ 'Cro.*' - RETURN u.name, u.region -$$) AS (result_1 agtype, result_2 agtype); - result_1 | result_2 ------------+---------- - "Croatia" | "Europe" + RETURN id(u), u.name, u.region +$$) AS ("id(u)" agtype, result_1 agtype, result_2 agtype); + id(u) | result_1 | result_2 +-----------------+-----------+---------- + 844424930132023 | "Croatia" | "Europe" + 844424930132226 | "Croatia" | "Europe" +(2 rows) + +-- There shouldn't be any duplicates +SELECT * FROM cypher('agload_test_graph', $$return graph_stats('agload_test_graph')$$) as (a agtype); + a +------------------------------------------------------------------------------------------ + {"graph": "agload_test_graph", "num_loaded_edges": 72485, "num_loaded_vertices": 145130} (1 row) SELECT drop_graph('agload_test_graph', true); @@ -236,22 +319,11 @@ NOTICE: graph "agload_test_graph" has been dropped -- -- Test property type conversion -- -SELECT create_graph('agload_conversion'); -NOTICE: graph "agload_conversion" has been created - create_graph --------------- - -(1 row) - -- vertex: load as agtype -SELECT create_vlabel('agload_conversion','Person1'); -NOTICE: VLabel "Person1" has been created - create_vlabel ---------------- - -(1 row) - +-- Should create graph and label automatically SELECT load_labels_from_file('agload_conversion', 'Person1', 'age_load/conversion_vertices.csv', true, true); +NOTICE: graph "agload_conversion" has been created +NOTICE: VLabel "Person1" has been created load_labels_from_file ----------------------- diff --git a/regress/sql/age_load.sql b/regress/sql/age_load.sql index cee34f59c..425ca5417 100644 --- a/regress/sql/age_load.sql +++ b/regress/sql/age_load.sql @@ -22,20 +22,65 @@ LOAD 'age'; SET search_path TO ag_catalog; + +-- Create a country using CREATE clause SELECT create_graph('agload_test_graph'); -SELECT create_vlabel('agload_test_graph','Country'); +SELECT * FROM cypher('agload_test_graph', $$CREATE (n:Country {__id__:1}) RETURN n$$) as (n agtype); + +-- +-- Load countries with id +-- +SELECT load_labels_from_file('agload_test_graph', 'Country', + 'age_load/countries.csv', true); + +-- A temporary table should have been created with 54 ids; 1 from CREATE and 53 from file +SELECT COUNT(*)=54 FROM "_agload_test_graph_ag_vertex_ids"; + +-- Sequence should be equal to max entry id i.e. 248 +SELECT currval('agload_test_graph."Country_id_seq"')=248; + +-- Should error out on loading the same file again due to duplicate id SELECT load_labels_from_file('agload_test_graph', 'Country', - 'age_load/countries.csv'); + 'age_load/countries.csv', true); + +-- +-- Load cities with id +-- + +-- Should create City label automatically and load cities +SELECT load_labels_from_file('agload_test_graph', 'City', + 'age_load/cities.csv', true); + +-- Temporary table should have 54+72485 rows now +SELECT COUNT(*)=54+72485 FROM "_agload_test_graph_ag_vertex_ids"; -SELECT create_vlabel('agload_test_graph','City'); +-- Sequence should be equal to max entry id i.e. 146941 +SELECT currval('agload_test_graph."City_id_seq"')=146941; + +-- Should error out on loading the same file again due to duplicate id SELECT load_labels_from_file('agload_test_graph', 'City', - 'age_load/cities.csv'); + 'age_load/cities.csv', true); + +-- +-- Load edges -- Connects cities to countries +-- + +-- Should error out for using vertex label +SELECT load_edges_from_file('agload_test_graph', 'Country', + 'age_load/edges.csv'); SELECT create_elabel('agload_test_graph','has_city'); SELECT load_edges_from_file('agload_test_graph', 'has_city', 'age_load/edges.csv'); +-- Sequence should be equal to number of edges loaded i.e. 72485 +SELECT currval('agload_test_graph."has_city_id_seq"')=72485; + +-- Should error out for using edge label +SELECT load_labels_from_file('agload_test_graph', 'has_city', + 'age_load/cities.csv'); + SELECT table_catalog, table_schema, lower(table_name) as table_name, table_type FROM information_schema.tables WHERE table_schema = 'agload_test_graph' ORDER BY table_name ASC; @@ -48,6 +93,14 @@ SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH(n) RETURN n$$) as (n ag SELECT COUNT(*) FROM cypher('agload_test_graph', $$MATCH (a)-[e]->(b) RETURN e$$) as (n agtype); +-- +-- Load countries and cities without id +-- + +-- Should load countries in Country label without error since it should use sequence now +SELECT load_labels_from_file('agload_test_graph', 'Country', + 'age_load/countries.csv', false); + SELECT create_vlabel('agload_test_graph','Country2'); SELECT load_labels_from_file('agload_test_graph', 'Country2', 'age_load/countries.csv', false); @@ -62,31 +115,39 @@ SELECT COUNT(*) FROM agload_test_graph."City2"; SELECT id FROM agload_test_graph."Country" LIMIT 10; SELECT id FROM agload_test_graph."Country2" LIMIT 10; +-- Should return 2 rows for Country with same properties, but different ids SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country {iso2 : 'BE'}) RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, "n.iso2" agtype); +-- Should return 1 row SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'BE'}) RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, "n.iso2" agtype); +-- Should return 2 rows for Country with same properties, but different ids SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country {iso2 : 'AT'}) RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, "n.iso2" agtype); +-- Should return 1 row SELECT * FROM cypher('agload_test_graph', $$MATCH(n:Country2 {iso2 : 'AT'}) RETURN id(n), n.name, n.iso2 $$) as ("id(n)" agtype, "n.name" agtype, "n.iso2" agtype); +-- Should return 2 rows for Country with same properties, but different ids SELECT * FROM cypher('agload_test_graph', $$ MATCH (u:Country {region : "Europe"}) WHERE u.name =~ 'Cro.*' - RETURN u.name, u.region -$$) AS (result_1 agtype, result_2 agtype); + RETURN id(u), u.name, u.region +$$) AS ("id(u)" agtype, result_1 agtype, result_2 agtype); + +-- There shouldn't be any duplicates +SELECT * FROM cypher('agload_test_graph', $$return graph_stats('agload_test_graph')$$) as (a agtype); SELECT drop_graph('agload_test_graph', true); -- -- Test property type conversion -- -SELECT create_graph('agload_conversion'); -- vertex: load as agtype -SELECT create_vlabel('agload_conversion','Person1'); + +-- Should create graph and label automatically SELECT load_labels_from_file('agload_conversion', 'Person1', 'age_load/conversion_vertices.csv', true, true); SELECT * FROM cypher('agload_conversion', $$ MATCH (n:Person1) RETURN properties(n) $$) as (a agtype); diff --git a/src/backend/catalog/ag_label.c b/src/backend/catalog/ag_label.c index 6cf2a552a..5ca2cdaf8 100644 --- a/src/backend/catalog/ag_label.c +++ b/src/backend/catalog/ag_label.c @@ -174,6 +174,11 @@ char get_label_kind(const char *label_name, Oid label_graph) } } +char *get_label_seq_relation_name(const char *label_name) +{ + return psprintf("%s_id_seq", label_name); +} + PG_FUNCTION_INFO_V1(_label_name); /* diff --git a/src/backend/commands/graph_commands.c b/src/backend/commands/graph_commands.c index 3d51e67aa..3e2bffd9a 100644 --- a/src/backend/commands/graph_commands.c +++ b/src/backend/commands/graph_commands.c @@ -42,6 +42,7 @@ #include "catalog/ag_label.h" #include "commands/label_commands.h" #include "utils/graphid.h" +#include "commands/graph_commands.h" #include "utils/name_validation.h" /* @@ -60,10 +61,7 @@ PG_FUNCTION_INFO_V1(create_graph); /* function that is evoked for creating a graph */ Datum create_graph(PG_FUNCTION_ARGS) { - char *graph; Name graph_name; - char *graph_name_str; - Oid nsp_id; /* if no argument is passed with the function, graph name cannot be null */ if (PG_ARGISNULL(0)) @@ -75,6 +73,23 @@ Datum create_graph(PG_FUNCTION_ARGS) /* gets graph name as function argument */ graph_name = PG_GETARG_NAME(0); + create_graph_internal(graph_name); + + ereport(NOTICE, + (errmsg("graph \"%s\" has been created", NameStr(*graph_name)))); + + /* + * According to postgres specification of c-language functions + * if function returns void this is the syntax. + */ + PG_RETURN_VOID(); +} + +Oid create_graph_internal(const Name graph_name) +{ + Oid nsp_id; + char *graph_name_str; + graph_name_str = NameStr(*graph_name); /* checking if the name of the graph falls under the pre-decided graph naming conventions(regex) */ @@ -101,15 +116,10 @@ Datum create_graph(PG_FUNCTION_ARGS) CommandCounterIncrement(); /* Create the default label tables */ - graph = graph_name->data; - create_label(graph, AG_DEFAULT_LABEL_VERTEX, LABEL_TYPE_VERTEX, NIL); - create_label(graph, AG_DEFAULT_LABEL_EDGE, LABEL_TYPE_EDGE, NIL); + create_label(graph_name_str, AG_DEFAULT_LABEL_VERTEX, LABEL_TYPE_VERTEX, NIL); + create_label(graph_name_str, AG_DEFAULT_LABEL_EDGE, LABEL_TYPE_EDGE, NIL); - ereport(NOTICE, - (errmsg("graph \"%s\" has been created", NameStr(*graph_name)))); - - /* according to postgres specification of c-language functions if function returns void this is the syntax */ - PG_RETURN_VOID(); + return nsp_id; } PG_FUNCTION_INFO_V1(age_graph_exists); diff --git a/src/backend/utils/load/ag_load_edges.c b/src/backend/utils/load/ag_load_edges.c index 4f2f66a35..71683bf4c 100644 --- a/src/backend/utils/load/ag_load_edges.c +++ b/src/backend/utils/load/ag_load_edges.c @@ -20,9 +20,13 @@ #include "postgres.h" #include "utils/load/ag_load_edges.h" -#include "utils/load/age_load.h" #include "utils/load/csv.h" +void init_edge_batch_insert(batch_insert_state **batch_state, + char *label_name, Oid graph_oid); +void finish_edge_batch_insert(batch_insert_state **batch_state, + char *label_name, Oid graph_oid); + void edge_field_cb(void *field, size_t field_len, void *data) { @@ -58,6 +62,7 @@ void edge_row_cb(int delim __attribute__((unused)), void *data) { csv_edge_reader *cr = (csv_edge_reader*)data; + batch_insert_state *batch_state = cr->batch_state; size_t i, n_fields; int64 start_id_int; @@ -68,9 +73,9 @@ void edge_row_cb(int delim __attribute__((unused)), void *data) graphid end_vertex_graph_id; int end_vertex_type_id; - graphid object_graph_id; - - agtype* props = NULL; + graphid edge_id; + int64 entry_id; + TupleTableSlot *slot; n_fields = cr->cur_field; @@ -89,7 +94,8 @@ void edge_row_cb(int delim __attribute__((unused)), void *data) } else { - object_graph_id = make_graphid(cr->object_id, (int64)cr->row); + entry_id = nextval_internal(cr->label_seq_relid, true); + edge_id = make_graphid(cr->label_id, entry_id); start_id_int = strtol(cr->fields[0], NULL, 10); start_vertex_type_id = get_label_id(cr->fields[1], cr->graph_oid); @@ -99,14 +105,35 @@ void edge_row_cb(int delim __attribute__((unused)), void *data) start_vertex_graph_id = make_graphid(start_vertex_type_id, start_id_int); end_vertex_graph_id = make_graphid(end_vertex_type_id, end_id_int); - props = create_agtype_from_list_i(cr->header, cr->fields, - n_fields, 4, cr->load_as_agtype); - - insert_edge_simple(cr->graph_oid, cr->object_name, - object_graph_id, start_vertex_graph_id, - end_vertex_graph_id, props); - - pfree(props); + /* Get the appropriate slot from the batch state */ + slot = batch_state->slots[batch_state->num_tuples]; + + /* Clear the slots contents */ + ExecClearTuple(slot); + + /* Fill the values in the slot */ + slot->tts_values[0] = GRAPHID_GET_DATUM(edge_id); + slot->tts_values[1] = GRAPHID_GET_DATUM(start_vertex_graph_id); + slot->tts_values[2] = GRAPHID_GET_DATUM(end_vertex_graph_id); + slot->tts_values[3] = AGTYPE_P_GET_DATUM( + create_agtype_from_list_i( + cr->header, cr->fields, + n_fields, 4, cr->load_as_agtype)); + slot->tts_isnull[0] = false; + slot->tts_isnull[1] = false; + slot->tts_isnull[2] = false; + slot->tts_isnull[3] = false; + + /* Make the slot as containing virtual tuple */ + ExecStoreVirtualTuple(slot); + batch_state->num_tuples++; + + if (batch_state->num_tuples >= batch_state->max_tuples) + { + /* Insert the batch when it is full (i.e. BATCH_SIZE) */ + insert_batch(batch_state, cr->label_name, cr->graph_oid); + batch_state->num_tuples = 0; + } } for (i = 0; i < n_fields; ++i) @@ -119,7 +146,6 @@ void edge_row_cb(int delim __attribute__((unused)), void *data) ereport(NOTICE,(errmsg("THere is some error"))); } - cr->cur_field = 0; cr->curr_row_length = 0; cr->row += 1; @@ -152,8 +178,8 @@ static int is_term(unsigned char c) int create_edges_from_csv_file(char *file_path, char *graph_name, Oid graph_oid, - char *object_name, - int object_id, + char *label_name, + int label_id, bool load_as_agtype) { @@ -163,6 +189,7 @@ int create_edges_from_csv_file(char *file_path, size_t bytes_read; unsigned char options = 0; csv_edge_reader cr; + char *label_seq_name; if (csv_init(&p, options) != 0) { @@ -180,6 +207,7 @@ int create_edges_from_csv_file(char *file_path, (errmsg("Failed to open %s\n", file_path))); } + label_seq_name = get_label_seq_relation_name(label_name); memset((void*)&cr, 0, sizeof(csv_edge_reader)); cr.alloc = 128; @@ -189,10 +217,14 @@ int create_edges_from_csv_file(char *file_path, cr.curr_row_length = 0; cr.graph_name = graph_name; cr.graph_oid = graph_oid; - cr.object_name = object_name; - cr.object_id = object_id; + cr.label_name = label_name; + cr.label_id = label_id; + cr.label_seq_relid = get_relname_relid(label_seq_name, graph_oid); cr.load_as_agtype = load_as_agtype; + /* Initialize the batch insert state */ + init_edge_batch_insert(&cr.batch_state, label_name, graph_oid); + while ((bytes_read=fread(buf, 1, 1024, fp)) > 0) { if (csv_parse(&p, buf, bytes_read, edge_field_cb, @@ -205,6 +237,9 @@ int create_edges_from_csv_file(char *file_path, csv_fini(&p, edge_field_cb, edge_row_cb, &cr); + /* Finish any remaining batch inserts */ + finish_edge_batch_insert(&cr.batch_state, label_name, graph_oid); + if (ferror(fp)) { ereport(ERROR, (errmsg("Error while reading file %s\n", file_path))); @@ -216,3 +251,65 @@ int create_edges_from_csv_file(char *file_path, csv_free(&p); return EXIT_SUCCESS; } + +/* + * Initialize the batch insert state for edges. + */ +void init_edge_batch_insert(batch_insert_state **batch_state, + char *label_name, Oid graph_oid) +{ + Relation relation; + int i; + + // Open a temporary relation to get the tuple descriptor + relation = table_open(get_label_relation(label_name, graph_oid), AccessShareLock); + + // Initialize the batch insert state + *batch_state = (batch_insert_state *) palloc0(sizeof(batch_insert_state)); + (*batch_state)->max_tuples = BATCH_SIZE; + (*batch_state)->slots = palloc(sizeof(TupleTableSlot *) * BATCH_SIZE); + (*batch_state)->num_tuples = 0; + + // Create slots + for (i = 0; i < BATCH_SIZE; i++) + { + (*batch_state)->slots[i] = MakeSingleTupleTableSlot( + RelationGetDescr(relation), + &TTSOpsHeapTuple); + } + + table_close(relation, AccessShareLock); +} + +/* + * Finish the batch insert for edges. Insert the + * remaining tuples in the batch state and clean up. + */ +void finish_edge_batch_insert(batch_insert_state **batch_state, + char *label_name, Oid graph_oid) +{ + int i; + Relation relation; + + if ((*batch_state)->num_tuples > 0) + { + insert_batch(*batch_state, label_name, graph_oid); + (*batch_state)->num_tuples = 0; + } + + // Open a temporary relation to ensure resources are properly cleaned up + relation = table_open(get_label_relation(label_name, graph_oid), AccessShareLock); + + // Free slots + for (i = 0; i < BATCH_SIZE; i++) + { + ExecDropSingleTupleTableSlot((*batch_state)->slots[i]); + } + + // Clean up batch state + pfree((*batch_state)->slots); + pfree(*batch_state); + *batch_state = NULL; + + table_close(relation, AccessShareLock); +} \ No newline at end of file diff --git a/src/backend/utils/load/ag_load_labels.c b/src/backend/utils/load/ag_load_labels.c index f377f1cb3..9e73a2125 100644 --- a/src/backend/utils/load/ag_load_labels.c +++ b/src/backend/utils/load/ag_load_labels.c @@ -17,11 +17,25 @@ * under the License. */ #include "postgres.h" +#include "executor/spi.h" +#include "catalog/namespace.h" +#include "executor/executor.h" #include "utils/load/ag_load_labels.h" -#include "utils/load/age_load.h" #include "utils/load/csv.h" +static void setup_temp_table_for_vertex_ids(char *graph_name); +static void insert_batch_in_temp_table(batch_insert_state *batch_state, + Oid graph_oid, Oid relid); +static void init_vertex_batch_insert(batch_insert_state **batch_state, + char *label_name, Oid graph_oid, + Oid temp_table_relid); +static void finish_vertex_batch_insert(batch_insert_state **batch_state, + char *label_name, Oid graph_oid, + Oid temp_table_relid); +static void insert_vertex_batch(batch_insert_state *batch_state, char *label_name, + Oid graph_oid, Oid temp_table_relid); + void vertex_field_cb(void *field, size_t field_len, void *data) { @@ -55,16 +69,16 @@ void vertex_field_cb(void *field, size_t field_len, void *data) void vertex_row_cb(int delim __attribute__((unused)), void *data) { - csv_vertex_reader *cr = (csv_vertex_reader*)data; - agtype *props = NULL; + batch_insert_state *batch_state = cr->batch_state; size_t i, n_fields; - graphid object_graph_id; - int64 label_id_int; + graphid vertex_id; + int64 entry_id; + TupleTableSlot *slot; + TupleTableSlot *temp_id_slot; n_fields = cr->cur_field; - if (cr->row == 0) { cr->header_num = cr->cur_field; @@ -82,36 +96,67 @@ void vertex_row_cb(int delim __attribute__((unused)), void *data) { if (cr->id_field_exists) { - label_id_int = strtol(cr->fields[0], NULL, 10); + entry_id = strtol(cr->fields[0], NULL, 10); + if (entry_id > cr->curr_seq_num) + { + DirectFunctionCall2(setval_oid, + ObjectIdGetDatum(cr->label_seq_relid), + Int64GetDatum(entry_id)); + cr->curr_seq_num = entry_id; + } } else { - label_id_int = (int64)cr->row; + entry_id = nextval_internal(cr->label_seq_relid, true); } - object_graph_id = make_graphid(cr->object_id, label_id_int); + vertex_id = make_graphid(cr->label_id, entry_id); - props = create_agtype_from_list(cr->header, cr->fields, - n_fields, label_id_int, - cr->load_as_agtype); - insert_vertex_simple(cr->graph_oid, cr->object_name, - object_graph_id, props); - pfree(props); - } + /* Get the appropriate slot from the batch state */ + slot = batch_state->slots[batch_state->num_tuples]; + temp_id_slot = batch_state->temp_id_slots[batch_state->num_tuples]; + + /* Clear the slots contents */ + ExecClearTuple(slot); + ExecClearTuple(temp_id_slot); + /* Fill the values in the slot */ + slot->tts_values[0] = GRAPHID_GET_DATUM(vertex_id); + slot->tts_values[1] = AGTYPE_P_GET_DATUM( + create_agtype_from_list(cr->header, cr->fields, + n_fields, entry_id, + cr->load_as_agtype)); + slot->tts_isnull[0] = false; + slot->tts_isnull[1] = false; + + temp_id_slot->tts_values[0] = GRAPHID_GET_DATUM(vertex_id); + temp_id_slot->tts_isnull[0] = false; + + /* Make the slot as containing virtual tuple */ + ExecStoreVirtualTuple(slot); + ExecStoreVirtualTuple(temp_id_slot); + + batch_state->num_tuples++; + + if (batch_state->num_tuples >= batch_state->max_tuples) + { + /* Insert the batch when it is full (i.e. BATCH_SIZE) */ + insert_vertex_batch(batch_state, cr->label_name, cr->graph_oid, + cr->temp_table_relid); + batch_state->num_tuples = 0; + } + } for (i = 0; i < n_fields; ++i) { free(cr->fields[i]); } - if (cr->error) { ereport(NOTICE,(errmsg("THere is some error"))); } - cr->cur_field = 0; cr->curr_row_length = 0; cr->row += 1; @@ -144,8 +189,8 @@ static int is_term(unsigned char c) int create_labels_from_csv_file(char *file_path, char *graph_name, Oid graph_oid, - char *object_name, - int object_id, + char *label_name, + int label_id, bool id_field_exists, bool load_as_agtype) { @@ -156,6 +201,8 @@ int create_labels_from_csv_file(char *file_path, size_t bytes_read; unsigned char options = 0; csv_vertex_reader cr; + char *label_seq_name; + Oid temp_table_relid; if (csv_init(&p, options) != 0) { @@ -163,6 +210,13 @@ int create_labels_from_csv_file(char *file_path, (errmsg("Failed to initialize csv parser\n"))); } + temp_table_relid = RelnameGetRelid(GET_TEMP_VERTEX_ID_TABLE(graph_name)); + if (!OidIsValid(temp_table_relid)) + { + setup_temp_table_for_vertex_ids(graph_name); + temp_table_relid = RelnameGetRelid(GET_TEMP_VERTEX_ID_TABLE(graph_name)); + } + csv_set_space_func(&p, is_space); csv_set_term_func(&p, is_term); @@ -173,6 +227,7 @@ int create_labels_from_csv_file(char *file_path, (errmsg("Failed to open %s\n", file_path))); } + label_seq_name = get_label_seq_relation_name(label_name); memset((void*)&cr, 0, sizeof(csv_vertex_reader)); @@ -183,12 +238,28 @@ int create_labels_from_csv_file(char *file_path, cr.curr_row_length = 0; cr.graph_name = graph_name; cr.graph_oid = graph_oid; - cr.object_name = object_name; - cr.object_id = object_id; + cr.label_name = label_name; + cr.label_id = label_id; cr.id_field_exists = id_field_exists; + cr.label_seq_relid = get_relname_relid(label_seq_name, graph_oid); cr.load_as_agtype = load_as_agtype; + cr.temp_table_relid = temp_table_relid; + + if (cr.id_field_exists) + { + /* + * Set the curr_seq_num since we will need it to compare with + * incoming entry_id. + * + * We cant use currval because it will error out if nextval was + * not called before in the session. + */ + cr.curr_seq_num = nextval_internal(cr.label_seq_relid, true); + } - + /* Initialize the batch insert state */ + init_vertex_batch_insert(&cr.batch_state, label_name, graph_oid, + cr.temp_table_relid); while ((bytes_read=fread(buf, 1, 1024, fp)) > 0) { @@ -202,6 +273,10 @@ int create_labels_from_csv_file(char *file_path, csv_fini(&p, vertex_field_cb, vertex_row_cb, &cr); + /* Finish any remaining batch inserts */ + finish_vertex_batch_insert(&cr.batch_state, label_name, graph_oid, + cr.temp_table_relid); + if (ferror(fp)) { ereport(ERROR, (errmsg("Error while reading file %s\n", @@ -214,3 +289,179 @@ int create_labels_from_csv_file(char *file_path, csv_free(&p); return EXIT_SUCCESS; } + +static void insert_vertex_batch(batch_insert_state *batch_state, char *label_name, + Oid graph_oid, Oid temp_table_relid) +{ + insert_batch_in_temp_table(batch_state, graph_oid, temp_table_relid); + insert_batch(batch_state, label_name, graph_oid); +} + +/* + * Create and populate a temporary table with vertex ids that are already + * present in the graph. This table will be used to check if the new vertex + * id generated by loader is a duplicate. + * Unique index is created to enforce uniqueness of the ids. + * + * We dont need this for loading edges since the ids are generated using + * sequence and are unique. + */ +static void setup_temp_table_for_vertex_ids(char *graph_name) +{ + char *create_as_query; + char *index_query; + + create_as_query = psprintf("CREATE TEMP TABLE IF NOT EXISTS %s AS " + "SELECT DISTINCT id FROM \"%s\".%s", + GET_TEMP_VERTEX_ID_TABLE(graph_name), graph_name, + AG_DEFAULT_LABEL_VERTEX); + + index_query = psprintf("CREATE UNIQUE INDEX ON %s (id)", + GET_TEMP_VERTEX_ID_TABLE(graph_name)); + SPI_connect(); + SPI_execute(create_as_query, false, 0); + SPI_execute(index_query, false, 0); + + SPI_finish(); +} + +/* + * Inserts batch of tuples into the temporary table. + * This function also updates the index to check for + * uniqueness of the ids. + */ +static void insert_batch_in_temp_table(batch_insert_state *batch_state, + Oid graph_oid, Oid relid) +{ + int i; + EState *estate; + ResultRelInfo *resultRelInfo; + Relation rel; + List *result; + + rel = table_open(relid, RowExclusiveLock); + + /* Initialize executor state */ + estate = CreateExecutorState(); + + /* Initialize result relation information */ + resultRelInfo = makeNode(ResultRelInfo); + InitResultRelInfo(resultRelInfo, rel, 1, NULL, estate->es_instrument); + estate->es_result_relations = &resultRelInfo; + + /* Open the indices */ + ExecOpenIndices(resultRelInfo, false); + + /* Insert the batch into the temporary table */ + heap_multi_insert(rel, batch_state->temp_id_slots, batch_state->num_tuples, + GetCurrentCommandId(true), 0, NULL); + + for (i = 0; i < batch_state->num_tuples; i++) + { + result = ExecInsertIndexTuples(resultRelInfo, batch_state->temp_id_slots[i], + estate, false, true, NULL, NIL); + /* Check if the unique cnstraint is violated */ + if (list_length(result) != 0) + { + Datum id; + bool isnull; + + id = slot_getattr(batch_state->temp_id_slots[i], 1, &isnull); + ereport(ERROR, (errmsg("Cannot insert duplicate vertex id: %ld", + DATUM_GET_GRAPHID(id)), + errhint("Entry id %ld is already used", + get_graphid_entry_id(id)))); + } + } + /* Clean up and close the indices */ + ExecCloseIndices(resultRelInfo); + + FreeExecutorState(estate); + table_close(rel, RowExclusiveLock); + + CommandCounterIncrement(); +} + +/* + * Initialize the batch insert state for vertices. + */ +static void init_vertex_batch_insert(batch_insert_state **batch_state, + char *label_name, Oid graph_oid, + Oid temp_table_relid) +{ + Relation relation; + Oid relid; + + Relation temp_table_relation; + int i; + + /* Open a temporary relation to get the tuple descriptor */ + relid = get_label_relation(label_name, graph_oid); + relation = table_open(relid, AccessShareLock); + + temp_table_relation = table_open(temp_table_relid, AccessShareLock); + + /* Initialize the batch insert state */ + *batch_state = (batch_insert_state *) palloc0(sizeof(batch_insert_state)); + (*batch_state)->max_tuples = BATCH_SIZE; + (*batch_state)->slots = palloc(sizeof(TupleTableSlot *) * BATCH_SIZE); + (*batch_state)->temp_id_slots = palloc(sizeof(TupleTableSlot *) * BATCH_SIZE); + (*batch_state)->num_tuples = 0; + + /* Create slots */ + for (i = 0; i < BATCH_SIZE; i++) + { + (*batch_state)->slots[i] = MakeSingleTupleTableSlot( + RelationGetDescr(relation), + &TTSOpsHeapTuple); + (*batch_state)->temp_id_slots[i] = MakeSingleTupleTableSlot( + RelationGetDescr(temp_table_relation), + &TTSOpsHeapTuple); + } + + table_close(relation, AccessShareLock); + table_close(temp_table_relation, AccessShareLock); +} + +/* + * Finish the batch insert for vertices. Insert the + * remaining tuples in the batch state and clean up. + */ +static void finish_vertex_batch_insert(batch_insert_state **batch_state, + char *label_name, Oid graph_oid, + Oid temp_table_relid) +{ + Relation relation; + Oid relid; + + Relation temp_table_relation; + int i; + + if ((*batch_state)->num_tuples > 0) + { + insert_vertex_batch(*batch_state, label_name, graph_oid, temp_table_relid); + (*batch_state)->num_tuples = 0; + } + + /* Open a temporary relation to ensure resources are properly cleaned up */ + relid = get_label_relation(label_name, graph_oid); + relation = table_open(relid, AccessShareLock); + + temp_table_relation = table_open(temp_table_relid, AccessShareLock); + + /* Free slots */ + for (i = 0; i < BATCH_SIZE; i++) + { + ExecDropSingleTupleTableSlot((*batch_state)->slots[i]); + ExecDropSingleTupleTableSlot((*batch_state)->temp_id_slots[i]); + } + + /* Clean up batch state */ + pfree((*batch_state)->slots); + pfree((*batch_state)->temp_id_slots); + pfree(*batch_state); + *batch_state = NULL; + + table_close(relation, AccessShareLock); + table_close(temp_table_relation, AccessShareLock); +} \ No newline at end of file diff --git a/src/backend/utils/load/age_load.c b/src/backend/utils/load/age_load.c index 750020d8e..73f305dfd 100644 --- a/src/backend/utils/load/age_load.c +++ b/src/backend/utils/load/age_load.c @@ -29,6 +29,9 @@ static agtype_value *csv_value_to_agtype_value(char *csv_val); static bool json_validate(text *json); +static Oid get_or_create_graph(const Name graph_name); +static int32 get_or_create_label(Oid graph_oid, char *graph_name, + char *label_name, char label_kind); agtype *create_empty_agtype(void) { @@ -282,6 +285,34 @@ void insert_vertex_simple(Oid graph_oid, char *label_name, graphid vertex_id, CommandCounterIncrement(); } +void insert_batch(batch_insert_state *batch_state, char *label_name, + Oid graph_oid) +{ + Relation label_relation; + BulkInsertState bistate; + Oid relid; + + // Get the relation OID + relid = get_label_relation(label_name, graph_oid); + + // Open the relation + label_relation = table_open(relid, RowExclusiveLock); + + // Prepare the BulkInsertState + bistate = GetBulkInsertState(); + + // Perform the bulk insert + heap_multi_insert(label_relation, batch_state->slots, + batch_state->num_tuples, GetCurrentCommandId(true), + 0, bistate); + + // Clean up + FreeBulkInsertState(bistate); + table_close(label_relation, RowExclusiveLock); + + CommandCounterIncrement(); +} + PG_FUNCTION_INFO_V1(load_labels_from_file); Datum load_labels_from_file(PG_FUNCTION_ARGS) { @@ -320,19 +351,24 @@ Datum load_labels_from_file(PG_FUNCTION_ARGS) id_field_exists = PG_GETARG_BOOL(3); load_as_agtype = PG_GETARG_BOOL(4); - graph_name_str = NameStr(*graph_name); label_name_str = NameStr(*label_name); + + if (strcmp(label_name_str, "") == 0) + { + label_name_str = AG_DEFAULT_LABEL_VERTEX; + } + file_path_str = text_to_cstring(file_path); - graph_oid = get_graph_oid(graph_name_str); - label_id = get_label_id(label_name_str, graph_oid); + graph_oid = get_or_create_graph(graph_name); + label_id = get_or_create_label(graph_oid, graph_name_str, + label_name_str, LABEL_KIND_VERTEX); create_labels_from_csv_file(file_path_str, graph_name_str, graph_oid, label_name_str, label_id, id_field_exists, load_as_agtype); PG_RETURN_VOID(); - } PG_FUNCTION_INFO_V1(load_edges_from_file); @@ -374,13 +410,91 @@ Datum load_edges_from_file(PG_FUNCTION_ARGS) graph_name_str = NameStr(*graph_name); label_name_str = NameStr(*label_name); + + if (strcmp(label_name_str, "") == 0) + { + label_name_str = AG_DEFAULT_LABEL_EDGE; + } + file_path_str = text_to_cstring(file_path); - graph_oid = get_graph_oid(graph_name_str); - label_id = get_label_id(label_name_str, graph_oid); + graph_oid = get_or_create_graph(graph_name); + label_id = get_or_create_label(graph_oid, graph_name_str, + label_name_str, LABEL_KIND_EDGE); create_edges_from_csv_file(file_path_str, graph_name_str, graph_oid, label_name_str, label_id, load_as_agtype); PG_RETURN_VOID(); +} + +/* + * Helper function to create a graph if it does not exist. + * Just returns Oid of the graph if it already exists. + */ +static Oid get_or_create_graph(const Name graph_name) +{ + Oid graph_oid; + char *graph_name_str; + + graph_name_str = NameStr(*graph_name); + graph_oid = get_graph_oid(graph_name_str); + if (OidIsValid(graph_oid)) + { + return graph_oid; + } + + graph_oid = create_graph_internal(graph_name); + ereport(NOTICE, + (errmsg("graph \"%s\" has been created", NameStr(*graph_name)))); + + return graph_oid; } + +/* + * Helper function to create a label if it does not exist. + * Just returns label_id of the label if it already exists. + */ +static int32 get_or_create_label(Oid graph_oid, char *graph_name, + char *label_name, char label_kind) +{ + int32 label_id; + + label_id = get_label_id(label_name, graph_oid); + + /* Check if label exists */ + if (label_id_is_valid(label_id)) + { + char *label_kind_full = (label_kind == LABEL_KIND_VERTEX) + ? "vertex" : "edge"; + char opposite_label_kind = (label_kind == LABEL_KIND_VERTEX) + ? LABEL_KIND_EDGE : LABEL_KIND_VERTEX; + + /* If it exists, but as another label_kind, throw an error */ + if (get_label_kind(label_name, graph_oid) == opposite_label_kind) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("label \"%s\" already exists as %s label", + label_name, label_kind_full))); + } + } + else + { + /* Create a label */ + RangeVar *rv; + List *parent; + char *default_label = (label_kind == LABEL_KIND_VERTEX) + ? AG_DEFAULT_LABEL_VERTEX : AG_DEFAULT_LABEL_EDGE; + rv = get_label_range_var(graph_name, graph_oid, default_label); + parent = list_make1(rv); + + create_label(graph_name, label_name, label_kind, parent); + label_id = get_label_id(label_name, graph_oid); + + ereport(NOTICE, + (errmsg("VLabel \"%s\" has been created", label_name))); + } + + return label_id; +} \ No newline at end of file diff --git a/src/include/catalog/ag_label.h b/src/include/catalog/ag_label.h index 6c5e03334..0a8480b1a 100644 --- a/src/include/catalog/ag_label.h +++ b/src/include/catalog/ag_label.h @@ -73,6 +73,8 @@ int32 get_label_id(const char *label_name, Oid graph_oid); Oid get_label_relation(const char *label_name, Oid graph_oid); char *get_label_relation_name(const char *label_name, Oid graph_oid); char get_label_kind(const char *label_name, Oid label_graph); +char *get_label_seq_relation_name(const char *label_name); + bool label_id_exists(Oid graph_oid, int32 label_id); RangeVar *get_label_range_var(char *graph_name, Oid graph_oid, diff --git a/src/include/commands/graph_commands.h b/src/include/commands/graph_commands.h index e4d93fc1c..d456ef8c4 100644 --- a/src/include/commands/graph_commands.h +++ b/src/include/commands/graph_commands.h @@ -21,5 +21,6 @@ #define AG_GRAPH_COMMANDS_H Datum create_graph(PG_FUNCTION_ARGS); +Oid create_graph_internal(const Name graph_name); #endif diff --git a/src/include/utils/graphid.h b/src/include/utils/graphid.h index bfb72ee8c..407e9a585 100644 --- a/src/include/utils/graphid.h +++ b/src/include/utils/graphid.h @@ -36,7 +36,7 @@ typedef int64 graphid; #define label_id_is_valid(id) (id >= LABEL_ID_MIN && id <= LABEL_ID_MAX) -#define ENTRY_ID_MIN INT64CONST(1) +#define ENTRY_ID_MIN INT64CONST(0) /* 0x0000ffffffffffff */ #define ENTRY_ID_MAX INT64CONST(281474976710655) #define INVALID_ENTRY_ID INT64CONST(0) diff --git a/src/include/utils/load/ag_load_edges.h b/src/include/utils/load/ag_load_edges.h index 6bb8ac279..df663b1dd 100644 --- a/src/include/utils/load/ag_load_edges.h +++ b/src/include/utils/load/ag_load_edges.h @@ -17,6 +17,9 @@ * under the License. */ +#include "access/heapam.h" +#include "utils/load/age_load.h" + #ifndef AG_LOAD_EDGES_H #define AG_LOAD_EDGES_H @@ -34,12 +37,13 @@ typedef struct { size_t curr_row_length; char *graph_name; Oid graph_oid; - char *object_name; - int object_id; + char *label_name; + int label_id; + Oid label_seq_relid; char *start_vertex; char *end_vertex; bool load_as_agtype; - + batch_insert_state *batch_state; } csv_edge_reader; @@ -47,7 +51,7 @@ void edge_field_cb(void *field, size_t field_len, void *data); void edge_row_cb(int delim __attribute__((unused)), void *data); int create_edges_from_csv_file(char *file_path, char *graph_name, Oid graph_oid, - char *object_name, int object_id, + char *label_name, int label_id, bool load_as_agtype); #endif /*AG_LOAD_EDGES_H */ diff --git a/src/include/utils/load/ag_load_labels.h b/src/include/utils/load/ag_load_labels.h index 7d272efbc..3a70a5c05 100644 --- a/src/include/utils/load/ag_load_labels.h +++ b/src/include/utils/load/ag_load_labels.h @@ -22,6 +22,7 @@ #define AG_LOAD_LABELS_H #include "access/heapam.h" +#include "utils/load/age_load.h" #define AGE_VERTIX 1 #define AGE_EDGE 2 @@ -47,10 +48,14 @@ typedef struct { size_t curr_row_length; char *graph_name; Oid graph_oid; - char *object_name; - int object_id; + char *label_name; + int label_id; + Oid label_seq_relid; + Oid temp_table_relid; bool id_field_exists; bool load_as_agtype; + int curr_seq_num; + batch_insert_state *batch_state; } csv_vertex_reader; @@ -58,7 +63,7 @@ void vertex_field_cb(void *field, size_t field_len, void *data); void vertex_row_cb(int delim __attribute__((unused)), void *data); int create_labels_from_csv_file(char *file_path, char *graph_name, Oid graph_oid, - char *object_name, int object_id, + char *label_name, int label_id, bool id_field_exists, bool load_as_agtype); #endif /* AG_LOAD_LABELS_H */ diff --git a/src/include/utils/load/age_load.h b/src/include/utils/load/age_load.h index 1c650bb81..b1335581b 100644 --- a/src/include/utils/load/age_load.h +++ b/src/include/utils/load/age_load.h @@ -24,11 +24,26 @@ #include "catalog/ag_graph.h" #include "catalog/ag_label.h" #include "commands/label_commands.h" +#include "commands/graph_commands.h" #include "utils/ag_cache.h" #ifndef AGE_ENTITY_CREATOR_H #define AGE_ENTITY_CREATOR_H +#define TEMP_VERTEX_ID_TABLE_SUFFIX "_ag_vertex_ids" +#define GET_TEMP_VERTEX_ID_TABLE(graph_name) \ + psprintf("_%s%s", graph_name, TEMP_VERTEX_ID_TABLE_SUFFIX) + +#define BATCH_SIZE 1000 + +typedef struct +{ + TupleTableSlot **slots; + TupleTableSlot **temp_id_slots; + int num_tuples; + int max_tuples; +} batch_insert_state; + agtype* create_empty_agtype(void); agtype* create_agtype_from_list(char **header, char **fields, @@ -42,5 +57,7 @@ void insert_vertex_simple(Oid graph_oid, char *label_name, graphid vertex_id, void insert_edge_simple(Oid graph_oid, char *label_name, graphid edge_id, graphid start_id, graphid end_id, agtype* end_properties); +void insert_batch(batch_insert_state *batch_state, char *label_name, + Oid graph_oid); #endif /* AGE_ENTITY_CREATOR_H */