Skip to content

Commit

Permalink
Merge pull request #41 from Nitro/mihaitodor/check-errors-docker-disc…
Browse files Browse the repository at this point in the history
…overy

Check errors in docker_discovery.go
  • Loading branch information
mihaitodor authored Nov 19, 2018
2 parents 71626d2 + 804f4d8 commit 3ed7ddc
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 38 deletions.
6 changes: 3 additions & 3 deletions discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
)

const (
SLEEP_INTERVAL = 1 * time.Second
DefaultSleepInterval = 1 * time.Second
)

// A ChangeListener is a service that will receive service change events
// over the HTTP interface.
type ChangeListener struct {
Name string // Name to be represented in the Listeners list
Url string // Url of the service to send events to
Name string // Name to be represented in the Listeners list
Url string // Url of the service to send events to
}

// A Discoverer is responsible for finding services that we care
Expand Down
43 changes: 27 additions & 16 deletions discovery/docker_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type DockerDiscovery struct {
serviceNamer ServiceNamer // The service namer implementation
advertiseIp string // The address we'll advertise for services
containerCache *ContainerCache // Stores full container data for fast lookups
sleepInterval time.Duration // The sleep interval for event processing and reconnection
sync.RWMutex // Reader/Writer lock
}

Expand All @@ -43,6 +44,7 @@ func NewDockerDiscovery(endpoint string, svcNamer ServiceNamer, ip string) *Dock
containerCache: NewContainerCache(),
serviceNamer: svcNamer,
advertiseIp: ip,
sleepInterval: DefaultSleepInterval,
}

// Default to our own method for returning this
Expand Down Expand Up @@ -124,7 +126,7 @@ func (d *DockerDiscovery) Run(looper director.Looper) {
}
log.Debugf("Event: %#v\n", event)
d.handleEvent(*event)
case <-time.After(SLEEP_INTERVAL):
case <-time.After(d.sleepInterval):
d.getContainers()
case <-time.After(CacheDrainInterval):
d.containerCache.Drain(len(d.services))
Expand Down Expand Up @@ -278,29 +280,37 @@ func (d *DockerDiscovery) getContainers() {
d.containerCache.Prune(containerMap)
}

