From d7174ff1197b1694af582890ca643ac0775da6e2 Mon Sep 17 00:00:00 2001 From: Robert Marks Date: Wed, 3 Oct 2018 15:53:00 +0000 Subject: [PATCH 01/21] Fix memory safety issue when a deserialization fails on get/select/operate --- VERSION | 2 +- src/main/aerospike.c | 2 +- src/main/client/get.c | 8 +++----- src/main/client/operate.c | 16 +++++++++------- src/main/client/select.c | 10 ++++++---- 5 files changed, 20 insertions(+), 18 deletions(-) diff --git a/VERSION b/VERSION index 4d9d11cf5..6cb9d3dd0 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.4.2 +3.4.3 diff --git a/src/main/aerospike.c b/src/main/aerospike.c index dab15cefa..88224b17e 100644 --- a/src/main/aerospike.c +++ b/src/main/aerospike.c @@ -93,7 +93,7 @@ AerospikeConstants operator_constants[] = { MOD_INIT(aerospike) { - const char version[8] = "3.4.2"; + const char version[8] = "3.4.3"; // Makes things "thread-safe" PyEval_InitThreads(); int i = 0; diff --git a/src/main/client/get.c b/src/main/client/get.c index 0966f0fb3..0afed5d61 100644 --- a/src/main/client/get.c +++ b/src/main/client/get.c @@ -86,16 +86,14 @@ PyObject * AerospikeClient_Get_Invoke( goto CLEANUP; } - // Initialize record - as_record_init(rec, 0); - // Record initialised successfully. - record_initialised = true; // Invoke operation Py_BEGIN_ALLOW_THREADS aerospike_key_get(self->as, &err, read_policy_p, &key, &rec); Py_END_ALLOW_THREADS if (err.code == AEROSPIKE_OK) { + record_initialised = true; + if (record_to_pyobject(self, &err, rec, &key, &py_rec) != AEROSPIKE_OK) { goto CLEANUP; } @@ -121,7 +119,7 @@ PyObject * AerospikeClient_Get_Invoke( // Destroy key only if it is initialised. as_key_destroy(&key); } - if (record_initialised == true) { + if (rec && record_initialised) { // Destroy record only if it is initialised. as_record_destroy(rec); } diff --git a/src/main/client/operate.c b/src/main/client/operate.c index 7aca69491..32f656248 100644 --- a/src/main/client/operate.c +++ b/src/main/client/operate.c @@ -686,6 +686,7 @@ PyObject * AerospikeClient_Operate_Invoke( int i = 0; long operation; long return_type = -1; + bool operation_succeeded = false; PyObject * py_rec = NULL; as_record * rec = NULL; as_policy_operate operate_policy; @@ -729,8 +730,6 @@ PyObject * AerospikeClient_Operate_Invoke( goto CLEANUP; } - // Initialize record - as_record_init(rec, 0); Py_BEGIN_ALLOW_THREADS aerospike_key_operate(self->as, err, operate_policy_p, key, &ops, &rec); @@ -740,6 +739,9 @@ PyObject * AerospikeClient_Operate_Invoke( as_error_update(err, err->code, NULL); goto CLEANUP; } + /* The op succeeded; it's now safe to free the record */ + operation_succeeded = true; + if (rec) { record_to_pyobject(self, err, rec, key, &py_rec); } @@ -751,7 +753,7 @@ PyObject * AerospikeClient_Operate_Invoke( as_vector_destroy(unicodeStrVector); - if (rec) { + if (rec && operation_succeeded) { as_record_destroy(rec); } if (key->valuep) { @@ -843,6 +845,8 @@ static PyObject * AerospikeClient_OperateOrdered_Invoke( { long operation; long return_type = -1; + bool operation_succeeded = false; + PyObject * py_rec = NULL; as_record * rec = NULL; as_policy_operate operate_policy; @@ -898,9 +902,6 @@ static PyObject * AerospikeClient_OperateOrdered_Invoke( goto CLEANUP; } - // Initialize record - as_record_init(rec, 0); - Py_BEGIN_ALLOW_THREADS aerospike_key_operate(self->as, err, operate_policy_p, key, &ops, &rec); Py_END_ALLOW_THREADS @@ -910,6 +911,7 @@ static PyObject * AerospikeClient_OperateOrdered_Invoke( goto CLEANUP; } + operation_succeeded = true; if (rec) { /* Build the return tuple: (key, meta, bins) */ key_to_pyobject(err, key, &py_return_key); @@ -953,7 +955,7 @@ static PyObject * AerospikeClient_OperateOrdered_Invoke( as_vector_destroy(unicodeStrVector); - if (rec) { + if (rec && operation_succeeded) { as_record_destroy(rec); } if (key->valuep) { diff --git a/src/main/client/select.c b/src/main/client/select.c index b4b585433..26d588f49 100644 --- a/src/main/client/select.c +++ b/src/main/client/select.c @@ -53,6 +53,8 @@ PyObject * AerospikeClient_Select_Invoke( as_policy_read * read_policy_p = NULL; as_key key; as_record * rec = NULL; + // It's only safe to free the record if this succeeded. + bool select_succeeded = false; char ** bins = NULL; // Initialisation flags @@ -135,15 +137,13 @@ PyObject * AerospikeClient_Select_Invoke( goto CLEANUP; } - // Initialize record - as_record_init(rec, 0); - // Invoke operation Py_BEGIN_ALLOW_THREADS aerospike_key_select(self->as, &err, read_policy_p, &key, (const char **) bins, &rec); Py_END_ALLOW_THREADS if (err.code == AEROSPIKE_OK) { + select_succeeded = true; record_to_pyobject(self, &err, rec, &key, &py_rec); } else { @@ -161,7 +161,9 @@ PyObject * AerospikeClient_Select_Invoke( as_key_destroy(&key); } - as_record_destroy(rec); + if (rec && select_succeeded) { + as_record_destroy(rec); + } if (err.code != AEROSPIKE_OK) { PyObject * py_err = NULL; From ac1a93b84543bcb56c9027ebc4c600a0af281380 Mon Sep 17 00:00:00 2001 From: Robert Marks Date: Wed, 3 Oct 2018 17:46:51 +0000 Subject: [PATCH 02/21] Update c client version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 2f1f2a8d2..c62eb049f 100644 --- a/setup.py +++ b/setup.py @@ -30,7 +30,7 @@ os.environ['ARCHFLAGS'] = '-arch x86_64' AEROSPIKE_C_VERSION = os.getenv('AEROSPIKE_C_VERSION') if not AEROSPIKE_C_VERSION: - AEROSPIKE_C_VERSION = '4.3.17' + AEROSPIKE_C_VERSION = '4.3.18' DOWNLOAD_C_CLIENT = os.getenv('DOWNLOAD_C_CLIENT') AEROSPIKE_C_HOME = os.getenv('AEROSPIKE_C_HOME') PREFIX = None From efa8e622644e7bc800bac4d5f2a5582744ee7d81 Mon Sep 17 00:00:00 2001 From: Robert Marks Date: Tue, 9 Oct 2018 20:30:57 +0000 Subject: [PATCH 03/21] Fix reference count leak with querys --- src/main/query/type.c | 5 +++++ src/main/scan/type.c | 7 ++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/main/query/type.c b/src/main/query/type.c index 5018dcd6c..20e2b85c8 100644 --- a/src/main/query/type.c +++ b/src/main/query/type.c @@ -103,6 +103,10 @@ static PyObject * AerospikeQuery_Type_New(PyTypeObject * type, PyObject * args, self = (AerospikeQuery *) type->tp_alloc(type, 0); + if (self) { + self->client = NULL; + } + return (PyObject *) self; } @@ -182,6 +186,7 @@ static void AerospikeQuery_Type_Dealloc(AerospikeQuery * self) } as_query_destroy(&self->query); + Py_CLEAR(self->client); Py_TYPE(self)->tp_free((PyObject *) self); } diff --git a/src/main/scan/type.c b/src/main/scan/type.c index 4c6d449f3..dd0d418a2 100644 --- a/src/main/scan/type.c +++ b/src/main/scan/type.c @@ -80,7 +80,11 @@ static PyObject * AerospikeScan_Type_New(PyTypeObject * type, PyObject * args, P self = (AerospikeScan *) type->tp_alloc(type, 0); - return (PyObject *) self; + if (self) { + self->client = NULL; + } + + return (PyObject *) self; } static int AerospikeScan_Type_Init(AerospikeScan * self, PyObject * args, PyObject * kwds) @@ -127,6 +131,7 @@ static int AerospikeScan_Type_Init(AerospikeScan * self, PyObject * args, PyObje static void AerospikeScan_Type_Dealloc(PyObject * self) { as_scan_destroy(&((AerospikeScan *)self)->scan); + Py_CLEAR(((AerospikeScan *)self)->client); Py_TYPE(self)->tp_free((PyObject *) self); } From fc1e46e9fdca46cd5b4299dfd5babd927f8a391a Mon Sep 17 00:00:00 2001 From: Robert Marks Date: Tue, 2 Oct 2018 21:45:56 +0000 Subject: [PATCH 04/21] Start of 3.5.0 in progress --- VERSION | 2 +- setup.py | 1 + src/include/cdt_types.h | 23 +++++++ src/include/types.h | 8 +++ src/main/aerospike.c | 11 +++- src/main/cdt_types/type.c | 132 ++++++++++++++++++++++++++++++++++++++ src/main/conversions.c | 5 +- 7 files changed, 179 insertions(+), 3 deletions(-) create mode 100644 src/include/cdt_types.h create mode 100644 src/main/cdt_types/type.c diff --git a/VERSION b/VERSION index 6cb9d3dd0..1545d9665 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.4.3 +3.5.0 diff --git a/setup.py b/setup.py index c62eb049f..b8fd3bbd0 100644 --- a/setup.py +++ b/setup.py @@ -289,6 +289,7 @@ def resolve_c_client(): 'src/main/tls_config.c', 'src/main/global_hosts/type.c', 'src/main/nullobject/type.c', + 'src/main/cdt_types/type.c', ], # Compile diff --git a/src/include/cdt_types.h b/src/include/cdt_types.h new file mode 100644 index 000000000..206adec8b --- /dev/null +++ b/src/include/cdt_types.h @@ -0,0 +1,23 @@ +/******************************************************************************* + * Copyright 2013-2017 Aerospike, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ + +#pragma once +#define AS_CDT_WILDCARD_NAME "aerospike.CDTWildcard" +#define AS_CDT_INFINITE_NAME "aerospike.CDTInfinite" +#include + +PyTypeObject * AerospikeWildcardObject_Ready(); +PyTypeObject * AerospikeInfiniteObject_Ready(); diff --git a/src/include/types.h b/src/include/types.h index c7daed87e..108ecd91a 100644 --- a/src/include/types.h +++ b/src/include/types.h @@ -37,6 +37,14 @@ typedef struct { PyObject_HEAD } AerospikeNullObject; +typedef struct { + PyObject_HEAD +} AerospikeCDTWildcardObject; + +typedef struct { + PyObject_HEAD +} AerospikeCDTInfObject; + typedef struct { PyObject_HEAD aerospike * as; diff --git a/src/main/aerospike.c b/src/main/aerospike.c index 88224b17e..42d056f12 100644 --- a/src/main/aerospike.c +++ b/src/main/aerospike.c @@ -31,6 +31,7 @@ #include "serializer.h" #include "module_functions.h" #include "nullobject.h" +#include "cdt_types.h" PyObject *py_global_hosts; int counter = 0xA7000000; @@ -93,7 +94,7 @@ AerospikeConstants operator_constants[] = { MOD_INIT(aerospike) { - const char version[8] = "3.4.3"; + const char version[8] = "3.5.0"; // Makes things "thread-safe" PyEval_InitThreads(); int i = 0; @@ -151,5 +152,13 @@ MOD_INIT(aerospike) Py_INCREF(null_object); PyModule_AddObject(aerospike, "null", (PyObject *) null_object); + PyTypeObject * wildcard_object = AerospikeWildcardObject_Ready(); + Py_INCREF(wildcard_object); + PyModule_AddObject(aerospike, "CDTWildcard", (PyObject *) wildcard_object); + + PyTypeObject * infinite_object = AerospikeInfiniteObject_Ready(); + Py_INCREF(infinite_object); + PyModule_AddObject(aerospike, "CDTInfinite", (PyObject *) infinite_object); + return MOD_SUCCESS_VAL(aerospike); } diff --git a/src/main/cdt_types/type.c b/src/main/cdt_types/type.c new file mode 100644 index 000000000..d8d2c3cf0 --- /dev/null +++ b/src/main/cdt_types/type.c @@ -0,0 +1,132 @@ +/******************************************************************************* + * Copyright 2013-2017 Aerospike, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ + +#include +#include +#include +#include +#include "types.h" +#include "cdt_types.h" + +static PyObject * AerospikeWildCardType_New(PyTypeObject * parent, PyObject * args, PyObject * kwds); +static PyObject * AerospikeInfiniteType_New(PyTypeObject * parent, PyObject * args, PyObject * kwds); + +/******************************************************************************* + * PYTHON TYPE DESCRIPTOR + ******************************************************************************/ + +static PyTypeObject AerospikeCDTWildcard_Type = { + PyVarObject_HEAD_INIT(NULL, 0) + AS_CDT_WILDCARD_NAME, // tp_name + sizeof(AerospikeCDTWildcardObject), // tp_basicsize + 0, // tp_itemsize + 0, // tp_dealloc + 0, // tp_print + 0, // tp_getattr + 0, // tp_setattr + 0, // tp_compare + 0, // tp_repr + 0, // tp_as_number + 0, // tp_as_sequence + 0, // tp_as_mapping + 0, // tp_hash + 0, // tp_call + 0, // tp_str + 0, // tp_getattro + 0, // tp_setattro + 0, // tp_as_buffer + Py_TPFLAGS_DEFAULT, // tp_flags + "A type used to match anything when used in a Map or list comparison.\n", //tp_doc + 0, // tp_traverse + 0, // tp_clear + 0, // tp_richcompare + 0, // tp_weaklistoffset + 0, // tp_iter + 0, // tp_iternext + 0, // tp_methods + 0, // tp_members + 0, // tp_getset + 0, // tp_base + 0, // tp_dict + 0, // tp_descr_get + 0, // tp_descr_set + 0, // tp_dictoffset + 0, // tp_init + 0, // tp_alloc + AerospikeWildCardType_New // tp_new +}; + +PyTypeObject * AerospikeWildcardObject_Ready() +{ + return PyType_Ready(&AerospikeCDTWildcard_Type) == 0 ? &AerospikeCDTWildcard_Type : NULL; +} + +static PyTypeObject AerospikeCDTInfinite_Type = { + PyVarObject_HEAD_INIT(NULL, 0) + AS_CDT_INFINITE_NAME, // tp_name + sizeof(AerospikeCDTInfObject), // tp_basicsize + 0, // tp_itemsize + 0, // tp_dealloc + 0, // tp_print + 0, // tp_getattr + 0, // tp_setattr + 0, // tp_compare + 0, // tp_repr + 0, // tp_as_number + 0, // tp_as_sequence + 0, // tp_as_mapping + 0, // tp_hash + 0, // tp_call + 0, // tp_str + 0, // tp_getattro + 0, // tp_setattro + 0, // tp_as_buffer + Py_TPFLAGS_DEFAULT, // tp_flags + "A type used to match anything when used in a Map or list comparison.\n", //tp_doc + 0, // tp_traverse + 0, // tp_clear + 0, // tp_richcompare + 0, // tp_weaklistoffset + 0, // tp_iter + 0, // tp_iternext + 0, // tp_methods + 0, // tp_members + 0, // tp_getset + 0, // tp_base + 0, // tp_dict + 0, // tp_descr_get + 0, // tp_descr_set + 0, // tp_dictoffset + 0, // tp_init + 0, // tp_alloc + AerospikeInfiniteType_New // tp_new +}; + +PyTypeObject * AerospikeInfiniteObject_Ready() +{ + return PyType_Ready(&AerospikeCDTInfinite_Type) == 0 ? &AerospikeCDTInfinite_Type : NULL; +} + + +static PyObject * AerospikeWildCardType_New(PyTypeObject * parent, PyObject * args, PyObject * kwds) +{ + return (PyObject *) PyObject_New(AerospikeCDTWildcardObject, parent); +} + +static PyObject * AerospikeInfiniteType_New(PyTypeObject * parent, PyObject * args, PyObject * kwds) +{ + return (PyObject *) PyObject_New(AerospikeCDTInfObject, parent); +} \ No newline at end of file diff --git a/src/main/conversions.c b/src/main/conversions.c index 83a6c166c..de389fa8f 100644 --- a/src/main/conversions.c +++ b/src/main/conversions.c @@ -35,6 +35,7 @@ #include #include #include +#include #include "conversions.h" #include "geo.h" @@ -474,7 +475,9 @@ as_status pyobject_to_val(AerospikeClient * self, as_error * err, PyObject * py_ } else if (Py_None == py_obj) { *val = as_val_reserve(&as_nil); } else if (!strcmp(py_obj->ob_type->tp_name, "aerospike.null")) { - *val = (as_val *) &as_nil; + *val = (as_val *) as_val_reserve(&as_nil); + } else if (!strcmp(py_obj->ob_type->tp_name, "aerospike.null")) { + *val = (as_val *) &AS_CMP_WILDCARD; } else { if (aerospike_has_double(self->as) && PyFloat_Check(py_obj)) { double d = PyFloat_AsDouble(py_obj); From 2e52fc15b592cd2e3ea008dce3a79a00d00f17bf Mon Sep 17 00:00:00 2001 From: Robert Marks Date: Tue, 2 Oct 2018 22:25:19 +0000 Subject: [PATCH 05/21] Add support for CDT Wildcard types --- src/include/macros.h | 3 +++ src/main/conversions.c | 15 +++++++++++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/src/include/macros.h b/src/include/macros.h index 4756a6026..1c4f3e0dc 100644 --- a/src/include/macros.h +++ b/src/include/macros.h @@ -49,3 +49,6 @@ ob = Py_InitModule3(name, methods, doc); #define MOD_SUCCESS_VAL(val) #endif + +// pyval is a PyObject* classname is a string +#define AS_Matches_Classname(pyval, classname) (strcmp((pyval)->ob_type->tp_name, (classname)) == 0) \ No newline at end of file diff --git a/src/main/conversions.c b/src/main/conversions.c index de389fa8f..23c411824 100644 --- a/src/main/conversions.c +++ b/src/main/conversions.c @@ -42,6 +42,7 @@ #include "policy.h" #include "serializer.h" #include "exceptions.h" +#include "cdt_types.h" #define PY_KEYT_NAMESPACE 0 #define PY_KEYT_SET 1 @@ -476,8 +477,10 @@ as_status pyobject_to_val(AerospikeClient * self, as_error * err, PyObject * py_ *val = as_val_reserve(&as_nil); } else if (!strcmp(py_obj->ob_type->tp_name, "aerospike.null")) { *val = (as_val *) as_val_reserve(&as_nil); - } else if (!strcmp(py_obj->ob_type->tp_name, "aerospike.null")) { - *val = (as_val *) &AS_CMP_WILDCARD; + } else if (AS_Matches_Classname(py_obj, AS_CDT_WILDCARD_NAME)) { + *val = (as_val *) as_val_reserve(&as_cmp_wildcard); + } else if (AS_Matches_Classname(py_obj, AS_CDT_INFINITE_NAME)) { + *val = (as_val *) as_val_reserve(&as_cmp_inf); } else { if (aerospike_has_double(self->as) && PyFloat_Check(py_obj)) { double d = PyFloat_AsDouble(py_obj); @@ -803,9 +806,13 @@ as_status pyobject_to_astype_write(AerospikeClient * self, as_error * err, PyObj if (err->code == AEROSPIKE_OK) { *val = (as_val *) map; } - } else if (!strcmp(py_value->ob_type->tp_name, "aerospike.null")) { + } else if (AS_Matches_Classname(py_value, "aerospike.null")) { *val = (as_val *) &as_nil; - } else { + } else if (AS_Matches_Classname(py_value, AS_CDT_WILDCARD_NAME)) { + *val = (as_val *) as_val_reserve(&as_cmp_wildcard); + } else if (AS_Matches_Classname(py_value, AS_CDT_INFINITE_NAME)) { + *val = (as_val *) as_val_reserve(&as_cmp_inf); + }else { if (aerospike_has_double(self->as) && PyFloat_Check(py_value)) { double d = PyFloat_AsDouble(py_value); *val = (as_val *) as_double_new(d); From e2255c3962cca9058f51ccfbb2fe4b309e7d8f8b Mon Sep 17 00:00:00 2001 From: Robert Marks Date: Thu, 4 Oct 2018 18:05:11 +0000 Subject: [PATCH 06/21] Update C client version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index b8fd3bbd0..5b8c35069 100644 --- a/setup.py +++ b/setup.py @@ -30,7 +30,7 @@ os.environ['ARCHFLAGS'] = '-arch x86_64' AEROSPIKE_C_VERSION = os.getenv('AEROSPIKE_C_VERSION') if not AEROSPIKE_C_VERSION: - AEROSPIKE_C_VERSION = '4.3.18' + AEROSPIKE_C_VERSION = '4.3.19' DOWNLOAD_C_CLIENT = os.getenv('DOWNLOAD_C_CLIENT') AEROSPIKE_C_HOME = os.getenv('AEROSPIKE_C_HOME') PREFIX = None From 8bda60225f30e07925693579f855b860615d1e39 Mon Sep 17 00:00:00 2001 From: Robert Marks Date: Fri, 19 Oct 2018 22:36:48 +0000 Subject: [PATCH 07/21] Fix memory leak when query or scan fails --- src/main/query/results.c | 21 +-------------------- src/main/scan/results.c | 1 + 2 files changed, 2 insertions(+), 20 deletions(-) diff --git a/src/main/query/results.c b/src/main/query/results.c index 1f7001943..3a2619503 100644 --- a/src/main/query/results.c +++ b/src/main/query/results.c @@ -50,30 +50,18 @@ static bool each_result(const as_val * val, void * udata) as_error err; - TRACE(); - PyGILState_STATE gstate; gstate = PyGILState_Ensure(); - TRACE(); - val_to_pyobject(data->client, &err, val, &py_result); - TRACE(); if (py_result) { - TRACE(); PyList_Append(py_results, py_result); - - TRACE(); Py_DECREF(py_result); } - - TRACE(); - PyGILState_Release(gstate); - TRACE(); return true; } @@ -119,33 +107,26 @@ PyObject * AerospikeQuery_Results(AerospikeQuery * self, PyObject * args, PyObje goto CLEANUP; } - TRACE(); py_results = PyList_New(0); data.py_results = py_results; - TRACE(); PyThreadState * _save = PyEval_SaveThread(); - TRACE(); aerospike_query_foreach(self->client->as, &err, query_policy_p, &self->query, each_result, &data); - TRACE(); PyEval_RestoreThread(_save); CLEANUP:/*??trace()*/ - TRACE(); if (err.code != AEROSPIKE_OK) { + Py_XDECREF(py_results); PyObject * py_err = NULL; error_to_pyobject(&err, &py_err); PyObject *exception_type = raise_exception(&err); PyErr_SetObject(exception_type, py_err); Py_DECREF(py_err); - TRACE(); return NULL; } - TRACE(); - if (self->query.apply.arglist) { as_arraylist_destroy( (as_arraylist *) self->query.apply.arglist ); } diff --git a/src/main/scan/results.c b/src/main/scan/results.c index eb6d9759f..414557a4e 100644 --- a/src/main/scan/results.c +++ b/src/main/scan/results.c @@ -144,6 +144,7 @@ PyObject * AerospikeScan_Results(AerospikeScan * self, PyObject * args, PyObject Py_XDECREF(py_ustr); if (err.code != AEROSPIKE_OK) { + Py_XDECREF(py_results); PyObject * py_err = NULL; error_to_pyobject(&err, &py_err); PyObject *exception_type = raise_exception(&err); From 5978828c11143a208a19c6a0ca7cb2622671b284 Mon Sep 17 00:00:00 2001 From: Robert Marks Date: Thu, 25 Oct 2018 20:55:45 +0000 Subject: [PATCH 08/21] Remove deprecated config item from conf file --- .travis/aerospike.conf | 1 - 1 file changed, 1 deletion(-) diff --git a/.travis/aerospike.conf b/.travis/aerospike.conf index 6c9145bd7..9138023c8 100644 --- a/.travis/aerospike.conf +++ b/.travis/aerospike.conf @@ -20,7 +20,6 @@ logging { } mod-lua { - system-path ${home}/share/udf/lua user-path ${home}/var/udf/lua } From b8b98a5d69988bcda5bc6f91691a8a4adc6ce17c Mon Sep 17 00:00:00 2001 From: Robert Marks Date: Fri, 26 Oct 2018 16:47:36 +0000 Subject: [PATCH 09/21] Updated helpers to allow passing of map policy to map modify operations --- .../operations/map_operations.py | 41 +++++++++++++++---- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/aerospike_helpers/operations/map_operations.py b/aerospike_helpers/operations/map_operations.py index 637c5ff23..02299dde2 100644 --- a/aerospike_helpers/operations/map_operations.py +++ b/aerospike_helpers/operations/map_operations.py @@ -24,7 +24,7 @@ def map_set_policy(bin_name, policy): Args: bin_name (str): The name of the bin containing the map. - policy (dict): The map policy dictionary + policy (dict): The map policy dictionary. See: See :ref:`aerospike_map_policies` Returns: A dictionary usable in operate or operate_ordered. The format of the dictionary should be considered an internal detail, and subject to change. @@ -36,7 +36,7 @@ def map_set_policy(bin_name, policy): } -def map_put(bin_name, key, value): +def map_put(bin_name, key, value, map_policy=None): """Creates a map_put operation to be used with operate or operate_ordered The operation allows a user to set the value of an item in the map stored @@ -46,19 +46,25 @@ def map_put(bin_name, key, value): bin_name (str): The name of the bin containing the map. key: The key for the map. value: The item to store in the map with the corresponding key. + map_policy (dict, optional): Optional map_policy dictionary dictates the type of map to create when it does not exist. + The map policy also specifies the mode used when writing items to the map. Defaults to `None`. See :ref:`aerospike_map_policies` Returns: A dictionary usable in operate or operate_ordered. The format of the dictionary should be considered an internal detail, and subject to change. """ - return { + op_dict = { OP_KEY: aerospike.OP_MAP_PUT, BIN_KEY: bin_name, KEY_KEY: key, VALUE_KEY: value } + if map_policy is not None: + op_dict[POLICY_KEY] = map_policy + + return op_dict -def map_put_items(bin_name, item_dict): +def map_put_items(bin_name, item_dict, map_policy=None): """Creates a map_put_items operation to be used with operate or operate_ordered The operation allows a user to add or update items in the map stored on the server. @@ -66,6 +72,8 @@ def map_put_items(bin_name, item_dict): Args: bin_name (str): The name of the bin containing the map. item_dict (dict): A dictionary of key value pairs to be added to the map on the server. + map_policy (dict, optional): Optional map_policy dictionary dictates the type of map to create when it does not exist. + The map policy also specifies the mode used when writing items to the map. Defaults to `None`. See :ref:`aerospike_map_policies` Returns: A dictionary usable in operate or operate_ordered. The format of the dictionary should be considered an internal detail, and subject to change. @@ -76,8 +84,12 @@ def map_put_items(bin_name, item_dict): VALUE_KEY: item_dict } + if map_policy is not None: + op_dict[POLICY_KEY] = map_policy + + return op_dict -def map_increment(bin_name, key, amount): +def map_increment(bin_name, key, amount, map_policy=None): """Creates a map_increment operation to be used with operate or operate_ordered The operation allows a user to increment the value of a value stored in the map on the server. @@ -86,19 +98,26 @@ def map_increment(bin_name, key, amount): bin_name (str): The name of the bin containing the map. key: The key for the value to be incremented. amount: The amount by which to increment the value stored in map[key] + map_policy (dict, optional): Optional map_policy dictionary dictates the type of map to create when it does not exist. + The map policy also specifies the mode used when writing items to the map. Defaults to `None`. See See :ref:`aerospike_map_policies` Returns: A dictionary usable in operate or operate_ordered. The format of the dictionary should be considered an internal detail, and subject to change. """ - return { + op_dict = { OP_KEY: aerospike.OP_MAP_INCREMENT, BIN_KEY: bin_name, KEY_KEY: key, VALUE_KEY: amount } + if map_policy is not None: + op_dict[POLICY_KEY] = map_policy + + return op_dict + -def map_decrement(bin_name, key, amount): +def map_decrement(bin_name, key, amount, map_policy=None): """Creates a map_decrement operation to be used with operate or operate_ordered The operation allows a user to decrement the value of a value stored in the map on the server. @@ -107,17 +126,23 @@ def map_decrement(bin_name, key, amount): bin_name (str): The name of the bin containing the map. key: The key for the value to be decremented. amount: The amount by which to decrement the value stored in map[key] + map_policy (dict, optional): Optional map_policy dictionary dictates the type of map to create when it does not exist. + The map policy also specifies the mode used when writing items to the map. Defaults to `None`. See See :ref:`aerospike_map_policies` Returns: A dictionary usable in operate or operate_ordered. The format of the dictionary should be considered an internal detail, and subject to change. """ - return { + op_dict = { OP_KEY: aerospike.OP_MAP_DECREMENT, BIN_KEY: bin_name, KEY_KEY: key, VALUE_KEY: amount } + if map_policy is not None: + op_dict[POLICY_KEY] = map_policy + + return op_dict def map_size(bin_name): """Creates a map_size operation to be used with operate or operate_ordered From c1b1ceee40683e59331eccfe65a1ad2fd8ce0b7b Mon Sep 17 00:00:00 2001 From: Robert Marks Date: Thu, 8 Nov 2018 23:21:43 +0000 Subject: [PATCH 10/21] Remove batch_direct references --- .../operations/map_operations.py | 4 +- doc/aerospike.rst | 1 - setup.py | 2 +- src/main/client/get_many.c | 9 +- src/main/client/select_many.c | 9 +- src/main/client/type.c | 13 --- src/main/policy.c | 1 - src/main/policy_config.c | 5 -- test/new_tests/test_connect.py | 3 +- test/new_tests/test_exists_many.py | 87 +------------------ test/new_tests/test_get_many.py | 16 ---- test/new_tests/test_select_many.py | 6 -- 12 files changed, 8 insertions(+), 148 deletions(-) diff --git a/aerospike_helpers/operations/map_operations.py b/aerospike_helpers/operations/map_operations.py index 02299dde2..c558f6ce4 100644 --- a/aerospike_helpers/operations/map_operations.py +++ b/aerospike_helpers/operations/map_operations.py @@ -113,7 +113,7 @@ def map_increment(bin_name, key, amount, map_policy=None): if map_policy is not None: op_dict[POLICY_KEY] = map_policy - + return op_dict @@ -141,7 +141,7 @@ def map_decrement(bin_name, key, amount, map_policy=None): if map_policy is not None: op_dict[POLICY_KEY] = map_policy - + return op_dict def map_size(bin_name): diff --git a/doc/aerospike.rst b/doc/aerospike.rst index 5d9782afe..8c704af77 100644 --- a/doc/aerospike.rst +++ b/doc/aerospike.rst @@ -99,7 +99,6 @@ in an in-memory primary index. that has already been reaped by the server. Default: 0 seconds (disabled) for non-TLS connections, 55 seconds for TLS connections. * **max_conns_per_node** maximum number of pipeline connections allowed for each node - * **batch_direct** whether to use the batch-direct protocol (default: ``False``, so will use batch-index if available) (**Deprecated**: set 'use_batch_direct' in the batch policy dictionary) * **tend_interval** polling interval in milliseconds for tending the cluster (default: 1000) * **compression_threshold** compress data for transmission if the object size is greater than a given number of bytes (default: 0, meaning 'never compress') (**Deprecated**, set this in the 'write' policy dictionary) * **cluster_name** only server nodes matching this name will be used when determining the cluster diff --git a/setup.py b/setup.py index 5b8c35069..2d117b3a4 100644 --- a/setup.py +++ b/setup.py @@ -30,7 +30,7 @@ os.environ['ARCHFLAGS'] = '-arch x86_64' AEROSPIKE_C_VERSION = os.getenv('AEROSPIKE_C_VERSION') if not AEROSPIKE_C_VERSION: - AEROSPIKE_C_VERSION = '4.3.19' + AEROSPIKE_C_VERSION = '4.3.20' DOWNLOAD_C_CLIENT = os.getenv('DOWNLOAD_C_CLIENT') AEROSPIKE_C_HOME = os.getenv('AEROSPIKE_C_HOME') PREFIX = None diff --git a/src/main/client/get_many.c b/src/main/client/get_many.c index a77286054..a78fae5a6 100644 --- a/src/main/client/get_many.c +++ b/src/main/client/get_many.c @@ -323,7 +323,6 @@ PyObject * AerospikeClient_Get_Many_Invoke( as_policy_batch policy; as_policy_batch * batch_policy_p = NULL; bool has_batch_index = false; - bool use_batch_direct = false; // Initialize error as_error_init(&err); @@ -344,14 +343,8 @@ PyObject * AerospikeClient_Get_Many_Invoke( goto CLEANUP; } - if (batch_policy_p) { - use_batch_direct = batch_policy_p->use_batch_direct; - } else { - use_batch_direct = self->as->config.policies.batch.use_batch_direct; - } - has_batch_index = aerospike_has_batch_index(self->as); - if (has_batch_index && !use_batch_direct) { + if (has_batch_index) { py_recs = batch_get_aerospike_batch_read(&err, self, py_keys, batch_policy_p); } else { py_recs = batch_get_aerospike_batch_get(&err, self, py_keys, batch_policy_p); diff --git a/src/main/client/select_many.c b/src/main/client/select_many.c index b4e363fdd..7bb6cfa4e 100644 --- a/src/main/client/select_many.c +++ b/src/main/client/select_many.c @@ -312,7 +312,6 @@ PyObject * AerospikeClient_Select_Many_Invoke( Py_ssize_t bins_size = 0; char **filter_bins = NULL; bool has_batch_index = false; - bool use_batch_direct = false; // Unicode object's pool UnicodePyObjects u_objs; @@ -378,14 +377,8 @@ PyObject * AerospikeClient_Select_Many_Invoke( goto CLEANUP; } - if (batch_policy_p) { - use_batch_direct = batch_policy_p->use_batch_direct; - } else { - use_batch_direct = self->as->config.policies.batch.use_batch_direct; - } - has_batch_index = aerospike_has_batch_index(self->as); - if (has_batch_index && !use_batch_direct) { + if (has_batch_index) { py_recs = batch_select_aerospike_batch_read(&err, self, py_keys, batch_policy_p, filter_bins, bins_size); } else { py_recs = batch_select_aerospike_batch_get(&err, self, py_keys, batch_policy_p, filter_bins, bins_size); diff --git a/src/main/client/type.c b/src/main/client/type.c index ad0b923e3..9d022ef37 100644 --- a/src/main/client/type.c +++ b/src/main/client/type.c @@ -1047,7 +1047,6 @@ static int AerospikeClient_Type_Init(AerospikeClient * self, PyObject * args, Py as_policies_init(&config.policies); //Set default value of use_batch_direct - config.policies.batch.use_batch_direct = false; PyObject * py_policies = PyDict_GetItemString(py_config, "policies"); if (py_policies && PyDict_Check(py_policies)) { @@ -1167,13 +1166,6 @@ static int AerospikeClient_Type_Init(AerospikeClient * self, PyObject * args, Py config.thread_pool_size = PyInt_AsLong(py_thread_pool_size); } - // This does not match documentation (wrong name and location in dict), - // but leave it for now for customers who may be using it - PyObject * py_use_batch_direct = PyDict_GetItemString(py_policies, "use_batch_direct"); - if (py_use_batch_direct && PyBool_Check(py_use_batch_direct)) { - config.policies.batch.use_batch_direct = PyInt_AsLong(py_use_batch_direct); - } - /* * Generation policy is removed from constructor. */ @@ -1222,11 +1214,6 @@ static int AerospikeClient_Type_Init(AerospikeClient * self, PyObject * args, Py config.max_conns_per_node = PyInt_AsLong(py_max_conns); } - // batch_direct - PyObject * py_batch_direct = PyDict_GetItemString(py_config, "batch_direct"); - if (py_batch_direct && PyBool_Check(py_batch_direct)) { - config.policies.batch.use_batch_direct = PyInt_AsLong(py_batch_direct); - } //conn_timeout_ms PyObject * py_connect_timeout = PyDict_GetItemString(py_config, "connect_timeout"); diff --git a/src/main/policy.c b/src/main/policy.c index 0b865cb8c..e9d182f16 100644 --- a/src/main/policy.c +++ b/src/main/policy.c @@ -737,7 +737,6 @@ as_status pyobject_to_policy_batch(as_error * err, PyObject * py_policy, POLICY_SET_FIELD(consistency_level, as_policy_consistency_level); POLICY_SET_FIELD(concurrent, bool); - POLICY_SET_FIELD(use_batch_direct, bool); POLICY_SET_FIELD(allow_inline, bool); POLICY_SET_FIELD(send_set_name, bool); POLICY_SET_FIELD(deserialize, bool); diff --git a/src/main/policy_config.c b/src/main/policy_config.c index 949854051..c0e26ac0e 100644 --- a/src/main/policy_config.c +++ b/src/main/policy_config.c @@ -415,11 +415,6 @@ as_status set_batch_policy(as_policy_batch* batch_policy, PyObject* py_policy) { return status; } - status = set_optional_bool_property(&batch_policy->use_batch_direct, py_policy, "use_batch_direct"); - if (status != AEROSPIKE_OK) { - return status; - } - status = set_optional_bool_property(&batch_policy->allow_inline, py_policy, "allow_inline"); if (status != AEROSPIKE_OK) { return status; diff --git a/test/new_tests/test_connect.py b/test/new_tests/test_connect.py index 94dc37bbc..ffdc6a341 100644 --- a/test/new_tests/test_connect.py +++ b/test/new_tests/test_connect.py @@ -83,8 +83,7 @@ def test_connect_positive_unicode_hosts(self): uni = json.dumps(self.connection_config['hosts'][0]) hostlist = json.loads(uni) config = { - 'hosts': [tuple(hostlist)], - 'policies': {'use_batch_direct': True} + 'hosts': [tuple(hostlist)] } with open_as_connection(config) as client: assert client is not None diff --git a/test/new_tests/test_exists_many.py b/test/new_tests/test_exists_many.py index 60d370d9b..b7ac9dd3b 100644 --- a/test/new_tests/test_exists_many.py +++ b/test/new_tests/test_exists_many.py @@ -174,91 +174,8 @@ def test_pos_exists_many_with_record_expiry(self, put_data): records = self.as_connection.exists_many(keys) assert isinstance(records, list) assert len(records) == 1 - for x in records: - assert x[1] is None - - def test_exists_many_with_batch_direct_as_constructor_arg(self, put_data): - - hostlist, user, password = TestBaseClass.get_hosts() - config = {'policies': {'use_batch_direct': True}} - client_batch_direct = TestBaseClass.get_new_connection(add_config=config) - - self.keys = [] - rec_length = 5 - for i in range(rec_length): - key = ('test', 'demo', i) - record = {'name': 'name%s' % (str(i)), 'age': i} - put_data(self.as_connection, key, record) - self.keys.append(key) - - self.keys.append(('test', 'demo', 'some_key')) - - for i in range(15, 20): - key = ('test', 'demo', i) - rec = {'name': 'name%s' % (str(i)), 'age': i} - put_data(self.as_connection, key, rec) - self.keys.append(key) - - records = client_batch_direct.exists_many(self.keys) - - assert isinstance(records, list) - assert len(records) == len(self.keys) - - client_batch_direct.close() - - def test_with_batch_direct_true_in_constructor_false_in_args(self, put_data): - - hostlist, user, password = TestBaseClass.get_hosts() - config = {'policies': {'use_batch_direct': True}} - client_batch_direct = TestBaseClass.get_new_connection(add_config=config) - policies = {'use_batch_direct': False} - self.keys = [] - rec_length = 5 - for i in range(rec_length): - key = ('test', 'demo', i) - record = {'name': 'name%s' % (str(i)), 'age': i} - put_data(self.as_connection, key, record) - self.keys.append(key) - - self.keys.append(('test', 'demo', 'some_key')) - - for i in range(15, 20): - key = ('test', 'demo', i) - rec = {'name': 'name%s' % (str(i)), 'age': i} - put_data(self.as_connection, key, rec) - self.keys.append(key) - - records = client_batch_direct.exists_many(self.keys) - - assert isinstance(records, list) - assert len(records) == len(self.keys) - - client_batch_direct.close() - - def test_exists_many_with_batch_direct_as_method_arg(self, put_data): - - policies = {'use_batch_direct': True} - - self.keys = [] - rec_length = 5 - for i in range(rec_length): - key = ('test', 'demo', i) - record = {'name': 'name%s' % (str(i)), 'age': i} - put_data(self.as_connection, key, record) - self.keys.append(key) - - self.keys.append(('test', 'demo', 'some_key')) - - for i in range(15, 20): - key = ('test', 'demo', i) - rec = {'name': 'name%s' % (str(i)), 'age': i} - put_data(self.as_connection, key, rec) - self.keys.append(key) - - records = self.as_connection.exists_many(self.keys, policies) - - assert isinstance(records, list) - assert len(records) == len(self.keys) + for record in records: + assert record[1] is None def test_exists_many_with_bytearray_key(self, put_data): self.keys = [('test', 'demo', bytearray([1, 2, 3]))] diff --git a/test/new_tests/test_get_many.py b/test/new_tests/test_get_many.py index 75d73a73a..ed886ffa9 100644 --- a/test/new_tests/test_get_many.py +++ b/test/new_tests/test_get_many.py @@ -185,22 +185,6 @@ def test_get_many_with_bytearray_key(self): bytearray_pk = bytearray_key[2] assert bytearray_pk == bytearray([1, 2, 3]) - def test_pos_get_many_with_use_batch_direct(self): - - hostlist, user, password = TestBaseClass.get_hosts() - config = {'policies': {'batch': {'use_batch_direct': True}}} - client_batch_direct = TestBaseClass.get_new_connection(add_config=config) - - records = client_batch_direct.get_many(self.keys) - - assert isinstance(records, list) - assert len(records) == 6 - assert Counter([x[0][2] for x in records]) == Counter([0, 1, 2, 3, - 4, 'float_value']) - assert records[5][2] == {'float_value': 4.3} - - client_batch_direct.close() - def test_pos_get_many_with_constructor_batch_direct_and_method_arg(self): ''' This sets use batch_direct to true in the constructor diff --git a/test/new_tests/test_select_many.py b/test/new_tests/test_select_many.py index 3d2ed2c50..5f45bfe78 100644 --- a/test/new_tests/test_select_many.py +++ b/test/new_tests/test_select_many.py @@ -189,12 +189,6 @@ def test_get_many_with_bytearray_key(self): bytearray_pk = bytearray_key[2] assert bytearray_pk == bytearray([1, 2, 3]) - def test_with_use_batch_direct_true_argument(self): - policies = {'use_batch_direct': True} - records = self.as_connection.select_many(self.keys, [], policies) - assert isinstance(records, list) - assert len(records) == len(self.keys) - def test_with_use_batch_direct_true_in_constructor_false_argument(self): hostlist, user, password = TestBaseClass.get_hosts() From 61f5684a39090021d9564dae6a298e83713153a4 Mon Sep 17 00:00:00 2001 From: Robert Marks Date: Fri, 9 Nov 2018 22:32:01 +0000 Subject: [PATCH 11/21] Add new background_execute method to query, fix system error in predicate construction --- setup.py | 1 + src/include/query.h | 6 +++ src/main/predicates.c | 26 ++++------ src/main/query/execute_background.c | 80 +++++++++++++++++++++++++++++ src/main/query/type.c | 7 +++ test/new_tests/test_predicates.py | 18 +++++++ 6 files changed, 122 insertions(+), 16 deletions(-) create mode 100644 src/main/query/execute_background.c create mode 100644 test/new_tests/test_predicates.py diff --git a/setup.py b/setup.py index 2d117b3a4..548fa75f9 100644 --- a/setup.py +++ b/setup.py @@ -272,6 +272,7 @@ def resolve_c_client(): 'src/main/query/results.c', 'src/main/query/select.c', 'src/main/query/where.c', + 'src/main/query/execute_background.c', 'src/main/scan/type.c', 'src/main/scan/foreach.c', 'src/main/scan/results.c', diff --git a/src/include/query.h b/src/include/query.h index dc2144cc0..861db7e64 100644 --- a/src/include/query.h +++ b/src/include/query.h @@ -94,6 +94,12 @@ PyObject * AerospikeQuery_Foreach(AerospikeQuery * self, PyObject * args, PyObje */ PyObject * AerospikeQuery_Results(AerospikeQuery * self, PyObject * args, PyObject * kwds); +/** + * Execute a UDF in the background. Returns the query id to allow status of the query to be monitored + * */ + +PyObject * AerospikeQuery_ExecuteBackground(AerospikeQuery * self, PyObject * args, PyObject * kwds); + /** * Store the Unicode -> UTF8 string converted PyObject into * a pool of PyObjects. So that, they will be decref'ed at later stages diff --git a/src/main/predicates.c b/src/main/predicates.c index 7a353e2c8..8698d84c9 100644 --- a/src/main/predicates.c +++ b/src/main/predicates.c @@ -35,7 +35,7 @@ static PyObject * AerospikePredicates_Equals(PyObject * self, PyObject * args) if (PyArg_ParseTuple(args, "OO:equals", &py_bin, &py_val) == false) { - goto exit; + return NULL; } if (PyInt_Check(py_val) || PyLong_Check(py_val)) { @@ -44,7 +44,6 @@ static PyObject * AerospikePredicates_Equals(PyObject * self, PyObject * args) return Py_BuildValue("iiOO", AS_PREDICATE_EQUAL, AS_INDEX_STRING, py_bin, py_val); } -exit: Py_INCREF(Py_None); return Py_None; } @@ -58,7 +57,7 @@ static PyObject * AerospikePredicates_Contains(PyObject * self, PyObject * args) if (PyArg_ParseTuple(args, "OOO:equals", &py_bin, &py_indextype, &py_val) == false) { - goto exit; + return NULL; } if (PyInt_Check(py_indextype)) { @@ -90,7 +89,7 @@ static PyObject * AerospikePredicates_RangeContains(PyObject * self, PyObject * if (PyArg_ParseTuple(args, "OOOO:equals", &py_bin, &py_indextype, &py_min, &py_max) == false) { - goto exit; + return NULL; } if (PyInt_Check(py_indextype)) { @@ -118,14 +117,13 @@ static PyObject * AerospikePredicates_Between(PyObject * self, PyObject * args) if (PyArg_ParseTuple(args, "OOO:between", &py_bin, &py_min, &py_max) == false) { - goto exit; + return NULL; } if ((PyInt_Check(py_min) || PyLong_Check(py_min)) && (PyInt_Check(py_max) || PyLong_Check(py_max))) { return Py_BuildValue("iiOOO", AS_PREDICATE_RANGE, AS_INDEX_NUMERIC, py_bin, py_min, py_max); } -exit: Py_INCREF(Py_None); return Py_None; } @@ -138,7 +136,7 @@ static PyObject * AerospikePredicates_GeoWithin_GeoJSONRegion(PyObject * self, P if (PyArg_ParseTuple(args, "OO|O:geo_within_geojson_region", &py_bin, &py_shape, &py_indexType) == false) { - goto exit; + return NULL; } if (!py_indexType) { @@ -149,7 +147,6 @@ static PyObject * AerospikePredicates_GeoWithin_GeoJSONRegion(PyObject * self, P return Py_BuildValue("iiOOOO", AS_PREDICATE_RANGE, AS_INDEX_GEO2DSPHERE, py_bin, py_shape, Py_None, py_indexType); } -exit: Py_INCREF(Py_None); return Py_None; } @@ -168,17 +165,16 @@ static PyObject * AerospikePredicates_GeoWithin_Radius(PyObject * self, PyObject as_error err; as_error_init(&err); - py_geo_object = PyDict_New(); - if (PyArg_ParseTuple(args, "OOOO|O:geo_within_radius", &py_bin, &py_lat, &py_long, &py_radius, &py_indexType) == false) { - goto CLEANUP; + return NULL; } if (!py_indexType) { py_indexType = Py_BuildValue("i", AS_INDEX_TYPE_DEFAULT); } + py_geo_object = PyDict_New(); PyObject *py_circle = PyString_FromString("AeroCircle"); PyDict_SetItemString(py_geo_object, "type", py_circle); Py_DECREF(py_circle); @@ -247,7 +243,7 @@ static PyObject * AerospikePredicates_GeoContains_GeoJSONPoint(PyObject * self, PyObject * py_indexType = NULL; if (PyArg_ParseTuple(args, "OO|O:geo_contains_geojson_point", &py_bin, &py_point, &py_indexType) == false) { - goto exit; + return NULL; } if (!py_indexType) { @@ -258,7 +254,6 @@ static PyObject * AerospikePredicates_GeoContains_GeoJSONPoint(PyObject * self, return Py_BuildValue("iiOOOO", AS_PREDICATE_RANGE, AS_INDEX_GEO2DSPHERE, py_bin, py_point, Py_None, py_indexType); } -exit: Py_INCREF(Py_None); return Py_None; } @@ -276,17 +271,16 @@ static PyObject * AerospikePredicates_GeoContains_Point(PyObject * self, PyObjec as_error err; as_error_init(&err); - py_geo_object = PyDict_New(); - if (PyArg_ParseTuple(args, "OOO|O:geo_contains_point", &py_bin, &py_lat, &py_long, &py_indexType) == false) { - goto CLEANUP; + return NULL; } if (!py_indexType) { py_indexType = Py_BuildValue("i", AS_INDEX_TYPE_DEFAULT); } + py_geo_object = PyDict_New(); PyObject *py_point = PyString_FromString("Point"); PyDict_SetItemString(py_geo_object, "type", py_point); Py_DECREF(py_point); diff --git a/src/main/query/execute_background.c b/src/main/query/execute_background.c new file mode 100644 index 000000000..9eaf982a8 --- /dev/null +++ b/src/main/query/execute_background.c @@ -0,0 +1,80 @@ +/******************************************************************************* + * Copyright 2013-2017 Aerospike, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ + +#include +#include +#include + +#include +#include +#include + +#include "client.h" +#include "conversions.h" +#include "exceptions.h" +#include "policy.h" +#include "query.h" + + +PyObject * AerospikeQuery_ExecuteBackground(AerospikeQuery * self, PyObject * args, PyObject * kwds) +{ + PyObject * py_policy = NULL; + + as_policy_write write_policy; + as_policy_write * write_policy_p = NULL; + + uint64_t query_id = 0; + + static char * kwlist[] = {"policy", NULL}; + + if (PyArg_ParseTupleAndKeywords(args, kwds, "|O:execute_background", kwlist, &py_policy) == false) { + return NULL; + } + + as_error err; + as_error_init(&err); + + if (!self || !self->client->as) { + as_error_update(&err, AEROSPIKE_ERR_PARAM, "Invalid aerospike object"); + goto CLEANUP; + } + if (!self->client->is_conn_16) { + as_error_update(&err, AEROSPIKE_ERR_CLUSTER, "No connection to aerospike cluster"); + goto CLEANUP; + } + + if (pyobject_to_policy_write(&err, py_policy, &write_policy, &write_policy_p, + &self->client->as->config.policies.write) != AEROSPIKE_OK) { + goto CLEANUP; + } + + Py_BEGIN_ALLOW_THREADS + aerospike_query_background(self->client->as, &err, write_policy_p, &self->query, &query_id); + Py_END_ALLOW_THREADS + +CLEANUP: + + if (err.code != AEROSPIKE_OK) { + PyObject * py_err = NULL; + error_to_pyobject(&err, &py_err); + PyObject *exception_type = raise_exception(&err); + PyErr_SetObject(exception_type, py_err); + Py_DECREF(py_err); + return NULL; + } + + return PyLong_FromUnsignedLongLong(query_id); +} diff --git a/src/main/query/type.c b/src/main/query/type.c index 20e2b85c8..417cf1b07 100644 --- a/src/main/query/type.c +++ b/src/main/query/type.c @@ -66,6 +66,11 @@ PyDoc_STRVAR(predexp_doc, \n\ Set a the predicates for the query. The values predicate1.. should be generated by the aerospike predexp module functions"); +PyDoc_STRVAR(execute_background_doc, +"execute_background([policy]) -> list of (key, meta, bins)\n\ +\n\ +Buffer the records resulting from the query, and return them as a list of records."); + /******************************************************************************* * PYTHON TYPE METHODS ******************************************************************************/ @@ -89,6 +94,8 @@ static PyMethodDef AerospikeQuery_Type_Methods[] = { {"predexp", (PyCFunction) AerospikeQuery_Predexp, METH_VARARGS, predexp_doc}, + {"execute_background", (PyCFunction) AerospikeQuery_ExecuteBackground, METH_VARARGS, + execute_background_doc}, {NULL} }; diff --git a/test/new_tests/test_predicates.py b/test/new_tests/test_predicates.py new file mode 100644 index 000000000..c18cc3ef9 --- /dev/null +++ b/test/new_tests/test_predicates.py @@ -0,0 +1,18 @@ +from aerospike import predicates as as_predicates +import pytest + +PREDICATE_METHDOS = [ + as_predicates.equals, + as_predicates.contains, + as_predicates.between, + as_predicates.range, + as_predicates.geo_contains_geojson_point, + as_predicates.geo_contains_point, + as_predicates.geo_within_geojson_region, + as_predicates.geo_within_radius +] + +@pytest.mark.parametrize('predicate', PREDICATE_METHDOS) +def test_invalid_predicate_use(predicate): + with pytest.raises(TypeError): + predicate() From 9528d5c538d1445046c471b6bd7cb264f07f36f5 Mon Sep 17 00:00:00 2001 From: Robert Marks Date: Sat, 10 Nov 2018 00:21:17 +0000 Subject: [PATCH 12/21] Allow keyword policy for background_execute, add tests --- src/main/query/type.c | 2 +- .../test_query_execute_background.py | 169 ++++++++++++++++++ 2 files changed, 170 insertions(+), 1 deletion(-) create mode 100644 test/new_tests/test_query_execute_background.py diff --git a/src/main/query/type.c b/src/main/query/type.c index 417cf1b07..9183d4a0b 100644 --- a/src/main/query/type.c +++ b/src/main/query/type.c @@ -94,7 +94,7 @@ static PyMethodDef AerospikeQuery_Type_Methods[] = { {"predexp", (PyCFunction) AerospikeQuery_Predexp, METH_VARARGS, predexp_doc}, - {"execute_background", (PyCFunction) AerospikeQuery_ExecuteBackground, METH_VARARGS, + {"execute_background", (PyCFunction) AerospikeQuery_ExecuteBackground, METH_VARARGS | METH_KEYWORDS, execute_background_doc}, {NULL} diff --git a/test/new_tests/test_query_execute_background.py b/test/new_tests/test_query_execute_background.py new file mode 100644 index 000000000..318acd130 --- /dev/null +++ b/test/new_tests/test_query_execute_background.py @@ -0,0 +1,169 @@ +# -*- coding: utf-8 -*- +import pytest +import time + +import aerospike +from aerospike import exception, predexp, predicates + +TEST_NS = 'test' +TEST_SET = 'background' +TEST_UDF_MODULE = 'query_apply' +TEST_UDF_FUNCTION = 'mark_as_applied' +# Hack to get long to exist in Python 3 +try: + long +except NameError: + long = int + +def add_indexes_to_client(client): + try: + client.index_integer_create(TEST_NS, TEST_SET, 'number', + 'test_background_number_idx') + except exception.IndexFoundError: + pass + +def add_test_udf(client): + policy = {} + client.udf_put(u"query_apply.lua", 0, policy) + + +def drop_test_udf(client): + client.udf_remove("query_apply.lua") + + +def remove_indexes_from_client(client): + client.index_remove(TEST_NS, 'test_background_number_idx') + +def validate_records(client, keys, validator): + for key in keys: + _, _, rec = client.get(key) + assert validator(rec) + +# Add records around the test +@pytest.fixture(scope='function') +def clean_test_background(as_connection): + keys = [(TEST_NS, TEST_SET, i) for i in range(50)] + for i, key in enumerate(keys): + as_connection.put(key, {'number': i}, policy={'exists': aerospike.POLICY_EXISTS_REPLACE}) + yield + +class TestQueryApply(object): + + # These functions will run once for this test class, and do all of the + # required setup and teardown + connection_setup_functions = (add_test_udf, add_indexes_to_client) + connection_teardown_functions = (drop_test_udf, remove_indexes_from_client) + + @pytest.fixture(autouse=True) + def setup(self, connection_with_config_funcs): + pass + + def test_background_execute_return_val(self, clean_test_background): + """ + Ensure that Query.execute_background() returns an int like object + """ + test_bin = 't1' + query = self.as_connection.query(TEST_NS, TEST_SET) + query.apply(TEST_UDF_MODULE, TEST_UDF_FUNCTION, [test_bin]) + res = query.execute_background() + assert isinstance(res, (int, long)) + + def test_background_execute_no_predicate(self, clean_test_background): + """ + Ensure that Query.execute_background() gets applied to all records + """ + test_bin = 't2' + keys = [(TEST_NS, TEST_SET, i) for i in range(50)] + + query = self.as_connection.query(TEST_NS, TEST_SET) + query.apply(TEST_UDF_MODULE, TEST_UDF_FUNCTION, [test_bin]) + query.execute_background() + # Give time for the query to finish + time.sleep(5) + validate_records( + self.as_connection, keys, + lambda rec: rec[test_bin] == 'aerospike' + ) + + def test_background_execute_sindex_predicate(self, clean_test_background): + """ + Ensure that Query.execute_background() only applies to records matched by + the specified predicate + """ + test_bin = 't3' + keys = [(TEST_NS, TEST_SET, i) for i in range(50)] + number_predicate = predicates.equals('number', 5) + + query = self.as_connection.query(TEST_NS, TEST_SET) + query.where(number_predicate) + query.apply(TEST_UDF_MODULE, TEST_UDF_FUNCTION, [test_bin]) + query.execute_background() + # Give time for the query to finish + time.sleep(5) + keys = [(TEST_NS, TEST_SET, i) for i in range(50) if i != 5] + validate_records( + self.as_connection, keys, + lambda rec: test_bin not in rec + ) + _, _, num_5_record = self.as_connection.get((TEST_NS, TEST_SET, 5)) + assert num_5_record[test_bin] == 'aerospike' + + def test_background_execute_sindex_predexp(self, clean_test_background): + """ + Ensure that Query.execute_background() only applies to records matched by + the specified predicate + """ + test_bin = 't4' + keys = [(TEST_NS, TEST_SET, i) for i in range(50)] + + # rec['number'] < 10 + predexps = [predexp.integer_bin('number'), predexp.integer_value(10), predexp.integer_less()] + + query = self.as_connection.query(TEST_NS, TEST_SET) + query.predexp(predexps) + query.apply(TEST_UDF_MODULE, TEST_UDF_FUNCTION, [test_bin]) + query.execute_background() + # Give time for the query to finish + time.sleep(5) + + # Records with number > 10 should not have had the UDF applied + validate_records( + self.as_connection, keys[10:], + lambda rec: test_bin not in rec + ) + # Records with number < 10 should have had the udf applied + validate_records( + self.as_connection, keys[:10], + lambda rec: rec[test_bin] == 'aerospike' + ) + + def test_background_execute_with_policy(self, clean_test_background): + """ + Ensure that Query.execute_background() returns an int like object + """ + test_bin = 't5' + query = self.as_connection.query(TEST_NS, TEST_SET) + query.apply(TEST_UDF_MODULE, TEST_UDF_FUNCTION, [test_bin]) + res = query.execute_background({'socket_timeout': 10000}) + assert isinstance(res, (int, long)) + + def test_background_execute_with_policy_kwarg(self, clean_test_background): + """ + Ensure that Query.execute_background() returns an int like object + """ + test_bin = 't6' + query = self.as_connection.query(TEST_NS, TEST_SET) + query.apply(TEST_UDF_MODULE, TEST_UDF_FUNCTION, [test_bin]) + res = query.execute_background(policy={}) + assert isinstance(res, (int, long)) + + def test_background_execute_with_invalid_policy_type(self, clean_test_background): + """ + Ensure that Query.execute_background() returns an int like object + """ + test_bin = 't6' + query = self.as_connection.query(TEST_NS, TEST_SET) + query.apply(TEST_UDF_MODULE, TEST_UDF_FUNCTION, [test_bin]) + # Policy needs to be a dict. Not a string + with pytest.raises(exception.ParamError): + res = query.execute_background("Honesty is the best Policy") From 83265421a0de46860f181e04ac204a9579924ccf Mon Sep 17 00:00:00 2001 From: Robert Marks Date: Mon, 12 Nov 2018 18:59:31 +0000 Subject: [PATCH 13/21] Add tests for cdt types --- test/new_tests/test_cdt_compators.py | 166 +++++++++++++++++++++++++++ 1 file changed, 166 insertions(+) create mode 100644 test/new_tests/test_cdt_compators.py diff --git a/test/new_tests/test_cdt_compators.py b/test/new_tests/test_cdt_compators.py new file mode 100644 index 000000000..9bab46eaa --- /dev/null +++ b/test/new_tests/test_cdt_compators.py @@ -0,0 +1,166 @@ +# These fail if we don't have a new server +# -*- coding: utf-8 -*- +import pytest +import aerospike +from aerospike import exception as e +from aerospike_helpers.operations import list_operations as lo +from aerospike_helpers.operations import map_operations as map_ops + +def get_list_result_from_operation(client, key, operation, binname): + ''' + Just perform a single operation and return the bins portion of the + result + ''' + _, _, result_bins = client.operate(key, [operation]) + return result_bins[binname] + + +@pytest.mark.xfail(reason="This requires server 4.3.13") +class TestNewRelativeCDTValues(object): + @pytest.fixture(autouse=True) + def setup(self, request, as_connection): + """ + Setup Method + """ + self.keys = [] + cdt_list_val = [ + [0, "a"], + [1, "b"], + [1, "c"], + [1, "d", "e"], + [2, "f"], + [2, "two"], + [3, "g"] + ] + cdt_map_val = { + 'a': [0, 'a'], + 'b': [1, 'b'], + 'c': [1, 'c'], + 'd': [1, 'd', 'e'], + 'e': [2, 'f'], + 'f': [2, 'two'], + 'g': [3, 'g'] + } + + self.cdt_key = ('test', 'cdt_values', 'wildcard') + self.cdt_list_bin = "cdt_list_bin" + self.cdt_map_bin = "cdt_map_bin" + + self.as_connection.put( + self.cdt_key, + { + self.cdt_list_bin: cdt_list_val, + self.cdt_map_bin: cdt_map_val + } + ) + # Make sure the list is ordered, in order to get expected return order. + ops = [lo.list_sort(self.cdt_list_bin, 0), + lo.list_set_order(self.cdt_list_bin, aerospike.LIST_ORDERED)] + self.as_connection.operate(self.cdt_key, ops) + + self.keys.append(self.cdt_key) + + yield + + for rec_key in self.keys: + try: + self.as_connection.remove(rec_key) + except e.AerospikeError: + pass + + def test_cdt_wild_card_list_value_multi_element(self): + operation = lo.list_get_by_value( + self.cdt_list_bin, [1, aerospike.CDTWildcard()], aerospike.LIST_RETURN_VALUE) + + result = get_list_result_from_operation( + self.as_connection, + self.keys[0], + operation, + self.cdt_list_bin + ) + + # All items starting with 1 + assert len(result) == 3 + for lst in result: + assert lst[0] == 1 + + def test_cdt_wild_card_list_value(self): + # This is does gthe value match [*] + operation = lo.list_get_by_value( + self.cdt_list_bin, aerospike.CDTWildcard(), aerospike.LIST_RETURN_VALUE) + + result = get_list_result_from_operation( + self.as_connection, + self.keys[0], + operation, + self.cdt_list_bin + ) + + # All items starting with 1 + assert len(result) == 7 + + def test_cdt_infinite_list_range_value(self): + operation = lo.list_get_by_value_range( + self.cdt_list_bin, + aerospike.LIST_RETURN_VALUE, + [1, aerospike.null()], + [1, aerospike.CDTInfinite()] + ) + + result = get_list_result_from_operation( + self.as_connection, + self.keys[0], + operation, + self.cdt_list_bin + ) + + # All items starting with 1 + assert len(result) == 3 + for lst in result: + assert lst[0] == 1 + + def test_map_value_wildcard(self): + operation = map_ops.map_get_by_value( + self.cdt_map_bin, + aerospike.CDTWildcard(), + aerospike.MAP_RETURN_KEY + ) + result = get_list_result_from_operation( + self.as_connection, + self.cdt_key, + operation, + self.cdt_map_bin + ) + + assert set(result) == set(('a', 'b', 'c', 'd', 'e', 'f', 'g')) + + def test_map_value_wildcard(self): + operation = map_ops.map_get_by_value( + self.cdt_map_bin, + [1, aerospike.CDTWildcard()], + aerospike.MAP_RETURN_KEY, + ) + result = get_list_result_from_operation( + self.as_connection, + self.cdt_key, + operation, + self.cdt_map_bin + ) + + assert set(result) == set(('b', 'c', 'd')) + + def test_map_value_null_infinity_range(self): + operation = map_ops.map_get_by_value_range( + self.cdt_map_bin, + [1, aerospike.null()], + [1, aerospike.CDTInfinite()], + aerospike.MAP_RETURN_KEY + ) + result = get_list_result_from_operation( + self.as_connection, + self.cdt_key, + operation, + self.cdt_map_bin + ) + + assert set(result) == set(('b', 'c', 'd')) From 32f8db4bcf28b62238328a13101e5734f810af1d Mon Sep 17 00:00:00 2001 From: Robert Marks Date: Mon, 12 Nov 2018 19:12:52 +0000 Subject: [PATCH 14/21] Fix background query test --- test/new_tests/test_query_execute_background.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/new_tests/test_query_execute_background.py b/test/new_tests/test_query_execute_background.py index 318acd130..1c10c1eac 100644 --- a/test/new_tests/test_query_execute_background.py +++ b/test/new_tests/test_query_execute_background.py @@ -44,7 +44,7 @@ def validate_records(client, keys, validator): def clean_test_background(as_connection): keys = [(TEST_NS, TEST_SET, i) for i in range(50)] for i, key in enumerate(keys): - as_connection.put(key, {'number': i}, policy={'exists': aerospike.POLICY_EXISTS_REPLACE}) + as_connection.put(key, {'number': i}) yield class TestQueryApply(object): From 10e9ea26ba2792077356d8d52f5eb5cef38a3e52 Mon Sep 17 00:00:00 2001 From: Robert Marks Date: Mon, 12 Nov 2018 19:38:42 +0000 Subject: [PATCH 15/21] Add CDT Comparator type documentation --- doc/aerospike.rst | 50 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/doc/aerospike.rst b/doc/aerospike.rst index 8c704af77..856038308 100644 --- a/doc/aerospike.rst +++ b/doc/aerospike.rst @@ -173,6 +173,56 @@ in an in-memory primary index. .. versionadded:: 2.0.1 +.. py:function:: CDTWildcard() + + A type representing a wildcard object. This type may only be used as a comparison value in operations. + It may not be stored in the database. + + :return: a type representing a wildcard value. + + .. code-block:: python + + import aerospike + from aerospike_helpers.operations import list_operations as list_ops + + client = aerospike.client({'hosts': [('localhost', 3000)]}).connect() + key = 'test', 'demo', 1 + + # get all values of the form [1, ...] from a list of lists. + # For example if list is [[1, 2, 3], [2, 3, 4], [1, 'a']], this operation will match + # [1, 2, 3] and [1, 'a'] + operations = [list_ops.list_get_by_value('list_bin', [1, aerospike.CDTWildcard()], aerospike.LIST_RETURN_VALUE)] + _, _, bins = client.operate(key, operations) + + .. versionadded:: 3.5.0 + .. note:: This requires Aerospike Server 4.3.1.3 or greater + + +.. py:function:: CDTInfinite() + + A type representing an infinte value. This type may only be used as a comparison value in operations. + It may not be stored in the database. + + :return: a type representing an infinite value. + + .. code-block:: python + + import aerospike + from aerospike_helpers.operations import list_operations as list_ops + + client = aerospike.client({'hosts': [('localhost', 3000)]}).connect() + key = 'test', 'demo', 1 + + # get all values of the form [1, ...] from a list of lists. + # For example if list is [[1, 2, 3], [2, 3, 4], [1, 'a']], this operation will match + # [1, 2, 3] and [1, 'a'] + operations = [list_ops.list_get_by_value_range('list_bin', aerospike.LIST_RETURN_VALUE, [1], [1, aerospike.CDTInfinite()])] + _, _, bins = client.operate(key, operations) + + .. versionadded:: 3.5.0 + .. note:: This requires Aerospike Server 4.3.1.3 or greater + + .. py:function:: calc_digest(ns, set, key) -> bytearray Calculate the digest of a particular key. See: :ref:`aerospike_key_tuple`. From f724245e5dad28943879940737f9909a84d940f9 Mon Sep 17 00:00:00 2001 From: Robert Marks Date: Mon, 12 Nov 2018 21:45:41 +0000 Subject: [PATCH 16/21] Add execute_background to documentation --- doc/query.rst | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/doc/query.rst b/doc/query.rst index e63d139a0..fc9bbbc8b 100644 --- a/doc/query.rst +++ b/doc/query.rst @@ -261,6 +261,23 @@ Query Class --- :class:`Query` big_records = query.results() client.close() + .. method:: execute_background([, policy]) + + Execute a record UDF on records found by the query in the background. This method returns before the query has completed. + A UDF must have been added to the query with :meth:`Query.apply` . + + :param dict policy: optional :ref:`aerospike_write_policies`. + + :return: a job ID that can be used with :meth:`aerospike.Client.job_info` to track the status of the ``aerospike.JOB_QUERY`` , as it runs in the background. + + .. code-block:: python + + import aerospike + query = client.query('test', 'demo') + query.apply('myudfs', 'myfunction', ['a', 1]) + # This id can be used to monitor the progress of the background query + query_id = query.execute_background() + .. _aerospike_query_policies: Query Policies From 61a790ba77472862979b6bceca1940416240ea8a Mon Sep 17 00:00:00 2001 From: Robert Marks Date: Tue, 13 Nov 2018 17:43:06 +0000 Subject: [PATCH 17/21] Add map_write_flags to map policy to allow no_fail/partial in server > 4.3.0 --- src/main/policy.c | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/src/main/policy.c b/src/main/policy.c index e9d182f16..183388f1a 100644 --- a/src/main/policy.c +++ b/src/main/policy.c @@ -30,6 +30,8 @@ #include "policy.h" #include "macros.h" +#define MAP_WRITE_FLAGS_KEY "map_write_flags" + #define POLICY_INIT(__policy) \ as_error_reset(err);\ if (!py_policy || py_policy == Py_None) {\ @@ -256,7 +258,14 @@ AerospikeConstants aerospike_constants[] = { { AS_MAP_WRITE_PARTIAL, "MAP_WRITE_PARTIAL"}, { AS_LIST_WRITE_NO_FAIL, "LIST_WRITE_NO_FAIL"}, - { AS_LIST_WRITE_PARTIAL, "LIST_WRITE_PARTIAL"} + { AS_LIST_WRITE_PARTIAL, "LIST_WRITE_PARTIAL"}, + + /* Map write flags post 3.5.0 */ + { AS_MAP_WRITE_DEFAULT, "MAP_WRITE_FLAGS_DEFAULT"}, + { AS_MAP_WRITE_CREATE_ONLY, "MAP_WRITE_FLAGS_CREATE_ONLY"}, + { AS_MAP_WRITE_UPDATE_ONLY, "MAP_WRITE_FLAGS_UPDATE_ONLY"}, + { AS_MAP_WRITE_NO_FAIL, "MAP_WRITE_FLAGS_NO_FAIL"}, + { AS_MAP_WRITE_PARTIAL, "MAP_WRITE_FLAGS_PARTIAL"} }; static @@ -756,10 +765,27 @@ as_status pyobject_to_map_policy(as_error * err, PyObject * py_policy, long map_order = AS_MAP_UNORDERED; long map_write_mode = AS_MAP_UPDATE; + uint32_t map_write_flags = AS_MAP_WRITE_DEFAULT; - MAP_POLICY_SET_FIELD(map_write_mode); MAP_POLICY_SET_FIELD(map_order); + PyObject* mode_or_flags = PyDict_GetItemString(py_policy, MAP_WRITE_FLAGS_KEY); + + /* + This only works for client >= 3.5.0 and server >= 4.3.0 + If py_policy["map_write_flags"] is set, we use it + otherwise we use py_policy["map_write_mode"] + */ + if (mode_or_flags) { + if (PyInt_Check(mode_or_flags)) { + map_write_flags = (uint32_t)PyInt_AsLong(mode_or_flags); + as_map_policy_set_flags(policy, map_order, map_write_flags); + } else { + as_error_update(err, AEROSPIKE_ERR_PARAM, "map write flags must be an integer"); + } + return err->code; + } + MAP_POLICY_SET_FIELD(map_write_mode); as_map_policy_set(policy, map_order, map_write_mode); return err->code; From ed732f2cc48820585493bea93e2b99dbe2505ad7 Mon Sep 17 00:00:00 2001 From: Robert Marks Date: Tue, 13 Nov 2018 18:19:36 +0000 Subject: [PATCH 18/21] Add documentation of new write flags --- doc/client.rst | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/doc/client.rst b/doc/client.rst index ac328aa60..3f3621014 100644 --- a/doc/client.rst +++ b/doc/client.rst @@ -2950,17 +2950,39 @@ Map Policies .. object:: policy - A :class:`dict` of optional map policies, which are applicable to map operations. + A :class:`dict` of optional map policies, which are applicable to map operations. Only one of ``map_write_mode`` or ``map_write_flags`` should + be provided. ``map_write_mode`` should be used for Aerospike Server versions < `4.3.0` and ``map_write_flags`` should be used for Aerospike server versions + greater than or equal to `4.3.0` . .. hlist:: :columns: 1 - * **map_write_mode** write mode for the map. Valid values: ``aerospike.MAP_UPDATE``, ``aerospike.MAP_UPDATE_ONLY``, ``aerospike.MAP_CREATE_ONLY``, - ``aerospike.MAP_WRITE_PARTIAL`` and ``aerospike.MAP_WRITE_NO_FAIL``. - ``aerospike.MAP_WRITE_PARTIAL`` and ``aerospike.MAP_WRITE_NO_FAIL``. require server version 4.3.0 or greater. The values may be or'd together: - ``aerospike.MAP_UPDATE_ONLY | aerospike.MAP_WRITE_NO_FAIL`` + * **map_write_mode** write mode for the map. This should only be used for Server version < 4.3.0 Valid values: ``aerospike.MAP_UPDATE``, ``aerospike.MAP_UPDATE_ONLY``, ``aerospike.MAP_CREATE_ONLY``, + * **map_write_flags** Flags to apply to the map operation. This is only valid for Aerospike Server versions >= 4.3.0: + + | Possible values are ``aerospike.MAP_WRITE_FLAGS_DEFAULT``, ``aerospike.MAP_WRITE_FLAGS_CREATE_ONLY``, ``aerospike.MAP_WRITE_FLAGS_UPDATE_ONLY``, + | ``aerospike.MAP_WRITE_FLAGS_PARTIAL`` and ``aerospike.MAP_WRITE_FLAGS_NO_FAIL``. + | The values may be or'd together: + | ``aerospike.MAP_WRITE_FLAGS_UPDATE_ONLY | aerospike.MAP_WRITE_FLAGS_NO_FAIL`` + | `New in version 3.5.0` + * **map_order** ordering to maintain for the map entries. Valid values: ``aerospike.MAP_UNORDERED``, ``aerospike.MAP_KEY_ORDERED``, ``aerospike.MAP_KEY_VALUE_ORDERED`` + Example: + + .. code-block:: python + + # Server >= 4.3.0 + map_policy = { + 'map_order': aerospike.MAP_UNORDERED, + 'map_write_flags': aerospike.MAP_WRITE_FLAGS_CREATE_ONLY + } + + # Server < 4.3.0 + map_policy = { + 'map_order': aerospike.MAP_UNORDERED, + 'map_write_mode': aerospike.MAP_CREATE_ONLY + } .. _aerospike_privilege_dict: From 2456737d9601e87d2f9c7b38f6f7896d39c3961c Mon Sep 17 00:00:00 2001 From: Robert Marks Date: Tue, 13 Nov 2018 18:57:18 +0000 Subject: [PATCH 19/21] Fix issue with map_put_items, add test for new write_flags --- .../operations/map_operations.py | 2 +- test/new_tests/test_map_write_flags.py | 215 ++++++++++++++++++ 2 files changed, 216 insertions(+), 1 deletion(-) create mode 100644 test/new_tests/test_map_write_flags.py diff --git a/aerospike_helpers/operations/map_operations.py b/aerospike_helpers/operations/map_operations.py index c558f6ce4..d4e739778 100644 --- a/aerospike_helpers/operations/map_operations.py +++ b/aerospike_helpers/operations/map_operations.py @@ -78,7 +78,7 @@ def map_put_items(bin_name, item_dict, map_policy=None): A dictionary usable in operate or operate_ordered. The format of the dictionary should be considered an internal detail, and subject to change. """ - return { + op_dict = { OP_KEY: aerospike.OP_MAP_PUT_ITEMS, BIN_KEY: bin_name, VALUE_KEY: item_dict diff --git a/test/new_tests/test_map_write_flags.py b/test/new_tests/test_map_write_flags.py new file mode 100644 index 000000000..1663efb6d --- /dev/null +++ b/test/new_tests/test_map_write_flags.py @@ -0,0 +1,215 @@ +# These fail if we don't have a new server +# -*- coding: utf-8 -*- +import pytest +import aerospike +from aerospike import exception as e +from aerospike_helpers.operations import map_operations as map_ops + + +def skip_less_than_430(version): + if version < [4, 3]: + print(version) + pytest.skip("Requires server > 4.3.0 to work") + +class TestMapWriteFlags(object): + @pytest.fixture(autouse=True) + def setup(self, request, as_connection): + """ + Setup Method + """ + self.keys = [] + + yield + + for key in self.keys: + self.as_connection.remove(key) + + def test_default_allows_update_and_create(self): + skip_less_than_430(self.server_version) + key = 'test', 'write_flags', 1 + self.keys.append(key) + self.as_connection.put(key, {'map': {'existing': 'old'}}) + + map_policy = { + 'map_write_flags': aerospike.MAP_WRITE_FLAGS_DEFAULT + } + ops = [ + map_ops.map_put('map', 'existing', 'new', map_policy=map_policy), + map_ops.map_put('map', 'new', 'new', map_policy=map_policy), + ] + self.as_connection.operate(key, ops) + + _, _, bins = self.as_connection.get(key) + + map_bin = bins['map'] + assert map_bin['existing'] == 'new' + assert map_bin['new'] == 'new' + + def test_create_only_does_not_allow_update(self): + skip_less_than_430(self.server_version) + key = 'test', 'write_flags', 1 + self.keys.append(key) + self.as_connection.put(key, {'map': {'existing': 'old'}}) + + map_policy = { + 'map_write_flags': aerospike.MAP_WRITE_FLAGS_CREATE_ONLY + } + ops = [ + map_ops.map_put('map', 'existing', 'new', map_policy=map_policy), + ] + with pytest.raises(e.AerospikeError): + self.as_connection.operate(key, ops) + + _, _, bins = self.as_connection.get(key) + + map_bin = bins['map'] + assert map_bin['existing'] == 'old' + + def test_create_only_allows_create(self): + skip_less_than_430(self.server_version) + key = 'test', 'write_flags', 1 + self.keys.append(key) + self.as_connection.put(key, {'map': {'existing': 'old'}}) + + map_policy = { + 'map_write_flags': aerospike.MAP_WRITE_FLAGS_CREATE_ONLY + } + ops = [ + map_ops.map_put('map', 'new', 'new', map_policy=map_policy), + ] + self.as_connection.operate(key, ops) + _, _, bins = self.as_connection.get(key) + + map_bin = bins['map'] + assert map_bin['existing'] == 'old' + assert map_bin['new'] == 'new' + + def test_update_only_does_not_allow_create(self): + skip_less_than_430(self.server_version) + key = 'test', 'write_flags', 1 + self.keys.append(key) + self.as_connection.put(key, {'map': {'existing': 'old'}}) + + map_policy = { + 'map_write_flags': aerospike.MAP_WRITE_FLAGS_UPDATE_ONLY + } + ops = [ + map_ops.map_put('map', 'new', 'new', map_policy=map_policy), + ] + + with pytest.raises(e.AerospikeError): + self.as_connection.operate(key, ops) + + _, _, bins = self.as_connection.get(key) + + map_bin = bins['map'] + assert map_bin['existing'] == 'old' + assert 'new' not in map_bin + + def test_update_only_allows_update(self): + skip_less_than_430(self.server_version) + key = 'test', 'write_flags', 1 + self.keys.append(key) + self.as_connection.put(key, {'map': {'existing': 'old'}}) + + map_policy = { + 'map_write_flags': aerospike.MAP_WRITE_FLAGS_UPDATE_ONLY + } + ops = [ + map_ops.map_put('map', 'existing', 'new', map_policy=map_policy), + ] + + self.as_connection.operate(key, ops) + + _, _, bins = self.as_connection.get(key) + + map_bin = bins['map'] + assert map_bin['existing'] == 'new' + + def test_nofail_allows_an_op_to_fail_silently(self): + skip_less_than_430(self.server_version) + key = 'test', 'write_flags', 1 + self.keys.append(key) + self.as_connection.put(key, {'map': {'existing': 'old'}}) + + map_policy = { + 'map_write_flags': aerospike.MAP_WRITE_FLAGS_UPDATE_ONLY | aerospike.MAP_WRITE_FLAGS_NO_FAIL + } + ops = [ + map_ops.map_put('map', 'existing', 'new', map_policy=map_policy), + map_ops.map_put('map', 'new', 'new', map_policy=map_policy), + ] + + self.as_connection.operate(key, ops) + + _, _, bins = self.as_connection.get(key) + + map_bin = bins['map'] + assert map_bin['existing'] == 'new' + assert 'new' not in map_bin + + def test_partial_allows_partial_write(self): + skip_less_than_430(self.server_version) + key = 'test', 'write_flags', 1 + self.keys.append(key) + self.as_connection.put(key, {'map': {'existing': 'old'}}) + + map_policy = { + 'map_write_flags': ( + aerospike.MAP_WRITE_FLAGS_CREATE_ONLY | + aerospike.MAP_WRITE_FLAGS_PARTIAL | + aerospike.MAP_WRITE_FLAGS_NO_FAIL + ) + } + ops = [ + map_ops.map_put_items( + 'map', + { + 'existing': 'new', + 'new1': 'new1', + 'new2': 'new2' + }, + map_policy=map_policy), + ] + + self.as_connection.operate(key, ops) + + _, _, bins = self.as_connection.get(key) + + map_bin = bins['map'] + assert map_bin['existing'] == 'old' + assert map_bin['new1'] == 'new1' + assert map_bin['new2'] == 'new2' + + def test_no_fail_does_not_allow_partial_write(self): + skip_less_than_430(self.server_version) + key = 'test', 'write_flags', 1 + self.keys.append(key) + self.as_connection.put(key, {'map': {'existing': 'old'}}) + + map_policy = { + 'map_write_flags': ( + aerospike.MAP_WRITE_FLAGS_CREATE_ONLY | + aerospike.MAP_WRITE_FLAGS_NO_FAIL + ) + } + ops = [ + map_ops.map_put_items( + 'map', + { + 'existing': 'new', + 'new1': 'new1', + 'new2': 'new2' + }, + map_policy=map_policy), + ] + + self.as_connection.operate(key, ops) + + _, _, bins = self.as_connection.get(key) + + map_bin = bins['map'] + assert map_bin['existing'] == 'old' + assert 'new1' not in map_bin + assert 'new2' not in map_bin + From c866b59bd6ae54e350f5417c9deb36fcc5710fcb Mon Sep 17 00:00:00 2001 From: Robert Marks Date: Tue, 13 Nov 2018 23:34:08 +0000 Subject: [PATCH 20/21] Add tests for map write mode --- test/new_tests/test_map_write_mode.py | 116 ++++++++++++++++++ .../new_tests/test_relative_cdt_operations.py | 10 +- 2 files changed, 124 insertions(+), 2 deletions(-) create mode 100644 test/new_tests/test_map_write_mode.py diff --git a/test/new_tests/test_map_write_mode.py b/test/new_tests/test_map_write_mode.py new file mode 100644 index 000000000..5abe83629 --- /dev/null +++ b/test/new_tests/test_map_write_mode.py @@ -0,0 +1,116 @@ +# These fail if we don't have a new server +# -*- coding: utf-8 -*- +import pytest +import aerospike +from aerospike import exception as e +from aerospike_helpers.operations import map_operations as map_ops + +class TestMapWriteMode(object): + @pytest.fixture(autouse=True) + def setup(self, request, as_connection): + """ + Setup Method + """ + self.keys = [] + + yield + + for key in self.keys: + self.as_connection.remove(key) + + def test_default_allows_update_and_create(self): + key = 'test', 'write_mode', 1 + self.keys.append(key) + self.as_connection.put(key, {'map': {'existing': 'old'}}) + + map_policy = { + 'map_write_mode': aerospike.MAP_UPDATE + } + ops = [ + map_ops.map_put('map', 'existing', 'new', map_policy=map_policy), + map_ops.map_put('map', 'new', 'new', map_policy=map_policy), + ] + self.as_connection.operate(key, ops) + + _, _, bins = self.as_connection.get(key) + + map_bin = bins['map'] + assert map_bin['existing'] == 'new' + assert map_bin['new'] == 'new' + + def test_create_only_does_not_allow_update(self): + key = 'test', 'write_mode', 1 + self.keys.append(key) + self.as_connection.put(key, {'map': {'existing': 'old'}}) + + map_policy = { + 'map_write_mode': aerospike.MAP_CREATE_ONLY + } + ops = [ + map_ops.map_put('map', 'existing', 'new', map_policy=map_policy), + ] + with pytest.raises(e.ElementExistsError): + self.as_connection.operate(key, ops) + + _, _, bins = self.as_connection.get(key) + + map_bin = bins['map'] + assert map_bin['existing'] == 'old' + + def test_create_only_allows_create(self): + key = 'test', 'write_mode', 1 + self.keys.append(key) + self.as_connection.put(key, {'map': {'existing': 'old'}}) + + map_policy = { + 'map_write_mode': aerospike.MAP_CREATE_ONLY + } + ops = [ + map_ops.map_put('map', 'new', 'new', map_policy=map_policy), + ] + self.as_connection.operate(key, ops) + _, _, bins = self.as_connection.get(key) + + map_bin = bins['map'] + assert map_bin['existing'] == 'old' + assert map_bin['new'] == 'new' + + def test_update_only_does_not_allow_create(self): + key = 'test', 'write_mode', 1 + self.keys.append(key) + self.as_connection.put(key, {'map': {'existing': 'old'}}) + + map_policy = { + 'map_write_mode': aerospike.MAP_UPDATE_ONLY + } + ops = [ + map_ops.map_put('map', 'new', 'new', map_policy=map_policy), + ] + + with pytest.raises(e.ElementNotFoundError): + self.as_connection.operate(key, ops) + + _, _, bins = self.as_connection.get(key) + + map_bin = bins['map'] + assert map_bin['existing'] == 'old' + assert 'new' not in map_bin + + def test_update_only_allows_update(self): + key = 'test', 'write_mode', 1 + self.keys.append(key) + self.as_connection.put(key, {'map': {'existing': 'old'}}) + + map_policy = { + 'map_write_mode': aerospike.MAP_UPDATE_ONLY + } + ops = [ + map_ops.map_put('map', 'existing', 'new', map_policy=map_policy), + ] + + self.as_connection.operate(key, ops) + + _, _, bins = self.as_connection.get(key) + + map_bin = bins['map'] + assert map_bin['existing'] == 'new' \ No newline at end of file diff --git a/test/new_tests/test_relative_cdt_operations.py b/test/new_tests/test_relative_cdt_operations.py index 260d94c70..574ede6bd 100644 --- a/test/new_tests/test_relative_cdt_operations.py +++ b/test/new_tests/test_relative_cdt_operations.py @@ -17,8 +17,11 @@ def get_list_result_from_operation(client, key, operation, binname): _, _, result_bins = client.operate(key, [operation]) return result_bins[binname] +def skip_less_than_430(version): + if version < [4, 3]: + print(version) + pytest.skip("Requires server > 4.3.0 to work") -@pytest.mark.xfail(reason="Requires Aerospike Server 4.3.0 or greater") class TestNewRelativeListOperations(object): @pytest.fixture(autouse=True) @@ -26,6 +29,7 @@ def setup(self, request, as_connection): """ Setup Method """ + skip_less_than_430(self.server_version) self.keys = [] # INDEXES 0, 1, 2, 3, 4, 05 # RINDEX 5, 4, 3, 2, 1, 0 @@ -249,7 +253,6 @@ def get_list_result_from_operation(client, key, operation, binname): return result_bins[binname] -@pytest.mark.xfail(reason="Requires Aerospike Server 4.3.0 or greater") class TestNewRelativeMapOperations(object): @pytest.fixture(autouse=True) @@ -257,6 +260,9 @@ def setup(self, request, as_connection): """ Setup Method """ + + skip_less_than_430(self.server_version) + self.keys = [] # INDEXES 0, 1, 2, 3, 4, 05 # RINDEX 5, 4, 3, 2, 1, 0 From 5ae88aff84f62a7a59254acb1eba47c71176d56f Mon Sep 17 00:00:00 2001 From: Robert Marks Date: Wed, 14 Nov 2018 00:15:48 +0000 Subject: [PATCH 21/21] Add a test for non integer flag --- test/new_tests/test_map_write_flags.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/test/new_tests/test_map_write_flags.py b/test/new_tests/test_map_write_flags.py index 1663efb6d..28653f47f 100644 --- a/test/new_tests/test_map_write_flags.py +++ b/test/new_tests/test_map_write_flags.py @@ -213,3 +213,26 @@ def test_no_fail_does_not_allow_partial_write(self): assert 'new1' not in map_bin assert 'new2' not in map_bin + + def test_non_int_write_flag_raises_exception(self): + skip_less_than_430(self.server_version) + key = 'test', 'write_flags', 1 + self.keys.append(key) + self.as_connection.put(key, {'map': {'existing': 'old'}}) + + map_policy = { + 'map_write_flags': "waving flag" + } + ops = [ + map_ops.map_put_items( + 'map', + { + 'existing': 'new', + 'new1': 'new1', + 'new2': 'new2' + }, + map_policy=map_policy), + ] + + with pytest.raises(e.ParamError): + self.as_connection.operate(key, ops)