Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix rollback invocation after CmdAdd failure in CNI server #5548

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/agent/cniserver/ipam/ipam_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ func RegisterIPAMDriver(ipamType string, ipamDriver IPAMDriver) {
ipamDrivers[ipamType] = append(ipamDrivers[ipamType], ipamDriver)
}

func ResetIPAMDrivers(ipamType string) {
if ipamDrivers != nil {
delete(ipamDrivers, ipamType)
}
}

func argsFromEnv(cniArgs *cnipb.CniCmdArgs) *invoke.Args {
return &invoke.Args{
ContainerID: cniArgs.ContainerId,
Expand Down
91 changes: 47 additions & 44 deletions pkg/agent/cniserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (s *CNIServer) loadNetworkConfig(request *cnipb.CniCmdRequest) (*CNIConfig,
cniConfig.MTU = s.networkConfig.InterfaceMTU
}
cniConfig.CniCmdArgs = request.CniArgs
klog.V(3).Infof("Load network configurations: %v", cniConfig)
klog.V(3).InfoS("Loaded network configuration", "conf", cniConfig)
return &cniConfig, nil
}

Expand All @@ -215,7 +215,7 @@ func (s *CNIServer) validateCNIAndIPAMType(cniConfig *CNIConfig) *cnipb.CniCmdRe
return nil
}
if !ipam.IsIPAMTypeValid(ipamType) {
klog.Errorf("Unsupported IPAM type %s", ipamType)
klog.ErrorS(nil, "Unsupported IPAM type", "type", ipamType)
return s.unsupportedFieldResponse("ipam/type", ipamType)
}
if s.enableBridgingMode {
Expand All @@ -230,7 +230,7 @@ func (s *CNIServer) validateCNIAndIPAMType(cniConfig *CNIConfig) *cnipb.CniCmdRe
return s.unsupportedFieldResponse("type", cniConfig.Type)
}
if ipamType != ipam.AntreaIPAMType {
klog.Errorf("Unsupported IPAM type %s", ipamType)
klog.ErrorS(nil, "Unsupported IPAM type", "type", ipamType)
return s.unsupportedFieldResponse("ipam/type", ipamType)
}
// IPAM for an interface not managed by Antrea CNI.
Expand All @@ -241,14 +241,14 @@ func (s *CNIServer) validateCNIAndIPAMType(cniConfig *CNIConfig) *cnipb.CniCmdRe
func (s *CNIServer) validateRequestMessage(request *cnipb.CniCmdRequest) (*CNIConfig, *cnipb.CniCmdResponse) {
cniConfig, err := s.loadNetworkConfig(request)
if err != nil {
klog.Errorf("Failed to parse network configuration: %v", err)
klog.ErrorS(err, "Failed to parse network configuration")
return nil, s.decodingFailureResponse("network config")
}

cniVersion := cniConfig.CNIVersion
// Check if CNI version in the request is supported
if !s.isCNIVersionSupported(cniVersion) {
klog.Errorf(fmt.Sprintf("Unsupported CNI version [%s], supported CNI versions %s", cniVersion, version.All.SupportedVersions()))
klog.ErrorS(nil, "Unsupported CNI version", "requested", cniVersion, "supported", version.All.SupportedVersions())
return nil, s.incompatibleCniVersionResponse(cniVersion)
}

Expand Down Expand Up @@ -344,19 +344,19 @@ func buildVersionSet() map[string]bool {

func (s *CNIServer) parsePrevResultFromRequest(networkConfig *types.NetworkConfig) (*current.Result, *cnipb.CniCmdResponse) {
if networkConfig.PrevResult == nil && networkConfig.RawPrevResult == nil {
klog.Errorf("Previous network configuration not specified")
klog.ErrorS(nil, "Previous network configuration not specified")
return nil, s.unsupportedFieldResponse("prevResult", "")
}

if err := parsePrevResult(networkConfig); err != nil {
klog.Errorf("Failed to parse previous network configuration")
klog.ErrorS(err, "Failed to parse previous network configuration")
return nil, s.decodingFailureResponse("prevResult")
}
// Convert whatever the result was into the current Result type (for the current CNI
// version)
prevResult, err := current.NewResultFromResult(networkConfig.PrevResult)
if err != nil {
klog.Errorf("Failed to construct prevResult using previous network configuration")
klog.ErrorS(err, "Failed to construct prevResult using previous network configuration")
return nil, s.unsupportedFieldResponse("prevResult", networkConfig.PrevResult)
}
prevResult.CNIVersion = networkConfig.CNIVersion
Expand All @@ -372,7 +372,7 @@ func (s *CNIServer) validatePrevResult(cfgArgs *cnipb.CniCmdArgs, prevResult *cu
// Find interfaces from previous configuration
containerIntf := parseContainerIfaceFromResults(cfgArgs, prevResult)
if containerIntf == nil {
klog.Errorf("Failed to find interface %s of container %s", cfgArgs.Ifname, containerID)
klog.ErrorS(nil, "Failed to find interface of container", "interface", cfgArgs.Ifname, "container", containerID)
return s.invalidNetworkConfigResponse("prevResult does not match network configuration")
}
if err := s.podConfigurator.checkInterfaces(
Expand Down Expand Up @@ -422,7 +422,7 @@ func (s *CNIServer) ipamCheck(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, erro
}

func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*cnipb.CniCmdResponse, error) {
klog.Infof("Received CmdAdd request %v", request)
klog.InfoS("Received CmdAdd request", "request", request)
cniConfig, response := s.validateRequestMessage(request)
if response != nil {
return response, nil
Expand All @@ -439,7 +439,7 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*

select {
case <-time.After(networkReadyTimeout):
klog.Errorf("Cannot process CmdAdd request for container %v because network is not ready", cniConfig.ContainerId)
klog.ErrorS(nil, "Cannot process CmdAdd request for container because network is not ready", "container", cniConfig.ContainerId, "timeout", networkReadyTimeout)
return s.tryAgainLaterResponse(), nil
case <-s.networkReadyCh:
}
Expand All @@ -450,15 +450,15 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*

success := false
defer func() {
// Rollback to delete configurations once ADD is failure.
// Rollback to delete configurations if ADD fails.
if !success {
if isInfraContainer {
klog.Warningf("CmdAdd for container %v failed, and try to rollback", cniConfig.ContainerId)
if _, err := s.CmdDel(ctx, request); err != nil {
klog.Warningf("Failed to rollback after CNI add failure: %v", err)
klog.InfoS("CmdAdd for container failed, trying to rollback", "container", cniConfig.ContainerId)
if _, err := s.cmdDel(ctx, cniConfig); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that it is an improvement to directly use the existing "cniConfig" in cmdDel to avoid unnecessary call on loadNetworkConfig.

However, I didn't understand why the original code can append a duplicate PodCIDR range in the IPAM configuration. When we call function loadNetworkConfig in CmdDel/CmdAdd/CmdCheck, it has generated a new CNIConfig object, and the cniConfig.NetworkConfig is unmarshaled from the json stringrequest.CniArgs.NetworkConfiguration. So even if we call CmdDel in CmdAdd rollback, the cniConfig passed to IPAM plugin from a new call with loadNetworkConfig is supposed to equal to the values existing in CmdAdd.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original code was calling validateRequest for the rollback (because CmdDel calls validateRequest), using the same request object. validateRequest actually mutates the request but it is not obvious when looking at the code:

  • loadNetworkConfig has the following assignment:
    cniConfig.CniCmdArgs = request.CniArgs
    It means that mutating the contents of cniConfig.CniCmdArgs (a pointer to a protobuf struct) will mutate the request.
  • updateLocalIPAMSubnet has the following assignment:
    cniConfig.NetworkConfiguration, _ = json.Marshal(cniConfig.NetworkConfig)
    cniConfig.NetworkConfiguration is actually the same as cniConfig.CniCmdArgs.NetworkConfiguration (it is a byte slice). So at this point we have mutated the request. The next time we call validateRequestMessage (which we no longer do with my patch), the network configuration of the request already includes the Node subnets in the IPAM section.

I confirmed all of this with my unit test.

Note that I don't know why we are mutating the request in the first place; I don't know if it's intentional, but I think we have been doing it for 4 years. Since using the existing cniConfig seemed like the right thing to do, that's what I did in this patch. We could consider a follow-up patch to clean up existing logic, but I didn't want to risk introducing a new bug.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify the statement above:

cniConfig.NetworkConfiguration is actually the same as cniConfig.CniCmdArgs.NetworkConfiguration

This is because CniCmdArgs is an embedded field:

type CNIConfig struct {
*types.NetworkConfig
// AntreaIPAM for an interface not managed by Antrea CNI.
secondaryNetworkIPAM bool
// CniCmdArgs received from the CNI plugin. IPAM data in CniCmdArgs can be updated with the
// Node's Pod CIDRs for NodeIPAM.
*cnipb.CniCmdArgs
// K8s CNI_ARGS passed to the CNI plugin.
*types.K8sArgs
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, thanks for the clarification.

klog.ErrorS(err, "Failed to rollback after CNI add failure", "container", cniConfig.ContainerId)
}
} else {
klog.Warningf("CmdAdd for container %v failed", cniConfig.ContainerId)
klog.InfoS("CmdAdd for container failed", "container", cniConfig.ContainerId)
}
}
}()
Expand Down Expand Up @@ -487,7 +487,7 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*
// Request IP Address from IPAM driver.
ipamResult, err = ipam.ExecIPAMAdd(cniConfig.CniCmdArgs, cniConfig.K8sArgs, cniConfig.IPAM.Type, infraContainer)
if err != nil {
klog.Errorf("Failed to request IP addresses for container %v: %v", cniConfig.ContainerId, err)
klog.ErrorS(err, "Failed to request IP addresses for container", "container", cniConfig.ContainerId)
return s.ipamFailureResponse(err), nil
}
}
Expand All @@ -514,13 +514,13 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*
isInfraContainer,
s.containerAccess,
); err != nil {
klog.Errorf("Failed to configure interfaces for container %s: %v", cniConfig.ContainerId, err)
klog.ErrorS(err, "Failed to configure interfaces for container", "container", cniConfig.ContainerId)
return s.configInterfaceFailureResponse(err), nil
}
cniVersion := cniConfig.CNIVersion
cniResult, _ := result.Result.GetAsVersion(cniVersion)

klog.Infof("CmdAdd for container %v succeeded", cniConfig.ContainerId)
klog.InfoS("CmdAdd for container succeeded", "container", cniConfig.ContainerId)
// mark success as true to avoid rollback
success = true

Expand All @@ -534,15 +534,7 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*
return resultToResponse(cniResult), nil
}

func (s *CNIServer) CmdDel(_ context.Context, request *cnipb.CniCmdRequest) (
*cnipb.CniCmdResponse, error) {
klog.Infof("Received CmdDel request %v", request)

cniConfig, response := s.validateRequestMessage(request)
if response != nil {
return response, nil
}

func (s *CNIServer) cmdDel(_ context.Context, cniConfig *CNIConfig) (*cnipb.CniCmdResponse, error) {
infraContainer := cniConfig.getInfraContainer()
s.containerAccess.lockContainer(infraContainer)
defer s.containerAccess.unlockContainer(infraContainer)
Expand All @@ -557,16 +549,16 @@ func (s *CNIServer) CmdDel(_ context.Context, request *cnipb.CniCmdRequest) (
}
// Release IP to IPAM driver
if err := ipam.ExecIPAMDelete(cniConfig.CniCmdArgs, cniConfig.K8sArgs, cniConfig.IPAM.Type, infraContainer); err != nil {
klog.Errorf("Failed to delete IP addresses for container %v: %v", cniConfig.ContainerId, err)
klog.ErrorS(err, "Failed to delete IP addresses for container", "container", cniConfig.ContainerId)
return s.ipamFailureResponse(err), nil
}
klog.Infof("Deleted IP addresses for container %v", cniConfig.ContainerId)
klog.InfoS("Deleted IP addresses for container", "container", cniConfig.ContainerId)
// Remove host interface and OVS configuration
if err := s.podConfigurator.removeInterfaces(cniConfig.ContainerId); err != nil {
klog.Errorf("Failed to remove interfaces for container %s: %v", cniConfig.ContainerId, err)
klog.ErrorS(err, "Failed to remove interfaces for container", "container", cniConfig.ContainerId)
return s.configInterfaceFailureResponse(err), nil
}
klog.Infof("CmdDel for container %v succeeded", cniConfig.ContainerId)
klog.InfoS("CmdDel for container succeeded", "container", cniConfig.ContainerId)
if s.secondaryNetworkEnabled {
podName := string(cniConfig.K8S_POD_NAME)
podNamespace := string(cniConfig.K8S_POD_NAMESPACE)
Expand All @@ -580,9 +572,20 @@ func (s *CNIServer) CmdDel(_ context.Context, request *cnipb.CniCmdRequest) (
return &cnipb.CniCmdResponse{CniResult: []byte("")}, nil
}

func (s *CNIServer) CmdDel(ctx context.Context, request *cnipb.CniCmdRequest) (*cnipb.CniCmdResponse, error) {
klog.InfoS("Received CmdDel request", "request", request)

cniConfig, response := s.validateRequestMessage(request)
if response != nil {
return response, nil
}

return s.cmdDel(ctx, cniConfig)
}

func (s *CNIServer) CmdCheck(_ context.Context, request *cnipb.CniCmdRequest) (
*cnipb.CniCmdResponse, error) {
klog.Infof("Received CmdCheck request %v", request)
klog.InfoS("Received CmdCheck request", "request", request)

cniConfig, response := s.validateRequestMessage(request)
if response != nil {
Expand All @@ -603,7 +606,7 @@ func (s *CNIServer) CmdCheck(_ context.Context, request *cnipb.CniCmdRequest) (
}

if err := ipam.ExecIPAMCheck(cniConfig.CniCmdArgs, cniConfig.K8sArgs, cniConfig.IPAM.Type); err != nil {
klog.Errorf("Failed to check IPAM configuration for container %v: %v", cniConfig.ContainerId, err)
klog.ErrorS(err, "Failed to check IPAM configuration for container", "container", cniConfig.ContainerId)
return s.ipamFailureResponse(err), nil
}

Expand All @@ -615,7 +618,7 @@ func (s *CNIServer) CmdCheck(_ context.Context, request *cnipb.CniCmdRequest) (
return response, nil
}
}
klog.Infof("CmdCheck for container %v succeeded", cniConfig.ContainerId)
klog.InfoS("CmdCheck for container succeeded", "container", cniConfig.ContainerId)
return &cnipb.CniCmdResponse{CniResult: []byte("")}, nil
}

Expand Down Expand Up @@ -676,8 +679,8 @@ func (s *CNIServer) Initialize(
}

func (s *CNIServer) Run(stopCh <-chan struct{}) {
klog.Info("Starting CNI server")
defer klog.Info("Shutting down CNI server")
klog.InfoS("Starting CNI server")
defer klog.InfoS("Shutting down CNI server")

listener, err := util.ListenLocalSocket(s.cniSocket)
if err != nil {
Expand All @@ -686,10 +689,10 @@ func (s *CNIServer) Run(stopCh <-chan struct{}) {
rpcServer := grpc.NewServer()

cnipb.RegisterCniServer(rpcServer, s)
klog.Info("CNI server is listening ...")
klog.InfoS("CNI server is listening ...")
go func() {
if err := rpcServer.Serve(listener); err != nil {
klog.Errorf("Failed to serve connections: %v", err)
klog.ErrorS(err, "Failed to serve connections")
}
}()
<-stopCh
Expand All @@ -699,10 +702,10 @@ func (s *CNIServer) Run(stopCh <-chan struct{}) {
// be called prior to Antrea CNI to allocate IP and ports. Antrea takes allocated port
// and hooks it to OVS br-int.
func (s *CNIServer) interceptAdd(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, error) {
klog.Infof("CNI Chaining: add for container %s", cniConfig.ContainerId)
klog.InfoS("CNI Chaining: add for container", "container", cniConfig.ContainerId)
prevResult, response := s.parsePrevResultFromRequest(cniConfig.NetworkConfig)
if response != nil {
klog.Infof("Failed to parse prev result for container %s", cniConfig.ContainerId)
klog.InfoS("Failed to parse prev result", "container", cniConfig.ContainerId)
return response, nil
}
podName := string(cniConfig.K8S_POD_NAME)
Expand Down Expand Up @@ -737,15 +740,15 @@ func (s *CNIServer) interceptAdd(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, e
}

func (s *CNIServer) interceptDel(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, error) {
klog.Infof("CNI Chaining: delete for container %s", cniConfig.ContainerId)
klog.InfoS("CNI Chaining: delete for container", "container", cniConfig.ContainerId)
return &cnipb.CniCmdResponse{CniResult: []byte("")}, s.podConfigurator.disconnectInterceptedInterface(
string(cniConfig.K8S_POD_NAME),
string(cniConfig.K8S_POD_NAMESPACE),
cniConfig.ContainerId)
}

func (s *CNIServer) interceptCheck(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, error) {
klog.Infof("CNI Chaining: check for container %s", cniConfig.ContainerId)
klog.InfoS("CNI Chaining: check for container", "container", cniConfig.ContainerId)
// TODO, check for host interface setup later
return &cnipb.CniCmdResponse{CniResult: []byte("")}, nil
}
Expand All @@ -754,7 +757,7 @@ func (s *CNIServer) interceptCheck(cniConfig *CNIConfig) (*cnipb.CniCmdResponse,
// installing Pod flows, so as part of this reconciliation process we retrieve the Pod list from the
// K8s apiserver and replay the necessary flows.
func (s *CNIServer) reconcile() error {
klog.Infof("Reconciliation for CNI server")
klog.InfoS("Reconciliation for CNI server")
// For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from
// the watch cache in kube-apiserver.
pods, err := s.kubeClient.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
Expand Down
Loading