Skip to content

Commit

Permalink
refactor(envoy): moving envoy/resources headers to util (SeldonIO#6129)
Browse files Browse the repository at this point in the history
* moving headers to util

* removing a newline

* lint
  • Loading branch information
driev authored Dec 6, 2024
1 parent f284b4a commit 4125273
Show file tree
Hide file tree
Showing 26 changed files with 274 additions and 283 deletions.
25 changes: 12 additions & 13 deletions scheduler/pkg/agent/rproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (

"github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/interfaces"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/modelscaling"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/metrics"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/util"
)
Expand Down Expand Up @@ -76,14 +75,14 @@ func (t *lazyModelLoadTransport) RoundTrip(req *http.Request) (*http.Response, e
var originalBody []byte
var err error

internalModelName := req.Header.Get(resources.SeldonInternalModelHeader)
internalModelName := req.Header.Get(util.SeldonInternalModelHeader)
// externalModelName is the name of the model as it is known to the client, we should not use
// resources.SeldonModelHeader though as it can contain the experiment tag (used for routing by envoy)
// however for the metrics we need the actual model name and this is done by using resources.SeldonInternalModelHeader
// util.SeldonModelHeader though, as it can contain the experiment tag (used for routing by envoy).
// For the metrics we need the actual model name, which is derived from util.SeldonInternalModelHeader.
externalModelName, _, err := util.GetOrignalModelNameAndVersion(internalModelName)
if err != nil {
t.logger.WithError(err).Warnf("cannot extract model name from %s, revert to actual header", internalModelName)
externalModelName = req.Header.Get(resources.SeldonModelHeader)
externalModelName = req.Header.Get(util.SeldonModelHeader)
}

// to sync between scalingMetricsSetup and scalingMetricsTearDown calls running in go routines
Expand Down Expand Up @@ -121,7 +120,7 @@ func (t *lazyModelLoadTransport) RoundTrip(req *http.Request) (*http.Response, e
// in the case of triton, a request to a model that is not found is considered a bad request
// this is likely to increase latency for genuine bad requests as we will retry twice
if res.StatusCode == http.StatusNotFound || res.StatusCode == http.StatusBadRequest {
internalModelName := req.Header.Get(resources.SeldonInternalModelHeader)
internalModelName := req.Header.Get(util.SeldonInternalModelHeader)
if v2Err := t.loader(internalModelName); v2Err != nil {
t.logger.WithError(v2Err).Warnf("cannot load model %s", internalModelName)
}
Expand All @@ -143,26 +142,26 @@ func (t *lazyModelLoadTransport) RoundTrip(req *http.Request) (*http.Response, e
func (rp *reverseHTTPProxy) addHandlers(proxy http.Handler) http.Handler {
return otelhttp.NewHandler(rp.metrics.AddModelHistogramMetricsHandler(func(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
rp.logger.Debugf("Received request with host %s and internal header %v", r.Host, r.Header.Values(resources.SeldonInternalModelHeader))
rp.logger.Debugf("Received request with host %s and internal header %v", r.Host, r.Header.Values(util.SeldonInternalModelHeader))
rewriteHostHandler(r)

internalModelName := r.Header.Get(resources.SeldonInternalModelHeader)
internalModelName := r.Header.Get(util.SeldonInternalModelHeader)
// externalModelName is the name of the model as it is known to the client, we should not use
// resources.SeldonModelHeader though as it can contain the experiment tag (used for routing by envoy)
// however for the metrics we need the actual model name and this is done by using resources.SeldonInternalModelHeader
// util.SeldonModelHeader though, as it can contain the experiment tag (used for routing by envoy).
// For the metrics we need the actual model name, which is derived from util.SeldonInternalModelHeader.
externalModelName, _, err := util.GetOrignalModelNameAndVersion(internalModelName)
if err != nil {
rp.logger.WithError(err).Warnf("cannot extract model name from %s, revert to actual header", internalModelName)
externalModelName = r.Header.Get(resources.SeldonModelHeader)
externalModelName = r.Header.Get(util.SeldonModelHeader)
}

// TODO: should we return a 404 if the headers are not found?
if externalModelName == "" || internalModelName == "" {
rp.logger.Warnf("Failed to extract model name %s:[%s] %s:[%s]", resources.SeldonInternalModelHeader, internalModelName, resources.SeldonModelHeader, externalModelName)
rp.logger.Warnf("Failed to extract model name %s:[%s] %s:[%s]", util.SeldonInternalModelHeader, internalModelName, util.SeldonModelHeader, externalModelName)
proxy.ServeHTTP(w, r)
return
} else {
rp.logger.Debugf("Extracted model name %s:%s %s:%s", resources.SeldonInternalModelHeader, internalModelName, resources.SeldonModelHeader, externalModelName)
rp.logger.Debugf("Extracted model name %s:%s %s:%s", util.SeldonInternalModelHeader, internalModelName, util.SeldonModelHeader, externalModelName)
}

if err := rp.stateManager.EnsureLoadModel(internalModelName); err != nil {
Expand Down
9 changes: 4 additions & 5 deletions scheduler/pkg/agent/rproxy_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

"github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/modelscaling"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/modelserver_controlplane/oip"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/metrics"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/util"
)
Expand Down Expand Up @@ -169,10 +168,10 @@ func (rp *reverseGRPCProxy) extractModelNamesFromContext(ctx context.Context) (s
var internalModelName, externalModelName string
var inHeader bool
if internalModelName, externalModelName, inHeader = extractModelNamesFromHeaders(ctx); inHeader {
rp.logger.Debugf("Extracted model name %s:%s %s:%s", resources.SeldonInternalModelHeader, internalModelName, resources.SeldonModelHeader, externalModelName)
rp.logger.Debugf("Extracted model name %s:%s %s:%s", util.SeldonInternalModelHeader, internalModelName, util.SeldonModelHeader, externalModelName)
return internalModelName, externalModelName, nil
} else {
msg := fmt.Sprintf("Failed to extract model name %s:[%s] %s:[%s]", resources.SeldonInternalModelHeader, internalModelName, resources.SeldonModelHeader, externalModelName)
msg := fmt.Sprintf("Failed to extract model name %s:[%s] %s:[%s]", util.SeldonInternalModelHeader, internalModelName, util.SeldonModelHeader, externalModelName)
rp.logger.Error(msg)
return "", "", status.Error(codes.FailedPrecondition, msg)
}
Expand Down Expand Up @@ -354,10 +353,10 @@ func extractHeader(key string, md metadata.MD) string {
func extractModelNamesFromHeaders(ctx context.Context) (string, string, bool) {
md, ok := metadata.FromIncomingContext(ctx)
if ok {
internalModelName := extractHeader(resources.SeldonInternalModelHeader, md)
internalModelName := extractHeader(util.SeldonInternalModelHeader, md)
externalModelName, _, err := util.GetOrignalModelNameAndVersion(internalModelName)
if err != nil {
externalModelName = extractHeader(resources.SeldonModelHeader, md)
externalModelName = extractHeader(util.SeldonModelHeader, md)
}
return internalModelName, externalModelName, internalModelName != "" && externalModelName != ""
}
Expand Down
7 changes: 3 additions & 4 deletions scheduler/pkg/agent/rproxy_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/internal/testing_utils"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/modelscaling"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources"
testing_utils2 "github.com/seldonio/seldon-core/scheduler/v2/pkg/internal/testing_utils"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/metrics"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/util"
Expand Down Expand Up @@ -120,21 +119,21 @@ func TestReverseGRPCServiceSmoke(t *testing.T) {
doInfer := func(modelSuffixInternal, modelSuffix string) (*v2.ModelInferResponse, error) {
client := v2.NewGRPCInferenceServiceClient(conn)
ctx := context.Background()
ctx = metadata.AppendToOutgoingContext(ctx, resources.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffixInternal, resources.SeldonModelHeader, dummyModelNamePrefix+modelSuffix)
ctx = metadata.AppendToOutgoingContext(ctx, util.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffixInternal, util.SeldonModelHeader, dummyModelNamePrefix+modelSuffix)
return client.ModelInfer(ctx, &v2.ModelInferRequest{ModelName: dummyModelNamePrefix}) // note without suffix
}

doMeta := func(modelSuffix string) (*v2.ModelMetadataResponse, error) {
client := v2.NewGRPCInferenceServiceClient(conn)
ctx := context.Background()
ctx = metadata.AppendToOutgoingContext(ctx, resources.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffix, resources.SeldonModelHeader, dummyModelNamePrefix)
ctx = metadata.AppendToOutgoingContext(ctx, util.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffix, util.SeldonModelHeader, dummyModelNamePrefix)
return client.ModelMetadata(ctx, &v2.ModelMetadataRequest{Name: dummyModelNamePrefix}) // note without suffix
}

doModelReady := func(modelSuffix string) (*v2.ModelReadyResponse, error) {
client := v2.NewGRPCInferenceServiceClient(conn)
ctx := context.Background()
ctx = metadata.AppendToOutgoingContext(ctx, resources.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffix, resources.SeldonModelHeader, dummyModelNamePrefix)
ctx = metadata.AppendToOutgoingContext(ctx, util.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffix, util.SeldonModelHeader, dummyModelNamePrefix)
return client.ModelReady(ctx, &v2.ModelReadyRequest{Name: dummyModelNamePrefix}) // note without suffix
}

Expand Down
7 changes: 3 additions & 4 deletions scheduler/pkg/agent/rproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/interfaces"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/internal/testing_utils"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/modelscaling"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources"
testing_utils2 "github.com/seldonio/seldon-core/scheduler/v2/pkg/internal/testing_utils"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/metrics"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/util"
Expand Down Expand Up @@ -272,8 +271,8 @@ func TestReverseProxySmoke(t *testing.T) {
req, err := http.NewRequest(http.MethodPost, url, nil)
g.Expect(err).To(BeNil())
req.Header.Set("contentType", "application/json")
req.Header.Set(resources.SeldonModelHeader, test.modelExternalHeader)
req.Header.Set(resources.SeldonInternalModelHeader, test.modelToRequest)
req.Header.Set(util.SeldonModelHeader, test.modelExternalHeader)
req.Header.Set(util.SeldonInternalModelHeader, test.modelToRequest)
resp, err := http.DefaultClient.Do(req)
g.Expect(err).To(BeNil())

Expand Down Expand Up @@ -459,7 +458,7 @@ func TestLazyLoadRoundTripper(t *testing.T) {
httpClient.Transport = &lazyModelLoadTransport{
loader, http.DefaultTransport, metricsHandler, modelScalingStatsCollector, log.New()}
mockMLServerState.setModelServerUnloaded(dummyModel)
req.Header.Set(resources.SeldonInternalModelHeader, dummyModel)
req.Header.Set(util.SeldonInternalModelHeader, dummyModel)
resp, err := httpClient.Do(req)
g.Expect(err).To(BeNil())
g.Expect(resp.StatusCode).To(Equal(http.StatusOK))
Expand Down
21 changes: 10 additions & 11 deletions scheduler/pkg/envoy/processor/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/sirupsen/logrus"

"github.com/seldonio/seldon-core/scheduler/v2/pkg/coordinator"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/xdscache"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/scheduler/cleaner"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/store"
Expand Down Expand Up @@ -390,14 +389,14 @@ func (p *IncrementalProcessor) addTrafficForExperiment(routeName string, exp *ex
switch exp.ResourceType {
case experiment.PipelineResourceType:

var mirrorSplit *resources.PipelineTrafficSplit
trafficSplits := make([]resources.PipelineTrafficSplit, len(exp.Candidates))
var mirrorSplit *xdscache.PipelineTrafficSplit
trafficSplits := make([]xdscache.PipelineTrafficSplit, len(exp.Candidates))

for _, candidate := range exp.Candidates {
trafficSplits = append(trafficSplits, resources.PipelineTrafficSplit{PipelineName: candidate.Name, TrafficWeight: candidate.Weight})
trafficSplits = append(trafficSplits, xdscache.PipelineTrafficSplit{PipelineName: candidate.Name, TrafficWeight: candidate.Weight})
}
if exp.Mirror != nil {
mirrorSplit = &resources.PipelineTrafficSplit{PipelineName: exp.Mirror.Name, TrafficWeight: exp.Mirror.Percent}
mirrorSplit = &xdscache.PipelineTrafficSplit{PipelineName: exp.Mirror.Name, TrafficWeight: exp.Mirror.Percent}
}

p.xdsCache.AddPipelineRoute(routeName, trafficSplits, mirrorSplit)
Expand Down Expand Up @@ -463,7 +462,7 @@ func (p *IncrementalProcessor) removeExperiment(exp *experiment.Experiment) erro
}

func getPipelineRouteName(pipelineName string) string {
return fmt.Sprintf("%s.%s", pipelineName, resources.SeldonPipelineHeaderSuffix)
return fmt.Sprintf("%s.%s", pipelineName, util.SeldonPipelineHeaderSuffix)
}

// TODO make envoy updates for pipelines batched
Expand Down Expand Up @@ -495,20 +494,20 @@ func (p *IncrementalProcessor) addPipeline(pipelineName string) error {
if exp.Deleted {
return fmt.Errorf("Experiment on pipeline %s, but %s is deleted", pip.Name, *exp.Default)
}
var mirrorSplit *resources.PipelineTrafficSplit
trafficSplits := make([]resources.PipelineTrafficSplit, len(exp.Candidates))
var mirrorSplit *xdscache.PipelineTrafficSplit
trafficSplits := make([]xdscache.PipelineTrafficSplit, len(exp.Candidates))

for _, candidate := range exp.Candidates {
trafficSplits = append(trafficSplits, resources.PipelineTrafficSplit{PipelineName: candidate.Name, TrafficWeight: candidate.Weight})
trafficSplits = append(trafficSplits, xdscache.PipelineTrafficSplit{PipelineName: candidate.Name, TrafficWeight: candidate.Weight})
}
if exp.Mirror != nil {
mirrorSplit = &resources.PipelineTrafficSplit{PipelineName: exp.Mirror.Name, TrafficWeight: exp.Mirror.Percent}
mirrorSplit = &xdscache.PipelineTrafficSplit{PipelineName: exp.Mirror.Name, TrafficWeight: exp.Mirror.Percent}
}

p.xdsCache.AddPipelineRoute(routeName, trafficSplits, mirrorSplit)
} else {
logger.Infof("Adding normal pipeline route %s", routeName)
p.xdsCache.AddPipelineRoute(routeName, []resources.PipelineTrafficSplit{{PipelineName: pip.Name, TrafficWeight: 100}}, nil)
p.xdsCache.AddPipelineRoute(routeName, []xdscache.PipelineTrafficSplit{{PipelineName: pip.Name, TrafficWeight: 100}}, nil)
}

return p.updateEnvoy()
Expand Down
21 changes: 10 additions & 11 deletions scheduler/pkg/envoy/processor/incremental_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler"

"github.com/seldonio/seldon-core/scheduler/v2/pkg/coordinator"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/xdscache"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/store"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/store/experiment"
Expand Down Expand Up @@ -969,13 +968,13 @@ func createSnapshot(g Gomega, resources []types.Resource, filename string) {
g.Expect(err).To(BeNil())
}

func getTrafficSplits(virtualHost *routev3.VirtualHost) []resources.Route {
trafficSplits := make([]resources.Route, 0)
func getTrafficSplits(virtualHost *routev3.VirtualHost) []xdscache.Route {
trafficSplits := make([]xdscache.Route, 0)

for _, route := range virtualHost.Routes {
trafficSplit := resources.Route{
trafficSplit := xdscache.Route{
RouteName: route.Name,
Clusters: make([]resources.TrafficSplit, 0),
Clusters: make([]xdscache.TrafficSplit, 0),
}

clusterSpecificer := route.GetRoute().GetClusterSpecifier()
Expand All @@ -987,14 +986,14 @@ func getTrafficSplits(virtualHost *routev3.VirtualHost) []resources.Route {
weightedClusters := route.GetRoute().GetClusterSpecifier().(*routev3.RouteAction_WeightedClusters)

for _, weightedCluster := range weightedClusters.WeightedClusters.Clusters {
trafficSplit.Clusters = append(trafficSplit.Clusters, resources.TrafficSplit{
trafficSplit.Clusters = append(trafficSplit.Clusters, xdscache.TrafficSplit{
ModelName: weightedCluster.Name,
TrafficWeight: weightedCluster.Weight.Value,
})
}
case *routev3.RouteAction_Cluster:
cluster := route.GetRoute().GetClusterSpecifier().(*routev3.RouteAction_Cluster)
trafficSplit.Clusters = append(trafficSplit.Clusters, resources.TrafficSplit{
trafficSplit.Clusters = append(trafficSplit.Clusters, xdscache.TrafficSplit{
ModelName: cluster.Cluster,
TrafficWeight: 100,
})
Expand All @@ -1003,7 +1002,7 @@ func getTrafficSplits(virtualHost *routev3.VirtualHost) []resources.Route {

if len(route.GetRoute().RequestMirrorPolicies) > 0 {
mirror := route.GetRoute().RequestMirrorPolicies[0]
trafficSplit.Mirror = &resources.TrafficSplit{ModelName: mirror.Cluster, TrafficWeight: mirror.RuntimeFraction.DefaultValue.Numerator}
trafficSplit.Mirror = &xdscache.TrafficSplit{ModelName: mirror.Cluster, TrafficWeight: mirror.RuntimeFraction.DefaultValue.Numerator}
}

trafficSplits = append(trafficSplits, trafficSplit)
Expand All @@ -1013,12 +1012,12 @@ func getTrafficSplits(virtualHost *routev3.VirtualHost) []resources.Route {
return trafficSplits
}

func getEndpoints(loadAssignment *endpointv3.ClusterLoadAssignment) []resources.Endpoint {
endpoints := make([]resources.Endpoint, 0)
func getEndpoints(loadAssignment *endpointv3.ClusterLoadAssignment) []xdscache.Endpoint {
endpoints := make([]xdscache.Endpoint, 0)
for _, localityLbEndpoint := range loadAssignment.Endpoints {
for _, lbEndpoint := range localityLbEndpoint.LbEndpoints {
endpointEndpoint := lbEndpoint.HostIdentifier.(*endpointv3.LbEndpoint_Endpoint)
endpoints = append(endpoints, resources.Endpoint{
endpoints = append(endpoints, xdscache.Endpoint{
UpstreamHost: endpointEndpoint.Endpoint.Address.GetSocketAddress().Address,
UpstreamPort: endpointEndpoint.Endpoint.Address.GetSocketAddress().GetPortValue(),
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Use of this software is governed by
the Change License after the Change Date as each is defined in accordance with the LICENSE file.
*/

package resources
package xdscache

import "github.com/seldonio/seldon-core/components/tls/v2/pkg/tls"

Expand Down
Loading

0 comments on commit 4125273

Please sign in to comment.