Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

custom vpc pod support tcp http probe with tproxy method #3024

Merged
merged 22 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions charts/templates/ovncni-ds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ spec:
- --log_file_max_size=0
- --enable-metrics={{- .Values.networking.ENABLE_METRICS }}
- --kubelet-dir={{ .Values.kubelet_conf.KUBELET_DIR }}
- --enable-tproxy={{ .Values.func.ENABLE_TPROXY }}
securityContext:
runAsUser: 0
privileged: true
Expand Down
1 change: 1 addition & 0 deletions charts/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func:
LOGICAL_GATEWAY: false
ENABLE_BIND_LOCAL_IP: true
U2O_INTERCONNECTION: false
ENABLE_TPROXY: true
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

默认需要关闭,避免未知问题影响其他功能


ipv4:
POD_CIDR: "10.16.0.0/16"
Expand Down
15 changes: 1 addition & 14 deletions cmd/daemon/cniserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"k8s.io/klog/v2"
"k8s.io/sample-controller/pkg/signals"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
"github.com/kubeovn/kube-ovn/pkg/daemon"
"github.com/kubeovn/kube-ovn/pkg/util"
Expand Down Expand Up @@ -96,19 +95,7 @@ func CmdMain() {
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
}

addr := "0.0.0.0"
if os.Getenv("ENABLE_BIND_LOCAL_IP") == "true" {
podIpsEnv := os.Getenv("POD_IPS")
podIps := strings.Split(podIpsEnv, ",")
// when pod in dual mode, golang can't support bind v4 and v6 address in the same time,
// so not support bind local ip when in dual mode
if len(podIps) == 1 {
addr = podIps[0]
if util.CheckProtocol(podIps[0]) == kubeovnv1.ProtocolIPv6 {
addr = fmt.Sprintf("[%s]", podIps[0])
}
}
}
addr := util.GetDefaultListenPort()

if config.EnableVerboseConnCheck {
go func() {
Expand Down
2 changes: 2 additions & 0 deletions dist/images/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ IFACE=${IFACE:-}
# Note that the dpdk tunnel iface and tunnel ip cidr should be diffierent with Kubernetes api cidr, otherwise the route will be a problem.
DPDK_TUNNEL_IFACE=${DPDK_TUNNEL_IFACE:-br-phy}
ENABLE_BIND_LOCAL_IP=${ENABLE_BIND_LOCAL_IP:-true}
ENABLE_TPROXY=${ENABLE_TPROXY:-true}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同上

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1


# debug
DEBUG_WRAPPER=${DEBUG_WRAPPER:-}
Expand Down Expand Up @@ -4037,6 +4038,7 @@ spec:
- --log_file=/var/log/kube-ovn/kube-ovn-cni.log
- --log_file_max_size=0
- --kubelet-dir=$KUBELET_DIR
- --enable-tproxy=$ENABLE_TPROXY
securityContext:
runAsUser: 0
privileged: true
Expand Down
6 changes: 6 additions & 0 deletions dist/images/uninstall.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ iptables -t filter -D FORWARD -m set --match-set ovn40subnets src -j ACCEPT
iptables -t filter -D FORWARD -m set --match-set ovn40services dst -j ACCEPT
iptables -t filter -D FORWARD -m set --match-set ovn40services src -j ACCEPT
iptables -t filter -D OUTPUT -p udp -m udp --dport 6081 -j MARK --set-xmark 0x0
iptables -t mangle -D PREROUTING -m comment --comment "kube-ovn prerouting rules" -j OVN-PREROUTING
iptables -t mangle -D OUTPUT -m comment --comment "kube-ovn output rules" -j OVN-OUTPUT
iptables -t mangle -F OVN-PREROUTING
iptables -t mangle -X OVN-PREROUTING
iptables -t mangle -F OVN-OUTPUT
iptables -t mangle -X OVN-OUTPUT
zhangzujian marked this conversation as resolved.
Show resolved Hide resolved

sleep 1

Expand Down
3 changes: 3 additions & 0 deletions pkg/daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Configuration struct {
EnableVerboseConnCheck bool
TCPConnCheckPort int
UDPConnCheckPort int
EnableTProxy bool
}

// ParseFlags will parse cmd args then init kubeClient and configuration
Expand Down Expand Up @@ -100,6 +101,7 @@ func ParseFlags() *Configuration {
argEnableVerboseConnCheck = pflag.Bool("enable-verbose-conn-check", false, "enable TCP/UDP connectivity check listen port")
argTCPConnectivityCheckPort = pflag.Int("tcp-conn-check-port", 8100, "TCP connectivity Check Port")
argUDPConnectivityCheckPort = pflag.Int("udp-conn-check-port", 8101, "UDP connectivity Check Port")
argEnableTProxy = pflag.Bool("enable-tproxy", true, "enable tproxy for vpc pod liveness or readiness probe")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同上

zhangzujian marked this conversation as resolved.
Show resolved Hide resolved
)

// mute info log for ipset lib
Expand Down Expand Up @@ -154,6 +156,7 @@ func ParseFlags() *Configuration {
EnableVerboseConnCheck: *argEnableVerboseConnCheck,
TCPConnCheckPort: *argTCPConnectivityCheckPort,
UDPConnCheckPort: *argUDPConnectivityCheckPort,
EnableTProxy: *argEnableTProxy,
}
return config
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/daemon/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,17 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
}
}, 5*time.Minute, stopCh)

