Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(envoy): refactoring and optimising the components that build envoy config #6119

Merged
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
goos: linux
goarch: amd64
pkg: github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/processor
cpu: 12th Gen Intel(R) Core(TM) i7-12800H
BenchmarkModelUpdate_Models_10_Replicas_1_Batch_10ms-20 261 4293734 ns/op 1728869 B/op 20354 allocs/op
BenchmarkModelUpdate_Models_100_Replicas_1_Batch_10ms-20 57 19236652 ns/op 11745105 B/op 148552 allocs/op
BenchmarkModelUpdate_Models_1_000_Replicas_1_Batch_10ms-20 15 165481121 ns/op 306686874 B/op 4121256 allocs/op
BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_10ms-20 1 1449165416 ns/op 4010677584 B/op 54872332 allocs/op
BenchmarkModelUpdate_Models_10_Replicas_10_Batch_10ms-20 100 17107753 ns/op 21719815 B/op 273169 allocs/op
BenchmarkModelUpdate_Models_100_Replicas_10_Batch_10ms-20 27 66313339 ns/op 118376950 B/op 1388585 allocs/op
BenchmarkModelUpdate_Models_1_000_Replicas_10_Batch_10ms-20 3 519671198 ns/op 967958834 B/op 11993174 allocs/op
BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_10ms-20 1 4341797404 ns/op 13883915576 B/op 181619840 allocs/op
BenchmarkModelUpdate_Models_10_Replicas_1_Batch_100ms-20 6369 162639 ns/op 681659 B/op 6429 allocs/op
BenchmarkModelUpdate_Models_100_Replicas_1_Batch_100ms-20 10 102798616 ns/op 264589261 B/op 3582995 allocs/op
BenchmarkModelUpdate_Models_1_000_Replicas_1_Batch_100ms-20 9 117708448 ns/op 348929572 B/op 4632386 allocs/op
BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_100ms-20 2 683663227 ns/op 2238235876 B/op 29665474 allocs/op
BenchmarkModelUpdate_Models_10_Replicas_10_Batch_100ms-20 12 87394856 ns/op 297229676 B/op 4071575 allocs/op
BenchmarkModelUpdate_Models_100_Replicas_10_Batch_100ms-20 9 114736702 ns/op 374778539 B/op 4887162 allocs/op
BenchmarkModelUpdate_Models_1_000_Replicas_10_Batch_100ms-20 6 340307094 ns/op 1366063312 B/op 16472416 allocs/op
BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_100ms-20 1 2402863570 ns/op 7096934200 B/op 93288518 allocs/op
PASS
ok github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/processor 45.753s


After


