Skip to content

Commit

Permalink
DAOS-9584 chk: Add srv handlers for checker upcalls
Browse files Browse the repository at this point in the history
Implement control plane handlers for the following
engine checker dRPC upcalls:
  * CheckerListPools
  * CheckerRegisterPool
  * CheckerDeregisterPool

Signed-off-by: Michael MacDonald <[email protected]>
  • Loading branch information
mjmac committed Mar 30, 2022
1 parent 2d0bba5 commit 2368ec7
Show file tree
Hide file tree
Showing 7 changed files with 504 additions and 79 deletions.
28 changes: 14 additions & 14 deletions src/control/common/proto/srv/srv.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 19 additions & 9 deletions src/control/server/mgmt_drpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,35 +40,39 @@ func (mod *mgmtModule) ID() drpc.ModuleID {
return drpc.ModuleMgmt
}

// poolResolver defines an interface to be implemented by
// something that can resolve a pool ID into a PoolService.
type poolResolver interface {
// poolDatabase defines an interface to be implemented by
// a system pool database.
type poolDatabase interface {
FindPoolServiceByLabel(string) (*system.PoolService, error)
FindPoolServiceByUUID(uuid.UUID) (*system.PoolService, error)
PoolServiceList(all bool) ([]*system.PoolService, error)
AddPoolService(ps *system.PoolService) error
RemovePoolService(uuid.UUID) error
UpdatePoolService(ps *system.PoolService) error
}

// srvModule represents the daos_server dRPC module. It handles dRPCs sent by
// the daos_engine (src/engine).
type srvModule struct {
log logging.Logger
sysdb poolResolver
poolDB poolDatabase
engines []Engine
events *events.PubSub
}

// newSrvModule creates a new srv module references to the system database,
// resident EngineInstances and event publish subscribe reference.
func newSrvModule(log logging.Logger, sysdb poolResolver, engines []Engine, events *events.PubSub) *srvModule {
func newSrvModule(log logging.Logger, sysdb poolDatabase, engines []Engine, events *events.PubSub) *srvModule {
return &srvModule{
log: log,
sysdb: sysdb,
poolDB: sysdb,
engines: engines,
events: events,
}
}

// HandleCall is the handler for calls to the srvModule.
func (mod *srvModule) HandleCall(_ context.Context, session *drpc.Session, method drpc.Method, req []byte) ([]byte, error) {
func (mod *srvModule) HandleCall(ctx context.Context, session *drpc.Session, method drpc.Method, req []byte) ([]byte, error) {
switch method {
case drpc.MethodNotifyReady:
return nil, mod.handleNotifyReady(req)
Expand All @@ -80,6 +84,12 @@ func (mod *srvModule) HandleCall(_ context.Context, session *drpc.Session, metho
return mod.handlePoolFindByLabel(req)
case drpc.MethodClusterEvent:
return mod.handleClusterEvent(req)
case drpc.MethodCheckerListPools:
return mod.handleCheckerListPools(ctx, req)
case drpc.MethodCheckerRegisterPool:
return mod.handleCheckerRegisterPool(ctx, req)
case drpc.MethodCheckerDeregisterPool:
return mod.handleCheckerDeregisterPool(ctx, req)
default:
return nil, drpc.UnknownMethodFailure()
}
Expand All @@ -105,7 +115,7 @@ func (mod *srvModule) handleGetPoolServiceRanks(reqb []byte) ([]byte, error) {

resp := new(srvpb.GetPoolSvcResp)

ps, err := mod.sysdb.FindPoolServiceByUUID(uuid)
ps, err := mod.poolDB.FindPoolServiceByUUID(uuid)
if err != nil {
resp.Status = int32(drpc.DaosNonexistant)
mod.log.Debugf("GetPoolSvcResp: %+v", resp)
Expand All @@ -129,7 +139,7 @@ func (mod *srvModule) handlePoolFindByLabel(reqb []byte) ([]byte, error) {

resp := new(srvpb.PoolFindByLabelResp)

ps, err := mod.sysdb.FindPoolServiceByLabel(req.GetLabel())
ps, err := mod.poolDB.FindPoolServiceByLabel(req.GetLabel())
if err != nil {
resp.Status = int32(drpc.DaosNonexistant)
mod.log.Debugf("PoolFindByLabelResp: %+v", resp)
Expand Down
146 changes: 146 additions & 0 deletions src/control/server/mgmt_drpc_checker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
//
// (C) Copyright 2022 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//

package server

import (
"context"

"github.com/google/uuid"
"google.golang.org/protobuf/proto"

srvpb "github.com/daos-stack/daos/src/control/common/proto/srv"
"github.com/daos-stack/daos/src/control/drpc"
"github.com/daos-stack/daos/src/control/system"
)

func (mod *srvModule) handleCheckerListPools(_ context.Context, reqb []byte) (out []byte, outErr error) {
// TODO: Remove if we never add request fields?
req := new(srvpb.CheckListPoolReq)
if err := proto.Unmarshal(reqb, req); err != nil {
return nil, drpc.UnmarshalingPayloadFailure()
}
mod.log.Debugf("handling CheckerListPools: %+v", req)

resp := new(srvpb.CheckListPoolResp)
defer func() {
mod.log.Debugf("CheckerListPools resp: %+v", resp)
out, outErr = proto.Marshal(resp)
}()

pools, err := mod.poolDB.PoolServiceList(true)
if err != nil {
mod.log.Errorf("failed to list pools: %s", err)
resp.Status = int32(drpc.DaosMiscError)
return
}

for _, ps := range pools {
resp.Pools = append(resp.Pools, &srvpb.CheckListPoolResp_OnePool{
Uuid: ps.PoolUUID.String(),
Label: ps.PoolLabel,
Svcreps: system.RanksToUint32(ps.Replicas),
})
}

return
}

func (mod *srvModule) handleCheckerRegisterPool(_ context.Context, reqb []byte) (out []byte, outErr error) {
req := new(srvpb.CheckRegPoolReq)
if err := proto.Unmarshal(reqb, req); err != nil {
return nil, drpc.UnmarshalingPayloadFailure()
}
mod.log.Debugf("handling CheckerRegisterPool: %+v", req)

resp := new(srvpb.CheckRegPoolResp)
defer func() {
mod.log.Debugf("CheckerRegisterPool resp: %+v", resp)
out, outErr = proto.Marshal(resp)
}()

uuid, err := uuid.Parse(req.Uuid)
if err != nil {
mod.log.Errorf("invalid pool UUID %q: %s", req.Uuid, err)
resp.Status = int32(drpc.DaosInvalidInput)
return
}
if !drpc.LabelIsValid(req.Label) {
mod.log.Errorf("bad pool label %q", req.Label)
resp.Status = int32(drpc.DaosInvalidInput)
return
}
if len(req.Svcreps) == 0 {
mod.log.Errorf("pool %q has zero svcreps", req.Uuid)
resp.Status = int32(drpc.DaosInvalidInput)
return
}
if _, err := mod.poolDB.FindPoolServiceByUUID(uuid); err == nil {
mod.log.Errorf("pool with uuid %q already exists", req.Uuid)
resp.Status = int32(drpc.DaosExists)
return
}
if _, err := mod.poolDB.FindPoolServiceByLabel(req.Label); err == nil {
mod.log.Errorf("pool with label %q already exists", req.Label)
resp.Status = int32(drpc.DaosExists)
return
}

ps := &system.PoolService{
PoolUUID: uuid,
PoolLabel: req.Label,
State: system.PoolServiceStateReady,
Replicas: system.RanksFromUint32(req.Svcreps),
}

if err := mod.poolDB.AddPoolService(ps); err != nil {
mod.log.Errorf("failed to register pool: %s", err)
resp.Status = int32(drpc.DaosMiscError)
return
}

return
}

func (mod *srvModule) handleCheckerDeregisterPool(_ context.Context, reqb []byte) (out []byte, outErr error) {
req := new(srvpb.CheckDeregPoolReq)
if err := proto.Unmarshal(reqb, req); err != nil {
return nil, drpc.UnmarshalingPayloadFailure()
}
mod.log.Debugf("handling CheckerDeregisterPool: %+v", req)

resp := new(srvpb.CheckDeregPoolResp)
defer func() {
mod.log.Debugf("CheckerDeregisterPool resp: %+v", resp)
out, outErr = proto.Marshal(resp)
}()

uuid, err := uuid.Parse(req.Uuid)
if err != nil {
mod.log.Errorf("invalid pool UUID %q: %s", req.Uuid, err)
resp.Status = int32(drpc.DaosInvalidInput)
return
}

if _, err := mod.poolDB.FindPoolServiceByUUID(uuid); err != nil {
if system.IsPoolNotFound(err) {
mod.log.Errorf("pool with uuid %q does not exist", req.Uuid)
resp.Status = int32(drpc.DaosNonexistant)
} else {
mod.log.Errorf("failed to check pool uuid: %s", err)
resp.Status = int32(drpc.DaosMiscError)
}
return
}

if err := mod.poolDB.RemovePoolService(uuid); err != nil {
mod.log.Errorf("failed to remove pool: %s", err)
resp.Status = int32(drpc.DaosMiscError)
return
}

return
}
Loading

0 comments on commit 2368ec7

Please sign in to comment.