From f9e75f62bc3d0aebf740e95f5c7775f3244b6546 Mon Sep 17 00:00:00 2001 From: yanjianbo Date: Tue, 1 Aug 2023 14:38:12 +0800 Subject: [PATCH] Let cloud-agent rejoin connectors Sometimes cloud-agent migth lose connection to connectors, in this suitation, cloud-agent need to rejoin connectors, otherwise they will no longer receive any data. Signed-off-by: yanjianbo --- pkg/cloud-agent/cloud_agent.go | 30 +++++++++++++++++++++++------- pkg/util/memberlist/memberlist.go | 12 ++++++++++-- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/pkg/cloud-agent/cloud_agent.go b/pkg/cloud-agent/cloud_agent.go index 9209e9a..1f75f63 100644 --- a/pkg/cloud-agent/cloud_agent.go +++ b/pkg/cloud-agent/cloud_agent.go @@ -57,6 +57,7 @@ func Execute() { defer klog.Flush() var initMembers []string + flag.StringSliceVar(&initMembers, "connector-node-addresses", []string{}, "internal ip address of all connector nodes") logutil.AddFlags(flag.CommandLine) about.AddFlags(flag.CommandLine) @@ -87,16 +88,22 @@ func Execute() { time.Sleep(5 * time.Second) } + // sometimes cloud-agent may lose connection to connectors, especially when there + // is only one connector, cloud-agent have to rejoin to those connectors otherwise + // it will never receive updates from connector for { - if len(mc.ListMembers()) < 2 { - logger.Error(errAtLeaseOneConnector, "lost connection to connectors") - } else { - for _, member := range mc.ListMembers() { - logger.V(5).Info("Got Member", "name", member.Name, "addr", member.Addr) - } + time.Sleep(time.Minute) + + if !agent.isConnectorLost() { + continue } - time.Sleep(time.Minute * 5) + logger.V(5).Info("Connectors are lost, try to rejoin them") + if err := mc.RejoinInitMembers(); err != nil { + logger.Error(err, "failed to rejoin to connector") + } else { + logger.V(3).Info("cloud-agent has rejoin with connectors") + } } } @@ -234,6 +241,15 @@ func (a *cloudAgent) handleNodeLeave(name string) { go a.deleteRoutesByHost(name) } +// if there is data in routesByHost, this cloud-agent must have lost +// connection to connector +func (a *cloudAgent) isConnectorLost() bool { + a.routesLock.RLock() + defer a.routesLock.RUnlock() + + return len(a.routesByHost) == 0 +} + func getRouteTmpl(prefix string) (netlink.Route, error) { ip, _, err := net.ParseCIDR(prefix) if err != nil { diff --git a/pkg/util/memberlist/memberlist.go b/pkg/util/memberlist/memberlist.go index 9c814e0..37b40cb 100644 --- a/pkg/util/memberlist/memberlist.go +++ b/pkg/util/memberlist/memberlist.go @@ -74,6 +74,8 @@ func (ed *eventDelegate) NotifyUpdate(node *memberlist.Node) { type Client struct { list *memberlist.Memberlist delegate *delegate + // initMembers are used to rejoin them + initMembers []string } func getAdvertiseAddr() (string, error) { @@ -125,8 +127,9 @@ func New(initMembers []string, msgHandler msgHandlerFun, leaveHandler notifyLeav } return &Client{ - list: list, - delegate: dg, + list: list, + delegate: dg, + initMembers: initMembers, }, nil } @@ -143,3 +146,8 @@ func (c *Client) Broadcast(b []byte) { msg: b, }) } + +func (c *Client) RejoinInitMembers() error { + _, err := c.list.Join(c.initMembers) + return err +}