Skip to content

Commit

Permalink
Merge pull request #1191 from huww98/ctx-logging
Browse files Browse the repository at this point in the history
common: enable contextual logging
  • Loading branch information
k8s-ci-robot authored Nov 19, 2024
2 parents 927db13 + 568daa0 commit bb4873e
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 29 deletions.
49 changes: 33 additions & 16 deletions pkg/common/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"
)

func WrapControllerServerWithValidator(server csi.ControllerServer) csi.ControllerServer {
Expand All @@ -16,24 +17,28 @@ type ControllerServerWithValidator struct {
csi.ControllerServer
}

func (cs *ControllerServerWithValidator) CreateVolume(context context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
func (cs *ControllerServerWithValidator) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
if len(req.Name) == 0 {
return nil, status.Error(codes.InvalidArgument, "Name is required")
}
if len(req.VolumeCapabilities) == 0 {
return nil, status.Error(codes.InvalidArgument, "VolumeCapabilities is required")
}
return cs.ControllerServer.CreateVolume(context, req)
logger := klog.FromContext(ctx)
ctx = klog.NewContext(ctx, logger.WithValues("name", req.Name))
return cs.ControllerServer.CreateVolume(ctx, req)
}

func (cs *ControllerServerWithValidator) DeleteVolume(context context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
func (cs *ControllerServerWithValidator) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
if len(req.VolumeId) == 0 {
return nil, status.Error(codes.InvalidArgument, "VolumeId is required")
}
return cs.ControllerServer.DeleteVolume(context, req)
logger := klog.FromContext(ctx)
ctx = klog.NewContext(ctx, logger.WithValues("volumeID", req.VolumeId))
return cs.ControllerServer.DeleteVolume(ctx, req)
}

func (cs *ControllerServerWithValidator) ControllerPublishVolume(context context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
func (cs *ControllerServerWithValidator) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
if len(req.VolumeId) == 0 {
return nil, status.Error(codes.InvalidArgument, "VolumeId is required")
}
Expand All @@ -43,54 +48,66 @@ func (cs *ControllerServerWithValidator) ControllerPublishVolume(context context
if req.VolumeCapability == nil {
return nil, status.Error(codes.InvalidArgument, "VolumeCapability is required")
}
return cs.ControllerServer.ControllerPublishVolume(context, req)
logger := klog.FromContext(ctx)
ctx = klog.NewContext(ctx, logger.WithValues("volumeID", req.VolumeId, "nodeID", req.NodeId))
return cs.ControllerServer.ControllerPublishVolume(ctx, req)
}

func (cs *ControllerServerWithValidator) ControllerUnpublishVolume(context context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
func (cs *ControllerServerWithValidator) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
if len(req.VolumeId) == 0 {
return nil, status.Error(codes.InvalidArgument, "VolumeId is required")
}
if len(req.NodeId) == 0 {
return nil, status.Error(codes.InvalidArgument, "NodeId is required")
}
return cs.ControllerServer.ControllerUnpublishVolume(context, req)
logger := klog.FromContext(ctx)
ctx = klog.NewContext(ctx, logger.WithValues("volumeID", req.VolumeId, "nodeID", req.NodeId))
return cs.ControllerServer.ControllerUnpublishVolume(ctx, req)
}

func (cs *ControllerServerWithValidator) ValidateVolumeCapabilities(context context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
func (cs *ControllerServerWithValidator) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
if len(req.VolumeId) == 0 {
return nil, status.Error(codes.InvalidArgument, "VolumeId is required")
}
if len(req.VolumeCapabilities) == 0 {
return nil, status.Error(codes.InvalidArgument, "VolumeCapabilities is required")
}
return cs.ControllerServer.ValidateVolumeCapabilities(context, req)
logger := klog.FromContext(ctx)
ctx = klog.NewContext(ctx, logger.WithValues("volumeID", req.VolumeId))
return cs.ControllerServer.ValidateVolumeCapabilities(ctx, req)
}

func (cs *ControllerServerWithValidator) CreateSnapshot(context context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
func (cs *ControllerServerWithValidator) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
if len(req.SourceVolumeId) == 0 {
return nil, status.Error(codes.InvalidArgument, "SourceVolumeId is required")
}
if len(req.Name) == 0 {
return nil, status.Error(codes.InvalidArgument, "Name is required")
}
return cs.ControllerServer.CreateSnapshot(context, req)
logger := klog.FromContext(ctx)
ctx = klog.NewContext(ctx, logger.WithValues("name", req.Name, "source", req.SourceVolumeId))
return cs.ControllerServer.CreateSnapshot(ctx, req)
}

func (cs *ControllerServerWithValidator) DeleteSnapshot(context context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
func (cs *ControllerServerWithValidator) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
if len(req.SnapshotId) == 0 {
return nil, status.Error(codes.InvalidArgument, "SnapshotId is required")
}
return cs.ControllerServer.DeleteSnapshot(context, req)
logger := klog.FromContext(ctx)
ctx = klog.NewContext(ctx, logger.WithValues("SnapshotID", req.SnapshotId))
return cs.ControllerServer.DeleteSnapshot(ctx, req)
}

func (cs *ControllerServerWithValidator) ControllerExpandVolume(context context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
func (cs *ControllerServerWithValidator) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
if len(req.VolumeId) == 0 {
return nil, status.Error(codes.InvalidArgument, "VolumeId is required")
}
if req.CapacityRange == nil {
return nil, status.Error(codes.InvalidArgument, "CapacityRange is required")
}
return cs.ControllerServer.ControllerExpandVolume(context, req)
logger := klog.FromContext(ctx)
ctx = klog.NewContext(ctx, logger.WithValues("volumeID", req.VolumeId))
return cs.ControllerServer.ControllerExpandVolume(ctx, req)
}

type GenericControllerServer struct {
Expand Down
11 changes: 6 additions & 5 deletions pkg/common/interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ import (
)

func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
klog.Infof("GRPC call: %s", info.FullMethod)
klog.V(4).Infof("GRPC request: %s", protosanitizer.StripSecrets(req))
resp, err := handler(ctx, req)
logger := klog.FromContext(ctx).WithValues("method", info.FullMethod)
logger.Info("GRPC call start")
logger.V(4).Info("GRPC request", "request", protosanitizer.StripSecrets(req))
resp, err := handler(klog.NewContext(ctx, logger), req)
if err != nil {
klog.Errorf("GRPC error: %v", err)
logger.Error(err, "GRPC error")
} else {
klog.V(4).Infof("GRPC response: %s", protosanitizer.StripSecrets(resp))
logger.V(4).Info("GRPC response", "response", protosanitizer.StripSecrets(resp))
}
return resp, err
}
Expand Down
28 changes: 20 additions & 8 deletions pkg/common/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ func (s *NodeServerWithValidator) NodeStageVolume(ctx context.Context, req *csi.
if err != nil || !ok {
return nil, status.Errorf(codes.InvalidArgument, "Staging path %q is not a subpath of %s", req.StagingTargetPath, utils.KubeletRootDir)
}
logger := klog.FromContext(ctx)
ctx = klog.NewContext(ctx, logger.WithValues("volumeID", req.VolumeId))
return s.NodeServer.NodeStageVolume(ctx, req)
}

Expand All @@ -100,47 +102,57 @@ func (s *NodeServerWithValidator) NodePublishVolume(ctx context.Context, req *cs
if err != nil || !ok {
return nil, status.Errorf(codes.InvalidArgument, "Target path %q is not a subpath of %s", req.TargetPath, utils.KubeletRootDir)
}
logger := klog.FromContext(ctx)
ctx = klog.NewContext(ctx, logger.WithValues("volumeID", req.VolumeId))
return s.NodeServer.NodePublishVolume(ctx, req)
}

func (s *NodeServerWithValidator) NodeUnstageVolume(context context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
func (s *NodeServerWithValidator) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
if len(req.VolumeId) == 0 {
return nil, status.Error(codes.InvalidArgument, "VolumeId is required")
}
if len(req.StagingTargetPath) == 0 {
return nil, status.Error(codes.InvalidArgument, "StagingTargetPath is required")
}
return s.NodeServer.NodeUnstageVolume(context, req)
logger := klog.FromContext(ctx)
ctx = klog.NewContext(ctx, logger.WithValues("volumeID", req.VolumeId))
return s.NodeServer.NodeUnstageVolume(ctx, req)
}

func (s *NodeServerWithValidator) NodeUnpublishVolume(context context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
func (s *NodeServerWithValidator) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
if len(req.VolumeId) == 0 {
return nil, status.Error(codes.InvalidArgument, "VolumeId is required")
}
if len(req.TargetPath) == 0 {
return nil, status.Error(codes.InvalidArgument, "TargetPath is required")
}
return s.NodeServer.NodeUnpublishVolume(context, req)
logger := klog.FromContext(ctx)
ctx = klog.NewContext(ctx, logger.WithValues("volumeID", req.VolumeId))
return s.NodeServer.NodeUnpublishVolume(ctx, req)
}

func (s *NodeServerWithValidator) NodeGetVolumeStats(context context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
func (s *NodeServerWithValidator) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
if len(req.VolumeId) == 0 {
return nil, status.Error(codes.InvalidArgument, "VolumeId is required")
}
if len(req.VolumePath) == 0 {
return nil, status.Error(codes.InvalidArgument, "VolumePath is required")
}
return s.NodeServer.NodeGetVolumeStats(context, req)
logger := klog.FromContext(ctx)
ctx = klog.NewContext(ctx, logger.WithValues("volumeID", req.VolumeId))
return s.NodeServer.NodeGetVolumeStats(ctx, req)
}

func (s *NodeServerWithValidator) NodeExpandVolume(context context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
func (s *NodeServerWithValidator) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
if len(req.VolumeId) == 0 {
return nil, status.Error(codes.InvalidArgument, "VolumeId is required")
}
if len(req.VolumePath) == 0 {
return nil, status.Error(codes.InvalidArgument, "VolumePath is required")
}
return s.NodeServer.NodeExpandVolume(context, req)
logger := klog.FromContext(ctx)
ctx = klog.NewContext(ctx, logger.WithValues("volumeID", req.VolumeId))
return s.NodeServer.NodeExpandVolume(ctx, req)
}

func filepathContains(basePath, path string) (bool, error) {
Expand Down

0 comments on commit bb4873e

Please sign in to comment.