Skip to content

Commit

Permalink
use hashmap to refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Mar 8, 2022
1 parent 36d1c03 commit 04638c2
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 52 deletions.
85 changes: 40 additions & 45 deletions client/utils/servicepusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,18 @@ import (
"errors"
"os"
"strings"
"sync"
"time"

"github.com/projecteru2/core/log"

"github.com/cornelk/hashmap"
"github.com/go-ping/ping"
"github.com/projecteru2/core/log"
)

// EndpointPusher pushes endpoints to registered channels if the ep is L3 reachable
type EndpointPusher struct {
chans []chan []string
pendingEndpoints sync.Map
availableEndpoints sync.Map
pendingEndpoints hashmap.HashMap
availableEndpoints hashmap.HashMap
}

// NewEndpointPusher .
Expand All @@ -37,54 +36,52 @@ func (p *EndpointPusher) Push(endpoints []string) {
}

func (p *EndpointPusher) delOutdated(endpoints []string) {
newEps := make(map[string]struct{})
for _, e := range endpoints {
newEps[e] = struct{}{}
newEndpoints := make(map[string]struct{}) // TODO after go 1.18, use slice package to search endpoints
for _, endpoint := range endpoints {
newEndpoints[endpoint] = struct{}{}
}

p.pendingEndpoints.Range(func(key, value interface{}) bool {
ep, ok := key.(string)
for kv := range p.pendingEndpoints.Iter() {
endpoint, ok := kv.Key.(string)
if !ok {
log.Error("[EruResolver] failed to cast key while ranging pendingEndpoints")
return true
continue
}
cancel, ok := value.(context.CancelFunc)
cancel, ok := kv.Value.(context.CancelFunc)
if !ok {
log.Error("[EruResolver] failed to cast value while ranging pendingEndpoints")
}
if _, ok := newEps[ep]; !ok {
if _, ok := newEndpoints[endpoint]; !ok {
cancel()
p.pendingEndpoints.Delete(ep)
log.Debugf(nil, "[EruResolver] pending endpoint deleted: %s", ep) //nolint
p.pendingEndpoints.Del(endpoint)
log.Debugf(nil, "[EruResolver] pending endpoint deleted: %s", endpoint) //nolint
}
return true
})
}

p.availableEndpoints.Range(func(key, _ interface{}) bool {
ep, ok := key.(string)
for kv := range p.availableEndpoints.Iter() {
endpoint, ok := kv.Key.(string)
if !ok {
log.Error("[EruResolver] failed to cast key while ranging availableEndpoints")
return true
continue
}
if _, ok := newEps[ep]; !ok {
p.availableEndpoints.Delete(ep)
log.Debugf(nil, "[EruResolver] available endpoint deleted: %s", ep) //nolint
if _, ok := newEndpoints[endpoint]; !ok {
p.availableEndpoints.Del(endpoint)
log.Debugf(nil, "[EruResolver] available endpoint deleted: %s", endpoint) //nolint
}
return true
})
}
}

func (p *EndpointPusher) addCheck(endpoints []string) {
for _, endpoint := range endpoints {
if _, ok := p.pendingEndpoints.Load(endpoint); ok {
if _, ok := p.pendingEndpoints.GetStringKey(endpoint); ok {
continue
}
if _, ok := p.availableEndpoints.Load(endpoint); ok {
if _, ok := p.availableEndpoints.GetStringKey(endpoint); ok {
continue
}

ctx, cancel := context.WithCancel(context.TODO())
p.pendingEndpoints.Store(endpoint, cancel)
p.pendingEndpoints.Set(endpoint, cancel)
go p.pollReachability(ctx, endpoint)
log.Debugf(ctx, "[EruResolver] pending endpoint added: %s", endpoint)
}
Expand All @@ -97,24 +94,23 @@ func (p *EndpointPusher) pollReachability(ctx context.Context, endpoint string)
return
}

ticker := time.NewTicker(time.Second) // TODO config from outside?
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Debugf(ctx, "[EruResolver] reachability goroutine ends: %s", endpoint)
return
default:
}

time.Sleep(time.Second)
if err := p.checkReachability(parts[0]); err != nil {
continue
case <-ticker.C:
if err := p.checkReachability(parts[0]); err != nil {
continue
}
p.pendingEndpoints.Del(endpoint)
p.availableEndpoints.Set(endpoint, struct{}{})
p.pushEndpoints()
log.Debugf(ctx, "[EruResolver] available endpoint added: %s", endpoint)
return
}

p.pendingEndpoints.Delete(endpoint)
p.availableEndpoints.Store(endpoint, struct{}{})
p.pushEndpoints()
log.Debugf(ctx, "[EruResolver] available endpoint added: %s", endpoint)
return
}
}

Expand All @@ -140,15 +136,14 @@ func (p *EndpointPusher) checkReachability(host string) (err error) {

func (p *EndpointPusher) pushEndpoints() {
endpoints := []string{}
p.availableEndpoints.Range(func(key, value interface{}) bool {
endpoint, ok := key.(string)
for kv := range p.availableEndpoints.Iter() {
endpoint, ok := kv.Key.(string)
if !ok {
log.Error("[EruResolver] failed to cast key while ranging availableEndpoints")
return true
continue
}
endpoints = append(endpoints, endpoint)
return true
})
}
for _, ch := range p.chans {
ch <- endpoints
}
Expand Down
14 changes: 7 additions & 7 deletions cluster/calcium/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/cornelk/hashmap"
"github.com/projecteru2/core/cluster"
enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/log"
Expand Down Expand Up @@ -158,7 +159,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateWorkloadMessage, opts *types.DeployOptions, plans []resourcetypes.ResourcePlans, deployMap map[string]int) (_ map[string][]int, err error) {
wg := sync.WaitGroup{}
wg.Add(len(deployMap))
syncRollbackMap := sync.Map{}
syncRollbackMap := hashmap.HashMap{}

seq := 0
rollbackMap := make(map[string][]int)
Expand All @@ -172,7 +173,7 @@ func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateWo
return func() {
defer wg.Done()
if indices, err := c.doDeployWorkloadsOnNode(ctx, ch, nodename, opts, deploy, plans, seq); err != nil {
syncRollbackMap.Store(nodename, indices)
syncRollbackMap.Set(nodename, indices)
}
}
}(nodename, deploy, seq))
Expand All @@ -181,12 +182,11 @@ func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateWo
}

wg.Wait()
syncRollbackMap.Range(func(key, value interface{}) bool {
nodename := key.(string)
indices := value.([]int)
for kv := range syncRollbackMap.Iter() {
nodename := kv.Key.(string)
indices := kv.Value.([]int)
rollbackMap[nodename] = indices
return true
})
}
log.Debugf(ctx, "[Calcium.doDeployWorkloads] rollbackMap: %+v", rollbackMap)
if len(rollbackMap) != 0 {
err = types.ErrRollbackMapIsNotEmpty
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/cenkalti/backoff/v4 v4.0.2
github.com/containerd/containerd v1.4.13 // indirect
github.com/containerd/continuity v0.0.0-20200710164510-efbc4488d8fe // indirect
github.com/cornelk/hashmap v1.0.2-0.20210201213917-c93d96ce6b8a
github.com/docker/distribution v2.8.0+incompatible
github.com/docker/docker v20.10.0+incompatible
github.com/docker/go-connections v0.4.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+
github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI=
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cornelk/hashmap v1.0.2-0.20210201213917-c93d96ce6b8a h1:qv8c3h9mYAfJP3xPuMlhm12taC8VI0Vq4a9mivqCZBc=
github.com/cornelk/hashmap v1.0.2-0.20210201213917-c93d96ce6b8a/go.mod h1:SI48x/mQnWtjsJuM7GgmODo4o5O8FhGJgClCRmtOtIQ=
github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
Expand All @@ -119,6 +121,8 @@ github.com/davecgh/go-spew v0.0.0-20161028175848-04cdfd42973b/go.mod h1:J7Y8YcW2
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dchest/siphash v1.1.0 h1:1Rs9eTUlZLPBEvV+2sTaM8O0NWn0ppbgqS7p11aWawI=
github.com/dchest/siphash v1.1.0/go.mod h1:q+IRvb2gOSrUnYoPqHiyHXS0FOBBOdl6tONBlVnOnt4=
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
Expand Down

0 comments on commit 04638c2

Please sign in to comment.