diff --git a/charts/templates/ovncni-ds.yaml b/charts/templates/ovncni-ds.yaml index 50e429a55bb..25cb4ddddad 100644 --- a/charts/templates/ovncni-ds.yaml +++ b/charts/templates/ovncni-ds.yaml @@ -78,6 +78,7 @@ spec: - --log_file_max_size=0 - --enable-metrics={{- .Values.networking.ENABLE_METRICS }} - --kubelet-dir={{ .Values.kubelet_conf.KUBELET_DIR }} + - --enable-tproxy={{ .Values.func.ENABLE_TPROXY }} securityContext: runAsUser: 0 privileged: true diff --git a/charts/values.yaml b/charts/values.yaml index c8d5b720f86..dc88a8e52df 100644 --- a/charts/values.yaml +++ b/charts/values.yaml @@ -61,6 +61,7 @@ func: LOGICAL_GATEWAY: false ENABLE_BIND_LOCAL_IP: true U2O_INTERCONNECTION: false + ENABLE_TPROXY: false ipv4: POD_CIDR: "10.16.0.0/16" diff --git a/cmd/daemon/cniserver.go b/cmd/daemon/cniserver.go index 6bb72fbb804..413f4fb1cdb 100644 --- a/cmd/daemon/cniserver.go +++ b/cmd/daemon/cniserver.go @@ -18,7 +18,6 @@ import ( "k8s.io/klog/v2" "k8s.io/sample-controller/pkg/signals" - kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions" "github.com/kubeovn/kube-ovn/pkg/daemon" "github.com/kubeovn/kube-ovn/pkg/util" @@ -96,19 +95,7 @@ func CmdMain() { mux.HandleFunc("/debug/pprof/trace", pprof.Trace) } - addr := "0.0.0.0" - if os.Getenv("ENABLE_BIND_LOCAL_IP") == "true" { - podIpsEnv := os.Getenv("POD_IPS") - podIps := strings.Split(podIpsEnv, ",") - // when pod in dual mode, golang can't support bind v4 and v6 address in the same time, - // so not support bind local ip when in dual mode - if len(podIps) == 1 { - addr = podIps[0] - if util.CheckProtocol(podIps[0]) == kubeovnv1.ProtocolIPv6 { - addr = fmt.Sprintf("[%s]", podIps[0]) - } - } - } + addr := util.GetDefaultListenAddr() if config.EnableVerboseConnCheck { go func() { diff --git a/dist/images/install.sh b/dist/images/install.sh index 7bafb65eb58..3b6a13b6c3c 100755 --- a/dist/images/install.sh +++ b/dist/images/install.sh @@ -32,6 +32,7 @@ IFACE=${IFACE:-} # Note that the dpdk tunnel iface and tunnel ip cidr should be diffierent with Kubernetes api cidr, otherwise the route will be a problem. DPDK_TUNNEL_IFACE=${DPDK_TUNNEL_IFACE:-br-phy} ENABLE_BIND_LOCAL_IP=${ENABLE_BIND_LOCAL_IP:-true} +ENABLE_TPROXY=${ENABLE_TPROXY:-false} # debug DEBUG_WRAPPER=${DEBUG_WRAPPER:-} @@ -4037,6 +4038,7 @@ spec: - --log_file=/var/log/kube-ovn/kube-ovn-cni.log - --log_file_max_size=0 - --kubelet-dir=$KUBELET_DIR + - --enable-tproxy=$ENABLE_TPROXY securityContext: runAsUser: 0 privileged: true diff --git a/dist/images/uninstall.sh b/dist/images/uninstall.sh index 75b8040918f..86516b25405 100644 --- a/dist/images/uninstall.sh +++ b/dist/images/uninstall.sh @@ -21,6 +21,12 @@ iptables -t filter -D FORWARD -m set --match-set ovn40subnets src -j ACCEPT iptables -t filter -D FORWARD -m set --match-set ovn40services dst -j ACCEPT iptables -t filter -D FORWARD -m set --match-set ovn40services src -j ACCEPT iptables -t filter -D OUTPUT -p udp -m udp --dport 6081 -j MARK --set-xmark 0x0 +iptables -t mangle -D PREROUTING -m comment --comment "kube-ovn prerouting rules" -j OVN-PREROUTING +iptables -t mangle -D OUTPUT -m comment --comment "kube-ovn output rules" -j OVN-OUTPUT +iptables -t mangle -F OVN-PREROUTING +iptables -t mangle -X OVN-PREROUTING +iptables -t mangle -F OVN-OUTPUT +iptables -t mangle -X OVN-OUTPUT sleep 1 @@ -51,6 +57,12 @@ ip6tables -t filter -D FORWARD -m set --match-set ovn60subnets src -j ACCEPT ip6tables -t filter -D FORWARD -m set --match-set ovn60services dst -j ACCEPT ip6tables -t filter -D FORWARD -m set --match-set ovn60services src -j ACCEPT ip6tables -t filter -D OUTPUT -p udp -m udp --dport 6081 -j MARK --set-xmark 0x0 +ip6tables -t mangle -D PREROUTING -m comment --comment "kube-ovn prerouting rules" -j OVN-PREROUTING +ip6tables -t mangle -D OUTPUT -m comment --comment "kube-ovn output rules" -j OVN-OUTPUT +ip6tables -t mangle -F OVN-PREROUTING +ip6tables -t mangle -X OVN-PREROUTING +ip6tables -t mangle -F OVN-OUTPUT +ip6tables -t mangle -X OVN-OUTPUT sleep 1 diff --git a/pkg/daemon/config.go b/pkg/daemon/config.go index 982df611696..ca694dd6f1a 100644 --- a/pkg/daemon/config.go +++ b/pkg/daemon/config.go @@ -63,6 +63,7 @@ type Configuration struct { EnableVerboseConnCheck bool TCPConnCheckPort int UDPConnCheckPort int + EnableTProxy bool } // ParseFlags will parse cmd args then init kubeClient and configuration @@ -100,6 +101,7 @@ func ParseFlags() *Configuration { argEnableVerboseConnCheck = pflag.Bool("enable-verbose-conn-check", false, "enable TCP/UDP connectivity check listen port") argTCPConnectivityCheckPort = pflag.Int("tcp-conn-check-port", 8100, "TCP connectivity Check Port") argUDPConnectivityCheckPort = pflag.Int("udp-conn-check-port", 8101, "UDP connectivity Check Port") + argEnableTProxy = pflag.Bool("enable-tproxy", false, "enable tproxy for vpc pod liveness or readiness probe") ) // mute info log for ipset lib @@ -154,6 +156,7 @@ func ParseFlags() *Configuration { EnableVerboseConnCheck: *argEnableVerboseConnCheck, TCPConnCheckPort: *argTCPConnectivityCheckPort, UDPConnCheckPort: *argUDPConnectivityCheckPort, + EnableTProxy: *argEnableTProxy, } return config } diff --git a/pkg/daemon/controller.go b/pkg/daemon/controller.go index a141bb1f032..97f4d714fd9 100644 --- a/pkg/daemon/controller.go +++ b/pkg/daemon/controller.go @@ -625,6 +625,17 @@ func (c *Controller) Run(stopCh <-chan struct{}) { } }, 5*time.Minute, stopCh) + if c.config.EnableTProxy { + go c.StartTProxyForwarding() + go wait.Until(c.runTProxyConfigWorker, 3*time.Second, stopCh) + // Using the tproxy method, kubelet's TCP probe packets cannot reach the namespace of the pod of the custom VPC, + // so tproxy itself probes the pod of the custom VPC, if probe failed remove the iptable rules from + // kubelet to tproxy, if probe success recover the iptable rules + go wait.Until(c.StartTProxyTCPPortProbe, 1*time.Second, stopCh) + } else { + c.cleanTProxyConfig() + } + <-stopCh klog.Info("Shutting down workers") } diff --git a/pkg/daemon/gateway.go b/pkg/daemon/gateway.go index 9409c078cde..7ecca9ce755 100644 --- a/pkg/daemon/gateway.go +++ b/pkg/daemon/gateway.go @@ -3,6 +3,7 @@ package daemon import ( "fmt" "os/exec" + "sort" "strings" v1 "k8s.io/api/core/v1" @@ -246,3 +247,44 @@ func (c *Controller) getEgressNatIpByNode(nodeName string) (map[string]string, e } return subnetsNatIp, nil } + +func (c *Controller) getTProxyConditionPod(needSort bool) ([]*v1.Pod, error) { + + var filteredPods []*v1.Pod + pods, err := c.podsLister.List(labels.Everything()) + if err != nil { + klog.Errorf("list pods failed, %v", err) + return nil, err + } + + for _, pod := range pods { + if pod.Spec.NodeName != c.config.NodeName { + continue + } + + subnetName, ok := pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, util.OvnProvider)] + if !ok { + continue + } + + subnet, err := c.subnetsLister.Get(subnetName) + if err != nil { + err = fmt.Errorf("failed to get subnet '%s', err: %v", subnetName, err) + return nil, err + } + + if subnet.Spec.Vpc == c.config.ClusterRouter { + continue + } + + filteredPods = append(filteredPods, pod) + } + + if needSort { + sort.Slice(filteredPods, func(i, j int) bool { + return filteredPods[i].Namespace+"/"+filteredPods[i].Name < filteredPods[j].Namespace+"/"+filteredPods[j].Name + }) + } + + return filteredPods, nil +} diff --git a/pkg/daemon/gateway_linux.go b/pkg/daemon/gateway_linux.go index 1a59f67da98..45dc72a4ba7 100644 --- a/pkg/daemon/gateway_linux.go +++ b/pkg/daemon/gateway_linux.go @@ -40,10 +40,13 @@ const ( const ( NAT = "nat" + MANGLE = "mangle" Prerouting = "PREROUTING" Postrouting = "POSTROUTING" + Output = "OUTPUT" OvnPrerouting = "OVN-PREROUTING" OvnPostrouting = "OVN-POSTROUTING" + OvnOutput = "OVN-OUTPUT" OvnMasquerade = "OVN-MASQUERADE" OvnNatOutGoingPolicy = "OVN-NAT-POLICY" OvnNatOutGoingPolicySubnet = "OVN-NAT-PSUBNET-" @@ -52,6 +55,10 @@ const ( const ( OnOutGoingNatMark = "0x90001/0x90001" OnOutGoingForwardMark = "0x90002/0x90002" + TProxyOutputMark = 0x90003 + TProxyOutputMask = 0x90003 + TProxyPreroutingMark = 0x90004 + TProxyPreroutingMask = 0x90004 ) type policyRouteMeta struct { @@ -584,9 +591,11 @@ func (c *Controller) setIptables() error { } ) protocols := make([]string, 2) + isDual := false if c.protocol == kubeovnv1.ProtocolDual { protocols[0] = kubeovnv1.ProtocolIPv4 protocols[1] = kubeovnv1.ProtocolIPv6 + isDual = true } else { protocols[0] = c.protocol } @@ -733,6 +742,10 @@ func (c *Controller) setIptables() error { return err } + if err = c.reconcileTProxyIPTableRules(protocol, isDual); err != nil { + return err + } + if err = c.updateIptablesChain(ipt, NAT, OvnPrerouting, Prerouting, natPreroutingRules); err != nil { klog.Errorf("failed to update chain %s/%s: %v", NAT, OvnPrerouting) return err @@ -754,6 +767,125 @@ func (c *Controller) setIptables() error { return nil } +func (c *Controller) reconcileTProxyIPTableRules(protocol string, isDual bool) error { + if !c.config.EnableTProxy { + return nil + } + + ipt := c.iptables[protocol] + tproxyPreRoutingRules := make([]util.IPTableRule, 0) + tproxyOutputRules := make([]util.IPTableRule, 0) + probePorts := strset.New() + + pods, err := c.getTProxyConditionPod(true) + if err != nil { + return err + } + + for _, pod := range pods { + var podIP string + for _, ip := range pod.Status.PodIPs { + if util.CheckProtocol(ip.IP) == protocol { + podIP = ip.IP + break + } + } + + if podIP == "" { + continue + } + + for _, container := range pod.Spec.Containers { + if container.ReadinessProbe != nil { + if httpGet := container.ReadinessProbe.HTTPGet; httpGet != nil { + if port := httpGet.Port.String(); port != "" { + probePorts.Add(port) + } + } + + if tcpSocket := container.ReadinessProbe.TCPSocket; tcpSocket != nil { + if port := tcpSocket.Port.String(); port != "" { + if isTCPProbePortReachable, ok := customVPCPodTCPProbeIPPort.Load(getIPPortString(podIP, port)); ok { + if isTCPProbePortReachable.(bool) { + probePorts.Add(port) + } + } + } + } + } + + if container.LivenessProbe != nil { + if httpGet := container.LivenessProbe.HTTPGet; httpGet != nil { + if port := httpGet.Port.String(); port != "" { + probePorts.Add(port) + } + } + + if tcpSocket := container.LivenessProbe.TCPSocket; tcpSocket != nil { + if port := tcpSocket.Port.String(); port != "" { + if isTCPProbePortReachable, ok := customVPCPodTCPProbeIPPort.Load(getIPPortString(podIP, port)); ok { + if isTCPProbePortReachable.(bool) { + probePorts.Add(port) + } + } + } + } + } + } + + if probePorts.IsEmpty() { + continue + } + + probePortList := probePorts.List() + sort.Strings(probePortList) + for _, probePort := range probePortList { + tProxyOutputMarkMask := fmt.Sprintf("%#x/%#x", TProxyOutputMark, TProxyOutputMask) + tProxyPreRoutingMarkMask := fmt.Sprintf("%#x/%#x", TProxyPreroutingMark, TProxyPreroutingMask) + + hostIP := pod.Status.HostIP + prefixLen := 32 + if protocol == kubeovnv1.ProtocolIPv6 { + prefixLen = 128 + } + + if isDual || os.Getenv("ENABLE_BIND_LOCAL_IP") == "false" { + if protocol == kubeovnv1.ProtocolIPv4 { + hostIP = "0.0.0.0" + } else if protocol == kubeovnv1.ProtocolIPv6 { + hostIP = "::" + } + } + tproxyOutputRules = append(tproxyOutputRules, util.IPTableRule{Table: MANGLE, Chain: OvnOutput, Rule: strings.Fields(fmt.Sprintf(`-d %s/%d -p tcp -m tcp --dport %s -j MARK --set-xmark %s`, podIP, prefixLen, probePort, tProxyOutputMarkMask))}) + tproxyPreRoutingRules = append(tproxyPreRoutingRules, util.IPTableRule{Table: MANGLE, Chain: OvnPrerouting, Rule: strings.Fields(fmt.Sprintf(`-d %s/%d -p tcp -m tcp --dport %s -j TPROXY --on-port %d --on-ip %s --tproxy-mark %s`, podIP, prefixLen, probePort, util.TProxyListenPort, hostIP, tProxyPreRoutingMarkMask))}) + } + } + + if err := c.updateIptablesChain(ipt, MANGLE, OvnPrerouting, Prerouting, tproxyPreRoutingRules); err != nil { + klog.Errorf("failed to update chain %s with rules %v: %v", OvnPrerouting, tproxyPreRoutingRules, err) + return err + } + + if err := c.updateIptablesChain(ipt, MANGLE, OvnOutput, Output, tproxyOutputRules); err != nil { + klog.Errorf("failed to update chain %s with rules %v: %v", OvnOutput, tproxyOutputRules, err) + return err + } + return nil +} + +func (c *Controller) cleanTProxyIPTableRules(protocol string) { + ipt := c.iptables[protocol] + if ipt == nil { + return + } + for _, chain := range [2]string{OvnPrerouting, OvnOutput} { + if err := ipt.ClearChain(MANGLE, chain); err != nil { + klog.Errorf("failed to clear iptables chain %v in table %v, %+v", chain, MANGLE, err) + return + } + } +} + func (c *Controller) reconcileNatOutgoingPolicyIptablesChain(protocol string) error { ipt := c.iptables[protocol] diff --git a/pkg/daemon/tproxy_linux.go b/pkg/daemon/tproxy_linux.go new file mode 100644 index 00000000000..d392bf1391b --- /dev/null +++ b/pkg/daemon/tproxy_linux.go @@ -0,0 +1,381 @@ +package daemon + +import ( + "errors" + "fmt" + "io" + "net" + "strconv" + "strings" + "sync" + "syscall" + + "github.com/containernetworking/plugins/pkg/ns" + "github.com/vishvananda/netlink" + "golang.org/x/sys/unix" + "k8s.io/klog/v2" + + kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" + "github.com/kubeovn/kube-ovn/pkg/ovs" + goTProxy "github.com/kubeovn/kube-ovn/pkg/tproxy" + "github.com/kubeovn/kube-ovn/pkg/util" + "github.com/scylladb/go-set/strset" +) + +var ( + tcpListener net.Listener + + customVPCPodIPToNs sync.Map + customVPCPodTCPProbeIPPort sync.Map +) + +func (c *Controller) StartTProxyForwarding() { + var err error + addr := util.GetDefaultListenAddr() + + protocol := "tcp" + if strings.HasPrefix(addr, "[") && strings.HasSuffix(addr, "]") { + addr = addr[1 : len(addr)-1] + protocol = "tcp6" + } + + tcpListener, err = goTProxy.ListenTCP(protocol, &net.TCPAddr{IP: net.ParseIP(addr), Port: util.TProxyListenPort}) + if err != nil { + klog.Fatalf("Encountered error while binding listener: %s", err) + return + } + + defer func() { + if err := tcpListener.Close(); err != nil { + klog.Errorf("Error tcpListener Close err: %v ", err) + } + }() + + for { + conn, err := tcpListener.Accept() + if err != nil { + klog.Fatalf("Unrecoverable error while accepting connection: %s", err) + return + } + go handleRedirectFlow(conn) + } +} + +func (c *Controller) StartTProxyTCPPortProbe() { + + probePorts := strset.New() + + pods, err := c.getTProxyConditionPod(false) + if err != nil { + return + } + + for _, pod := range pods { + iface := ovs.PodNameToPortName(pod.Name, pod.Namespace, util.OvnProvider) + nsName, err := ovs.GetInterfacePodNs(iface) + if err != nil || nsName == "" { + klog.Infof("iface %s's namespace not found", iface) + continue + } + + for _, podIP := range pod.Status.PodIPs { + customVPCPodIPToNs.Store(podIP.IP, nsName) + for _, container := range pod.Spec.Containers { + if container.ReadinessProbe != nil { + if tcpSocket := container.ReadinessProbe.TCPSocket; tcpSocket != nil { + if port := tcpSocket.Port.String(); port != "" { + probePorts.Add(port) + } + } + } + + if container.LivenessProbe != nil { + if tcpSocket := container.LivenessProbe.TCPSocket; tcpSocket != nil { + if port := tcpSocket.Port.String(); port != "" { + probePorts.Add(port) + } + } + } + } + + probePortsList := probePorts.List() + for _, port := range probePortsList { + probePortInNs(podIP.IP, port, true, nil) + } + } + } +} + +func (c *Controller) runTProxyConfigWorker() { + protocols := getProtocols(c.protocol) + for _, protocol := range protocols { + c.reconcileTProxyRoutes(protocol) + } +} + +func (c *Controller) reconcileTProxyRoutes(protocol string) { + family, err := util.ProtocolToFamily(protocol) + if err != nil { + klog.Errorf("get Protocol %s family failed", protocol) + return + } + + if err := addRuleIfNotExist(family, TProxyOutputMark, TProxyOutputMask, util.TProxyRouteTable); err != nil { + klog.Error("add output rule failed err:%v ", err) + return + } + + if err := addRuleIfNotExist(family, TProxyPreroutingMark, TProxyPreroutingMask, util.TProxyRouteTable); err != nil { + klog.Error("add prerouting rule failed err:%v ", err) + return + } + + dst := GetDefaultRouteDst(protocol) + if err := addRouteIfNotExist(family, util.TProxyRouteTable, &dst); err != nil { + klog.Error("add tproxy route failed err:%v ", err) + return + } +} + +func (c *Controller) cleanTProxyConfig() { + protocols := getProtocols(c.protocol) + for _, protocol := range protocols { + c.cleanTProxyRoutes(protocol) + c.cleanTProxyIPTableRules(protocol) + } +} + +func (c *Controller) cleanTProxyRoutes(protocol string) { + family, err := util.ProtocolToFamily(protocol) + if err != nil { + klog.Errorf("get Protocol %s family failed", protocol) + return + } + + if err := deleteRuleIfExists(family, TProxyOutputMark); err != nil { + klog.Errorf("delete tproxy route rule mark %v failed err: %v ", TProxyOutputMark, err) + } + + if err := deleteRuleIfExists(family, TProxyPreroutingMark); err != nil { + klog.Errorf("delete tproxy route rule mark %v failed err: %v ", TProxyPreroutingMark, err) + } + + dst := GetDefaultRouteDst(protocol) + if err := delRouteIfExist(family, util.TProxyRouteTable, &dst); err != nil { + klog.Errorf("delete tproxy route rule mark %v failed err: %v ", TProxyPreroutingMark, err) + } +} + +func addRuleIfNotExist(family, mark, mask, table int) error { + curRules, err := netlink.RuleListFiltered(family, &netlink.Rule{Mark: mark}, netlink.RT_FILTER_MARK) + if err != nil { + return fmt.Errorf("list rules with mark %x failed err: %v", mark, err) + } + + if len(curRules) != 0 { + return nil + } + + rule := netlink.NewRule() + rule.Mark = mark + rule.Mask = mask + rule.Table = table + rule.Family = family + + if err = netlink.RuleAdd(rule); err != nil && !errors.Is(err, syscall.EEXIST) { + klog.Errorf("add rule %v failed with err %v ", rule, err) + return err + } + + return nil +} + +func deleteRuleIfExists(family, mark int) error { + curRules, err := netlink.RuleListFiltered(family, &netlink.Rule{Mark: mark}, netlink.RT_FILTER_MARK) + if err != nil { + return fmt.Errorf("list rules with mark %x failed err: %v", mark, err) + } + + if len(curRules) != 0 { + for _, r := range curRules { + if err := netlink.RuleDel(&r); err != nil && !errors.Is(err, syscall.ENOENT) { + return fmt.Errorf("delete rule %v failed with err: %v", r, err) + } + } + } + return nil +} + +func addRouteIfNotExist(family, table int, dst *net.IPNet) error { + curRoutes, err := netlink.RouteListFiltered(family, &netlink.Route{Table: table, Dst: dst}, netlink.RT_FILTER_TABLE|netlink.RT_FILTER_DST) + if err != nil { + return fmt.Errorf("list routes with table %d failed with err: %v", table, err) + } + + if len(curRoutes) != 0 { + return nil + } + + link, err := netlink.LinkByName("lo") + if err != nil { + return fmt.Errorf("can't find device lo") + } + + route := netlink.Route{ + LinkIndex: link.Attrs().Index, + Dst: dst, + Table: table, + Scope: unix.RT_SCOPE_HOST, + Type: unix.RTN_LOCAL, + } + + if err = netlink.RouteReplace(&route); err != nil && !errors.Is(err, syscall.EEXIST) { + klog.Errorf("add route %v failed with err %v ", route, err) + return err + } + + return nil +} + +func delRouteIfExist(family, table int, dst *net.IPNet) error { + curRoutes, err := netlink.RouteListFiltered(family, &netlink.Route{Table: table}, netlink.RT_FILTER_TABLE) + if err != nil { + klog.Error("list routes with table %d failed with err: %v", table, err) + return err + } + + if len(curRoutes) == 0 { + return nil + } + + link, err := netlink.LinkByName("lo") + if err != nil { + return fmt.Errorf("can't find device lo") + } + + route := netlink.Route{ + LinkIndex: link.Attrs().Index, + Dst: dst, + Table: table, + Scope: unix.RT_SCOPE_HOST, + Type: unix.RTN_LOCAL, + } + + if err = netlink.RouteDel(&route); err != nil && !errors.Is(err, syscall.ENOENT) { + klog.Errorf("del route %v failed with err %v ", route, err) + return err + } + + return nil +} + +func handleRedirectFlow(conn net.Conn) { + + klog.V(5).Infof("Accepting TCP connection from %v with destination of %v", conn.RemoteAddr().String(), conn.LocalAddr().String()) + defer func() { + if err := conn.Close(); err != nil { + klog.Errorf("conn Close err: %v ", err) + } + }() + + podIPPort := conn.LocalAddr().String() + podIP, probePort, err := net.SplitHostPort(podIPPort) + if err != nil { + klog.Errorf("Get %s Pod IP and Port failed err: %v", podIPPort, err) + return + } + + probePortInNs(podIP, probePort, false, conn) +} + +func probePortInNs(podIP, probePort string, isTProxyProbe bool, conn net.Conn) { + podNs, ok := customVPCPodIPToNs.Load(podIP) + if !ok { + return + } + + iprobePort, err := strconv.Atoi(probePort) + if err != nil { + return + } + + podNS, err := ns.GetNS(podNs.(string)) + if err != nil { + customVPCPodIPToNs.Delete(podIP) + klog.Infof("ns %s already deleted", podNs) + return + } + + _ = ns.WithNetNSPath(podNS.Path(), func(_ ns.NetNS) error { + // Packet's src and dst IP are both PodIP in netns + localpodTcpAddr := net.TCPAddr{IP: net.ParseIP(podIP)} + remotepodTcpAddr := net.TCPAddr{IP: net.ParseIP(podIP), Port: iprobePort} + + remoteConn, err := goTProxy.DialTCP(&localpodTcpAddr, &remotepodTcpAddr, !isTProxyProbe) + if err != nil { + if isTProxyProbe { + customVPCPodTCPProbeIPPort.Store(getIPPortString(podIP, probePort), false) + } + return nil + } + + if isTProxyProbe { + customVPCPodTCPProbeIPPort.Store(getIPPortString(podIP, probePort), true) + return nil + } + + defer func() { + if err := remoteConn.Close(); err != nil { + klog.Errorf("remoteConn %v Close err: %v ", remoteConn, err) + } + }() + + var streamWait sync.WaitGroup + streamWait.Add(2) + + streamConn := func(dst io.Writer, src io.Reader) { + if _, err := io.Copy(dst, src); err != nil { + klog.Errorf("copy stream from dst %v to src %v failed err: %v ", dst, src, err) + } + + streamWait.Done() + } + + go streamConn(remoteConn, conn) + go streamConn(conn, remoteConn) + + streamWait.Wait() + return nil + }) +} + +func getIPPortString(podIP, port string) string { + return fmt.Sprintf("%s|%s", podIP, port) +} + +func getProtocols(protocol string) []string { + var protocols []string + if protocol == kubeovnv1.ProtocolDual { + protocols = append(protocols, kubeovnv1.ProtocolIPv4) + protocols = append(protocols, kubeovnv1.ProtocolIPv6) + } else { + protocols = append(protocols, protocol) + } + return protocols +} + +func GetDefaultRouteDst(protocol string) net.IPNet { + var dst net.IPNet + if protocol == kubeovnv1.ProtocolIPv4 { + dst = net.IPNet{ + IP: net.IPv4zero, + Mask: net.CIDRMask(0, 0), + } + } else if protocol == kubeovnv1.ProtocolIPv6 { + dst = net.IPNet{ + IP: net.IPv6zero, + Mask: net.CIDRMask(0, 0), + } + } + return dst +} diff --git a/pkg/daemon/tproxy_windows.go b/pkg/daemon/tproxy_windows.go new file mode 100644 index 00000000000..639b80e8305 --- /dev/null +++ b/pkg/daemon/tproxy_windows.go @@ -0,0 +1,13 @@ +package daemon + +func (c *Controller) StartTProxyForwarding() { +} + +func (c *Controller) StartTProxyTCPPortProbe() { +} + +func (c *Controller) runTProxyConfigWorker() { +} + +func (c *Controller) cleanTProxyConfig() { +} diff --git a/pkg/ovs/ovs-vsctl.go b/pkg/ovs/ovs-vsctl.go index d8cfb813267..7fe9f9dd19d 100644 --- a/pkg/ovs/ovs-vsctl.go +++ b/pkg/ovs/ovs-vsctl.go @@ -3,6 +3,7 @@ package ovs import ( "fmt" "os/exec" + "regexp" "strconv" "strings" "time" @@ -15,6 +16,8 @@ import ( // Glory belongs to openvswitch/ovn-kubernetes // https://github.com/openvswitch/ovn-kubernetes/blob/master/go-controller/pkg/util/ovs.go +var podNetNsRegexp = regexp.MustCompile(`pod_netns="([^"]+)"`) + func Exec(args ...string) (string, error) { start := time.Now() args = append([]string{"--timeout=30"}, args...) @@ -249,6 +252,25 @@ func ValidatePortVendor(port string) (bool, error) { return util.ContainsString(output, port), err } +func GetInterfacePodNs(iface string) (string, error) { + ret, err := ovsFind("interface", "external-ids", fmt.Sprintf("external-ids:iface-id=%s", iface)) + if err != nil { + return "", err + } + + if len(ret) == 0 { + return "", nil + } + + podNetNs := "" + match := podNetNsRegexp.FindStringSubmatch(ret[0]) + if len(match) > 1 { + podNetNs = match[1] + } + + return podNetNs, nil +} + // config mirror for interface by pod annotations and install param func ConfigInterfaceMirror(globalMirror bool, open string, iface string) error { if globalMirror { diff --git a/pkg/tproxy/tproxy_tcp_linux.go b/pkg/tproxy/tproxy_tcp_linux.go new file mode 100644 index 00000000000..101b57ca31b --- /dev/null +++ b/pkg/tproxy/tproxy_tcp_linux.go @@ -0,0 +1,229 @@ +// This code below is referenced at https://github.com/Asphaltt/go-tproxy/blob/master/tproxy_tcp.go +// Because the code needs to be customized somewhere, the project is not directly imported +package tproxy + +import ( + "fmt" + "net" + "os" + "strings" + "syscall" + + "k8s.io/klog/v2" +) + +// Listener describes a TCP Listener +// with the Linux IP_TRANSPARENT option defined +// on the listening socket +type Listener struct { + base net.Listener +} + +// Accept waits for and returns +// the next connection to the listener. +// +// This command wraps the AcceptTProxy +// method of the Listener +func (listener *Listener) Accept() (net.Conn, error) { + return listener.AcceptTProxy() +} + +// AcceptTProxy will accept a TCP connection +// and wrap it to a TProxy connection to provide +// TProxy functionality +func (listener *Listener) AcceptTProxy() (*Conn, error) { + tcpConn, err := listener.base.(*net.TCPListener).AcceptTCP() + if err != nil { + return nil, err + } + + return &Conn{TCPConn: tcpConn}, nil +} + +// Addr returns the network address +// the listener is accepting connections +// from +func (listener *Listener) Addr() net.Addr { + return listener.base.Addr() +} + +// Close will close the listener from accepting +// any more connections. Any blocked connections +// will unblock and close +func (listener *Listener) Close() error { + return listener.base.Close() +} + +// ListenTCP will construct a new TCP listener +// socket with the Linux IP_TRANSPARENT option +// set on the underlying socket +func ListenTCP(network string, laddr *net.TCPAddr) (net.Listener, error) { + return listenTCP("", network, laddr) +} + +func listenTCP(device, network string, laddr *net.TCPAddr) (net.Listener, error) { + listener, err := net.ListenTCP(network, laddr) + if err != nil { + return nil, err + } + + fileDescriptorSource, err := listener.File() + if err != nil { + return nil, &net.OpError{Op: "listen", Net: network, Source: nil, Addr: laddr, Err: fmt.Errorf("get file descriptor: %s", err)} + } + + defer func() { + if err := fileDescriptorSource.Close(); err != nil { + klog.Errorf("fileDescriptorSource %v Close err: %v ", fileDescriptorSource, err) + } + }() + + if device != "" { + if err = syscall.BindToDevice(int(fileDescriptorSource.Fd()), device); err != nil { + return nil, &net.OpError{Op: "listen", Net: network, Source: nil, Addr: laddr, Err: fmt.Errorf("set socket option: SO_BINDTODEVICE(%s): %s", device, err)} + } + } + + if err = syscall.SetsockoptInt(int(fileDescriptorSource.Fd()), syscall.SOL_IP, syscall.IP_TRANSPARENT, 1); err != nil { + return nil, &net.OpError{Op: "listen", Net: network, Source: nil, Addr: laddr, Err: fmt.Errorf("set socket option: IP_TRANSPARENT: %s", err)} + } + + return &Listener{listener}, nil +} + +// Conn describes a connection +// accepted by the TProxy listener. +// +// It is simply a TCP connection with +// the ability to dial a connection to +// the original destination while assuming +// the IP address of the client +type Conn struct { + *net.TCPConn +} + +// tcpAddToSockerAddr will convert a TCPAddr +// into a Sockaddr that may be used when +// connecting and binding sockets +func tcpAddrToSocketAddr(addr *net.TCPAddr) (syscall.Sockaddr, error) { + switch { + case addr.IP.To4() != nil: + ip := [4]byte{} + copy(ip[:], addr.IP.To4()) + + return &syscall.SockaddrInet4{Addr: ip, Port: addr.Port}, nil + + default: + ip := [16]byte{} + copy(ip[:], addr.IP.To16()) + + return &syscall.SockaddrInet6{Addr: ip, Port: addr.Port}, nil + } +} + +// tcpAddrFamily will attempt to work +// out the address family based on the +// network and TCP addresses +func tcpAddrFamily(net string, laddr, raddr *net.TCPAddr) int { + switch net[len(net)-1] { + case '4': + return syscall.AF_INET + case '6': + return syscall.AF_INET6 + } + + if (laddr == nil || laddr.IP.To4() != nil) && + (raddr == nil || laddr.IP.To4() != nil) { + return syscall.AF_INET + } + return syscall.AF_INET6 +} + +// DialTCP will open a +// TCP connection to the specified destination +// with the specified local address. +func DialTCP(laddr, raddr *net.TCPAddr, isnonblocking bool) (*net.TCPConn, error) { + return dialTCP("", laddr, raddr, false, isnonblocking) +} + +func dialTCP(device string, laddr, raddr *net.TCPAddr, dontAssumeRemote bool, isnonblocking bool) (*net.TCPConn, error) { + if laddr == nil || raddr == nil { + return nil, &net.OpError{Op: "dial", Err: fmt.Errorf("empty local address or remote address")} + } + + remoteSocketAddress, err := tcpAddrToSocketAddr(raddr) + if err != nil { + return nil, &net.OpError{Op: "dial", Err: fmt.Errorf("build destination socket address: %s", err)} + } + + localSocketAddress, err := tcpAddrToSocketAddr(laddr) + if err != nil { + return nil, &net.OpError{Op: "dial", Err: fmt.Errorf("build local socket address: %s", err)} + } + + fileDescriptor, err := syscall.Socket(tcpAddrFamily("tcp", raddr, laddr), syscall.SOCK_STREAM, syscall.IPPROTO_TCP) + if err != nil { + return nil, &net.OpError{Op: "dial", Err: fmt.Errorf("socket open: %s", err)} + } + + if device != "" { + if err = syscall.BindToDevice(int(fileDescriptor), device); err != nil { + return nil, &net.OpError{Op: "dial", Err: fmt.Errorf("set socket option: SO_BINDTODEVICE(%s): %s", device, err)} + } + } + + if err = syscall.SetsockoptInt(fileDescriptor, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1); err != nil { + if err := syscall.Close(fileDescriptor); err != nil { + klog.Errorf("fileDescriptor %v Close err: %v ", fileDescriptor, err) + } + return nil, &net.OpError{Op: "dial", Err: fmt.Errorf("set socket option: SO_REUSEADDR: %s", err)} + } + + if err = syscall.SetsockoptInt(fileDescriptor, syscall.SOL_IP, syscall.IP_TRANSPARENT, 1); err != nil { + if err := syscall.Close(fileDescriptor); err != nil { + klog.Errorf("fileDescriptor %v Close err: %v ", fileDescriptor, err) + } + return nil, &net.OpError{Op: "dial", Err: fmt.Errorf("set socket option: IP_TRANSPARENT: %s", err)} + } + + if err = syscall.SetNonblock(fileDescriptor, isnonblocking); err != nil { + if err := syscall.Close(fileDescriptor); err != nil { + klog.Errorf("fileDescriptor %v Close err: %v ", fileDescriptor, err) + } + return nil, &net.OpError{Op: "dial", Err: fmt.Errorf("set socket option: SO_NONBLOCK: %s", err)} + } + + if !dontAssumeRemote { + if err = syscall.Bind(fileDescriptor, localSocketAddress); err != nil { + if err := syscall.Close(fileDescriptor); err != nil { + klog.Errorf("fileDescriptor %v Close err: %v ", fileDescriptor, err) + } + return nil, &net.OpError{Op: "dial", Err: fmt.Errorf("socket bind: %s", err)} + } + } + + if err = syscall.Connect(fileDescriptor, remoteSocketAddress); err != nil && !strings.Contains(err.Error(), "operation now in progress") { + if err := syscall.Close(fileDescriptor); err != nil { + klog.Errorf("fileDescriptor %v Close err: %v ", fileDescriptor, err) + } + return nil, &net.OpError{Op: "dial", Err: fmt.Errorf("socket connect: %s", err)} + } + + fdFile := os.NewFile(uintptr(fileDescriptor), fmt.Sprintf("net-tcp-dial-%s", raddr.String())) + defer func() { + if err := fdFile.Close(); err != nil { + klog.Errorf("fdFile %v Close err: %v ", fdFile, err) + } + }() + + remoteConn, err := net.FileConn(fdFile) + if err != nil { + if err := syscall.Close(fileDescriptor); err != nil { + klog.Errorf("fileDescriptor %v Close err: %v ", fileDescriptor, err) + } + + return nil, &net.OpError{Op: "dial", Err: fmt.Errorf("convert file descriptor to connection: %s", err)} + } + + return remoteConn.(*net.TCPConn), nil +} diff --git a/pkg/util/const.go b/pkg/util/const.go index a38198b3516..091b14e7442 100644 --- a/pkg/util/const.go +++ b/pkg/util/const.go @@ -254,4 +254,7 @@ const ( NatPolicyRuleActionNat = "nat" NatPolicyRuleActionForward = "forward" NatPolicyRuleIDLength = 12 + + TProxyListenPort = 8102 + TProxyRouteTable = 10001 ) diff --git a/pkg/util/net.go b/pkg/util/net.go index 27d0fa53e9c..fdca4239c8d 100644 --- a/pkg/util/net.go +++ b/pkg/util/net.go @@ -7,6 +7,7 @@ import ( "math" "math/big" "net" + "os" "strconv" "strings" "time" @@ -621,3 +622,20 @@ func UDPConnectivityListen(address string) error { } } } + +func GetDefaultListenAddr() string { + addr := "0.0.0.0" + if os.Getenv("ENABLE_BIND_LOCAL_IP") == "true" { + podIpsEnv := os.Getenv("POD_IPS") + podIps := strings.Split(podIpsEnv, ",") + // when pod in dual mode, golang can't support bind v4 and v6 address in the same time, + // so not support bind local ip when in dual mode + if len(podIps) == 1 { + addr = podIps[0] + if CheckProtocol(podIps[0]) == kubeovnv1.ProtocolIPv6 { + addr = fmt.Sprintf("[%s]", podIps[0]) + } + } + } + return addr +} diff --git a/test/e2e/framework/daemonset.go b/test/e2e/framework/daemonset.go index ed7a5b3368f..182926af7b5 100644 --- a/test/e2e/framework/daemonset.go +++ b/test/e2e/framework/daemonset.go @@ -2,12 +2,21 @@ package framework import ( "context" + "encoding/json" + "errors" "fmt" + "strings" + "time" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" v1apps "k8s.io/client-go/kubernetes/typed/apps/v1" + "k8s.io/kubectl/pkg/polymorphichelpers" ) type DaemonSetClient struct { @@ -67,3 +76,61 @@ func (c *DaemonSetClient) GetPodOnNode(ds *appsv1.DaemonSet, node string) (*core return nil, fmt.Errorf("pod for daemonset %s/%s on node %s not found", ds.Namespace, ds.Name, node) } + +func (c *DaemonSetClient) Patch(daemonset *appsv1.DaemonSet) *appsv1.DaemonSet { + modifiedBytes, err := json.Marshal(daemonset) + if err != nil { + Failf("failed to marshal modified DaemonSet: %v", err) + } + ExpectNoError(err) + var patchedDaemonSet *appsv1.DaemonSet + err = wait.PollUntilContextTimeout(context.Background(), 2*time.Second, timeout, true, func(ctx context.Context) (bool, error) { + daemonSet, err := c.DaemonSetInterface.Patch(ctx, daemonset.Name, types.MergePatchType, modifiedBytes, metav1.PatchOptions{}, "") + if err != nil { + return handleWaitingAPIError(err, false, "patch daemonset %s/%s", daemonset.Namespace, daemonset.Name) + } + patchedDaemonSet = daemonSet + return true, nil + }) + if err == nil { + return patchedDaemonSet.DeepCopy() + } + + if errors.Is(err, context.DeadlineExceeded) { + Failf("timed out while retrying to patch daemonset %s/%s", daemonset.Namespace, daemonset.Name) + } + Failf("error occurred while retrying to patch daemonset %s/%s: %v", daemonset.Namespace, daemonset.Name, err) + + return nil +} + +func (c *DaemonSetClient) PatchSync(modifiedDaemonset *appsv1.DaemonSet) *appsv1.DaemonSet { + daemonSet := c.Patch(modifiedDaemonset) + return c.RolloutStatus(daemonSet.Name) +} + +func (c *DaemonSetClient) RolloutStatus(name string) *appsv1.DaemonSet { + var daemonSet *appsv1.DaemonSet + WaitUntil(2*time.Second, timeout, func(_ context.Context) (bool, error) { + var err error + daemonSet = c.Get(name) + unstructured := &unstructured.Unstructured{} + if unstructured.Object, err = runtime.DefaultUnstructuredConverter.ToUnstructured(daemonSet); err != nil { + return false, err + } + + dsv := &polymorphichelpers.DaemonSetStatusViewer{} + msg, done, err := dsv.Status(unstructured, 0) + if err != nil { + return false, err + } + if done { + return true, nil + } + + Logf(strings.TrimSpace(msg)) + return false, nil + }, "") + + return daemonSet +} diff --git a/test/e2e/framework/image.go b/test/e2e/framework/image.go index 67c4e367ae4..b02381a2789 100644 --- a/test/e2e/framework/image.go +++ b/test/e2e/framework/image.go @@ -4,4 +4,5 @@ const ( PauseImage = "kubeovn/pause:3.2" BusyBoxImage = "busybox:stable" AgnhostImage = "kubeovn/agnhost:2.43" + NginxImage = "nginx:latest" ) diff --git a/test/e2e/framework/iptables/iptables.go b/test/e2e/framework/iptables/iptables.go new file mode 100644 index 00000000000..911115c03e9 --- /dev/null +++ b/test/e2e/framework/iptables/iptables.go @@ -0,0 +1,50 @@ +package iptables + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/onsi/gomega" + + apiv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" + "github.com/kubeovn/kube-ovn/test/e2e/framework" + corev1 "k8s.io/api/core/v1" + e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output" +) + +func CheckIptablesRulesOnNode(f *framework.Framework, node, table, chain, protocol string, expectedRules []string, shouldExist bool) { + + ovsPod := getOvsPodOnNode(f, node) + + iptBin := "iptables" + if protocol == apiv1.ProtocolIPv6 { + iptBin = "ip6tables" + } + + cmd := fmt.Sprintf(`%s -t %s -S `, iptBin, table) + if chain != "" { + cmd += chain + } + framework.WaitUntil(2*time.Second, time.Minute, func(_ context.Context) (bool, error) { + output := e2epodoutput.RunHostCmdOrDie(ovsPod.Namespace, ovsPod.Name, cmd) + rules := strings.Split(output, "\n") + for _, r := range expectedRules { + framework.Logf("checking rule %s ", r) + ok, err := gomega.ContainElement(gomega.HavePrefix(r)).Match(rules) + if err != nil || ok != shouldExist { + return false, err + } + } + return true, nil + }, "") +} + +func getOvsPodOnNode(f *framework.Framework, node string) *corev1.Pod { + daemonSetClient := f.DaemonSetClientNS(framework.KubeOvnNamespace) + ds := daemonSetClient.Get("ovs-ovn") + pod, err := daemonSetClient.GetPodOnNode(ds, node) + framework.ExpectNoError(err) + return pod +} diff --git a/test/e2e/framework/pod.go b/test/e2e/framework/pod.go index bd74ea98ae2..8f3921ba0bb 100644 --- a/test/e2e/framework/pod.go +++ b/test/e2e/framework/pod.go @@ -27,6 +27,12 @@ func (f *Framework) PodClientNS(namespace string) *PodClient { return &PodClient{f, e2epod.PodClientNS(f.Framework, namespace)} } +func (c *PodClient) GetPod(name string) *corev1.Pod { + pod, err := c.PodInterface.Get(context.TODO(), name, metav1.GetOptions{}) + ExpectNoError(err) + return pod +} + func (c *PodClient) Create(pod *corev1.Pod) *corev1.Pod { return c.PodClient.Create(context.Background(), pod) } diff --git a/test/e2e/kube-ovn/pod/vpc_pod_probe.go b/test/e2e/kube-ovn/pod/vpc_pod_probe.go new file mode 100644 index 00000000000..1f9fe629cf7 --- /dev/null +++ b/test/e2e/kube-ovn/pod/vpc_pod_probe.go @@ -0,0 +1,235 @@ +package pod + +import ( + "fmt" + "time" + + "github.com/onsi/ginkgo/v2" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" + clientset "k8s.io/client-go/kubernetes" + + apiv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" + "github.com/kubeovn/kube-ovn/pkg/daemon" + "github.com/kubeovn/kube-ovn/pkg/util" + "github.com/kubeovn/kube-ovn/test/e2e/framework" + "github.com/kubeovn/kube-ovn/test/e2e/framework/iptables" +) + +var _ = framework.SerialDescribe("[group:pod]", func() { + f := framework.NewDefaultFramework("vpc-pod-probe") + + var cs clientset.Interface + var podClient *framework.PodClient + var subnetClient *framework.SubnetClient + var vpcClient *framework.VpcClient + var namespaceName, subnetName, podName, vpcName string + var subnet *apiv1.Subnet + var cidr, image string + var extraSubnetNames []string + + ginkgo.BeforeEach(func() { + cs = f.ClientSet + podClient = f.PodClient() + subnetClient = f.SubnetClient() + namespaceName = f.Namespace.Name + subnetName = "subnet-" + framework.RandomSuffix() + podName = "pod-" + framework.RandomSuffix() + cidr = framework.RandomCIDR(f.ClusterIpFamily) + vpcClient = f.VpcClient() + if image == "" { + image = framework.GetKubeOvnImage(cs) + } + + ginkgo.By("Creating subnet " + subnetName) + subnet = framework.MakeSubnet(subnetName, "", cidr, "", "", "", nil, nil, []string{namespaceName}) + subnet = subnetClient.CreateSync(subnet) + }) + ginkgo.AfterEach(func() { + ginkgo.By("Deleting pod " + podName) + podClient.DeleteSync(podName) + + ginkgo.By("Deleting subnet " + subnetName) + subnetClient.DeleteSync(subnetName) + + if vpcName != "" { + ginkgo.By("Deleting custom vpc " + vpcName) + vpcClient.DeleteSync(vpcName) + } + + if len(extraSubnetNames) != 0 { + for _, subnetName := range extraSubnetNames { + subnetClient.DeleteSync(subnetName) + } + } + }) + + framework.ConformanceIt("should support http and tcp liveness probe and readiness probe in custom vpc pod ", func() { + f.SkipVersionPriorTo(1, 12, "This feature was introduced in v1.12") + daemonSetClient := f.DaemonSetClientNS(framework.KubeOvnNamespace) + originDs := daemonSetClient.Get("kube-ovn-cni") + modifyDs := originDs.DeepCopy() + + newArgs := originDs.Spec.Template.Spec.Containers[0].Args + for index, arg := range newArgs { + if arg == "--enable-tproxy=false" { + newArgs = append(newArgs[:index], newArgs[index+1:]...) + } + } + newArgs = append(newArgs, "--enable-tproxy=true") + modifyDs.Spec.Template.Spec.Containers[0].Args = newArgs + + daemonSetClient.PatchSync(modifyDs) + + custVPCSubnetName := "subnet-" + framework.RandomSuffix() + extraSubnetNames = append(extraSubnetNames, custVPCSubnetName) + + ginkgo.By("Create Custom Vpc subnet Pod") + vpcName = "vpc-" + framework.RandomSuffix() + customVPC := framework.MakeVpc(vpcName, "", false, false, nil) + vpcClient.CreateSync(customVPC) + + ginkgo.By("Creating subnet " + custVPCSubnetName) + cidr = framework.RandomCIDR(f.ClusterIpFamily) + subnet := framework.MakeSubnet(custVPCSubnetName, "", cidr, "", vpcName, "", nil, nil, nil) + _ = subnetClient.CreateSync(subnet) + + ginkgo.By("Creating pod with HTTP liveness and readiness probe that port is accessible " + podName) + pod := framework.MakePod(namespaceName, podName, nil, map[string]string{util.LogicalSwitchAnnotation: custVPCSubnetName}, framework.NginxImage, nil, nil) + + pod.Spec.Containers[0].ReadinessProbe = &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Port: intstr.FromInt(80), + }, + }, + } + pod.Spec.Containers[0].LivenessProbe = &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Port: intstr.FromInt(80), + }, + }, + } + + pod = podClient.CreateSync(pod) + framework.ExpectEqual(pod.Status.ContainerStatuses[0].Ready, true) + checkTProxyRules(f, pod, 80, true) + podClient.DeleteSync(podName) + + ginkgo.By("Creating pod with HTTP liveness and readiness probe that port is not accessible " + podName) + pod = framework.MakePod(namespaceName, podName, nil, map[string]string{util.LogicalSwitchAnnotation: custVPCSubnetName}, framework.NginxImage, nil, nil) + pod.Spec.Containers[0].ReadinessProbe = &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Port: intstr.FromInt(81), + }, + }, + } + pod.Spec.Containers[0].LivenessProbe = &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Port: intstr.FromInt(81), + }, + }, + } + + _ = podClient.Create(pod) + time.Sleep(5 * time.Second) + pod = podClient.GetPod(podName) + + framework.ExpectEqual(pod.Status.ContainerStatuses[0].Ready, false) + checkTProxyRules(f, pod, 81, true) + podClient.DeleteSync(podName) + + ginkgo.By("Creating pod with TCP probe liveness and readiness probe that port is accessible " + podName) + pod = framework.MakePod(namespaceName, podName, nil, map[string]string{util.LogicalSwitchAnnotation: custVPCSubnetName}, framework.NginxImage, nil, nil) + pod.Spec.Containers[0].ReadinessProbe = &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.FromInt(80), + }, + }, + } + pod.Spec.Containers[0].LivenessProbe = &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.FromInt(80), + }, + }, + } + + pod = podClient.CreateSync(pod) + framework.ExpectEqual(pod.Status.ContainerStatuses[0].Ready, true) + + checkTProxyRules(f, pod, 80, true) + podClient.DeleteSync(podName) + + ginkgo.By("Creating pod with TCP probe liveness and readiness probe that port is not accessible " + podName) + pod = framework.MakePod(namespaceName, podName, nil, map[string]string{util.LogicalSwitchAnnotation: custVPCSubnetName}, framework.NginxImage, nil, nil) + pod.Spec.Containers[0].ReadinessProbe = &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.FromInt(81), + }, + }, + } + pod.Spec.Containers[0].LivenessProbe = &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.FromInt(81), + }, + }, + } + + _ = podClient.Create(pod) + time.Sleep(5 * time.Second) + + pod = podClient.GetPod(podName) + framework.ExpectEqual(pod.Status.ContainerStatuses[0].Ready, false) + checkTProxyRules(f, pod, 81, false) + }) +}) + +func checkTProxyRules(f *framework.Framework, pod *corev1.Pod, probePort int, exist bool) { + + nodeName := pod.Spec.NodeName + tProxyOutputMarkMask := fmt.Sprintf("%#x/%#x", daemon.TProxyOutputMark, daemon.TProxyOutputMask) + tProxyPreRoutingMarkMask := fmt.Sprintf("%#x/%#x", daemon.TProxyPreroutingMark, daemon.TProxyPreroutingMask) + + isZeroIP := false + if len(pod.Status.PodIPs) == 2 { + isZeroIP = true + } + + for _, podIP := range pod.Status.PodIPs { + if util.CheckProtocol(podIP.IP) == apiv1.ProtocolIPv4 { + expectedRules := []string{ + fmt.Sprintf(`-A OVN-OUTPUT -d %s/32 -p tcp -m tcp --dport %d -j MARK --set-xmark %s`, podIP.IP, probePort, tProxyOutputMarkMask), + } + iptables.CheckIptablesRulesOnNode(f, nodeName, daemon.MANGLE, daemon.OvnOutput, apiv1.ProtocolIPv4, expectedRules, exist) + hostIP := pod.Status.HostIP + if isZeroIP { + hostIP = "0.0.0.0" + } + expectedRules = []string{ + fmt.Sprintf(`-A OVN-PREROUTING -d %s/32 -p tcp -m tcp --dport %d -j TPROXY --on-port %d --on-ip %s --tproxy-mark %s`, podIP.IP, probePort, util.TProxyListenPort, hostIP, tProxyPreRoutingMarkMask), + } + iptables.CheckIptablesRulesOnNode(f, nodeName, daemon.MANGLE, daemon.OvnPrerouting, apiv1.ProtocolIPv4, expectedRules, exist) + } else if util.CheckProtocol(podIP.IP) == apiv1.ProtocolIPv6 { + expectedRules := []string{ + fmt.Sprintf(`-A OVN-OUTPUT -d %s/128 -p tcp -m tcp --dport %d -j MARK --set-xmark %s`, podIP.IP, probePort, tProxyOutputMarkMask), + } + iptables.CheckIptablesRulesOnNode(f, nodeName, daemon.MANGLE, daemon.OvnOutput, apiv1.ProtocolIPv6, expectedRules, exist) + + hostIP := pod.Status.HostIP + if isZeroIP { + hostIP = "::" + } + expectedRules = []string{ + fmt.Sprintf(`-A OVN-PREROUTING -d %s/128 -p tcp -m tcp --dport %d -j TPROXY --on-port %d --on-ip %s --tproxy-mark %s`, podIP.IP, probePort, util.TProxyListenPort, hostIP, tProxyPreRoutingMarkMask), + } + iptables.CheckIptablesRulesOnNode(f, nodeName, daemon.MANGLE, daemon.OvnPrerouting, apiv1.ProtocolIPv6, expectedRules, exist) + } + } +} diff --git a/test/e2e/kube-ovn/subnet/subnet.go b/test/e2e/kube-ovn/subnet/subnet.go index 860adad33f8..4bae84280eb 100644 --- a/test/e2e/kube-ovn/subnet/subnet.go +++ b/test/e2e/kube-ovn/subnet/subnet.go @@ -24,6 +24,7 @@ import ( "github.com/kubeovn/kube-ovn/test/e2e/framework" "github.com/kubeovn/kube-ovn/test/e2e/framework/docker" "github.com/kubeovn/kube-ovn/test/e2e/framework/iproute" + "github.com/kubeovn/kube-ovn/test/e2e/framework/iptables" "github.com/kubeovn/kube-ovn/test/e2e/framework/kind" ) @@ -72,36 +73,6 @@ func checkIPSetOnNode(f *framework.Framework, node string, expectetIPsets []stri }, "") } -func checkIptablesRulesOnNode(f *framework.Framework, node, table, chain, cidr string, expectedRules []string, shouldExist bool) { - if cidr == "" { - return - } - - ovsPod := getOvsPodOnNode(f, node) - - iptBin := "iptables" - if util.CheckProtocol(cidr) == apiv1.ProtocolIPv6 { - iptBin = "ip6tables" - } - - cmd := fmt.Sprintf(`%s -t %s -S`, iptBin, table) - if chain != "" { - cmd += chain - } - framework.WaitUntil(3*time.Second, 10*time.Second, func(_ context.Context) (bool, error) { - output := e2epodoutput.RunHostCmdOrDie(ovsPod.Namespace, ovsPod.Name, cmd) - rules := strings.Split(output, "\n") - for _, r := range expectedRules { - framework.Logf("checking iptables rule %q in table %q chain %q: %v", r, table, chain, shouldExist) - ok, err := gomega.ContainElement(gomega.HavePrefix(r)).Match(rules) - if err != nil || ok != shouldExist { - return false, err - } - } - return true, nil - }, "") -} - var _ = framework.Describe("[group:subnet]", func() { f := framework.NewDefaultFramework("subnet") @@ -1036,14 +1007,14 @@ var _ = framework.Describe("[group:subnet]", func() { fmt.Sprintf(`-A %s -s %s -m comment --comment "%s,%s"`, "FORWARD", cidrV4, util.OvnSubnetGatewayIptables, subnetName), } - checkIptablesRulesOnNode(f, node.Name, "filter", "FORWARD", cidrV4, expectedRules, true) + iptables.CheckIptablesRulesOnNode(f, node.Name, "filter", "FORWARD", apiv1.ProtocolIPv4, expectedRules, true) } if cidrV6 != "" { expectedRules := []string{ fmt.Sprintf(`-A %s -d %s -m comment --comment "%s,%s"`, "FORWARD", cidrV6, util.OvnSubnetGatewayIptables, subnetName), fmt.Sprintf(`-A %s -s %s -m comment --comment "%s,%s"`, "FORWARD", cidrV6, util.OvnSubnetGatewayIptables, subnetName), } - checkIptablesRulesOnNode(f, node.Name, "filter", "FORWARD", cidrV6, expectedRules, true) + iptables.CheckIptablesRulesOnNode(f, node.Name, "filter", "FORWARD", apiv1.ProtocolIPv6, expectedRules, true) } } @@ -1091,14 +1062,14 @@ var _ = framework.Describe("[group:subnet]", func() { fmt.Sprintf(`-A %s -s %s -m comment --comment "%s,%s"`, "FORWARD", cidrV4, util.OvnSubnetGatewayIptables, subnetName), } - checkIptablesRulesOnNode(f, node.Name, "filter", "FORWARD", cidrV4, expectedRules, false) + iptables.CheckIptablesRulesOnNode(f, node.Name, "filter", "FORWARD", apiv1.ProtocolIPv4, expectedRules, false) } if cidrV6 != "" { expectedRules := []string{ fmt.Sprintf(`-A %s -d %s -m comment --comment "%s,%s"`, "FORWARD", cidrV6, util.OvnSubnetGatewayIptables, subnetName), fmt.Sprintf(`-A %s -s %s -m comment --comment "%s,%s"`, "FORWARD", cidrV6, util.OvnSubnetGatewayIptables, subnetName), } - checkIptablesRulesOnNode(f, node.Name, "filter", "FORWARD", cidrV6, expectedRules, false) + iptables.CheckIptablesRulesOnNode(f, node.Name, "filter", "FORWARD", apiv1.ProtocolIPv6, expectedRules, false) } } }) @@ -1378,12 +1349,12 @@ func checkNatPolicyRules(f *framework.Framework, cs clientset.Interface, subnet } if cidrV4 != "" { - checkIptablesRulesOnNode(f, node.Name, "nat", "", cidrV4, staticV4Rules, true) - checkIptablesRulesOnNode(f, node.Name, "nat", "", cidrV4, expectV4Rules, shouldExist) + iptables.CheckIptablesRulesOnNode(f, node.Name, "nat", "", apiv1.ProtocolIPv4, staticV4Rules, true) + iptables.CheckIptablesRulesOnNode(f, node.Name, "nat", "", apiv1.ProtocolIPv4, expectV4Rules, shouldExist) } if cidrV6 != "" { - checkIptablesRulesOnNode(f, node.Name, "nat", "", cidrV6, staticV6Rules, true) - checkIptablesRulesOnNode(f, node.Name, "nat", "", cidrV6, expectV6Rules, shouldExist) + iptables.CheckIptablesRulesOnNode(f, node.Name, "nat", "", apiv1.ProtocolIPv6, staticV6Rules, true) + iptables.CheckIptablesRulesOnNode(f, node.Name, "nat", "", apiv1.ProtocolIPv6, expectV6Rules, shouldExist) } } } diff --git a/yamls/kube-ovn-dual-stack.yaml b/yamls/kube-ovn-dual-stack.yaml index 0b765d77054..fe12c92c52e 100644 --- a/yamls/kube-ovn-dual-stack.yaml +++ b/yamls/kube-ovn-dual-stack.yaml @@ -200,6 +200,7 @@ spec: - --alsologtostderr=true - --log_file=/var/log/kube-ovn/kube-ovn-cni.log - --log_file_max_size=0 + - --enable-tproxy=false securityContext: runAsUser: 0 privileged: true diff --git a/yamls/kube-ovn-ipv6.yaml b/yamls/kube-ovn-ipv6.yaml index dec0dac2b9b..ac02e856d0f 100644 --- a/yamls/kube-ovn-ipv6.yaml +++ b/yamls/kube-ovn-ipv6.yaml @@ -200,6 +200,7 @@ spec: - --alsologtostderr=true - --log_file=/var/log/kube-ovn/kube-ovn-cni.log - --log_file_max_size=0 + - --enable-tproxy=false securityContext: runAsUser: 0 privileged: true diff --git a/yamls/kube-ovn.yaml b/yamls/kube-ovn.yaml index 9fa8a1e217b..dcaab848565 100644 --- a/yamls/kube-ovn.yaml +++ b/yamls/kube-ovn.yaml @@ -202,6 +202,7 @@ spec: - --alsologtostderr=true - --log_file=/var/log/kube-ovn/kube-ovn-cni.log - --log_file_max_size=0 + - --enable-tproxy=false securityContext: runAsUser: 0 privileged: true