Skip to content

Commit

Permalink
refactor and extend TestClientV2_Checkin_Initial (#96)
Browse files Browse the repository at this point in the history
 - remove test for wrong token
 - add check to ensure the client haven't returned any error
 - add check for the new BuildHash field of proto.CheckinObserved
 - check AgentInfo is correctly set
  • Loading branch information
AndersonQ authored Jan 22, 2024
1 parent 16361c4 commit 17d2b25
Showing 1 changed file with 111 additions and 119 deletions.
230 changes: 111 additions & 119 deletions pkg/client/client_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,69 +141,79 @@ func TestClientV2_Checkin_Initial(t *testing.T) {
}

for _, tc := range tests {
t.Run(tc.name, func(n string, sc, cc credentials.TransportCredentials) func(t *testing.T) {
return func(t *testing.T) {
testClientV2CheckinInitial(t, n, sc, cc)
}
}(tc.localRPC, tc.serverCreds, tc.clientCreds))
t.Run(tc.name, func(t *testing.T) {
testClientV2CheckinInitial(
t, tc.localRPC, tc.serverCreds, tc.clientCreds)
})
}
}

func testClientV2CheckinInitial(t *testing.T, localRPC string, serverCreds, clientCreds credentials.TransportCredentials) {
var m sync.Mutex
token := mock.NewID()
gotInvalid := false
gotValid := false
unitOneID := mock.NewID()
unitTwoID := mock.NewID()
wantFQDN := true
reportedVersion := VersionInfo{}

packageVersion := "8.13.0+build20060102"
wantBuildHash := "build_hash"
wantAgentInfo := AgentInfo{
ID: "elastic-agent-id",
Version: packageVersion,
Snapshot: true,
}
vInfo := VersionInfo{
Name: "program",
BuildHash: wantBuildHash,
Meta: map[string]string{
"key": "value",
},
}

checkInDone := make(chan *proto.CheckinObserved, 1)
srv := mock.StubServerV2{
CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
m.Lock()
defer m.Unlock()

if observed.Token == token {
gotValid = true
reportedVersion.Name = observed.VersionInfo.Name
reportedVersion.Meta = observed.VersionInfo.Meta
return &proto.CheckinExpected{
AgentInfo: &proto.AgentInfo{
Id: "elastic-agent-id",
Version: "8.5.0",
Snapshot: true,
defer func() {
checkInDone <- observed
}()

return &proto.CheckinExpected{
AgentInfo: &proto.AgentInfo{
Id: wantAgentInfo.ID,
Version: wantAgentInfo.Version,
Snapshot: wantAgentInfo.Snapshot,
},
Features: &proto.Features{
Fqdn: &proto.FQDNFeature{Enabled: wantFQDN},
},
Component: &proto.Component{
Limits: &proto.ComponentLimits{
GoMaxProcs: 0,
},
Features: &proto.Features{
Fqdn: &proto.FQDNFeature{Enabled: wantFQDN},
},
Units: []*proto.UnitExpected{
{
Id: unitOneID,
Type: proto.UnitType_OUTPUT,
ConfigStateIdx: 1,
Config: &proto.UnitExpectedConfig{},
State: proto.State_HEALTHY,
LogLevel: proto.UnitLogLevel_INFO,
},
Component: &proto.Component{
Limits: &proto.ComponentLimits{
GoMaxProcs: 0,
},
{
Id: unitTwoID,
Type: proto.UnitType_INPUT,
ConfigStateIdx: 1,
Config: &proto.UnitExpectedConfig{},
State: proto.State_HEALTHY,
LogLevel: proto.UnitLogLevel_INFO,
},
Units: []*proto.UnitExpected{
{
Id: unitOneID,
Type: proto.UnitType_OUTPUT,
ConfigStateIdx: 1,
Config: &proto.UnitExpectedConfig{},
State: proto.State_HEALTHY,
LogLevel: proto.UnitLogLevel_INFO,
},
{
Id: unitTwoID,
Type: proto.UnitType_INPUT,
ConfigStateIdx: 1,
Config: &proto.UnitExpectedConfig{},
State: proto.State_HEALTHY,
LogLevel: proto.UnitLogLevel_INFO,
},
},
}
},
}
// disconnect
gotInvalid = true
return nil

},
ActionImpl: func(response *proto.ActionResponse) error {
// actions not tested here
Expand All @@ -212,119 +222,101 @@ func testClientV2CheckinInitial(t *testing.T, localRPC string, serverCreds, clie
ActionsChan: make(chan *mock.PerformAction, 100),
LocalRPC: localRPC,
}
var gops []grpc.ServerOption

var serverOptions []grpc.ServerOption
if serverCreds != nil {
gops = append(gops, grpc.Creds(serverCreds))
serverOptions = append(serverOptions, grpc.Creds(serverCreds))
}
require.NoError(t, srv.Start(gops...))

require.NoError(t, srv.Start(serverOptions...), "failed to start GRPC server")
defer srv.Stop()

var dops []grpc.DialOption
if clientCreds != nil {
dops = append(dops, grpc.WithTransportCredentials(clientCreds))
} else {
dops = append(dops, grpc.WithTransportCredentials(insecure.NewCredentials()))
if clientCreds == nil {
clientCreds = insecure.NewCredentials()
}
dialOptions := []grpc.DialOption{grpc.WithTransportCredentials(clientCreds)}

// connect with an invalid token
var errsMu sync.Mutex
var errs []error
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
invalidClient := NewV2(srv.GetTarget(), mock.NewID(), VersionInfo{}, WithGRPCDialOptions(dops...))
storeErrors(ctx, invalidClient, &errs, &errsMu)
require.NoError(t, invalidClient.Start(ctx))
defer invalidClient.Stop()
require.NoError(t, waitFor(func() error {
m.Lock()
defer m.Unlock()

if !gotInvalid {
return fmt.Errorf("server never received invalid token")
}
return nil
}))
invalidClient.Stop()
cancel()

// connect with an valid token
var errs2Mu sync.Mutex
var errs2 []error
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
validClient := NewV2(srv.GetTarget(), token, VersionInfo{
Name: "program",
Meta: map[string]string{
"key": "value",
},
}, WithGRPCDialOptions(dops...))

storeErrors(ctx, validClient, &errs2, &errs2Mu)
clientv2 := NewV2(srv.GetTarget(), token, vInfo,
WithAgentInfo(wantAgentInfo),
WithGRPCDialOptions(dialOptions...))
storeErrors(ctx, clientv2, &errs, &errsMu)

// receive the units
var unitsMu sync.Mutex
receivedUnits := make(chan struct{})
var units []*Unit
var gotFQDN bool
go func() {
for {
select {
case <-ctx.Done():
return
case change := <-validClient.UnitChanges():
case change := <-clientv2.UnitChanges():
if change.Triggers&TriggeredFeatureChange == TriggeredFeatureChange {
gotFQDN = change.Unit.Expected().Features.Fqdn.Enabled
}

switch change.Type {
case UnitChangedAdded:
unitsMu.Lock()
units = append(units, change.Unit)
unitsMu.Unlock()
default:
panic("not implemented")
panic(fmt.Errorf("received unnexpected change type: %s",
change.Type))
}
case <-time.After(5 * time.Minute):
panic(fmt.Errorf("timed out waiting for unit changes after 2s"))
}

if len(units) == 2 {
close(receivedUnits)
return
}
}
}()

require.NoError(t, validClient.Start(ctx))
defer validClient.Stop()
require.NoError(t, waitFor(func() error {
m.Lock()
defer m.Unlock()
unitsMu.Lock()
defer unitsMu.Unlock()

if !gotValid {
return fmt.Errorf("server never received valid token")
}
if len(units) < 2 {
return fmt.Errorf("never received 2 units")
}
return nil
}))
require.NoError(t, clientv2.Start(ctx), "failed starting client V2")
defer clientv2.Stop()

agentInfo := validClient.AgentInfo()
require.NotNil(t, agentInfo)
<-receivedUnits

validClient.Stop()
cancel()
var gotObserved *proto.CheckinObserved
timeout := 1 * time.Second
select {
case <-time.After(timeout):
t.Fatalf("timed out after %s waiting for 1st checkin to complete",
timeout)
case gotObserved = <-checkInDone:
}

assert.Equal(t, agentInfo.ID, "elastic-agent-id")
assert.Equal(t, agentInfo.Version, "8.5.0")
assert.True(t, agentInfo.Snapshot)
assert.Empty(t, errs, "client should not have sent eny error")

assert.Equal(t, wantFQDN, gotFQDN)
clientv2.Stop()
srv.Stop()
cancel()

assert.Equal(t, units[0].ID(), unitOneID)
assert.Equal(t, units[0].Type(), UnitTypeOutput)
assert.Equal(t, units[1].ID(), unitTwoID)
assert.Equal(t, units[1].Type(), UnitTypeInput)
agentInfo := clientv2.AgentInfo()
if assert.NotNilf(t, agentInfo, "AgentInfo should not be nil") {
assert.Equal(t, wantAgentInfo.ID, agentInfo.ID)
assert.Equal(t, wantAgentInfo.Version, agentInfo.Version)
assert.True(t, wantAgentInfo.Snapshot, agentInfo.Snapshot)
}

assert.Truef(t, gotFQDN, "FQND should be true")

if assert.Lenf(t, units, 2, "should have received 2 units") {
assert.Equal(t, units[0].ID(), unitOneID)
assert.Equal(t, units[0].Type(), UnitTypeOutput)
assert.Equal(t, units[1].ID(), unitTwoID)
assert.Equal(t, units[1].Type(), UnitTypeInput)
}

assert.Equal(t, reportedVersion.Name, "program")
assert.Equal(t, reportedVersion.Meta, map[string]string{
"key": "value",
})
assert.Equal(t, "program", gotObserved.VersionInfo.Name)
assert.Equal(t, wantBuildHash, gotObserved.VersionInfo.BuildHash)
assert.Equal(t, map[string]string{"key": "value"}, gotObserved.VersionInfo.Meta)
}

func TestClientV2_Checkin_UnitState(t *testing.T) {
Expand Down

0 comments on commit 17d2b25

Please sign in to comment.