Skip to content

Commit

Permalink
Merge pull request #604 from Shopify/broker-locking-tweaks
Browse files Browse the repository at this point in the history
Adjust broker locking order slightly
  • Loading branch information
eapache committed Feb 16, 2016
2 parents ded0888 + 62e4ae8 commit 1663b48
Showing 1 changed file with 7 additions and 13 deletions.
20 changes: 7 additions & 13 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func NewBroker(addr string) *Broker {
// follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
// AlreadyConnected. If conf is nil, the result of NewConfig() is used.
func (b *Broker) Open(conf *Config) error {
if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
return ErrAlreadyConnected
}

if conf == nil {
conf = NewConfig()
}
Expand All @@ -54,18 +58,8 @@ func (b *Broker) Open(conf *Config) error {
return err
}

if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
return ErrAlreadyConnected
}

b.lock.Lock()

if b.conn != nil {
b.lock.Unlock()
Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, ErrAlreadyConnected)
return ErrAlreadyConnected
}

go withRecover(func() {
defer b.lock.Unlock()

Expand All @@ -80,9 +74,9 @@ func (b *Broker) Open(conf *Config) error {
b.conn, b.connErr = dialer.Dial("tcp", b.addr)
}
if b.connErr != nil {
Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
b.conn = nil
atomic.StoreInt32(&b.opened, 0)
Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
return
}
b.conn = newBufConn(b.conn)
Expand Down Expand Up @@ -129,14 +123,14 @@ func (b *Broker) Close() error {
b.done = nil
b.responses = nil

atomic.StoreInt32(&b.opened, 0)

if err == nil {
Logger.Printf("Closed connection to broker %s\n", b.addr)
} else {
Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
}

atomic.StoreInt32(&b.opened, 0)

return err
}

Expand Down

0 comments on commit 1663b48

Please sign in to comment.