Skip to content

Commit

Permalink
custom vpc pod support tcp http probe with tproxy method (#3024)
Browse files Browse the repository at this point in the history
* custom vpc pod support tcp http probe with tproxy method
  • Loading branch information
changluyi authored Jul 18, 2023
1 parent 494209d commit f4f8041
Show file tree
Hide file tree
Showing 24 changed files with 1,242 additions and 52 deletions.
1 change: 1 addition & 0 deletions charts/templates/ovncni-ds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ spec:
- --log_file_max_size=0
- --enable-metrics={{- .Values.networking.ENABLE_METRICS }}
- --kubelet-dir={{ .Values.kubelet_conf.KUBELET_DIR }}
- --enable-tproxy={{ .Values.func.ENABLE_TPROXY }}
securityContext:
runAsUser: 0
privileged: true
Expand Down
1 change: 1 addition & 0 deletions charts/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func:
LOGICAL_GATEWAY: false
ENABLE_BIND_LOCAL_IP: true
U2O_INTERCONNECTION: false
ENABLE_TPROXY: false

ipv4:
POD_CIDR: "10.16.0.0/16"
Expand Down
15 changes: 1 addition & 14 deletions cmd/daemon/cniserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"k8s.io/klog/v2"
"k8s.io/sample-controller/pkg/signals"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
"github.com/kubeovn/kube-ovn/pkg/daemon"
"github.com/kubeovn/kube-ovn/pkg/util"
Expand Down Expand Up @@ -96,19 +95,7 @@ func CmdMain() {
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
}

addr := "0.0.0.0"
if os.Getenv("ENABLE_BIND_LOCAL_IP") == "true" {
podIpsEnv := os.Getenv("POD_IPS")
podIps := strings.Split(podIpsEnv, ",")
// when pod in dual mode, golang can't support bind v4 and v6 address in the same time,
// so not support bind local ip when in dual mode
if len(podIps) == 1 {
addr = podIps[0]
if util.CheckProtocol(podIps[0]) == kubeovnv1.ProtocolIPv6 {
addr = fmt.Sprintf("[%s]", podIps[0])
}
}
}
addr := util.GetDefaultListenAddr()

if config.EnableVerboseConnCheck {
go func() {
Expand Down
2 changes: 2 additions & 0 deletions dist/images/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ IFACE=${IFACE:-}
# Note that the dpdk tunnel iface and tunnel ip cidr should be diffierent with Kubernetes api cidr, otherwise the route will be a problem.
DPDK_TUNNEL_IFACE=${DPDK_TUNNEL_IFACE:-br-phy}
ENABLE_BIND_LOCAL_IP=${ENABLE_BIND_LOCAL_IP:-true}
ENABLE_TPROXY=${ENABLE_TPROXY:-false}

# debug
DEBUG_WRAPPER=${DEBUG_WRAPPER:-}
Expand Down Expand Up @@ -4037,6 +4038,7 @@ spec:
- --log_file=/var/log/kube-ovn/kube-ovn-cni.log
- --log_file_max_size=0
- --kubelet-dir=$KUBELET_DIR
- --enable-tproxy=$ENABLE_TPROXY
securityContext:
runAsUser: 0
privileged: true
Expand Down
12 changes: 12 additions & 0 deletions dist/images/uninstall.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ iptables -t filter -D FORWARD -m set --match-set ovn40subnets src -j ACCEPT
iptables -t filter -D FORWARD -m set --match-set ovn40services dst -j ACCEPT
iptables -t filter -D FORWARD -m set --match-set ovn40services src -j ACCEPT
iptables -t filter -D OUTPUT -p udp -m udp --dport 6081 -j MARK --set-xmark 0x0
iptables -t mangle -D PREROUTING -m comment --comment "kube-ovn prerouting rules" -j OVN-PREROUTING
iptables -t mangle -D OUTPUT -m comment --comment "kube-ovn output rules" -j OVN-OUTPUT
iptables -t mangle -F OVN-PREROUTING
iptables -t mangle -X OVN-PREROUTING
iptables -t mangle -F OVN-OUTPUT
iptables -t mangle -X OVN-OUTPUT

sleep 1

Expand Down Expand Up @@ -51,6 +57,12 @@ ip6tables -t filter -D FORWARD -m set --match-set ovn60subnets src -j ACCEPT
ip6tables -t filter -D FORWARD -m set --match-set ovn60services dst -j ACCEPT
ip6tables -t filter -D FORWARD -m set --match-set ovn60services src -j ACCEPT
ip6tables -t filter -D OUTPUT -p udp -m udp --dport 6081 -j MARK --set-xmark 0x0
ip6tables -t mangle -D PREROUTING -m comment --comment "kube-ovn prerouting rules" -j OVN-PREROUTING
ip6tables -t mangle -D OUTPUT -m comment --comment "kube-ovn output rules" -j OVN-OUTPUT
ip6tables -t mangle -F OVN-PREROUTING
ip6tables -t mangle -X OVN-PREROUTING
ip6tables -t mangle -F OVN-OUTPUT
ip6tables -t mangle -X OVN-OUTPUT

sleep 1

Expand Down
3 changes: 3 additions & 0 deletions pkg/daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Configuration struct {
EnableVerboseConnCheck bool
TCPConnCheckPort int
UDPConnCheckPort int
EnableTProxy bool
}

// ParseFlags will parse cmd args then init kubeClient and configuration
Expand Down Expand Up @@ -100,6 +101,7 @@ func ParseFlags() *Configuration {
argEnableVerboseConnCheck = pflag.Bool("enable-verbose-conn-check", false, "enable TCP/UDP connectivity check listen port")
argTCPConnectivityCheckPort = pflag.Int("tcp-conn-check-port", 8100, "TCP connectivity Check Port")
argUDPConnectivityCheckPort = pflag.Int("udp-conn-check-port", 8101, "UDP connectivity Check Port")
argEnableTProxy = pflag.Bool("enable-tproxy", false, "enable tproxy for vpc pod liveness or readiness probe")
)

// mute info log for ipset lib
Expand Down Expand Up @@ -154,6 +156,7 @@ func ParseFlags() *Configuration {
EnableVerboseConnCheck: *argEnableVerboseConnCheck,
TCPConnCheckPort: *argTCPConnectivityCheckPort,
UDPConnCheckPort: *argUDPConnectivityCheckPort,
EnableTProxy: *argEnableTProxy,
}
return config
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/daemon/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,17 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
}
}, 5*time.Minute, stopCh)