goos: linux
goarch: amd64
pkg: github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/processor
cpu: 12th Gen Intel(R) Core(TM) i7-12800H
BenchmarkModelUpdate_Models_10_Replicas_1_Batch_10ms-20 99 10503076 ns/op 5459504 B/op 75936 allocs/op
BenchmarkModelUpdate_Models_100_Replicas_1_Batch_10ms-20 100 12329506 ns/op 11528623 B/op 147768 allocs/op
BenchmarkModelUpdate_Models_1_000_Replicas_1_Batch_10ms-20 32 57135215 ns/op 200141562 B/op 2638843 allocs/op
BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_10ms-20 3 425881058 ns/op 1155633304 B/op 15271185 allocs/op
BenchmarkModelUpdate_Models_10_Replicas_10_Batch_10ms-20 97 12796395 ns/op 49125454 B/op 644298 allocs/op
BenchmarkModelUpdate_Models_100_Replicas_10_Batch_10ms-20 34 33008128 ns/op 173017588 B/op 2127563 allocs/op
BenchmarkModelUpdate_Models_1_000_Replicas_10_Batch_10ms-20 4 324180730 ns/op 1990164076 B/op 25634294 allocs/op
BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_10ms-20 1 2786099232 ns/op 16107104968 B/op 215067138 allocs/op
BenchmarkModelUpdate_Models_10_Replicas_1_Batch_100ms-20 5761 230485 ns/op 1276192 B/op 14658 allocs/op
BenchmarkModelUpdate_Models_100_Replicas_1_Batch_100ms-20 12 95563752 ns/op 368770140 B/op 5040953 allocs/op
BenchmarkModelUpdate_Models_1_000_Replicas_1_Batch_100ms-20 9 118337597 ns/op 345704658 B/op 4589376 allocs/op
BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_100ms-20 3 531757210 ns/op 2030309925 B/op 26656810 allocs/op
BenchmarkModelUpdate_Models_10_Replicas_10_Batch_100ms-20 18 103613236 ns/op 149228799 B/op 1985478 allocs/op
BenchmarkModelUpdate_Models_100_Replicas_10_Batch_100ms-20 9 113923163 ns/op 225481680 B/op 2822586 allocs/op
BenchmarkModelUpdate_Models_1_000_Replicas_10_Batch_100ms-20 6 272610358 ns/op 945017966 B/op 10781455 allocs/op
BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_100ms-20 1 1734657705 ns/op 3920402984 B/op 49687374 allocs/op
PASS
ok github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/processor 37.457s
98 changes: 51 additions & 47 deletions scheduler/pkg/envoy/processor/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func NewIncrementalProcessor(
cache: cache,
nodeID: nodeID,
snapshotVersion: rand.Int63n(1000),
logger: log.WithField("source", "EnvoyServer"),
logger: log.WithField("source", "IncrementalProcessor"),
xdsCache: xdscache.NewSeldonXDSCache(log, pipelineGatewayDetails),
modelStore: modelStore,
experimentServer: experimentServer,
Expand All @@ -91,7 +91,7 @@ func NewIncrementalProcessor(
batchTriggerManual: nil,
}

err := ip.setListeners()
err := ip.init()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -154,22 +154,26 @@ func (p *IncrementalProcessor) handleExperimentEvents(event coordinator.Experime
} else {
if event.UpdatedExperiment {
err := p.experimentUpdate(exp)
var err2 error
if err != nil {
logger.WithError(err).Errorf("Failed to process sync for experiment %s", event.String())
err2 = p.experimentServer.SetStatus(event.ExperimentName, false, err.Error())
p.setExperimentStatus(event, false, err.Error())
} else {
err2 = p.experimentServer.SetStatus(event.ExperimentName, true, "experiment active")
}
if err2 != nil {
logger.WithError(err2).Errorf("Failed to set experiment activation")
p.setExperimentStatus(event, true, "experiment active")
}
}
}
}
}()
}

func (p *IncrementalProcessor) setExperimentStatus(event coordinator.ExperimentEventMsg, active bool, msg string) {
logger := p.logger.WithField("func", "setExperimentStatus")
err := p.experimentServer.SetStatus(event.ExperimentName, active, msg)
if err != nil {
logger.WithError(err).Errorf("Failed to set experiment activation")
}
}

func (p *IncrementalProcessor) handleModelEvents(event coordinator.ModelEventMsg) {
logger := p.logger.WithField("func", "handleModelEvents")
logger.Debugf("Received sync for model %s", event.String())
Expand All @@ -182,14 +186,15 @@ func (p *IncrementalProcessor) handleModelEvents(event coordinator.ModelEventMsg
}()
}

