Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH]: add distributed tracing #2088

Merged
merged 2 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions chromadb/telemetry/opentelemetry/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import grpc
from opentelemetry.trace import StatusCode, SpanKind

from chromadb.telemetry.opentelemetry import tracer


class _ClientCallDetails(
collections.namedtuple(
Expand Down Expand Up @@ -36,6 +34,8 @@ class OtelInterceptor(
grpc.StreamStreamClientInterceptor,
):
def _intercept_call(self, continuation, client_call_details, request_or_iterator):
from chromadb.telemetry.opentelemetry import tracer

if tracer is None:
return continuation(client_call_details, request_or_iterator)
with tracer.start_as_current_span(
Expand Down
11 changes: 10 additions & 1 deletion go/cmd/logservice/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/chroma-core/chroma/go/pkg/proto/logservicepb"
"github.com/chroma-core/chroma/go/pkg/utils"
libs "github.com/chroma-core/chroma/go/shared/libs"
"github.com/chroma-core/chroma/go/shared/otel"
"github.com/pingcap/log"
"github.com/rs/zerolog"
"go.uber.org/automaxprocs/maxprocs"
Expand All @@ -18,6 +19,7 @@ import (

func main() {
ctx := context.Background()

// Configure logger
utils.LogLevel = zerolog.DebugLevel
utils.ConfigureLogger()
Expand All @@ -26,6 +28,13 @@ func main() {
}
log.Info("Starting log service")
config := configuration.NewLogServiceConfiguration()
err := otel.InitTracing(ctx, &otel.TracingConfig{
Service: "log-service",
Endpoint: config.OPTL_TRACING_ENDPOINT,
})
if err != nil {
log.Fatal("failed to initialize tracing", zap.Error(err))
}
conn, err := libs.NewPgConnection(ctx, config)
if err != nil {
log.Fatal("failed to connect to postgres", zap.Error(err))
Expand All @@ -37,7 +46,7 @@ func main() {
if err != nil {
log.Fatal("failed to listen", zap.Error(err))
}
s := grpc.NewServer()
s := grpc.NewServer(grpc.UnaryInterceptor(otel.ServerGrpcInterceptor))
logservicepb.RegisterLogServiceServer(s, server)
log.Info("log service started", zap.String("address", listener.Addr().String()))
if err := s.Serve(listener); err != nil {
Expand Down
9 changes: 9 additions & 0 deletions go/pkg/grpcutils/service.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package grpcutils

import (
"context"
"crypto/tls"
"crypto/x509"
"github.com/chroma-core/chroma/go/shared/otel"
Expand Down Expand Up @@ -72,6 +73,14 @@ func newDefaultGrpcProvider(name string, grpcConfig *GrpcConfig, registerFunc fu
opts = append(opts, grpc.Creds(credentials.NewTLS(tlsConfig)))
}
opts = append(opts, grpc.UnaryInterceptor(otel.ServerGrpcInterceptor))
OPTL_TRACING_ENDPOINT := os.Getenv("OPTL_TRACING_ENDPOINT")
if OPTL_TRACING_ENDPOINT == "" {
OPTL_TRACING_ENDPOINT = "jaeger:4317"
}
otel.InitTracing(context.Background(), &otel.TracingConfig{
Service: "sysdb-service",
Endpoint: OPTL_TRACING_ENDPOINT,
})

c := &defaultGrpcServer{
server: grpc.NewServer(opts...),
Expand Down
10 changes: 6 additions & 4 deletions go/pkg/log/configuration/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package configuration
import "os"

type LogServiceConfiguration struct {
PORT string
DATABASE_URL string
PORT string
DATABASE_URL string
OPTL_TRACING_ENDPOINT string
}

func getEnvWithDefault(key, defaultValue string) string {
Expand All @@ -17,7 +18,8 @@ func getEnvWithDefault(key, defaultValue string) string {

func NewLogServiceConfiguration() *LogServiceConfiguration {
return &LogServiceConfiguration{
PORT: getEnvWithDefault("PORT", "50051"),
DATABASE_URL: getEnvWithDefault("CHROMA_DATABASE_URL", "postgresql://chroma:[email protected]:5432/log"),
PORT: getEnvWithDefault("PORT", "50051"),
DATABASE_URL: getEnvWithDefault("CHROMA_DATABASE_URL", "postgresql://chroma:[email protected]:5432/log"),
OPTL_TRACING_ENDPOINT: getEnvWithDefault("OPTL_TRACING_ENDPOINT", "jaeger:4317"),
}
}
11 changes: 8 additions & 3 deletions go/shared/otel/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"fmt"
"github.com/pingcap/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
otelCode "go.opentelemetry.io/otel/codes"
Expand All @@ -13,6 +14,7 @@ import (
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -71,32 +73,35 @@ func ServerGrpcInterceptor(ctx context.Context, req interface{}, info *grpc.Unar
var span trace.Span
ctx, span = tracer.Start(ctx, "Request "+info.FullMethod)
defer span.End()
span.SetAttributes(attribute.String("rpc.method", info.FullMethod))

// Calls the handler
h, err := handler(ctx, req)
if err != nil {
// Handle and log the error.
handleError(span, err)
handleError(span, info, err)
return nil, err
}

// Set the status to OK upon success.
span.SetStatus(otelCode.Ok, "ok")
span.SetAttributes(attribute.String("rpc.status_code", "ok"))
span.SetAttributes(attribute.String("rpc.method", info.FullMethod))
log.Info("RPC call", zap.String("method", info.FullMethod), zap.String("status", "ok"))

return h, nil
}

// handleError logs and annotates the span with details of the encountered error.
func handleError(span trace.Span, err error) {
func handleError(span trace.Span, info *grpc.UnaryServerInfo, err error) {
st, _ := status.FromError(err)
span.SetStatus(otelCode.Error, "error")
span.SetAttributes(
attribute.String("rpc.status_code", st.Code().String()),
attribute.String("rpc.message", st.Message()),
attribute.String("rpc.error", st.Err().Error()),
)
log.Error("RPC call", zap.String("method", info.FullMethod), zap.String("status", st.Code().String()), zap.String("error", st.Err().Error()), zap.String("message", st.Message()))

}

// decodeMetadataValue safely extracts a value from metadata, allowing for missing keys.
Expand Down
7 changes: 5 additions & 2 deletions k8s/distributed-chroma/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ frontendService:
memberlistProviderImpl: 'value: "chromadb.segment.impl.distributed.segment_directory.MockMemberlistProvider"'
logServiceHost: 'value: "logservice.chroma"'
logServicePort: 'value: "50051"'
otherEnvConfig: ''

otherEnvConfig: |
- name: CHROMA_OTEL_COLLECTION_ENDPOINT
value: "http://jaeger:4317"
- name: CHROMA_OTEL_GRANULARITY
value: all
sysdb:
image:
repository: 'local'
Expand Down
Loading