Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flakes with rpc integration test #1860

Merged
merged 5 commits into from
Apr 1, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 90 additions & 52 deletions integration/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ import (
)

var (
retries = 20
numLogEntries = 5
waitTime = 1 * time.Second
connectionRetries = 2
readRetries = 5
numLogEntries = 5
waitTime = 1 * time.Second
)

func TestEventLogRPC(t *testing.T) {
Expand All @@ -52,19 +53,17 @@ func TestEventLogRPC(t *testing.T) {
defer teardown()

// start a grpc client and make sure we can connect properly
var conn *grpc.ClientConn
var err error
var client proto.SkaffoldServiceClient
attempts := 0
for {
var (
conn *grpc.ClientConn
err error
client proto.SkaffoldServiceClient
)

// connect to the skaffold grpc server
for i := 0; i < connectionRetries; i++ {
conn, err = grpc.Dial(fmt.Sprintf(":%s", rpcAddr), grpc.WithInsecure())
if err != nil {
t.Logf("unable to establish skaffold grpc connection: retrying...")
attempts++
if attempts == retries {
t.Fatalf("error establishing skaffold grpc connection")
}

time.Sleep(waitTime)
continue
}
Expand All @@ -74,22 +73,24 @@ func TestEventLogRPC(t *testing.T) {
break
}

if client == nil {
t.Fatalf("error establishing skaffold grpc connection")
}

ctx, ctxCancel := context.WithCancel(context.Background())
defer ctxCancel()

// read the event log stream from the skaffold grpc server
var stream proto.SkaffoldService_EventLogClient
attempts = 0
for {
for i := 0; i < readRetries; i++ {
stream, err = client.EventLog(ctx)
if err == nil {
break
}
if attempts < retries {
attempts++
if err != nil {
t.Logf("waiting for connection...")
time.Sleep(3 * time.Second)
time.Sleep(waitTime)
continue
}
}
if stream == nil {
t.Fatalf("error retrieving event log: %v\n", err)
}

Expand Down Expand Up @@ -204,19 +205,16 @@ func TestGetStateRPC(t *testing.T) {
defer teardown()

// start a grpc client and make sure we can connect properly
var conn *grpc.ClientConn
var err error
var client proto.SkaffoldServiceClient
attempts := 0
for {
var (
conn *grpc.ClientConn
err error
client proto.SkaffoldServiceClient
)

for i := 0; i < connectionRetries; i++ {
conn, err = grpc.Dial(fmt.Sprintf(":%s", rpcAddr), grpc.WithInsecure())
if err != nil {
t.Logf("unable to establish skaffold grpc connection: retrying...")
attempts++
if attempts == retries {
t.Fatalf("error establishing skaffold grpc connection")
}

time.Sleep(waitTime)
continue
}
Expand All @@ -226,30 +224,27 @@ func TestGetStateRPC(t *testing.T) {
break
}

if client == nil {
t.Fatalf("error establishing skaffold grpc connection")
}

ctx, ctxCancel := context.WithCancel(context.Background())
defer ctxCancel()

// retrieve the state and make sure everything looks correct
// try a few times and wait around until we see the build is complete, or fail.
success := false
var grpcState *proto.State
attempts = 0
for {
grpcState, err = client.GetState(ctx, &empty.Empty{})
if err == nil {
for i := 0; i < readRetries; i++ {
grpcState = retrieveRPCState(ctx, t, client)
if checkBuildAndDeployComplete(*grpcState) {
success = true
break
}
if attempts < retries {
attempts++
t.Logf("waiting for connection...")
time.Sleep(3 * time.Second)
continue
}
t.Fatalf("error retrieving state: %v\n", err)
time.Sleep(waitTime)
}

for _, v := range grpcState.BuildState.Artifacts {
testutil.CheckDeepEqual(t, event.Complete, v)
if !success {
t.Errorf("skaffold build or deploy not complete. state: %+v\n", grpcState)
}
testutil.CheckDeepEqual(t, event.Complete, grpcState.DeployState.Status)
}

func TestGetStateHTTP(t *testing.T) {
Expand All @@ -262,6 +257,44 @@ func TestGetStateHTTP(t *testing.T) {
defer teardown()
time.Sleep(3 * time.Second) // give skaffold time to process all events

success := false
var httpState proto.State
for i := 0; i < readRetries; i++ {
httpState = retrieveHTTPState(t, httpAddr)
if checkBuildAndDeployComplete(httpState) {
success = true
break
}
time.Sleep(waitTime)
}
if !success {
t.Errorf("skaffold build or deploy not complete. state: %+v\n", httpState)
}
}

func retrieveRPCState(ctx context.Context, t *testing.T, client proto.SkaffoldServiceClient) *proto.State {
var grpcState *proto.State
var err error
attempts := 0
for {
nkubala marked this conversation as resolved.
Show resolved Hide resolved
grpcState, err = client.GetState(ctx, &empty.Empty{})
if err == nil {
break
}
if attempts < connectionRetries {
attempts++
t.Logf("waiting for connection...")
time.Sleep(waitTime)
continue
}
t.Fatalf("error retrieving state: %v\n", err)
}
return grpcState
}

func retrieveHTTPState(t *testing.T, httpAddr string) proto.State {
var httpState proto.State

// retrieve the state via HTTP as well, and verify the result is the same
httpResponse, err := http.Get(fmt.Sprintf("http://localhost:%s/v1/state", httpAddr))
if err != nil {
Expand All @@ -271,14 +304,10 @@ func TestGetStateHTTP(t *testing.T) {
if err != nil {
t.Errorf("error reading body from http response: %s", err.Error())
}
var httpState proto.State
if err := json.Unmarshal(b, &httpState); err != nil {
t.Errorf("error converting http response to proto: %s", err.Error())
}
for _, v := range httpState.BuildState.Artifacts {
testutil.CheckDeepEqual(t, event.Complete, v)
}
testutil.CheckDeepEqual(t, event.Complete, httpState.DeployState.Status)
return httpState
}

func setupSkaffoldWithArgs(t *testing.T, args ...string) func() {
Expand All @@ -302,3 +331,12 @@ func setupSkaffoldWithArgs(t *testing.T, args ...string) func() {
func randomPort() string {
return fmt.Sprintf("%d", rand.Intn(65535))
}

func checkBuildAndDeployComplete(state proto.State) bool {
for _, a := range state.BuildState.Artifacts {
if a != event.Complete {
return false
}
}
return state.DeployState.Status == event.Complete
}