Skip to content
This repository has been archived by the owner on Feb 1, 2021. It is now read-only.

Commit

Permalink
improve eventHandlers locking
Browse files Browse the repository at this point in the history
Signed-off-by: Victor Vieux <[email protected]>
  • Loading branch information
vieux committed Jan 12, 2016
1 parent 78008f4 commit a2018c1
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 44 deletions.
54 changes: 53 additions & 1 deletion cluster/event.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package cluster

import "github.com/samalba/dockerclient"
import (
"errors"
"sync"

log "github.com/Sirupsen/logrus"
"github.com/samalba/dockerclient"
)

// Event is exported
type Event struct {
Expand All @@ -12,3 +18,49 @@ type Event struct {
type EventHandler interface {
Handle(*Event) error
}

// EventHandlers is a map of EventHandler
type EventHandlers struct {
sync.RWMutex

eventHandlers map[EventHandler]struct{}
}

// NewEventHandlers returns a EventHandlers
func NewEventHandlers() *EventHandlers {
return &EventHandlers{
eventHandlers: make(map[EventHandler]struct{}),
}
}

// Handle callbacks for the events
func (eh *EventHandlers) Handle(e *Event) {
eh.RLock()
defer eh.RUnlock()

for h := range eh.eventHandlers {
if err := h.Handle(e); err != nil {
log.Error(err)
}
}
}

// RegisterEventHandler registers an event handler.
func (eh *EventHandlers) RegisterEventHandler(h EventHandler) error {
eh.Lock()
defer eh.Unlock()

if _, ok := eh.eventHandlers[h]; ok {
return errors.New("event handler already set")
}
eh.eventHandlers[h] = struct{}{}
return nil
}

// UnregisterEventHandler unregisters a previously registered event handler.
func (eh *EventHandlers) UnregisterEventHandler(h EventHandler) {
eh.Lock()
defer eh.Unlock()

delete(eh.eventHandlers, h)
}
26 changes: 5 additions & 21 deletions cluster/mesos/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Cluster struct {

driver *mesosscheduler.MesosSchedulerDriver
dockerEnginePort string
eventHandlers map[cluster.EventHandler]struct{}
eventHandlers *cluster.EventHandlers
master string
agents map[string]*agent
scheduler *scheduler.Scheduler
Expand Down Expand Up @@ -67,6 +67,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master st
}
cluster := &Cluster{
dockerEnginePort: defaultDockerEnginePort,
eventHandlers: cluster.NewEventHandlers(),
master: master,
agents: make(map[string]*agent),
scheduler: scheduler,
Expand Down Expand Up @@ -156,35 +157,18 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master st

// Handle callbacks for the events
func (c *Cluster) Handle(e *cluster.Event) error {
c.RLock()
defer c.RUnlock()

for h := range c.eventHandlers {
if err := h.Handle(e); err != nil {
log.Error(err)
}
}
c.eventHandlers.Handle(e)
return nil
}

// RegisterEventHandler registers an event handler.
func (c *Cluster) RegisterEventHandler(h cluster.EventHandler) error {
c.Lock()
defer c.Unlock()

if _, ok := c.eventHandlers[h]; ok {
return errors.New("event handler already set")
}
c.eventHandlers[h] = struct{}{}
return nil
return c.eventHandlers.RegisterEventHandler(h)
}

// UnregisterEventHandler unregisters a previously registered event handler.
func (c *Cluster) UnregisterEventHandler(h cluster.EventHandler) {
c.Lock()
defer c.Unlock()

delete(c.eventHandlers, h)
c.eventHandlers.UnregisterEventHandler(h)
}

// CreateContainer for container creation in Mesos task
Expand Down
27 changes: 5 additions & 22 deletions cluster/swarm/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (p *pendingContainer) ToContainer() *cluster.Container {
type Cluster struct {
sync.RWMutex

eventHandlers map[cluster.EventHandler]struct{}
eventHandlers *cluster.EventHandlers
engines map[string]*cluster.Engine
pendingEngines map[string]*cluster.Engine
scheduler *scheduler.Scheduler
Expand All @@ -67,7 +67,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery
log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster")

cluster := &Cluster{
eventHandlers: make(map[cluster.EventHandler]struct{}),
eventHandlers: cluster.NewEventHandlers(),
engines: make(map[string]*cluster.Engine),
pendingEngines: make(map[string]*cluster.Engine),
scheduler: scheduler,
Expand All @@ -91,35 +91,18 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery

// Handle callbacks for the events
func (c *Cluster) Handle(e *cluster.Event) error {
c.RLock()
defer c.RUnlock()

for h := range c.eventHandlers {
if err := h.Handle(e); err != nil {
log.Error(err)
}
}
c.eventHandlers.Handle(e)
return nil
}

// RegisterEventHandler registers an event handler.
func (c *Cluster) RegisterEventHandler(h cluster.EventHandler) error {
c.Lock()
defer c.Unlock()

if _, ok := c.eventHandlers[h]; ok {
return errors.New("event handler already set")
}
c.eventHandlers[h] = struct{}{}
return nil
return c.eventHandlers.RegisterEventHandler(h)
}

// UnregisterEventHandler unregisters a previously registered event handler.
func (c *Cluster) UnregisterEventHandler(h cluster.EventHandler) {
c.Lock()
defer c.Unlock()

delete(c.eventHandlers, h)
c.eventHandlers.UnregisterEventHandler(h)
}

// Generate a globally (across the cluster) unique ID.
Expand Down

0 comments on commit a2018c1

Please sign in to comment.