Skip to content

Commit

Permalink
Merge pull request #403 from yanjianbo1983/improve-cloud-agent
Browse files Browse the repository at this point in the history
Let cloud-agent rejoin connectors
  • Loading branch information
yanjianbo1983 authored Aug 1, 2023
2 parents 8419908 + f9e75f6 commit a6d2bf8
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 9 deletions.
30 changes: 23 additions & 7 deletions pkg/cloud-agent/cloud_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func Execute() {
defer klog.Flush()

var initMembers []string

flag.StringSliceVar(&initMembers, "connector-node-addresses", []string{}, "internal ip address of all connector nodes")
logutil.AddFlags(flag.CommandLine)
about.AddFlags(flag.CommandLine)
Expand Down Expand Up @@ -87,16 +88,22 @@ func Execute() {
time.Sleep(5 * time.Second)
}

// Sometimes cloud-agent may lose its connection to the connectors, especially
// when there is only one connector. In that case cloud-agent has to rejoin the
// connectors; otherwise it will never receive updates from them.
for {
if len(mc.ListMembers()) < 2 {
logger.Error(errAtLeaseOneConnector, "lost connection to connectors")
} else {
for _, member := range mc.ListMembers() {
logger.V(5).Info("Got Member", "name", member.Name, "addr", member.Addr)
}
time.Sleep(time.Minute)

if !agent.isConnectorLost() {
continue
}

time.Sleep(time.Minute * 5)
logger.V(5).Info("Connectors are lost, try to rejoin them")
if err := mc.RejoinInitMembers(); err != nil {
logger.Error(err, "failed to rejoin to connector")
} else {
logger.V(3).Info("cloud-agent has rejoin with connectors")
}
}
}

Expand Down Expand Up @@ -234,6 +241,15 @@ func (a *cloudAgent) handleNodeLeave(name string) {
go a.deleteRoutesByHost(name)
}

// isConnectorLost reports whether this cloud-agent should be considered
// disconnected from its connectors.
//
// The check is a heuristic: it returns true when routesByHost holds no
// entries at all, and false as soon as at least one entry is present.
// The map is read under the routes read lock.
func (a *cloudAgent) isConnectorLost() bool {
	a.routesLock.RLock()
	routeCount := len(a.routesByHost)
	a.routesLock.RUnlock()

	return routeCount == 0
}

func getRouteTmpl(prefix string) (netlink.Route, error) {
ip, _, err := net.ParseCIDR(prefix)
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions pkg/util/memberlist/memberlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func (ed *eventDelegate) NotifyUpdate(node *memberlist.Node) {
// Client bundles a memberlist instance with its delegate and the list of
// initial member addresses it was created with.
type Client struct {
// list is the underlying memberlist instance used for joining peers.
list *memberlist.Memberlist
// delegate is the memberlist delegate associated with this client.
delegate *delegate
// initMembers are the addresses handed to New; they are kept so that
// RejoinInitMembers can join them again later.
initMembers []string
}

func getAdvertiseAddr() (string, error) {
Expand Down Expand Up @@ -125,8 +127,9 @@ func New(initMembers []string, msgHandler msgHandlerFun, leaveHandler notifyLeav
}

return &Client{
list: list,
delegate: dg,
list: list,
delegate: dg,
initMembers: initMembers,
}, nil
}

Expand All @@ -143,3 +146,8 @@ func (c *Client) Broadcast(b []byte) {
msg: b,
})
}

// RejoinInitMembers asks the underlying memberlist to join the initial
// members stored on the client again. The number of members contacted is
// discarded; only the error reported by Join, if any, is returned.
func (c *Client) RejoinInitMembers() error {
	if _, err := c.list.Join(c.initMembers); err != nil {
		return err
	}

	return nil
}

0 comments on commit a6d2bf8

Please sign in to comment.