Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(main): add udp support #62

Merged
merged 1 commit into from
Jan 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,7 @@ spec:
port: 80
protocol: UDP
targetPort: 80
###udp暂时不支持探活
udpSocket:
enable: true
data: "testdata"
```
15 changes: 14 additions & 1 deletion api/network/v1beta1/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions api/network/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,18 @@ spec:
https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes'
format: int32
type: integer
udpSocket:
description: UDPSocketAction specifies an action involving a
UDP port. UDP hooks not yet supported
properties:
data:
description: UDP test data
type: string
enable:
type: boolean
required:
- enable
type: object
required:
- port
- targetPort
Expand Down
12 changes: 12 additions & 0 deletions config/crd/sealyun.com_clusterendpoints.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,18 @@ spec:
https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes'
format: int32
type: integer
udpSocket:
description: UDPSocketAction specifies an action involving a
UDP port. UDP hooks not yet supported
properties:
data:
description: UDP test data
type: string
enable:
type: boolean
required:
- enable
type: object
required:
- port
- targetPort
Expand Down
2 changes: 1 addition & 1 deletion config/demo/dmz-kube.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ spec:
targetPort: 1234
udpSocket:
enable: true
testData: "testdata"
data: "testdata"
25 changes: 17 additions & 8 deletions controllers/run_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
grpcprobe "github.com/sealyun/endpoints-operator/library/probe/grpc"
httpprobe "github.com/sealyun/endpoints-operator/library/probe/http"
tcpprobe "github.com/sealyun/endpoints-operator/library/probe/tcp"
udpprobe "github.com/sealyun/endpoints-operator/library/probe/udp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
urutime "k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -41,19 +42,18 @@ import (

type work struct {
p *libv1.Probe
network string
resultRun int
lastResult probe.Result
retry int
err error
}

func (pb *prober) runProbeWithRetries(p *libv1.Probe, network string, retries int) (probe.Result, string, error) {
func (pb *prober) runProbeWithRetries(p *libv1.Probe, retries int) (probe.Result, string, error) {
var err error
var result probe.Result
var output string
for i := 0; i < retries; i++ {
result, output, err = pb.runProbe(p, network)
result, output, err = pb.runProbe(p)
if err == nil {
return result, output, nil
}
Expand All @@ -67,7 +67,7 @@ func (w *work) doProbe() (keepGoing bool) {

// the full container environment here, OR we must make a call to the CRI in order to get those environment
// values from the running container.
result, output, err := proberCheck.runProbeWithRetries(w.p, w.network, w.retry)
result, output, err := proberCheck.runProbeWithRetries(w.p, w.retry)
if err != nil {
w.err = err
return false
Expand Down Expand Up @@ -99,6 +99,7 @@ type prober struct {
exec execprobe.Prober
http httpprobe.Prober
tcp tcpprobe.Prober
udp udpprobe.Prober
grpc grpcprobe.Prober
}

Expand All @@ -113,11 +114,12 @@ func newProber() *prober {
exec: execprobe.New(),
http: httpprobe.New(followNonLocalRedirects),
tcp: tcpprobe.New(),
udp: udpprobe.New(),
grpc: grpcprobe.New(),
}
}

func (pb *prober) runProbe(p *libv1.Probe, network string) (probe.Result, string, error) {
func (pb *prober) runProbe(p *libv1.Probe) (probe.Result, string, error) {
timeout := time.Duration(p.TimeoutSeconds) * time.Second
if p.Exec != nil {
klog.V(4).Infof("Exec-Probe Command: %v", p.Exec.Command)
Expand All @@ -144,10 +146,17 @@ func (pb *prober) runProbe(p *libv1.Probe, network string) (probe.Result, string
return probe.Unknown, "", err
}
host := p.TCPSocket.Host
if network == "tcp" {
klog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
return pb.tcp.Probe(host, port, timeout)
klog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
return pb.tcp.Probe(host, port, timeout)
}
if p.UDPSocket != nil {
port, err := extractPort(p.UDPSocket.Port)
if err != nil {
return probe.Unknown, "", err
}
host := p.UDPSocket.Host
klog.V(4).Infof("UDP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
return pb.udp.Probe(host, port, p.UDPSocket.Data, timeout)
}
if p.GRPC != nil {
host := &(p.GRPC.Host)
Expand Down
10 changes: 8 additions & 2 deletions controllers/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,21 +175,27 @@ func healthyCheck(host string, cep *v1beta1.ClusterEndpoint, retry int) ([]v1bet
HTTPHeaders: port.HTTPGet.HTTPHeaders,
}
}
network := "tcp"
if port.TCPSocket != nil && port.TCPSocket.Enable {
pro.TCPSocket = &libv1.TCPSocketAction{
Port: intstr.FromInt(int(port.TargetPort)),
Host: host,
}
}
if port.UDPSocket != nil && port.UDPSocket.Enable {
pro.UDPSocket = &libv1.UDPSocketAction{
Port: intstr.FromInt(int(port.TargetPort)),
Host: host,
Data: port.UDPSocket.Data,
}
}
if port.GRPC != nil {
pro.GRPC = &libv1.GRPCAction{
Port: port.TargetPort,
Host: host,
Service: port.GRPC.Service,
}
}
w := &work{p: pro, network: network, retry: retry}
w := &work{p: pro, retry: retry}
for w.doProbe() {
}
mx.Lock()
Expand Down
18 changes: 17 additions & 1 deletion library/api/core/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,19 @@ type TCPSocketAction struct {
Host string `json:"host,omitempty" protobuf:"bytes,2,opt,name=host"`
}

// UDPSocketAction describes an action based on opening a socket
type UDPSocketAction struct {
// Number or name of the port to access on the container.
// Number must be in the range 1 to 65535.
// Name must be an IANA_SVC_NAME.
Port intstr.IntOrString `json:"port" protobuf:"bytes,1,opt,name=port"`
// Optional: Host name to connect to, defaults to the pod IP.
// +optional
Host string `json:"host,omitempty" protobuf:"bytes,2,opt,name=host"`
// +optional
Data string `json:"data,omitempty" protobuf:"bytes,3,opt,name=host"`
}

type GRPCAction struct {
// Port number of the gRPC service. Number must be in the range 1 to 65535.
Port int32 `json:"port" protobuf:"bytes,1,opt,name=port"`
Expand Down Expand Up @@ -101,12 +114,15 @@ type ProbeHandler struct {
// TCPSocket specifies an action involving a TCP port.
// +optional
TCPSocket *TCPSocketAction `json:"tcpSocket,omitempty" protobuf:"bytes,3,opt,name=tcpSocket"`
// UDPSocket specifies an action involving a UDP port.
// +optional
UDPSocket *UDPSocketAction `json:"udpSocket,omitempty" protobuf:"bytes,4,opt,name=udpSocket"`

// GRPC specifies an action involving a GRPC port.
// This is an alpha field and requires enabling GRPCContainerProbe feature gate.
// +featureGate=GRPCContainerProbe
// +optional
GRPC *GRPCAction `json:"grpc,omitempty" protobuf:"bytes,4,opt,name=grpc"`
GRPC *GRPCAction `json:"grpc,omitempty" protobuf:"bytes,5,opt,name=grpc"`
}

// Probe describes a health check to be performed against a container to determine whether it is
Expand Down
17 changes: 8 additions & 9 deletions library/probe/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package udp

import (
"errors"
"fmt"
"github.com/sealyun/endpoints-operator/library/probe"
"k8s.io/klog"
"net"
Expand Down Expand Up @@ -63,22 +62,22 @@ func DoUDPProbe(addr string, testData string, timeout time.Duration) (probe.Resu
deadline := time.Now().Add(timeout * time.Second)
_ = conn.SetWriteDeadline(deadline)
buf := []byte(testData)
_, err1 := conn.Write(buf)
if err1 != nil {
return probe.Failure, err1.Error(), nil
_, err = conn.Write(buf)
if err != nil {
return probe.Failure, err.Error(), nil
}
_ = conn.SetReadDeadline(deadline)
bufr := make([]byte, 1024)
read, err2 := conn.Read(bufr)
read, err := conn.Read(bufr)

if err2 != nil {
klog.Errorf("Unexpected error closing UDP probe socket: %v (%#v)", err2, err2)
if err != nil {
klog.Errorf("Unexpected error closing UDP probe socket: %v", err)
return probe.Failure, errors.New("io read timout").Error(), nil
}
if read > 0 {
fmt.Println("recv:", string(bufr[:read]))
klog.V(4).Info("recv:", string(bufr[:read]))
return probe.Success, "", nil
} else {
return probe.Failure, err2.Error(), nil
return probe.Failure, "not recv any data", nil
}
}
27 changes: 20 additions & 7 deletions library/probe/udp/udp_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
// Copyright © 2022 The sealyun Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package udp

import (
"fmt"
"github.com/sealyun/endpoints-operator/library/probe"
"net"
"strings"
Expand All @@ -11,7 +24,7 @@ import (

const maxBufferSize = 1024

func udpServer(addr string) {
func udpServer(t *testing.T, addr string) {
listener, err := net.ListenPacket("udp", addr)
if err != nil {
return
Expand All @@ -31,12 +44,12 @@ func udpServer(addr string) {
}
str1 := string(buffer[:n])
if strings.ToLower(str1) == "toerror" {
fmt.Println("nothing no send ...")
t.Log("nothing no send ...")
buffer = []byte{}
n = 0
break
} else {
fmt.Printf("packet-received: bytes=%d from=%s\n", n, caddr.String())
t.Logf("packet-received: bytes=%d from=%s", n, caddr.String())
}

// write
Expand All @@ -53,7 +66,7 @@ func udpServer(addr string) {
return
}

fmt.Printf("packet-written: bytes=%d to=%s\n", n, caddr.String())
t.Logf("packet-written: bytes=%d to=%s\n", n, caddr.String())

}
}()
Expand Down Expand Up @@ -91,8 +104,8 @@ func TestUDPProbe(t *testing.T) {
}{addr: "127.0.0.1:38889", testData: "toerror", timeout: 1}, want: probe.Failure, want1: "io read timout", wantErr: false},
}
// start two udp server
go udpServer("127.0.0.1:38888")
go udpServer("127.0.0.1:38889")
go udpServer(t, "127.0.0.1:38888")
go udpServer(t, "127.0.0.1:38889")

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
Loading