Skip to content

Commit

Permalink
[CLIENT-2645] Add ttl option for default write policies in client con…
Browse files Browse the repository at this point in the history
…fig (#536)

* client.batch_operate() now takes in an optional ttl parameter instead of taking ttl through a batch policy.
* Apply policy now takes in a ttl option.
* Add ttl option to aerospike.Scan class.
* Docs: batch apply policies section now references TTL constants section instead of copying information from the latter.

---------

Co-authored-by: dwelch-spike <[email protected]>
juliannguyen4 and dwelch-spike authored Nov 8, 2023
1 parent 1604973 commit 4f37473
Showing 15 changed files with 349 additions and 103 deletions.
4 changes: 3 additions & 1 deletion aerospike-stubs/aerospike.pyi
Original file line number Diff line number Diff line change
@@ -284,6 +284,7 @@ SERIALIZER_USER: Literal[3]
TTL_DONT_UPDATE: Literal[0xFFFFFFFE]
TTL_NAMESPACE_DEFAULT: Literal[0]
TTL_NEVER_EXPIRE: Literal[0xFFFFFFFF]
TTL_CLIENT_DEFAULT: Literal[0xFFFFFFFD]
UDF_TYPE_LUA: Literal[0]

@final
@@ -320,7 +321,7 @@ class Client:
def apply(self, key: tuple, module: str, function: str, args: list, policy: dict = ...) -> Union[str, int, float, bytearray, list, dict]: ...
def batch_apply(self, keys: list, module: str, function: str, args: list, policy_batch: dict = ..., policy_batch_apply: dict = ...) -> BatchRecords: ...
def batch_get_ops(self, keys: list, ops: list, policy: dict) -> list: ...
def batch_operate(self, keys: list, ops: list, policy_batch: dict = ..., policy_batch_write: dict = ...) -> BatchRecords: ...
def batch_operate(self, keys: list, ops: list, policy_batch: dict = ..., policy_batch_write: dict = ..., ttl: int = ...) -> BatchRecords: ...
def batch_remove(self, keys: list, policy_batch: dict = ..., policy_batch_remove: dict = ...) -> BatchRecords: ...
def batch_read(self, keys: list, bins: list[str] = ..., policy_batch: dict = ...) -> BatchRecords: ...
def batch_write(self, batch_records: BatchRecords, policy_batch: dict = ...) -> BatchRecords: ...
@@ -442,6 +443,7 @@ class Query:
def where(self, predicate: tuple, ctx: list = ...) -> None: ...

class Scan:
ttl: int
def __init__(self, *args, **kwargs) -> None: ...
def add_ops(self, ops: list) -> None: ...
def apply(self, module: str, function: str, arguments: list = ...) -> Any: ...
7 changes: 7 additions & 0 deletions doc/aerospike.rst
Original file line number Diff line number Diff line change
@@ -858,6 +858,13 @@ Specifies the TTL constants.

Do not change the current TTL of the record.

.. data:: TTL_CLIENT_DEFAULT

NOTE: only applies to the policies mentioned below.

Use the applicable policy ttl in write, operate, batch write, and scan policies.
If the policy is not defined for the transaction, use the default client-level policy's ttl.

.. _auth_mode:

Auth Mode Constants
59 changes: 41 additions & 18 deletions doc/client.rst
Original file line number Diff line number Diff line change
@@ -358,14 +358,15 @@ Batch Operations

.. note:: Requires server version >= 6.0.0.

.. method:: batch_operate(keys: list, ops: list, [policy_batch: dict], [policy_batch_write: dict]) -> BatchRecords
.. method:: batch_operate(keys: list, ops: list, [policy_batch: dict], [policy_batch_write: dict], [ttl: int]) -> BatchRecords

Perform the same read/write transactions on multiple keys.

:param list keys: The keys to operate on.
:param list ops: List of operations to apply.
:param dict policy_batch: See :ref:`aerospike_batch_policies`.
:param dict policy_batch_write: See :ref:`aerospike_batch_write_policies`.
:param int ttl: The time-to-live (expiration) of each record in seconds.

:return: an instance of :class:`BatchRecords <aerospike_helpers.batch.records>`.

@@ -1508,7 +1509,7 @@ Metadata Dictionary

The metadata dictionary has the following key-value pairs:

* ``"ttl"`` (:class:`int`): record time to live in seconds. See :ref:`TTL_CONSTANTS`.
* ``"ttl"`` (:class:`int`): record time to live in seconds. See :ref:`TTL_CONSTANTS` for possible special values.
* ``"gen"`` (:class:`int`): record generation

.. _aerospike_policies:
@@ -1575,6 +1576,14 @@ Write Policies
| One of the :ref:`POLICY_EXISTS` values such as :data:`aerospike.POLICY_EXISTS_CREATE`
|
| Default: :data:`aerospike.POLICY_EXISTS_IGNORE`
* **ttl**
The default time-to-live (expiration) of the record in seconds. This field will only be used if
the write transaction:

1. Doesn't contain a metadata dictionary with a ``ttl`` value.
2. Contains a metadata dictionary with a ``ttl`` value set to :data:`aerospike.TTL_CLIENT_DEFAULT`.

There are also special values that can be set for this option. See :ref:`TTL_CONSTANTS`.
* **gen**
| One of the :ref:`POLICY_GEN` values such as :data:`aerospike.POLICY_GEN_IGNORE`
|
@@ -1740,6 +1749,14 @@ Operate Policies
| One of the :ref:`POLICY_GEN` values such as :data:`aerospike.POLICY_GEN_IGNORE`
|
| Default: :data:`aerospike.POLICY_GEN_IGNORE`
* **ttl** (:class:`int`)
The default time-to-live (expiration) of the record in seconds. This field will only be used if an
operate transaction:

1. Doesn't contain a metadata dictionary with a ``ttl`` value.
2. Contains a metadata dictionary with a ``ttl`` value set to :data:`aerospike.TTL_CLIENT_DEFAULT`.

There are also special values that can be set for this option. See :ref:`TTL_CONSTANTS`.
* **replica**
| One of the :ref:`POLICY_REPLICA` values such as :data:`aerospike.POLICY_REPLICA_MASTER`
|
@@ -1843,6 +1860,11 @@ Apply Policies
| One of the :ref:`POLICY_COMMIT_LEVEL` values such as :data:`aerospike.POLICY_COMMIT_LEVEL_ALL`
|
| Default: :data:`aerospike.POLICY_COMMIT_LEVEL_ALL`
* **ttl** (:class:`int`)
The default time-to-live (expiration) of the record in seconds. This field will only be used if an apply
transaction doesn't have an apply policy with a ``ttl`` value that overrides this field.

There are also special values that can be set for this field. See :ref:`TTL_CONSTANTS`.
* **durable_delete** (:class:`bool`)
| Perform durable delete
|
@@ -2086,11 +2108,21 @@ Batch Write Policies
|
| Default: None
* **ttl** :class:`int`
| The time-to-live (expiration) in seconds to apply to every record in the batch.
|
| The ttl must be a 32-bit unsigned integer, or a :exc:`~aerospike.exception.ParamError` will be raised.
|
| Default: ``0``
The time-to-live (expiration) in seconds to apply to every record in the batch. This field will only be
used if:
1. A :meth:`~aerospike.Client.batch_write` call contains a :class:`~aerospike_helpers.batch.records.Write` that:

a. Doesn't contain a metadata dictionary with a ``ttl`` value.
b. Contains a metadata dictionary with a ``ttl`` value set to :data:`aerospike.TTL_CLIENT_DEFAULT`.

2. A :meth:`~aerospike.Client.batch_operate` call:

a. Doesn't pass in a `ttl` argument.
b. Passes in `aerospike.TTL_CLIENT_DEFAULT` to the `ttl` parameter.

There are also special values that can be set for this field. See :ref:`TTL_CONSTANTS`.

Default: ``0``

.. _aerospike_batch_apply_policies:

@@ -2115,20 +2147,11 @@ Batch Apply Policies
* **ttl** int
| Time to live (expiration) of the record in seconds.
|
| 0 which means that the
| record will adopt the default TTL value from the namespace.
| See :ref:`TTL_CONSTANTS` for possible special values.
|
| 0xFFFFFFFF (also, -1 in a signed 32 bit int)
| which means that the record
| will get an internal "void_time" of zero, and thus will never expire.
|
| 0xFFFFFFFE (also, -2 in a signed 32 bit int)
| which means that the record
|
| ttl will not change when the record is updated.
| Note that the TTL value will be employed ONLY on write/update calls.
|
| Default: 0
| Default: ``0``
* **durable_delete** :class:`bool`
| If the transaction results in a record deletion, leave a tombstone for the record. This prevents deleted records from reappearing after node failures. Valid for Aerospike Server Enterprise Edition only.
|
16 changes: 3 additions & 13 deletions doc/query.rst
Original file line number Diff line number Diff line change
@@ -80,20 +80,10 @@ Fields
Default: ``0`` (no limit)

ttl (:class:`int`)
The time-to-live (expiration) of the record in seconds.
The time-to-live (expiration) of the record in seconds. If set to :data:`aerospike.TTL_CLIENT_DEFAULT`, use the
client's default write policy ttl.

There are also special values that can be set in the record TTL:

``0`` (``TTL_NAMESPACE_DEFAULT``)
Which means that the record will adopt the default TTL value from the namespace.

``0xFFFFFFFF`` (``TTL_NEVER_EXPIRE``)
(also, ``-1`` in a signed 32 bit int) Which means that the record will never expire.

``0xFFFFFFFE`` (``TTL_DONT_UPDATE``)
(also, ``-2`` in a signed 32 bit int)
Which means that the record ttl will not change when the record is
updated.
See :ref:`TTL_CONSTANTS` for more possible special values.

.. note::
Note that the TTL value will be employed ONLY on background query writes.
26 changes: 26 additions & 0 deletions doc/scan.rst
Original file line number Diff line number Diff line change
@@ -22,10 +22,30 @@ bins returned can be filtered using :meth:`select`.
`Scans <http://www.aerospike.com/docs/guide/scan.html>`_ and \
`Managing Scans <http://www.aerospike.com/docs/operations/manage/scans/>`_.

Fields
======

.. class:: Scan

ttl (:class:`int`)
The time-to-live (expiration) of the record in seconds. Note that ttl
is only used on background scan writes.

If this is set to :data:`aerospike.TTL_CLIENT_DEFAULT`, the scan will use the
client's default scan policy ttl.

See :ref:`TTL_CONSTANTS` for special values that can be set in the record ttl.

Default: ``0`` (no limit)

.. note::
Requires server version >= 6.0.0

Methods
=======

.. class:: Scan
:noindex:

.. deprecated:: 7.0.0 :class:`aerospike.Query` should be used instead.

@@ -556,6 +576,12 @@ Policies
| One of the :ref:`POLICY_REPLICA` values such as :data:`aerospike.POLICY_REPLICA_MASTER`
|
| Default: ``aerospike.POLICY_REPLICA_SEQUENCE``
* **ttl** (:class:`int`)
The default time-to-live (expiration) of the record in seconds. This field will only be used on
background scan writes if :py:attr:`aerospike.Scan.ttl` is set to
:data:`aerospike.TTL_CLIENT_DEFAULT`.

There are also special values that can be set for this field. See :ref:`TTL_CONSTANTS`.

.. _aerospike_scan_options:

48 changes: 23 additions & 25 deletions src/main/client/batch_operate.c
Original file line number Diff line number Diff line change
@@ -108,11 +108,13 @@ static bool batch_operate_cb(const as_batch_result *results, uint32_t n,
* @param py_ops The list containing op dictionaries.
* @param py_policy_batch Python dict used to populate policy_batch.
* @param py_policy_batch_write Python dict used to populate policy_batch_write.
* @param py_ttl TTL value to set for each record.
*******************************************************************************************************
*/
static PyObject *AerospikeClient_Batch_Operate_Invoke(
AerospikeClient *self, as_error *err, PyObject *py_keys, PyObject *py_ops,
PyObject *py_policy_batch, PyObject *py_policy_batch_write)
PyObject *py_policy_batch, PyObject *py_policy_batch_write,
PyObject *py_ttl)
{
long operation;
long return_type = -1;
@@ -220,26 +222,15 @@ static PyObject *AerospikeClient_Batch_Operate_Invoke(
&batch_write_exp_list_p) != AEROSPIKE_OK) {
goto CLEANUP;
}
}

