Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use hashmap to refactor #558

Merged
merged 6 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 45 additions & 44 deletions client/utils/servicepusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@ import (
"sync"
"time"

"github.com/projecteru2/core/log"

"github.com/cornelk/hashmap"
"github.com/go-ping/ping"
"github.com/projecteru2/core/log"
)

// EndpointPusher pushes endpoints to registered channels if the ep is L3 reachable
type EndpointPusher struct {
sync.Mutex
chans []chan []string
pendingEndpoints sync.Map
availableEndpoints sync.Map
pendingEndpoints hashmap.HashMap
availableEndpoints hashmap.HashMap
}

// NewEndpointPusher .
Expand All @@ -37,54 +38,54 @@ func (p *EndpointPusher) Push(endpoints []string) {
}

func (p *EndpointPusher) delOutdated(endpoints []string) {
newEps := make(map[string]struct{})
for _, e := range endpoints {
newEps[e] = struct{}{}
p.Lock()
defer p.Unlock()
newEndpoints := make(map[string]struct{}) // TODO after go 1.18, use slice package to search endpoints
for _, endpoint := range endpoints {
newEndpoints[endpoint] = struct{}{}
}

p.pendingEndpoints.Range(func(key, value interface{}) bool {
ep, ok := key.(string)
for kv := range p.pendingEndpoints.Iter() {
endpoint, ok := kv.Key.(string)
if !ok {
log.Error("[EruResolver] failed to cast key while ranging pendingEndpoints")
return true
continue
}
cancel, ok := value.(context.CancelFunc)
cancel, ok := kv.Value.(context.CancelFunc)
if !ok {
log.Error("[EruResolver] failed to cast value while ranging pendingEndpoints")
}
if _, ok := newEps[ep]; !ok {
if _, ok := newEndpoints[endpoint]; !ok {
cancel()
p.pendingEndpoints.Delete(ep)
log.Debugf(nil, "[EruResolver] pending endpoint deleted: %s", ep) //nolint
p.pendingEndpoints.Del(endpoint)
log.Debugf(nil, "[EruResolver] pending endpoint deleted: %s", endpoint) //nolint
}
return true
})
}

p.availableEndpoints.Range(func(key, _ interface{}) bool {
ep, ok := key.(string)
for kv := range p.availableEndpoints.Iter() {
endpoint, ok := kv.Key.(string)
if !ok {
log.Error("[EruResolver] failed to cast key while ranging availableEndpoints")
return true
continue
}
if _, ok := newEps[ep]; !ok {
p.availableEndpoints.Delete(ep)
log.Debugf(nil, "[EruResolver] available endpoint deleted: %s", ep) //nolint
if _, ok := newEndpoints[endpoint]; !ok {
p.availableEndpoints.Del(endpoint)
log.Debugf(nil, "[EruResolver] available endpoint deleted: %s", endpoint) //nolint
}
return true
})
}
}

func (p *EndpointPusher) addCheck(endpoints []string) {
for _, endpoint := range endpoints {
if _, ok := p.pendingEndpoints.Load(endpoint); ok {
if _, ok := p.pendingEndpoints.GetStringKey(endpoint); ok {
continue
}
if _, ok := p.availableEndpoints.Load(endpoint); ok {
if _, ok := p.availableEndpoints.GetStringKey(endpoint); ok {
continue
}

ctx, cancel := context.WithCancel(context.TODO())
p.pendingEndpoints.Store(endpoint, cancel)
p.pendingEndpoints.Set(endpoint, cancel)
go p.pollReachability(ctx, endpoint)
log.Debugf(ctx, "[EruResolver] pending endpoint added: %s", endpoint)
}
Expand All @@ -97,24 +98,25 @@ func (p *EndpointPusher) pollReachability(ctx context.Context, endpoint string)
return
}

ticker := time.NewTicker(time.Second) // TODO config from outside?
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Debugf(ctx, "[EruResolver] reachability goroutine ends: %s", endpoint)
return
default:
}

time.Sleep(time.Second)
if err := p.checkReachability(parts[0]); err != nil {
continue
case <-ticker.C:
p.Lock()
defer p.Unlock()
if err := p.checkReachability(parts[0]); err != nil {
continue
}
p.pendingEndpoints.Del(endpoint)
p.availableEndpoints.Set(endpoint, struct{}{})
CMGS marked this conversation as resolved.
Show resolved Hide resolved
p.pushEndpoints()
log.Debugf(ctx, "[EruResolver] available endpoint added: %s", endpoint)
return
}

p.pendingEndpoints.Delete(endpoint)
p.availableEndpoints.Store(endpoint, struct{}{})
p.pushEndpoints()
log.Debugf(ctx, "[EruResolver] available endpoint added: %s", endpoint)
return
}
}

Expand All @@ -140,15 +142,14 @@ func (p *EndpointPusher) checkReachability(host string) (err error) {

func (p *EndpointPusher) pushEndpoints() {
endpoints := []string{}
p.availableEndpoints.Range(func(key, value interface{}) bool {
endpoint, ok := key.(string)
for kv := range p.availableEndpoints.Iter() {
endpoint, ok := kv.Key.(string)
if !ok {
log.Error("[EruResolver] failed to cast key while ranging availableEndpoints")
return true
continue
}
endpoints = append(endpoints, endpoint)
return true
})
}
for _, ch := range p.chans {
ch <- endpoints
}
Expand Down
3 changes: 3 additions & 0 deletions cluster/calcium/calcium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func NewTestCluster() *Calcium {
MaxShare: -1,
ShareBase: 100,
},
GRPCConfig: types.GRPCConfig{
ServiceDiscoveryPushInterval: 15 * time.Second,
},
WALFile: filepath.Join(walDir, "core.wal.log"),
MaxConcurrency: 10,
HAKeepaliveInterval: 16 * time.Second,
Expand Down
14 changes: 7 additions & 7 deletions cluster/calcium/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/cornelk/hashmap"
"github.com/projecteru2/core/cluster"
enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/log"
Expand Down Expand Up @@ -158,7 +159,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateWorkloadMessage, opts *types.DeployOptions, plans []resourcetypes.ResourcePlans, deployMap map[string]int) (_ map[string][]int, err error) {
wg := sync.WaitGroup{}
wg.Add(len(deployMap))
syncRollbackMap := sync.Map{}
syncRollbackMap := hashmap.HashMap{}

seq := 0
rollbackMap := make(map[string][]int)
Expand All @@ -172,7 +173,7 @@ func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateWo
return func() {
defer wg.Done()
if indices, err := c.doDeployWorkloadsOnNode(ctx, ch, nodename, opts, deploy, plans, seq); err != nil {
syncRollbackMap.Store(nodename, indices)
syncRollbackMap.Set(nodename, indices)
}
}
}(nodename, deploy, seq))
Expand All @@ -181,12 +182,11 @@ func (c *Calcium) doDeployWorkloads(ctx context.Context, ch chan *types.CreateWo
}

wg.Wait()
syncRollbackMap.Range(func(key, value interface{}) bool {
nodename := key.(string)
indices := value.([]int)
for kv := range syncRollbackMap.Iter() {
nodename := kv.Key.(string)
indices := kv.Value.([]int)
rollbackMap[nodename] = indices
return true
})
}
log.Debugf(ctx, "[Calcium.doDeployWorkloads] rollbackMap: %+v", rollbackMap)
if len(rollbackMap) != 0 {
err = types.ErrRollbackMapIsNotEmpty
Expand Down
1 change: 0 additions & 1 deletion cluster/calcium/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ func (c *Calcium) WatchServiceStatus(ctx context.Context) (<-chan types.ServiceS
utils.SentryGo(func() {
<-ctx.Done()
c.watcher.Unsubscribe(id)
close(ch)
})
return ch, nil
}
Expand Down
62 changes: 34 additions & 28 deletions discovery/helium/helium.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,30 @@ import (
"sync"
"time"

"github.com/cornelk/hashmap"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"

"github.com/google/uuid"
)

const interval = 15 * time.Second

// Helium .
type Helium struct {
sync.Once
lock *sync.RWMutex
config types.GRPCConfig
stor store.Store
subs sync.Map
stor store.Store
subs hashmap.HashMap
interval time.Duration
}

// New .
func New(config types.GRPCConfig, stor store.Store) *Helium {
h := &Helium{}
h.config = config
h.stor = stor
h.lock = &sync.RWMutex{}
h := &Helium{interval: config.ServiceDiscoveryPushInterval, stor: stor, subs: hashmap.HashMap{}}
if h.interval < time.Second {
h.interval = interval
}
h.Do(func() {
h.start(context.TODO()) // TODO rewrite ctx here, because this will run only once!
})
Expand All @@ -35,18 +37,24 @@ func New(config types.GRPCConfig, stor store.Store) *Helium {

// Subscribe .
func (h *Helium) Subscribe(ch chan<- types.ServiceStatus) uuid.UUID {
h.lock.Lock()
defer h.lock.Unlock()
id := uuid.New()
_, _ = h.subs.LoadOrStore(id, ch)
key := id.ID()
h.subs.Set(key, ch)
return id
}

// Unsubscribe .
func (h *Helium) Unsubscribe(id uuid.UUID) {
h.lock.Lock()
defer h.lock.Unlock()
h.subs.Delete(id)
v, ok := h.subs.GetUintKey(uintptr(id.ID()))
if !ok {
return
}
ch, ok := v.(chan<- types.ServiceStatus)
if !ok {
return
}
close(ch)
h.subs.Del(id.ID())
}

func (h *Helium) start(ctx context.Context) {
CMGS marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -60,7 +68,8 @@ func (h *Helium) start(ctx context.Context) {
log.Info("[WatchServiceStatus] service discovery start")
defer log.Error("[WatchServiceStatus] service discovery exited")
var latestStatus types.ServiceStatus
timer := time.NewTimer(h.config.ServiceDiscoveryPushInterval)
ticker := time.NewTicker(h.interval)
defer ticker.Stop()
for {
select {
case addresses, ok := <-ch:
Expand All @@ -71,32 +80,29 @@ func (h *Helium) start(ctx context.Context) {

latestStatus = types.ServiceStatus{
Addresses: addresses,
Interval: h.config.ServiceDiscoveryPushInterval * 2,
Interval: h.interval * 2,
}
case <-timer.C:
case <-ticker.C:
}
h.dispatch(latestStatus)
timer.Stop()
timer.Reset(h.config.ServiceDiscoveryPushInterval)
}
}()
}

func (h *Helium) dispatch(status types.ServiceStatus) {
h.lock.RLock()
defer h.lock.RUnlock()
h.subs.Range(func(k, v interface{}) bool {
f := func(kv hashmap.KeyValue) {
defer func() {
if err := recover(); err != nil {
log.Errorf(context.TODO(), "[dispatch] dispatch %s failed, err: %v", k, err)
log.Errorf(context.TODO(), "[dispatch] dispatch %v failed, err: %v", kv.Key, err)
}
}()
c, ok := v.(chan<- types.ServiceStatus)
ch, ok := kv.Value.(chan<- types.ServiceStatus)
if !ok {
log.Error("[WatchServiceStatus] failed to cast channel from map")
return true
}
c <- status
return true
})
ch <- status
}
for kv := range h.subs.Iter() {
f(kv)
}
}
2 changes: 1 addition & 1 deletion discovery/helium/helium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestPanic(t *testing.T) {
uuid := service.Subscribe(chStatus)
time.Sleep(time.Second)
service.Unsubscribe(uuid)
close(chStatus)
//close(chStatus)
}()
}

Expand Down
Loading