Skip to content

Commit

Permalink
DAOS-14021 pool: enable md dup op detection (#13536)
Browse files Browse the repository at this point in the history
With this change, pool and container metadata duplicate operation detection
logic is enabled in daos_engine handler code. Previous patches established
the handler structure but avoided saving RPC metadata operation history
in the "svc_ops" KVS in a pool's rdb.

This is enabled mainly by this patch arranging for the svc_ops KVS entries
to be ordered in client-provided hybrid logical clock (HLC) time order.
This is accomplished by adding a new RDB KVS class RDB_OID_CLASS_LEXICAL,
used by the svc_ops KVS, that maps to vos object type DAOS_OT_MULTI_LEXICAL.
And by the engine handlers encoding the received client keys such that the
btree memcmp-based direct key comparison will result in time-ordered entries.

Also with this change, the svc_ops KVS within rdb enforces a maximum capacity,
calculated from a default "maximum age" in seconds goal (5 minutes),
and an assumed maximum reasonable metadata RPC rate of 4K ops/sec. When
processing every metadata RPC request, the number of entries in svc_ops
is evaluated, and the oldest entry is evaluated for whether it exceeds the
maximum age goal. If the maximum number of entries exists, or the oldest
entry exceeds the age, then the oldest item is removed before potentially
inserting another entry due to the execution of the current handler. Future
changes may clean multiple of the first entries in the KVS if they all exceed
the maximum age goal (even if the total entries capacity has not been reached).

In the suite/daos_test pool_op_retry() and co_op_retry() tests, and in the
corresponding engine handling code, two new fault injections are added:
DAOS_MD_OP_PASS_NOREPLY_NEWLDR
DAOS_MD_OP_FAIL_NOREPLY_NEWLDR
These faults cause the engine handler to either fully execute (successfully)
or simulate a (non-retryable) handling error (-DER_MISC), then force the
client to retry the RPC by sending a (retryable, -DER_TIMEDOUT) error, and
also step down as the pool service leader. All of this is done so that when the
client RPC retry is sent, it is handled by a new pool service leader engine,
and so that the correct duplicate operation detection behavior is still seen
even in the presence of this leadership change.

Additionally in this PR, logic and metadata ops (rdb) KVS creation/interaction
is refactored / centralized in the pool engine code, with internal-server API calls
made from container code.

Signed-off-by: Kenneth Cain <[email protected]>
  • Loading branch information
kccain authored Jan 23, 2024
1 parent ffe9ea0 commit e1d0cd7
Show file tree
Hide file tree
Showing 25 changed files with 845 additions and 383 deletions.
7 changes: 7 additions & 0 deletions src/common/prop.c
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,13 @@ daos_prop_valid(daos_prop_t *prop, bool pool, bool input)
return false;
}
break;
case DAOS_PROP_PO_SVC_OPS_ENABLED:
val = prop->dpp_entries[i].dpe_val;
if (val > 1) {
D_ERROR("invalid svc_ops_enabled " DF_U64 ".\n", val);
return false;
}
break;
/* container-only properties */
case DAOS_PROP_CO_LAYOUT_TYPE:
val = prop->dpp_entries[i].dpe_val;
Expand Down
42 changes: 42 additions & 0 deletions src/common/tests_dmg_helpers.c
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,48 @@ dmg_pool_create(const char *dmg_config_file,
D_GOTO(out, rc = -DER_NOMEM);
has_label = true;
}

entry = daos_prop_entry_get(prop, DAOS_PROP_PO_SCRUB_MODE);
if (entry != NULL) {
const char *scrub_str = NULL;

switch (entry->dpe_val) {
case DAOS_SCRUB_MODE_OFF:
scrub_str = "off";
break;
case DAOS_SCRUB_MODE_LAZY:
scrub_str = "lazy";
break;
case DAOS_SCRUB_MODE_TIMED:
scrub_str = "timed";
break;
default:
break;
}

if (scrub_str) {
args = cmd_push_arg(args, &argcount, "--properties=scrub:%s ",
scrub_str);
if (args == NULL)
D_GOTO(out, rc = -DER_NOMEM);
}
}

entry = daos_prop_entry_get(prop, DAOS_PROP_PO_SVC_OPS_ENABLED);
if (entry != NULL) {
args = cmd_push_arg(args, &argcount, "--properties=svc_ops_enabled:%zu ",
entry->dpe_val);
if (args == NULL)
D_GOTO(out, rc = -DER_NOMEM);
}