func (d *DockerDiscovery) manageConnection(quit chan bool) {
func (d *DockerDiscovery) configureDockerConnection() DockerClient {
client, err := d.ClientProvider()
if err != nil {
log.Errorf("Error when creating Docker client: %s\n", err.Error())
return
log.Errorf("Error creating Docker client: %s", err)
return nil
}

err = client.AddEventListener(d.events)
if err != nil {
log.Errorf("Error adding Docker client event listener: %s", err)
return nil
}
client.AddEventListener(d.events)

return client
}

func (d *DockerDiscovery) manageConnection(quit chan bool) {
client := d.configureDockerConnection()

// Health check the connection and set it back up when it goes away.
for {

err := client.Ping()
if err != nil {
// Is the client connected?
if client == nil || client.Ping() != nil {
log.Warn("Lost connection to Docker, re-connecting")
client.RemoveEventListener(d.events)
if client != nil {
// Swallow errors since we're overwriting the client anyway
_ = client.RemoveEventListener(d.events)
}
d.events = make(chan *docker.APIEvents) // RemoveEventListener closes it

client, err = docker.NewClient(d.endpoint)
if err == nil {
client.AddEventListener(d.events)
} else {
log.Error("Can't reconnect to Docker!")
}
client = d.configureDockerConnection()
}

select {
Expand All @@ -309,7 +319,8 @@ func (d *DockerDiscovery) manageConnection(quit chan bool) {
default:
}

time.Sleep(SLEEP_INTERVAL)
// Sleep a bit before attempting to reconnect
time.Sleep(d.sleepInterval)
}
}

Expand Down
72 changes: 68 additions & 4 deletions discovery/docker_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ var hostname = "shakespeare"
// Define a stubDockerClient that we can use to test the discovery.
// Its exported fields are switches and hooks that let a test steer
// how the stub's methods behave.
type stubDockerClient struct {
// ErrorOnInspectContainer presumably makes InspectContainer return an
// error when set; the method body is not visible here — TODO confirm.
ErrorOnInspectContainer bool
// ErrorOnPing makes Ping return an error instead of signalling PingChan.
ErrorOnPing bool
// PingChan is sent an empty struct by every successful Ping. Ping uses a
// plain channel send, so it blocks until the value can be delivered.
PingChan chan struct{}
}

func (s *stubDockerClient) InspectContainer(id string) (*docker.Container, error) {
Expand Down Expand Up @@ -57,7 +59,24 @@ func (s *stubDockerClient) RemoveEventListener(listener chan *docker.APIEvents)
return nil
}

func (s *stubDockerClient) Ping() error { return nil }
// Ping simulates a Docker connectivity check. When ErrorOnPing is set it
// fails straight away; otherwise it signals PingChan (blocking until the
// send completes) and then reports success.
func (s *stubDockerClient) Ping() error {
	if !s.ErrorOnPing {
		// Let the test observe that a ping took place.
		s.PingChan <- struct{}{}
		return nil
	}

	return errors.New("dummy errror")
}

// dummyLooper is a stand-in looper for tests: it never invokes the function
// it is handed and its remaining methods are no-ops.
type dummyLooper struct{}

// Loop ignores fn and just blocks for one second, which is enough time to
// prevent the event loop in DockerDiscovery.Run() from closing connQuitChan
// before the tests finish running.
func (*dummyLooper) Loop(fn func() error) {
	const blockFor = time.Second
	time.Sleep(blockFor)
}

// Wait returns immediately with a nil error.
func (*dummyLooper) Wait() error {
	return nil
}

// Done discards the error it is given.
func (*dummyLooper) Done(error) {}

// Quit does nothing.
func (*dummyLooper) Quit() {}

func Test_DockerDiscovery(t *testing.T) {

Expand All @@ -75,10 +94,12 @@ func Test_DockerDiscovery(t *testing.T) {
service2 := service.Service{ID: svcId2, Hostname: hostname, Updated: baseTime}
services := []*service.Service{&service1, &service2}

client := stubDockerClient{
ErrorOnInspectContainer: false,
}

stubClientProvider := func() (DockerClient, error) {
return &stubDockerClient{
ErrorOnInspectContainer: false,
}, nil
return &client, nil
}

svcNamer := &RegexpNamer{ServiceNameMatch: "^/(.+)(-[0-9a-z]{7,14})$"}
Expand Down Expand Up @@ -197,5 +218,48 @@ func Test_DockerDiscovery(t *testing.T) {
So(container, ShouldBeNil)
})
})

Convey("Run()", func() {
disco.sleepInterval = 1 * time.Millisecond

Convey("pings Docker", func() {
disco.Run(&dummyLooper{})

// Check a few times that it tries to ping Docker
for i := 0; i < 3; i++ {
pinged := false
select {
case <-client.PingChan:
pinged = true
case <-time.After(10 * time.Millisecond):
}

So(pinged, ShouldBeFalse)
}
})

Convey("reconnects if the connection is dropped", func() {
connectEvent := make(chan struct{})
disco.ClientProvider = func() (DockerClient, error) {
connectEvent <- struct{}{}
return stubClientProvider()
}

client.ErrorOnPing = true
disco.Run(&dummyLooper{})

// Check a few times that it tries to reconnect to Docker
for i := 0; i < 3; i++ {
triedToConnect := false
select {
case <-connectEvent:
triedToConnect = true
case <-time.After(10 * time.Millisecond):
}

So(triedToConnect, ShouldBeTrue)
}
})
})
})
}
22 changes: 13 additions & 9 deletions discovery/static_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package discovery

import (
"testing"
"time"

"github.com/Nitro/sidecar/service"
"github.com/relistan/go-director"
Expand Down Expand Up @@ -73,10 +74,13 @@ func Test_Services(t *testing.T) {
})

