Skip to content

Commit

Permalink
feat(main): add udp support (#40)
Browse files Browse the repository at this point in the history
* feat(main): add udp support
  • Loading branch information
cuisongliu authored Jan 18, 2022
1 parent 8e15e1a commit 7ba1b60
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 10 deletions.
9 changes: 9 additions & 0 deletions api/network/v1beta1/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions api/network/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 11 additions & 3 deletions config/crd/sealyun.com_clusterendpoints.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,8 @@ spec:
format: int32
type: integer
tcpSocket:
description: 'TCPSocket specifies an action involving a TCP
port. TCP hooks not yet supported TODO: implement a realistic
TCP lifecycle hook'
description: TCPSocket specifies an action involving a TCP port.
TCP hooks not yet supported
properties:
enable:
type: boolean
Expand All @@ -157,6 +156,15 @@ spec:
https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes'
format: int32
type: integer
udpSocket:
description: UDPSocket specifies an action involving a UDP port.
UDP hooks not yet supported
properties:
enable:
type: boolean
required:
- enable
type: object
required:
- port
- targetPort
Expand Down
21 changes: 15 additions & 6 deletions controllers/run_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
execprobe "github.com/sealyun/endpoints-operator/library/probe/exec"
httpprobe "github.com/sealyun/endpoints-operator/library/probe/http"
tcpprobe "github.com/sealyun/endpoints-operator/library/probe/tcp"
udpprobe "github.com/sealyun/endpoints-operator/library/probe/udp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
urutime "k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -38,17 +39,18 @@ import (

type work struct {
p *v1.Probe
network string
resultRun int
lastResult probe.Result
err error
}

func (pb *prober) runProbeWithRetries(p *v1.Probe, retries int) (probe.Result, string, error) {
func (pb *prober) runProbeWithRetries(p *v1.Probe, network string, retries int) (probe.Result, string, error) {
var err error
var result probe.Result
var output string
for i := 0; i < retries; i++ {
result, output, err = pb.runProbe(p)
result, output, err = pb.runProbe(p, network)
if err == nil {
return result, output, nil
}
Expand All @@ -62,7 +64,7 @@ func (w *work) doProbe() (keepGoing bool) {

// the full container environment here, OR we must make a call to the CRI in order to get those environment
// values from the running container.
result, output, err := proberCheck.runProbeWithRetries(w.p, 3)
result, output, err := proberCheck.runProbeWithRetries(w.p, w.network, 3)
if err != nil {
w.err = err
return false
Expand Down Expand Up @@ -94,6 +96,7 @@ type prober struct {
exec execprobe.Prober
http httpprobe.Prober
tcp tcpprobe.Prober
udp udpprobe.Prober
}

var proberCheck = newProber()
Expand All @@ -107,10 +110,11 @@ func newProber() *prober {
exec: execprobe.New(),
http: httpprobe.New(followNonLocalRedirects),
tcp: tcpprobe.New(),
udp: udpprobe.New(),
}
}

func (pb *prober) runProbe(p *v1.Probe) (probe.Result, string, error) {
func (pb *prober) runProbe(p *v1.Probe, network string) (probe.Result, string, error) {
timeout := time.Duration(p.TimeoutSeconds) * time.Second
if p.Exec != nil {
klog.V(4).Infof("Exec-Probe Command: %v", p.Exec.Command)
Expand All @@ -137,8 +141,13 @@ func (pb *prober) runProbe(p *v1.Probe) (probe.Result, string, error) {
return probe.Unknown, "", err
}
host := p.TCPSocket.Host
klog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
return pb.tcp.Probe(host, port, timeout)
if network == "tcp" {
klog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
return pb.tcp.Probe(host, port, timeout)
} else {
klog.V(4).Infof("UDP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
return pb.udp.Probe(host, port, timeout)
}
}
klog.Warning("failed to find probe builder")
return probe.Warning, "", nil
Expand Down
17 changes: 16 additions & 1 deletion controllers/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,28 @@ func healthyCheck(host string, cep *v1beta1.ClusterEndpoint) ([]v1beta1.ServiceP
HTTPHeaders: port.HTTPGet.HTTPHeaders,
}
}
if port.UDPSocket != nil && port.UDPSocket.Enable {
pro.TCPSocket = &corev1.TCPSocketAction{
Port: intstr.FromInt(int(port.TargetPort)),
Host: host,
}
}
network := ""
if port.TCPSocket != nil && port.TCPSocket.Enable {
network = "tcp"
pro.TCPSocket = &corev1.TCPSocketAction{
Port: intstr.FromInt(int(port.TargetPort)),
Host: host,
}
}
if port.UDPSocket != nil && port.UDPSocket.Enable {
network = "udp"
pro.TCPSocket = &corev1.TCPSocketAction{
Port: intstr.FromInt(int(port.TargetPort)),
Host: host,
}
}
w := &work{p: pro}
w := &work{p: pro, network: network}
for w.doProbe() {
}
mx.Lock()
Expand Down
59 changes: 59 additions & 0 deletions library/probe/udp/udp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package udp

import (
"github.com/sealyun/endpoints-operator/library/probe"
"k8s.io/klog"
"net"
"strconv"
"time"
)

// New creates Prober.
func New() Prober {
return udpProber{}
}

// Prober is an interface that defines the Probe function for doing UDP readiness/liveness checks.
type Prober interface {
Probe(host string, port int, timeout time.Duration) (probe.Result, string, error)
}

type udpProber struct{}

// Probe returns a ProbeRunner capable of running an UDP check.
func (pr udpProber) Probe(host string, port int, timeout time.Duration) (probe.Result, string, error) {
return DoUDPProbe(net.JoinHostPort(host, strconv.Itoa(port)), timeout)
}

// DoUDPProbe checks that a UDP socket to the address can be opened.
// If the socket can be opened, it returns Success
// If the socket fails to open, it returns Failure.
// This is exported because some other packages may want to do direct TCP probes.
func DoUDPProbe(addr string, timeout time.Duration) (probe.Result, string, error) {
conn, err := net.DialTimeout("udp", addr, timeout)
if err != nil {
// Convert errors to failures to handle timeouts.
return probe.Failure, err.Error(), nil
}
err = conn.Close()
if err != nil {
klog.Errorf("Unexpected error closing UDP probe socket: %v (%#v)", err, err)
}
return probe.Success, "", nil
}

0 comments on commit 7ba1b60

Please sign in to comment.