entry = daos_prop_entry_get(prop, DAOS_PROP_PO_SPACE_RB);
if (entry != NULL) {
args = cmd_push_arg(args, &argcount, "--properties=space_rb:%zu ",
entry->dpe_val);
if (args == NULL)
D_GOTO(out, rc = -DER_NOMEM);
}
}

if (!has_label) {
Expand Down
166 changes: 40 additions & 126 deletions src/container/srv_container.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,18 +131,8 @@ cont_svc_init(struct cont_svc *svc, const uuid_t pool_uuid, uint64_t id,
if (rc != 0)
goto err_hdls;

/* cs_ops */
rc = rdb_path_clone(&svc->cs_root, &svc->cs_ops);
if (rc != 0)
goto err_hdls;
rc = rdb_path_push(&svc->cs_ops, &ds_cont_prop_svc_ops);
if (rc != 0)
goto err_svcops;

return 0;

err_svcops:
rdb_path_fini(&svc->cs_ops);
err_hdls:
rdb_path_fini(&svc->cs_hdls);
err_conts:
Expand All @@ -160,7 +150,6 @@ cont_svc_init(struct cont_svc *svc, const uuid_t pool_uuid, uint64_t id,
static void
cont_svc_fini(struct cont_svc *svc)
{
rdb_path_fini(&svc->cs_ops);
rdb_path_fini(&svc->cs_hdls);
rdb_path_fini(&svc->cs_conts);
rdb_path_fini(&svc->cs_uuids);
Expand Down Expand Up @@ -5237,142 +5226,47 @@ static int
cont_op_lookup(struct rdb_tx *tx, struct ds_pool_hdl *pool_hdl, struct cont_svc *svc,
crt_rpc_t *rpc, int cont_proto_ver, bool *is_dup, struct ds_pool_svc_op_val *valp)
{
struct cont_op_v8_in *in8 = crt_req_get(rpc);
struct ds_pool_svc_op_key op_key;
struct ds_pool_svc_op_val op_val;
d_iov_t key;
d_iov_t val;
uint32_t svc_ops_enabled;
bool proto_enabled;
bool dup = false;
crt_opcode_t opc = opc_get(rpc->cr_opc);
int rc = 0;
struct cont_op_v8_in *in8 = crt_req_get(rpc);
crt_opcode_t opc = opc_get(rpc->cr_opc);
int rc = 0;

/* If client didn't provide a key (old protocol), skip */
proto_enabled = (cont_proto_ver >= CONT_PROTO_VER_WITH_SVC_OP_KEY);
if (!proto_enabled)
if (cont_proto_ver < CONT_PROTO_VER_WITH_SVC_OP_KEY)
goto out;

/* If the operation is not a write, skip (read-only ops not tracked for duplicates) */
if (!cont_op_is_write(opc))
goto out;

/* If enabled, lookup client-provided op key, assign dup_op accordingly. */
/* TODO: lookup from a cached value in struct pool_svc rather than rdb */
d_iov_set(&val, &svc_ops_enabled, sizeof(svc_ops_enabled));
rc = rdb_tx_lookup(tx, &svc->cs_root, &ds_cont_prop_svc_ops_enabled, &val);
if (rc == -DER_NONEXIST) {
rc = 0;
goto out;
} else if (rc != 0) {
DL_ERROR(rc, DF_CONT ": failed to lookup svc_ops_enabled",
DP_CONT(pool_hdl->sph_pool->sp_uuid, in8->ci_uuid));
goto out;
}
if (!svc_ops_enabled)
goto out;

uuid_copy(op_key.ok_client_id, in8->ci_cli_id);
op_key.ok_client_time = in8->ci_time;
d_iov_set(&key, &op_key, sizeof(op_key));
d_iov_set(&val, &op_val, sizeof(op_val));

rc = rdb_tx_lookup(tx, &svc->cs_ops, &key, &val);
if (rc == 0) {
/* found - this is a retry/duplicate RPC being handled */
D_DEBUG(DB_MD,
DF_CONT ": retry RPC detected client=" DF_UUID " time=" DF_X64 " rc=%d\n",
DP_CONT(pool_hdl->sph_pool->sp_uuid, in8->ci_uuid), DP_UUID(in8->ci_cli_id),
in8->ci_time, op_val.ov_rc);
dup = true;
} else if (rc == -DER_NONEXIST) {
/* not found - new, unique RPC being handled */
rc = 0;
} else {
DL_ERROR(rc, DF_CONT ": failed to lookup RPC client=" DF_UUID " time=" DF_X64,
DP_CONT(pool_hdl->sph_pool->sp_uuid, in8->ci_uuid),
DP_UUID(in8->ci_cli_id), in8->ci_time);
goto out;
}
rc = ds_pool_svc_ops_lookup(tx, NULL /* pool_svc */, pool_hdl->sph_pool->sp_uuid,
&in8->ci_cli_id, in8->ci_time, is_dup, valp);

out:
if (rc == 0) {
*is_dup = dup;
if (dup)
*valp = op_val;
}
return rc;
}

/* Save results of the operation in svc_ops KVS, in the existing rdb_tx context. */
static int
cont_op_save(struct rdb_tx *tx, struct ds_pool_hdl *pool_hdl, struct cont_svc *svc, crt_rpc_t *rpc,
int cont_proto_ver, int rc_in, struct ds_pool_svc_op_val *op_valp)
bool dup_op, int cont_proto_ver, int rc_in, struct ds_pool_svc_op_val *op_valp)
{
struct cont_op_v8_in *in8 = crt_req_get(rpc);
d_iov_t key;
d_iov_t val;
struct ds_pool_svc_op_key op_key;
uint32_t svc_ops_enabled;
bool proto_enabled;
crt_opcode_t opc = opc_get(rpc->cr_opc);
int rc = 0;

op_valp->ov_rc = rc_in;
if (!dup_op)
op_valp->ov_rc = rc_in;

/* If client didn't provide a key (old protocol), skip */
proto_enabled = (cont_proto_ver >= CONT_PROTO_VER_WITH_SVC_OP_KEY);
if (!proto_enabled)
if (cont_proto_ver < CONT_PROTO_VER_WITH_SVC_OP_KEY)
goto out;

/* If the operation is not a write, skip (read-only ops not tracked for duplicates) */
if (!cont_op_is_write(opc))
goto out;

/* If enabled, save client-provided op key and result of the operation. */
d_iov_set(&val, &svc_ops_enabled, sizeof(svc_ops_enabled));
rc = rdb_tx_lookup(tx, &svc->cs_root, &ds_cont_prop_svc_ops_enabled, &val);
if (rc == -DER_NONEXIST) {
rc = 0;
goto out;
} else if (rc != 0) {
DL_ERROR(rc, DF_CONT ": failed to lookup svc_ops_enabled",
DP_CONT(pool_hdl->sph_pool->sp_uuid, in8->ci_uuid));
goto out;
}
if (!svc_ops_enabled)
goto out;

/* TODO: implement mechanism to constrain rdb space usage by this KVS. */
goto out;

/* Save result in cs_ops KVS, only if the return code is "definitive" (not retryable). */
if (!daos_rpc_retryable_rc(op_valp->ov_rc)) {
/* If the write operation failed, discard its (unwanted) updates first. */
if (op_valp->ov_rc != 0)
rdb_tx_discard(tx);

uuid_copy(op_key.ok_client_id, in8->ci_cli_id);
op_key.ok_client_time = in8->ci_time;
d_iov_set(&key, &op_key, sizeof(op_key));
d_iov_set(&val, op_valp, sizeof(*op_valp));

rc = rdb_tx_lookup(tx, &svc->cs_ops, &key, &val);
if (rc != -DER_NONEXIST) {
D_ASSERT(rc != 0);
goto out;
}

rc = rdb_tx_update(tx, &svc->cs_ops, &key, &val);
if (rc != 0) {
DL_ERROR(rc,
DF_CONT ": failed to update svc_ops client=" DF_UUID
" time=" DF_X64,
DP_CONT(pool_hdl->sph_pool->sp_uuid, in8->ci_uuid),
DP_UUID(in8->ci_cli_id), in8->ci_time);
goto out;
}
}
rc = ds_pool_svc_ops_save(tx, NULL /* pool_svc */, svc->cs_pool_uuid, &in8->ci_cli_id,
in8->ci_time, dup_op, rc_in, op_valp);

out:
return rc;
Expand All @@ -5399,8 +5293,13 @@ cont_op_with_svc(struct ds_pool_hdl *pool_hdl, struct cont_svc *svc,
struct ds_pool_svc_op_val op_val;
bool fi_pass_noreply = DAOS_FAIL_CHECK(DAOS_MD_OP_PASS_NOREPLY);
bool fi_fail_noreply = DAOS_FAIL_CHECK(DAOS_MD_OP_FAIL_NOREPLY);
bool fi_pass_nl_noreply;
bool fi_fail_nl_noreply;
int rc;

fi_pass_nl_noreply = DAOS_FAIL_CHECK(DAOS_MD_OP_PASS_NOREPLY_NEWLDR);
fi_fail_nl_noreply = DAOS_FAIL_CHECK(DAOS_MD_OP_FAIL_NOREPLY_NEWLDR);

rc = rdb_tx_begin(svc->cs_rsvc->s_db, svc->cs_rsvc->s_term, &tx);
if (rc != 0)
goto out;
Expand All @@ -5414,7 +5313,7 @@ cont_op_with_svc(struct ds_pool_hdl *pool_hdl, struct cont_svc *svc,
rc = cont_op_lookup(&tx, pool_hdl, svc, rpc, cont_proto_ver, &dup_op, &op_val);
if (rc != 0)
goto out_lock;
else if (fi_fail_noreply)
else if (fi_fail_noreply || fi_fail_nl_noreply)
goto out_commit;

switch (opc) {
Expand All @@ -5439,6 +5338,8 @@ cont_op_with_svc(struct ds_pool_hdl *pool_hdl, struct cont_svc *svc,
uuid_copy(olbl_out->colo_uuid, cont->c_uuid);
break;
case CONT_DESTROY_BYLABEL:
if (dup_op)
goto out_commit;
cont_op_in_get_label(rpc, opc, cont_proto_ver, &clbl);
rc = cont_lookup_bylabel(&tx, svc, clbl, &cont);
if (rc != 0)
Expand All @@ -5447,6 +5348,8 @@ cont_op_with_svc(struct ds_pool_hdl *pool_hdl, struct cont_svc *svc,
dup_op, &op_val);
break;
default:
if ((opc == CONT_DESTROY) && dup_op)
goto out_commit;
rc = cont_lookup(&tx, svc, in->ci_uuid, &cont);
if (rc != 0)
goto out_commit;
Expand All @@ -5465,10 +5368,9 @@ cont_op_with_svc(struct ds_pool_hdl *pool_hdl, struct cont_svc *svc,
goto out_commit;

out_commit:
if ((rc == 0) && !dup_op && fi_fail_noreply)
if ((rc == 0) && !dup_op && (fi_fail_noreply || fi_fail_nl_noreply))
rc = -DER_MISC;
if (!dup_op)
rc = cont_op_save(&tx, pool_hdl, svc, rpc, cont_proto_ver, rc, &op_val);
rc = cont_op_save(&tx, pool_hdl, svc, rpc, dup_op, cont_proto_ver, rc, &op_val);
if (rc != 0)
goto out_contref;

Expand Down Expand Up @@ -5498,13 +5400,25 @@ cont_op_with_svc(struct ds_pool_hdl *pool_hdl, struct cont_svc *svc,

if ((rc == 0) && !dup_op && fi_pass_noreply) {
rc = -DER_TIMEDOUT;
D_DEBUG(DB_MD, DF_UUID ": fault injected: DAOS_MD_OP_PASS_NOREPLY\n",
DP_UUID(in->ci_uuid));
D_DEBUG(DB_MD, DF_CONT ": fault injected: DAOS_MD_OP_PASS_NOREPLY\n",
DP_CONT(pool_hdl->sph_pool->sp_uuid, in->ci_uuid));
}
if ((rc == -DER_MISC) && !dup_op && fi_fail_noreply) {
rc = -DER_TIMEDOUT;
D_DEBUG(DB_MD, DF_UUID ": fault injected: DAOS_MD_OP_FAIL_NOREPLY\n",
DP_UUID(in->ci_uuid));
D_DEBUG(DB_MD, DF_CONT ": fault injected: DAOS_MD_OP_FAIL_NOREPLY\n",
DP_CONT(pool_hdl->sph_pool->sp_uuid, in->ci_uuid));
}
if ((rc == 0) && !dup_op && fi_pass_nl_noreply) {
rc = -DER_TIMEDOUT;
D_DEBUG(DB_MD, DF_CONT ": fault injected: DAOS_MD_OP_PASS_NOREPLY_NEWLDR\n",
DP_CONT(pool_hdl->sph_pool->sp_uuid, in->ci_uuid));
rdb_resign(svc->cs_rsvc->s_db, svc->cs_rsvc->s_term);
}
if ((rc == -DER_MISC) && !dup_op && fi_fail_nl_noreply) {
rc = -DER_TIMEDOUT;
D_DEBUG(DB_MD, DF_CONT ": fault injected: DAOS_MD_OP_FAIL_NOREPLY_NEWLDR\n",
DP_CONT(pool_hdl->sph_pool->sp_uuid, in->ci_uuid));
rdb_resign(svc->cs_rsvc->s_db, svc->cs_rsvc->s_term);
}

D_DEBUG(DB_MD, DF_CONT": opc=%d returning, "DF_RC"\n",
Expand Down
3 changes: 1 addition & 2 deletions src/container/srv_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ struct cont_svc {
rdb_path_t cs_root; /* root KVS */
rdb_path_t cs_uuids; /* container UUIDs KVS */
rdb_path_t cs_conts; /* container KVS */
rdb_path_t cs_hdls; /* container handle KVS */
rdb_path_t cs_ops; /* metadata ops KVS */
rdb_path_t cs_hdls; /* container handle KVS */
struct ds_pool *cs_pool;

/* Manage the EC aggregation epoch */
Expand Down
3 changes: 0 additions & 3 deletions src/container/srv_layout.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ RDB_STRING_KEY(ds_cont_prop_, cuuids);
RDB_STRING_KEY(ds_cont_prop_, conts);
RDB_STRING_KEY(ds_cont_prop_, cont_handles);
RDB_STRING_KEY(ds_cont_prop_, oit_oids);
RDB_STRING_KEY(ds_cont_prop_, svc_ops);
RDB_STRING_KEY(ds_cont_prop_, svc_ops_enabled);

/* Container properties KVS */
RDB_STRING_KEY(ds_cont_prop_, ghce);
RDB_STRING_KEY(ds_cont_prop_, ghpce);
Expand Down
10 changes: 4 additions & 6 deletions src/container/srv_layout.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,17 @@
*
* extern d_iov_t ds_cont_prop_new_key; comment_on_value_type
*
* Note 1. The "new_key" name in ds_cont_prop_new_key must not appear (with very few exceptions)
* in the root KVS in src/pool/srv_layout.h, that is, there must usually not be a
* ds_pool_prop_new_key, because the two root KVSs are the same RDB KVS.
* Note 1. The "new_key" name in ds_cont_prop_new_key must not appear in the root KVS in
* src/pool/srv_layout.h, that is, there must not be a ds_pool_prop_new_key, because the two root
* KVSs are the same RDB KVS.
*
* Note 2. The comment_on_value_type shall focus on the value type only;
* usage shall be described above in this comment following existing
* examples. If the value is another KVS, its type shall be the KVS name.
*/
extern d_iov_t ds_cont_prop_cuuids; /* container UUIDs KVS */
extern d_iov_t ds_cont_prop_conts; /* container KVS */
extern d_iov_t ds_cont_prop_cont_handles; /* container handle KVS */
extern d_iov_t ds_cont_prop_svc_ops; /* service ops KVS - common to pool, container */
extern d_iov_t ds_cont_prop_svc_ops_enabled; /* uint32_t - common to pool, container */
extern d_iov_t ds_cont_prop_cont_handles; /* container handle KVS */
/* Please read the IMPORTANT notes above before adding new keys. */

/*
Expand Down
3 changes: 2 additions & 1 deletion src/control/lib/daos/pool_cont_prop.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ const (
//PoolPropertyPerfDomain is pool performance domain
PoolPropertyPerfDomain = C.DAOS_PROP_PO_PERF_DOMAIN
//PoolPropertyReintMode is pool reintegration mode
PoolPropertyReintMode = C.DAOS_PROP_PO_REINT_MODE
PoolPropertyReintMode = C.DAOS_PROP_PO_REINT_MODE
PoolPropertySvcOpsEnabled = C.DAOS_PROP_PO_SVC_OPS_ENABLED
)

const (
Expand Down
Loading

0 comments on commit e1d0cd7

Please sign in to comment.