if c.config.EnableTProxy {
go c.StartTProxyForwarding()
go wait.Until(c.runTProxyConfigWorker, 3*time.Second, stopCh)
// Using the tproxy method, kubelet's TCP probe packets cannot reach the namespace of the pod of the custom VPC,
// so tproxy itself probes the pod of the custom VPC, if probe failed remove the iptable rules from
// kubelet to tproxy, if probe success recover the iptable rules
go wait.Until(c.StartTProxyTCPPortProbe, 1*time.Second, stopCh)
} else {
c.cleanTProxyConfig()
}

<-stopCh
klog.Info("Shutting down workers")
}
Expand Down
42 changes: 42 additions & 0 deletions pkg/daemon/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package daemon
import (
"fmt"
"os/exec"
"sort"
"strings"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -246,3 +247,44 @@ func (c *Controller) getEgressNatIpByNode(nodeName string) (map[string]string, e
}
return subnetsNatIp, nil
}

func (c *Controller) getTProxyConditionPod(needSort bool) ([]*v1.Pod, error) {

var filteredPods []*v1.Pod
pods, err := c.podsLister.List(labels.Everything())
if err != nil {
klog.Errorf("list pods failed, %v", err)
return nil, err
}

for _, pod := range pods {
if pod.Spec.NodeName != c.config.NodeName {
continue
}

subnetName, ok := pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, util.OvnProvider)]
if !ok {
continue
}

subnet, err := c.subnetsLister.Get(subnetName)
if err != nil {
err = fmt.Errorf("failed to get subnet '%s', err: %v", subnetName, err)
return nil, err
}

if subnet.Spec.Vpc == c.config.ClusterRouter {
continue
}

filteredPods = append(filteredPods, pod)
}

if needSort {
sort.Slice(filteredPods, func(i, j int) bool {
return filteredPods[i].Namespace+"/"+filteredPods[i].Name < filteredPods[j].Namespace+"/"+filteredPods[j].Name
})
}

