From 4e9481a6e37c667d05ab7307224374c464b4bee4 Mon Sep 17 00:00:00 2001 From: cuisongliu Date: Tue, 18 Jan 2022 10:41:16 +0800 Subject: [PATCH] feat(main): add udp support (#40) * feat(main): add udp support --- api/network/v1beta1/types.go | 9 +++ api/network/v1beta1/zz_generated.deepcopy.go | 22 ++++++++ config/crd/sealyun.com_clusterendpoints.yaml | 14 ++++- controllers/run_probe.go | 21 +++++-- controllers/sync.go | 17 +++++- library/probe/udp/udp.go | 59 ++++++++++++++++++++ 6 files changed, 132 insertions(+), 10 deletions(-) create mode 100644 library/probe/udp/udp.go diff --git a/api/network/v1beta1/types.go b/api/network/v1beta1/types.go index 80d6da6..720cb4f 100644 --- a/api/network/v1beta1/types.go +++ b/api/network/v1beta1/types.go @@ -69,6 +69,11 @@ type TCPSocketAction struct { Enable bool `json:"enable" protobuf:"bytes,1,opt,name=enable"` } +// UDPSocketAction describes an action based on opening a socket +type UDPSocketAction struct { + Enable bool `json:"enable" protobuf:"bytes,1,opt,name=enable"` +} + // HTTPGetAction describes an action based on HTTP Get requests. type HTTPGetAction struct { // Path to access on the HTTP server. @@ -92,6 +97,10 @@ type Handler struct { // TCP hooks not yet supported // +optional TCPSocket *TCPSocketAction `json:"tcpSocket,omitempty" protobuf:"bytes,3,opt,name=tcpSocket"` + // UDPSocket specifies an action involving a UDP port. + // UDP hooks not yet supported + // +optional + UDPSocket *UDPSocketAction `json:"udpSocket,omitempty" protobuf:"bytes,4,opt,name=tcpSocket"` } // ClusterEndpointSpec defines the desired state of ClusterEndpoint diff --git a/api/network/v1beta1/zz_generated.deepcopy.go b/api/network/v1beta1/zz_generated.deepcopy.go index cd9eafb..23bd69d 100644 --- a/api/network/v1beta1/zz_generated.deepcopy.go +++ b/api/network/v1beta1/zz_generated.deepcopy.go @@ -15,6 +15,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ + // Code generated by deepcopy-gen. DO NOT EDIT. package v1beta1 @@ -188,6 +189,11 @@ func (in *Handler) DeepCopyInto(out *Handler) { *out = new(TCPSocketAction) **out = **in } + if in.UDPSocket != nil { + in, out := &in.UDPSocket, &out.UDPSocket + *out = new(UDPSocketAction) + **out = **in + } return } @@ -233,3 +239,19 @@ func (in *TCPSocketAction) DeepCopy() *TCPSocketAction { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UDPSocketAction) DeepCopyInto(out *UDPSocketAction) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UDPSocketAction. +func (in *UDPSocketAction) DeepCopy() *UDPSocketAction { + if in == nil { + return nil + } + out := new(UDPSocketAction) + in.DeepCopyInto(out) + return out +} diff --git a/config/crd/sealyun.com_clusterendpoints.yaml b/config/crd/sealyun.com_clusterendpoints.yaml index 94418f7..e610575 100644 --- a/config/crd/sealyun.com_clusterendpoints.yaml +++ b/config/crd/sealyun.com_clusterendpoints.yaml @@ -142,9 +142,8 @@ spec: format: int32 type: integer tcpSocket: - description: 'TCPSocket specifies an action involving a TCP - port. TCP hooks not yet supported TODO: implement a realistic - TCP lifecycle hook' + description: TCPSocket specifies an action involving a TCP port. + TCP hooks not yet supported properties: enable: type: boolean @@ -157,6 +156,15 @@ spec: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes' format: int32 type: integer + udpSocket: + description: UDPSocket specifies an action involving a UDP port. + UDP hooks not yet supported + properties: + enable: + type: boolean + required: + - enable + type: object required: - port - targetPort diff --git a/controllers/run_probe.go b/controllers/run_probe.go index c2901de..816b009 100644 --- a/controllers/run_probe.go +++ b/controllers/run_probe.go @@ -24,6 +24,7 @@ import ( execprobe "github.com/sealyun/endpoints-operator/library/probe/exec" httpprobe "github.com/sealyun/endpoints-operator/library/probe/http" tcpprobe "github.com/sealyun/endpoints-operator/library/probe/tcp" + udpprobe "github.com/sealyun/endpoints-operator/library/probe/udp" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/intstr" urutime "k8s.io/apimachinery/pkg/util/runtime" @@ -38,17 +39,18 @@ import ( type work struct { p *v1.Probe + network string resultRun int lastResult probe.Result err error } -func (pb *prober) runProbeWithRetries(p *v1.Probe, retries int) (probe.Result, string, error) { +func (pb *prober) runProbeWithRetries(p *v1.Probe, network string, retries int) (probe.Result, string, error) { var err error var result probe.Result var output string for i := 0; i < retries; i++ { - result, output, err = pb.runProbe(p) + result, output, err = pb.runProbe(p, network) if err == nil { return result, output, nil } @@ -62,7 +64,7 @@ func (w *work) doProbe() (keepGoing bool) { // the full container environment here, OR we must make a call to the CRI in order to get those environment // values from the running container. - result, output, err := proberCheck.runProbeWithRetries(w.p, 3) + result, output, err := proberCheck.runProbeWithRetries(w.p, w.network, 3) if err != nil { w.err = err return false @@ -94,6 +96,7 @@ type prober struct { exec execprobe.Prober http httpprobe.Prober tcp tcpprobe.Prober + udp udpprobe.Prober } var proberCheck = newProber() @@ -107,10 +110,11 @@ func newProber() *prober { exec: execprobe.New(), http: httpprobe.New(followNonLocalRedirects), tcp: tcpprobe.New(), + udp: udpprobe.New(), } } -func (pb *prober) runProbe(p *v1.Probe) (probe.Result, string, error) { +func (pb *prober) runProbe(p *v1.Probe, network string) (probe.Result, string, error) { timeout := time.Duration(p.TimeoutSeconds) * time.Second if p.Exec != nil { klog.V(4).Infof("Exec-Probe Command: %v", p.Exec.Command) @@ -137,8 +141,13 @@ func (pb *prober) runProbe(p *v1.Probe) (probe.Result, string, error) { return probe.Unknown, "", err } host := p.TCPSocket.Host - klog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout) - return pb.tcp.Probe(host, port, timeout) + if network == "tcp" { + klog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout) + return pb.tcp.Probe(host, port, timeout) + } else { + klog.V(4).Infof("UDP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout) + return pb.udp.Probe(host, port, timeout) + } } klog.Warning("failed to find probe builder") return probe.Warning, "", nil diff --git a/controllers/sync.go b/controllers/sync.go index ee00a83..7838f17 100644 --- a/controllers/sync.go +++ b/controllers/sync.go @@ -173,13 +173,28 @@ func healthyCheck(host string, cep *v1beta1.ClusterEndpoint) ([]v1beta1.ServiceP HTTPHeaders: port.HTTPGet.HTTPHeaders, } } + if port.UDPSocket != nil && port.UDPSocket.Enable { + pro.TCPSocket = &corev1.TCPSocketAction{ + Port: intstr.FromInt(int(port.TargetPort)), + Host: host, + } + } + network := "" if port.TCPSocket != nil && port.TCPSocket.Enable { + network = "tcp" + pro.TCPSocket = &corev1.TCPSocketAction{ + Port: intstr.FromInt(int(port.TargetPort)), + Host: host, + } + } + if port.UDPSocket != nil && port.UDPSocket.Enable { + network = "udp" pro.TCPSocket = &corev1.TCPSocketAction{ Port: intstr.FromInt(int(port.TargetPort)), Host: host, } } - w := &work{p: pro} + w := &work{p: pro, network: network} for w.doProbe() { } mx.Lock() diff --git a/library/probe/udp/udp.go b/library/probe/udp/udp.go new file mode 100644 index 0000000..8748fee --- /dev/null +++ b/library/probe/udp/udp.go @@ -0,0 +1,59 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package udp + +import ( + "github.com/sealyun/endpoints-operator/library/probe" + "k8s.io/klog" + "net" + "strconv" + "time" +) + +// New creates Prober. +func New() Prober { + return udpProber{} +} + +// Prober is an interface that defines the Probe function for doing UDP readiness/liveness checks. +type Prober interface { + Probe(host string, port int, timeout time.Duration) (probe.Result, string, error) +} + +type udpProber struct{} + +// Probe returns a ProbeRunner capable of running an UDP check. +func (pr udpProber) Probe(host string, port int, timeout time.Duration) (probe.Result, string, error) { + return DoUDPProbe(net.JoinHostPort(host, strconv.Itoa(port)), timeout) +} + +// DoUDPProbe checks that a UDP socket to the address can be opened. +// If the socket can be opened, it returns Success +// If the socket fails to open, it returns Failure. +// This is exported because some other packages may want to do direct TCP probes. +func DoUDPProbe(addr string, timeout time.Duration) (probe.Result, string, error) { + conn, err := net.DialTimeout("udp", addr, timeout) + if err != nil { + // Convert errors to failures to handle timeouts. + return probe.Failure, err.Error(), nil + } + err = conn.Close() + if err != nil { + klog.Errorf("Unexpected error closing UDP probe socket: %v (%#v)", err, err) + } + return probe.Success, "", nil +}