Skip to content

Commit

Permalink
Merge pull request #10146 from hashicorp/dnephin/connect-proxy-test-d…
Browse files Browse the repository at this point in the history
…eadlock

connect/proxy: fix a few problems with tests
  • Loading branch information
dnephin authored Apr 28, 2021
2 parents 9b344b3 + d18a03b commit e19aef4
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 21 deletions.
2 changes: 0 additions & 2 deletions connect/proxy/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
)

func TestUpstreamResolverFuncFromClient(t *testing.T) {
t.Parallel()

tests := []struct {
name string
cfg UpstreamConfig
Expand Down
9 changes: 2 additions & 7 deletions connect/proxy/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
"testing"
"time"

"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/sdk/testutil/retry"
)

// Assert io.Closer implementation
Expand Down Expand Up @@ -65,8 +66,6 @@ func testConnPipelineSetup(t *testing.T) (net.Conn, net.Conn, *Conn, func()) {
}

func TestConn(t *testing.T) {
t.Parallel()

src, dst, c, stop := testConnPipelineSetup(t)
defer stop()

Expand Down Expand Up @@ -124,8 +123,6 @@ func TestConn(t *testing.T) {
}

func TestConnSrcClosing(t *testing.T) {
t.Parallel()

src, dst, c, stop := testConnPipelineSetup(t)
defer stop()

Expand Down Expand Up @@ -164,8 +161,6 @@ func TestConnSrcClosing(t *testing.T) {
}

func TestConnDstClosing(t *testing.T) {
t.Parallel()

src, dst, c, stop := testConnPipelineSetup(t)
defer stop()

Expand Down
21 changes: 13 additions & 8 deletions connect/proxy/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (
"time"

metrics "github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"

"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/connect"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/go-hclog"
)

const (
Expand Down Expand Up @@ -44,6 +45,7 @@ type Listener struct {
// `connection refused`. Retry loops and sleeps are unpleasant workarounds and
// this is cheap and correct.
listeningChan chan struct{}
listener net.Listener

logger hclog.Logger

Expand Down Expand Up @@ -136,28 +138,32 @@ func (l *Listener) Serve() error {
return errors.New("serve called on a closed listener")
}

listen, err := l.listenFunc()
var err error
l.listener, err = l.listenFunc()
if err != nil {
return err
}
close(l.listeningChan)

for {
conn, err := listen.Accept()
conn, err := l.listener.Accept()
if err != nil {
if atomic.LoadInt32(&l.stopFlag) == 1 {
return nil
}
return err
}

l.connWG.Add(1)
go l.handleConn(conn)
}
}

// handleConn is the internal connection handler goroutine.
func (l *Listener) handleConn(src net.Conn) {
defer src.Close()
// Make sure Listener.Close waits for this conn to be cleaned up.
defer l.connWG.Done()

dst, err := l.dialFunc()
if err != nil {
Expand All @@ -169,11 +175,6 @@ func (l *Listener) handleConn(src net.Conn) {
// it closes.
defer l.trackConn()()

// Make sure Close() waits for this conn to be cleaned up. Note defer is
// before conn.Close() so runs after defer conn.Close().
l.connWG.Add(1)
defer l.connWG.Done()

// Note no need to defer dst.Close() since conn handles that for us.
conn := NewConn(src, dst)
defer conn.Close()
Expand Down Expand Up @@ -246,6 +247,10 @@ func (l *Listener) Close() error {
close(l.stopChan)
// Wait for all conns to close
l.connWG.Wait()

if l.listener != nil {
l.listener.Close()
}
}
return nil
}
Expand Down
12 changes: 8 additions & 4 deletions connect/proxy/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func testSetupMetrics(t *testing.T) *metrics.InmemSink {
s := metrics.NewInmemSink(10*time.Second, 300*time.Second)
cfg := metrics.DefaultConfig("consul.proxy.test")
cfg.EnableHostname = false
cfg.EnableRuntimeMetrics = false
metrics.NewGlobal(cfg, s)
return s
}
Expand All @@ -45,6 +46,7 @@ func assertCurrentGaugeValue(t *testing.T, sink *metrics.InmemSink,
currentInterval.RLock()
if len(currentInterval.Gauges) > 0 {
got = currentInterval.Gauges[name].Value
currentInterval.RUnlock()
break
}
currentInterval.RUnlock()
Expand Down Expand Up @@ -132,8 +134,9 @@ func TestPublicListener(t *testing.T) {

// Run proxy
go func() {
err := l.Serve()
require.NoError(t, err)
if err := l.Serve(); err != nil {
t.Errorf("failed to listen: %v", err.Error())
}
}()
defer l.Close()
l.Wait()
Expand Down Expand Up @@ -200,8 +203,9 @@ func TestUpstreamListener(t *testing.T) {

// Run proxy
go func() {
err := l.Serve()
require.NoError(t, err)
if err := l.Serve(); err != nil {
t.Errorf("failed to listen: %v", err.Error())
}
}()
defer l.Close()
l.Wait()
Expand Down

0 comments on commit e19aef4

Please sign in to comment.