diff --git a/microservices-connector/Makefile b/microservices-connector/Makefile index 69ab07d7d..0965b8464 100644 --- a/microservices-connector/Makefile +++ b/microservices-connector/Makefile @@ -119,12 +119,12 @@ docker.build: manager-image router-image # Build manager Docker image manager-image: @echo "Building manager Docker image..." - $(CONTAINER_TOOL) build -t $(DOCKER_REGISTRY)/${CTR_IMG}:$(VERSION) -f Dockerfile.manager . + $(CONTAINER_TOOL) build --build-arg https_proxy --build-arg http_proxy -t $(DOCKER_REGISTRY)/${CTR_IMG}:$(VERSION) -f Dockerfile.manager . # Build router Docker image router-image: @echo "Building router Docker image..." - $(CONTAINER_TOOL) build -t $(DOCKER_REGISTRY)/${ROUTER_IMG}:$(VERSION) -f Dockerfile.router . + $(CONTAINER_TOOL) build --build-arg https_proxy --build-arg http_proxy -t $(DOCKER_REGISTRY)/${ROUTER_IMG}:$(VERSION) -f Dockerfile.router . # Publish manager and router docker images .PHONY: docker.push @@ -192,7 +192,7 @@ uninstall: manifests kustomize ## Uninstall CRDs from the K8s cluster specified .PHONY: deploy deploy: manifests kustomize ## Deploy controller to the K8s cluster specified in ~/.kube/config. 
- cd config/manager && $(KUSTOMIZE) edit set image controller=${CTR_IMG} + cd config/manager && $(KUSTOMIZE) edit set image controller=$(DOCKER_REGISTRY)/${CTR_IMG}:$(VERSION) $(KUSTOMIZE) build config/default | $(KUBECTL) apply -f - .PHONY: undeploy @@ -215,7 +215,7 @@ GOLANGCI_LINT = $(LOCALBIN)/golangci-lint-$(GOLANGCI_LINT_VERSION) ## Tool Versions KUSTOMIZE_VERSION ?= v5.3.0 -CONTROLLER_TOOLS_VERSION ?= v0.14.0 +CONTROLLER_TOOLS_VERSION ?= v0.15.0 ENVTEST_VERSION ?= release-0.17 GOLANGCI_LINT_VERSION ?= v1.57.2 diff --git a/microservices-connector/api/v1alpha3/gmconnector_types.go b/microservices-connector/api/v1alpha3/gmconnector_types.go index 6a717f8b6..bbdc1f46f 100644 --- a/microservices-connector/api/v1alpha3/gmconnector_types.go +++ b/microservices-connector/api/v1alpha3/gmconnector_types.go @@ -69,6 +69,18 @@ const ( Tgi StepNameType = "Tgi" // Llm Llm StepNameType = "Llm" + // LLMGuardInput + LLMGuardInput StepNameType = "LLMGuardInput" + // LLMGuardOutput + LLMGuardOutput StepNameType = "LLMGuardOutput" + // VLLMGaudi + VLLMGaudi StepNameType = "VLLMGaudi" + // VLLM + VLLM StepNameType = "VLLM" + // VLLMOpenVino + VLLMOpenVino StepNameType = "VLLMOpenVino" + // Language-Detection + LanguageDetection StepNameType = "LanguageDetection" ) type Executor struct { diff --git a/microservices-connector/api/v1alpha3/validating_webhook.go b/microservices-connector/api/v1alpha3/validating_webhook.go index 173e203d9..aacde979d 100644 --- a/microservices-connector/api/v1alpha3/validating_webhook.go +++ b/microservices-connector/api/v1alpha3/validating_webhook.go @@ -46,7 +46,18 @@ var ( "Whisper", "WhisperGaudi", "DataPrep", - "UI", + + // EntRag specific + // please keep that in sync with internal/controller/gmconnector_controller.go:41 const section + "Ingestion", + "TorchserveEmbedding", + "TorchserveEmbeddingGaudi", + "LLMGuardInput", + "LLMGuardOutput", + "VLLMGaudi", + "VLLM", + "VLLMOpenVino", + "LanguageDetection", } ) diff --git 
a/microservices-connector/api/v1alpha3/zz_generated.deepcopy.go b/microservices-connector/api/v1alpha3/zz_generated.deepcopy.go index a5cf0e848..bd8d2f3de 100644 --- a/microservices-connector/api/v1alpha3/zz_generated.deepcopy.go +++ b/microservices-connector/api/v1alpha3/zz_generated.deepcopy.go @@ -10,7 +10,7 @@ package v1alpha3 import ( - runtime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime" ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. diff --git a/microservices-connector/cmd/router/main.go b/microservices-connector/cmd/router/main.go index 843c6fcb9..357cd72fd 100644 --- a/microservices-connector/cmd/router/main.go +++ b/microservices-connector/cmd/router/main.go @@ -20,8 +20,6 @@ import ( "io" "mime/multipart" "net/http" - "net/http/httputil" - "net/url" "os" // "regexp" @@ -29,7 +27,11 @@ import ( "strings" "time" + "github.com/MrAlias/otlpr" + "github.com/go-logr/logr" "github.com/tidwall/gjson" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" @@ -38,23 +40,51 @@ import ( mcv1alpha3 "github.com/opea-project/GenAIInfra/microservices-connector/api/v1alpha3" flag "github.com/spf13/pflag" + + // OpenTelemetry/Metrics: Prometheus and opentelemetry imports + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/exporters/prometheus" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + "go.opentelemetry.io/otel/metric" + api "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/propagation" + sdkmetric 
"go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" + "go.opentelemetry.io/otel/trace" + + // OpenTelemetry/Traces: + + sdktrace "go.opentelemetry.io/otel/sdk/trace" ) const ( - BufferSize = 1024 - MaxGoroutines = 1024 - ServiceURL = "serviceUrl" - ServiceNode = "node" - DataPrep = "DataPrep" - Parameters = "parameters" - UI = "UI" - LLMKeyword = "query" - EmbeddingKeyword = "text" + BufferSize = 1024 + MaxGoroutines = 1024 + ServiceURL = "serviceUrl" + ServiceNode = "node" + DataPrep = "DataPrep" + Parameters = "parameters" + OtelVersion = "v0.3.0" + + CallClientTimeoutSeconds = 3600 + GraphHandlerTimeoutSeconds = 3600 ) var ( + OtelServiceName = "router-service" // will be overwriteen by OTEL_SERVICE_NAME + OtelNamespace = "unknown-namespace" // will be overwriteen by OTEL_NAMESPACE + OtelExcludedUrls = []string{} + debugRequestLogs = false + debugRequestTraces = false + log logr.Logger + jsonGraph = flag.String("graph-json", "", "serialized json graph def") - log = logf.Log.WithName("GMCGraphRouter") mcGraph *mcv1alpha3.GMConnector defaultNodeName = "root" semaphore = make(chan struct{}, MaxGoroutines) @@ -65,19 +95,6 @@ var ( TLSHandshakeTimeout: time.Minute, ExpectContinueTimeout: 30 * time.Second, } - callClient = &http.Client{ - Transport: transport, - Timeout: 30 * time.Second, - } - UnknownErr = errors.New("Unknown format") - defaultLlmParams = map[string]interface{}{ - "max_tokens": 1024, - "top_k": 10, - "top_p": 0.95, - "temperature": 0.01, - "repetition_penalty": 1.03, - "streaming": true, - } ) type EnsembleStepOutput struct { @@ -94,6 +111,221 @@ type ReadCloser struct { *bytes.Reader } +var ( + firstTokenLatencyMeasure metric.Float64Histogram + nextTokenLatencyMeasure metric.Float64Histogram + allTokenLatencyMeasure metric.Float64Histogram + pipelineLatencyMeasure metric.Float64Histogram + stepLatencyMeasure metric.Float64Histogram +) + +func initMeter() { + // The 
exporter embeds a default OpenTelemetry Reader and + // implements prometheus.Collector, allowing it to be used as + // both a Reader and Collector. + exporter, err := prometheus.New() + if err != nil { + log.Error(err, "metrics: cannot init prometheus collector") + } + provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(exporter)) + otel.SetMeterProvider(provider) + + // ppalucki: Own metrics defintion bellow + const meterName = "entrag-telemetry" + meter := provider.Meter(meterName) + + firstTokenLatencyMeasure, err = meter.Float64Histogram( + "llm.first.token.latency", + metric.WithUnit("ms"), + metric.WithDescription("Measures the duration of first token generation."), + api.WithExplicitBucketBoundaries(1, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16364), + ) + if err != nil { + log.Error(err, "metrics: cannot register first token histogram measure") + } + nextTokenLatencyMeasure, err = meter.Float64Histogram( + "llm.next.token.latency", + metric.WithUnit("ms"), + metric.WithDescription("Measures the duration of generating all but first tokens."), + api.WithExplicitBucketBoundaries(1, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16364), + ) + if err != nil { + log.Error(err, "metrics: cannot register next token histogram measure") + } + + allTokenLatencyMeasure, err = meter.Float64Histogram( + "llm.all.token.latency", + metric.WithUnit("ms"), + metric.WithDescription("Measures the duration to generate response with all tokens."), + api.WithExplicitBucketBoundaries(1, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16364), + ) + if err != nil { + log.Error(err, "metrics: cannot register all token histogram measure") + } + + pipelineLatencyMeasure, err = meter.Float64Histogram( + "llm.pipeline.latency", + metric.WithUnit("ms"), + metric.WithDescription("Measures the duration to going through pipeline steps until first token is being generated (including read data time from client)."), + api.WithExplicitBucketBoundaries(1, 64, 128, 256, 512, 1024, 2048, 4096, 
8192, 16364), + ) + if err != nil { + log.Error(err, "metrics: cannot register pipeline histogram measure") + } + stepLatencyMeasure, err = meter.Float64Histogram( + "llm.pipeline.step", + metric.WithUnit("ms"), + metric.WithDescription("Measures the duration to going through step."), + api.WithExplicitBucketBoundaries(1, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16364), + ) + if err != nil { + log.Error(err, "metrics: cannot register step histogram measure") + } + println("otel/metrics: configured") +} + +func initLogs() { + // if OTEL_LOGS_GRPC_ENDPOINT is set to grpc otlp endpoint like this OTEL_LOGS_GRPC_ENDPOINT=127.0.0.1:4317 + // then global variable log (logr.Logger) will be replaced with logr with sink that sends data to otlp endpoint https://github.com/MrAlias/otlpr + // otherwise log uses zap from controller-runtime logf.WithName... + otlpTarget, configured := os.LookupEnv("OTEL_LOGS_GRPC_ENDPOINT") + if configured { + conn, err := grpc.NewClient(otlpTarget, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + fmt.Println("error", err) + //log.Error(err, "failed to configure logger grpc connection") + os.Exit(1) + } + res := resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String(OtelServiceName), + ) + log = otlpr.NewWithOptions(conn, otlpr.Options{ + LogCaller: otlpr.All, + LogCallerFunc: true, + Batcher: otlpr.Batcher{Messages: 1, Timeout: 5 * time.Second}, + }) + log = otlpr.WithResource(log, res) + + println("otel/logs: enabled - otlpr logger configured with:", otlpTarget) + log.Info("OTEL OTLPR sink configured") + } else { + log = logf.Log.WithName("GMCGraphRouter") + logf.SetLogger(zap.New()) + println("otel/logs: disabled - otlrp not configured (OTEL_LOGS_GRPC_ENDPOINT empty)") + } + +} + +func initTraces() { + // BY DEFAULT DO NOT INSTALL TRACES if URLS is NOT GIVEN + otlpEndpoint, endpointFound := os.LookupEnv("OTEL_EXPORTER_OTLP_ENDPOINT") + if !endpointFound { + println("otel/traces: 
disabled - OTEL_EXPORTER_OTLP_ENDPOINT not set") + return + } + if otlpEndpoint == "" { + println("otel/traces: disabled - OTEL_EXPORTER_OTLP_ENDPOINT is empty ") + return + } + + if os.Getenv("OTEL_TRACES_DISABLED") == "true" { + println("otel/traces: disabled - because of OTEL_TRACES_DISABLED=true") + return + } + + println("otel/traces: enabled OTEL_EXPORTER_OTLP_ENDPOINT (or default localhost will be used):", os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT")) + + excludedUrlsStr, urlsFound := os.LookupEnv("OTEL_GO_EXCLUDED_URLS") + if urlsFound { + OtelExcludedUrls = strings.Split(excludedUrlsStr, ",") + } + fmt.Println("otel/traces: OTEL_GO_EXCLUDED_URLS =", OtelExcludedUrls) + + ctx := context.Background() + exporterOtlp, err := otlptracehttp.New(ctx) + if err != nil { + log.Error(err, "failed to init trace exporters") + os.Exit(1) + } + + samplerRatio := 1.0 + ratioStr, ratioFound := os.LookupEnv("OTEL_TRACES_SAMPLER_FRACTION") + if ratioFound { + if samplerRatio, err = strconv.ParseFloat(ratioStr, 64); err == nil { + if err != nil { + log.Error(err, "failed to conver sampler ratio to float64") + os.Exit(1) + } + } + + } + fmt.Println("otel/traces: OTEL_TRACES_SAMPLER_FRACTION =", samplerRatio) + + // Use sdktrace.AlwaysSample sampler to sample all traces. + // In a production application, use sdktrace.ProbabilitySampler with a desired probability. 
+ var tp trace.TracerProvider + if os.Getenv("OTEL_TRACES_CONSOLE_EXPORTER") == "true" { + println("otel/traces: console exporter enabled (OTEL_TRACES_CONSOLE_EXPORTER=true)") + exporterStdout, err := stdouttrace.New( + stdouttrace.WithPrettyPrint(), + //stdouttrace.WithWriter(os.Stderr), + ) + if err != nil { + log.Error(err, "failed to init trace console exporter") + os.Exit(1) + } + tp = sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.TraceIDRatioBased(samplerRatio)), + sdktrace.WithBatcher(exporterOtlp), + sdktrace.WithSyncer(exporterStdout), + sdktrace.WithResource(resource.NewWithAttributes(semconv.SchemaURL, semconv.ServiceName(OtelServiceName))), + ) + } else { + println("otel/traces: console exporter disabled (missing OTEL_TRACES_CONSOLE_EXPORTER=true)") + tp = sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.TraceIDRatioBased(samplerRatio)), + sdktrace.WithBatcher(exporterOtlp), + sdktrace.WithResource(resource.NewWithAttributes(semconv.SchemaURL, semconv.ServiceName(OtelServiceName))), + ) + } + + // Later us this like this: mainTracer := otel.GetTracerProvider().Tracer("graphtracer") + otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})) +} + +func init() { + println("otel: version:", OtelVersion) + serviceNameFromEnv, found := os.LookupEnv("OTEL_SERVICE_NAME") + if found { + OtelServiceName = serviceNameFromEnv + } + println("otel: servicename:", OtelServiceName) + namespaceFromEnv, found := os.LookupEnv("OTEL_NAMESPACE") + if found { + OtelNamespace = namespaceFromEnv + } + println("otel: namespace:", OtelNamespace) + initMeter() + initLogs() + initTraces() + + // ENABLE_DEBUG_REQUEST_LOGS will enable debug logs (if "true") + debugEnvStr, debugEnvFound := os.LookupEnv("ENABLE_DEBUG_REQUEST_LOGS") + if debugEnvFound && debugEnvStr == "true" { + debugRequestLogs = true + } + fmt.Println("debugRequestLogs:", debugRequestLogs) + + // 
ENABLE_DEBUG_REQUEST_TRACES will enable debug traces (if "true") + debugTracesEnvStr, debugTracesEnvFound := os.LookupEnv("ENABLE_DEBUG_REQUEST_TRACES") + if debugTracesEnvFound && debugTracesEnvStr == "true" { + debugRequestTraces = true + } + fmt.Println("debugRequestTraces:", debugRequestTraces) +} + func (ReadCloser) Close() error { // Typically, you would release resources here, but for bytes.Reader, there's nothing to do. return nil @@ -107,9 +339,9 @@ func (e *GMCGraphRoutingError) Error() string { return fmt.Sprintf("%s. %s", e.ErrorMessage, e.Cause) } -func timeTrack(start time.Time, nodeOrStep string, name string) { +func timeTrack(ctx context.Context, start time.Time, nodeOrStep string, name string) { elapsed := time.Since(start) - log.Info("elapsed time", nodeOrStep, name, "time", elapsed) + otlpr.WithContext(log, ctx).Info("elapsed time", nodeOrStep, name, "time", elapsed) } func isSuccessFul(statusCode int) bool { @@ -168,6 +400,7 @@ func prepareErrorResponse(err error, errorMessage string) []byte { } func callService( + ctx context.Context, step *mcv1alpha3.Step, serviceUrl string, input []byte, @@ -176,33 +409,68 @@ func callService( semaphore <- struct{}{} defer func() { <-semaphore }() - defer timeTrack(time.Now(), "step", serviceUrl) - log.Info("Entering callService", "url", serviceUrl) + defer timeTrack(ctx, time.Now(), "step", serviceUrl) + otlpr.WithContext(log, ctx).Info("Entering callService", "url", serviceUrl) // log the http header from the original request - log.Info("Print the http request headers", "HTTP_Header", headers) - + if debugRequestLogs { + otlpr.WithContext(log, ctx).Info("Print the http request headers", "HTTP_Header", headers) + } if step.InternalService.Config != nil { err := os.Setenv("no_proxy", step.InternalService.Config["no_proxy"]) if err != nil { - log.Error(err, "Error setting environment variable", "no_proxy", step.InternalService.Config["no_proxy"]) + otlpr.WithContext(log, ctx).Error(err, "Error setting 
environment variable", "no_proxy", step.InternalService.Config["no_proxy"]) return nil, 400, err } } - req, err := http.NewRequest("POST", serviceUrl, bytes.NewBuffer(input)) + //req, err := http.NewRequest("POST", serviceUrl, bytes.NewBuffer(input)) + req, err := http.NewRequestWithContext(ctx, "POST", serviceUrl, bytes.NewBuffer(input)) if err != nil { - log.Error(err, "An error occurred while preparing request object with serviceUrl.", "serviceUrl", serviceUrl) + otlpr.WithContext(log, ctx).Error(err, "An error occurred while preparing request object with serviceUrl.", "serviceUrl", serviceUrl) return nil, 500, err } if val := req.Header.Get("Content-Type"); val == "" { req.Header.Add("Content-Type", "application/json") } - + // normal client + // callClient := http.Client{ + // Transport: transport, + // Timeout: 600 * time.Second, + // } + + // otel client + // we want to use existing tracer instad creating a new one, but how !!! + callClient := http.Client{ + Transport: otelhttp.NewTransport( + transport, + otelhttp.WithServerName(serviceUrl), + otelhttp.WithSpanNameFormatter( + func(operation string, r *http.Request) string { + return "HTTP " + r.Method + " " + r.URL.String() + }), + otelhttp.WithFilter(func(r *http.Request) bool { + for _, excludedUrl := range OtelExcludedUrls { + if r.RequestURI == excludedUrl { + return false + } + } + return true + }), + otelhttp.WithMessageEvents(otelhttp.ReadEvents, otelhttp.WriteEvents), + // //// GEnerate EXTRA spans for dns/sent/reciver + // otelhttp.WithClientTrace( + // func(ctx context.Context) *httptrace.ClientTrace { + // return otelhttptrace.NewClientTrace(ctx) + // }, + // ), + ), + Timeout: CallClientTimeoutSeconds * time.Second, + } resp, err := callClient.Do(req) if err != nil { - log.Error(err, "An error has occurred while calling service", "service", serviceUrl) + otlpr.WithContext(log, ctx).Error(err, "An error has occurred while calling service", "service", serviceUrl) return nil, 500, err } @@ -220,6 
+488,7 @@ func getServiceURLByStepTarget(step *mcv1alpha3.Step, svcNameSpace string) strin } func executeStep( + ctx context.Context, step *mcv1alpha3.Step, graph mcv1alpha3.GMConnector, initInput []byte, @@ -228,18 +497,18 @@ func executeStep( ) (io.ReadCloser, int, error) { if step.NodeName != "" { // when nodeName is specified make a recursive call for routing to next step - return routeStep(step.NodeName, graph, initInput, input, headers) + return routeStep(ctx, step.NodeName, graph, initInput, input, headers) } serviceURL := getServiceURLByStepTarget(step, graph.Namespace) - return callService(step, serviceURL, input, headers) + return callService(ctx, step, serviceURL, input, headers) } -func mergeRequests(respReq []byte, initReqData map[string]interface{}) []byte { +func mergeRequests(ctx context.Context, respReq []byte, initReqData map[string]interface{}) []byte { var respReqData map[string]interface{} if _, exists := initReqData[Parameters]; exists { if err := json.Unmarshal(respReq, &respReqData); err != nil { - log.Error(err, "Error unmarshaling respReqData:") + otlpr.WithContext(log, ctx).Error(err, "Error unmarshaling respReqData:") return nil } // Merge init request into respReq @@ -252,7 +521,7 @@ func mergeRequests(respReq []byte, initReqData map[string]interface{}) []byte { } mergedBytes, err := json.Marshal(respReqData) if err != nil { - log.Error(err, "Error marshaling merged data:") + otlpr.WithContext(log, ctx).Error(err, "Error marshaling merged data:") return nil } return mergedBytes @@ -261,6 +530,7 @@ func mergeRequests(respReq []byte, initReqData map[string]interface{}) []byte { } func handleSwitchNode( + ctx context.Context, route *mcv1alpha3.Step, graph mcv1alpha3.GMConnector, initInput []byte, @@ -274,24 +544,20 @@ func handleSwitchNode( if route.NodeName != "" { stepType = ServiceNode } - log.Info("Starting execution of step", "Node Name", route.NodeName, "type", stepType, "stepName", route.StepName) - if responseBody, statusCode, err 
= executeStep(route, graph, initInput, request, headers); err != nil { + otlpr.WithContext(log, ctx).Info("Starting execution of step", "Node Name", route.NodeName, "type", stepType, "stepName", route.StepName) + if responseBody, statusCode, err = executeStep(ctx, route, graph, initInput, request, headers); err != nil { return nil, 500, err } if route.Dependency == mcv1alpha3.Hard && !isSuccessFul(statusCode) { - log.Info( - "This step is a hard dependency and it is unsuccessful", - "stepName", - route.StepName, - "statusCode", - statusCode, - ) + otlpr.WithContext(log, ctx).Info("This step is a hard dependency and it is unsuccessful", "stepName", route.StepName, "statusCode", statusCode) } return responseBody, statusCode, nil } -func handleSwitchPipeline(nodeName string, +func handleSwitchPipeline( + ctx context.Context, + nodeName string, graph mcv1alpha3.GMConnector, initInput []byte, input []byte, @@ -305,19 +571,13 @@ func handleSwitchPipeline(nodeName string, initReqData := make(map[string]interface{}) if err = json.Unmarshal(initInput, &initReqData); err != nil { - log.Error(err, "Error unmarshaling initReqData:") + otlpr.WithContext(log, ctx).Error(err, "Error unmarshaling initReqData:") return nil, 500, err } for index, route := range currentNode.Steps { if route.InternalService.IsDownstreamService { - log.Info( - "InternalService DownstreamService is true, skip the execution of step", - "type", - currentNode.RouterType, - "stepName", - route.StepName, - ) + otlpr.WithContext(log, ctx).Info("InternalService DownstreamService is true, skip the execution of step", "type", currentNode.RouterType, "stepName", route.StepName) continue } @@ -328,28 +588,33 @@ func handleSwitchPipeline(nodeName string, } } - log.Info("Current Step Information", "Node Name", nodeName, "Step Index", index) + otlpr.WithContext(log, ctx).Info("Current Step Information", "Node Name", nodeName, "Step Index", index) request := input if responseBody != nil { responseBytes, err = 
io.ReadAll(responseBody) if err != nil { - log.Error(err, "Error while reading the response body") + otlpr.WithContext(log, ctx).Error(err, "Error while reading the response body") return nil, 500, err } - log.Info("Print Previous Response Bytes", "Previous Response Bytes", - responseBytes, "Previous Status Code", statusCode) + if debugRequestLogs { + otlpr.WithContext(log, ctx).Info("Print Previous Response Bytes", "Previous Response Bytes", string(responseBytes[:]), "Previous Status Code", statusCode) + } err = responseBody.Close() if err != nil { - log.Error(err, "Error while trying to close the responseBody in handleSwitchPipeline") + otlpr.WithContext(log, ctx).Error(err, "Error while trying to close the responseBody in handleSwitchPipeline") } } - log.Info("Print Original Request Bytes", "Request Bytes", request) + if debugRequestLogs { + otlpr.WithContext(log, ctx).Info("Print Original Request Bytes", "Request Bytes", string(request[:])) + } if route.Data == "$response" && index > 0 { - request = mergeRequests(responseBytes, initReqData) + request = mergeRequests(ctx, responseBytes, initReqData) + } + if debugRequestLogs { + otlpr.WithContext(log, ctx).Info("Print New Request Bytes", "Request Bytes", string(request[:])) } - log.Info("Print New Request Bytes", "Request Bytes", request) - responseBody, statusCode, err = handleSwitchNode(&route, graph, initInput, request, headers) + responseBody, statusCode, err = handleSwitchNode(ctx, &route, graph, initInput, request, headers) if err != nil { return nil, statusCode, err } @@ -357,7 +622,9 @@ func handleSwitchPipeline(nodeName string, return responseBody, statusCode, err } -func handleEnsemblePipeline(nodeName string, +func handleEnsemblePipeline( + ctx context.Context, + nodeName string, graph mcv1alpha3.GMConnector, initInput []byte, input []byte, @@ -372,15 +639,15 @@ func handleEnsemblePipeline(nodeName string, if step.NodeName != "" { stepType = ServiceNode } - log.Info("Starting execution of step", 
"type", stepType, "stepName", step.StepName) + otlpr.WithContext(log, ctx).Info("Starting execution of step", "type", stepType, "stepName", step.StepName) resultChan := make(chan EnsembleStepOutput) ensembleRes[i] = resultChan go func() { - responseBody, statusCode, err := executeStep(step, graph, initInput, input, headers) + responseBody, statusCode, err := executeStep(ctx, step, graph, initInput, input, headers) if err == nil { output, rerr := io.ReadAll(responseBody) if rerr != nil { - log.Error(rerr, "Error while reading the response body") + otlpr.WithContext(log, ctx).Error(rerr, "Error while reading the response body") } var res map[string]interface{} if err = json.Unmarshal(output, &res); err == nil { @@ -393,7 +660,7 @@ func handleEnsemblePipeline(nodeName string, } rerr := responseBody.Close() if rerr != nil { - log.Error(rerr, "Error while trying to close the responseBody in handleEnsemblePipeline") + otlpr.WithContext(log, ctx).Error(rerr, "Error while trying to close the responseBody in handleEnsemblePipeline") } errChan <- err }() @@ -409,13 +676,7 @@ func handleEnsemblePipeline(nodeName string, select { case ensembleStepOutput = <-resultChan: if !isSuccessFul(ensembleStepOutput.StepStatusCode) && currentNode.Steps[i].Dependency == mcv1alpha3.Hard { - log.Info( - "This step is a hard dependency and it is unsuccessful", - "stepName", - currentNode.Steps[i].StepName, - "statusCode", - ensembleStepOutput.StepStatusCode, - ) + otlpr.WithContext(log, ctx).Info("This step is a hard dependency and it is unsuccessful", "stepName", currentNode.Steps[i].StepName, "statusCode", ensembleStepOutput.StepStatusCode) stepResponse, _ := json.Marshal(ensembleStepOutput.StepResponse) stepIOReader := NewReadCloser(stepResponse) return stepIOReader, ensembleStepOutput.StepStatusCode, nil @@ -432,7 +693,9 @@ func handleEnsemblePipeline(nodeName string, return combinedIOReader, 200, nil } -func handleSequencePipeline(nodeName string, +func handleSequencePipeline( + ctx 
context.Context, + nodeName string, graph mcv1alpha3.GMConnector, initInput []byte, input []byte, @@ -446,140 +709,271 @@ func handleSequencePipeline(nodeName string, initReqData := make(map[string]interface{}) if err = json.Unmarshal(initInput, &initReqData); err != nil { - log.Error(err, "Error unmarshaling initReqData:") + otlpr.WithContext(log, ctx).Error(err, "Error unmarshaling initReqData:") return nil, 500, err } for i := range currentNode.Steps { + + stepStartTime := time.Now() + stepTracer := otel.GetTracerProvider().Tracer(OtelNamespace + "/steptracer") + step := ¤tNode.Steps[i] stepType := ServiceURL if step.NodeName != "" { stepType = ServiceNode } if step.InternalService.IsDownstreamService { - log.Info( - "InternalService DownstreamService is true, skip the execution of step", - "type", - stepType, - "stepName", - step.StepName, - ) + otlpr.WithContext(log, ctx).Info("InternalService DownstreamService is true, skip the execution of step", "type", stepType, "stepName", step.StepName) continue } - log.Info("Starting execution of step", "type", stepType, "stepName", step.StepName) + ctx, stepSpan := stepTracer.Start(ctx, "step "+step.StepName) + stepSpan.SetAttributes(attribute.String("stepType", stepType), attribute.String("stepName", step.StepName)) + otlpr.WithContext(log, ctx).Info("Starting execution of step", "type", stepType, "stepName", step.StepName) request := input - log.Info("Print Original Request Bytes", "Request Bytes", request) + if debugRequestLogs { + otlpr.WithContext(log, ctx).Info("Print Original Request Bytes", "Request Bytes", string(request[:])) + } + if responseBody != nil { responseBytes, err = io.ReadAll(responseBody) if err != nil { - log.Error(err, "Error while reading the response body") + otlpr.WithContext(log, ctx).Error(err, "Error while reading the response body") + stepSpan.RecordError(err) + stepSpan.SetStatus(codes.Error, err.Error()) + if debugRequestTraces { + stepSpan.SetAttributes(attribute.String("failed 
response", string(responseBytes[:]))) + } + stepSpan.End() return nil, 500, err } - log.Info("Print Previous Response Bytes", "Previous Response Bytes", - responseBytes, "Previous Status Code", statusCode) + if debugRequestLogs { + otlpr.WithContext(log, ctx).Info("Print Previous Response Bytes", "Previous Response Bytes", string(responseBytes[:]), "Previous Status Code", statusCode) + } + if debugRequestTraces { + stepSpan.SetAttributes(attribute.String("previous response", string(responseBytes[:]))) + } + stepSpan.SetAttributes(attribute.Int("previous response size bytes", len(responseBytes))) err := responseBody.Close() if err != nil { - log.Error(err, "Error while trying to close the responseBody in handleSequencePipeline") + otlpr.WithContext(log, ctx).Error(err, "Error while trying to close the responseBody in handleSequencePipeline") } } if step.Data == "$response" && i > 0 { - request = mergeRequests(responseBytes, initReqData) + request = mergeRequests(ctx, responseBytes, initReqData) + } + if debugRequestLogs { + otlpr.WithContext(log, ctx).Info("Print New Request Bytes", "Request Bytes", string(request[:])) } - log.Info("Print New Request Bytes", "Request Bytes", request) + if debugRequestTraces { + stepSpan.SetAttributes(attribute.String("new request", string(request[:]))) + } + stepSpan.SetAttributes(attribute.Int("request.size.bytes", len(request))) if step.Condition != "" { if !gjson.ValidBytes(responseBytes) { - return nil, 500, fmt.Errorf("invalid response") + invalidBytesError := fmt.Errorf("invalid response") + stepSpan.RecordError(err) + stepSpan.SetStatus(codes.Error, invalidBytesError.Error()) + stepSpan.End() + return nil, 500, invalidBytesError } // if the condition does not match for the step in the sequence we stop and return the response if !gjson.GetBytes(responseBytes, step.Condition).Exists() { return responseBody, 500, nil } } - if responseBody, statusCode, err = executeStep(step, graph, initInput, request, headers); err != nil { + if 
responseBody, statusCode, err = executeStep(ctx, step, graph, initInput, request, headers); err != nil { + stepSpan.RecordError(err) + stepSpan.SetStatus(codes.Error, err.Error()) + stepSpan.End() return nil, 500, err } + + stepLatencyMilliseconds := float64(time.Since(stepStartTime)) / float64(time.Millisecond) + stepLatencyMeasure.Record(ctx, stepLatencyMilliseconds, api.WithAttributes(attribute.Int("statusCode", statusCode), attribute.String("stepName", step.StepName))) + + stepSpan.SetAttributes(attribute.Int("statusCode", statusCode)) + stepSpan.SetAttributes(attribute.Float64("llm.step.latency.ms", stepLatencyMilliseconds)) + /* Only if a step is a hard dependency, we will check for its success. */ if step.Dependency == mcv1alpha3.Hard { if !isSuccessFul(statusCode) { - log.Info( - "This step is a hard dependency and it is unsuccessful", - "stepName", - step.StepName, - "statusCode", - statusCode, - ) // Stop the execution of sequence right away if step is a hard dependency and is unsuccessful + otlpr.WithContext(log, ctx).Info("This step is a hard dependency and it is unsuccessful. Stop pipeline execution.", "stepName", step.StepName, "statusCode", statusCode) + // err is nil here, so we cannot record any details about this unsuccesful response without parsing the responseBody. + err := fmt.Errorf("This step (stepName=%s) is a hard dependency and it is unsuccessful with statusCode=%d. 
Stop pipeline execution.", step.StepName, statusCode) + stepSpan.RecordError(err) + stepSpan.SetStatus(codes.Error, err.Error()) + stepSpan.End() return responseBody, statusCode, nil } } + stepSpan.End() } return responseBody, statusCode, nil } -func routeStep(nodeName string, +func routeStep( + ctx context.Context, + nodeName string, graph mcv1alpha3.GMConnector, initInput, input []byte, headers http.Header, ) (io.ReadCloser, int, error) { - defer timeTrack(time.Now(), "node", nodeName) + defer timeTrack(ctx, time.Now(), "node", nodeName) currentNode := graph.Spec.Nodes[nodeName] - log.Info("Current Node", "Node Name", nodeName) + otlpr.WithContext(log, ctx).Info("Current Node", "Node Name", nodeName) if currentNode.RouterType == mcv1alpha3.Switch { - return handleSwitchPipeline(nodeName, graph, initInput, input, headers) + return handleSwitchPipeline(ctx, nodeName, graph, initInput, input, headers) } if currentNode.RouterType == mcv1alpha3.Ensemble { - return handleEnsemblePipeline(nodeName, graph, initInput, input, headers) + return handleEnsemblePipeline(ctx, nodeName, graph, initInput, input, headers) } if currentNode.RouterType == mcv1alpha3.Sequence { - return handleSequencePipeline(nodeName, graph, initInput, input, headers) + return handleSequencePipeline(ctx, nodeName, graph, initInput, input, headers) } - log.Error(nil, "invalid route type", "type", currentNode.RouterType) + otlpr.WithContext(log, ctx).Error(nil, "invalid route type", "type", currentNode.RouterType) return nil, 500, fmt.Errorf("invalid route type: %v", currentNode.RouterType) } func mcGraphHandler(w http.ResponseWriter, req *http.Request) { - ctx, cancel := context.WithTimeout(req.Context(), time.Minute) + ctx, cancel := context.WithTimeout(req.Context(), GraphHandlerTimeoutSeconds*time.Second) defer cancel() done := make(chan struct{}) go func() { defer close(done) + mainTracer := otel.GetTracerProvider().Tracer(OtelNamespace + "graphtracer") + _, spanReadInitialRequest := 
mainTracer.Start(ctx, "read initial request") + + // Return x-trace-id to the user, for debugging purposes + w.Header().Set("x-trace-id", spanReadInitialRequest.SpanContext().TraceID().String()) + + // ### Example event + // uk := attribute.Key("foo") + // bag := baggage.FromContext(ctx) + // spanReadInitialRequest.AddEvent("handling this...", trace.WithAttributes(uk.String(bag.Member("bar").Value()))) + + // ---------------------- ReadRequestBody + allTokensStartTime := time.Now() inputBytes, err := io.ReadAll(req.Body) if err != nil { - log.Error(err, "failed to read request body") + otlpr.WithContext(log, ctx).Error(err, "failed to read request body") + spanReadInitialRequest.RecordError(err) + spanReadInitialRequest.SetStatus(codes.Error, err.Error()) + spanReadInitialRequest.End() http.Error(w, "failed to read request body", http.StatusBadRequest) return } + if debugRequestLogs { + otlpr.WithContext(log, ctx).Info("Data from input request", "inputBytes", string(inputBytes[:])) + } + if debugRequestTraces { + spanReadInitialRequest.SetAttributes(attribute.String("initial request", string(inputBytes[:]))) + } + spanReadInitialRequest.SetAttributes(attribute.Int("initial request body size", len(inputBytes))) + spanReadInitialRequest.End() + + // ---------------------- RouterAllSteps + allStepsCtx, spanRouterAllSteps := mainTracer.Start(ctx, "router all steps") // this context will be used for callClient instrumentation (POSTs) + responseBody, statusCode, err := routeStep(allStepsCtx, defaultNodeName, *mcGraph, inputBytes, inputBytes, req.Header) + pipeLatencyMilliseconds := float64(time.Since(allTokensStartTime)) / float64(time.Millisecond) + pipelineLatencyMeasure.Record(ctx, pipeLatencyMilliseconds) + + spanRouterAllSteps.SetAttributes(attribute.Int("last_step.statusCode", statusCode)) + spanRouterAllSteps.SetAttributes(attribute.Float64("llm.pipeline.latency.ms", pipeLatencyMilliseconds)) + + if statusCode == 466 { // Guardrails code! 
+ // Info: statusCode != 200 is unrelated to err being nil or not and for Guardrails err is nil + otlpr.WithContext(log, ctx).Info("Guardrails activated!") + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(statusCode) + respBytes, err := io.ReadAll(responseBody) + if debugRequestLogs { + otlpr.WithContext(log, ctx).Info("Print the http response body", "body", string(respBytes[:])) + } + if debugRequestTraces { + spanRouterAllSteps.SetAttributes(attribute.String("response body", string(respBytes[:]))) + } + if err != nil { + otlpr.WithContext(log, ctx).Error(err, "failed to read all request body from guardrails") + spanRouterAllSteps.RecordError(err) + spanRouterAllSteps.SetStatus(codes.Error, err.Error()) + spanRouterAllSteps.End() + http.Error(w, "failed to read request body", http.StatusBadRequest) + return + } + + w.Write(prepareErrorResponse(err, string(respBytes))) + spanRouterAllSteps.End() + return + } - responseBody, statusCode, err := routeStep(defaultNodeName, *mcGraph, inputBytes, inputBytes, req.Header) if err != nil { - log.Error(err, "failed to process request") + otlpr.WithContext(log, ctx).Error(err, "failed to process request") + spanRouterAllSteps.RecordError(err) + spanRouterAllSteps.SetStatus(codes.Error, err.Error()) + spanRouterAllSteps.End() w.Header().Set("Content-Type", "application/json") w.WriteHeader(statusCode) if _, err := w.Write(prepareErrorResponse(err, "Failed to process request")); err != nil { - log.Error(err, "failed to write mcGraphHandler response") + otlpr.WithContext(log, ctx).Error(err, "failed to write mcGraphHandler response") } return } + + // Close span if there was no err and no guardrails were activated + spanRouterAllSteps.End() // end "router all steps" span + defer func() { err := responseBody.Close() if err != nil { - log.Error(err, "Error while trying to close the responseBody in 
mcGraphHandler") } }() w.Header().Set("Content-Type", "application/json") + firstTokenCollected := false + firstTokenLatencyMilliseconds := 0.0 + nextTokenLatencyTotal := 0.0 + nextTokenLatencyCount := 0.0 buffer := make([]byte, BufferSize) + // ---------------------- Tokens + ctx, spanTokens := mainTracer.Start(ctx, "tokens") for { + + // DETAILED spans (disabled because number of tokens generated!) + // _, span = mainTracer.Start(ctx, "read response partial") + + // measure time of reading another portion of response + tokenStartTime := time.Now() n, err := responseBody.Read(buffer) + + // span.End() // "read response partial" + + elapsedTimeMilisecond := float64(time.Since(tokenStartTime)) / float64(time.Millisecond) + + if !firstTokenCollected { + firstTokenCollected = true + firstTokenLatencyMeasure.Record(ctx, elapsedTimeMilisecond) + firstTokenLatencyMilliseconds = elapsedTimeMilisecond + } else { + nextTokenLatencyMeasure.Record(ctx, elapsedTimeMilisecond) + nextTokenLatencyTotal += elapsedTimeMilisecond + nextTokenLatencyCount += 1.0 + } + if err != nil && err != io.EOF { - log.Error(err, "failed to read from response body") + otlpr.WithContext(log, ctx).Error(err, "failed to read from response body") + spanTokens.RecordError(err) + spanTokens.SetStatus(codes.Error, err.Error()) + spanTokens.End() http.Error(w, "failed to read from response body", http.StatusInternalServerError) return } @@ -589,29 +983,53 @@ func mcGraphHandler(w http.ResponseWriter, req *http.Request) { // Write the chunk to the ResponseWriter if _, err := w.Write(buffer[:n]); err != nil { - log.Error(err, "failed to write to ResponseWriter") + otlpr.WithContext(log, ctx).Error(err, "failed to write to ResponseWriter") + spanTokens.RecordError(err) + spanTokens.SetStatus(codes.Error, err.Error()) + spanTokens.End() return } + // Flush the data to the client immediately if flusher, ok := w.(http.Flusher); ok { flusher.Flush() } else { - log.Error(errors.New("unable to flush data"), 
"ResponseWriter does not support flushing") + err := errors.New("unable to flush data") + otlpr.WithContext(log, ctx).Error(err, "ResponseWriter does not support flushing") + spanTokens.RecordError(err) + spanTokens.SetStatus(codes.Error, err.Error()) + spanTokens.End() return } } + + // Statisitcs for metrics and traces attributes + allTokensElapsedTimeMilisecond := float64(time.Since(allTokensStartTime)) / float64(time.Millisecond) + allTokenLatencyMeasure.Record(ctx, allTokensElapsedTimeMilisecond) + spanTokens.SetAttributes(attribute.Float64("llm.first.token.latency.ms", firstTokenLatencyMilliseconds)) + spanTokens.SetAttributes(attribute.Float64("llm.next.token.latency.total.ms", nextTokenLatencyTotal)) + spanTokens.SetAttributes(attribute.Float64("llm.next.token.latency.count", nextTokenLatencyCount)) + spanTokens.SetAttributes(attribute.Float64("llm.next.token.latency.avg.ms", nextTokenLatencyTotal/nextTokenLatencyCount)) + spanTokens.SetAttributes(attribute.Float64("llm.all.token.latency.ms", allTokensElapsedTimeMilisecond)) + if debugRequestTraces { + spanTokens.SetAttributes(attribute.String("response buffer", string(buffer[:]))) + } + spanTokens.SetStatus(codes.Ok, "response send") + spanTokens.End() + }() select { case <-ctx.Done(): - log.Error(errors.New("request timed out"), "failed to process request") + otlpr.WithContext(log, ctx).Error(errors.New("request timed out"), "failed to process request") http.Error(w, "request timed out", http.StatusGatewayTimeout) case <-done: - log.Info("mcGraphHandler is done") + otlpr.WithContext(log, ctx).Info("mcGraphHandler is done") } } func mcDataHandler(w http.ResponseWriter, r *http.Request) { + ctx := context.Background() var isDataHandled bool serviceName := r.Header.Get("SERVICE_NAME") defaultNode := mcGraph.Spec.Nodes[defaultNodeName] @@ -621,9 +1039,9 @@ func mcDataHandler(w http.ResponseWriter, r *http.Request) { if serviceName != "" && serviceName != step.InternalService.ServiceName { continue } - 
log.Info("Starting execution of step", "stepName", step.StepName) + otlpr.WithContext(log, ctx).Info("Starting execution of step", "stepName", step.StepName) serviceURL := getServiceURLByStepTarget(step, mcGraph.Namespace) - log.Info("ServiceURL is", "serviceURL", serviceURL) + otlpr.WithContext(log, ctx).Info("ServiceURL is", "serviceURL", serviceURL) // Parse the multipart form in the request // err := r.ParseMultipartForm(64 << 20) // 64 MB is the default used by ParseMultipartForm @@ -658,7 +1076,7 @@ func mcDataHandler(w http.ResponseWriter, r *http.Request) { } defer func() { if err := file.Close(); err != nil { - log.Error(err, "error closing file") + otlpr.WithContext(log, ctx).Error(err, "error closing file") } }() part, err := writer.CreateFormFile(key, fileHeader.Filename) @@ -702,7 +1120,7 @@ func mcDataHandler(w http.ResponseWriter, r *http.Request) { } defer func() { if err := resp.Body.Close(); err != nil { - log.Error(err, "error closing response body stream") + otlpr.WithContext(log, ctx).Error(err, "error closing response body stream") } }() // Copy the response headers from the backend service to the original client @@ -715,7 +1133,7 @@ func mcDataHandler(w http.ResponseWriter, r *http.Request) { // Copy the response body from the backend service to the original client _, err = io.Copy(w, resp.Body) if err != nil { - log.Error(err, "failed to copy response body") + otlpr.WithContext(log, ctx).Error(err, "failed to copy response body") } isDataHandled = true } @@ -725,7 +1143,7 @@ func mcDataHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(404) if _, err := w.Write([]byte("\n Message: None dataprep endpoint is available! 
\n")); err != nil { - log.Info("Message: ", "failed to write mcDataHandler response") + otlpr.WithContext(log, ctx).Info("Message: ", "failed to write mcDataHandler response") } } } @@ -741,185 +1159,45 @@ func handleMultipartError(writer *multipart.Writer, err error) { log.Error(err, "Error during multipart creation") } -// create a handler to handle traffic to /ui -// if the payload is empty, redirect to ui service -// if there's payload, format the payload and redirect to backend service -func mcUiHandler(w http.ResponseWriter, req *http.Request) { - // redirect traffic to ui pod if payload is empty - // redirect traffic to mcGraphHandler if payload is not empty - var finishProcessing bool - defaultNode := mcGraph.Spec.Nodes[defaultNodeName] - for i := range defaultNode.Steps { - step := &defaultNode.Steps[i] - if UI == step.StepName { - body, err := io.ReadAll(req.Body) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - // if no payload included in the request, redirect request to UI - if len(body) == 0 { - serviceURL := getServiceURLByStepTarget(step, mcGraph.Namespace) - targetURL, err := url.Parse(serviceURL) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - proxy := httputil.NewSingleHostReverseProxy(targetURL) - proxy.ServeHTTP(w, req) - finishProcessing = true - } else { - // if payload exists, format payload and redirect to backend services - var data map[string]interface{} - parsedData := map[string]interface{}{} - - // find the first hop for the pipeline - var nextHop *mcv1alpha3.Step - for i := range defaultNode.Steps { - nextHop = &defaultNode.Steps[i] - if nextHop.InternalService.IsDownstreamService { - // skip downstream service - continue - } - break - } - - // set the corresponding keyword for input data - var key string - switch nextHop.StepName { - case "Embedding": - key = EmbeddingKeyword - case "Llm": - key = LLMKeyword - default: - log.Info("Unsupported 
step. Failed to find the corresponding keyword. Using default one...") - key = LLMKeyword - } - - // resolve the data input - err = json.Unmarshal(body, &data) - if err != nil { - log.Error(err, "Failed to parse data.") - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - // check if the pattern "messages" exists - if val, ok := data["messages"]; ok { - switch ms := val.(type) { - // value is in the format of string - case string: - parsedData[key] = ms - // value is in the format of array - case []interface{}: - for _, item := range ms { - // check if item is in the format of map - if m, ok := item.(map[string]interface{}); ok { - // find the key "role" - if v, ok := m["role"]; ok { - strRole := fmt.Sprintf("%v", v) - content := "" - // find the key "content" - // currently consume everything as string - if c, ok := m["content"]; ok { - content = fmt.Sprintf("%v", c) - } - switch strRole { - // concatenate system prompt - case "system": - parsedData[key] = content + "/n" - // concatenate others - default: - parsedData[key] = fmt.Sprintf("%v", parsedData[key]) + strRole + ":" + content + "/n" - } - } - } - } - // others - default: - log.Error(UnknownErr, "Unknown format in payload messages.") - http.Error(w, err.Error(), http.StatusInternalServerError) - } - } - - // attach the default llm parameters if the llm is the first hop - if key == LLMKeyword { - for k, v := range defaultLlmParams { - parsedData[k] = v - } - } +func initializeRoutes() *http.ServeMux { + mux := http.NewServeMux() - // append rest of the data - // treat everything as string - for k, v := range data { - if k == "messages" { - continue + // Wrap connector handlers with otelhttp wrappers + // "http.server.request.size" - Int64Counter - "Measures the size of HTTP request messages" (Incoming request bytes total) + // "http.server.response.size" - Int64Counter - "Measures the size of HTTP response messages" (Incoming response bytes total) + // "http.server.duration" - 
Float64histogram "Measures the duration of inbound HTTP requests." (Incoming end to end duration, milliseconds) + handleFunc := func(pattern string, handlerFunc func(http.ResponseWriter, *http.Request), operation string) { + // Wrap with otelhttp handler. + handler := otelhttp.NewHandler( + otelhttp.WithRouteTag(pattern, http.HandlerFunc(handlerFunc)), + operation, + otelhttp.WithFilter(func(r *http.Request) bool { + for _, excludedUrl := range OtelExcludedUrls { + if r.RequestURI == excludedUrl { + return false } - parsedData[k] = v } + return true + }), + ) + mux.Handle(pattern, handler) - marshalData, err := json.Marshal(parsedData) - if err != nil { - log.Error(err, "Failed to marshal prompt to json") - http.Error(w, err.Error(), http.StatusInternalServerError) - } - - // create a new http request with the formatted data - proxyReq, err := http.NewRequest(req.Method, req.URL.String(), bytes.NewReader(marshalData)) - if err != nil { - log.Error(err, "Failed to generate new http request with formatted payload.") - http.Error(w, err.Error(), http.StatusInternalServerError) - } - mcGraphHandler(w, proxyReq) - finishProcessing = true - } - - } - } - // check if the request has been processed - if !finishProcessing { - log.Info("UI step not found in the graph or other errors happened. Please check the logs.") + // Original code with wrapping with OTLP. 
+ // mux.Handle(pattern, http.HandlerFunc(handlerFunc)) } -} -// accessing UI result in access to assets under /assets uri -// create a handler to redirect that request to ui endpoint -func mcAssetHandler(w http.ResponseWriter, req *http.Request) { - // Determine the asset type based on the URL path - defaultNode := mcGraph.Spec.Nodes[defaultNodeName] - found := false - for i := range defaultNode.Steps { - step := &defaultNode.Steps[i] - if UI == step.StepName { - serviceURL := getServiceURLByStepTarget(step, mcGraph.Namespace) - targetURL, err := url.Parse(serviceURL) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - proxy := httputil.NewSingleHostReverseProxy(targetURL) - proxy.ServeHTTP(w, req) - return - } - } - if !found { - log.Info("UI step not found in the graph.") - } -} + handleFunc("/", mcGraphHandler, OtelNamespace+"/mcGraphHandler") + handleFunc("/dataprep", mcDataHandler, OtelNamespace+"/mcDataHandler") + + promHandler := promhttp.Handler() + handleFunc("/metrics", promHandler.ServeHTTP, OtelNamespace+"metrics") + log.Info("Metrics exposed on /metrics.", "version", OtelVersion) -func initializeRoutes() *http.ServeMux { - mux := http.NewServeMux() - mux.HandleFunc("/", mcGraphHandler) - mux.HandleFunc("/dataprep", mcDataHandler) - mux.HandleFunc("/assets/", mcAssetHandler) - mux.HandleFunc("/ui", mcUiHandler) return mux } func main() { flag.Parse() - logf.SetLogger(zap.New()) mcGraph = &mcv1alpha3.GMConnector{} err := json.Unmarshal([]byte(*jsonGraph), mcGraph) @@ -928,6 +1206,7 @@ func main() { os.Exit(1) } + log.Info("Listen on :8080", "GraphTimeout(s):", CallClientTimeoutSeconds, "CallClientTimeout(s):", GraphHandlerTimeoutSeconds) mcRouter := initializeRoutes() server := &http.Server{ @@ -936,9 +1215,9 @@ func main() { // specify the HTTP routers Handler: mcRouter, // set the maximum duration for reading the entire request, including the body - ReadTimeout: time.Minute, + ReadTimeout: 
GraphHandlerTimeoutSeconds * time.Second, // set the maximum duration before timing out writes of the response - WriteTimeout: time.Minute, + WriteTimeout: GraphHandlerTimeoutSeconds * time.Second, // set the maximum amount of time to wait for the next request when keep-alive are enabled IdleTimeout: 3 * time.Minute, } diff --git a/microservices-connector/cmd/router/main_test.go b/microservices-connector/cmd/router/main_test.go index 6b8d3f88c..9f513b840 100644 --- a/microservices-connector/cmd/router/main_test.go +++ b/microservices-connector/cmd/router/main_test.go @@ -112,7 +112,8 @@ func TestSimpleModelChainer(t *testing.T) { "Authorization": {"Bearer Token"}, } - res, _, err := routeStep("root", gmcGraph, jsonBytes, jsonBytes, headers) + ctx := context.Background() + res, _, err := routeStep(ctx, "root", gmcGraph, jsonBytes, jsonBytes, headers) if err != nil { return } @@ -218,7 +219,8 @@ func TestSimpleServiceEnsemble(t *testing.T) { headers := http.Header{ "Authorization": {"Bearer Token"}, } - res, _, err := routeStep("root", gmcGraph, jsonBytes, jsonBytes, headers) + ctx := context.Background() + res, _, err := routeStep(ctx, "root", gmcGraph, jsonBytes, jsonBytes, headers) if err != nil { return } @@ -458,7 +460,8 @@ func TestMCWithCondition(t *testing.T) { headers := http.Header{ "Authorization": {"Bearer Token"}, } - res, _, err := routeStep("root", gmcGraph, jsonBytes, jsonBytes, headers) + ctx := context.Background() + res, _, err := routeStep(ctx, "root", gmcGraph, jsonBytes, jsonBytes, headers) if err != nil { return } @@ -547,7 +550,8 @@ func TestCallServiceWhenNoneHeadersToPropagateIsEmpty(t *testing.T) { Condition: "instances.#(modelId==\"1\")", } - res, _, err := callService(step, service1Url.String(), jsonBytes, headers) + ctx := context.Background() + res, _, err := callService(ctx, step, service1Url.String(), jsonBytes, headers) if err != nil { return } @@ -581,7 +585,9 @@ func TestMalformedURL(t *testing.T) { }, Condition: 
"instances.#(modelId==\"1\")", } - _, response, err := callService(step, malformedURL, []byte{}, http.Header{}) + + ctx := context.Background() + _, response, err := callService(ctx, step, malformedURL, []byte{}, http.Header{}) if err != nil { assert.Equal(t, 500, response) } diff --git a/microservices-connector/config/crd/bases/gmc.opea.io_gmconnectors.yaml b/microservices-connector/config/crd/bases/gmc.opea.io_gmconnectors.yaml index 058a2b4bf..19f9dc22c 100644 --- a/microservices-connector/config/crd/bases/gmc.opea.io_gmconnectors.yaml +++ b/microservices-connector/config/crd/bases/gmc.opea.io_gmconnectors.yaml @@ -6,7 +6,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.14.0 + controller-gen.kubebuilder.io/version: v0.15.0 name: gmconnectors.gmc.opea.io spec: group: gmc.opea.io diff --git a/microservices-connector/config/gmcrouter/gmc-router.yaml b/microservices-connector/config/gmcrouter/gmc-router.yaml index a0b9901b0..b6177cbcf 100644 --- a/microservices-connector/config/gmcrouter/gmc-router.yaml +++ b/microservices-connector/config/gmcrouter/gmc-router.yaml @@ -20,8 +20,8 @@ spec: serviceAccountName: default containers: - name: router-server - image: opea/gmcrouter:latest - imagePullPolicy: IfNotPresent + image: localhost:5000/opea/gmcrouter:latest + imagePullPolicy: Always ports: - containerPort: 8080 env: @@ -31,6 +31,28 @@ spec: value: {{.HttpProxy}} - name: https_proxy value: {{.HttpsProxy}} + ### OTEL/Tracing + # target + - name: OTEL_EXPORTER_OTLP_ENDPOINT + value: "http://otelcol-traces-collector.monitoring-traces:4318" + # exclusion + - name: OTEL_GO_EXCLUDED_URLS + value: "/metrics" + # identification + - name: OTEL_SERVICE_NAME + value: "{{.Namespace}}/{{.DplymntName}}" + # identification (namespace is used to distinguish different span names for different router-instances) used by router not OTEL + - name: OTEL_NAMESPACE + value: "{{.Namespace}}" + ### TODO: 
Enabling this breaks traces->logs correlation when query in Grafana + # - name: OTEL_RESOURCE_ATTRIBUTES + # value: "namespace={{.Namespace}}" + # ratio: 0 never and 1 always with 0.5 half of queries will be traced + - name: OTEL_TRACES_SAMPLER_FRACTION + value: "1.0" + ### OTEL/Logs: Warning: Enabling logs through collector disables logs to stdout + # - name: OTEL_LOGS_GRPC_ENDPOINT + # value: "otelcol-traces-collector.monitoring-traces:4317" args: - "--graph-json" - {{.GRAPH_JSON}} diff --git a/microservices-connector/config/manager/kustomization.yaml b/microservices-connector/config/manager/kustomization.yaml index e77eb05ea..9c0f7be5b 100644 --- a/microservices-connector/config/manager/kustomization.yaml +++ b/microservices-connector/config/manager/kustomization.yaml @@ -7,5 +7,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization images: - name: controller - newName: controller + newName: localhost:5000/gmconnectortest newTag: latest diff --git a/microservices-connector/helm/templates/configmap.yaml b/microservices-connector/helm/templates/configmap.yaml index 806a16db0..8ed732c87 100644 --- a/microservices-connector/helm/templates/configmap.yaml +++ b/microservices-connector/helm/templates/configmap.yaml @@ -1,5 +1,6 @@ # Copyright (C) 2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 +# TODO: these makefiles should became templates apiVersion: v1 kind: ConfigMap @@ -8,6 +9,48 @@ metadata: labels: {{- include "gmc.labels" . | nindent 4 }} data: -{{ (.Files.Glob "manifests_common/*.yaml").AsConfig | indent 2 }} + {{- range $path, $_ := .Files.Glob "manifests_common/*.yaml" }} + {{ $filename := index (splitList "." 
($path | base)) 0 }} + {{ $imageKey := index $.Values.images $filename }} + {{ $hugToken := required "The hugToken value is required and must be set in values.yaml" $.Values.tokens.hugToken }} + {{ $path | base }}: | + {{- $content := $.Files.Get $path }} + {{- if and $imageKey.repository $imageKey.image $imageKey.tag }} + {{- $content = regexReplaceAll "(?s)(containers:.*?image:\\s*).*?(\n)" $content (printf "${1}%s/%s:%s${2}" $imageKey.repository $imageKey.image $imageKey.tag) }} + {{- end }} + {{- if $imageKey.envs }} + {{- range $key, $value := $imageKey.envs }} + {{- $key := $key }} + {{- $value := $value | quote }} + {{- $envVar := printf "%s: %s" $key $value | nindent 2 }} + {{- $content = regexReplaceAll "(\\s+data:.*)" $content (printf "${1}%s" $envVar) }} + {{- end }} + {{- end }} + {{- if $imageKey.envfile }} + {{- $envContent := $.Files.Get (printf "envs/%s" $imageKey.envfile) }} + {{- $envContent = regexReplaceAll "#.*" $envContent "" | trim }} + {{- $envContent = regexReplaceAll "(?m)\\s+$" $envContent "" }} + {{- $envContent = $envContent | replace "=" ": " }} + {{- $envContent = regexReplaceAll "(?m)^([^:]+):\\s*[\"']?(.*?)[\"']?$" $envContent "$1: \"$2\"" }} + {{- $envContent = $envContent | nindent 2 }} + {{- $content = regexReplaceAll "(\\s+data:.*)" $content (printf "${1}%s" $envContent) }} + {{- end }} + {{- $content = regexReplaceAll "(.*http_proxy:\\s*).*" $content (printf "$1\"%s\"" $.Values.proxy.httpProxy) }} + {{- $content = regexReplaceAll "(.*https_proxy:\\s*).*" $content (printf "$1\"%s\"" $.Values.proxy.httpsProxy) }} + {{- $content = regexReplaceAll "(.*no_proxy:\\s*).*" $content (printf "$1\"%s\"" $.Values.proxy.noProxy) }} + {{- $content = regexReplaceAll "(.*(HF_TOKEN|HUGGINGFACEHUB_API_TOKEN):\\s*.*\\n?)" $content "" }} + {{- if $.Values.imagePullSecrets }} + {{- $imagePullSecrets := toYaml $.Values.imagePullSecrets | nindent 10 }} + {{- $content = regexReplaceAllLiteral "containers:" $content (printf "imagePullSecrets:%s\n 
containers:" $imagePullSecrets) }} + {{- end }} + {{- $content | nindent 4 }} + {{- end }} + # router cannot be an actual template as some parameters are being set by gmc gmc-router.yaml: | - {{- .Files.Get "gmc-router.yaml" | nindent 4 }} + {{- $routerContent := .Files.Get "gmc-router.yaml" }} + {{- $routerContent = regexReplaceAll "(image:\\s*).*" $routerContent (printf "${1}%s/%s:%s" .Values.images.gmcRouter.repository .Values.images.gmcRouter.image .Values.images.gmcRouter.tag) }} + {{- if .Values.imagePullSecrets }} + {{- $imagePullSecrets := toYaml .Values.imagePullSecrets | nindent 10 }} + {{- $routerContent = regexReplaceAllLiteral "containers:" $routerContent (printf "imagePullSecrets:%s\n containers:" $imagePullSecrets) }} + {{- end }} + {{- $routerContent | nindent 4 }} diff --git a/microservices-connector/helm/values.yaml b/microservices-connector/helm/values.yaml index 8fbdd918a..67545540d 100644 --- a/microservices-connector/helm/values.yaml +++ b/microservices-connector/helm/values.yaml @@ -7,13 +7,114 @@ replicaCount: 1 -image: - repository: opea/gmcmanager - pullPolicy: IfNotPresent - # Overrides the image tag whose default is the chart appVersion. - tag: "latest" - -imagePullSecrets: [] +proxy: + httpProxy: "" + httpsProxy: "" + noProxy: "" + +tokens: + hugToken: "" + + +# Common tag & repository for all images. Override if needed for specific images. 
+common_tag: &tag "ts1734346962" +common_repository: &repo "localhost:5000" + +llm_model: &cpu_model "Intel/neural-chat-7b-v3-3" +llm_model_gaudi: &hpu_model "mistralai/Mixtral-8x7B-Instruct-v0.1" + +images: + gmcManager: + image: "opea/gmcmanager" + repository: *repo + tag: *tag + pullPolicy: Always + gmcRouter: + image: "opea/gmcrouter" + repository: *repo + tag: *tag + dataprep-usvc: + image: "opea/dataprep" + repository: *repo + tag: *tag + embedding-usvc: + image: "opea/embedding" + repository: *repo + tag: *tag + reranking-usvc: + image: "opea/reranking" + repository: *repo + tag: *tag + torchserve: + image: "opea/torchserve" + repository: *repo + tag: *tag + envfile: "src/comps/embeddings/impl/model-server/torchserve/docker/.env" + retriever-usvc: + image: "opea/retriever" + repository: *repo + tag: *tag + ingestion-usvc: + image: "opea/ingestion" + repository: *repo + tag: *tag + llm-usvc: + image: "opea/llm" + repository: *repo + tag: *tag + envfile: "src/comps/llms/impl/microservice/.env" + envs: + LLM_MODEL_NAME: *hpu_model + in-guard-usvc: + image: "opea/in-guard" + repository: *repo + tag: *tag + out-guard-usvc: + image: "opea/out-guard" + repository: *repo + tag: *tag + ui-usvc: + image: "opea/chatqna-conversation-ui" + repository: *repo + tag: *tag + tgi: + envfile: "src/comps/llms/impl/model_server/tgi/docker/.env.cpu" + envs: + LLM_TGI_MODEL_NAME: *cpu_model + tgi_gaudi: + envfile: "src/comps/llms/impl/model_server/tgi/docker/.env.hpu" + envs: + LLM_TGI_MODEL_NAME: *hpu_model + vllm_gaudi: + image: "opea/vllm-gaudi" + repository: *repo + tag: *tag + envfile: "src/comps/llms/impl/model_server/vllm/docker/.env.hpu" + envs: + LLM_VLLM_MODEL_NAME: *hpu_model + vllm: + image: "opea/vllm-cpu" + repository: *repo + tag: *tag + envfile: "src/comps/llms/impl/model_server/vllm/docker/.env.cpu" + envs: + LLM_VLLM_MODEL_NAME: *cpu_model + vllm_openvino: + image: "opea/vllm-openvino" + repository: *repo + tag: *tag + envfile: 
"src/comps/llms/impl/model_server/vllm/docker/.env.cpu" + envs: + LLM_VLLM_MODEL_NAME: *cpu_model + langdtct-usvc: + image: "opea/language_detection" + repository: *repo + tag: *tag + envfile: "src/comps/language_detection/impl/microservice/.env" + + +imagePullSecrets: + - name: regcred nameOverride: "" fullnameOverride: "" @@ -38,23 +139,22 @@ podSecurityContext: {} securityContext: capabilities: drop: - - ALL + - ALL runAsNonRoot: true allowPrivilegeEscalation: false resources: limits: - cpu: 500m memory: 128Mi requests: - cpu: 10m + cpu: 500m memory: 64Mi livenessProbe: httpGet: path: /healthz port: gmc - initialDelaySeconds: 15 + initialDelaySeconds: 30 periodSeconds: 20 readinessProbe: httpGet: @@ -84,3 +184,17 @@ affinity: {} service: type: ClusterIP + +pvc: + - name: model-volume-embedding + accessMode: ReadWriteOnce + namespace: chatqa + storage: 20Gi + - name: model-volume-embedding + accessMode: ReadWriteOnce + namespace: dataprep + storage: 20Gi + - name: model-volume-llm + accessMode: ReadWriteOnce + namespace: chatqa + storage: 100Gi diff --git a/microservices-connector/internal/controller/gmconnector_controller.go b/microservices-connector/internal/controller/gmconnector_controller.go index c5f0801cc..ea041f0db 100644 --- a/microservices-connector/internal/controller/gmconnector_controller.go +++ b/microservices-connector/internal/controller/gmconnector_controller.go @@ -44,6 +44,8 @@ const ( Embedding = "Embedding" TeiEmbedding = "TeiEmbedding" TeiEmbeddingGaudi = "TeiEmbeddingGaudi" + TorchserveEmbedding = "TorchserveEmbedding" + TorchserveEmbeddingGaudi = "TorchserveEmbeddingGaudi" VectorDB = "VectorDB" Retriever = "Retriever" Reranking = "Reranking" @@ -71,32 +73,44 @@ const ( SpeechT5Gaudi = "SpeechT5Gaudi" Whisper = "Whisper" WhisperGaudi = "WhisperGaudi" - UI = "UI" + LLMGuardInput = "LLMGuardInput" + LLMGuardOutput = "LLMGuardOutput" + Ingestion = "Ingestion" + VLLMGaudi = "VLLMGaudi" + VLLM = "VLLM" + VLLMOpenVino = "VLLMOpenVino" + 
LanguageDetection = "LanguageDetection" ) var yamlDict = map[string]string{ - TeiEmbedding: yaml_dir + "tei.yaml", - TeiEmbeddingGaudi: yaml_dir + "tei_gaudi.yaml", - Embedding: yaml_dir + "embedding-usvc.yaml", - VectorDB: yaml_dir + "redis-vector-db.yaml", - Retriever: yaml_dir + "retriever-usvc.yaml", - Reranking: yaml_dir + "reranking-usvc.yaml", - TeiReranking: yaml_dir + "teirerank.yaml", - Tgi: yaml_dir + "tgi.yaml", - TgiGaudi: yaml_dir + "tgi_gaudi.yaml", - TgiNvidia: yaml_dir + "tgi_nv.yaml", - Llm: yaml_dir + "llm-uservice.yaml", - DocSum: yaml_dir + "docsum-llm-uservice.yaml", - Router: yaml_dir + "gmc-router.yaml", - WebRetriever: yaml_dir + "web-retriever.yaml", - ASR: yaml_dir + "asr.yaml", - TTS: yaml_dir + "tts.yaml", - SpeechT5: yaml_dir + "speecht5.yaml", - SpeechT5Gaudi: yaml_dir + "speecht5_gaudi.yaml", - Whisper: yaml_dir + "whisper.yaml", - WhisperGaudi: yaml_dir + "whisper_gaudi.yaml", - DataPrep: yaml_dir + "data-prep.yaml", - UI: yaml_dir + "ui.yaml", + TeiEmbedding: yaml_dir + "tei.yaml", + TeiEmbeddingGaudi: yaml_dir + "tei_gaudi.yaml", + TorchserveEmbedding: yaml_dir + "torchserve.yaml", + Embedding: yaml_dir + "embedding-usvc.yaml", + VectorDB: yaml_dir + "redis-vector-db.yaml", + Retriever: yaml_dir + "retriever-usvc.yaml", + Reranking: yaml_dir + "reranking-usvc.yaml", + TeiReranking: yaml_dir + "teirerank.yaml", + Tgi: yaml_dir + "tgi.yaml", + TgiGaudi: yaml_dir + "tgi_gaudi.yaml", + Llm: yaml_dir + "llm-usvc.yaml", + DocSum: yaml_dir + "docsum-llm-uservice.yaml", + Router: yaml_dir + "gmc-router.yaml", + WebRetriever: yaml_dir + "web-retriever.yaml", + ASR: yaml_dir + "asr.yaml", + TTS: yaml_dir + "tts.yaml", + SpeechT5: yaml_dir + "speecht5.yaml", + SpeechT5Gaudi: yaml_dir + "speecht5_gaudi.yaml", + Whisper: yaml_dir + "whisper.yaml", + WhisperGaudi: yaml_dir + "whisper_gaudi.yaml", + DataPrep: yaml_dir + "dataprep-usvc.yaml", + LLMGuardInput: yaml_dir + "in-guard-usvc.yaml", + LLMGuardOutput: yaml_dir + "out-guard-usvc.yaml", + 
Ingestion: yaml_dir + "ingestion-usvc.yaml", + VLLMGaudi: yaml_dir + "vllm_gaudi.yaml", + VLLM: yaml_dir + "vllm.yaml", + VLLMOpenVino: yaml_dir + "vllm_openvino.yaml", + LanguageDetection: yaml_dir + "langdtct-usvc.yaml", } var ( @@ -128,6 +142,13 @@ func lookupManifestDir(step string) string { } } +func setEnvVars(containers []corev1.Container, envVars []corev1.EnvVar) []corev1.Container { + for i := range containers { + containers[i].Env = append(containers[i].Env, envVars...) + } + return containers +} + func (r *GMConnectorReconciler) reconcileResource(ctx context.Context, graphNs string, stepCfg *mcv1alpha3.Step, nodeCfg *mcv1alpha3.Router, graph *mcv1alpha3.GMConnector) ([]*unstructured.Unstructured, error) { if stepCfg == nil || nodeCfg == nil { return nil, errors.New("invalid svc config") @@ -201,31 +222,29 @@ func (r *GMConnectorReconciler) reconcileResource(ctx context.Context, graphNs s // append the user defined ENVs var newEnvVars []corev1.EnvVar - for name, value := range *svcCfg { - if name == "endpoint" || name == "nodes" { - continue - } - if isDownStreamEndpointKey(name) { - ds := findDownStreamService(value, stepCfg, nodeCfg) - value, err = getDownstreamSvcEndpoint(graphNs, value, ds) - if err != nil { - _log.Error(err, "Failed to find downstream service endpoint", "name", name, "value", value) - return nil, err + if svcCfg != nil { + for name, value := range *svcCfg { + if name == "endpoint" || name == "nodes" { + continue } + if isDownStreamEndpointKey(name) { + ds := findDownStreamService(value, stepCfg, nodeCfg) + value, err = getDownstreamSvcEndpoint(graphNs, value, ds) + // value = getDsEndpoint(platform, name, graphNs, ds) + if err != nil { + return nil, fmt.Errorf("failed to find downstream service endpoint %s-%s: %v", name, value, err) + } + } + itemEnvVar := corev1.EnvVar{ + Name: name, + Value: value, + } + newEnvVars = append(newEnvVars, itemEnvVar) } - itemEnvVar := corev1.EnvVar{ - Name: name, - Value: value, - } - newEnvVars = 
append(newEnvVars, itemEnvVar) } - if len(newEnvVars) > 0 { - for i := range deployment_obj.Spec.Template.Spec.Containers { - deployment_obj.Spec.Template.Spec.Containers[i].Env = append( - deployment_obj.Spec.Template.Spec.Containers[i].Env, - newEnvVars...) - } + deployment_obj.Spec.Template.Spec.Containers = setEnvVars(deployment_obj.Spec.Template.Spec.Containers, newEnvVars) + deployment_obj.Spec.Template.Spec.InitContainers = setEnvVars(deployment_obj.Spec.Template.Spec.InitContainers, newEnvVars) } err = scheme.Scheme.Convert(deployment_obj, obj, nil) @@ -249,9 +268,11 @@ func (r *GMConnectorReconciler) reconcileResource(ctx context.Context, graphNs s func isDownStreamEndpointKey(keyname string) bool { return keyname == "TEI_EMBEDDING_ENDPOINT" || - keyname == "TEI_RERANKING_ENDPOINT" || + keyname == "RERANKING_SERVICE_ENDPOINT" || keyname == "TGI_LLM_ENDPOINT" || - keyname == "REDIS_URL" || + keyname == "VLLM_ENDPOINT" || + keyname == "LLM_MODEL_SERVER_ENDPOINT" || + keyname == "EMBEDDING_MODEL_SERVER_ENDPOINT" || keyname == "ASR_ENDPOINT" || keyname == "TTS_ENDPOINT" || keyname == "TEI_ENDPOINT" @@ -500,7 +521,13 @@ func (r *GMConnectorReconciler) collectResourceStatus(graph *mcv1alpha3.GMConnec continue } var deploymentStatus strings.Builder - deploymentStatus.WriteString(fmt.Sprintf("Replicas: %d desired | %d updated | %d total | %d available | %d unavailable\n", + statusVerbose := "Not ready" + if deployment.Status.AvailableReplicas == *deployment.Spec.Replicas { + readyCnt += 1 + statusVerbose = "Ready" + } + deploymentStatus.WriteString(fmt.Sprintf("%s; Replicas: %d desired | %d updated | %d total | %d available | %d unavailable\n", + statusVerbose, *deployment.Spec.Replicas, deployment.Status.UpdatedReplicas, deployment.Status.Replicas, @@ -514,9 +541,6 @@ func (r *GMConnectorReconciler) collectResourceStatus(graph *mcv1alpha3.GMConnec deploymentStatus.WriteString(fmt.Sprintf(" Message: %s\n", condition.Message)) } graph.Status.Annotations[resName] = 
deploymentStatus.String() - if deployment.Status.AvailableReplicas == *deployment.Spec.Replicas { - readyCnt += 1 - } } } externalResourceCntStr := strings.Split(graph.Status.Status, "/")[1] diff --git a/microservices-connector/internal/controller/gmconnector_controller_test.go b/microservices-connector/internal/controller/gmconnector_controller_test.go index 60cf7b9e1..a99fdc9d1 100644 --- a/microservices-connector/internal/controller/gmconnector_controller_test.go +++ b/microservices-connector/internal/controller/gmconnector_controller_test.go @@ -311,7 +311,7 @@ var _ = Describe("GMConnector Controller", func() { }, &appsv1.Deployment{})).To(Succeed()) Expect(k8sClient.Get(ctx, types.NamespacedName{ - Name: "llm-uservice-config", + Name: "llm-usvc-config", Namespace: "default", }, &corev1.ConfigMap{})).To(Succeed()) diff --git a/microservices-connector/internal/controller/suite_test.go b/microservices-connector/internal/controller/suite_test.go index 9047358f0..35c0209c1 100644 --- a/microservices-connector/internal/controller/suite_test.go +++ b/microservices-connector/internal/controller/suite_test.go @@ -58,7 +58,7 @@ var _ = BeforeSuite(func() { templateDir + "/teirerank.yaml", templateDir + "/tgi.yaml", templateDir + "/tgi_gaudi.yaml", - templateDir + "/llm-uservice.yaml", + templateDir + "/llm-usvc.yaml", templateDir + "/docsum-llm-uservice.yaml", "../../config/gmcrouter/gmc-router.yaml", } diff --git a/microservices-connector/test/e2e/e2e_test.go b/microservices-connector/test/e2e/e2e_test.go index 28acf9ccd..47044cbd7 100644 --- a/microservices-connector/test/e2e/e2e_test.go +++ b/microservices-connector/test/e2e/e2e_test.go @@ -48,15 +48,21 @@ var _ = Describe("controller", Ordered, func() { var err error // projectimage stores the name of the image used in the example - var projectimage = "example.com/gmconnector:v0.0.1" + var registryName = "localhost:5000" + var projectimage = "gmconnectortest" By("building the manager(Operator) image") - cmd := 
exec.Command("make", "docker-build", fmt.Sprintf("IMG=%s", projectimage)) + cmd := exec.Command("make", "docker.build", fmt.Sprintf("DOCKER_REGISTRY=%s", registryName), fmt.Sprintf("CTR_IMG=%s", projectimage)) + _, err = utils.Run(cmd) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + + By("pushing the manager(Operator) image") + cmd = exec.Command("make", "docker.push", fmt.Sprintf("DOCKER_REGISTRY=%s", registryName), fmt.Sprintf("CTR_IMG=%s", projectimage)) _, err = utils.Run(cmd) ExpectWithOffset(1, err).NotTo(HaveOccurred()) By("loading the the manager(Operator) image on Kind") - err = utils.LoadImageToKindClusterWithName(projectimage) + err = utils.LoadImageToKindClusterWithName(registryName + "/" + projectimage) ExpectWithOffset(1, err).NotTo(HaveOccurred()) By("installing CRDs") @@ -65,7 +71,7 @@ var _ = Describe("controller", Ordered, func() { ExpectWithOffset(1, err).NotTo(HaveOccurred()) By("deploying the controller-manager") - cmd = exec.Command("make", "deploy", fmt.Sprintf("IMG=%s", projectimage)) + cmd = exec.Command("make", "deploy", fmt.Sprintf("DOCKER_REGISTRY=%s", registryName), fmt.Sprintf("CTR_IMG=%s", projectimage)) _, err = utils.Run(cmd) ExpectWithOffset(1, err).NotTo(HaveOccurred())