diff --git a/pkg/agent/cniserver/ipam/ipam_service.go b/pkg/agent/cniserver/ipam/ipam_service.go index 2fd98161100..60d36bf9526 100644 --- a/pkg/agent/cniserver/ipam/ipam_service.go +++ b/pkg/agent/cniserver/ipam/ipam_service.go @@ -54,6 +54,12 @@ func RegisterIPAMDriver(ipamType string, ipamDriver IPAMDriver) { ipamDrivers[ipamType] = append(ipamDrivers[ipamType], ipamDriver) } +func ResetIPAMDrivers(ipamType string) { + if ipamDrivers != nil { + delete(ipamDrivers, ipamType) + } +} + func argsFromEnv(cniArgs *cnipb.CniCmdArgs) *invoke.Args { return &invoke.Args{ ContainerID: cniArgs.ContainerId, diff --git a/pkg/agent/cniserver/server.go b/pkg/agent/cniserver/server.go index 41b1c4abaf7..691281fe1aa 100644 --- a/pkg/agent/cniserver/server.go +++ b/pkg/agent/cniserver/server.go @@ -196,7 +196,7 @@ func (s *CNIServer) loadNetworkConfig(request *cnipb.CniCmdRequest) (*CNIConfig, cniConfig.MTU = s.networkConfig.InterfaceMTU } cniConfig.CniCmdArgs = request.CniArgs - klog.V(3).Infof("Load network configurations: %v", cniConfig) + klog.V(3).InfoS("Loaded network configuration", "conf", cniConfig) return &cniConfig, nil } @@ -215,7 +215,7 @@ func (s *CNIServer) validateCNIAndIPAMType(cniConfig *CNIConfig) *cnipb.CniCmdRe return nil } if !ipam.IsIPAMTypeValid(ipamType) { - klog.Errorf("Unsupported IPAM type %s", ipamType) + klog.ErrorS(nil, "Unsupported IPAM type", "type", ipamType) return s.unsupportedFieldResponse("ipam/type", ipamType) } if s.enableBridgingMode { @@ -230,7 +230,7 @@ func (s *CNIServer) validateCNIAndIPAMType(cniConfig *CNIConfig) *cnipb.CniCmdRe return s.unsupportedFieldResponse("type", cniConfig.Type) } if ipamType != ipam.AntreaIPAMType { - klog.Errorf("Unsupported IPAM type %s", ipamType) + klog.ErrorS(nil, "Unsupported IPAM type", "type", ipamType) return s.unsupportedFieldResponse("ipam/type", ipamType) } // IPAM for an interface not managed by Antrea CNI. @@ -241,14 +241,14 @@ func (s *CNIServer) validateCNIAndIPAMType(cniConfig *CNIConfig) *cnipb.CniCmdRe func (s *CNIServer) validateRequestMessage(request *cnipb.CniCmdRequest) (*CNIConfig, *cnipb.CniCmdResponse) { cniConfig, err := s.loadNetworkConfig(request) if err != nil { - klog.Errorf("Failed to parse network configuration: %v", err) + klog.ErrorS(err, "Failed to parse network configuration") return nil, s.decodingFailureResponse("network config") } cniVersion := cniConfig.CNIVersion // Check if CNI version in the request is supported if !s.isCNIVersionSupported(cniVersion) { - klog.Errorf(fmt.Sprintf("Unsupported CNI version [%s], supported CNI versions %s", cniVersion, version.All.SupportedVersions())) + klog.ErrorS(nil, "Unsupported CNI version", "requested", cniVersion, "supported", version.All.SupportedVersions()) return nil, s.incompatibleCniVersionResponse(cniVersion) } @@ -344,19 +344,19 @@ func buildVersionSet() map[string]bool { func (s *CNIServer) parsePrevResultFromRequest(networkConfig *types.NetworkConfig) (*current.Result, *cnipb.CniCmdResponse) { if networkConfig.PrevResult == nil && networkConfig.RawPrevResult == nil { - klog.Errorf("Previous network configuration not specified") + klog.ErrorS(nil, "Previous network configuration not specified") return nil, s.unsupportedFieldResponse("prevResult", "") } if err := parsePrevResult(networkConfig); err != nil { - klog.Errorf("Failed to parse previous network configuration") + klog.ErrorS(err, "Failed to parse previous network configuration") return nil, s.decodingFailureResponse("prevResult") } // Convert whatever the result was into the current Result type (for the current CNI // version) prevResult, err := current.NewResultFromResult(networkConfig.PrevResult) if err != nil { - klog.Errorf("Failed to construct prevResult using previous network configuration") + klog.ErrorS(err, "Failed to construct prevResult using previous network configuration") return nil, s.unsupportedFieldResponse("prevResult", networkConfig.PrevResult) } prevResult.CNIVersion = networkConfig.CNIVersion @@ -372,7 +372,7 @@ func (s *CNIServer) validatePrevResult(cfgArgs *cnipb.CniCmdArgs, prevResult *cu // Find interfaces from previous configuration containerIntf := parseContainerIfaceFromResults(cfgArgs, prevResult) if containerIntf == nil { - klog.Errorf("Failed to find interface %s of container %s", cfgArgs.Ifname, containerID) + klog.ErrorS(nil, "Failed to find interface of container", "interface", cfgArgs.Ifname, "container", containerID) return s.invalidNetworkConfigResponse("prevResult does not match network configuration") } if err := s.podConfigurator.checkInterfaces( @@ -422,7 +422,7 @@ func (s *CNIServer) ipamCheck(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, erro } func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*cnipb.CniCmdResponse, error) { - klog.Infof("Received CmdAdd request %v", request) + klog.InfoS("Received CmdAdd request", "request", request) cniConfig, response := s.validateRequestMessage(request) if response != nil { return response, nil @@ -439,7 +439,7 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (* select { case <-time.After(networkReadyTimeout): - klog.Errorf("Cannot process CmdAdd request for container %v because network is not ready", cniConfig.ContainerId) + klog.ErrorS(nil, "Cannot process CmdAdd request for container because network is not ready", "container", cniConfig.ContainerId, "timeout", networkReadyTimeout) return s.tryAgainLaterResponse(), nil case <-s.networkReadyCh: } @@ -450,15 +450,15 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (* success := false defer func() { - // Rollback to delete configurations once ADD is failure. + // Rollback to delete configurations if ADD fails. if !success { if isInfraContainer { - klog.Warningf("CmdAdd for container %v failed, and try to rollback", cniConfig.ContainerId) - if _, err := s.CmdDel(ctx, request); err != nil { - klog.Warningf("Failed to rollback after CNI add failure: %v", err) + klog.InfoS("CmdAdd for container failed, trying to rollback", "container", cniConfig.ContainerId) + if _, err := s.cmdDel(ctx, cniConfig); err != nil { + klog.ErrorS(err, "Failed to rollback after CNI add failure", "container", cniConfig.ContainerId) } } else { - klog.Warningf("CmdAdd for container %v failed", cniConfig.ContainerId) + klog.InfoS("CmdAdd for container failed", "container", cniConfig.ContainerId) } } }() @@ -487,7 +487,7 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (* // Request IP Address from IPAM driver. ipamResult, err = ipam.ExecIPAMAdd(cniConfig.CniCmdArgs, cniConfig.K8sArgs, cniConfig.IPAM.Type, infraContainer) if err != nil { - klog.Errorf("Failed to request IP addresses for container %v: %v", cniConfig.ContainerId, err) + klog.ErrorS(err, "Failed to request IP addresses for container", "container", cniConfig.ContainerId) return s.ipamFailureResponse(err), nil } } @@ -514,13 +514,13 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (* isInfraContainer, s.containerAccess, ); err != nil { - klog.Errorf("Failed to configure interfaces for container %s: %v", cniConfig.ContainerId, err) + klog.ErrorS(err, "Failed to configure interfaces for container", "container", cniConfig.ContainerId) return s.configInterfaceFailureResponse(err), nil } cniVersion := cniConfig.CNIVersion cniResult, _ := result.Result.GetAsVersion(cniVersion) - klog.Infof("CmdAdd for container %v succeeded", cniConfig.ContainerId) + klog.InfoS("CmdAdd for container succeeded", "container", cniConfig.ContainerId) // mark success as true to avoid rollback success = true @@ -534,15 +534,7 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (* return resultToResponse(cniResult), nil } -func (s *CNIServer) CmdDel(_ context.Context, request *cnipb.CniCmdRequest) ( - *cnipb.CniCmdResponse, error) { - klog.Infof("Received CmdDel request %v", request) - - cniConfig, response := s.validateRequestMessage(request) - if response != nil { - return response, nil - } - +func (s *CNIServer) cmdDel(_ context.Context, cniConfig *CNIConfig) (*cnipb.CniCmdResponse, error) { infraContainer := cniConfig.getInfraContainer() s.containerAccess.lockContainer(infraContainer) defer s.containerAccess.unlockContainer(infraContainer) @@ -557,16 +549,16 @@ func (s *CNIServer) CmdDel(_ context.Context, request *cnipb.CniCmdRequest) ( } // Release IP to IPAM driver if err := ipam.ExecIPAMDelete(cniConfig.CniCmdArgs, cniConfig.K8sArgs, cniConfig.IPAM.Type, infraContainer); err != nil { - klog.Errorf("Failed to delete IP addresses for container %v: %v", cniConfig.ContainerId, err) + klog.ErrorS(err, "Failed to delete IP addresses for container", "container", cniConfig.ContainerId) return s.ipamFailureResponse(err), nil } - klog.Infof("Deleted IP addresses for container %v", cniConfig.ContainerId) + klog.InfoS("Deleted IP addresses for container", "container", cniConfig.ContainerId) // Remove host interface and OVS configuration if err := s.podConfigurator.removeInterfaces(cniConfig.ContainerId); err != nil { - klog.Errorf("Failed to remove interfaces for container %s: %v", cniConfig.ContainerId, err) + klog.ErrorS(err, "Failed to remove interfaces for container", "container", cniConfig.ContainerId) return s.configInterfaceFailureResponse(err), nil } - klog.Infof("CmdDel for container %v succeeded", cniConfig.ContainerId) + klog.InfoS("CmdDel for container succeeded", "container", cniConfig.ContainerId) if s.secondaryNetworkEnabled { podName := string(cniConfig.K8S_POD_NAME) podNamespace := string(cniConfig.K8S_POD_NAMESPACE) @@ -580,9 +572,20 @@ func (s *CNIServer) CmdDel(_ context.Context, request *cnipb.CniCmdRequest) ( return &cnipb.CniCmdResponse{CniResult: []byte("")}, nil } +func (s *CNIServer) CmdDel(ctx context.Context, request *cnipb.CniCmdRequest) (*cnipb.CniCmdResponse, error) { + klog.InfoS("Received CmdDel request", "request", request) + + cniConfig, response := s.validateRequestMessage(request) + if response != nil { + return response, nil + } + + return s.cmdDel(ctx, cniConfig) +} + func (s *CNIServer) CmdCheck(_ context.Context, request *cnipb.CniCmdRequest) ( *cnipb.CniCmdResponse, error) { - klog.Infof("Received CmdCheck request %v", request) + klog.InfoS("Received CmdCheck request", "request", request) cniConfig, response := s.validateRequestMessage(request) if response != nil { @@ -603,7 +606,7 @@ func (s *CNIServer) CmdCheck(_ context.Context, request *cnipb.CniCmdRequest) ( } if err := ipam.ExecIPAMCheck(cniConfig.CniCmdArgs, cniConfig.K8sArgs, cniConfig.IPAM.Type); err != nil { - klog.Errorf("Failed to check IPAM configuration for container %v: %v", cniConfig.ContainerId, err) + klog.ErrorS(err, "Failed to check IPAM configuration for container", "container", cniConfig.ContainerId) return s.ipamFailureResponse(err), nil } @@ -615,7 +618,7 @@ func (s *CNIServer) CmdCheck(_ context.Context, request *cnipb.CniCmdRequest) ( return response, nil } } - klog.Infof("CmdCheck for container %v succeeded", cniConfig.ContainerId) + klog.InfoS("CmdCheck for container succeeded", "container", cniConfig.ContainerId) return &cnipb.CniCmdResponse{CniResult: []byte("")}, nil } @@ -676,8 +679,8 @@ func (s *CNIServer) Initialize( } func (s *CNIServer) Run(stopCh <-chan struct{}) { - klog.Info("Starting CNI server") - defer klog.Info("Shutting down CNI server") + klog.InfoS("Starting CNI server") + defer klog.InfoS("Shutting down CNI server") listener, err := util.ListenLocalSocket(s.cniSocket) if err != nil { @@ -686,10 +689,10 @@ func (s *CNIServer) Run(stopCh <-chan struct{}) { rpcServer := grpc.NewServer() cnipb.RegisterCniServer(rpcServer, s) - klog.Info("CNI server is listening ...") + klog.InfoS("CNI server is listening ...") go func() { if err := rpcServer.Serve(listener); err != nil { - klog.Errorf("Failed to serve connections: %v", err) + klog.ErrorS(err, "Failed to serve connections") } }() <-stopCh @@ -699,10 +702,10 @@ func (s *CNIServer) Run(stopCh <-chan struct{}) { // be called prior to Antrea CNI to allocate IP and ports. Antrea takes allocated port // and hooks it to OVS br-int. func (s *CNIServer) interceptAdd(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, error) { - klog.Infof("CNI Chaining: add for container %s", cniConfig.ContainerId) + klog.InfoS("CNI Chaining: add for container", "container", cniConfig.ContainerId) prevResult, response := s.parsePrevResultFromRequest(cniConfig.NetworkConfig) if response != nil { - klog.Infof("Failed to parse prev result for container %s", cniConfig.ContainerId) + klog.InfoS("Failed to parse prev result", "container", cniConfig.ContainerId) return response, nil } podName := string(cniConfig.K8S_POD_NAME) @@ -737,7 +740,7 @@ func (s *CNIServer) interceptAdd(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, e } func (s *CNIServer) interceptDel(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, error) { - klog.Infof("CNI Chaining: delete for container %s", cniConfig.ContainerId) + klog.InfoS("CNI Chaining: delete for container", "container", cniConfig.ContainerId) return &cnipb.CniCmdResponse{CniResult: []byte("")}, s.podConfigurator.disconnectInterceptedInterface( string(cniConfig.K8S_POD_NAME), string(cniConfig.K8S_POD_NAMESPACE), @@ -745,7 +748,7 @@ func (s *CNIServer) interceptDel(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, e } func (s *CNIServer) interceptCheck(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, error) { - klog.Infof("CNI Chaining: check for container %s", cniConfig.ContainerId) + klog.InfoS("CNI Chaining: check for container", "container", cniConfig.ContainerId) // TODO, check for host interface setup later return &cnipb.CniCmdResponse{CniResult: []byte("")}, nil } @@ -754,7 +757,7 @@ func (s *CNIServer) interceptCheck(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, // installing Pod flows, so as part of this reconciliation process we retrieve the Pod list from the // K8s apiserver and replay the necessary flows. func (s *CNIServer) reconcile() error { - klog.Infof("Reconciliation for CNI server") + klog.InfoS("Reconciliation for CNI server") // For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from // the watch cache in kube-apiserver. pods, err := s.kubeClient.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{ diff --git a/pkg/agent/cniserver/server_test.go b/pkg/agent/cniserver/server_test.go index 6bfd4d29a82..cdf42faa4a3 100644 --- a/pkg/agent/cniserver/server_test.go +++ b/pkg/agent/cniserver/server_test.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "net" + "reflect" "strings" "testing" @@ -138,31 +139,77 @@ func checkErrorResponse(t *testing.T, resp *cnipb.CniCmdResponse, code cnipb.Err } } +// networkConfigIPAMMatcher is used to validate the IPAMConfig received by the IPAM driver mock. +type networkConfigIPAMMatcher struct { + ipamConfig *types.IPAMConfig +} + +func (m *networkConfigIPAMMatcher) Matches(x interface{}) bool { + var networkConfig types.NetworkConfig + if err := json.Unmarshal(x.([]byte), &networkConfig); err != nil { + return false + } + return reflect.DeepEqual(networkConfig.IPAM, m.ipamConfig) +} + +func (m *networkConfigIPAMMatcher) String() string { + return fmt.Sprintf("IPAMConfig is equal to %v", m.ipamConfig) +} + func TestIPAMService(t *testing.T) { - controller := gomock.NewController(t) - ipamMock := ipamtest.NewMockIPAMDriver(controller) - ipam.RegisterIPAMDriver(testIpamType, ipamMock) - cniServer := newCNIServer(t) - ifaceStore := interfacestore.NewInterfaceStore() - cniServer.podConfigurator = &podConfigurator{ifaceStore: ifaceStore} + networkCfg := generateNetworkConfiguration("", supportedCNIVersion, "", testIpamType) - require.True(t, ipam.IsIPAMTypeValid(testIpamType), "Failed to register IPAM service") - require.False(t, ipam.IsIPAMTypeValid("not_a_valid_IPAM_driver")) + setup := func(t *testing.T) (*ipamtest.MockIPAMDriver, *CNIServer, *cnipb.CniCmdRequest) { + // required to provide isolation between subtests + // note that subtests CANNOT share an instance of gomock.Controller + ipam.ResetIPAMDrivers(testIpamType) + controller := gomock.NewController(t) + ipamMock := ipamtest.NewMockIPAMDriver(controller) + ipam.RegisterIPAMDriver(testIpamType, ipamMock) + cniServer := newCNIServer(t) + ifaceStore := interfacestore.NewInterfaceStore() + cniServer.podConfigurator = &podConfigurator{ifaceStore: ifaceStore} + + require.True(t, ipam.IsIPAMTypeValid(testIpamType), "Failed to register IPAM service") + require.False(t, ipam.IsIPAMTypeValid("not_a_valid_IPAM_driver")) + requestMsg, _ := newRequest(args, networkCfg, "", t) + return ipamMock, cniServer, requestMsg + } // Test IPAM_Failure cases cxt := context.Background() - networkCfg := generateNetworkConfiguration("", supportedCNIVersion, "", testIpamType) - requestMsg, _ := newRequest(args, networkCfg, "", t) + + expectedIPAMConfig := &types.IPAMConfig{ + Type: testIpamType, + Ranges: []types.RangeSet{ + []types.Range{ + { + Subnet: testNodeConfig.PodIPv4CIDR.String(), + Gateway: testNodeConfig.GatewayConfig.IPv4.String(), + }, + }, + []types.Range{ + { + Subnet: testNodeConfig.PodIPv6CIDR.String(), + Gateway: testNodeConfig.GatewayConfig.IPv6.String(), + }, + }, + }, + } t.Run("Error on ADD", func(t *testing.T) { - ipamMock.EXPECT().Add(gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil, fmt.Errorf("IPAM add error")) - ipamMock.EXPECT().Del(gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil) + ipamMock, cniServer, requestMsg := setup(t) + ipamMock.EXPECT().Add(gomock.Any(), gomock.Any(), &networkConfigIPAMMatcher{expectedIPAMConfig}).Return(true, nil, fmt.Errorf("IPAM add error")) + // Del call triggered by automatic rollback. + // IPAMConfig should be the same for both calls (Add and Del). + ipamMock.EXPECT().Del(gomock.Any(), gomock.Any(), &networkConfigIPAMMatcher{expectedIPAMConfig}).Return(true, nil) response, err := cniServer.CmdAdd(cxt, requestMsg) require.Nil(t, err, "expected no rpc error") checkErrorResponse(t, response, cnipb.ErrorCode_IPAM_FAILURE, "IPAM add error") }) t.Run("Error on DEL", func(t *testing.T) { + ipamMock, cniServer, requestMsg := setup(t) // Prepare cached IPAM result which will be deleted later. ipamMock.EXPECT().Add(gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil, nil) cniConfig, _ := cniServer.validateRequestMessage(requestMsg) @@ -181,6 +228,7 @@ func TestIPAMService(t *testing.T) { }) t.Run("Error on CHECK", func(t *testing.T) { + ipamMock, cniServer, requestMsg := setup(t) ipamMock.EXPECT().Check(gomock.Any(), gomock.Any(), gomock.Any()).Return(true, fmt.Errorf("IPAM check error")) response, err := cniServer.CmdCheck(cxt, requestMsg) require.Nil(t, err, "expected no rpc error") @@ -188,6 +236,7 @@ func TestIPAMService(t *testing.T) { }) t.Run("Idempotent Call of IPAM ADD/DEL for the same Pod", func(t *testing.T) { + ipamMock, cniServer, requestMsg := setup(t) ipamMock.EXPECT().Add(gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil, nil) ipamMock.EXPECT().Del(gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil).Times(2) cniConfig, response := cniServer.validateRequestMessage(requestMsg) @@ -204,6 +253,7 @@ func TestIPAMService(t *testing.T) { }) t.Run("Idempotent Call of IPAM ADD/DEL for the same Pod with different containers", func(t *testing.T) { + ipamMock, cniServer, requestMsg := setup(t) ipamMock.EXPECT().Add(gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil, nil).Times(2) ipamMock.EXPECT().Del(gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil).Times(2) cniConfig, response := cniServer.validateRequestMessage(requestMsg)