From bbfeabf3fdb5766cec2d1659008951cb86b58220 Mon Sep 17 00:00:00 2001 From: Vadym Popov Date: Tue, 3 Dec 2024 16:17:30 -0600 Subject: [PATCH] Fix inventory controller tests after adding additional ping (#49663) * Fix inventory controller tests after adding additional ping * Fix TestTimeReconciliation Remove unused code --- lib/inventory/controller.go | 4 +- lib/inventory/controller_test.go | 145 ++++++++++++++++++------------- lib/inventory/inventory.go | 24 ----- 3 files changed, 89 insertions(+), 84 deletions(-) diff --git a/lib/inventory/controller.go b/lib/inventory/controller.go index 86bd6450fb0c1..63873c03fc852 100644 --- a/lib/inventory/controller.go +++ b/lib/inventory/controller.go @@ -104,7 +104,7 @@ const ( instanceHeartbeatOk testEvent = "instance-heartbeat-ok" instanceHeartbeatErr testEvent = "instance-heartbeat-err" - timeReconciliationOk testEvent = "time-reconciliation-ok" + pongOk testEvent = "pong-ok" instanceCompareFailed testEvent = "instance-compare-failed" @@ -517,7 +517,6 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) { handle.CloseWithError(err) return } - c.testEvent(timeReconciliationOk) case now := <-dbKeepAliveDelay.Elapsed(): dbKeepAliveDelay.Advance(now) @@ -631,6 +630,7 @@ func (c *Controller) handlePong(handle *upstreamHandle, msg proto.UpstreamInvent pending.rspC <- pong delete(handle.pings, msg.ID) + c.testEvent(pongOk) } func (c *Controller) handlePingRequest(handle *upstreamHandle, req pingRequest) error { diff --git a/lib/inventory/controller_test.go b/lib/inventory/controller_test.go index b89a8bdbae7dd..9ec509f725293 100644 --- a/lib/inventory/controller_test.go +++ b/lib/inventory/controller_test.go @@ -190,6 +190,27 @@ func TestSSHServerBasics(t *testing.T) { // set up fake in-memory control stream upstream, downstream := client.InventoryControlStreamPipe(client.ICSPipePeerAddr(peerAddr)) + t.Cleanup(func() { + controller.Close() + downstream.Close() + upstream.Close() + }) + + // launch goroutine to respond to ping requests + go func() { + for { + select { + case msg := <-downstream.Recv(): + downstream.Send(ctx, proto.UpstreamInventoryPong{ + ID: msg.(proto.DownstreamInventoryPing).ID, + }) + case <-downstream.Done(): + return + case <-ctx.Done(): + return + } + } + }() controller.RegisterControlStream(upstream, proto.UpstreamInventoryHello{ ServerID: serverID, @@ -256,18 +277,6 @@ func TestSSHServerBasics(t *testing.T) { deny(sshKeepAliveErr, handlerClose), ) - // launch goroutine to respond to a single ping - go func() { - select { - case msg := <-downstream.Recv(): - downstream.Send(ctx, proto.UpstreamInventoryPong{ - ID: msg.(proto.DownstreamInventoryPing).ID, - }) - case <-downstream.Done(): - case <-ctx.Done(): - } - }() - // limit time of ping call pingCtx, cancel := context.WithTimeout(ctx, time.Second*10) defer cancel() @@ -357,6 +366,27 @@ func TestAppServerBasics(t *testing.T) { // set up fake in-memory control stream upstream, downstream := client.InventoryControlStreamPipe() + t.Cleanup(func() { + controller.Close() + upstream.Close() + downstream.Close() + }) + + // launch goroutine to respond to ping requests + go func() { + for { + select { + case msg := <-downstream.Recv(): + downstream.Send(ctx, proto.UpstreamInventoryPong{ + ID: msg.(proto.DownstreamInventoryPing).ID, + }) + case <-downstream.Done(): + return + case <-ctx.Done(): + return + } + } + }() controller.RegisterControlStream(upstream, proto.UpstreamInventoryHello{ ServerID: serverID, @@ -443,18 +473,6 @@ func TestAppServerBasics(t *testing.T) { deny(appKeepAliveErr, handlerClose), ) - // launch goroutine to respond to a single ping - go func() { - select { - case msg := <-downstream.Recv(): - downstream.Send(ctx, proto.UpstreamInventoryPong{ - ID: msg.(proto.DownstreamInventoryPing).ID, - }) - case <-downstream.Done(): - case <-ctx.Done(): - } - }() - // limit time of ping call pingCtx, cancel := context.WithTimeout(ctx, time.Second*10) defer cancel() @@ -575,6 +593,27 @@ func TestDatabaseServerBasics(t *testing.T) { // set up fake in-memory control stream upstream, downstream := client.InventoryControlStreamPipe() + t.Cleanup(func() { + controller.Close() + upstream.Close() + downstream.Close() + }) + + // launch goroutine to respond to ping requests + go func() { + for { + select { + case msg := <-downstream.Recv(): + downstream.Send(ctx, proto.UpstreamInventoryPong{ + ID: msg.(proto.DownstreamInventoryPing).ID, + }) + case <-downstream.Done(): + return + case <-ctx.Done(): + return + } + } + }() controller.RegisterControlStream(upstream, proto.UpstreamInventoryHello{ ServerID: serverID, @@ -662,18 +701,6 @@ func TestDatabaseServerBasics(t *testing.T) { deny(dbKeepAliveErr, handlerClose), ) - // launch goroutine to respond to a single ping - go func() { - select { - case msg := <-downstream.Recv(): - downstream.Send(ctx, proto.UpstreamInventoryPong{ - ID: msg.(proto.DownstreamInventoryPing).ID, - }) - case <-downstream.Done(): - case <-ctx.Done(): - } - }() - // limit time of ping call pingCtx, cancel := context.WithTimeout(ctx, time.Second*10) defer cancel() @@ -1189,6 +1216,21 @@ func TestKubernetesServerBasics(t *testing.T) { // set up fake in-memory control stream upstream, downstream := client.InventoryControlStreamPipe() + // launch goroutine to respond to ping requests + go func() { + for { + select { + case msg := <-downstream.Recv(): + downstream.Send(ctx, proto.UpstreamInventoryPong{ + ID: msg.(proto.DownstreamInventoryPing).ID, + }) + case <-downstream.Done(): + return + case <-ctx.Done(): + return + } + } + }() controller.RegisterControlStream(upstream, proto.UpstreamInventoryHello{ ServerID: serverID, @@ -1277,18 +1319,6 @@ func TestKubernetesServerBasics(t *testing.T) { deny(kubeKeepAliveErr, handlerClose), ) - // launch goroutine to respond to a single ping - go func() { - select { - case msg := <-downstream.Recv(): - downstream.Send(ctx, proto.UpstreamInventoryPong{ - ID: msg.(proto.DownstreamInventoryPing).ID, - }) - case <-downstream.Done(): - case <-ctx.Done(): - } - }() - // limit time of ping call pingCtx, cancel := context.WithTimeout(ctx, time.Second*10) defer cancel() @@ -1473,12 +1503,6 @@ func TestTimeReconciliation(t *testing.T) { cancel() }) - controller.RegisterControlStream(upstream, proto.UpstreamInventoryHello{ - ServerID: serverID, - Version: teleport.Version, - Services: []types.SystemRole{types.RoleNode}, - }) - // Launch goroutine to respond to clock request. go func() { for { @@ -1488,7 +1512,6 @@ func TestTimeReconciliation(t *testing.T) { ID: msg.(proto.DownstreamInventoryPing).ID, SystemClock: clock.Now().Add(-time.Minute).UTC(), }) - return case <-downstream.Done(): return case <-ctx.Done(): @@ -1497,12 +1520,16 @@ func TestTimeReconciliation(t *testing.T) { } }() + controller.RegisterControlStream(upstream, proto.UpstreamInventoryHello{ + ServerID: serverID, + Version: teleport.Version, + Services: []types.SystemRole{types.RoleNode}, + }) + _, ok := controller.GetControlStream(serverID) require.True(t, ok) - awaitEvents(t, events, - expect(timeReconciliationOk), - ) + awaitEvents(t, events, expect(pongOk)) awaitEvents(t, events, expect(instanceHeartbeatOk), deny(instanceHeartbeatErr, instanceCompareFailed, handlerClose), @@ -1510,6 +1537,8 @@ func TestTimeReconciliation(t *testing.T) { auth.mu.Lock() m := auth.lastInstance.GetLastMeasurement() auth.mu.Unlock() + + require.NotNil(t, m) require.InDelta(t, time.Minute, m.ControllerSystemClock.Sub(m.SystemClock)-m.RequestDuration/2, float64(time.Second)) } diff --git a/lib/inventory/inventory.go b/lib/inventory/inventory.go index 6f2f07df8533e..c96bcf4675ed9 100644 --- a/lib/inventory/inventory.go +++ b/lib/inventory/inventory.go @@ -404,9 +404,6 @@ type UpstreamHandle interface { Ping(ctx context.Context, id uint64) (d time.Duration, err error) - // SystemClock makes ping request to fetch the system clock of the node. - SystemClock(ctx context.Context, id uint64) (time.Time, time.Duration, error) - // HasService is a helper for checking if a given service is associated with this // stream. HasService(types.SystemRole) bool @@ -673,27 +670,6 @@ func (h *upstreamHandle) Ping(ctx context.Context, id uint64) (d time.Duration, } } -// SystemClock makes ping request to fetch the system clock of the downstream. -func (h *upstreamHandle) SystemClock(ctx context.Context, id uint64) (time.Time, time.Duration, error) { - rspC := make(chan pingResponse, 1) - select { - case h.pingC <- pingRequest{rspC: rspC, id: id}: - case <-h.Done(): - return time.Time{}, 0, trace.Errorf("failed to send downstream ping (stream closed)") - case <-ctx.Done(): - return time.Time{}, 0, trace.Errorf("failed to send downstream ping: %v", ctx.Err()) - } - - select { - case rsp := <-rspC: - return rsp.systemClock, rsp.reqDuration, rsp.err - case <-h.Done(): - return time.Time{}, 0, trace.Errorf("failed to recv upstream pong (stream closed)") - case <-ctx.Done(): - return time.Time{}, 0, trace.Errorf("failed to recv upstream ping: %v", ctx.Err()) - } -} - func (h *upstreamHandle) Hello() proto.UpstreamInventoryHello { return h.hello }