Skip to content

Commit

Permalink
Add queuing to profile translator (#11546)
Browse files Browse the repository at this point in the history
#11491 changed the EndpointTranslator to use a queue to avoid calling `Send` on a gRPC stream directly from an informer callback goroutine.  This change updates the ProfileTranslator in the same way, adding a queue to ensure we do not block the informer thread.

Signed-off-by: Alex Leong <[email protected]>
  • Loading branch information
adleong committed Nov 15, 2023
1 parent afd9b17 commit 1d32bd9
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 90 deletions.
9 changes: 4 additions & 5 deletions controller/api/destination/destination_fuzzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,11 @@ func FuzzProfileTranslatorUpdate(data []byte) int {
return 0
}
t := &testing.T{}
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}

translator := &profileTranslator{
stream: mockGetProfileServer,
log: logging.WithField("test", t.Name()),
}
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "foo.bar.svc.cluster.local", 80, nil)
translator.Start()
defer translator.Stop()
translator.Update(profile)
return 1
}
80 changes: 75 additions & 5 deletions controller/api/destination/profile_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,99 @@ import (
sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
"github.com/linkerd/linkerd2/pkg/profiles"
"github.com/linkerd/linkerd2/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
logging "github.com/sirupsen/logrus"
)

const millisPerDecimilli = 10

// implements the ProfileUpdateListener interface
type profileTranslator struct {
stream pb.Destination_GetProfileServer
log *logging.Entry
fullyQualifiedName string
port uint32

stream pb.Destination_GetProfileServer
endStream chan struct{}
log *logging.Entry
overflowCounter prometheus.Counter

updates chan *sp.ServiceProfile
stop chan struct{}
}

func newProfileTranslator(stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32) *profileTranslator {
var profileUpdatesQueueOverflowCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "profile_updates_queue_overflow",
Help: "A counter incremented whenever the profile updates queue overflows",
},
[]string{
"fqn",
"port",
},
)

func newProfileTranslator(stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32, endStream chan struct{}) *profileTranslator {
return &profileTranslator{
stream: stream,
log: log.WithField("component", "profile-translator"),
fullyQualifiedName: fqn,
port: port,

stream: stream,
endStream: endStream,
log: log.WithField("component", "profile-translator"),
overflowCounter: profileUpdatesQueueOverflowCounter.With(prometheus.Labels{"fqn": fqn, "port": fmt.Sprintf("%d", port)}),
updates: make(chan *sp.ServiceProfile, updateQueueCapacity),
stop: make(chan struct{}),
}
}

// Update is called from a client-go informer callback and therefore must not
// We enqueue an update in a channel so that it can be processed asyncronously.
// To ensure that enqueuing does not block, we first check to see if there is
// capacity in the buffered channel. If there is not, we drop the update and
// signal to the stream that it has fallen too far behind and should be closed.
func (pt *profileTranslator) Update(profile *sp.ServiceProfile) {
select {
case pt.updates <- profile:
// Update has been successfully enqueued.
default:
// We are unable to enqueue because the channel does not have capacity.
// The stream has fallen too far behind and should be closed.
pt.overflowCounter.Inc()
select {
case <-pt.endStream:
// The endStream channel has already been closed so no action is
// necessary.
default:
pt.log.Error("profile update queue full; aborting stream")
close(pt.endStream)
}
}
}

// Start initiates a goroutine which processes update events off of the
// profileTranslator's internal queue and sends to the grpc stream as
// appropriate. The goroutine calls non-thread-safe Send, therefore Start must
// not be called more than once.
func (pt *profileTranslator) Start() {
go func() {
for {
select {
case update := <-pt.updates:
pt.update(update)
case <-pt.stop:
return
}
}
}()
}

// Stop terminates the goroutine started by Start.
func (pt *profileTranslator) Stop() {
close(pt.stop)
}

