Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
test
  • Loading branch information
yyyshi committed Aug 9, 2024
1 parent afe4455 commit f16b150
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 1 deletion.
6 changes: 6 additions & 0 deletions src/dtx/dtx_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,7 @@ dtx_handle_init(struct dtx_id *dti, daos_handle_t coh, struct dtx_epoch *epoch,
dth->dth_need_validation = 0;

// 存在修改的dtx 的dtx id 数组 dti_cos
// 注:这里的dti_cos 可理解为与当前dtx 冲突(conflicts)的dtx 列表;CoS 这个缩写在DAOS 中一般指 Commit on Share(待确认)
dth->dth_dti_cos = dti_cos;
dth->dth_dti_cos_count = dti_cos_cnt;
dth->dth_ent = NULL;
Expand Down Expand Up @@ -1331,9 +1332,11 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul
goto sync;
}

// 事务的参与者
mbs = (struct dtx_memberships *)(dte + 1);
memcpy(mbs, dth->dth_mbs, size - sizeof(*dte));

// 填充entry 的信息
dte->dte_xid = dth->dth_xid;
dte->dte_ver = dth->dth_ver;
dte->dte_refs = 1;
Expand All @@ -1353,6 +1356,7 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul

// 在一次dtx 要结束的时候,向cos 缓存中添加dtx entry
// 这是向 dcr_prio_list 中插入一个item,这个list 将在 dtx_leader_begin 的时候遍历并处理
// dte 里面有dtx id
rc = dtx_add_cos(cont, dte, &dth->dth_leader_oid,
dth->dth_dkey_hash, dth->dth_epoch, flags);
dtx_entry_put(dte);
Expand All @@ -1378,8 +1382,10 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul
* done successfully. That is not only for sync commit, but also for async
* batched commit.
*/
// 标记为可提交
vos_dtx_mark_committable(dth);
dte = &dth->dth_dte;
// 提交事务
rc = dtx_commit(cont, &dte, NULL, 1);
if (rc != 0)
D_WARN(DF_UUID": Fail to sync commit DTX "DF_DTI": "DF_RC"\n",
Expand Down
3 changes: 3 additions & 0 deletions src/dtx/dtx_cos.c
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt,
return i;
}

// 从dbtree 中获取以oid 和dkey_hash 为key 的cos 列表
int
dtx_list_cos(struct ds_cont_child *cont, daos_unit_oid_t *oid,
uint64_t dkey_hash, int max, struct dtx_id **dtis)
Expand All @@ -347,6 +348,7 @@ dtx_list_cos(struct ds_cont_child *cont, daos_unit_oid_t *oid,
if (rc != 0)
return rc == -DER_NONEXIST ? 0 : rc;

// 取出查到的cos 记录(struct dtx_cos_rec)
dcr = (struct dtx_cos_rec *)riov.iov_buf;
if (dcr->dcr_prio_count == 0)
return 0;
Expand Down Expand Up @@ -381,6 +383,7 @@ dtx_list_cos(struct ds_cont_child *cont, daos_unit_oid_t *oid,
return count;
}

// 添加以oid & dkey_hash 作为key 的cos item 到dbtree
int
dtx_add_cos(struct ds_cont_child *cont, struct dtx_entry *dte,
daos_unit_oid_t *oid, uint64_t dkey_hash,
Expand Down
14 changes: 14 additions & 0 deletions src/dtx/dtx_rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -398,10 +398,12 @@ dtx_req_list_send(struct dtx_common_args *dca, daos_epoch_t epoch, int len)

D_DEBUG(DB_TRACE, "DTX req for opc %x, future %p start.\n", dra->dra_opc, dra->dra_future);

// todo: 这个head 里的item 都是哪里来的
d_list_for_each_entry(drr, &dca->dca_head, drr_link) {
drr->drr_parent = dra;
drr->drr_result = 0;

// 对应的处理rpc 的函数是:dtx_handler(crt_rpc_t *rpc)
if (unlikely(dra->dra_opc == DTX_COMMIT && i == 0 &&
DAOS_FAIL_CHECK(DAOS_DTX_FAIL_COMMIT)))
rc = dtx_req_send(drr, 1);
Expand Down Expand Up @@ -644,6 +646,7 @@ dtx_rpc_internal(struct dtx_common_args *dca)

D_ASSERT(length > 0);

// 发送dtx 的rpc
return dtx_req_list_send(dca, dca->dca_epoch, length);
}

Expand Down Expand Up @@ -705,6 +708,7 @@ dtx_rpc_prep(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry **
}

/* Use helper ULT to handle DTX RPC if there are enough helper XS. */
// 如果有足够的helper xs,就创建helper ult 来处理dtx rpc
if (dss_has_enough_helper())
rc = dss_ult_create(dtx_rpc_helper, dca, DSS_XS_IOFW, dca->dca_tgtid,
DSS_DEEP_STACK_SZ, &dca->dca_helper);
Expand All @@ -724,11 +728,13 @@ dtx_rpc_post(struct dtx_common_args *dca, int ret, bool keep_head)
if (dca->dca_helper != ABT_THREAD_NULL)
ABT_thread_free(&dca->dca_helper);

// dca_dra 是dtx 请求的args;这里等待已发出的dtx 请求结束
rc = dtx_req_wait(&dca->dca_dra);

if (daos_handle_is_valid(dca->dca_tree_hdl))
dbtree_destroy(dca->dca_tree_hdl, NULL);

// keep_head == false
if (!keep_head) {
while ((drr = d_list_pop_entry(&dca->dca_head, struct dtx_req_rec,
drr_link)) != NULL)
Expand All @@ -752,6 +758,7 @@ dtx_rpc_post(struct dtx_common_args *dca, int ret, bool keep_head)
* as one target has committed, then the DTX logic can re-sync those failed
* targets when dtx_resync() is triggered next time.
*/
// 全局提交给定的dtx 数组
int
dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes,
struct dtx_cos_key *dcks, int count)
Expand All @@ -764,6 +771,8 @@ dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes,
int rc1 = 0;
int i;

// 发送dtx 的rpc 请求给参与者
// todo: 这个是2pc 中的p 吗?
rc = dtx_rpc_prep(cont, NULL, dtes, count, DTX_COMMIT, 0, NULL, NULL, NULL, &dca);

/*
Expand All @@ -777,6 +786,10 @@ dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes,
*
* Some RPC may has been sent, so need to wait even if dtx_rpc_prep hit failure.
*/
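// 在把该dtx 提交到远程参与者之前,不能在本地移除active 状态的dtx。否则,在远程参与者提交完成之前,
// 本地已提交的dtx 记录可能被dtx agg 移除。在这种情况下,如果某个远程参与者在此期间对该dtx 触发
// DTX_REFRESH,就会得到 -DER_TX_UNCERTAIN,可能导致相关应用失败(详见上方英文注释)。
// 所以我们让远程参与者先提交,如果失败了就让leader 重试,直到所有参与者都提交成功。
// dtx_rpc_post 这个函数是等待远程的参与者提交完成;即使dtx_rpc_prep 失败,也可能已有rpc 发出,所以仍需等待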
rc = dtx_rpc_post(&dca, rc, false);
if (rc > 0 || rc == -DER_NONEXIST || rc == -DER_EXCLUDED)
rc = 0;
Expand All @@ -801,6 +814,7 @@ dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes,
}
}

// 向远程参与者发送完提交rpc 并等待远程提交完成后,开始进行本地提交
rc1 = vos_dtx_commit(cont->sc_hdl, dca.dca_dtis, count, rm_cos);
if (rc1 > 0) {
dra->dra_committed += rc1;
Expand Down
2 changes: 2 additions & 0 deletions src/dtx/dtx_srv.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ struct dss_module_metrics dtx_metrics = {
.dmm_nr_metrics = dtx_metrics_count,
};

// dtx rpc 的服务端接口
static void
dtx_handler(crt_rpc_t *rpc)
{
Expand Down Expand Up @@ -175,6 +176,7 @@ dtx_handler(crt_rpc_t *rpc)
dpm = cont->sc_pool->spc_metrics[DAOS_DTX_MODULE];

switch (opc) {
// 提交dtx rpc 请求
case DTX_COMMIT: {
uint64_t opc_cnt = 0;
uint64_t ent_cnt = 0;
Expand Down
1 change: 1 addition & 0 deletions src/include/daos_srv/dtx_srv.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ struct dtx_handle {
/** Match dtx_entry::dte_refs. */
uint32_t dth_refs;
/** The DTX participants information. */
// 事务参与者的信息
struct dtx_memberships *dth_mbs;
};
};
Expand Down
1 change: 1 addition & 0 deletions src/include/daos_srv/vos_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ struct dtx_entry {
/** The reference count. */
uint32_t dte_refs;
/** The DAOS targets participating in the DTX. */
// 此次事务参与的targets
struct dtx_memberships *dte_mbs;
};

Expand Down
1 change: 1 addition & 0 deletions src/object/srv_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -2905,6 +2905,7 @@ ds_obj_rw_handler(crt_rpc_t *rpc)
uint32_t opc = opc_get(rpc->cr_opc);
struct dtx_memberships *mbs = NULL;
struct daos_shard_tgt *tgts = NULL;
// 两阶段提交事务标识符,由客户端产生,全局唯一
struct dtx_id *dti_cos = NULL;
struct obj_pool_metrics *opm;
int dti_cos_cnt;
Expand Down
4 changes: 3 additions & 1 deletion src/vea/vea_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ vea_unload(struct vea_space_info *vsi)
* Load space tracking information from SCM to initialize the in-memory
* compound index.
*/
// 生成vsi
int
vea_load(struct umem_instance *umem, struct umem_tx_stage_data *txd,
struct vea_space_df *md, struct vea_unmap_context *unmap_ctxt,
Expand Down Expand Up @@ -334,6 +335,7 @@ vea_load(struct umem_instance *umem, struct umem_tx_stage_data *txd,
vsi->vsi_bitmap_btr = DAOS_HDL_INVAL;
D_INIT_LIST_HEAD(&vsi->vsi_agg_lru);
vsi->vsi_agg_btr = DAOS_HDL_INVAL;
// flush 参数
vsi->vsi_flush_time = 0;
vsi->vsi_flush_scheduled = false;
vsi->vsi_unmap_ctxt = *unmap_ctxt;
Expand All @@ -349,7 +351,7 @@ vea_load(struct umem_instance *umem, struct umem_tx_stage_data *txd,
memset(&uma, 0, sizeof(uma));
uma.uma_id = UMEM_CLASS_VMEM;
/* Create in-memory free extent tree */
// free extent 的树,后面申请预留资源信息都是些在这个树上
// free extent 的树,后面申请预留资源信息都是写在这个树上
rc = dbtree_create(DBTREE_CLASS_IFV, BTR_FEAT_DIRECT_KEY, VEA_TREE_ODR, &uma, NULL,
&vsi->vsi_free_btr);
if (rc != 0)
Expand Down
1 change: 1 addition & 0 deletions src/vos/vos_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -1311,6 +1311,7 @@ pool_open(void *ph, struct vos_pool_df *pool_df, unsigned int flags, void *metri
unmap_ctxt.vnc_data = vos_data_ioctxt(pool);
unmap_ctxt.vnc_ext_flush = flags & VOS_POF_EXTERNAL_FLUSH;
// 打开vos pool 的时候,先加载vea 的space 信息
// 后面 vea_reserve 是要根据vsi 信息来申请资源
// pool_df->pd_vea_df 里存储了vsi 的元数据信息
rc = vea_load(&pool->vp_umm, vos_txd_get(flags & VOS_POF_SYSDB),
&pool_df->pd_vea_df, &unmap_ctxt, vea_metrics, &pool->vp_vea_info);
Expand Down

0 comments on commit f16b150

Please sign in to comment.