diff --git a/client/utils/servicepusher.go b/client/utils/servicepusher.go index e78de05ad..5bcbc67f8 100644 --- a/client/utils/servicepusher.go +++ b/client/utils/servicepusher.go @@ -5,19 +5,18 @@ import ( "errors" "os" "strings" - "sync" "time" - "github.com/projecteru2/core/log" - + "github.com/cornelk/hashmap" "github.com/go-ping/ping" + "github.com/projecteru2/core/log" ) // EndpointPusher pushes endpoints to registered channels if the ep is L3 reachable type EndpointPusher struct { chans []chan []string - pendingEndpoints sync.Map - availableEndpoints sync.Map + pendingEndpoints hashmap.HashMap + availableEndpoints hashmap.HashMap } // NewEndpointPusher . @@ -37,54 +36,52 @@ func (p *EndpointPusher) Push(endpoints []string) { } func (p *EndpointPusher) delOutdated(endpoints []string) { - newEps := make(map[string]struct{}) - for _, e := range endpoints { - newEps[e] = struct{}{} + newEndpoints := make(map[string]struct{}) // TODO after go 1.18, use slice package to search endpoints + for _, endpoint := range endpoints { + newEndpoints[endpoint] = struct{}{} } - p.pendingEndpoints.Range(func(key, value interface{}) bool { - ep, ok := key.(string) + for kv := range p.pendingEndpoints.Iter() { + endpoint, ok := kv.Key.(string) if !ok { log.Error("[EruResolver] failed to cast key while ranging pendingEndpoints") - return true + continue } - cancel, ok := value.(context.CancelFunc) + cancel, ok := kv.Value.(context.CancelFunc) if !ok { log.Error("[EruResolver] failed to cast value while ranging pendingEndpoints") } - if _, ok := newEps[ep]; !ok { + if _, ok := newEndpoints[endpoint]; !ok { cancel() - p.pendingEndpoints.Delete(ep) - log.Debugf(nil, "[EruResolver] pending endpoint deleted: %s", ep) //nolint + p.pendingEndpoints.Del(endpoint) + log.Debugf(nil, "[EruResolver] pending endpoint deleted: %s", endpoint) //nolint } - return true - }) + } - p.availableEndpoints.Range(func(key, _ interface{}) bool { - ep, ok := key.(string) + for kv := range p.availableEndpoints.Iter() { + endpoint, ok := kv.Key.(string) if !ok { log.Error("[EruResolver] failed to cast key while ranging availableEndpoints") - return true + continue } - if _, ok := newEps[ep]; !ok { - p.availableEndpoints.Delete(ep) - log.Debugf(nil, "[EruResolver] available endpoint deleted: %s", ep) //nolint + if _, ok := newEndpoints[endpoint]; !ok { + p.availableEndpoints.Del(endpoint) + log.Debugf(nil, "[EruResolver] available endpoint deleted: %s", endpoint) //nolint } - return true - }) + } } func (p *EndpointPusher) addCheck(endpoints []string) { for _, endpoint := range endpoints { - if _, ok := p.pendingEndpoints.Load(endpoint); ok { + if _, ok := p.pendingEndpoints.GetStringKey(endpoint); ok { continue } - if _, ok := p.availableEndpoints.Load(endpoint); ok { + if _, ok := p.availableEndpoints.GetStringKey(endpoint); ok { continue } ctx, cancel := context.WithCancel(context.TODO()) - p.pendingEndpoints.Store(endpoint, cancel) + p.pendingEndpoints.Set(endpoint, cancel) go p.pollReachability(ctx, endpoint) log.Debugf(ctx, "[EruResolver] pending endpoint added: %s", endpoint) } @@ -97,24 +94,23 @@ func (p *EndpointPusher) pollReachability(ctx context.Context, endpoint string) return } + ticker := time.NewTicker(time.Second) // TODO config from outside? + defer ticker.Stop() for { select { case <-ctx.Done(): log.Debugf(ctx, "[EruResolver] reachability goroutine ends: %s", endpoint) return - default: - } - - time.Sleep(time.Second) - if err := p.checkReachability(parts[0]); err != nil { - continue + case <-ticker.C: + if err := p.checkReachability(parts[0]); err != nil { + continue + } + p.pendingEndpoints.Del(endpoint) + p.availableEndpoints.Set(endpoint, struct{}{}) + p.pushEndpoints() + log.Debugf(ctx, "[EruResolver] available endpoint added: %s", endpoint) + return } - - p.pendingEndpoints.Delete(endpoint) - p.availableEndpoints.Store(endpoint, struct{}{}) - p.pushEndpoints() - log.Debugf(ctx, "[EruResolver] available endpoint added: %s", endpoint) - return } } @@ -140,15 +136,14 @@ func (p *EndpointPusher) checkReachability(host string) (err error) { func (p *EndpointPusher) pushEndpoints() { endpoints := []string{} - p.availableEndpoints.Range(func(key, value interface{}) bool { - endpoint, ok := key.(string) + for kv := range p.availableEndpoints.Iter() { + endpoint, ok := kv.Key.(string) if !ok { log.Error("[EruResolver] failed to cast key while ranging availableEndpoints") - return true + continue } endpoints = append(endpoints, endpoint) - return true - }) + } for _, ch := range p.chans { ch <- endpoints } diff --git a/cluster/calcium/create.go b/cluster/calcium/create.go index 66c1afa4a..71a78d703 100644 --- a/cluster/calcium/create.go +++ b/cluster/calcium/create.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/cornelk/hashmap" "github.com/projecteru2/core/cluster" enginetypes "github.com/projecteru2/core/engine/types" "github.com/projecteru2/core/log" @@ -158,7 +159,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateWorkloadMessage, opts *types.DeployOptions, plans []resourcetypes.ResourcePlans, deployMap map[string]int) (_ map[string][]int, err error) { wg := sync.WaitGroup{} wg.Add(len(deployMap)) - syncRollbackMap := sync.Map{} + syncRollbackMap := hashmap.HashMap{} seq := 0 rollbackMap := make(map[string][]int) @@ -172,7 +173,7 @@ func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateWo return func() { defer wg.Done() if indices, err := c.doDeployWorkloadsOnNode(ctx, ch, nodename, opts, deploy, plans, seq); err != nil { - syncRollbackMap.Store(nodename, indices) + syncRollbackMap.Set(nodename, indices) } } }(nodename, deploy, seq)) @@ -181,12 +182,11 @@ func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateWo } wg.Wait() - syncRollbackMap.Range(func(key, value interface{}) bool { - nodename := key.(string) - indices := value.([]int) + for kv := range syncRollbackMap.Iter() { + nodename := kv.Key.(string) + indices := kv.Value.([]int) rollbackMap[nodename] = indices - return true - }) + } log.Debugf(ctx, "[Calcium.doDeployWorkloads] rollbackMap: %+v", rollbackMap) if len(rollbackMap) != 0 { err = types.ErrRollbackMapIsNotEmpty diff --git a/go.mod b/go.mod index 5ab2affc5..89dd14002 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/cenkalti/backoff/v4 v4.0.2 github.com/containerd/containerd v1.4.13 // indirect github.com/containerd/continuity v0.0.0-20200710164510-efbc4488d8fe // indirect + github.com/cornelk/hashmap v1.0.2-0.20210201213917-c93d96ce6b8a github.com/docker/distribution v2.8.0+incompatible github.com/docker/docker v20.10.0+incompatible github.com/docker/go-connections v0.4.0 diff --git a/go.sum b/go.sum index 7a1a3dfbc..d39f3d331 100644 --- a/go.sum +++ b/go.sum @@ -107,6 +107,8 @@ github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+ github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= +github.com/cornelk/hashmap v1.0.2-0.20210201213917-c93d96ce6b8a h1:qv8c3h9mYAfJP3xPuMlhm12taC8VI0Vq4a9mivqCZBc= +github.com/cornelk/hashmap v1.0.2-0.20210201213917-c93d96ce6b8a/go.mod h1:SI48x/mQnWtjsJuM7GgmODo4o5O8FhGJgClCRmtOtIQ= github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= @@ -119,6 +121,8 @@ github.com/davecgh/go-spew v0.0.0-20161028175848-04cdfd42973b/go.mod h1:J7Y8YcW2 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dchest/siphash v1.1.0 h1:1Rs9eTUlZLPBEvV+2sTaM8O0NWn0ppbgqS7p11aWawI= +github.com/dchest/siphash v1.1.0/go.mod h1:q+IRvb2gOSrUnYoPqHiyHXS0FOBBOdl6tONBlVnOnt4= github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=