Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
test
  • Loading branch information
yyyshi committed Dec 13, 2024
1 parent efb9fde commit 205014d
Show file tree
Hide file tree
Showing 19 changed files with 119 additions and 9 deletions.
2 changes: 2 additions & 0 deletions src/bio/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ def scons():
denv.AppendUnique(OBJPREFIX='b_')

# SPDK related libs
# libbio.so 依赖spdk 的库
# 所在目录:/opt/daos/prereq/release/spdk/lib# ll *.so
libs = ['spdk_log', 'spdk_env_dpdk', 'spdk_thread', 'spdk_bdev', 'rte_mempool']
libs += ['rte_mempool_ring', 'rte_bus_pci', 'rte_pci', 'rte_ring']
libs += ['rte_mbuf', 'rte_eal', 'rte_kvargs', 'spdk_bdev_aio']
Expand Down
26 changes: 22 additions & 4 deletions src/bio/bio_xstream.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ static struct bio_nvme_data nvme_glb;
static int
bio_spdk_env_init(void)
{
// 构建spdk opt
struct spdk_env_opts opts;
bool enable_rpc_srv = false;
int rc;
Expand All @@ -123,6 +124,7 @@ bio_spdk_env_init(void)
// == true
if (bio_nvme_configured(SMD_DEV_TYPE_MAX)) {
// 传递allowed bdev pci 地址给spdk,即将nvme addr 添加到opts 的allowlist 中
// nvme 类型的bdev,创建命令是conf 中的 attach controller
rc = bio_add_allowed_alloc(nvme_glb.bd_nvme_conf, &opts, &roles);
if (rc != 0) {
D_ERROR("Failed to add allowed devices to SPDK env, "DF_RC"\n",
Expand Down Expand Up @@ -483,6 +485,7 @@ struct common_cp_arg {
static void
common_prep_arg(struct common_cp_arg *arg)
{
// 设置一个 inflight
arg->cca_inflights = 1;
arg->cca_rc = 0;
arg->cca_bs = NULL;
Expand All @@ -491,6 +494,7 @@ common_prep_arg(struct common_cp_arg *arg)
static void
common_init_cb(void *arg, int rc)
{
// rpc 执行成功或者超时后,inflight 自减
struct common_cp_arg *cp_arg = arg;

D_ASSERT(cp_arg->cca_inflights == 1);
Expand Down Expand Up @@ -1732,28 +1736,39 @@ bio_xsctxt_alloc(struct bio_xs_context **pctxt, int tgt_id, bool self_polling)
common_prep_arg(&cp_arg);
// 这里用的是全局的nvme conf 文件,每个engine 有自己单独的
// spdk 中子系统其实就是模块的意思,比如子系统名字为bdev,表示bdev 模块
// 这里面会调用 spdk_rpc_initialize,会启动rpc 服务监听
spdk_subsystem_init_from_json_config(nvme_glb.bd_nvme_conf,
SPDK_DEFAULT_RPC_ADDR,
subsys_init_cb, &cp_arg,
true);
// init from json 会注册 rpc_subsystem_poll,rpc_client_connect_poller,用于接收rpc 请求
// 这里是为了等待rpc req 被处理完,或者req 超时
rc = xs_poll_completion(ctxt, &cp_arg.cca_inflights, 0);
D_ASSERT(rc == 0);

// 初始化失败了
if (cp_arg.cca_rc != 0) {
rc = cp_arg.cca_rc;
D_ERROR("failed to init bdevs, rc:%d\n", rc);
goto out;
}

/* Continue poll until no more events */
// todo: 这里是为啥
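// At this point in time:
// 1. No other xstream exists yet, so nobody sends a msg to this init_thread via spdk_thread_send_msg
// 2. Therefore the only work left is from the already-registered pollers: rpc_subsystem_poll, rpc_client_connect_poller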
// 1-1: spdk_subsystem_init_from_json_config --> spdk_rpc_initialize --> rpc_subsystem_poll --> spdk_rpc_accept ---> spdk_jsonrpc_server_poll: 检查监听的socket,处理连接
// 2-2: spdk_subsystem_init_from_json_config --> rpc_client_connect_poller --> spdk_jsonrpc_client_poll --> jsonrpc_client_poll / jsonrpc_client_poll_connecting
// return 1 if work was done. 0 if no work was done.
// 返回0表示轮询成功,但是没有处理任何事件;返回>0 表示轮询成功,且处理了n 个事件;返回 <0 表示轮询失败
// 这里是loop 直到所有的事件都被处理完
while (spdk_thread_poll(ctxt->bxc_thread, 0, 0) > 0)
;
// 这个日志会打印,tgt_id 为 0
// todo: 什么样才算初始化完成,此时可以通过 spdk_bdev_first 拿到spdk bdev 了吗?
// 初始化完成,此时可以通过 spdk_bdev_first 拿到spdk bdev
D_DEBUG(DB_MGMT, "SPDK bdev initialized, tgt_id:%d", tgt_id);

// 将spdk thread 设置到全局数据中,表示在众多(每个xs 有各自的spdk thread)的thread 中,当前是扫描bdev那个
// 将spdk thread 设置到全局数据中,表示在众多(每个xs 有各自的spdk thread)的thread 中,当前是初始化的那个
nvme_glb.bd_init_thread = ctxt->bxc_thread;

// 内部会load_blobstore 最终创建spdk bs,只有target 0 会执行
Expand All @@ -1764,7 +1779,10 @@ bio_xsctxt_alloc(struct bio_xs_context **pctxt, int tgt_id, bool self_polling)
goto out;
}

// bio bdev 初始化完成后,重启spdk rpc server。rpc 默认是禁用的
// 上面 spdk_subsystem_init_from_json_config 中会通过调用 spdk_rpc_initialize 自动开启rpc 监听
// 用于处理json 文件中配置的method,其中 nvme_attach_controller 会1. 创建nvme ctrlr 2. 添加ns 到nvme ctrlr 3. 注册spdk nvme bdev
// 在 init_bio_bdevs 函数执行之前,spdk nvme bdev 注册已经完成(通过nvme_attach_controller 对应的mapping 函数,走的rpc 流程)
// 当 bio bdev 初始化完成后,重启spdk rpc server,这里是false,所以不用重启
/* After bio_bdevs are initialized, restart SPDK JSON-RPC server if required. */
// 默认是false
if (nvme_glb.bd_enable_rpc_srv) {
Expand Down
2 changes: 1 addition & 1 deletion src/control/cmd/daos_server_helper/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ type bdevHandler struct {
bdevProvider *bdev.Provider
}

// todo: provider 怎么理解
func (h *bdevHandler) setupProvider(log logging.Logger) {
if h.bdevProvider == nil {
// 调用了bdev 包下的 DefaultProvider。对应的后端是 spdkbackend
Expand All @@ -257,6 +256,7 @@ func (h *bdevScanHandler) Handle(log logging.Logger, req *pbin.Request) *pbin.Re
// bdev scan 外层函数,内部通过 bdev 包下的 setupProvider 最终使用spdkbackend
h.setupProvider(log)

// helper app 收到请求,构建provider 调用scan
sRes, err := h.bdevProvider.Scan(sReq)
if err != nil {
return pbin.NewResponseWithError(err)
Expand Down
1 change: 1 addition & 0 deletions src/control/cmd/daos_server_helper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
)

func main() {
// daos_server 可以调用daos_server_helper
app := pbin.NewApp().
WithAllowedCallers("daos_server")

Expand Down
5 changes: 4 additions & 1 deletion src/control/pbin/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (a *App) configureLogging(logPath string) {

// WithAllowedCallers adds a list of process names allowed to call this
// application.
// 添加一个列表,允许调用当前app
func (a *App) WithAllowedCallers(callers ...string) *App {
a.allowedCallers = callers
return a
Expand Down Expand Up @@ -123,7 +124,7 @@ func (a *App) logError(err error) error {
}

// Run executes the helper application process.

// 执行helper app
func (a *App) Run() error {
parentName, err := a.process.ParentProcessName()
if err != nil {
Expand Down Expand Up @@ -153,6 +154,7 @@ func (a *App) Run() error {
return a.logError(err)
}

// 处理请求
resp := a.handleRequest(req)

err = a.writeResponse(resp, conn)
Expand Down Expand Up @@ -206,6 +208,7 @@ func (a *App) readRequest(rdr io.Reader) (*Request, error) {
}

func (a *App) handleRequest(req *Request) *Response {
// 按照名字获取hdl
reqHandler, ok := a.handlers[req.Method]
if !ok {
err := a.logError(errors.Errorf("unhandled method %q", req.Method))
Expand Down
5 changes: 5 additions & 0 deletions src/control/pbin/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ func ReadMessage(conn io.Reader) ([]byte, error) {

// ExecReq executes the supplied Request by starting a child process
// to service the request. Returns a Response if successful.
// Starts a child process (not a goroutine) to execute the forwarded command
// binPath 为 "daos_server_helper"
func ExecReq(parent context.Context, log logging.Logger, binPath string, req *Request) (res *Response, err error) {
if req == nil {
return nil, errors.New("nil request")
Expand All @@ -130,6 +132,7 @@ func ExecReq(parent context.Context, log logging.Logger, binPath string, req *Re
ctx, killChild := context.WithCancel(parent)
defer killChild()

// 传入bin,调用go api
child := exec.CommandContext(ctx, binPath)
child.Stderr = &cmdLogger{
logFn: log.Error,
Expand Down Expand Up @@ -160,12 +163,14 @@ func ExecReq(parent context.Context, log logging.Logger, binPath string, req *Re
defer func() {
// If there was an error, kill the child so that it can't
// hang around waiting for input.
// 出错时,主动kill child
if err != nil {
killChild()
return
}

// Otherwise, the child should exit normally.
// 成功时,child 会自动退出
err = child.Wait()
}()

Expand Down
7 changes: 7 additions & 0 deletions src/control/pbin/forwarding.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

type (
// Forwarder provides a common implementation of a request forwarder.
// 提供通用的请求转发功能
Forwarder struct {
Disabled bool

Expand Down Expand Up @@ -57,6 +58,7 @@ func NewForwarder(log logging.Logger, pbinName string) *Forwarder {
}

// GetBinaryName returns the name of the binary requests will be forwarded to.
// It reports the pbinName value stored on the Forwarder, unchanged.
// NOTE(review): in the call path traced by this commit the name is presumably
// "daos_server_helper" — confirm against the NewForwarder caller.
func (f *Forwarder) GetBinaryName() string {
return f.pbinName
}
Expand All @@ -74,6 +76,7 @@ func (f *Forwarder) CanForward() bool {
// SendReq is responsible for marshaling the forwarded request into a message
// that is sent to the privileged binary, then unmarshaling the response for
// the caller.
// Responsible for forwarding the request to the privileged daos_server_helper binary
func (f *Forwarder) SendReq(method string, fwdReq interface{}, fwdRes interface{}) error {
if fwdReq == nil {
return errors.New("nil request")
Expand All @@ -82,6 +85,7 @@ func (f *Forwarder) SendReq(method string, fwdReq interface{}, fwdRes interface{
return errors.New("nil response")
}

// 找daos_server_helper
pbinPath, err := common.FindBinary(f.pbinName)
if err != nil {
return err
Expand All @@ -92,12 +96,15 @@ func (f *Forwarder) SendReq(method string, fwdReq interface{}, fwdRes interface{
return errors.Wrap(err, "failed to marshal forwarded request as payload")
}

// 构建req
req := &Request{
Method: method,
Payload: payload,
}

ctx := context.TODO()
// Forward the request and execute the cmd
// pbinPath 为daos_server_helper
res, err := ExecReq(ctx, f.log, pbinPath, req)
if err != nil {
if fault.IsFault(err) {
Expand Down
3 changes: 3 additions & 0 deletions src/control/server/config/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ func (cfg *Server) SetPath(inPath string) error {
return err
}

// active 配置文件:/var/run/daos_server/.daos_server.active.yml
// SaveActiveConfig saves read-only active config, tries config dir then /tmp/.
func (cfg *Server) SaveActiveConfig(log logging.Logger) {
activeConfig := filepath.Join(cfg.SocketDir, ConfigOut)
Expand Down Expand Up @@ -650,10 +651,12 @@ func (cfg *Server) Validate(log logging.Logger) (err error) {
}

// Set DisableVMD reference if unset in config file.
// If DisableVMD is not set in the config file, VMD defaults to enabled (DisableVMD=false)
if cfg.DisableVMD == nil {
cfg.WithDisableVMD(false)
}

// 日志中:vfio=true hotplug=false vmd=true requested in config
log.Debugf("vfio=%v hotplug=%v vmd=%v requested in config", !cfg.DisableVFIO,
cfg.EnableHotplug, !(*cfg.DisableVMD))

Expand Down
4 changes: 4 additions & 0 deletions src/control/server/ctl_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
// StorageControlService encapsulates the storage part of the control service
type StorageControlService struct {
log logging.Logger
// provider 包含多种类型的抽象类
storage *storage.Provider
instanceStorage map[uint32]*storage.Config
getMemInfo common.GetMemInfoFn
Expand Down Expand Up @@ -58,6 +59,7 @@ func (scs *StorageControlService) WithVMDEnabled() *StorageControlService {
}

// NewStorageControlService returns an initialized *StorageControlService
// 构建最外层的service
func NewStorageControlService(log logging.Logger, ecs []*engine.Config) *StorageControlService {
topCfg := &storage.Config{
Tiers: nil,
Expand All @@ -70,9 +72,11 @@ func NewStorageControlService(log logging.Logger, ecs []*engine.Config) *Storage
instanceStorage[uint32(i)] = &c.Storage
}

// 构建scs
return &StorageControlService{
log: log,
instanceStorage: instanceStorage,
// 使用默认的provider
storage: storage.DefaultProvider(log, 0, topCfg),
getMemInfo: common.GetMemInfo,
}
Expand Down
1 change: 1 addition & 0 deletions src/control/server/ctl_svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

// ControlService implements the control plane control service, satisfying
// ctlpb.CtlSvcServer, and is the data container for the service.
// 控制层服务
type ControlService struct {
ctlpb.UnimplementedCtlSvcServer
StorageControlService
Expand Down
1 change: 1 addition & 0 deletions src/control/server/mgmt_svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (br *batchRequest) sendResponse(parent context.Context, msg proto.Message,

// mgmtSvc implements (the Go portion of) Management Service, satisfying
// mgmtpb.MgmtSvcServer.
// go 语言部分的管理服务
type mgmtSvc struct {
mgmtpb.UnimplementedMgmtSvcServer
log logging.Logger
Expand Down
13 changes: 12 additions & 1 deletion src/control/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ func processConfig(log logging.Logger, cfg *config.Server, fis *hardware.FabricI
}
}

// 保存active 配置文件:/var/run/daos_server/.daos_server.active.yml
cfg.SaveActiveConfig(log)

// 设置daos server helper 的env
if err := setDaosHelperEnvs(cfg, osSetenv); err != nil {
return err
}
Expand Down Expand Up @@ -195,11 +197,14 @@ func (srv *server) createServices(ctx context.Context) (err error) {
if err != nil {
return
}
// todo: mb
srv.membership = system.NewMembership(srv.log, srv.sysdb)

// Create rpcClient for inter-server communication.
// todo: 哪个rpc
cliCfg := control.DefaultConfig()
cliCfg.TransportConfig = srv.cfg.TransportConfig
// 客户端作用
rpcClient := control.NewClient(
control.WithClientComponent(build.ComponentServer),
control.WithConfig(cliCfg),
Expand All @@ -208,12 +213,14 @@ func (srv *server) createServices(ctx context.Context) (err error) {
// Create event distribution primitives.
srv.pubSub = events.NewPubSub(ctx, srv.log)
srv.OnShutdown(srv.pubSub.Close)
// todo: 事件转发
srv.evtForwarder = control.NewEventForwarder(rpcClient, srv.cfg.AccessPoints)
srv.evtLogger = control.NewEventLogger(srv.log)

// nvme 的控制器服务,prepare 和scan 请求都会通过这个控制器服务来完成
// 1. 控制层服务,prepare 和scan 请求都会通过这个控制器服务来完成
srv.ctlSvc = NewControlService(srv.log, srv.harness, srv.cfg, srv.pubSub,
hwprov.DefaultFabricScanner(srv.log))
// 2. 管理层服务
srv.mgmtSvc = newMgmtSvc(srv.harness, srv.membership, srv.sysdb, rpcClient, srv.pubSub)

if err := srv.mgmtSvc.systemProps.UpdateCompPropVal(daos.SystemPropertyDaosSystem, func() string {
Expand Down Expand Up @@ -336,6 +343,7 @@ func (srv *server) addEngines(ctx context.Context) error {

// Retrieve NVMe device details (before engines are started) so static details can be
// recovered by the engine storage provider(s) during scan even if devices are in use.
// 获取nvme 设备的详细信息
nvmeScanResp, err := scanBdevStorage(srv)
if err != nil {
return err
Expand Down Expand Up @@ -594,6 +602,7 @@ func Start(log logging.Logger, cfg *config.Server) error {
return errors.Wrapf(err, "retrieve system memory info")
}

// 处理配置文件
if err = processConfig(log, cfg, fis, mi, lookupIF, genFiAffFn(fis)); err != nil {
return err
}
Expand All @@ -604,6 +613,8 @@ func Start(log logging.Logger, cfg *config.Server) error {
if err != nil {
return err
}

// 日志:fault domain: /server01。配置文件中没配置,默认是host 级别
log.Debugf("fault domain: %s", faultDomain.String())

// 根据容错域和配置信息新建server
Expand Down
5 changes: 4 additions & 1 deletion src/control/server/server_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,15 +370,18 @@ func prepBdevStorage(srv *server, iommuEnabled bool) error {
}

// scanBdevStorage performs discovery and validates existence of configured NVMe SSDs.
// scan 操作用于发现和检查conf 中指定的nvme 是否存在
func scanBdevStorage(srv *server) (*storage.BdevScanResponse, error) {
// 执行dmg storage scan 时会打印
// 1. 执行dmg storage scan 时会打印
// 2. server 启动时候自动会去scan,也会打印,from func addEngines
defer srv.logDuration(track("time to scan bdev storage"))

if srv.cfg.DisableHugepages {
srv.log.Debugf("skip nvme scan as hugepages have been disabled in config")
return &storage.BdevScanResponse{}, nil
}

// 执行scan
nvmeScanResp, err := srv.ctlSvc.NvmeScan(storage.BdevScanRequest{
DeviceList: getBdevCfgsFromSrvCfg(srv.cfg).Bdevs(),
})
Expand Down
Loading

0 comments on commit 205014d

Please sign in to comment.