Skip to content

Commit

Permalink
[CLIENT-2196] Add missing batch policies to client config (#495)
Browse files Browse the repository at this point in the history
* Reorganize client config tests
* Docs: add note that client level policies don't accept expressions
  • Loading branch information
juliannguyen4 authored Aug 28, 2023
1 parent 961119b commit d2034ce
Show file tree
Hide file tree
Showing 7 changed files with 268 additions and 21 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
14.0.0-rc.4
14.0.0-rc.5
10 changes: 9 additions & 1 deletion doc/aerospike.rst
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ Only the `hosts` key is required; the rest of the keys are optional.
Default: ``./``

* **policies** (:class:`dict`)
A :class:`dict` of policies
A :class:`dict` of policies. Note that these policies do not accept expressions.

* **read** (:class:`dict`)
Contains :ref:`aerospike_read_policies`.
Expand All @@ -394,6 +394,14 @@ Only the `hosts` key is required; the rest of the keys are optional.
Contains :ref:`aerospike_scan_policies`.
* **batch** (:class:`dict`)
Contains :ref:`aerospike_batch_policies`.
* **batch_remove** (:class:`dict`)
Default delete policy used in batch remove commands. Contains :ref:`aerospike_batch_remove_policies`.
* **batch_apply** (:class:`dict`)
Default user defined function policy used in batch UDF apply commands. Contains :ref:`aerospike_batch_apply_policies`.
* **batch_write** (:class:`dict`)
Default write policy used in batch operate commands. Contains :ref:`aerospike_batch_write_policies`.
* **batch_parent_write** (:class:`dict`)
Default parent policy used in batch write commands. Contains :ref:`aerospike_batch_policies`.
* **info** (:class:`dict`)
Contains :ref:`aerospike_info_policies`.
* **admin** (:class:`dict`)
Expand Down
6 changes: 6 additions & 0 deletions src/include/policy_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,9 @@ as_status set_operate_policy(as_policy_operate *operate_policy,
as_status set_batch_policy(as_policy_batch *batch_policy, PyObject *py_policy);
as_status set_info_policy(as_policy_info *info_policy, PyObject *py_policy);
as_status set_admin_policy(as_policy_admin *admin_policy, PyObject *py_policy);
as_status set_batch_apply_policy(as_policy_batch_apply *batch_apply_policy,
PyObject *py_policy);
as_status set_batch_write_policy(as_policy_batch_write *batch_write_policy,
PyObject *py_policy);
as_status set_batch_remove_policy(as_policy_batch_remove *batch_remove_policy,
PyObject *py_policy);
2 changes: 1 addition & 1 deletion src/main/aerospike.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ static int Aerospike_Clear(PyObject *aerospike)
PyMODINIT_FUNC PyInit_aerospike(void)
{

const char version[] = "14.0.0-rc.4";
const char version[] = "14.0.0-rc.5";
// Makes things "thread-safe"
Py_Initialize();
int i = 0;
Expand Down
194 changes: 194 additions & 0 deletions src/main/policy_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
#include <stdint.h>

#include "policy_config.h"

as_status set_optional_key(as_policy_key *target_ptr, PyObject *py_policy,
Expand Down Expand Up @@ -100,6 +102,38 @@ as_status set_subpolicies(as_config *config, PyObject *py_policies)
return set_policy_status;
}

PyObject *batch_apply_policy =
PyDict_GetItemString(py_policies, "batch_apply");
set_policy_status = set_batch_apply_policy(&config->policies.batch_apply,
batch_apply_policy);
if (set_policy_status != AEROSPIKE_OK) {
return set_policy_status;
}

PyObject *batch_remove_policy =
PyDict_GetItemString(py_policies, "batch_remove");
set_policy_status = set_batch_remove_policy(&config->policies.batch_remove,
batch_remove_policy);
if (set_policy_status != AEROSPIKE_OK) {
return set_policy_status;
}

PyObject *batch_write_policy =
PyDict_GetItemString(py_policies, "batch_write");
set_policy_status = set_batch_write_policy(&config->policies.batch_write,
batch_write_policy);
if (set_policy_status != AEROSPIKE_OK) {
return set_policy_status;
}

PyObject *batch_parent_write_policy =
PyDict_GetItemString(py_policies, "batch_parent_write");
set_policy_status = set_batch_policy(&config->policies.batch_parent_write,
batch_parent_write_policy);
if (set_policy_status != AEROSPIKE_OK) {
return set_policy_status;
}

return AEROSPIKE_OK;
}

Expand Down Expand Up @@ -531,6 +565,134 @@ as_status set_admin_policy(as_policy_admin *admin_policy, PyObject *py_policy)
return AEROSPIKE_OK;
}

// For batch write, batch apply, and batch remove policies:
// Don't set expressions field since it depends on the client's
// serialization policy

as_status set_batch_apply_policy(as_policy_batch_apply *batch_apply_policy,
PyObject *py_policy)
{
as_status status = AEROSPIKE_OK;
if (!py_policy) {
return AEROSPIKE_OK;
}

if (!PyDict_Check(py_policy)) {
return AEROSPIKE_ERR_PARAM;
}

status = set_optional_commit_level(&batch_apply_policy->commit_level,
py_policy, "commit_level");
if (status != AEROSPIKE_OK) {
return status;
}

status = set_optional_bool_property(&batch_apply_policy->durable_delete,
py_policy, "durable_delete");
if (status != AEROSPIKE_OK) {
return status;
}

status = set_optional_key(&batch_apply_policy->key, py_policy, "key");
if (status != AEROSPIKE_OK) {
return status;
}

status = set_optional_uint32_property(&batch_apply_policy->ttl, py_policy,
"ttl");
if (status != AEROSPIKE_OK) {
return status;
}

return AEROSPIKE_OK;
}

as_status set_batch_write_policy(as_policy_batch_write *batch_write_policy,
PyObject *py_policy)
{
as_status status = AEROSPIKE_OK;
if (!py_policy) {
return AEROSPIKE_OK;
}

if (!PyDict_Check(py_policy)) {
return AEROSPIKE_ERR_PARAM;
}

status = set_optional_commit_level(&batch_write_policy->commit_level,
py_policy, "commit_level");
if (status != AEROSPIKE_OK) {
return status;
}

status = set_optional_bool_property(&batch_write_policy->durable_delete,
py_policy, "durable_delete");
if (status != AEROSPIKE_OK) {
return status;
}

status =
set_optional_exists(&batch_write_policy->exists, py_policy, "exists");
if (status != AEROSPIKE_OK) {
return status;
}

status = set_optional_gen(&batch_write_policy->gen, py_policy, "gen");
if (status != AEROSPIKE_OK) {
return status;
}

status = set_optional_key(&batch_write_policy->key, py_policy, "key");
if (status != AEROSPIKE_OK) {
return status;
}

return AEROSPIKE_OK;
}

as_status set_batch_remove_policy(as_policy_batch_remove *batch_remove_policy,
PyObject *py_policy)
{
as_status status = AEROSPIKE_OK;
if (!py_policy) {
return AEROSPIKE_OK;
}

if (!PyDict_Check(py_policy)) {
return AEROSPIKE_ERR_PARAM;
}

status = set_optional_commit_level(&batch_remove_policy->commit_level,
py_policy, "commit_level");
if (status != AEROSPIKE_OK) {
return status;
}

status = set_optional_bool_property(&batch_remove_policy->durable_delete,
py_policy, "durable_delete");
if (status != AEROSPIKE_OK) {
return status;
}

status = set_optional_gen(&batch_remove_policy->gen, py_policy, "gen");
if (status != AEROSPIKE_OK) {
return status;
}

status = set_optional_uint16_property(&batch_remove_policy->generation,
py_policy, "generation");
if (status != AEROSPIKE_OK) {
return status;
}

status = set_optional_key(&batch_remove_policy->key, py_policy, "key");
if (status != AEROSPIKE_OK) {
return status;
}

return AEROSPIKE_OK;
}

as_status set_base_policy(as_policy_base *base_policy, PyObject *py_policy)
{

Expand Down Expand Up @@ -631,6 +793,38 @@ as_status set_optional_uint32_property(uint32_t *target_ptr,
return AEROSPIKE_ERR_PARAM;
}

as_status set_optional_uint16_property(uint16_t *target_ptr,
PyObject *py_policy, const char *name)
{
// Assume py_policy is a Python dictionary
PyObject *py_policy_val = PyDict_GetItemString(py_policy, name);
if (!py_policy_val) {
// Key doesn't exist in policy
return AEROSPIKE_OK;
}
Py_INCREF(py_policy_val);

if (!PyLong_Check(py_policy_val)) {
return AEROSPIKE_ERR_PARAM;
}

long int_value = PyLong_AsLong(py_policy_val);
if (int_value == -1 && PyErr_Occurred()) {
// This wasn't a valid int, or was too large
// We are handling the error ourselves, so clear the overflow error
PyErr_Clear();
return AEROSPIKE_ERR_PARAM;

/* If the number was less than zero, or would not fit in a uint16, error */
}
if (int_value < 0 || int_value > UINT16_MAX) {
return AEROSPIKE_ERR_PARAM;
}

*target_ptr = (uint16_t)int_value;
return AEROSPIKE_OK;
}

as_status set_optional_bool_property(bool *target_ptr, PyObject *py_policy,
const char *name)
{
Expand Down
18 changes: 0 additions & 18 deletions test/new_tests/test_client_config.py

This file was deleted.

57 changes: 57 additions & 0 deletions test/new_tests/test_new_constructor.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,60 @@ def test_setting_wrong_type_services_alternate():
config["use_services_alternate"] = "True"
with pytest.raises(e.ParamError):
aerospike.client(config)


def test_setting_rack_aware():
config = copy.deepcopy(gconfig)
config["rack_aware"] = True
config["rack_id"] = 1
config["policies"]["batch"]["replica"] = aerospike.POLICY_REPLICA_PREFER_RACK
config["policies"]["scan"]["replica"] = aerospike.POLICY_REPLICA_PREFER_RACK
config["policies"]["query"]["replica"] = aerospike.POLICY_REPLICA_PREFER_RACK
aerospike.client(config)


def test_setting_batch_remove_gen():
config = copy.deepcopy(gconfig)
config["policies"]["batch_remove"] = {
"generation": 24
}
aerospike.client(config)


def test_setting_batch_remove_gen_invalid_type():
config = copy.deepcopy(gconfig)
config["policies"]["batch_remove"] = {
"generation": 0.3
}
with pytest.raises(e.ParamError) as excinfo:
aerospike.client(config)
assert excinfo.value.msg == "Invalid Policy setting value"


def test_setting_batch_remove_gen_too_large():
config = copy.deepcopy(gconfig)
config["policies"]["batch_remove"] = {
# Larger than max size for 16-bit unsigned integer
"generation": 2**16
}
with pytest.raises(e.ParamError) as excinfo:
aerospike.client(config)
assert excinfo.value.msg == "Invalid Policy setting value"


def test_setting_batch_remove_gen_neg_value():
config = copy.deepcopy(gconfig)
config["policies"]["batch_remove"] = {
"generation": -1
}
with pytest.raises(e.ParamError) as excinfo:
aerospike.client(config)
assert excinfo.value.msg == "Invalid Policy setting value"


def test_setting_batch_policies():
config = copy.deepcopy(gconfig)
policies = ["batch_remove", "batch_apply", "batch_write", "batch_parent_write"]
for policy in policies:
config["policies"][policy] = {}
aerospike.client(config)

0 comments on commit d2034ce

Please sign in to comment.