Skip to content

Commit

Permalink
diagnose subnet and kubectl ko perf refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
changluyi committed Jun 19, 2023
1 parent f5d7cba commit dcf885a
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 1 deletion.
17 changes: 17 additions & 0 deletions cmd/daemon/cniserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,23 @@ func CmdMain() {
}
}
}

if config.EnableVerboseConnCheck {
go func() {
connListenaddr := fmt.Sprintf("%s:%d", addr, config.TCPConnCheckPort)
if err := util.TCPConnectivityListen(connListenaddr); err != nil {
util.LogFatalAndExit(err, "failed to start TCP listen on addr %s ", addr)
}
}()

go func() {
connListenaddr := fmt.Sprintf("%s:%d", addr, config.UDPConnCheckPort)
if err := util.UDPConnectivityListen(connListenaddr); err != nil {
util.LogFatalAndExit(err, "failed to start UDP listen on addr %s ", addr)
}
}()
}

// conform to Gosec G114
// https://github.com/securego/gosec#available-rules
server := &http.Server{
Expand Down
16 changes: 16 additions & 0 deletions cmd/pinger/pinger.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,22 @@ func CmdMain() {
}
util.LogFatalAndExit(server.ListenAndServe(), "failed to listen and serve on %s", server.Addr)
}()

if config.EnableVerboseConnCheck {
go func() {
addr := fmt.Sprintf("0.0.0.0:%d", config.TCPConnCheckPort)
if err := util.TCPConnectivityListen(addr); err != nil {
util.LogFatalAndExit(err, "failed to start TCP listen on addr %s ", addr)
}
}()

go func() {
addr := fmt.Sprintf("0.0.0.0:%d", config.UDPConnCheckPort)
if err := util.UDPConnectivityListen(addr); err != nil {
util.LogFatalAndExit(err, "failed to start UDP listen on addr %s ", addr)
}
}()
}
}
e := pinger.NewExporter(config)
pinger.StartPinger(config, e)
Expand Down
63 changes: 63 additions & 0 deletions dist/images/kubectl-ko
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ REGISTRY="kubeovn"
OVN_NORTHD_POD=
PERF_TIMES=5
PERF_LABEL="PerfTest"
CONN_CHECK_LABEL="conn-check"
CONN_CHECK_SERVER="conn-check-server"