if c.config.EnableTProxy {
go c.StartTProxyForwarding()
go wait.Until(c.runTProxyConfigWorker, 3*time.Second, stopCh)
// Using the tproxy method, kubelet's TCP probe packets cannot reach the namespace of the pod of the custom VPC,
// so tproxy itself probes the pod of the custom VPC, if probe failed remove the iptable rules from
// kubelet to tproxy, if probe success recover the iptable rules
go wait.Until(c.StartTProxyTCPPortProbe, 1*time.Second, stopCh)
} else {
c.cleanTProxyConfig()
}

<-stopCh
klog.Info("Shutting down workers")
}
Expand Down
42 changes: 42 additions & 0 deletions pkg/daemon/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package daemon
import (
"fmt"
"os/exec"
"sort"
"strings"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -246,3 +247,44 @@ func (c *Controller) getEgressNatIpByNode(nodeName string) (map[string]string, e
}
return subnetsNatIp, nil
}

func (c *Controller) getTProxyConditionPod(needSort bool) ([]*v1.Pod, error) {

var filteredPods []*v1.Pod
pods, err := c.podsLister.List(labels.Everything())
if err != nil {
klog.Errorf("list pods failed, %v", err)
return nil, err
}

for _, pod := range pods {
if pod.Spec.NodeName != c.config.NodeName {
continue
}

subnetName, ok := pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, util.OvnProvider)]
if !ok {
continue
}

subnet, err := c.subnetsLister.Get(subnetName)
if err != nil {
err = fmt.Errorf("failed to get subnet '%s', err: %v", subnetName, err)
return nil, err
}

if subnet.Spec.Vpc == c.config.ClusterRouter {
continue
}

filteredPods = append(filteredPods, pod)
}

if needSort {
sort.Slice(filteredPods, func(i, j int) bool {
return filteredPods[i].Namespace+"/"+filteredPods[i].Name < filteredPods[j].Namespace+"/"+filteredPods[j].Name
})
}

