From d96bbfd2e60964218b6c95da33f523218e359fbe Mon Sep 17 00:00:00 2001 From: Tom Nabarro Date: Tue, 10 Dec 2024 23:48:16 +0000 Subject: [PATCH] refactor to reduce repetition Features: control Required-githooks: true Signed-off-by: Tom Nabarro --- src/control/server/mgmt_system.go | 201 +++++++++++-------------- src/control/server/mgmt_system_test.go | 4 +- 2 files changed, 94 insertions(+), 111 deletions(-) diff --git a/src/control/server/mgmt_system.go b/src/control/server/mgmt_system.go index c56dfedec7c..0909267919e 100644 --- a/src/control/server/mgmt_system.go +++ b/src/control/server/mgmt_system.go @@ -23,6 +23,7 @@ import ( "github.com/pkg/errors" "golang.org/x/sys/unix" "google.golang.org/grpc/peer" + "google.golang.org/protobuf/proto" "github.com/daos-stack/daos/src/control/build" "github.com/daos-stack/daos/src/control/common" @@ -1095,6 +1096,51 @@ func (svc *mgmtSvc) refuseMissingRanks(hosts, ranks string) (*ranklist.RankSet, return hitRanks, nil } +// Build mappings of pools to any ranks that match the input filter by iterating through the pool +// service list. Identify pools by label if possible. +func (svc *mgmtSvc) getPoolsRanks(ranks *ranklist.RankSet) ([]string, poolRanksMap, error) { + poolRanks := make(poolRanksMap) + poolIDs := []string{} // Label or UUID. + + psList, err := svc.sysdb.PoolServiceList(false) + if err != nil { + return nil, nil, err + } + + ranksMap := make(map[ranklist.Rank]struct{}) + for _, r := range ranks.Ranks() { + ranksMap[r] = struct{}{} + } + + for _, ps := range psList { + currentRanks := ps.Storage.CurrentRanks() + + // Label preferred over UUID. + poolID := ps.PoolLabel + if poolID == "" { + poolID = ps.PoolUUID.String() + } + + svc.log.Tracef("pool-service detected: id %s, ranks %v", poolID, currentRanks) + + for _, r := range currentRanks { + if _, exists := ranksMap[r]; !exists { + continue + } + if _, exists := poolRanks[poolID]; !exists { + poolRanks[poolID] = ranklist.MustCreateRankSet("") + poolIDs = append(poolIDs, poolID) + } + poolRanks[poolID].Add(r) + } + } + svc.log.Debugf("pool-ranks to operate on: %v", poolRanks) + + sort.Strings(poolIDs) + + return poolIDs, poolRanks, nil +} + func osaResultsFromRanks(id string, succeeded *ranklist.RankSet, failed poolRanksMap) []*mgmtpb.SystemOsaResult { results := []*mgmtpb.SystemOsaResult{} @@ -1128,7 +1174,7 @@ func osaResultsFromRanks(id string, succeeded *ranklist.RankSet, failed poolRank type poolRanksMap map[string]*ranklist.RankSet -type osaPoolRankOpSig func(*mgmtSvc, context.Context, string, string, ranklist.Rank) (int32, string, error) +type osaPoolRankOpSig func(*mgmtSvc, context.Context, string, string, ranklist.Rank) (int32, string) // Generate OSA operation results by iterating through pool's ranks and calling supplied fn on each. func (svc *mgmtSvc) getOsaResults(ctx context.Context, sys string, poolIDs []string, poolRanks poolRanksMap, drpcCall osaPoolRankOpSig) ([]*mgmtpb.SystemOsaResult, error) { @@ -1142,16 +1188,15 @@ func (svc *mgmtSvc) getOsaResults(ctx context.Context, sys string, poolIDs []str succeeded := ranklist.MustCreateRankSet("") failed := make(poolRanksMap) - // TODO DAOS-6611: Drain multiple pool-ranks per call when drpc.MethodPoolDrain API - // supports it. + svc.log.Tracef("operating on ranks %v on pool %s", rs, id) + + // TODO DAOS-6611: Operate on multiple pool-ranks per call when + // drpc.MethodPool{Drain|Reint} API supports it. for _, r := range rs.Ranks() { - status, errMsg, err := drpcCall(svc, ctx, sys, id, r) - if err != nil { - return nil, err - } + status, errMsg := drpcCall(svc, ctx, sys, id, r) // Each rank-drain failure message will produce a single result. - if status != 0 { + if status != int32(daos.Success) { if _, exists := failed[errMsg]; !exists { failed[errMsg] = ranklist.MustCreateRankSet("") } @@ -1167,87 +1212,57 @@ func (svc *mgmtSvc) getOsaResults(ctx context.Context, sys string, poolIDs []str return results, nil } -// Build mappings of pools to any ranks that match the input filter by iterating through the pool -// service list. Identify pools by label if possible. -func (svc *mgmtSvc) getPoolsRanks(ranks *ranklist.RankSet) ([]string, poolRanksMap, error) { - poolRanks := make(poolRanksMap) - poolIDs := []string{} // Label or UUID. - - psList, err := svc.sysdb.PoolServiceList(false) - if err != nil { - return nil, nil, err +// Drain rank on a pool by calling over dRPC. Function signature satisfies osaPoolRankOpSig type. +func drainPoolRank(svc *mgmtSvc, ctx context.Context, sys, id string, rank ranklist.Rank) (int32, string) { + pbReq := &mgmtpb.PoolDrainReq{ + Sys: sys, + Rank: rank.Uint32(), + Id: id, } - ranksMap := make(map[ranklist.Rank]struct{}) - for _, r := range ranks.Ranks() { - ranksMap[r] = struct{}{} + pbResp, err := svc.PoolDrain(ctx, pbReq) + if err != nil { + return int32(daos.MiscError), err.Error() } - - for _, ps := range psList { - currentRanks := ps.Storage.CurrentRanks() - - // Label preferred over UUID. - poolID := ps.PoolLabel - if poolID == "" { - poolID = ps.PoolUUID.String() - } - - svc.log.Tracef("pool-service detected: id %s, ranks %v", poolID, currentRanks) - - for _, r := range currentRanks { - if _, exists := ranksMap[r]; !exists { - continue - } - if _, exists := poolRanks[poolID]; !exists { - poolRanks[poolID] = ranklist.MustCreateRankSet("") - poolIDs = append(poolIDs, poolID) - } - poolRanks[poolID].Add(r) - } + if pbResp.Status != int32(daos.Success) { + return pbResp.Status, daos.Status(pbResp.Status).Error() } - svc.log.Debugf("pool-ranks to operate on: %v", poolRanks) - sort.Strings(poolIDs) + svc.log.Tracef("pool-drain triggered from system-drain: %+v (req: %+v)", pbResp, pbReq) - return poolIDs, poolRanks, nil + return int32(daos.Success), "" } -// Drain rank on a pool by calling over dRPC. -func drainPoolRank(svc *mgmtSvc, ctx context.Context, sys, id string, rank ranklist.Rank) (int32, string, error) { - var errMsg string - drainReq := &mgmtpb.PoolDrainReq{ +// Reint rank on a pool by calling over dRPC. Function signature satisfies osaPoolRankOpSig type. +func reintPoolRank(svc *mgmtSvc, ctx context.Context, sys, id string, rank ranklist.Rank) (int32, string) { + pbReq := &mgmtpb.PoolReintReq{ Sys: sys, Rank: rank.Uint32(), Id: id, } - drainResp := &mgmtpb.PoolDrainResp{} - drpcResp, err := svc.makeLockedPoolServiceCall(ctx, drpc.MethodPoolDrain, drainReq) + pbResp, err := svc.PoolReint(ctx, pbReq) if err != nil { - return 0, "", err + return int32(daos.MiscError), err.Error() } - - if err := svc.unmarshalPB(drpcResp.Body, drainResp); err != nil { - drainResp.Status = int32(daos.MiscError) - errMsg = err.Error() - } else if drainResp.Status != int32(daos.Success) { - errMsg = daos.Status(drainResp.Status).Error() + if pbResp.Status != int32(daos.Success) { + return pbResp.Status, daos.Status(pbResp.Status).Error() } - svc.log.Tracef("pool-drain triggered from system-drain: %+v (req: %+v)", - drainResp, drainReq) + svc.log.Tracef("pool-reint triggered from system-reint: %+v (req: %+v)", pbResp, pbReq) - return drainResp.Status, errMsg, nil + return int32(daos.Success), "" } -// SystemDrain marks specified ranks on all pools as being in a drain state. -func (svc *mgmtSvc) SystemDrain(ctx context.Context, req *mgmtpb.SystemDrainReq) (*mgmtpb.SystemDrainResp, error) { +// Perform leader and requested ranks checks before mapping ranks to existing pools and generating +// results from the relevant dRPC calls to operate on the pool-ranks. +func (svc *mgmtSvc) doSysOsaOp(ctx context.Context, req proto.Message, hosts, ranks, sys string, opCall osaPoolRankOpSig) ([]*mgmtpb.SystemOsaResult, error) { if err := svc.checkLeaderRequest(wrapCheckerReq(req)); err != nil { return nil, err } // Validate requested hosts or ranks exist and fail if any are missing. - hitRanks, err := svc.refuseMissingRanks(req.Hosts, req.Ranks) + hitRanks, err := svc.refuseMissingRanks(hosts, ranks) if err != nil { return nil, err } @@ -1259,64 +1274,32 @@ func (svc *mgmtSvc) SystemDrain(ctx context.Context, req *mgmtpb.SystemDrainReq) } // Generate results from dRPC calls. - results, err := svc.getOsaResults(ctx, req.Sys, poolIDs, poolRanks, drainPoolRank) - if err != nil { - return nil, err - } - - return &mgmtpb.SystemDrainResp{ - Results: results, - }, nil + return svc.getOsaResults(ctx, sys, poolIDs, poolRanks, opCall) } -// Reint rank on a pool by calling over dRPC. -func reintPoolRank(svc *mgmtSvc, ctx context.Context, sys, id string, rank ranklist.Rank) (int32, string, error) { - var errMsg string - reintReq := &mgmtpb.PoolReintReq{ - Sys: sys, - Rank: rank.Uint32(), - Id: id, +// SystemDrain marks specified ranks on all pools as being in a drain state. +func (svc *mgmtSvc) SystemDrain(ctx context.Context, req *mgmtpb.SystemDrainReq) (*mgmtpb.SystemDrainResp, error) { + if req == nil { + return nil, errors.Errorf("nil %T", req) } - reintResp := &mgmtpb.PoolReintResp{} - drpcResp, err := svc.makeLockedPoolServiceCall(ctx, drpc.MethodPoolReint, reintReq) + results, err := svc.doSysOsaOp(ctx, req, req.Hosts, req.Ranks, req.Sys, drainPoolRank) if err != nil { - return 0, "", err - } - - if err := svc.unmarshalPB(drpcResp.Body, reintResp); err != nil { - reintResp.Status = int32(daos.MiscError) - errMsg = err.Error() - } else if reintResp.Status != int32(daos.Success) { - errMsg = daos.Status(reintResp.Status).Error() + return nil, err } - svc.log.Tracef("pool-reint triggered from system-reint: %+v (req: %+v)", - reintResp, reintReq) - - return reintResp.Status, errMsg, nil + return &mgmtpb.SystemDrainResp{ + Results: results, + }, nil } // SystemReint marks specified ranks on all pools as being in a reint state. func (svc *mgmtSvc) SystemReint(ctx context.Context, req *mgmtpb.SystemReintReq) (*mgmtpb.SystemReintResp, error) { - if err := svc.checkLeaderRequest(wrapCheckerReq(req)); err != nil { - return nil, err - } - - // Validate requested hosts or ranks exist and fail if any are missing. - hitRanks, err := svc.refuseMissingRanks(req.Hosts, req.Ranks) - if err != nil { - return nil, err + if req == nil { + return nil, errors.Errorf("nil %T", req) } - // Retrieve rank-to-pool mappings. - poolIDs, poolRanks, err := svc.getPoolsRanks(hitRanks) - if err != nil { - return nil, err - } - - // Generate results from dRPC calls. - results, err := svc.getOsaResults(ctx, req.Sys, poolIDs, poolRanks, reintPoolRank) + results, err := svc.doSysOsaOp(ctx, req, req.Hosts, req.Ranks, req.Sys, drainPoolRank) if err != nil { return nil, err } diff --git a/src/control/server/mgmt_system_test.go b/src/control/server/mgmt_system_test.go index 136371fabdb..d103c965ad3 100644 --- a/src/control/server/mgmt_system_test.go +++ b/src/control/server/mgmt_system_test.go @@ -1853,7 +1853,7 @@ func TestServer_MgmtSvc_SystemDrain(t *testing.T) { }{ "nil req": { req: (*mgmtpb.SystemDrainReq)(nil), - expErr: errors.New("nil request"), + expErr: errors.New("nil *mgmt.SystemDrainReq"), }, "not system leader": { req: &mgmtpb.SystemDrainReq{ @@ -2096,7 +2096,7 @@ func TestServer_MgmtSvc_SystemReint(t *testing.T) { }{ "nil req": { req: (*mgmtpb.SystemReintReq)(nil), - expErr: errors.New("nil request"), + expErr: errors.New("nil *mgmt.SystemReintReq"), }, "not system leader": { req: &mgmtpb.SystemReintReq{