showHelp(){
echo "kubectl ko {subcommand} [option...]"
Expand Down Expand Up @@ -475,6 +477,67 @@ checkLeader(){
echo "ovn-$component leader check ok"
}

applyConnServerDaemonset(){
subnetName=$1

if [ $(kubectl get subnet $subnetName | wc -l) -eq 0 ]; then
echo "no subnet $subnetName exists !!"
exit 1
fi

imageID=$(kubectl get ds -n $KUBE_OVN_NS kube-ovn-pinger -o jsonpath={.spec.template.spec.containers[0].image})
tmpFileName="conn-server.yaml"
cat <<EOF > $tmpFileName
kind: DaemonSet
apiVersion: apps/v1
metadata:
name: $subnetName-$CONN_CHECK_SERVER
namespace: $KUBE_OVN_NS
spec:
selector:
matchLabels:
app: $CONN_CHECK_LABEL
template:
metadata:
annotations:
ovn.kubernetes.io/logical_switch: $subnetName
labels:
app: $CONN_CHECK_LABEL
spec:
serviceAccountName: ovn
containers:
- name: $subnetName-$CONN_CHECK_SERVER
imagePullPolicy: IfNotPresent
image: $imageID
command:
- /kube-ovn/kube-ovn-pinger
args:
- --enable-verbose-conn-check=true
env:
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
EOF
kubectl apply -f $tmpFileName
rm $tmpFileName

isfailed=true
for i in {0..59}
do
if kubectl wait pod --for=condition=Ready -l app=$CONN_CHECK_LABEL -n $KUBE_OVN_NS ; then
isfailed=false
break
fi
sleep 1; \
done

if $isfailed; then
echo "Error ds $subnetName-$CONN_CHECK_SERVER pod not ready"
return
fi
}

diagnose(){
kubectl get crd vpcs.kubeovn.io
kubectl get crd vpc-nat-gateways.kubeovn.io
Expand Down
10 changes: 10 additions & 0 deletions pkg/daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type Configuration struct {
ExternalGatewaySwitch string // provider network underlay vlan subnet
EnableMetrics bool
EnableArpDetectIPConflict bool
EnableVerboseConnCheck bool
TCPConnCheckPort int
UDPConnCheckPort int
}

// ParseFlags will parse cmd args then init kubeClient and configuration
Expand Down Expand Up @@ -90,6 +93,10 @@ func ParseFlags() *Configuration {
argExternalGatewaySwitch = pflag.String("external-gateway-switch", "external", "The name of the external gateway switch which is a ovs bridge to provide external network, default: external")
argEnableMetrics = pflag.Bool("enable-metrics", true, "Whether to support metrics query")
argEnableArpDetectIPConflict = pflag.Bool("enable-arp-detect-ip-conflict", true, "Whether to support arp detect ip conflict in vlan network")

argEnableVerboseConnCheck = pflag.Bool("enable-verbose-conn-check", false, "enable TCP/UDP connectivity check listen port")
argTCPConnectivityCheckPort = pflag.Int("tcp-conn-check-port", 8100, "TCP connectivity Check Port")
argUDPConnectivityCheckPort = pflag.Int("udp-conn-check-port", 8101, "UDP connectivity Check Port")
)

// mute info log for ipset lib
Expand Down Expand Up @@ -139,6 +146,9 @@ func ParseFlags() *Configuration {
ExternalGatewaySwitch: *argExternalGatewaySwitch,
EnableMetrics: *argEnableMetrics,
EnableArpDetectIPConflict: *argEnableArpDetectIPConflict,
EnableVerboseConnCheck: *argEnableVerboseConnCheck,
TCPConnCheckPort: *argTCPConnectivityCheckPort,
UDPConnCheckPort: *argUDPConnectivityCheckPort,
}
return config
}
Expand Down
14 changes: 13 additions & 1 deletion pkg/pinger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,19 @@ type Configuration struct {
ServiceVswitchdFilePidPath string
ServiceOvnControllerFileLogPath string
ServiceOvnControllerFilePidPath string
EnableVerboseConnCheck bool
TCPConnCheckPort int
UDPConnCheckPort int
}

func ParseFlags() (*Configuration, error) {
var (
argPort = pflag.Int("port", 8080, "metrics port")
argPort = pflag.Int("port", 8080, "metrics port")

argEnableVerboseConnCheck = pflag.Bool("enable-verbose-conn-check", false, "enable TCP/UDP connectivity check")
argTCPConnectivityCheckPort = pflag.Int("tcp-conn-check-port", 8100, "TCP connectivity Check Port")
argUDPConnectivityCheckPort = pflag.Int("udp-conn-check-port", 8101, "UDP connectivity Check Port")

argKubeConfigFile = pflag.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information. If not set use the inCluster token.")
argDaemonSetNameSpace = pflag.String("ds-namespace", "kube-system", "kube-ovn-pinger daemonset namespace")
argDaemonSetName = pflag.String("ds-name", "kube-ovn-pinger", "kube-ovn-pinger daemonset name")
Expand Down Expand Up @@ -119,6 +127,10 @@ func ParseFlags() (*Configuration, error) {
NetworkMode: *argNetworkMode,
EnableMetrics: *argEnableMetrics,

EnableVerboseConnCheck: *argEnableVerboseConnCheck,
TCPConnCheckPort: *argTCPConnectivityCheckPort,
UDPConnCheckPort: *argUDPConnectivityCheckPort,

// OVS Monitor
PollTimeout: *argPollTimeout,
PollInterval: *argPollInterval,
Expand Down
31 changes: 31 additions & 0 deletions pkg/pinger/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,21 @@ func pingNodes(config *Configuration) error {
for _, addr := range no.Status.Addresses {
if addr.Type == v1.NodeInternalIP && util.ContainsString(config.PodProtocols, util.CheckProtocol(addr.Address)) {
func(nodeIP, nodeName string) {
if config.EnableVerboseConnCheck {
if err := util.TCPConnectivityCheck(fmt.Sprintf("%s:%d", nodeIP, config.TCPConnCheckPort)); err != nil {
klog.Infof("TCP connnectivity to node %s %s failed", nodeName, nodeIP)
pingErr = err
} else {
klog.Infof("TCP connnectivity to node %s %s success", nodeName, nodeIP)
}
if err := util.UDPConnectivityCheck(fmt.Sprintf("%s:%d", nodeIP, config.UDPConnCheckPort)); err != nil {
klog.Infof("UDP connnectivity to node %s %s failed", nodeName, nodeIP)
pingErr = err
} else {
klog.Infof("UDP connnectivity to node %s %s success", nodeName, nodeIP)
}
}

pinger, err := goping.NewPinger(nodeIP)
if err != nil {
klog.Errorf("failed to init pinger, %v", err)
Expand Down Expand Up @@ -143,6 +158,22 @@ func pingPods(config *Configuration) error {
for _, podIP := range pod.Status.PodIPs {
if util.ContainsString(config.PodProtocols, util.CheckProtocol(podIP.IP)) {
func(podIp, podName, nodeIP, nodeName string) {
if config.EnableVerboseConnCheck {
if err := util.TCPConnectivityCheck(fmt.Sprintf("%s:%d", podIp, config.TCPConnCheckPort)); err != nil {
klog.Infof("TCP connnectivity to pod %s %s failed", podName, podIp)
pingErr = err
} else {
klog.Infof("TCP connnectivity to pod %s %s success", podName, podIp)
}

if err := util.UDPConnectivityCheck(fmt.Sprintf("%s:%d", podIp, config.UDPConnCheckPort)); err != nil {
klog.Infof("UDP connnectivity to pod %s %s failed", podName, podIp)
pingErr = err
} else {
klog.Infof("UDP connnectivity to pod %s %s success", podName, podIp)
}
}

pinger, err := goping.NewPinger(podIp)
if err != nil {
klog.Errorf("failed to init pinger, %v", err)
Expand Down
85 changes: 85 additions & 0 deletions pkg/util/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net"
"strconv"
"strings"
"time"

"k8s.io/klog/v2"

Expand Down Expand Up @@ -499,3 +500,87 @@ func CheckSystemCIDR(cidrs []string) error {
}
return nil
}

func TCPConnectivityCheck(address string) error {
conn, err := net.DialTimeout("tcp", address, 3*time.Second)
if err != nil {
return err
}

_ = conn.Close()

return nil
}

func TCPConnectivityListen(address string) error {
listener, err := net.Listen("tcp", address)
if err != nil {
return fmt.Errorf("listen failed with err %v", err)
}

for {
conn, err := listener.Accept()
if err != nil {
continue
}
_ = conn.Close()
}
}

func UDPConnectivityCheck(address string) error {

udpAddr, err := net.ResolveUDPAddr("udp", address)
if err != nil {
return fmt.Errorf("resolve udp addr failed with err %v", err)
}

conn, err := net.DialUDP("udp", nil, udpAddr)
if err != nil {
return err
}

defer conn.Close()

if err := conn.SetReadDeadline(time.Now().Add(3 * time.Second)); err != nil {
return err
}

_, err = conn.Write([]byte("health check"))
if err != nil {
return fmt.Errorf("send udp packet failed with err %v", err)
}

buffer := make([]byte, 1024)
_, err = conn.Read(buffer)
if err != nil {
return fmt.Errorf("read udp packet from remote failed %v", err)
}

return nil
}

func UDPConnectivityListen(address string) error {
listenAddr, err := net.ResolveUDPAddr("udp", address)
if err != nil {
return fmt.Errorf("resolve udp addr failed with err %v", err)
}

conn, err := net.ListenUDP("udp", listenAddr)
if err != nil {
return fmt.Errorf("listen udp address failed with %v", err)
}

buffer := make([]byte, 1024)

for {
_, clientAddr, err := conn.ReadFromUDP(buffer)
if err != nil {
continue
}

_, err = conn.WriteToUDP([]byte("health check"), clientAddr)
if err != nil {
continue
}
}
}

0 comments on commit dcf885a

Please sign in to comment.