Skip to content

Commit

Permalink
Add positive confirmation that Agent made a connection to Collector (#…
Browse files Browse the repository at this point in the history
…2423)

Signed-off-by: Bernardo Tolosa <[email protected]>
  • Loading branch information
BernardTolosajr authored Sep 11, 2020
1 parent 393e04d commit e37deb9
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 34 deletions.
18 changes: 17 additions & 1 deletion cmd/agent/app/reporter/grpc/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package grpc

import (
"context"
"errors"
"fmt"
"strings"
Expand Down Expand Up @@ -90,5 +91,20 @@ func (b *ConnBuilder) CreateConnection(logger *zap.Logger) (*grpc.ClientConn, er
}
dialOptions = append(dialOptions, grpc.WithDefaultServiceConfig(grpcresolver.GRPCServiceConfig))
dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(b.MaxRetry))))
return grpc.Dial(dialTarget, dialOptions...)
conn, err := grpc.Dial(dialTarget, dialOptions...)

if err != nil {
return nil, err
}

go func(cc *grpc.ClientConn) {
logger.Info("Checking connection to collector")
for {
s := cc.GetState()
logger.Info("Agent collector connection state change", zap.String("dialTarget", dialTarget), zap.Stringer("status", s))
cc.WaitForStateChange(context.Background(), s)
}
}(conn)

return conn, nil
}
115 changes: 82 additions & 33 deletions cmd/agent/app/reporter/grpc/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/uber/jaeger-lib/metrics/metricstest"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
yaml "gopkg.in/yaml.v2"

Expand Down Expand Up @@ -64,47 +65,79 @@ func TestBuilderFromConfig(t *testing.T) {
}

func TestBuilderWithCollectors(t *testing.T) {
spanHandler1 := &mockSpanHandler{}
s1, addr1 := initializeGRPCTestServer(t, func(s *grpc.Server) {
api_v2.RegisterCollectorServiceServer(s, spanHandler1)
})
defer s1.Stop()

tests := []struct {
target string
name string
hostPorts []string
checkSuffixOnly bool
notifier discovery.Notifier
discoverer discovery.Discoverer
expectedError string
target string
name string
hostPorts []string
checkSuffixOnly bool
notifier discovery.Notifier
discoverer discovery.Discoverer
expectedError string
checkConnectionState bool
expectedState string
}{
{
target: "///round_robin",
name: "with roundrobin schema",
hostPorts: []string{"127.0.0.1:9876", "127.0.0.1:9877", "127.0.0.1:9878"},
checkSuffixOnly: true,
notifier: nil,
discoverer: nil,
target: "///round_robin",
name: "with roundrobin schema",
hostPorts: []string{"127.0.0.1:9876", "127.0.0.1:9877", "127.0.0.1:9878"},
checkSuffixOnly: true,
notifier: nil,
discoverer: nil,
checkConnectionState: false,
},
{
target: "127.0.0.1:9876",
name: "with single host",
hostPorts: []string{"127.0.0.1:9876"},
checkSuffixOnly: false,
notifier: nil,
discoverer: nil,
checkConnectionState: false,
},
{
target: "///round_robin",
name: "with custom resolver and fixed discoverer",
hostPorts: []string{"dns://random_stuff"},
checkSuffixOnly: true,
notifier: noopNotifier{},
discoverer: discovery.FixedDiscoverer{},
checkConnectionState: false,
},
{
target: "127.0.0.1:9876",
name: "with single host",
hostPorts: []string{"127.0.0.1:9876"},
checkSuffixOnly: false,
notifier: nil,
discoverer: nil,
target: "",
name: "without collectorPorts and resolver",
hostPorts: nil,
checkSuffixOnly: false,
notifier: nil,
discoverer: nil,
expectedError: "at least one collector hostPort address is required when resolver is not available",
checkConnectionState: false,
},
{
target: "///round_robin",
name: "with custom resolver and fixed discoverer",
hostPorts: []string{"dns://random_stuff"},
checkSuffixOnly: true,
notifier: noopNotifier{},
discoverer: discovery.FixedDiscoverer{},
target: addr1.String(),
name: "with collector connection status ready",
hostPorts: []string{addr1.String()},
checkSuffixOnly: false,
notifier: nil,
discoverer: nil,
checkConnectionState: true,
expectedState: "READY",
},
{
target: "",
name: "without collectorPorts and resolver",
hostPorts: nil,
checkSuffixOnly: false,
notifier: nil,
discoverer: nil,
expectedError: "at least one collector hostPort address is required when resolver is not available",
target: "random_stuff",
name: "with collector connection status failure",
hostPorts: []string{"random_stuff"},
checkSuffixOnly: false,
notifier: nil,
discoverer: nil,
checkConnectionState: true,
expectedState: "TRANSIENT_FAILURE",
},
}

Expand All @@ -120,7 +153,9 @@ func TestBuilderWithCollectors(t *testing.T) {
if test.expectedError == "" {
require.NoError(t, err)
require.NotNil(t, conn)

if test.checkConnectionState {
assertConnectionState(t, conn, test.expectedState)
}
if test.checkSuffixOnly {
assert.True(t, strings.HasSuffix(conn.Target(), test.target))
} else {
Expand Down Expand Up @@ -346,3 +381,17 @@ func TestProxyClientTLS(t *testing.T) {
})
}
}

func assertConnectionState(t *testing.T, conn *grpc.ClientConn, expectedState string) {
for {
s := conn.GetState()
if s == connectivity.Ready {
assert.True(t, s.String() == expectedState)
break
}
if s == connectivity.TransientFailure {
assert.True(t, s.String() == expectedState)
break
}
}
}

0 comments on commit e37deb9

Please sign in to comment.