Skip to content

Commit

Permalink
refactor service discovery (#251)
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Sep 16, 2020
1 parent 30eb518 commit cbde8d3
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 83 deletions.
11 changes: 7 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ Eru
![](https://github.com/projecteru2/core/workflows/golangci-lint/badge.svg)
[![Codacy Badge](https://app.codacy.com/project/badge/Grade/69918e0a02ae45c5ae7dfc42bad5cfe5)](https://www.codacy.com/gh/projecteru2/core?utm_source=github.com&utm_medium=referral&utm_content=projecteru2/core&utm_campaign=Badge_Grade)

Eru is a stateless, flexible, production-ready cluster scheduler designed to easily integrate into existing workflows. Eru can run any containerized things in long or short time. This project is Eru Core. The Core use for resource allocation and manage containers lifetime.
Eru is a stateless, flexible, production-ready resource scheduler designed to easily integrate into existing systems.

Eru can use multiple engines to run anything for the long or short term.

This project is Eru Core. The Core use for resource allocation and manage resource's lifetime.

### Testing

Expand All @@ -23,11 +27,10 @@ You can use our [footstone](https://hub.docker.com/r/projecteru2/footstone/) ima

#### GRPC

Generate golang & python 3 code
Generate golang grpc definitions.

```shell
go get -u github.com/golang/protobuf/{proto,protoc-gen-go}
pip install -U grpcio-tools
make grpc
```

Expand Down Expand Up @@ -60,7 +63,7 @@ docker run -d \

### Build and Deploy by Eru itself

After we implemented bootstrap in eru2, now you can build and deploy eru with [cli](https://github.com/projecteru2/cli) tool.
After we implemented bootstrap in eru, now you can build and deploy eru with [cli](https://github.com/projecteru2/cli) tool.

1. Test source code and build image

Expand Down
9 changes: 7 additions & 2 deletions cluster/calcium/calcium.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"strings"

"github.com/projecteru2/core/cluster"
"github.com/projecteru2/core/discovery"
"github.com/projecteru2/core/discovery/helium"
"github.com/projecteru2/core/scheduler"
complexscheduler "github.com/projecteru2/core/scheduler/complex"
"github.com/projecteru2/core/source"
Expand All @@ -21,7 +23,7 @@ type Calcium struct {
store store.Store
scheduler scheduler.Scheduler
source source.Source
watcher *serviceWatcher
watcher discovery.Service
}

// New returns a new cluster config
Expand Down Expand Up @@ -50,7 +52,10 @@ func New(config types.Config, embeddedStorage bool) (*Calcium, error) {
log.Warn("[Calcium] SCM not set, build API disabled")
}

return &Calcium{store: store, config: config, scheduler: scheduler, source: scm, watcher: &serviceWatcher{}}, err
// set watcher
watcher := helium.New(config.GRPCConfig, store)

return &Calcium{store: store, config: config, scheduler: scheduler, source: scm, watcher: watcher}, err
}

// Finalizer use for defer
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -66,7 +65,8 @@ func (c *Calcium) ExecuteContainer(ctx context.Context, opts *types.ExecuteConta

exitData := []byte(exitDataPrefix + strconv.Itoa(execCode))
ch <- &types.AttachContainerMessage{ContainerID: opts.ContainerID, Data: exitData}
log.Infof("[ExecuteContainer] Execuate in container %s complete", utils.ShortID(opts.ContainerID))
log.Infof("[ExecuteContainer] Execuate in container %s complete", opts.ContainerID)
log.Infof("[ExecuteContainer] %v", opts.Commands)
}()

return ch
Expand Down
72 changes: 0 additions & 72 deletions cluster/calcium/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,86 +5,14 @@ import (
"sync"
"time"

"github.com/google/uuid"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
log "github.com/sirupsen/logrus"
)

type serviceWatcher struct {
once sync.Once
subs sync.Map
}

func (w *serviceWatcher) Start(ctx context.Context, s store.Store, pushInterval time.Duration) {
w.once.Do(func() {
w.start(ctx, s, pushInterval)
})
}

func (w *serviceWatcher) start(ctx context.Context, s store.Store, pushInterval time.Duration) {
ch, err := s.ServiceStatusStream(ctx)
if err != nil {
log.Errorf("[WatchServiceStatus] failed to start watch: %v", err)
return
}

go func() {
defer log.Error("[WatchServiceStatus] goroutine exited")
var (
latestStatus types.ServiceStatus
timer *time.Timer = time.NewTimer(pushInterval)
)
for {
select {
case addresses, ok := <-ch:
if !ok {
log.Error("[WatchServiceStatus] watch channel closed")
return
}

latestStatus = types.ServiceStatus{
Addresses: addresses,
Interval: pushInterval * 2,
}
w.dispatch(latestStatus)

case <-timer.C:
w.dispatch(latestStatus)
}
timer.Stop()
timer.Reset(pushInterval)
}
}()
}

func (w *serviceWatcher) dispatch(status types.ServiceStatus) {
w.subs.Range(func(k, v interface{}) bool {
c, ok := v.(chan<- types.ServiceStatus)
if !ok {
log.Error("[WatchServiceStatus] failed to cast channel from map")
return true
}
c <- status
return true
})
}

func (w *serviceWatcher) Subscribe(ch chan<- types.ServiceStatus) uuid.UUID {
id := uuid.New()
_, _ = w.subs.LoadOrStore(id, ch)
return id
}

func (w *serviceWatcher) Unsubscribe(id uuid.UUID) {
w.subs.Delete(id)
}

// WatchServiceStatus returns chan of available service address
func (c *Calcium) WatchServiceStatus(ctx context.Context) (<-chan types.ServiceStatus, error) {
ch := make(chan types.ServiceStatus)
c.watcher.Start(ctx, c.store, c.config.GRPCConfig.ServiceDiscoveryPushInterval)
id := c.watcher.Subscribe(ch)
go func() {
<-ctx.Done()
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/projecteru2/core/discovery/helium"
storemocks "github.com/projecteru2/core/store/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -59,8 +60,6 @@ func TestWatchServiceStatus(t *testing.T) {
c.config.GRPCConfig.ServiceDiscoveryPushInterval = 500 * time.Millisecond
store := &storemocks.Store{}
c.store = store
c.watcher = &serviceWatcher{}

store.On("ServiceStatusStream", mock.AnythingOfType("*context.emptyCtx")).Return(
func(_ context.Context) chan []string {
ch := make(chan []string)
Expand All @@ -78,6 +77,7 @@ func TestWatchServiceStatus(t *testing.T) {
return ch
}, nil,
)
c.watcher = helium.New(c.config.GRPCConfig, c.store)

ch, err := c.WatchServiceStatus(context.Background())
assert.NoError(t, err)
Expand Down
12 changes: 12 additions & 0 deletions discovery/discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package discovery

import (
"github.com/google/uuid"
"github.com/projecteru2/core/types"
)

// Service .
type Service interface {
Subscribe(ch chan<- types.ServiceStatus) uuid.UUID
Unsubscribe(id uuid.UUID)
}
92 changes: 92 additions & 0 deletions discovery/helium/helium.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package helium

import (
"context"
"sync"
"time"

"github.com/google/uuid"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
log "github.com/sirupsen/logrus"
)

// Helium .
type Helium struct {
sync.Once
config types.GRPCConfig
stor store.Store
subs sync.Map
}

// New .
func New(config types.GRPCConfig, stor store.Store) *Helium {
h := &Helium{}
h.config = config
h.stor = stor
h.Do(func() {
h.start(context.Background()) // rewrite ctx here, because this will run only once!
})
return h
}

// Subscribe .
func (h *Helium) Subscribe(ch chan<- types.ServiceStatus) uuid.UUID {
id := uuid.New()
_, _ = h.subs.LoadOrStore(id, ch)
return id
}

// Unsubscribe .
func (h *Helium) Unsubscribe(id uuid.UUID) {
h.subs.Delete(id)
}

func (h *Helium) start(ctx context.Context) {
ch, err := h.stor.ServiceStatusStream(ctx)
if err != nil {
log.Errorf("[WatchServiceStatus] failed to start watch: %v", err)
return
}

go func() {
log.Info("[WatchServiceStatus] service discovery start")
defer log.Error("[WatchServiceStatus] service discovery exited")
var (
latestStatus types.ServiceStatus
timer *time.Timer = time.NewTimer(h.config.ServiceDiscoveryPushInterval)
)
for {
select {
case addresses, ok := <-ch:
if !ok {
log.Error("[WatchServiceStatus] watch channel closed")
return
}

latestStatus = types.ServiceStatus{
Addresses: addresses,
Interval: h.config.ServiceDiscoveryPushInterval * 2,
}
h.dispatch(latestStatus)

case <-timer.C:
h.dispatch(latestStatus)
}
timer.Stop()
timer.Reset(h.config.ServiceDiscoveryPushInterval)
}
}()
}

func (h *Helium) dispatch(status types.ServiceStatus) {
h.subs.Range(func(k, v interface{}) bool {
c, ok := v.(chan<- types.ServiceStatus)
if !ok {
log.Error("[WatchServiceStatus] failed to cast channel from map")
return true
}
c <- status
return true
})
}
1 change: 0 additions & 1 deletion store/etcdv3/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func (m *Mercury) ServiceStatusStream(ctx context.Context) (chan []string, error
ch := make(chan []string)
go func() {
defer close(ch)
log.Info("[ServiceStatusStream] start watching service status")
resp, err := m.Get(ctx, fmt.Sprintf(serviceStatusKey, ""), clientv3.WithPrefix())
if err != nil {
log.Errorf("[ServiceStatusStream] failed to get current services: %v", err)
Expand Down

0 comments on commit cbde8d3

Please sign in to comment.