// The C client's batch write policy doesn't have a ttl option
// The correct way is to set the ttl inside the as_operations object
PyObject *py_ttl = PyDict_GetItemString(py_policy_batch_write, "ttl");
Py_XINCREF(py_ttl);
// Default ttl
if (py_ttl != NULL) {
if (PyLong_Check(py_ttl)) {
long ttl = PyLong_AsLong(py_ttl);
if (ttl > UINT32_MAX || ttl < 0) {
as_error_update(err, AEROSPIKE_ERR_PARAM,
"ttl is out of range. It must be a 32 bit "
"unsigned integer.");
Py_DECREF(py_ttl);
goto CLEANUP;
}
ops.ttl = ttl;
}
}
Py_XDECREF(py_ttl);
if (py_ttl == NULL || py_ttl == Py_None) {
// If ttl in this transaction's batch write policy isn't set, use the client config's default batch write
// policy ttl
ops.ttl = AS_RECORD_CLIENT_DEFAULT_TTL;
}
else {
ops.ttl = (uint32_t)PyLong_AsLong(py_ttl);
}

// import batch_records helper
@@ -355,15 +346,16 @@ PyObject *AerospikeClient_Batch_Operate(AerospikeClient *self, PyObject *args,
PyObject *py_keys = NULL;
PyObject *py_ops = NULL;
PyObject *py_results = NULL;
PyObject *py_ttl = NULL;

as_error_init(&err);

// Python Function Keyword Arguments
static char *kwlist[] = {"keys", "ops", "policy_batch",
"policy_batch_write", NULL};
if (PyArg_ParseTupleAndKeywords(args, kwds, "OO|OO:batch_Operate", kwlist,
static char *kwlist[] = {
"keys", "ops", "policy_batch", "policy_batch_write", "ttl", NULL};
if (PyArg_ParseTupleAndKeywords(args, kwds, "OO|OOO:batch_Operate", kwlist,
&py_keys, &py_ops, &py_policy_batch,
&py_policy_batch_write) == false) {
&py_policy_batch_write, &py_ttl) == false) {
return NULL;
}

@@ -381,8 +373,14 @@ PyObject *AerospikeClient_Batch_Operate(AerospikeClient *self, PyObject *args,
goto ERROR;
}

if (py_ttl && py_ttl != Py_None && !PyLong_Check(py_ttl)) {
as_error_update(&err, AEROSPIKE_ERR_PARAM, "ttl should be an integer");
goto ERROR;
}

py_results = AerospikeClient_Batch_Operate_Invoke(
self, &err, py_keys, py_ops, py_policy_batch, py_policy_batch_write);
self, &err, py_keys, py_ops, py_policy_batch, py_policy_batch_write,
py_ttl);

return py_results;

6 changes: 2 additions & 4 deletions src/main/client/batch_write.c
Original file line number Diff line number Diff line change
@@ -277,10 +277,8 @@ static PyObject *AerospikeClient_BatchWriteInvoke(AerospikeClient *self,
ops = as_operations_new(py_ops_size);
garb->ops_to_free = ops;

if (py_meta) {
if (check_and_set_meta(py_meta, ops, err) != AEROSPIKE_OK) {
goto CLEANUP0;
}
if (check_and_set_meta(py_meta, ops, err) != AEROSPIKE_OK) {
goto CLEANUP0;
}

for (Py_ssize_t i = 0; i < py_ops_size; i++) {
12 changes: 4 additions & 8 deletions src/main/client/operate.c
Original file line number Diff line number Diff line change
@@ -868,10 +868,8 @@ static PyObject *AerospikeClient_Operate_Invoke(AerospikeClient *self,
memset(&static_pool, 0, sizeof(static_pool));
CHECK_CONNECTED(err);

if (py_meta) {
if (check_and_set_meta(py_meta, &ops, err) != AEROSPIKE_OK) {
goto CLEANUP;
}
if (check_and_set_meta(py_meta, &ops, err) != AEROSPIKE_OK) {
goto CLEANUP;
}

for (i = 0; i < size; i++) {
@@ -1041,10 +1039,8 @@ AerospikeClient_OperateOrdered_Invoke(AerospikeClient *self, as_error *err,
}
}

if (py_meta) {
if (check_and_set_meta(py_meta, &ops, err) != AEROSPIKE_OK) {
goto CLEANUP;
}
if (check_and_set_meta(py_meta, &ops, err) != AEROSPIKE_OK) {
goto CLEANUP;
}

for (Py_ssize_t i = 0; i < ops_list_size; i++) {
14 changes: 14 additions & 0 deletions src/main/conversions.c
Original file line number Diff line number Diff line change
@@ -1118,6 +1118,9 @@ as_status pyobject_to_record(AerospikeClient *self, as_error *err,
"TTL should be an int or long");
}
}
else {
rec->ttl = AS_RECORD_CLIENT_DEFAULT_TTL;
}

if (py_gen) {
if (PyLong_Check(py_gen)) {
@@ -1137,6 +1140,9 @@ as_status pyobject_to_record(AerospikeClient *self, as_error *err,
}
}
}
else {
rec->ttl = AS_RECORD_CLIENT_DEFAULT_TTL;
}

if (err->code != AEROSPIKE_OK) {
as_record_destroy(rec);
@@ -2183,6 +2189,10 @@ as_status check_and_set_meta(PyObject *py_meta, as_operations *ops,
}
ops->ttl = ttl;
}
else {
// Metadata dict was present, but ttl field did not exist
ops->ttl = AS_RECORD_CLIENT_DEFAULT_TTL;
}

if (py_gen) {
if (PyLong_Check(py_gen)) {
@@ -2205,6 +2215,10 @@ as_status check_and_set_meta(PyObject *py_meta, as_operations *ops,
return as_error_update(err, AEROSPIKE_ERR_PARAM,
"Metadata should be of type dictionary");
}
else {
// Metadata dict was not set by user
ops->ttl = AS_RECORD_CLIENT_DEFAULT_TTL;
}
return err->code;
}

2 changes: 2 additions & 0 deletions src/main/policy.c
Original file line number Diff line number Diff line change
@@ -236,6 +236,7 @@ static AerospikeConstants aerospike_constants[] = {
{AS_RECORD_DEFAULT_TTL, "TTL_NAMESPACE_DEFAULT"},
{AS_RECORD_NO_EXPIRE_TTL, "TTL_NEVER_EXPIRE"},
{AS_RECORD_NO_CHANGE_TTL, "TTL_DONT_UPDATE"},
{AS_RECORD_CLIENT_DEFAULT_TTL, "TTL_CLIENT_DEFAULT"},
{AS_AUTH_INTERNAL, "AUTH_INTERNAL"},
{AS_AUTH_EXTERNAL, "AUTH_EXTERNAL"},
{AS_AUTH_EXTERNAL_INSECURE, "AUTH_EXTERNAL_INSECURE"},
@@ -628,6 +629,7 @@ as_status pyobject_to_policy_apply(AerospikeClient *self, as_error *err,
//POLICY_SET_FIELD(gen, as_policy_gen); removed
POLICY_SET_FIELD(commit_level, as_policy_commit_level);
POLICY_SET_FIELD(durable_delete, bool);
POLICY_SET_FIELD(ttl, uint32_t);

// C client 5.0 new expressions
POLICY_SET_EXPRESSIONS_BASE_FIELD();
27 changes: 27 additions & 0 deletions src/main/policy_config.c
Original file line number Diff line number Diff line change
@@ -229,6 +229,11 @@ as_status set_write_policy(as_policy_write *write_policy, PyObject *py_policy)
return status;
}

status = set_optional_uint32_property(&write_policy->ttl, py_policy, "ttl");
if (status != AEROSPIKE_OK) {
return status;
}

status = set_optional_uint32_property(
(uint32_t *)&write_policy->compression_threshold, py_policy,
"compression_threshold");
@@ -273,6 +278,11 @@ as_status set_apply_policy(as_policy_apply *apply_policy, PyObject *py_policy)
return status;
}

status = set_optional_uint32_property(&apply_policy->ttl, py_policy, "ttl");
if (status != AEROSPIKE_OK) {
return status;
}

status = set_optional_commit_level(&apply_policy->commit_level, py_policy,
"commit_level");
if (status != AEROSPIKE_OK) {
@@ -393,6 +403,11 @@ as_status set_scan_policy(as_policy_scan *scan_policy, PyObject *py_policy)
return status;
}

status = set_optional_uint32_property(&scan_policy->ttl, py_policy, "ttl");
if (status != AEROSPIKE_OK) {
return status;
}

status = set_optional_replica(&scan_policy->replica, py_policy, "replica");
if (status != AEROSPIKE_OK) {
return status;
@@ -437,6 +452,12 @@ as_status set_operate_policy(as_policy_operate *operate_policy,
return status;
}

status =
set_optional_uint32_property(&operate_policy->ttl, py_policy, "ttl");
if (status != AEROSPIKE_OK) {
return status;
}

status = set_optional_gen(&operate_policy->gen, py_policy, "gen");
if (status != AEROSPIKE_OK) {
return status;
@@ -637,6 +658,12 @@ as_status set_batch_write_policy(as_policy_batch_write *batch_write_policy,
return status;
}

status = set_optional_uint32_property(&batch_write_policy->ttl, py_policy,
"ttl");
if (status != AEROSPIKE_OK) {
return status;
}

status = set_optional_gen(&batch_write_policy->gen, py_policy, "gen");
if (status != AEROSPIKE_OK) {
return status;
34 changes: 20 additions & 14 deletions src/main/scan/type.c
Original file line number Diff line number Diff line change
@@ -99,6 +99,12 @@ static PyMethodDef AerospikeScan_Type_Methods[] = {

{NULL}};

static PyMemberDef AerospikeScan_Type_custom_members[] = {
{"ttl", T_UINT, offsetof(AerospikeScan, scan) + offsetof(as_scan, ttl), 0,
"The time-to-live (expiration) of the record in seconds."},
{NULL} /* Sentinel */
};

/*******************************************************************************
* PYTHON TYPE HOOKS
******************************************************************************/
@@ -207,20 +213,20 @@ static PyTypeObject AerospikeScan_Type = {
"operation. To create a new instance of the Scan class, call the\n"
"scan() method on an instance of a Client class.\n",
// tp_doc
0, // tp_traverse
0, // tp_clear
0, // tp_richcompare
0, // tp_weaklistoffset
0, // tp_iter
0, // tp_iternext
AerospikeScan_Type_Methods, // tp_methods
0, // tp_members
0, // tp_getset
0, // tp_base
0, // tp_dict
0, // tp_descr_get
0, // tp_descr_set
0, // tp_dictoffset
0, // tp_traverse
0, // tp_clear
0, // tp_richcompare
0, // tp_weaklistoffset
0, // tp_iter
0, // tp_iternext
AerospikeScan_Type_Methods, // tp_methods
AerospikeScan_Type_custom_members, // tp_members
0, // tp_getset
0, // tp_base
0, // tp_dict
0, // tp_descr_get
0, // tp_descr_set
0, // tp_dictoffset
(initproc)AerospikeScan_Type_Init,
// tp_init
0, // tp_alloc
34 changes: 15 additions & 19 deletions test/new_tests/test_batch_operate.py
Original file line number Diff line number Diff line change
@@ -58,14 +58,15 @@ def teardown():
request.addfinalizer(teardown)

@pytest.mark.parametrize(
"name, keys, ops, policy_batch, policy_batch_write, exp_res, exp_rec",
"name, keys, ops, policy_batch, policy_batch_write, ttl, exp_res, exp_rec",
[
(
"simple-write",
[("test", "demo", 0)],
[op.write("count", 2), op.read("count")],
None,
None,
None,
[AerospikeStatus.AEROSPIKE_OK],
[{"count": 2}],
),
@@ -75,6 +76,7 @@ def teardown():
[op.write("count", 3), op.read("count")],
{},
{},
None,
[AerospikeStatus.AEROSPIKE_OK],
[{"count": 3}],
),
@@ -94,6 +96,7 @@ def teardown():
).compile(),
},
{},
None,
[AerospikeStatus.AEROSPIKE_OK],
[{"count": 7}],
),
@@ -110,6 +113,7 @@ def teardown():
"durable_delete": False,
"expressions": exp.Eq(exp.IntBin("count"), 0).compile(),
},
None,
[AerospikeStatus.AEROSPIKE_OK],
[{"count": 7}],
),
@@ -121,9 +125,8 @@ def teardown():
op.read("count")
],
{},
{
"ttl": 200
},
{},
9000,
[AerospikeStatus.AEROSPIKE_OK],
[{"count": 7}],
),
@@ -150,24 +153,30 @@ def teardown():
"durable_delete": False,
"expressions": exp.Eq(exp.IntBin("count"), 0).compile(), # this expression takes precedence
},
None,
[AerospikeStatus.AEROSPIKE_OK],
[{"count": 7}],
),
],
)
def test_batch_operate_pos(self, name, keys, ops, policy_batch, policy_batch_write, exp_res, exp_rec):
def test_batch_operate_pos(self, name, keys, ops, policy_batch, policy_batch_write, ttl, exp_res, exp_rec):
"""
Test batch_operate positive.
"""

res = self.as_connection.batch_operate(keys, ops, policy_batch, policy_batch_write)
res = self.as_connection.batch_operate(keys, ops, policy_batch, policy_batch_write, ttl)

for i, batch_rec in enumerate(res.batch_records):
assert batch_rec.result == exp_res[i]
assert batch_rec.record[2] == exp_rec[i]
assert batch_rec.key[:3] == keys[i] # checking key
assert batch_rec.record[0][:3] == keys[i] # checking key in record

if ttl is not None:
for key in keys:
_, meta = self.as_connection.exists(key)
assert meta["ttl"] in range(9000 - 50, 9000 + 50)

def test_batch_operate_many_pos(self):
"""
Test batch operate with many keys.
@@ -254,19 +263,6 @@ def test_batch_operate_many_pos(self):
["bad-batch-write-policy"],
e.ParamError,
),
(
"bad-batch-write-policy-ttl",
[("test", "demo", 1)],
[
op.write("count", 2),
],
{},
{
# Out of bounds
"ttl": 2**32
},
e.ParamError,
),
],
)
def test_batch_operate_neg(self, name, keys, ops, policy_batch, policy_batch_write, exp_res):
161 changes: 161 additions & 0 deletions test/new_tests/test_new_constructor.py
Original file line number Diff line number Diff line change
@@ -4,6 +4,9 @@
from .test_base_class import TestBaseClass
import aerospike
from aerospike import exception as e
from aerospike_helpers.operations import operations
from aerospike_helpers.batch.records import Write, BatchRecords
from .test_scan_execute_background import wait_for_job_completion
import copy

gconfig = {}
@@ -201,3 +204,161 @@ def test_setting_batch_policies():
for policy in policies:
config["policies"][policy] = {}
aerospike.client(config)


class TestConfigTTL:
NEW_TTL = 9000

@pytest.fixture
def config_ttl_setup(self, policy_name: str):
config = copy.deepcopy(gconfig)
config["policies"][policy_name] = {
"ttl": self.NEW_TTL
}
self.client = aerospike.client(config)
self.key = ("test", "demo", 0)

if "apply" in policy_name:
self.client.udf_put("test_record_udf.lua")

yield

# Teardown

if "apply" in policy_name:
try:
self.client.udf_remove("test_record_udf.lua")
except e.UDFError:
# In case UDF module does not exist
pass

try:
self.client.remove(self.key)
except e.RecordNotFound:
pass

def check_ttl(self):
_, meta = self.client.exists(self.key)
clock_skew_tolerance_secs = 50
assert meta["ttl"] in range(self.NEW_TTL - clock_skew_tolerance_secs, self.NEW_TTL + clock_skew_tolerance_secs)

@pytest.mark.parametrize("policy_name", ["write"])
@pytest.mark.parametrize(
"meta",
[None, {"ttl": aerospike.TTL_CLIENT_DEFAULT}, {"gen": 10}],
ids=["no metadata", "metadata with special ttl value", "metadata without ttl"]
)
def test_setting_write_ttl(self, config_ttl_setup, meta):
self.client.put(self.key, bins={"a": 1}, meta=meta)
self.check_ttl()

@pytest.mark.parametrize("policy_name", ["operate"])
@pytest.mark.parametrize(
"meta",
# The reason we also test a metadata dict without ttl for operate()
# is the codepath that handles the metadata dict for operate() is different
# from that for put()
[None, {"ttl": aerospike.TTL_CLIENT_DEFAULT}, {"gen": 10}],
ids=["no metadata", "metadata with special ttl value", "metadata without ttl"]
)
def test_setting_operate_ttl(self, config_ttl_setup, meta):
ops = [
operations.write("a", 1)
]
self.client.operate(self.key, ops, meta=meta)
self.check_ttl()

@pytest.mark.parametrize("policy_name", ["apply"])
def test_setting_apply_ttl(self, config_ttl_setup):
# Setup
self.client.put(self.key, {"bin": "a"})

# Call without setting the ttl in the transaction's apply policy
# Args: bin name, str
self.client.apply(self.key, module="test_record_udf", function="bin_udf_operation_string", args=["bin", "a"])
self.check_ttl()

@pytest.mark.parametrize("policy_name", ["batch_write"])
@pytest.mark.parametrize(
"meta",
[None, {"ttl": aerospike.TTL_CLIENT_DEFAULT}],
ids=["no metadata", "metadata with special ttl value"]
)
def test_setting_batch_write_ttl_with_batch_write(self, config_ttl_setup, meta):
ops = [
operations.write("bin", 1)
]
batch_records = BatchRecords([
Write(self.key, ops=ops, meta=meta)
])
brs = self.client.batch_write(batch_records)
# assert brs.result == 0
for br in brs.batch_records:
assert br.result == 0

self.check_ttl()

@pytest.mark.parametrize("policy_name", ["batch_write"])
@pytest.mark.parametrize(
"ttl",
[None, aerospike.TTL_CLIENT_DEFAULT],
)
def test_setting_batch_write_ttl_with_batch_operate(self, config_ttl_setup, ttl):
ops = [
operations.write("bin", 1)
]
keys = [self.key]
brs = self.client.batch_operate(keys, ops, ttl=ttl)
# assert brs.result == 0
for br in brs.batch_records:
assert br.result == 0

self.check_ttl()

@pytest.mark.parametrize("policy_name", ["batch_apply"])
def test_setting_batch_apply_ttl(self, config_ttl_setup):
# Setup
self.client.put(self.key, {"bin": "a"})

# Call without setting the ttl in batch_apply()'s batch apply policy
keys = [
self.key
]
self.client.batch_apply(keys, module="test_record_udf", function="bin_udf_operation_string", args=["bin", "a"])
self.check_ttl()

@pytest.mark.parametrize("policy_name", ["scan"])
def test_setting_scan_ttl(self, config_ttl_setup):
# Setup
self.client.put(self.key, {"bin": "a"})

# Tell scan to use client config's scan policy ttl
scan = self.client.scan("test", "demo")
scan.ttl = aerospike.TTL_CLIENT_DEFAULT
ops = [
operations.append("bin", "a")
]
scan.add_ops(ops)
job_id = scan.execute_background()

wait_for_job_completion(self.client, job_id)

self.check_ttl()

@pytest.mark.parametrize("policy_name", ["write"])
def test_query_client_default_ttl(self, config_ttl_setup):
# Setup
self.client.put(self.key, {"bin": "a"}, meta={"ttl": 90})

# Tell scan to use client config's write policy ttl
query = self.client.query("test", "demo")
query.ttl = aerospike.TTL_CLIENT_DEFAULT
ops = [
operations.append("bin", "a")
]
query.add_ops(ops)
job_id = query.execute_background()

wait_for_job_completion(self.client, job_id)

self.check_ttl()

0 comments on commit 4f37473

Please sign in to comment.