From 1d32bd9df629bb370b8541b558fda694888d27b4 Mon Sep 17 00:00:00 2001 From: Alex Leong Date: Thu, 9 Nov 2023 04:32:30 -0800 Subject: [PATCH] Add queuing to profile translator (#11546) https://github.com/linkerd/linkerd2/pull/11491 changed the EndpointTranslator to use a queue to avoid calling `Send` on a gRPC stream directly from an informer callback goroutine. This change updates the ProfileTranslator in the same way, adding a queue to ensure we do not block the informer thread. Signed-off-by: Alex Leong --- .../api/destination/destination_fuzzer.go | 9 +- .../api/destination/profile_translator.go | 80 ++++++++++- .../destination/profile_translator_test.go | 135 +++++++++--------- controller/api/destination/server.go | 23 ++- controller/api/destination/test_util.go | 4 +- 5 files changed, 161 insertions(+), 90 deletions(-) diff --git a/controller/api/destination/destination_fuzzer.go b/controller/api/destination/destination_fuzzer.go index beba1adc6b766..1369581ef22d7 100644 --- a/controller/api/destination/destination_fuzzer.go +++ b/controller/api/destination/destination_fuzzer.go @@ -91,12 +91,11 @@ func FuzzProfileTranslatorUpdate(data []byte) int { return 0 } t := &testing.T{} - mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}} + mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)} - translator := &profileTranslator{ - stream: mockGetProfileServer, - log: logging.WithField("test", t.Name()), - } + translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "foo.bar.svc.cluster.local", 80, nil) + translator.Start() + defer translator.Stop() translator.Update(profile) return 1 } diff --git a/controller/api/destination/profile_translator.go b/controller/api/destination/profile_translator.go index 1342dc4334025..05471b51d5afb 100644 --- a/controller/api/destination/profile_translator.go +++ b/controller/api/destination/profile_translator.go @@ -11,6 +11,8 @@ import ( sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2" "github.com/linkerd/linkerd2/pkg/profiles" "github.com/linkerd/linkerd2/pkg/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" logging "github.com/sirupsen/logrus" ) @@ -18,22 +20,90 @@ const millisPerDecimilli = 10 // implements the ProfileUpdateListener interface type profileTranslator struct { - stream pb.Destination_GetProfileServer - log *logging.Entry fullyQualifiedName string port uint32 + + stream pb.Destination_GetProfileServer + endStream chan struct{} + log *logging.Entry + overflowCounter prometheus.Counter + + updates chan *sp.ServiceProfile + stop chan struct{} } -func newProfileTranslator(stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32) *profileTranslator { +var profileUpdatesQueueOverflowCounter = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "profile_updates_queue_overflow", + Help: "A counter incremented whenever the profile updates queue overflows", + }, + []string{ + "fqn", + "port", + }, +) + +func newProfileTranslator(stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32, endStream chan struct{}) *profileTranslator { return &profileTranslator{ - stream: stream, - log: log.WithField("component", "profile-translator"), fullyQualifiedName: fqn, port: port, + + stream: stream, + endStream: endStream, + log: log.WithField("component", "profile-translator"), + overflowCounter: profileUpdatesQueueOverflowCounter.With(prometheus.Labels{"fqn": fqn, "port": fmt.Sprintf("%d", port)}), + updates: make(chan *sp.ServiceProfile, updateQueueCapacity), + stop: make(chan struct{}), } } +// Update is called from a client-go informer callback and therefore must not +// We enqueue an update in a channel so that it can be processed asyncronously. +// To ensure that enqueuing does not block, we first check to see if there is +// capacity in the buffered channel. If there is not, we drop the update and +// signal to the stream that it has fallen too far behind and should be closed. func (pt *profileTranslator) Update(profile *sp.ServiceProfile) { + select { + case pt.updates <- profile: + // Update has been successfully enqueued. + default: + // We are unable to enqueue because the channel does not have capacity. + // The stream has fallen too far behind and should be closed. + pt.overflowCounter.Inc() + select { + case <-pt.endStream: + // The endStream channel has already been closed so no action is + // necessary. + default: + pt.log.Error("profile update queue full; aborting stream") + close(pt.endStream) + } + } +} + +// Start initiates a goroutine which processes update events off of the +// profileTranslator's internal queue and sends to the grpc stream as +// appropriate. The goroutine calls non-thread-safe Send, therefore Start must +// not be called more than once. +func (pt *profileTranslator) Start() { + go func() { + for { + select { + case update := <-pt.updates: + pt.update(update) + case <-pt.stop: + return + } + } + }() +} + +// Stop terminates the goroutine started by Start. +func (pt *profileTranslator) Stop() { + close(pt.stop) +} + +func (pt *profileTranslator) update(profile *sp.ServiceProfile) { if profile == nil { pt.log.Debugf("Sending default profile") if err := pt.stream.Send(pt.defaultServiceProfile()); err != nil { diff --git a/controller/api/destination/profile_translator_test.go b/controller/api/destination/profile_translator_test.go index 2f926b82a35db..17612dd5a6b91 100644 --- a/controller/api/destination/profile_translator_test.go +++ b/controller/api/destination/profile_translator_test.go @@ -421,52 +421,49 @@ var ( func TestProfileTranslator(t *testing.T) { t.Run("Sends update", func(t *testing.T) { - mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}} + mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)} - translator := &profileTranslator{ - stream: mockGetProfileServer, - log: logging.WithField("test", t.Name()), - } + translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil) + translator.Start() + defer translator.Stop() translator.Update(profile) - numProfiles := len(mockGetProfileServer.profilesReceived) - if numProfiles != 1 { - t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived) - } - actualPbProfile := mockGetProfileServer.profilesReceived[0] + actualPbProfile := <-mockGetProfileServer.profilesReceived if !proto.Equal(actualPbProfile, pbProfile) { t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbProfile, actualPbProfile) } + numProfiles := len(mockGetProfileServer.profilesReceived) + 1 + if numProfiles != 1 { + t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived) + } }) t.Run("Request match with more than one field becomes ALL", func(t *testing.T) { - mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}} + mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)} - translator := &profileTranslator{ - stream: mockGetProfileServer, - log: logging.WithField("test", t.Name()), - } + translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil) + translator.Start() + defer translator.Stop() translator.Update(multipleRequestMatches) - numProfiles := len(mockGetProfileServer.profilesReceived) - if numProfiles != 1 { - t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived) - } - actualPbProfile := mockGetProfileServer.profilesReceived[0] + actualPbProfile := <-mockGetProfileServer.profilesReceived if !proto.Equal(actualPbProfile, pbRequestMatchAll) { t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbRequestMatchAll, actualPbProfile) } + numProfiles := len(mockGetProfileServer.profilesReceived) + 1 + if numProfiles != 1 { + t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived) + } }) t.Run("Ignores request match without any fields", func(t *testing.T) { - mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}} + mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)} - translator := &profileTranslator{ - stream: mockGetProfileServer, - log: logging.WithField("test", t.Name()), - } + translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil) + translator.Start() + defer translator.Stop() translator.Update(notEnoughRequestMatches) @@ -477,32 +474,30 @@ func TestProfileTranslator(t *testing.T) { }) t.Run("Response match with more than one field becomes ALL", func(t *testing.T) { - mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}} + mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)} - translator := &profileTranslator{ - stream: mockGetProfileServer, - log: logging.WithField("test", t.Name()), - } + translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil) + translator.Start() + defer translator.Stop() translator.Update(multipleResponseMatches) - numProfiles := len(mockGetProfileServer.profilesReceived) - if numProfiles != 1 { - t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived) - } - actualPbProfile := mockGetProfileServer.profilesReceived[0] + actualPbProfile := <-mockGetProfileServer.profilesReceived if !proto.Equal(actualPbProfile, pbResponseMatchAll) { t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbResponseMatchAll, actualPbProfile) } + numProfiles := len(mockGetProfileServer.profilesReceived) + 1 + if numProfiles != 1 { + t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived) + } }) t.Run("Ignores response match without any fields", func(t *testing.T) { - mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}} + mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)} - translator := &profileTranslator{ - stream: mockGetProfileServer, - log: logging.WithField("test", t.Name()), - } + translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil) + translator.Start() + defer translator.Stop() translator.Update(notEnoughResponseMatches) @@ -513,12 +508,11 @@ func TestProfileTranslator(t *testing.T) { }) t.Run("Ignores response match with invalid status range", func(t *testing.T) { - mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}} + mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)} - translator := &profileTranslator{ - stream: mockGetProfileServer, - log: logging.WithField("test", t.Name()), - } + translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil) + translator.Start() + defer translator.Stop() translator.Update(invalidStatusRange) @@ -529,58 +523,57 @@ func TestProfileTranslator(t *testing.T) { }) t.Run("Sends update for one sided status range", func(t *testing.T) { - mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}} + mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)} - translator := &profileTranslator{ - stream: mockGetProfileServer, - log: logging.WithField("test", t.Name()), - } + translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil) + translator.Start() + defer translator.Stop() translator.Update(oneSidedStatusRange) - numProfiles := len(mockGetProfileServer.profilesReceived) + <-mockGetProfileServer.profilesReceived + + numProfiles := len(mockGetProfileServer.profilesReceived) + 1 if numProfiles != 1 { t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived) } }) t.Run("Sends empty update", func(t *testing.T) { - mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}} + mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)} - translator := &profileTranslator{ - stream: mockGetProfileServer, - log: logging.WithField("test", t.Name()), - } + translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil) + translator.Start() + defer translator.Stop() translator.Update(nil) - numProfiles := len(mockGetProfileServer.profilesReceived) - if numProfiles != 1 { - t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived) - } - actualPbProfile := mockGetProfileServer.profilesReceived[0] + actualPbProfile := <-mockGetProfileServer.profilesReceived if !proto.Equal(actualPbProfile, defaultPbProfile) { t.Fatalf("Expected profile sent to be [%v] but was [%v]", defaultPbProfile, actualPbProfile) } + numProfiles := len(mockGetProfileServer.profilesReceived) + 1 + if numProfiles != 1 { + t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived) + } }) t.Run("Sends update with custom timeout", func(t *testing.T) { - mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}} + mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)} - translator := &profileTranslator{ - stream: mockGetProfileServer, - log: logging.WithField("test", t.Name()), - } + translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil) + translator.Start() + defer translator.Stop() translator.Update(profileWithTimeout) - numProfiles := len(mockGetProfileServer.profilesReceived) - if numProfiles != 1 { - t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived) - } - actualPbProfile := mockGetProfileServer.profilesReceived[0] + actualPbProfile := <-mockGetProfileServer.profilesReceived if !proto.Equal(actualPbProfile, pbProfileWithTimeout) { t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbProfileWithTimeout, actualPbProfile) } + numProfiles := len(mockGetProfileServer.profilesReceived) + 1 + if numProfiles != 1 { + t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived) + } }) } diff --git a/controller/api/destination/server.go b/controller/api/destination/server.go index 61abb0e5cb1f8..4fadda88a6bce 100644 --- a/controller/api/destination/server.go +++ b/controller/api/destination/server.go @@ -345,11 +345,14 @@ func (s *server) subscribeToServiceProfile( WithField("port", port) canceled := stream.Context().Done() + streamEnd := make(chan struct{}) // We build up the pipeline of profile updaters backwards, starting from // the translator which takes profile updates, translates them to protobuf // and pushes them onto the gRPC stream. - translator := newProfileTranslator(stream, log, fqn, port) + translator := newProfileTranslator(stream, log, fqn, port, streamEnd) + translator.Start() + defer translator.Stop() // The opaque ports adaptor merges profile updates with service opaque // port annotation updates; it then publishes the result to the traffic @@ -376,9 +379,9 @@ func (s *server) subscribeToServiceProfile( // namespace. If there's no namespace in the token, start a single // subscription. if token.Ns == "" { - return s.subscribeToServiceWithoutContext(fqn, listener, canceled, log) + return s.subscribeToServiceWithoutContext(fqn, listener, canceled, log, streamEnd) } - return s.subscribeToServicesWithContext(fqn, token, listener, canceled, log) + return s.subscribeToServicesWithContext(fqn, token, listener, canceled, log, streamEnd) } // subscribeToServiceWithContext establishes two profile watches: a "backup" @@ -393,6 +396,7 @@ func (s *server) subscribeToServicesWithContext( listener watcher.ProfileUpdateListener, canceled <-chan struct{}, log *logging.Entry, + streamEnd <-chan struct{}, ) error { // We ned to support two subscriptions: // - First, a backup subscription that assumes the context of the server @@ -430,7 +434,9 @@ func (s *server) subscribeToServicesWithContext( select { case <-s.shutdown: case <-canceled: - log.Debug("Cancelled") + log.Debugf("GetProfile %s cancelled", fqn) + case <-streamEnd: + log.Errorf("GetProfile %s stream aborted", fqn) } return nil } @@ -440,8 +446,9 @@ func (s *server) subscribeToServicesWithContext( func (s *server) subscribeToServiceWithoutContext( fqn string, listener watcher.ProfileUpdateListener, - cancel <-chan struct{}, + canceled <-chan struct{}, log *logging.Entry, + streamEnd <-chan struct{}, ) error { id, err := profileID(fqn, contextToken{}, s.clusterDomain) if err != nil { @@ -457,8 +464,10 @@ func (s *server) subscribeToServiceWithoutContext( select { case <-s.shutdown: - case <-cancel: - log.Debug("Cancelled") + case <-canceled: + log.Debugf("GetProfile %s cancelled", fqn) + case <-streamEnd: + log.Errorf("GetProfile %s stream aborted", fqn) } return nil } diff --git a/controller/api/destination/test_util.go b/controller/api/destination/test_util.go index 1ab71eb1ad5ff..df79bab9a8249 100644 --- a/controller/api/destination/test_util.go +++ b/controller/api/destination/test_util.go @@ -563,11 +563,11 @@ func (m *mockDestinationGetServer) Send(update *pb.Update) error { type mockDestinationGetProfileServer struct { util.MockServerStream - profilesReceived []*pb.DestinationProfile + profilesReceived chan *pb.DestinationProfile } func (m *mockDestinationGetProfileServer) Send(profile *pb.DestinationProfile) error { - m.profilesReceived = append(m.profilesReceived, profile) + m.profilesReceived <- profile return nil }