From 62e4ae86457b7c3e668e9cb5970c95e3c1286ffd Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sun, 14 Feb 2016 09:56:32 -0500 Subject: [PATCH] Adjust broker locking order slightly I was doing an unrelated experiment with a config that generated a warning on validation, and I ended up flooded with warnings because we now call `Open` on every broker returned from the Client, and we were revalidating the config every single time. One thing led to another and I made several other adjustments in the same vein. - Move the atomic `AlreadyConnected` check to before the config validation in `Broker.Open`. Config validation is expensive and can log, no point in doing it if we don't have to. - Move all the `atomic.StoreInt32` calls to *after* any of their related log messages. None of these were real problems, but it removes any possibility of out-of-order log messages due to unusual race conditions. - Remove the double-check for `AlreadyConnected` after we've taken the full lock. Initially I just removed the log message since it's not something we need to log, but upon closer inspection I'm pretty sure this is dead code; anything that sets `opened` to 0 also sets `conn` to nil *first*, and does so while holding the lock. --- broker.go | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/broker.go b/broker.go index 5523fd4e9..c4596f909 100644 --- a/broker.go +++ b/broker.go @@ -45,6 +45,10 @@ func NewBroker(addr string) *Broker { // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or // AlreadyConnected. If conf is nil, the result of NewConfig() is used. func (b *Broker) Open(conf *Config) error { + if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) { + return ErrAlreadyConnected + } + if conf == nil { conf = NewConfig() } @@ -54,18 +58,8 @@ func (b *Broker) Open(conf *Config) error { return err } - if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) { - return ErrAlreadyConnected - } - b.lock.Lock() - if b.conn != nil { - b.lock.Unlock() - Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, ErrAlreadyConnected) - return ErrAlreadyConnected - } - go withRecover(func() { defer b.lock.Unlock() @@ -80,9 +74,9 @@ func (b *Broker) Open(conf *Config) error { b.conn, b.connErr = dialer.Dial("tcp", b.addr) } if b.connErr != nil { + Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr) b.conn = nil atomic.StoreInt32(&b.opened, 0) - Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr) return } b.conn = newBufConn(b.conn) @@ -129,14 +123,14 @@ func (b *Broker) Close() error { b.done = nil b.responses = nil - atomic.StoreInt32(&b.opened, 0) - if err == nil { Logger.Printf("Closed connection to broker %s\n", b.addr) } else { Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err) } + atomic.StoreInt32(&b.opened, 0) + return err }