Skip to content

Commit

Permalink
feature(main): add udp support (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
cuisongliu authored Jan 21, 2022
1 parent 1d8f7cd commit dfac82d
Show file tree
Hide file tree
Showing 12 changed files with 133 additions and 106 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,7 @@ spec:
port: 80
protocol: UDP
targetPort: 80
###udp暂时不支持探活
udpSocket:
enable: true
data: "testdata"
```
15 changes: 14 additions & 1 deletion api/network/v1beta1/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions api/network/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,18 @@ spec:
https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes'
format: int32
type: integer
udpSocket:
description: UDPSocketAction specifies an action involving a
UDP port. UDP hooks not yet supported
properties:
data:
description: UDP test data
type: string
enable:
type: boolean
required:
- enable
type: object
required:
- port
- targetPort
Expand Down
12 changes: 12 additions & 0 deletions config/crd/sealyun.com_clusterendpoints.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,18 @@ spec:
https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes'
format: int32
type: integer
udpSocket:
description: UDPSocketAction specifies an action involving a
UDP port. UDP hooks not yet supported
properties:
data:
description: UDP test data
type: string
enable:
type: boolean
required:
- enable
type: object
required:
- port
- targetPort
Expand Down
2 changes: 1 addition & 1 deletion config/demo/dmz-kube.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ spec:
targetPort: 1234
udpSocket:
enable: true
testData: "testdata"
data: "testdata"
25 changes: 17 additions & 8 deletions controllers/run_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
grpcprobe "github.com/sealyun/endpoints-operator/library/probe/grpc"
httpprobe "github.com/sealyun/endpoints-operator/library/probe/http"
tcpprobe "github.com/sealyun/endpoints-operator/library/probe/tcp"
udpprobe "github.com/sealyun/endpoints-operator/library/probe/udp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
urutime "k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -41,19 +42,18 @@ import (

type work struct {
p *libv1.Probe
network string
resultRun int
lastResult probe.Result
retry int
err error
}

func (pb *prober) runProbeWithRetries(p *libv1.Probe, network string, retries int) (probe.Result, string, error) {
func (pb *prober) runProbeWithRetries(p *libv1.Probe, retries int) (probe.Result, string, error) {
var err error
var result probe.Result
var output string
for i := 0; i < retries; i++ {
result, output, err = pb.runProbe(p, network)
result, output, err = pb.runProbe(p)
if err == nil {
return result, output, nil
}
Expand All @@ -67,7 +67,7 @@ func (w *work) doProbe() (keepGoing bool) {

// the full container environment here, OR we must make a call to the CRI in order to get those environment
// values from the running container.
result, output, err := proberCheck.runProbeWithRetries(w.p, w.network, w.retry)
result, output, err := proberCheck.runProbeWithRetries(w.p, w.retry)
if err != nil {
w.err = err
return false
Expand Down Expand Up @@ -99,6 +99,7 @@ type prober struct {
exec execprobe.Prober
http httpprobe.Prober
tcp tcpprobe.Prober
udp udpprobe.Prober
grpc grpcprobe.Prober
}

Expand All @@ -113,11 +114,12 @@ func newProber() *prober {
exec: execprobe.New(),
http: httpprobe.New(followNonLocalRedirects),
tcp: tcpprobe.New(),
udp: udpprobe.New(),
grpc: grpcprobe.New(),
}
}

func (pb *prober) runProbe(p *libv1.Probe, network string) (probe.Result, string, error) {
func (pb *prober) runProbe(p *libv1.Probe) (probe.Result, string, error) {
timeout := time.Duration(p.TimeoutSeconds) * time.Second
if p.Exec != nil {
klog.V(4).Infof("Exec-Probe Command: %v", p.Exec.Command)
Expand All @@ -144,10 +146,17 @@ func (pb *prober) runProbe(p *libv1.Probe, network string) (probe.Result, string
return probe.Unknown, "", err
}
host := p.TCPSocket.Host
if network == "tcp" {
klog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
return pb.tcp.Probe(host, port, timeout)
klog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
return pb.tcp.Probe(host, port, timeout)
}
if p.UDPSocket != nil {
port, err := extractPort(p.UDPSocket.Port)
if err != nil {
return probe.Unknown, "", err
}
host := p.UDPSocket.Host
klog.V(4).Infof("UDP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
return pb.udp.Probe(host, port, p.UDPSocket.Data, timeout)
}
if p.GRPC != nil {
host := &(p.GRPC.Host)
Expand Down
10 changes: 8 additions & 2 deletions controllers/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,21 +175,27 @@ func healthyCheck(host string, cep *v1beta1.ClusterEndpoint, retry int) ([]v1bet
HTTPHeaders: port.HTTPGet.HTTPHeaders,
}
}
network := "tcp"
if port.TCPSocket != nil && port.TCPSocket.Enable {
pro.TCPSocket = &libv1.TCPSocketAction{
Port: intstr.FromInt(int(port.TargetPort)),
Host: host,
}
}
if port.UDPSocket != nil && port.UDPSocket.Enable {
pro.UDPSocket = &libv1.UDPSocketAction{
Port: intstr.FromInt(int(port.TargetPort)),
Host: host,
Data: port.UDPSocket.Data,
}
}
if port.GRPC != nil {
pro.GRPC = &libv1.GRPCAction{
Port: port.TargetPort,
Host: host,
Service: port.GRPC.Service,
}
}
w := &work{p: pro, network: network, retry: retry}
w := &work{p: pro, retry: retry}
for w.doProbe() {
}
mx.Lock()
Expand Down
18 changes: 17 additions & 1 deletion library/api/core/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,19 @@ type TCPSocketAction struct {
Host string `json:"host,omitempty" protobuf:"bytes,2,opt,name=host"`
}

// UDPSocketAction describes an action based on opening a socket
type UDPSocketAction struct {
// Number or name of the port to access on the container.
// Number must be in the range 1 to 65535.
// Name must be an IANA_SVC_NAME.
Port intstr.IntOrString `json:"port" protobuf:"bytes,1,opt,name=port"`
// Optional: Host name to connect to, defaults to the pod IP.
// +optional
Host string `json:"host,omitempty" protobuf:"bytes,2,opt,name=host"`
// +optional
Data string `json:"data,omitempty" protobuf:"bytes,3,opt,name=host"`
}

type GRPCAction struct {
// Port number of the gRPC service. Number must be in the range 1 to 65535.
Port int32 `json:"port" protobuf:"bytes,1,opt,name=port"`
Expand Down Expand Up @@ -101,12 +114,15 @@ type ProbeHandler struct {
// TCPSocket specifies an action involving a TCP port.
// +optional
TCPSocket *TCPSocketAction `json:"tcpSocket,omitempty" protobuf:"bytes,3,opt,name=tcpSocket"`
// UDPSocket specifies an action involving a UDP port.
// +optional
UDPSocket *UDPSocketAction `json:"udpSocket,omitempty" protobuf:"bytes,4,opt,name=udpSocket"`

// GRPC specifies an action involving a GRPC port.
// This is an alpha field and requires enabling GRPCContainerProbe feature gate.
// +featureGate=GRPCContainerProbe
// +optional
GRPC *GRPCAction `json:"grpc,omitempty" protobuf:"bytes,4,opt,name=grpc"`
GRPC *GRPCAction `json:"grpc,omitempty" protobuf:"bytes,5,opt,name=grpc"`
}

// Probe describes a health check to be performed against a container to determine whether it is
Expand Down
17 changes: 8 additions & 9 deletions library/probe/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package udp

import (
"errors"
"fmt"
"github.com/sealyun/endpoints-operator/library/probe"
"k8s.io/klog"
"net"
Expand Down Expand Up @@ -63,22 +62,22 @@ func DoUDPProbe(addr string, testData string, timeout time.Duration) (probe.Resu
deadline := time.Now().Add(timeout * time.Second)
_ = conn.SetWriteDeadline(deadline)
buf := []byte(testData)
_, err1 := conn.Write(buf)
if err1 != nil {
return probe.Failure, err1.Error(), nil
_, err = conn.Write(buf)
if err != nil {
return probe.Failure, err.Error(), nil
}
_ = conn.SetReadDeadline(deadline)
bufr := make([]byte, 1024)
read, err2 := conn.Read(bufr)
read, err := conn.Read(bufr)

if err2 != nil {
klog.Errorf("Unexpected error closing UDP probe socket: %v (%#v)", err2, err2)
if err != nil {
klog.Errorf("Unexpected error closing UDP probe socket: %v", err)
return probe.Failure, errors.New("io read timout").Error(), nil
}
if read > 0 {
fmt.Println("recv:", string(bufr[:read]))
klog.V(4).Info("recv:", string(bufr[:read]))
return probe.Success, "", nil
} else {
return probe.Failure, err2.Error(), nil
return probe.Failure, "not recv any data", nil
}
}
27 changes: 20 additions & 7 deletions library/probe/udp/udp_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
// Copyright © 2022 The sealyun Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package udp

import (
"fmt"
"github.com/sealyun/endpoints-operator/library/probe"
"net"
"strings"
Expand All @@ -11,7 +24,7 @@ import (

const maxBufferSize = 1024

func udpServer(addr string) {
func udpServer(t *testing.T, addr string) {
listener, err := net.ListenPacket("udp", addr)
if err != nil {
return
Expand All @@ -31,12 +44,12 @@ func udpServer(addr string) {
}
str1 := string(buffer[:n])
if strings.ToLower(str1) == "toerror" {
fmt.Println("nothing no send ...")
t.Log("nothing no send ...")
buffer = []byte{}
n = 0
break
} else {
fmt.Printf("packet-received: bytes=%d from=%s\n", n, caddr.String())
t.Logf("packet-received: bytes=%d from=%s", n, caddr.String())
}

// write
Expand All @@ -53,7 +66,7 @@ func udpServer(addr string) {
return
}

fmt.Printf("packet-written: bytes=%d to=%s\n", n, caddr.String())
t.Logf("packet-written: bytes=%d to=%s\n", n, caddr.String())

}
}()
Expand Down Expand Up @@ -91,8 +104,8 @@ func TestUDPProbe(t *testing.T) {
}{addr: "127.0.0.1:38889", testData: "toerror", timeout: 1}, want: probe.Failure, want1: "io read timout", wantErr: false},
}
// start two udp server
go udpServer("127.0.0.1:38888")
go udpServer("127.0.0.1:38889")
go udpServer(t, "127.0.0.1:38888")
go udpServer(t, "127.0.0.1:38889")

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
Loading

0 comments on commit dfac82d

Please sign in to comment.