func (p *IncrementalProcessor) setListeners() error {
func (p *IncrementalProcessor) init() error {
p.mu.Lock()
defer p.mu.Unlock()
err := p.xdsCache.SetupTLS()
if err != nil {
return err
}
p.xdsCache.AddListeners()
p.xdsCache.AddPermanentListeners()
p.xdsCache.AddPermanentClusters()
return nil
}

Expand Down Expand Up @@ -246,45 +251,32 @@ func (p *IncrementalProcessor) removeRouteForServerInEnvoyCache(routeName string
return nil
}

func (p *IncrementalProcessor) updateEnvoyForModelVersion(modelRouteName string, modelVersion *store.ModelVersion, server *store.ServerSnapshot, trafficPercent uint32, isMirror bool) {
func (p *IncrementalProcessor) updateEnvoyForModelVersion(routeName string, modelVersion *store.ModelVersion, server *store.ServerSnapshot, trafficPercent uint32, isMirror bool) {
logger := p.logger.WithField("func", "updateEnvoyForModelVersion")

assignment := modelVersion.GetAssignment() // Get loaded replicas for model
assignment := modelVersion.GetAssignment()
if len(assignment) == 0 {
logger.Debugf("No assigned replicas so returning for %s", modelRouteName)
logger.Debugf("Not updating route: %s - no assigned replicas for %v", routeName, modelVersion)
return
}

clusterNameBase := modelVersion.GetMeta().GetName() + "_" + strconv.FormatInt(int64(modelVersion.GetVersion()), 10)
httpClusterName := clusterNameBase + "_http"
grpcClusterName := clusterNameBase + "_grpc"
p.xdsCache.AddCluster(httpClusterName, modelRouteName, modelVersion.GetModel().GetMeta().GetName(), modelVersion.GetVersion(), false)
for _, replicaIdx := range assignment {
replica, ok := server.Replicas[replicaIdx]
if !ok {
logger.Warnf("Invalid replica index %d for server %s", replicaIdx, server.Name)
} else {
p.xdsCache.AddEndpoint(httpClusterName, replica.GetInferenceSvc(), uint32(replica.GetInferenceHttpPort()))
}
}
p.xdsCache.AddCluster(grpcClusterName, modelRouteName, modelVersion.GetModel().GetMeta().GetName(), modelVersion.GetVersion(), true)
for _, replicaIdx := range assignment {
replica, ok := server.Replicas[replicaIdx]
if !ok {
logger.Warnf("Invalid replica index %d for server %s", replicaIdx, server.Name)
} else {
p.xdsCache.AddEndpoint(grpcClusterName, replica.GetInferenceSvc(), uint32(replica.GetInferenceGrpcPort()))
}
}
modelName := modelVersion.GetMeta().GetName()
modelVersionNumber := modelVersion.GetVersion()
httpClusterName, grpcClusterName := getClusterNames(modelName, modelVersionNumber)
p.xdsCache.AddClustersForRoute(routeName, modelName, httpClusterName, grpcClusterName, modelVersionNumber, assignment, server)

logPayloads := false
if modelVersion.GetDeploymentSpec() != nil {
logPayloads = modelVersion.GetDeploymentSpec().LogPayloads
} else {
logger.Warnf("model %s has not deployment spec", modelVersion.GetModel().GetMeta().GetName())
logger.Warnf("model %s has not deployment spec", modelName)
}
p.xdsCache.AddRouteClusterTraffic(routeName, modelName, httpClusterName, grpcClusterName, modelVersionNumber, trafficPercent, logPayloads, isMirror)
}

p.xdsCache.AddRouteClusterTraffic(modelRouteName, modelVersion.GetModel().GetMeta().GetName(), modelVersion.GetVersion(), trafficPercent, httpClusterName, grpcClusterName, logPayloads, isMirror)
func getClusterNames(modelVersion string, modelVersionNumber uint32) (string, string) {
clusterNameBase := modelVersion + "_" + strconv.FormatInt(int64(modelVersionNumber), 10)
httpClusterName := clusterNameBase + "_http"
grpcClusterName := clusterNameBase + "_grpc"
return httpClusterName, grpcClusterName
}

func getTrafficShare(latestModel *store.ModelVersion, lastAvailableModelVersion *store.ModelVersion, weight uint32) (uint32, uint32) {
Expand All @@ -305,7 +297,7 @@ func (p *IncrementalProcessor) addModelTraffic(routeName string, model *store.Mo
if latestModel == nil {
logger.Infof("latest model is nil for model %s route %s", model.Name, routeName)
}
return fmt.Errorf("No live replica for model %s for model route %s", model.Name, routeName)
return fmt.Errorf("no live replica for model %s for model route %s", model.Name, routeName)
}

server, err := p.modelStore.GetServer(latestModel.Server(), false, false)
Expand All @@ -321,13 +313,15 @@ func (p *IncrementalProcessor) addModelTraffic(routeName string, model *store.Mo
logger.WithError(err).Errorf("Failed to find server %s for last available model %s", lastAvailableModelVersion.Server(), modelName)
return err
}

logger.Debugf("Splitting traffic between latest %s:%d %d percent and %s:%d %d percent",
modelName,
latestModel.GetVersion(),
trafficLatestModel,
modelName,
lastAvailableModelVersion.GetVersion(),
trafficLastAvailableModel)

p.updateEnvoyForModelVersion(routeName, lastAvailableModelVersion, lastAvailableServer, trafficLastAvailableModel, isMirror)
p.updateEnvoyForModelVersion(routeName, latestModel, server, trafficLatestModel, isMirror)
} else {
Expand Down Expand Up @@ -395,12 +389,19 @@ func (p *IncrementalProcessor) addModel(model *store.ModelSnapshot) error {
func (p *IncrementalProcessor) addTrafficForExperiment(routeName string, exp *experiment.Experiment) error {
switch exp.ResourceType {
case experiment.PipelineResourceType:

var mirrorSplit *resources.PipelineTrafficSplit
trafficSplits := make([]resources.PipelineTrafficSplit, len(exp.Candidates))

for _, candidate := range exp.Candidates {
p.xdsCache.AddPipelineRoute(routeName, candidate.Name, candidate.Weight, false)
trafficSplits = append(trafficSplits, resources.PipelineTrafficSplit{PipelineName: candidate.Name, TrafficWeight: candidate.Weight})
}
if exp.Mirror != nil {
p.xdsCache.AddPipelineRoute(routeName, exp.Mirror.Name, exp.Mirror.Percent, true)
mirrorSplit = &resources.PipelineTrafficSplit{PipelineName: exp.Mirror.Name, TrafficWeight: exp.Mirror.Percent}
}

p.xdsCache.AddPipelineRoute(routeName, trafficSplits, mirrorSplit)

case experiment.ModelResourceType:
for _, candidate := range exp.Candidates {
candidateModel, err := p.modelStore.GetModel(candidate.Name)
Expand Down Expand Up @@ -494,17 +495,20 @@ func (p *IncrementalProcessor) addPipeline(pipelineName string) error {
if exp.Deleted {
return fmt.Errorf("Experiment on pipeline %s, but %s is deleted", pip.Name, *exp.Default)
}
var mirrorSplit *resources.PipelineTrafficSplit
trafficSplits := make([]resources.PipelineTrafficSplit, len(exp.Candidates))

for _, candidate := range exp.Candidates {
logger.Infof("Adding pipeline experiment candidate %s %s %d", routeName, candidate.Name, candidate.Weight)
p.xdsCache.AddPipelineRoute(routeName, candidate.Name, candidate.Weight, false)
trafficSplits = append(trafficSplits, resources.PipelineTrafficSplit{PipelineName: candidate.Name, TrafficWeight: candidate.Weight})
}
if exp.Mirror != nil {
logger.Infof("Adding pipeline experiment mirror %s %s %d", routeName, exp.Mirror.Name, exp.Mirror.Percent)
p.xdsCache.AddPipelineRoute(routeName, exp.Mirror.Name, exp.Mirror.Percent, true)
mirrorSplit = &resources.PipelineTrafficSplit{PipelineName: exp.Mirror.Name, TrafficWeight: exp.Mirror.Percent}
}

p.xdsCache.AddPipelineRoute(routeName, trafficSplits, mirrorSplit)
} else {
logger.Infof("Adding normal pipeline route %s", routeName)
p.xdsCache.AddPipelineRoute(routeName, pip.Name, 100, false)
p.xdsCache.AddPipelineRoute(routeName, []resources.PipelineTrafficSplit{{PipelineName: pip.Name, TrafficWeight: 100}}, nil)
}

return p.updateEnvoy()
Expand Down Expand Up @@ -548,7 +552,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {

model, err := p.modelStore.GetModel(modelName)
if err != nil {
logger.WithError(err).Warnf("Failed to sync model %s", modelName)
logger.WithError(err).Warnf("sync: Failed to sync model %s", modelName)
if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil {
logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName)
p.modelStore.UnlockModel(modelName)
Expand Down
22 changes: 15 additions & 7 deletions scheduler/pkg/envoy/processor/incremental_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func TestRollingUpdate(t *testing.T) {
experimentServer: experiment.NewExperimentServer(log.New(), nil, nil, nil),
pipelineHandler: pipeline.NewPipelineStore(log.New(), nil, modelStore),
}
inc.xdsCache.AddListeners()
inc.xdsCache.AddPermanentListeners()
for _, op := range test.ops {
op(inc, g)
}
Expand Down Expand Up @@ -421,7 +421,7 @@ func TestDraining(t *testing.T) {
experimentServer: experiment.NewExperimentServer(log.New(), nil, nil, nil),
pipelineHandler: pipeline.NewPipelineStore(log.New(), nil, modelStore),
}
inc.xdsCache.AddListeners()
inc.xdsCache.AddPermanentListeners()
for _, op := range test.ops {
op(inc, g)
}
Expand Down Expand Up @@ -566,7 +566,7 @@ func TestModelSync(t *testing.T) {
pipelineHandler: pipeline.NewPipelineStore(log.New(), nil, modelStore),
pendingModelVersions: test.pendingModelVersions,
}
inc.xdsCache.AddListeners()
inc.xdsCache.AddPermanentListeners()
for _, op := range test.ops {
op(inc, g)
}
Expand Down Expand Up @@ -827,7 +827,8 @@ func TestEnvoySettings(t *testing.T) {
inc.handlePipelinesEvents,
)

inc.xdsCache.AddListeners()
inc.xdsCache.AddPermanentListeners()
inc.xdsCache.AddPermanentClusters()
for _, op := range test.ops {
op(inc, g)
time.Sleep(50 * time.Millisecond) // to allow event handlers to process
Expand Down Expand Up @@ -974,7 +975,7 @@ func getTrafficSplits(virtualHost *routev3.VirtualHost) []resources.Route {
for _, route := range virtualHost.Routes {
trafficSplit := resources.Route{
RouteName: route.Name,
Clusters: make([]resources.TrafficSplits, 0),
Clusters: make([]resources.TrafficSplit, 0),
}

clusterSpecificer := route.GetRoute().GetClusterSpecifier()
Expand All @@ -986,20 +987,27 @@ func getTrafficSplits(virtualHost *routev3.VirtualHost) []resources.Route {
weightedClusters := route.GetRoute().GetClusterSpecifier().(*routev3.RouteAction_WeightedClusters)

for _, weightedCluster := range weightedClusters.WeightedClusters.Clusters {
trafficSplit.Clusters = append(trafficSplit.Clusters, resources.TrafficSplits{
trafficSplit.Clusters = append(trafficSplit.Clusters, resources.TrafficSplit{
ModelName: weightedCluster.Name,
TrafficWeight: weightedCluster.Weight.Value,
})
}
case *routev3.RouteAction_Cluster:
cluster := route.GetRoute().GetClusterSpecifier().(*routev3.RouteAction_Cluster)
trafficSplit.Clusters = append(trafficSplit.Clusters, resources.TrafficSplits{
trafficSplit.Clusters = append(trafficSplit.Clusters, resources.TrafficSplit{
ModelName: cluster.Cluster,
TrafficWeight: 100,
})

}

if len(route.GetRoute().RequestMirrorPolicies) > 0 {
mirror := route.GetRoute().RequestMirrorPolicies[0]
trafficSplit.Mirror = &resources.TrafficSplit{ModelName: mirror.Cluster, TrafficWeight: mirror.RuntimeFraction.DefaultValue.Numerator}
}

trafficSplits = append(trafficSplits, trafficSplit)

}

return trafficSplits
Expand Down
2 changes: 1 addition & 1 deletion scheduler/pkg/envoy/processor/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestFetch(t *testing.T) {
nodeID: "node_1",
}

err = inc.setListeners()
err = inc.init()
g.Expect(err).To(BeNil())

conn, err := grpc.NewClient(":"+strconv.Itoa(port), grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand Down
13 changes: 6 additions & 7 deletions scheduler/pkg/envoy/resources/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,16 @@ type Listener struct {
Address string
Port uint32
RouteConfigurationName string
//RouteNames []string
}

type Route struct {
RouteName string
LogPayloads bool
Clusters []TrafficSplits
Mirrors []TrafficSplits
Clusters []TrafficSplit
Mirror *TrafficSplit
}

type TrafficSplits struct {
type TrafficSplit struct {
ModelName string
ModelVersion uint32
TrafficWeight uint32
Expand Down Expand Up @@ -61,11 +60,11 @@ type Endpoint struct {

type PipelineRoute struct {
RouteName string
Clusters []PipelineTrafficSplits
Mirrors []PipelineTrafficSplits
Clusters []PipelineTrafficSplit
Mirror *PipelineTrafficSplit
}

type PipelineTrafficSplits struct {
type PipelineTrafficSplit struct {
PipelineName string
TrafficWeight uint32
}
Expand Down
Loading
Loading