Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
changluyi committed Jul 11, 2023
1 parent 0646523 commit 05e8aa5
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 164 deletions.
2 changes: 1 addition & 1 deletion pkg/daemon/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
}, 5*time.Minute, stopCh)

if c.config.EnableTProxy {
go c.StartTProxyForwarding(stopCh)
go c.StartTProxyForwarding()
go wait.Until(c.runTProxyConfigWorker, 3*time.Second, stopCh)
// Using the tproxy method, kubelet's TCP probe packets cannot reach the namespace of the pod of the custom VPC,
// so tproxy itself probes the pod of the custom VPC, if probe failed remove the iptable rules from
Expand Down
34 changes: 18 additions & 16 deletions pkg/daemon/gateway_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ const (
const (
OnOutGoingNatMark = "0x90001/0x90001"
OnOutGoingForwardMark = "0x90002/0x90002"
TProxyPostroutingMark = 0x90003
TProxyPostroutingMask = 0x90003
TProxyOutputMark = 0x90003
TProxyOutputMask = 0x90003
TProxyPreroutingMark = 0x90004
TProxyPreroutingMask = 0x90004
)
Expand Down Expand Up @@ -841,7 +841,11 @@ func (c *Controller) reconcileTProxyIPTableRules(protocol string) error {

if tcpSocket := container.ReadinessProbe.TCPSocket; tcpSocket != nil {
if port := tcpSocket.Port.String(); port != "" {
probePorts = append(probePorts, port)
if isTCPProbePortReachable, ok := customVPCPodTCPProbeIPPort.Load(getIPPortString(podIP, port)); ok {
if isTCPProbePortReachable.(bool) {
probePorts = append(probePorts, port)
}
}
}
}
}
Expand All @@ -855,7 +859,11 @@ func (c *Controller) reconcileTProxyIPTableRules(protocol string) error {

if tcpSocket := container.LivenessProbe.TCPSocket; tcpSocket != nil {
if port := tcpSocket.Port.String(); port != "" {
probePorts = append(probePorts, port)
if isTCPProbePortReachable, ok := customVPCPodTCPProbeIPPort.Load(getIPPortString(podIP, port)); ok {
if isTCPProbePortReachable.(bool) {
probePorts = append(probePorts, port)
}
}
}
}
}
Expand All @@ -867,21 +875,15 @@ func (c *Controller) reconcileTProxyIPTableRules(protocol string) error {

probePorts = formatProbePorts(probePorts)
for _, probePort := range probePorts {
if isTCPProbePortReachable, ok := customVPCPodTCPProbeIPPort.Load(getIPPortString(podIP, probePort)); ok {
if !isTCPProbePortReachable.(bool) {
continue
}
}

tProxyPostroutingMarkMask := fmt.Sprintf("%#x/%#x", TProxyPostroutingMark, TProxyPostroutingMask)
tProxyOutputMarkMask := fmt.Sprintf("%#x/%#x", TProxyOutputMark, TProxyOutputMask)
tProxyPreRoutingMarkMask := fmt.Sprintf("%#x/%#x", TProxyPreroutingMark, TProxyPreroutingMask)
if protocol == kubeovnv1.ProtocolIPv4 {
tproxyOutputRules = append(tproxyOutputRules, util.IPTableRule{Table: MANGLE, Chain: OvnOutput, Rule: strings.Fields(fmt.Sprintf(`-d %s/32 -p tcp -m tcp --dport %s -j MARK --set-xmark %s`, podIP, probePort, tProxyPostroutingMarkMask))})
tproxyPreRoutingRules = append(tproxyPreRoutingRules, util.IPTableRule{Table: MANGLE, Chain: OvnPrerouting, Rule: strings.Fields(fmt.Sprintf(`-d %s/32 -p tcp -m tcp --dport %s -j TPROXY --on-port %d --on-ip %s --tproxy-mark %s`, podIP, probePort, util.TProxyListenPort, pod.Status.HostIP, tProxyPreRoutingMarkMask))})
tproxyOutputRules = append(tproxyOutputRules, util.IPTableRule{Table: MANGLE, Chain: OvnOutput, Rule: strings.Fields(fmt.Sprintf(`-d %s/32 -p tcp -m tcp --dport %s -j MARK --set-xmark %s`, podIP, probePort, tProxyOutputMarkMask))})
tproxyPreRoutingRules = append(tproxyPreRoutingRules, util.IPTableRule{Table: MANGLE, Chain: OvnPrerouting, Rule: strings.Fields(fmt.Sprintf(`-d %s/32 -p tcp -m tcp --dport %s -j TPROXY --on-port %d --on-ip 0.0.0.0 --tproxy-mark %s`, podIP, probePort, util.TProxyListenPort, tProxyPreRoutingMarkMask))})
}
if protocol == kubeovnv1.ProtocolIPv6 {
tproxyOutputRules = append(tproxyOutputRules, util.IPTableRule{Table: MANGLE, Chain: OvnOutput, Rule: strings.Fields(fmt.Sprintf(`-d %s/128 -p tcp -m tcp --dport %s -j MARK --set-xmark %s`, podIP, probePort, tProxyPostroutingMarkMask))})
tproxyPreRoutingRules = append(tproxyPreRoutingRules, util.IPTableRule{Table: MANGLE, Chain: OvnPrerouting, Rule: strings.Fields(fmt.Sprintf(`-d %s/128 -p tcp -m tcp --dport %s -j TPROXY --on-port %d --on-ip %s --tproxy-mark %s`, podIP, probePort, util.TProxyListenPort, pod.Status.HostIP, tProxyPreRoutingMarkMask))})
tproxyOutputRules = append(tproxyOutputRules, util.IPTableRule{Table: MANGLE, Chain: OvnOutput, Rule: strings.Fields(fmt.Sprintf(`-d %s/128 -p tcp -m tcp --dport %s -j MARK --set-xmark %s`, podIP, probePort, tProxyOutputMarkMask))})
tproxyPreRoutingRules = append(tproxyPreRoutingRules, util.IPTableRule{Table: MANGLE, Chain: OvnPrerouting, Rule: strings.Fields(fmt.Sprintf(`-d %s/128 -p tcp -m tcp --dport %s -j TPROXY --on-port %d --on-ip :: --tproxy-mark %s`, podIP, probePort, util.TProxyListenPort, tProxyPreRoutingMarkMask))})
}
}
}
Expand All @@ -907,7 +909,7 @@ func (c *Controller) cleanTProxyIPTableRules(protocol string) {
return
}
for _, rule := range rules {
if !strings.Contains(rule, fmt.Sprintf("%#x", TProxyPostroutingMark)) &&
if !strings.Contains(rule, fmt.Sprintf("%#x", TProxyOutputMark)) &&
!strings.Contains(rule, fmt.Sprintf("%#x", TProxyPreroutingMark)) {
continue
}
Expand Down
175 changes: 84 additions & 91 deletions pkg/daemon/tproxy.go → pkg/daemon/tproxy_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var (
customVPCPodTCPProbeIPPort sync.Map
)

func (c *Controller) StartTProxyForwarding(stopCh <-chan struct{}) {
func (c *Controller) StartTProxyForwarding() {
var err error
addr := GetDefaultListenPort()

Expand All @@ -51,76 +51,82 @@ func (c *Controller) StartTProxyForwarding(stopCh <-chan struct{}) {
}
}()

go listenTCP()

<-stopCh
}

func (c *Controller) StartTProxyTCPPortProbe() {

for {
var probePorts []string
pods, err := c.podsLister.List(labels.Everything())
conn, err := tcpListener.Accept()
if err != nil {
klog.Errorf("failed to list pods: %v", err)
if netErr, ok := err.(net.Error); ok {
klog.Errorf("Temporary error while accepting connection: %s", netErr)
}
klog.Fatalf("Unrecoverable error while accepting connection: %s", err)
return
}
go handleRedirectFlow(conn)
}
}

if len(pods) == 0 {
return
}
func (c *Controller) StartTProxyTCPPortProbe() {

for _, pod := range pods {
podName := pod.Name
probePorts := map[string]interface{}{}
pods, err := c.podsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list pods: %v", err)
return
}

if pod.Spec.NodeName != c.config.NodeName {
continue
}
if len(pods) == 0 {
return
}

subnetName, ok := pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, util.OvnProvider)]
if !ok {
continue
}
for _, pod := range pods {
podName := pod.Name

subnet, err := c.subnetsLister.Get(subnetName)
if err != nil {
klog.Errorf("failed to get subnet '%s', err: %v", subnetName, err)
continue
}
if pod.Spec.NodeName != c.config.NodeName {
continue
}

if subnet.Spec.Vpc == c.config.ClusterRouter {
continue
}
subnetName, ok := pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, util.OvnProvider)]
if !ok {
continue
}

iface := ovs.PodNameToPortName(podName, pod.Namespace, util.OvnProvider)
nsName, err := ovs.GetInterfacePodNs(iface)
if err != nil || nsName == "" {
continue
}
subnet, err := c.subnetsLister.Get(subnetName)
if err != nil {
klog.Errorf("failed to get subnet '%s', err: %v", subnetName, err)
continue
}

for _, podIP := range pod.Status.PodIPs {
customVPCPodIPToNs.Store(podIP.IP, nsName)
for _, container := range pod.Spec.Containers {
if container.ReadinessProbe != nil {
if tcpSocket := container.ReadinessProbe.TCPSocket; tcpSocket != nil {
if port := tcpSocket.Port.String(); port != "" {
probePorts = append(probePorts, port)
}
if subnet.Spec.Vpc == c.config.ClusterRouter {
continue
}

iface := ovs.PodNameToPortName(podName, pod.Namespace, util.OvnProvider)
nsName, err := ovs.GetInterfacePodNs(iface)
if err != nil || nsName == "" {
continue
}

for _, podIP := range pod.Status.PodIPs {
customVPCPodIPToNs.Store(podIP.IP, nsName)
for _, container := range pod.Spec.Containers {
if container.ReadinessProbe != nil {
if tcpSocket := container.ReadinessProbe.TCPSocket; tcpSocket != nil {
if port := tcpSocket.Port.String(); port != "" {
probePorts[port] = nil
}
}
}

if container.LivenessProbe != nil {
if tcpSocket := container.LivenessProbe.TCPSocket; tcpSocket != nil {
if port := tcpSocket.Port.String(); port != "" {
probePorts = append(probePorts, port)
}
if container.LivenessProbe != nil {
if tcpSocket := container.LivenessProbe.TCPSocket; tcpSocket != nil {
if port := tcpSocket.Port.String(); port != "" {
probePorts[port] = nil
}
}
}
}

for _, port := range probePorts {
probePortInNs(podIP.IP, port, false, nil)
}
for port := range probePorts {
probePortInNs(podIP.IP, port, true, nil)
}
}
}
Expand All @@ -135,7 +141,7 @@ func (c *Controller) runTProxyConfigWorker() {

func (c *Controller) reconcileTProxyRoutes(protocol string) {
family := getFamily(protocol)
if err := addRuleIfNotExist(family, TProxyPostroutingMark, TProxyPostroutingMask, util.TProxyRouteTable); err != nil {
if err := addRuleIfNotExist(family, TProxyOutputMark, TProxyOutputMask, util.TProxyRouteTable); err != nil {
return
}

Expand All @@ -159,8 +165,8 @@ func (c *Controller) cleanTProxyConfig() {

func (c *Controller) cleanTProxyRoutes(protocol string) {
family := getFamily(protocol)
if err := deleteRuleIfExists(family, TProxyPostroutingMark); err != nil {
klog.Errorf("delete tproxy route rule mark %v failed err: %v ", TProxyPostroutingMark, err)
if err := deleteRuleIfExists(family, TProxyOutputMark); err != nil {
klog.Errorf("delete tproxy route rule mark %v failed err: %v ", TProxyOutputMark, err)
}

if err := deleteRuleIfExists(family, TProxyPreroutingMark); err != nil {
Expand Down Expand Up @@ -286,24 +292,9 @@ func delRouteIfExist(family, table int, dst *net.IPNet) error {
return nil
}

func listenTCP() {
for {
conn, err := tcpListener.Accept()
if err != nil {
if netErr, ok := err.(net.Error); ok {
klog.Errorf("Temporary error while accepting connection: %s", netErr)
}
klog.Fatalf("Unrecoverable error while accepting connection: %s", err)
return
}

go handleRedirectFlow(conn)
}
}

func handleRedirectFlow(conn net.Conn) {

klog.V(5).Info("Accepting TCP connection from %v with destination of %v", conn.RemoteAddr().String(), conn.LocalAddr().String())
klog.V(5).Infof("Accepting TCP connection from %v with destination of %v", conn.RemoteAddr().String(), conn.LocalAddr().String())

defer func() {
if err := conn.Close(); err != nil {
Expand All @@ -322,10 +313,10 @@ func handleRedirectFlow(conn net.Conn) {
probePort = ret[1]
}

probePortInNs(podIP, probePort, true, conn)
probePortInNs(podIP, probePort, false, conn)
}

func probePortInNs(podIP, probePort string, transferHTTPMessage bool, conn net.Conn) {
func probePortInNs(podIP, probePort string, isTProxyProbe bool, conn net.Conn) {
podNs, ok := customVPCPodIPToNs.Load(podIP)
if !ok {
return
Expand All @@ -339,7 +330,7 @@ func probePortInNs(podIP, probePort string, transferHTTPMessage bool, conn net.C
podNS, err := ns.GetNS(podNs.(string))
if err != nil {
customVPCPodIPToNs.Delete(podIP)
klog.Errorf("Can't get ns %s with err: %v", podNs, err)
klog.Infof("ns %s already deleted", podNs)
return
}

Expand All @@ -363,12 +354,17 @@ func probePortInNs(podIP, probePort string, transferHTTPMessage bool, conn net.C
remotepodTcpAddr = net.TCPAddr{IP: net.ParseIP(podIP), Port: iprobePort}
}

remoteConn, err := goTProxy.DialTCP(&localpodTcpAddr, &remotepodTcpAddr, transferHTTPMessage)
remoteConn, err := goTProxy.DialTCP(&localpodTcpAddr, &remotepodTcpAddr, !isTProxyProbe)
if err != nil {
customVPCPodTCPProbeIPPort.Store(getIPPortString(podIP, probePort), false)
if isTProxyProbe {
customVPCPodTCPProbeIPPort.Store(getIPPortString(podIP, probePort), false)
}
return nil
} else {
}

if isTProxyProbe {
customVPCPodTCPProbeIPPort.Store(getIPPortString(podIP, probePort), true)
return nil
}

defer func() {
Expand All @@ -377,24 +373,21 @@ func probePortInNs(podIP, probePort string, transferHTTPMessage bool, conn net.C
}
}()

if transferHTTPMessage {
var streamWait sync.WaitGroup
streamWait.Add(2)

streamConn := func(dst io.Writer, src io.Reader) {
if _, err := io.Copy(dst, src); err != nil {
klog.Errorf("copy stream from dst %v to src %v failed err: %v ", dst, src, err)
}
var streamWait sync.WaitGroup
streamWait.Add(2)

streamWait.Done()
streamConn := func(dst io.Writer, src io.Reader) {
if _, err := io.Copy(dst, src); err != nil {
klog.Errorf("copy stream from dst %v to src %v failed err: %v ", dst, src, err)
}

go streamConn(remoteConn, conn)
go streamConn(conn, remoteConn)

streamWait.Wait()
return nil
streamWait.Done()
}

go streamConn(remoteConn, conn)
go streamConn(conn, remoteConn)

streamWait.Wait()
return nil
})
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/daemon/tproxy_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package daemon

func (c *Controller) StartTProxyForwarding(stopCh <-chan struct{}) {
return nil
}

func (c *Controller) StartTProxyTCPPortProbe() {
return nil
}

func (c *Controller) runTProxyConfigWorker() {
return nil
}
3 changes: 0 additions & 3 deletions pkg/tproxy/tproxy_tcp.go → pkg/tproxy/tproxy_tcp_linux.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
//go:build linux
// +build linux

package tproxy

import (
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/framework/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ const (
PauseImage = "kubeovn/pause:3.2"
BusyBoxImage = "busybox:stable"
AgnhostImage = "kubeovn/agnhost:2.43"
NginxImage = "nginx:alpine"
NginxImage = "nginx:latest"
)
Loading

0 comments on commit 05e8aa5

Please sign in to comment.