return filteredPods, nil
}
132 changes: 132 additions & 0 deletions pkg/daemon/gateway_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@ const (

const (
NAT = "nat"
MANGLE = "mangle"
Prerouting = "PREROUTING"
Postrouting = "POSTROUTING"
Output = "OUTPUT"
OvnPrerouting = "OVN-PREROUTING"
OvnPostrouting = "OVN-POSTROUTING"
OvnOutput = "OVN-OUTPUT"
OvnMasquerade = "OVN-MASQUERADE"
OvnNatOutGoingPolicy = "OVN-NAT-POLICY"
OvnNatOutGoingPolicySubnet = "OVN-NAT-PSUBNET-"
Expand All @@ -52,6 +55,10 @@ const (
const (
OnOutGoingNatMark = "0x90001/0x90001"
OnOutGoingForwardMark = "0x90002/0x90002"
TProxyOutputMark = 0x90003
TProxyOutputMask = 0x90003
TProxyPreroutingMark = 0x90004
TProxyPreroutingMask = 0x90004
)

type policyRouteMeta struct {
Expand Down Expand Up @@ -584,9 +591,11 @@ func (c *Controller) setIptables() error {
}
)
protocols := make([]string, 2)
isDual := false
if c.protocol == kubeovnv1.ProtocolDual {
protocols[0] = kubeovnv1.ProtocolIPv4
protocols[1] = kubeovnv1.ProtocolIPv6
isDual = true
} else {
protocols[0] = c.protocol
}
Expand Down Expand Up @@ -733,6 +742,10 @@ func (c *Controller) setIptables() error {
return err
}

if err = c.reconcileTProxyIPTableRules(protocol, isDual); err != nil {
return err
}

if err = c.updateIptablesChain(ipt, NAT, OvnPrerouting, Prerouting, natPreroutingRules); err != nil {
klog.Errorf("failed to update chain %s/%s: %v", NAT, OvnPrerouting)
return err
Expand All @@ -754,6 +767,125 @@ func (c *Controller) setIptables() error {
return nil
}

func (c *Controller) reconcileTProxyIPTableRules(protocol string, isDual bool) error {
if !c.config.EnableTProxy {
return nil
}

ipt := c.iptables[protocol]
tproxyPreRoutingRules := make([]util.IPTableRule, 0)
tproxyOutputRules := make([]util.IPTableRule, 0)
probePorts := strset.New()

pods, err := c.getTProxyConditionPod(true)
if err != nil {
return err
}

for _, pod := range pods {
var podIP string
for _, ip := range pod.Status.PodIPs {
if util.CheckProtocol(ip.IP) == protocol {
podIP = ip.IP
break
}
}

if podIP == "" {
continue
}

for _, container := range pod.Spec.Containers {
if container.ReadinessProbe != nil {
if httpGet := container.ReadinessProbe.HTTPGet; httpGet != nil {
if port := httpGet.Port.String(); port != "" {
probePorts.Add(port)
}
}

if tcpSocket := container.ReadinessProbe.TCPSocket; tcpSocket != nil {
if port := tcpSocket.Port.String(); port != "" {
if isTCPProbePortReachable, ok := customVPCPodTCPProbeIPPort.Load(getIPPortString(podIP, port)); ok {
if isTCPProbePortReachable.(bool) {
probePorts.Add(port)
}
}
}
}
}

if container.LivenessProbe != nil {
if httpGet := container.LivenessProbe.HTTPGet; httpGet != nil {
if port := httpGet.Port.String(); port != "" {
probePorts.Add(port)
}
}

if tcpSocket := container.LivenessProbe.TCPSocket; tcpSocket != nil {
if port := tcpSocket.Port.String(); port != "" {
if isTCPProbePortReachable, ok := customVPCPodTCPProbeIPPort.Load(getIPPortString(podIP, port)); ok {
if isTCPProbePortReachable.(bool) {
probePorts.Add(port)
}
}
}
}
}
}

if probePorts.IsEmpty() {
continue
}

probePortList := probePorts.List()
sort.Strings(probePortList)
for _, probePort := range probePortList {
tProxyOutputMarkMask := fmt.Sprintf("%#x/%#x", TProxyOutputMark, TProxyOutputMask)
tProxyPreRoutingMarkMask := fmt.Sprintf("%#x/%#x", TProxyPreroutingMark, TProxyPreroutingMask)

hostIP := pod.Status.HostIP
prefixLen := 32
if protocol == kubeovnv1.ProtocolIPv6 {
prefixLen = 128
}

if isDual || os.Getenv("ENABLE_BIND_LOCAL_IP") == "false" {
if protocol == kubeovnv1.ProtocolIPv4 {
hostIP = "0.0.0.0"
} else if protocol == kubeovnv1.ProtocolIPv6 {
hostIP = "::"
}
}
tproxyOutputRules = append(tproxyOutputRules, util.IPTableRule{Table: MANGLE, Chain: OvnOutput, Rule: strings.Fields(fmt.Sprintf(`-d %s/%d -p tcp -m tcp --dport %s -j MARK --set-xmark %s`, podIP, prefixLen, probePort, tProxyOutputMarkMask))})
tproxyPreRoutingRules = append(tproxyPreRoutingRules, util.IPTableRule{Table: MANGLE, Chain: OvnPrerouting, Rule: strings.Fields(fmt.Sprintf(`-d %s/%d -p tcp -m tcp --dport %s -j TPROXY --on-port %d --on-ip %s --tproxy-mark %s`, podIP, prefixLen, probePort, util.TProxyListenPort, hostIP, tProxyPreRoutingMarkMask))})
}
}

if err := c.updateIptablesChain(ipt, MANGLE, OvnPrerouting, Prerouting, tproxyPreRoutingRules); err != nil {
klog.Errorf("failed to update chain %s with rules %v: %v", OvnPrerouting, tproxyPreRoutingRules, err)
return err
}

if err := c.updateIptablesChain(ipt, MANGLE, OvnOutput, Output, tproxyOutputRules); err != nil {
klog.Errorf("failed to update chain %s with rules %v: %v", OvnOutput, tproxyOutputRules, err)
return err
}
return nil
}

func (c *Controller) cleanTProxyIPTableRules(protocol string) {
ipt := c.iptables[protocol]
if ipt == nil {
return
}
for _, chain := range [2]string{OvnPrerouting, OvnOutput} {
if err := ipt.ClearChain(MANGLE, chain); err != nil {
klog.Errorf("failed to clear iptables chain %v in table %v, %+v", chain, MANGLE, err)
return
}
}
}

func (c *Controller) reconcileNatOutgoingPolicyIptablesChain(protocol string) error {
ipt := c.iptables[protocol]

Expand Down
Loading

0 comments on commit f4f8041

Please sign in to comment.