return filteredPods, nil
}
156 changes: 156 additions & 0 deletions pkg/daemon/gateway_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@ const (

const (
NAT = "nat"
MANGLE = "mangle"
Prerouting = "PREROUTING"
Postrouting = "POSTROUTING"
Output = "OUTPUT"
OvnPrerouting = "OVN-PREROUTING"
OvnPostrouting = "OVN-POSTROUTING"
OvnOutput = "OVN-OUTPUT"
OvnMasquerade = "OVN-MASQUERADE"
OvnNatOutGoingPolicy = "OVN-NAT-POLICY"
OvnNatOutGoingPolicySubnet = "OVN-NAT-PSUBNET-"
Expand All @@ -52,6 +55,10 @@ const (
const (
OnOutGoingNatMark = "0x90001/0x90001"
OnOutGoingForwardMark = "0x90002/0x90002"
TProxyOutputMark = 0x90003
TProxyOutputMask = 0x90003
TProxyPreroutingMark = 0x90004
TProxyPreroutingMask = 0x90004
)

type policyRouteMeta struct {
Expand Down Expand Up @@ -584,9 +591,11 @@ func (c *Controller) setIptables() error {
}
)
protocols := make([]string, 2)
isDual := false
if c.protocol == kubeovnv1.ProtocolDual {
protocols[0] = kubeovnv1.ProtocolIPv4
protocols[1] = kubeovnv1.ProtocolIPv6
isDual = true
} else {
protocols[0] = c.protocol
}
Expand Down Expand Up @@ -733,6 +742,10 @@ func (c *Controller) setIptables() error {
return err
}

if err = c.reconcileTProxyIPTableRules(protocol, isDual); err != nil {
return err
}

if err = c.updateIptablesChain(ipt, NAT, OvnPrerouting, Prerouting, natPreroutingRules); err != nil {
klog.Errorf("failed to update chain %s/%s: %v", NAT, OvnPrerouting)
return err
Expand All @@ -754,6 +767,134 @@ func (c *Controller) setIptables() error {
return nil
}

func (c *Controller) reconcileTProxyIPTableRules(protocol string, isDual bool) error {
if !c.config.EnableTProxy {
return nil
}

ipt := c.iptables[protocol]
tproxyPreRoutingRules := make([]util.IPTableRule, 0)
tproxyOutputRules := make([]util.IPTableRule, 0)
var probePorts []string
zhangzujian marked this conversation as resolved.
Show resolved Hide resolved

pods, err := c.getTProxyConditionPod(true)
if err != nil {
return err
}

for _, pod := range pods {
var podIP string
for _, ip := range pod.Status.PodIPs {
if util.CheckProtocol(ip.IP) == protocol {
podIP = ip.IP
break
}
}

if podIP == "" {
continue
}

for _, container := range pod.Spec.Containers {
if container.ReadinessProbe != nil {
if httpGet := container.ReadinessProbe.HTTPGet; httpGet != nil {
if port := httpGet.Port.String(); port != "" {
probePorts = append(probePorts, port)
}
}

if tcpSocket := container.ReadinessProbe.TCPSocket; tcpSocket != nil {
if port := tcpSocket.Port.String(); port != "" {
if isTCPProbePortReachable, ok := customVPCPodTCPProbeIPPort.Load(getIPPortString(podIP, port)); ok {
if isTCPProbePortReachable.(bool) {
probePorts = append(probePorts, port)
}
}
}
}
}

if container.LivenessProbe != nil {
if httpGet := container.LivenessProbe.HTTPGet; httpGet != nil {
if port := httpGet.Port.String(); port != "" {
probePorts = append(probePorts, port)
}
}

if tcpSocket := container.LivenessProbe.TCPSocket; tcpSocket != nil {
if port := tcpSocket.Port.String(); port != "" {
if isTCPProbePortReachable, ok := customVPCPodTCPProbeIPPort.Load(getIPPortString(podIP, port)); ok {
if isTCPProbePortReachable.(bool) {
probePorts = append(probePorts, port)
}
}
}
}
}
}

if len(probePorts) == 0 {
continue
}

probePorts = formatProbePorts(probePorts)
for _, probePort := range probePorts {
tProxyOutputMarkMask := fmt.Sprintf("%#x/%#x", TProxyOutputMark, TProxyOutputMask)
tProxyPreRoutingMarkMask := fmt.Sprintf("%#x/%#x", TProxyPreroutingMark, TProxyPreroutingMask)

hostIP := pod.Status.HostIP
prefixLen := 32
if protocol == kubeovnv1.ProtocolIPv6 {
prefixLen = 128
}

if isDual || os.Getenv("ENABLE_BIND_LOCAL_IP") == "false" {
if protocol == kubeovnv1.ProtocolIPv4 {
hostIP = "0.0.0.0"
} else if protocol == kubeovnv1.ProtocolIPv6 {
hostIP = "::"
}
}
tproxyOutputRules = append(tproxyOutputRules, util.IPTableRule{Table: MANGLE, Chain: OvnOutput, Rule: strings.Fields(fmt.Sprintf(`-d %s/%d -p tcp -m tcp --dport %s -j MARK --set-xmark %s`, podIP, prefixLen, probePort, tProxyOutputMarkMask))})
tproxyPreRoutingRules = append(tproxyPreRoutingRules, util.IPTableRule{Table: MANGLE, Chain: OvnPrerouting, Rule: strings.Fields(fmt.Sprintf(`-d %s/%d -p tcp -m tcp --dport %s -j TPROXY --on-port %d --on-ip %s --tproxy-mark %s`, podIP, prefixLen, probePort, util.TProxyListenPort, hostIP, tProxyPreRoutingMarkMask))})
}
}

if err := c.updateIptablesChain(ipt, MANGLE, OvnPrerouting, Prerouting, tproxyPreRoutingRules); err != nil {
klog.Errorf("failed to update chain %s with rules %v: %v", OvnPrerouting, tproxyPreRoutingRules, err)
return err
}

if err := c.updateIptablesChain(ipt, MANGLE, OvnOutput, Output, tproxyOutputRules); err != nil {
klog.Errorf("failed to update chain %s with rules %v: %v", OvnOutput, tproxyOutputRules, err)
return err
}
return nil
}

func (c *Controller) cleanTProxyIPTableRules(protocol string) {
ipt := c.iptables[protocol]
for _, chain := range []string{OvnPrerouting, OvnOutput} {
rules, err := ipt.List(MANGLE, chain)
if err != nil {
klog.Errorf("failed to list iptables rules in table %v chain %v, %+v", MANGLE, chain, err)
return
}
for _, rule := range rules {
if !strings.Contains(rule, fmt.Sprintf("%#x", TProxyOutputMark)) &&
!strings.Contains(rule, fmt.Sprintf("%#x", TProxyPreroutingMark)) {
continue
}
rule := rule[4+len(chain):]
spec := util.DoubleQuotedFields(rule)
if err = ipt.Delete(MANGLE, chain, spec...); err != nil {
klog.Errorf(`failed to delete iptables rule "%s": %v`, rule, err)
return
}
}
}
}
zhangzujian marked this conversation as resolved.
Show resolved Hide resolved

func (c *Controller) reconcileNatOutgoingPolicyIptablesChain(protocol string) error {
ipt := c.iptables[protocol]

Expand Down Expand Up @@ -1527,3 +1668,18 @@ func getNatPolicySubnetChainUID(chainName string) string {
func formatIPsetUnPrefix(ipsetName string) string {
return ipsetName[len("ovn40"):]
}

func formatProbePorts(probePorts []string) []string {
// Deduplicate and sort
retProbePorts := make([]string, 0, len(probePorts))
portMap := make(map[string]interface{})
for _, port := range probePorts {
if _, exist := portMap[port]; !exist {
retProbePorts = append(retProbePorts, port)
portMap[port] = nil
}
}

sort.Strings(retProbePorts)
return retProbePorts
}
Loading