diff --git a/README.md b/README.md index 15ae0d5..97c7165 100644 --- a/README.md +++ b/README.md @@ -79,5 +79,7 @@ spec: port: 80 protocol: UDP targetPort: 80 - ###udp暂时不支持探活 + udpSocket: + enable: true + data: "testdata" ``` diff --git a/api/network/v1beta1/types.go b/api/network/v1beta1/types.go index e4baff2..b7cdcdd 100644 --- a/api/network/v1beta1/types.go +++ b/api/network/v1beta1/types.go @@ -69,6 +69,14 @@ type TCPSocketAction struct { Enable bool `json:"enable" protobuf:"bytes,1,opt,name=enable"` } +// UDPSocketAction describes an action based on opening a socket +type UDPSocketAction struct { + Enable bool `json:"enable" protobuf:"bytes,1,opt,name=enable"` + // UDP test data + // +optional + Data string `json:"data" protobuf:"bytes,2,opt,name=data"` +} + // HTTPGetAction describes an action based on HTTP Get requests. type HTTPGetAction struct { // Path to access on the HTTP server. @@ -102,10 +110,15 @@ type Handler struct { // TCP hooks not yet supported // +optional TCPSocket *TCPSocketAction `json:"tcpSocket,omitempty" protobuf:"bytes,3,opt,name=tcpSocket"` + // UDPSocketAction specifies an action involving a UDP port. + // UDP hooks not yet supported + // +optional + UDPSocket *UDPSocketAction `json:"udpSocket,omitempty" protobuf:"bytes,4,opt,name=udpSocket"` + // GRPC specifies an action involving a GRPC port. // This is an alpha field and requires enabling GRPCContainerProbe feature gate. // +optional - GRPC *GRPCAction `json:"grpc,omitempty" protobuf:"bytes,4,opt,name=grpc"` + GRPC *GRPCAction `json:"grpc,omitempty" protobuf:"bytes,5,opt,name=grpc"` } // ClusterEndpointSpec defines the desired state of ClusterEndpoint diff --git a/api/network/v1beta1/zz_generated.deepcopy.go b/api/network/v1beta1/zz_generated.deepcopy.go index 6a86d58..e7cbef7 100644 --- a/api/network/v1beta1/zz_generated.deepcopy.go +++ b/api/network/v1beta1/zz_generated.deepcopy.go @@ -210,6 +210,11 @@ func (in *Handler) DeepCopyInto(out *Handler) { *out = new(TCPSocketAction) **out = **in } + if in.UDPSocket != nil { + in, out := &in.UDPSocket, &out.UDPSocket + *out = new(UDPSocketAction) + **out = **in + } if in.GRPC != nil { in, out := &in.GRPC, &out.GRPC *out = new(GRPCAction) @@ -260,3 +265,19 @@ func (in *TCPSocketAction) DeepCopy() *TCPSocketAction { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UDPSocketAction) DeepCopyInto(out *UDPSocketAction) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UDPSocketAction. +func (in *UDPSocketAction) DeepCopy() *UDPSocketAction { + if in == nil { + return nil + } + out := new(UDPSocketAction) + in.DeepCopyInto(out) + return out +} diff --git a/config/charts/endpoints-operator/crds/sealyun.com_clusterendpoints.yaml b/config/charts/endpoints-operator/crds/sealyun.com_clusterendpoints.yaml index 2085c16..a7c8456 100644 --- a/config/charts/endpoints-operator/crds/sealyun.com_clusterendpoints.yaml +++ b/config/charts/endpoints-operator/crds/sealyun.com_clusterendpoints.yaml @@ -168,6 +168,18 @@ spec: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes' format: int32 type: integer + udpSocket: + description: UDPSocketAction specifies an action involving a + UDP port. UDP hooks not yet supported + properties: + data: + description: UDP test data + type: string + enable: + type: boolean + required: + - enable + type: object required: - port - targetPort diff --git a/config/crd/sealyun.com_clusterendpoints.yaml b/config/crd/sealyun.com_clusterendpoints.yaml index 2085c16..a7c8456 100644 --- a/config/crd/sealyun.com_clusterendpoints.yaml +++ b/config/crd/sealyun.com_clusterendpoints.yaml @@ -168,6 +168,18 @@ spec: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes' format: int32 type: integer + udpSocket: + description: UDPSocketAction specifies an action involving a + UDP port. UDP hooks not yet supported + properties: + data: + description: UDP test data + type: string + enable: + type: boolean + required: + - enable + type: object required: - port - targetPort diff --git a/config/demo/dmz-kube.yaml b/config/demo/dmz-kube.yaml index 84c9106..ef36207 100644 --- a/config/demo/dmz-kube.yaml +++ b/config/demo/dmz-kube.yaml @@ -36,4 +36,4 @@ spec: targetPort: 1234 udpSocket: enable: true - testData: "testdata" + data: "testdata" diff --git a/controllers/run_probe.go b/controllers/run_probe.go index c3d478a..17c79fc 100644 --- a/controllers/run_probe.go +++ b/controllers/run_probe.go @@ -33,6 +33,7 @@ import ( grpcprobe "github.com/sealyun/endpoints-operator/library/probe/grpc" httpprobe "github.com/sealyun/endpoints-operator/library/probe/http" tcpprobe "github.com/sealyun/endpoints-operator/library/probe/tcp" + udpprobe "github.com/sealyun/endpoints-operator/library/probe/udp" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/intstr" urutime "k8s.io/apimachinery/pkg/util/runtime" @@ -41,19 +42,18 @@ import ( type work struct { p *libv1.Probe - network string resultRun int lastResult probe.Result retry int err error } -func (pb *prober) runProbeWithRetries(p *libv1.Probe, network string, retries int) (probe.Result, string, error) { +func (pb *prober) runProbeWithRetries(p *libv1.Probe, retries int) (probe.Result, string, error) { var err error var result probe.Result var output string for i := 0; i < retries; i++ { - result, output, err = pb.runProbe(p, network) + result, output, err = pb.runProbe(p) if err == nil { return result, output, nil } @@ -67,7 +67,7 @@ func (w *work) doProbe() (keepGoing bool) { // the full container environment here, OR we must make a call to the CRI in order to get those environment // values from the running container. - result, output, err := proberCheck.runProbeWithRetries(w.p, w.network, w.retry) + result, output, err := proberCheck.runProbeWithRetries(w.p, w.retry) if err != nil { w.err = err return false @@ -99,6 +99,7 @@ type prober struct { exec execprobe.Prober http httpprobe.Prober tcp tcpprobe.Prober + udp udpprobe.Prober grpc grpcprobe.Prober } @@ -113,11 +114,12 @@ func newProber() *prober { exec: execprobe.New(), http: httpprobe.New(followNonLocalRedirects), tcp: tcpprobe.New(), + udp: udpprobe.New(), grpc: grpcprobe.New(), } } -func (pb *prober) runProbe(p *libv1.Probe, network string) (probe.Result, string, error) { +func (pb *prober) runProbe(p *libv1.Probe) (probe.Result, string, error) { timeout := time.Duration(p.TimeoutSeconds) * time.Second if p.Exec != nil { klog.V(4).Infof("Exec-Probe Command: %v", p.Exec.Command) @@ -144,10 +146,17 @@ func (pb *prober) runProbe(p *libv1.Probe, network string) (probe.Result, string return probe.Unknown, "", err } host := p.TCPSocket.Host - if network == "tcp" { - klog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout) - return pb.tcp.Probe(host, port, timeout) + klog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout) + return pb.tcp.Probe(host, port, timeout) + } + if p.UDPSocket != nil { + port, err := extractPort(p.UDPSocket.Port) + if err != nil { + return probe.Unknown, "", err } + host := p.UDPSocket.Host + klog.V(4).Infof("UDP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout) + return pb.udp.Probe(host, port, p.UDPSocket.Data, timeout) } if p.GRPC != nil { host := &(p.GRPC.Host) diff --git a/controllers/sync.go b/controllers/sync.go index d087c72..45d04e2 100644 --- a/controllers/sync.go +++ b/controllers/sync.go @@ -175,13 +175,19 @@ func healthyCheck(host string, cep *v1beta1.ClusterEndpoint, retry int) ([]v1bet HTTPHeaders: port.HTTPGet.HTTPHeaders, } } - network := "tcp" if port.TCPSocket != nil && port.TCPSocket.Enable { pro.TCPSocket = &libv1.TCPSocketAction{ Port: intstr.FromInt(int(port.TargetPort)), Host: host, } } + if port.UDPSocket != nil && port.UDPSocket.Enable { + pro.UDPSocket = &libv1.UDPSocketAction{ + Port: intstr.FromInt(int(port.TargetPort)), + Host: host, + Data: port.UDPSocket.Data, + } + } if port.GRPC != nil { pro.GRPC = &libv1.GRPCAction{ Port: port.TargetPort, @@ -189,7 +195,7 @@ func healthyCheck(host string, cep *v1beta1.ClusterEndpoint, retry int) ([]v1bet Service: port.GRPC.Service, } } - w := &work{p: pro, network: network, retry: retry} + w := &work{p: pro, retry: retry} for w.doProbe() { } mx.Lock() diff --git a/library/api/core/v1/types.go b/library/api/core/v1/types.go index 5fb5347..b093107 100644 --- a/library/api/core/v1/types.go +++ b/library/api/core/v1/types.go @@ -62,6 +62,19 @@ type TCPSocketAction struct { Host string `json:"host,omitempty" protobuf:"bytes,2,opt,name=host"` } +// UDPSocketAction describes an action based on opening a socket +type UDPSocketAction struct { + // Number or name of the port to access on the container. + // Number must be in the range 1 to 65535. + // Name must be an IANA_SVC_NAME. + Port intstr.IntOrString `json:"port" protobuf:"bytes,1,opt,name=port"` + // Optional: Host name to connect to, defaults to the pod IP. + // +optional + Host string `json:"host,omitempty" protobuf:"bytes,2,opt,name=host"` + // +optional + Data string `json:"data,omitempty" protobuf:"bytes,3,opt,name=host"` +} + type GRPCAction struct { // Port number of the gRPC service. Number must be in the range 1 to 65535. Port int32 `json:"port" protobuf:"bytes,1,opt,name=port"` @@ -101,12 +114,15 @@ type ProbeHandler struct { // TCPSocket specifies an action involving a TCP port. // +optional TCPSocket *TCPSocketAction `json:"tcpSocket,omitempty" protobuf:"bytes,3,opt,name=tcpSocket"` + // UDPSocket specifies an action involving a UDP port. + // +optional + UDPSocket *UDPSocketAction `json:"udpSocket,omitempty" protobuf:"bytes,4,opt,name=udpSocket"` // GRPC specifies an action involving a GRPC port. // This is an alpha field and requires enabling GRPCContainerProbe feature gate. // +featureGate=GRPCContainerProbe // +optional - GRPC *GRPCAction `json:"grpc,omitempty" protobuf:"bytes,4,opt,name=grpc"` + GRPC *GRPCAction `json:"grpc,omitempty" protobuf:"bytes,5,opt,name=grpc"` } // Probe describes a health check to be performed against a container to determine whether it is diff --git a/library/probe/udp/udp.go b/library/probe/udp/udp.go index 12eae06..dd4ff60 100644 --- a/library/probe/udp/udp.go +++ b/library/probe/udp/udp.go @@ -18,7 +18,6 @@ package udp import ( "errors" - "fmt" "github.com/sealyun/endpoints-operator/library/probe" "k8s.io/klog" "net" @@ -63,22 +62,22 @@ func DoUDPProbe(addr string, testData string, timeout time.Duration) (probe.Resu deadline := time.Now().Add(timeout * time.Second) _ = conn.SetWriteDeadline(deadline) buf := []byte(testData) - _, err1 := conn.Write(buf) - if err1 != nil { - return probe.Failure, err1.Error(), nil + _, err = conn.Write(buf) + if err != nil { + return probe.Failure, err.Error(), nil } _ = conn.SetReadDeadline(deadline) bufr := make([]byte, 1024) - read, err2 := conn.Read(bufr) + read, err := conn.Read(bufr) - if err2 != nil { - klog.Errorf("Unexpected error closing UDP probe socket: %v (%#v)", err2, err2) + if err != nil { + klog.Errorf("Unexpected error closing UDP probe socket: %v", err) return probe.Failure, errors.New("io read timout").Error(), nil } if read > 0 { - fmt.Println("recv:", string(bufr[:read])) + klog.V(4).Info("recv:", string(bufr[:read])) return probe.Success, "", nil } else { - return probe.Failure, err2.Error(), nil + return probe.Failure, "not recv any data", nil } } diff --git a/library/probe/udp/udp_test.go b/library/probe/udp/udp_test.go index 134d5af..a07552e 100644 --- a/library/probe/udp/udp_test.go +++ b/library/probe/udp/udp_test.go @@ -1,7 +1,20 @@ +// Copyright © 2022 The sealyun Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package udp import ( - "fmt" "github.com/sealyun/endpoints-operator/library/probe" "net" "strings" @@ -11,7 +24,7 @@ import ( const maxBufferSize = 1024 -func udpServer(addr string) { +func udpServer(t *testing.T, addr string) { listener, err := net.ListenPacket("udp", addr) if err != nil { return @@ -31,12 +44,12 @@ func udpServer(addr string) { } str1 := string(buffer[:n]) if strings.ToLower(str1) == "toerror" { - fmt.Println("nothing no send ...") + t.Log("nothing no send ...") buffer = []byte{} n = 0 break } else { - fmt.Printf("packet-received: bytes=%d from=%s\n", n, caddr.String()) + t.Logf("packet-received: bytes=%d from=%s", n, caddr.String()) } // write @@ -53,7 +66,7 @@ func udpServer(addr string) { return } - fmt.Printf("packet-written: bytes=%d to=%s\n", n, caddr.String()) + t.Logf("packet-written: bytes=%d to=%s\n", n, caddr.String()) } }() @@ -91,8 +104,8 @@ func TestUDPProbe(t *testing.T) { }{addr: "127.0.0.1:38889", testData: "toerror", timeout: 1}, want: probe.Failure, want1: "io read timout", wantErr: false}, } // start two udp server - go udpServer("127.0.0.1:38888") - go udpServer("127.0.0.1:38889") + go udpServer(t, "127.0.0.1:38888") + go udpServer(t, "127.0.0.1:38889") for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/library/probe/udp/udpclient.go b/library/probe/udp/udpclient.go deleted file mode 100644 index d770d1e..0000000 --- a/library/probe/udp/udpclient.go +++ /dev/null @@ -1,76 +0,0 @@ -package udp - -import ( - "fmt" - "k8s.io/klog" - "net" - "strconv" - "time" -) - -func scanUDP(host string, port int, testData string) (err error) { - address := host + ":" + strconv.Itoa(port) - serverAddr, err := net.ResolveUDPAddr("udp", address) - klog.Infoln("Scan UDP Endpoint: ", address) - if err != nil { - return err - } - - conn, err := net.DialUDP("udp", nil, serverAddr) - if err != nil { - return err - } - defer conn.Close() - - // Write 3 times to the udp socket and check - // if there's any kind of error - _ = conn.SetWriteDeadline(time.Now().Add(time.Duration(1) * time.Second)) - - if testData == "" { - //testData = "1234567890" - testData = "6d6b0100000100000000000002717103636f6d0000010001" - } - // query qq hex "6d6b0100000100000000000002717103636f6d0000010001" - errorCount := 0 - for i := 0; i < 5; i++ { - buf := []byte(testData + "\n") - _, err := conn.Write(buf) - if err != nil { - errorCount++ - } - } - if errorCount > 0 { - return err - } - _ = conn.SetReadDeadline(time.Now().Add(time.Duration(1) * time.Second)) - buf := make([]byte, 1024) - read, err := conn.Read(buf) - if err != nil { - return err - } - if read > 0 { - fmt.Println(string(buf[:read])) - return nil - } else { - return err - } -} - -func main() { - - err := scanUDP("114.114.114.114", 53, "") - if err != nil { - fmt.Println("192.168.3.78:9999 端口未开") - } else { - - fmt.Println("192.168.3.78:9999端口开放") - } - - err1 := scanUDP("192.168.3.78", 8888, "ABCD") - if err1 != nil { - fmt.Println("192.168.3.78:8888端口未开") - } else { - fmt.Println("192.168.3.78:8888端口开放") - } - -}