diff --git a/discovery/discovery.go b/discovery/discovery.go index 7b48db31..af094117 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -8,14 +8,14 @@ import ( ) const ( - SLEEP_INTERVAL = 1 * time.Second + DefaultSleepInterval = 1 * time.Second ) // A ChangeListener is a service that will receive service change events // over the HTTP interface. type ChangeListener struct { - Name string // Name to be represented in the Listeners list - Url string // Url of the service to send events to + Name string // Name to be represented in the Listeners list + Url string // Url of the service to send events to } // A Discoverer is responsible for finding services that we care diff --git a/discovery/docker_discovery.go b/discovery/docker_discovery.go index 612d32c2..81cd5264 100644 --- a/discovery/docker_discovery.go +++ b/discovery/docker_discovery.go @@ -33,6 +33,7 @@ type DockerDiscovery struct { serviceNamer ServiceNamer // The service namer implementation advertiseIp string // The address we'll advertise for services containerCache *ContainerCache // Stores full container data for fast lookups + sleepInterval time.Duration // The sleep interval for event processing and reconnection sync.RWMutex // Reader/Writer lock } @@ -43,6 +44,7 @@ func NewDockerDiscovery(endpoint string, svcNamer ServiceNamer, ip string) *Dock containerCache: NewContainerCache(), serviceNamer: svcNamer, advertiseIp: ip, + sleepInterval: DefaultSleepInterval, } // Default to our own method for returning this @@ -124,7 +126,7 @@ func (d *DockerDiscovery) Run(looper director.Looper) { } log.Debugf("Event: %#v\n", event) d.handleEvent(*event) - case <-time.After(SLEEP_INTERVAL): + case <-time.After(d.sleepInterval): d.getContainers() case <-time.After(CacheDrainInterval): d.containerCache.Drain(len(d.services)) @@ -278,29 +280,37 @@ func (d *DockerDiscovery) getContainers() { d.containerCache.Prune(containerMap) } -func (d *DockerDiscovery) manageConnection(quit chan bool) { +func (d *DockerDiscovery) configureDockerConnection() DockerClient { client, err := d.ClientProvider() if err != nil { - log.Errorf("Error when creating Docker client: %s\n", err.Error()) - return + log.Errorf("Error creating Docker client: %s", err) + return nil + } + + err = client.AddEventListener(d.events) + if err != nil { + log.Errorf("Error adding Docker client event listener: %s", err) + return nil } - client.AddEventListener(d.events) + + return client +} + +func (d *DockerDiscovery) manageConnection(quit chan bool) { + client := d.configureDockerConnection() // Health check the connection and set it back up when it goes away. for { - - err := client.Ping() - if err != nil { + // Is the client connected? + if client == nil || client.Ping() != nil { log.Warn("Lost connection to Docker, re-connecting") - client.RemoveEventListener(d.events) + if client != nil { + // Swallow errors since we're overwriting the client anyway + _ = client.RemoveEventListener(d.events) + } d.events = make(chan *docker.APIEvents) // RemoveEventListener closes it - client, err = docker.NewClient(d.endpoint) - if err == nil { - client.AddEventListener(d.events) - } else { - log.Error("Can't reconnect to Docker!") - } + client = d.configureDockerConnection() } select { @@ -309,7 +319,8 @@ func (d *DockerDiscovery) manageConnection(quit chan bool) { default: } - time.Sleep(SLEEP_INTERVAL) + // Sleep a bit before attempting to reconnect + time.Sleep(d.sleepInterval) } } diff --git a/discovery/docker_discovery_test.go b/discovery/docker_discovery_test.go index ed602d2d..0a67da97 100644 --- a/discovery/docker_discovery_test.go +++ b/discovery/docker_discovery_test.go @@ -15,6 +15,8 @@ var hostname = "shakespeare" // Define a stubDockerClient that we can use to test the discovery type stubDockerClient struct { ErrorOnInspectContainer bool + ErrorOnPing bool + PingChan chan struct{} } func (s *stubDockerClient) InspectContainer(id string) (*docker.Container, error) { @@ -57,7 +59,24 @@ func (s *stubDockerClient) RemoveEventListener(listener chan *docker.APIEvents) return nil } -func (s *stubDockerClient) Ping() error { return nil } +func (s *stubDockerClient) Ping() error { + if s.ErrorOnPing { + return errors.New("dummy errror") + } + + s.PingChan <- struct{}{} + + return nil +} + +type dummyLooper struct{} + +// Loop will block for enough time to prevent the event loop in DockerDiscovery.Run() +// from closing connQuitChan before the tests finish running +func (*dummyLooper) Loop(fn func() error) { time.Sleep(1 * time.Second) } +func (*dummyLooper) Wait() error { return nil } +func (*dummyLooper) Done(error) {} +func (*dummyLooper) Quit() {} func Test_DockerDiscovery(t *testing.T) { @@ -75,10 +94,12 @@ func Test_DockerDiscovery(t *testing.T) { service2 := service.Service{ID: svcId2, Hostname: hostname, Updated: baseTime} services := []*service.Service{&service1, &service2} + client := stubDockerClient{ + ErrorOnInspectContainer: false, + } + stubClientProvider := func() (DockerClient, error) { - return &stubDockerClient{ - ErrorOnInspectContainer: false, - }, nil + return &client, nil } svcNamer := &RegexpNamer{ServiceNameMatch: "^/(.+)(-[0-9a-z]{7,14})$"} @@ -197,5 +218,48 @@ func Test_DockerDiscovery(t *testing.T) { So(container, ShouldBeNil) }) }) + + Convey("Run()", func() { + disco.sleepInterval = 1 * time.Millisecond + + Convey("pings Docker", func() { + disco.Run(&dummyLooper{}) + + // Check a few times that it tries to ping Docker + for i := 0; i < 3; i++ { + pinged := false + select { + case <-client.PingChan: + pinged = true + case <-time.After(10 * time.Millisecond): + } + + So(pinged, ShouldBeFalse) + } + }) + + Convey("reconnects if the connection is dropped", func() { + connectEvent := make(chan struct{}) + disco.ClientProvider = func() (DockerClient, error) { + connectEvent <- struct{}{} + return stubClientProvider() + } + + client.ErrorOnPing = true + disco.Run(&dummyLooper{}) + + // Check a few times that it tries to reconnect to Docker + for i := 0; i < 3; i++ { + triedToConnect := false + select { + case <-connectEvent: + triedToConnect = true + case <-time.After(10 * time.Millisecond): + } + + So(triedToConnect, ShouldBeTrue) + } + }) + }) }) } diff --git a/discovery/static_discovery_test.go b/discovery/static_discovery_test.go index 7d99631e..a99d46bc 100644 --- a/discovery/static_discovery_test.go +++ b/discovery/static_discovery_test.go @@ -2,6 +2,7 @@ package discovery import ( "testing" + "time" "github.com/Nitro/sidecar/service" "github.com/relistan/go-director" @@ -73,10 +74,13 @@ func Test_Services(t *testing.T) { }) Convey("Updates the current timestamp each time", func() { - services := disco.Services() - services2 := disco.Services() + s := disco.Services() + firstUpdate := s[0].Updated + time.Sleep(1 * time.Millisecond) + s = disco.Services() + secondUpdate := s[0].Updated - So(services[0].Updated.Before(services2[0].Updated), ShouldBeTrue) + So(firstUpdate.Before(secondUpdate), ShouldBeTrue) }) }) } @@ -93,11 +97,11 @@ func Test_Listeners(t *testing.T) { Convey("Returns all listeners extracted from Targets", func() { tgt1 := &Target{ - Service: service.Service{Name: "beowulf", ID: "asdf"}, + Service: service.Service{Name: "beowulf", ID: "asdf"}, ListenPort: 10000, } tgt2 := &Target{ - Service: service.Service{Name: "hrothgar", ID: "abba"}, + Service: service.Service{Name: "hrothgar", ID: "abba"}, ListenPort: 11000, } disco.Targets = []*Target{tgt1, tgt2} @@ -105,12 +109,12 @@ func Test_Listeners(t *testing.T) { listeners := disco.Listeners() expected0 := ChangeListener{ - Name:"Service(beowulf-asdf)", - Url:"http://" + disco.Hostname + ":10000/sidecar/update", + Name: "Service(beowulf-asdf)", + Url: "http://" + disco.Hostname + ":10000/sidecar/update", } expected1 := ChangeListener{ - Name:"Service(hrothgar-abba)", - Url:"http://" + disco.Hostname + ":11000/sidecar/update", + Name: "Service(hrothgar-abba)", + Url: "http://" + disco.Hostname + ":11000/sidecar/update", } So(len(listeners), ShouldEqual, 2) diff --git a/main.go b/main.go index 1a2712b0..61ca61c7 100644 --- a/main.go +++ b/main.go @@ -307,10 +307,10 @@ func main() { director.FOREVER, catalog.ALIVE_SLEEP_INTERVAL, nil, ) discoLooper := director.NewTimedLooper( - director.FOREVER, discovery.SLEEP_INTERVAL, make(chan error), + director.FOREVER, discovery.DefaultSleepInterval, make(chan error), ) listenLooper := director.NewTimedLooper( - director.FOREVER, discovery.SLEEP_INTERVAL, make(chan error), + director.FOREVER, discovery.DefaultSleepInterval, make(chan error), ) healthWatchLooper := director.NewTimedLooper( director.FOREVER, healthy.WATCH_INTERVAL, make(chan error), diff --git a/services_delegate.go b/services_delegate.go index b0326f87..9ec46069 100644 --- a/services_delegate.go +++ b/services_delegate.go @@ -7,9 +7,9 @@ import ( "github.com/Nitro/memberlist" "github.com/Nitro/sidecar/catalog" "github.com/Nitro/sidecar/service" - log "github.com/sirupsen/logrus" "github.com/armon/go-metrics" "github.com/pquerna/ffjson/ffjson" + log "github.com/sirupsen/logrus" ) const ( @@ -143,7 +143,7 @@ func (d *servicesDelegate) GetBroadcasts(overhead, limit int) [][]byte { } func (d *servicesDelegate) LocalState(join bool) []byte { - log.Debugf("LocalState(): %b", join) + log.Debugf("LocalState(): %t", join) d.state.RLock() defer d.state.RUnlock() return d.state.Encode() @@ -200,9 +200,9 @@ func (d *servicesDelegate) packPacket(broadcasts [][]byte, limit int, overhead i total += len(message) + overhead } - if lastItem < 0 && len(broadcasts) > 0{ + if lastItem < 0 && len(broadcasts) > 0 { // Don't warn on startup... it's fairly normal - gracePeriod := time.Now().UTC().Add(0-(5*time.Second)) + gracePeriod := time.Now().UTC().Add(0 - (5 * time.Second)) if d.StartedAt.Before(gracePeriod) { log.Warnf("All messages were too long to fit! No broadcasts!") }