func (pt *profileTranslator) update(profile *sp.ServiceProfile) {
if profile == nil {
pt.log.Debugf("Sending default profile")
if err := pt.stream.Send(pt.defaultServiceProfile()); err != nil {
Expand Down
135 changes: 64 additions & 71 deletions controller/api/destination/profile_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,52 +421,49 @@ var (

func TestProfileTranslator(t *testing.T) {
t.Run("Sends update", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}

translator := &profileTranslator{
stream: mockGetProfileServer,
log: logging.WithField("test", t.Name()),
}
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
translator.Start()
defer translator.Stop()

translator.Update(profile)

numProfiles := len(mockGetProfileServer.profilesReceived)
if numProfiles != 1 {
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
actualPbProfile := mockGetProfileServer.profilesReceived[0]
actualPbProfile := <-mockGetProfileServer.profilesReceived
if !proto.Equal(actualPbProfile, pbProfile) {
t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbProfile, actualPbProfile)
}
numProfiles := len(mockGetProfileServer.profilesReceived) + 1
if numProfiles != 1 {
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
})

t.Run("Request match with more than one field becomes ALL", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}

translator := &profileTranslator{
stream: mockGetProfileServer,
log: logging.WithField("test", t.Name()),
}
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
translator.Start()
defer translator.Stop()

translator.Update(multipleRequestMatches)

numProfiles := len(mockGetProfileServer.profilesReceived)
if numProfiles != 1 {
t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
actualPbProfile := mockGetProfileServer.profilesReceived[0]
actualPbProfile := <-mockGetProfileServer.profilesReceived
if !proto.Equal(actualPbProfile, pbRequestMatchAll) {
t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbRequestMatchAll, actualPbProfile)
}
numProfiles := len(mockGetProfileServer.profilesReceived) + 1
if numProfiles != 1 {
t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
})

t.Run("Ignores request match without any fields", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}

translator := &profileTranslator{
stream: mockGetProfileServer,
log: logging.WithField("test", t.Name()),
}
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
translator.Start()
defer translator.Stop()

translator.Update(notEnoughRequestMatches)

Expand All @@ -477,32 +474,30 @@ func TestProfileTranslator(t *testing.T) {
})

t.Run("Response match with more than one field becomes ALL", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}

translator := &profileTranslator{
stream: mockGetProfileServer,
log: logging.WithField("test", t.Name()),
}
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
translator.Start()
defer translator.Stop()

translator.Update(multipleResponseMatches)

numProfiles := len(mockGetProfileServer.profilesReceived)
if numProfiles != 1 {
t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
actualPbProfile := mockGetProfileServer.profilesReceived[0]
actualPbProfile := <-mockGetProfileServer.profilesReceived
if !proto.Equal(actualPbProfile, pbResponseMatchAll) {
t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbResponseMatchAll, actualPbProfile)
}
numProfiles := len(mockGetProfileServer.profilesReceived) + 1
if numProfiles != 1 {
t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
})

t.Run("Ignores response match without any fields", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}

translator := &profileTranslator{
stream: mockGetProfileServer,
log: logging.WithField("test", t.Name()),
}
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
translator.Start()
defer translator.Stop()

translator.Update(notEnoughResponseMatches)

Expand All @@ -513,12 +508,11 @@ func TestProfileTranslator(t *testing.T) {
})

t.Run("Ignores response match with invalid status range", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}

translator := &profileTranslator{
stream: mockGetProfileServer,
log: logging.WithField("test", t.Name()),
}
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
translator.Start()
defer translator.Stop()

translator.Update(invalidStatusRange)

Expand All @@ -529,58 +523,57 @@ func TestProfileTranslator(t *testing.T) {
})

t.Run("Sends update for one sided status range", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}

translator := &profileTranslator{
stream: mockGetProfileServer,
log: logging.WithField("test", t.Name()),
}
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
translator.Start()
defer translator.Stop()

translator.Update(oneSidedStatusRange)

numProfiles := len(mockGetProfileServer.profilesReceived)
<-mockGetProfileServer.profilesReceived

numProfiles := len(mockGetProfileServer.profilesReceived) + 1
if numProfiles != 1 {
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
})

t.Run("Sends empty update", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}

translator := &profileTranslator{
stream: mockGetProfileServer,
log: logging.WithField("test", t.Name()),
}
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
translator.Start()
defer translator.Stop()

translator.Update(nil)

numProfiles := len(mockGetProfileServer.profilesReceived)
if numProfiles != 1 {
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
actualPbProfile := mockGetProfileServer.profilesReceived[0]
actualPbProfile := <-mockGetProfileServer.profilesReceived
if !proto.Equal(actualPbProfile, defaultPbProfile) {
t.Fatalf("Expected profile sent to be [%v] but was [%v]", defaultPbProfile, actualPbProfile)
}
numProfiles := len(mockGetProfileServer.profilesReceived) + 1
if numProfiles != 1 {
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
})

t.Run("Sends update with custom timeout", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}

translator := &profileTranslator{
stream: mockGetProfileServer,
log: logging.WithField("test", t.Name()),
}
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
translator.Start()
defer translator.Stop()

translator.Update(profileWithTimeout)

numProfiles := len(mockGetProfileServer.profilesReceived)
if numProfiles != 1 {
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
actualPbProfile := mockGetProfileServer.profilesReceived[0]
actualPbProfile := <-mockGetProfileServer.profilesReceived
if !proto.Equal(actualPbProfile, pbProfileWithTimeout) {
t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbProfileWithTimeout, actualPbProfile)
}
numProfiles := len(mockGetProfileServer.profilesReceived) + 1
if numProfiles != 1 {
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
})
}
Loading

0 comments on commit 1d32bd9

Please sign in to comment.