From 7a42fb5deb0bbf373cf9ffa60f3e90a6cbb3f257 Mon Sep 17 00:00:00 2001 From: Rafsun Masud Date: Tue, 17 Oct 2023 13:42:04 -0700 Subject: [PATCH] Optimize vertex and edge builder functions (#1252) Changes: - Added the agtype_raw module, which contains helper functions to build agtype directly without building an agtype_value first - Optimize _agtype_build_vertex and _agtype_build_edge functions The agtype_raw module: Inserting a composite agtype (i.e. object) into another agtype (i.e. as an array element) requires the first agtype to be deserialized into agtype_value. Then the agtype_value has to be serialized back into the second agtype. This module provides functions that can perform such insertion without deserializing first. It is meant to speed up queries that do deserialization-serialization back and forth involving deeply nested agtype objects. --- Makefile | 1 + src/backend/utils/adt/agtype.c | 132 +++++++-------- src/backend/utils/adt/agtype_raw.c | 249 +++++++++++++++++++++++++++++ src/include/utils/agtype_raw.h | 48 ++++++ 4 files changed, 353 insertions(+), 77 deletions(-) create mode 100644 src/backend/utils/adt/agtype_raw.c create mode 100644 src/include/utils/agtype_raw.h diff --git a/Makefile b/Makefile index deb5ea2ba..69a1e56ca 100644 --- a/Makefile +++ b/Makefile @@ -56,6 +56,7 @@ OBJS = src/backend/age.o \ src/backend/utils/adt/agtype_ops.o \ src/backend/utils/adt/agtype_parser.o \ src/backend/utils/adt/agtype_util.o \ + src/backend/utils/adt/agtype_raw.o \ src/backend/utils/adt/age_global_graph.o \ src/backend/utils/adt/age_session_info.o \ src/backend/utils/adt/age_vle.o \ diff --git a/src/backend/utils/adt/agtype.c b/src/backend/utils/adt/agtype.c index e492a38b8..486a87342 100644 --- a/src/backend/utils/adt/agtype.c +++ b/src/backend/utils/adt/agtype.c @@ -64,6 +64,7 @@ #include "utils/agtype.h" #include "utils/agtype_parser.h" #include "utils/ag_float8_supp.h" +#include "utils/agtype_raw.h" #include "catalog/ag_graph.h" #include
"catalog/ag_label.h" #include "utils/graphid.h" @@ -2181,18 +2182,14 @@ PG_FUNCTION_INFO_V1(_agtype_build_vertex); */ Datum _agtype_build_vertex(PG_FUNCTION_ARGS) { - agtype_in_state result; graphid id; + char *label; + agtype *properties; + agtype_build_state *bstate; + agtype *rawscalar; + agtype *vertex; - memset(&result, 0, sizeof(agtype_in_state)); - - result.res = push_agtype_value(&result.parse_state, WAGT_BEGIN_OBJECT, - NULL); - - /* process graphid */ - result.res = push_agtype_value(&result.parse_state, WAGT_KEY, - string_to_agtype_value("id")); - + /* handles null */ if (fcinfo->args[0].isnull) { ereport(ERROR, @@ -2200,39 +2197,24 @@ Datum _agtype_build_vertex(PG_FUNCTION_ARGS) errmsg("_agtype_build_vertex() graphid cannot be NULL"))); } - id = AG_GETARG_GRAPHID(0); - result.res = push_agtype_value(&result.parse_state, WAGT_VALUE, - integer_to_agtype_value(id)); - - /* process label */ - result.res = push_agtype_value(&result.parse_state, WAGT_KEY, - string_to_agtype_value("label")); - if (fcinfo->args[1].isnull) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("_agtype_build_vertex() label cannot be NULL"))); } - result.res = - push_agtype_value(&result.parse_state, WAGT_VALUE, - string_to_agtype_value(PG_GETARG_CSTRING(1))); - - /* process properties */ - result.res = push_agtype_value(&result.parse_state, WAGT_KEY, - string_to_agtype_value("properties")); + id = AG_GETARG_GRAPHID(0); + label = PG_GETARG_CSTRING(1); - //if the properties object is null, push an empty object if (fcinfo->args[2].isnull) { - result.res = push_agtype_value(&result.parse_state, WAGT_BEGIN_OBJECT, - NULL); - result.res = push_agtype_value(&result.parse_state, WAGT_END_OBJECT, - NULL); + bstate = init_agtype_build_state(0, AGT_FOBJECT); + properties = build_agtype(bstate); + pfree_agtype_build_state(bstate); } else { - agtype *properties = AG_GET_ARG_AGTYPE_P(2); + properties = AG_GET_ARG_AGTYPE_P(2); if
(!AGT_ROOT_IS_OBJECT(properties)) { @@ -2240,15 +2222,24 @@ Datum _agtype_build_vertex(PG_FUNCTION_ARGS) (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("_agtype_build_vertex() properties argument must be an object"))); } - - add_agtype((Datum)properties, false, &result, AGTYPEOID, false); } - result.res = push_agtype_value(&result.parse_state, WAGT_END_OBJECT, NULL); + bstate = init_agtype_build_state(3, AGT_FOBJECT); + write_string(bstate, "id"); + write_string(bstate, "label"); + write_string(bstate, "properties"); + write_graphid(bstate, id); + write_string(bstate, label); + write_container(bstate, properties); + vertex = build_agtype(bstate); + pfree_agtype_build_state(bstate); - result.res->type = AGTV_VERTEX; + bstate = init_agtype_build_state(1, AGT_FARRAY | AGT_FSCALAR); + write_extended(bstate, vertex, AGT_HEADER_VERTEX); + rawscalar = build_agtype(bstate); + pfree_agtype_build_state(bstate); - PG_RETURN_POINTER(agtype_value_to_agtype(result.res)); + PG_RETURN_POINTER(rawscalar); } Datum make_vertex(Datum id, Datum label, Datum properties) @@ -2263,18 +2254,13 @@ PG_FUNCTION_INFO_V1(_agtype_build_edge); */ Datum _agtype_build_edge(PG_FUNCTION_ARGS) { - agtype_in_state result; + agtype_build_state *bstate; + agtype *edge, *rawscalar; graphid id, start_id, end_id; - - memset(&result, 0, sizeof(agtype_in_state)); - - result.res = push_agtype_value(&result.parse_state, WAGT_BEGIN_OBJECT, - NULL); + char *label; + agtype *properties; /* process graph id */ - result.res = push_agtype_value(&result.parse_state, WAGT_KEY, - string_to_agtype_value("id")); - if (fcinfo->args[0].isnull) { ereport(ERROR, @@ -2283,27 +2269,17 @@ Datum _agtype_build_edge(PG_FUNCTION_ARGS) } id = AG_GETARG_GRAPHID(0); - result.res = push_agtype_value(&result.parse_state, WAGT_VALUE, - integer_to_agtype_value(id)); /* process label */ - result.res = push_agtype_value(&result.parse_state, WAGT_KEY, - string_to_agtype_value("label")); - if (fcinfo->args[3].isnull) { ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("_agtype_build_vertex() label cannot be NULL"))); } - result.res = - push_agtype_value(&result.parse_state, WAGT_VALUE, - string_to_agtype_value(PG_GETARG_CSTRING(3))); + label = PG_GETARG_CSTRING(3); /* process end_id */ - result.res = push_agtype_value(&result.parse_state, WAGT_KEY, - string_to_agtype_value("end_id")); - if (fcinfo->args[2].isnull) { ereport(ERROR, @@ -2312,13 +2288,8 @@ Datum _agtype_build_edge(PG_FUNCTION_ARGS) } end_id = AG_GETARG_GRAPHID(2); - result.res = push_agtype_value(&result.parse_state, WAGT_VALUE, - integer_to_agtype_value(end_id)); /* process start_id */ - result.res = push_agtype_value(&result.parse_state, WAGT_KEY, - string_to_agtype_value("start_id")); - if (fcinfo->args[1].isnull) { ereport(ERROR, @@ -2327,24 +2298,19 @@ Datum _agtype_build_edge(PG_FUNCTION_ARGS) } start_id = AG_GETARG_GRAPHID(1); - result.res = push_agtype_value(&result.parse_state, WAGT_VALUE, - integer_to_agtype_value(start_id)); /* process properties */ - result.res = push_agtype_value(&result.parse_state, WAGT_KEY, - string_to_agtype_value("properties")); /* if the properties object is null, push an empty object */ if (fcinfo->args[4].isnull) { - result.res = push_agtype_value(&result.parse_state, WAGT_BEGIN_OBJECT, - NULL); - result.res = push_agtype_value(&result.parse_state, WAGT_END_OBJECT, - NULL); + bstate = init_agtype_build_state(0, AGT_FOBJECT); + properties = build_agtype(bstate); + pfree_agtype_build_state(bstate); } else { - agtype *properties = AG_GET_ARG_AGTYPE_P(4); + properties = AG_GET_ARG_AGTYPE_P(4); if (!AGT_ROOT_IS_OBJECT(properties)) { @@ -2352,15 +2318,27 @@ Datum _agtype_build_edge(PG_FUNCTION_ARGS) (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("_agtype_build_edge() properties argument must be an object"))); } - - add_agtype((Datum)properties, false, &result, AGTYPEOID, false); } - result.res = push_agtype_value(&result.parse_state, WAGT_END_OBJECT, NULL); - -
result.res->type = AGTV_EDGE; - - PG_RETURN_POINTER(agtype_value_to_agtype(result.res)); + bstate = init_agtype_build_state(5, AGT_FOBJECT); + write_string(bstate, "id"); + write_string(bstate, "label"); + write_string(bstate, "end_id"); + write_string(bstate, "start_id"); + write_string(bstate, "properties"); + write_graphid(bstate, id); + write_string(bstate, label); + write_graphid(bstate, end_id); + write_graphid(bstate, start_id); + write_container(bstate, properties); + edge = build_agtype(bstate); + pfree_agtype_build_state(bstate); + + bstate = init_agtype_build_state(1, AGT_FARRAY | AGT_FSCALAR); + write_extended(bstate, edge, AGT_HEADER_EDGE); + rawscalar = build_agtype(bstate); + pfree_agtype_build_state(bstate); + PG_RETURN_POINTER(rawscalar); } Datum make_edge(Datum id, Datum startid, Datum endid, Datum label, diff --git a/src/backend/utils/adt/agtype_raw.c b/src/backend/utils/adt/agtype_raw.c new file mode 100644 index 000000000..8321d9734 --- /dev/null +++ b/src/backend/utils/adt/agtype_raw.c @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +#include "postgres.h" +#include "utils/agtype.h" +#include "utils/agtype_ext.h" +#include "utils/agtype_raw.h" + +/* + * Used for building an agtype container. + */ +struct agtype_build_state +{ + int a_offset; // next location to write agtentry + int i; // index of current agtentry being processed + int d_start; // start of variable-length portion + StringInfo buffer; +}; + +/* + * Define the type and size of the agt_header. + * Copied from agtype_ext.c. + */ +#define AGT_HEADER_TYPE uint32 +#define AGT_HEADER_SIZE sizeof(AGT_HEADER_TYPE) + +/* + * Following macros are usable in the context where + * agtype_build_state is available + */ +#define BUFFER_RESERVE(size) reserve_from_buffer(bstate->buffer, (size)) +#define BUFFER_WRITE_PAD() pad_buffer_to_int(bstate->buffer) +#define BUFFER_WRITE_CONST(offset, type, val) *((type *)(bstate->buffer->data + (offset))) = (val) +#define BUFFER_WRITE_PTR(offset, ptr, len) memcpy(bstate->buffer->data + (offset), (ptr), (len)) + +static int write_pointer(agtype_build_state *bstate, char *ptr, int len); +static void write_agtentry(agtype_build_state *bstate, agtentry agte); + +/* + * Same as `write_ptr` except the content comes from + * constant value instead of pointer. + */ +#define write_const(val, type) \ + do \ + { \ + int len = sizeof(type); \ + int offset = BUFFER_RESERVE(len); \ + BUFFER_WRITE_CONST(offset, type, val); \ + } \ + while (0) +#define write_ptr(ptr, len) write_pointer(bstate, ptr, len) +#define write_agt(agte) write_agtentry(bstate, agte) + +/* + * Copies the content of `ptr` to the tail of the buffer (variable-length + * portion). + */ +static int write_pointer(agtype_build_state *bstate, char *ptr, int len) +{ + int offset = BUFFER_RESERVE(len); + BUFFER_WRITE_PTR(offset, ptr, len); + return len; +} + +/* + * Copies the content of `agte` to the next available location in the agtentry + * portion of the buffer. That location is pointed by `bstate->a_offset`.
+ * + * This function must be called after data is written to the variable-length + * portion. + */ +static void write_agtentry(agtype_build_state *bstate, agtentry agte) +{ + int totallen = bstate->buffer->len - bstate->d_start; + + /* + * Bail out if total variable-length data exceeds what will fit in a + * agtentry length field. We check this in each iteration, not just + * once at the end, to forestall possible integer overflow. + */ + if (totallen > AGTENTRY_OFFLENMASK) + { + ereport( + ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg( + "total size of agtype array elements exceeds the maximum of %u bytes", + AGTENTRY_OFFLENMASK))); + } + + if (((bstate->i) % AGT_OFFSET_STRIDE) == 0) + { + agte = (agte & AGTENTRY_TYPEMASK) | totallen | AGTENTRY_HAS_OFF; + } + + BUFFER_WRITE_CONST(bstate->a_offset, agtentry, agte); + bstate->a_offset += sizeof(agtentry); +} + +/* + * `header_flag` = a valid agtype_container header field + * `size` = size of container (number of pairs or elements) + */ +agtype_build_state *init_agtype_build_state(uint32 size, uint32 header_flag) +{ + int agtentry_count; + int agtentry_len; + agtype_build_state *bstate; + + bstate = palloc0(sizeof(agtype_build_state)); + bstate->buffer = makeStringInfo(); + bstate->a_offset = 0; + bstate->i = 0; + + // reserve for varlen header + BUFFER_RESERVE(VARHDRSZ); + bstate->a_offset += VARHDRSZ; + + // write container header + BUFFER_RESERVE(sizeof(uint32)); + BUFFER_WRITE_CONST(bstate->a_offset, uint32, header_flag | size); + bstate->a_offset += sizeof(uint32); + + // reserve for agtentry headers + if ((header_flag & AGT_FOBJECT) != 0) + { + agtentry_count = size * 2; + } + else if ((header_flag & AGT_FARRAY) != 0) + { + agtentry_count = size; + } + else + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("Invalid container type."))); + } + + agtentry_len = sizeof(agtentry) * agtentry_count; + BUFFER_RESERVE(agtentry_len); + bstate->d_start = bstate->a_offset +
agtentry_len; + + return bstate; +} + +agtype *build_agtype(agtype_build_state *bstate) +{ + agtype *result = (agtype *) bstate->buffer->data; + SET_VARSIZE(result, bstate->buffer->len); + return result; +} + +void pfree_agtype_build_state(agtype_build_state *bstate) +{ + /* + * bstate->buffer->data is not pfree'd because this pointer + * is returned by the `build_agtype` function. + */ + pfree(bstate->buffer); + pfree(bstate); +} + +void write_string(agtype_build_state *bstate, char *str) +{ + int length = strlen(str); + + write_ptr(str, length); + write_agt(AGTENTRY_IS_STRING | length); + + bstate->i++; +} + +void write_graphid(agtype_build_state *bstate, graphid graphid) +{ + int length = 0; + + // padding + length += BUFFER_WRITE_PAD(); + + // graphid header + write_const(AGT_HEADER_INTEGER, AGT_HEADER_TYPE); + length += AGT_HEADER_SIZE; + + // graphid value + write_const(graphid, int64); + length += sizeof(int64); + + // agtentry + write_agt(AGTENTRY_IS_AGTYPE | length); + + bstate->i++; +} + +void write_container(agtype_build_state *bstate, agtype *agtype) +{ + int length = 0; + + // padding + length += BUFFER_WRITE_PAD(); + + // container data only; VARSIZE also counts the varlena header before root + length += write_ptr((char *) &agtype->root, VARSIZE(agtype) - VARHDRSZ); + + // agtentry + write_agt(AGTENTRY_IS_CONTAINER | length); + + bstate->i++; +} + +/* + * `val` = container of the extended type + * `header` = AGT_HEADER_VERTEX, AGT_HEADER_EDGE or AGT_HEADER_PATH + */ +void write_extended(agtype_build_state *bstate, agtype *val, uint32 header) +{ + int length = 0; + + // padding + length += BUFFER_WRITE_PAD(); + + // extended type header + write_const(header, AGT_HEADER_TYPE); + length += AGT_HEADER_SIZE; + + // container data only; VARSIZE also counts the varlena header before root + length += write_ptr((char *) &val->root, VARSIZE(val) - VARHDRSZ); + + // agtentry + write_agt(AGTENTRY_IS_AGTYPE | length); + + bstate->i++; +} diff --git a/src/include/utils/agtype_raw.h b/src/include/utils/agtype_raw.h new file mode 100644 index 000000000..ff9dbbcd1 --- /dev/null +++ b/src/include/utils/agtype_raw.h
@@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * This module provides functions for directly building agtype + * without using agtype_value. + */ + +#ifndef AG_AGTYPE_RAW_H +#define AG_AGTYPE_RAW_H + +#include "postgres.h" +#include "utils/agtype.h" +#include "utils/agtype_ext.h" + +/* + * We declare the agtype_build_state here, and in this way, so that it may be + * used elsewhere. However, we keep the contents private by defining it in + * agtype_raw.c + */ +typedef struct agtype_build_state agtype_build_state; + +agtype_build_state *init_agtype_build_state(uint32 size, uint32 header_flag); +agtype *build_agtype(agtype_build_state *bstate); +void pfree_agtype_build_state(agtype_build_state *bstate); + +void write_string(agtype_build_state *bstate, char *str); +void write_graphid(agtype_build_state *bstate, graphid graphid); +void write_container(agtype_build_state *bstate, agtype *agtype); +void write_extended(agtype_build_state *bstate, agtype *val, uint32 header); + +#endif