Convey("Updates the current timestamp each time", func() {
services := disco.Services()
services2 := disco.Services()
s := disco.Services()
firstUpdate := s[0].Updated
time.Sleep(1 * time.Millisecond)
s = disco.Services()
secondUpdate := s[0].Updated

So(services[0].Updated.Before(services2[0].Updated), ShouldBeTrue)
So(firstUpdate.Before(secondUpdate), ShouldBeTrue)
})
})
}
Expand All @@ -93,24 +97,24 @@ func Test_Listeners(t *testing.T) {

Convey("Returns all listeners extracted from Targets", func() {
tgt1 := &Target{
Service: service.Service{Name: "beowulf", ID: "asdf"},
Service: service.Service{Name: "beowulf", ID: "asdf"},
ListenPort: 10000,
}
tgt2 := &Target{
Service: service.Service{Name: "hrothgar", ID: "abba"},
Service: service.Service{Name: "hrothgar", ID: "abba"},
ListenPort: 11000,
}
disco.Targets = []*Target{tgt1, tgt2}

listeners := disco.Listeners()

expected0 := ChangeListener{
Name:"Service(beowulf-asdf)",
Url:"http://" + disco.Hostname + ":10000/sidecar/update",
Name: "Service(beowulf-asdf)",
Url: "http://" + disco.Hostname + ":10000/sidecar/update",
}
expected1 := ChangeListener{
Name:"Service(hrothgar-abba)",
Url:"http://" + disco.Hostname + ":11000/sidecar/update",
Name: "Service(hrothgar-abba)",
Url: "http://" + disco.Hostname + ":11000/sidecar/update",
}

So(len(listeners), ShouldEqual, 2)
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,10 @@ func main() {
director.FOREVER, catalog.ALIVE_SLEEP_INTERVAL, nil,
)
discoLooper := director.NewTimedLooper(
director.FOREVER, discovery.SLEEP_INTERVAL, make(chan error),
director.FOREVER, discovery.DefaultSleepInterval, make(chan error),
)
listenLooper := director.NewTimedLooper(
director.FOREVER, discovery.SLEEP_INTERVAL, make(chan error),
director.FOREVER, discovery.DefaultSleepInterval, make(chan error),
)
healthWatchLooper := director.NewTimedLooper(
director.FOREVER, healthy.WATCH_INTERVAL, make(chan error),
Expand Down
8 changes: 4 additions & 4 deletions services_delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"github.com/Nitro/memberlist"
"github.com/Nitro/sidecar/catalog"
"github.com/Nitro/sidecar/service"
log "github.com/sirupsen/logrus"
"github.com/armon/go-metrics"
"github.com/pquerna/ffjson/ffjson"
log "github.com/sirupsen/logrus"
)

const (
Expand Down Expand Up @@ -143,7 +143,7 @@ func (d *servicesDelegate) GetBroadcasts(overhead, limit int) [][]byte {
}

func (d *servicesDelegate) LocalState(join bool) []byte {
log.Debugf("LocalState(): %b", join)
log.Debugf("LocalState(): %t", join)
d.state.RLock()
defer d.state.RUnlock()
return d.state.Encode()
Expand Down Expand Up @@ -200,9 +200,9 @@ func (d *servicesDelegate) packPacket(broadcasts [][]byte, limit int, overhead i
total += len(message) + overhead
}

if lastItem < 0 && len(broadcasts) > 0{
if lastItem < 0 && len(broadcasts) > 0 {
// Don't warn on startup... it's fairly normal
gracePeriod := time.Now().UTC().Add(0-(5*time.Second))
gracePeriod := time.Now().UTC().Add(0 - (5 * time.Second))
if d.StartedAt.Before(gracePeriod) {
log.Warnf("All messages were too long to fit! No broadcasts!")
}
Expand Down

0 comments on commit 3ed7ddc

Please sign in to comment.