Skip to content

Commit

Permalink
[ENH]: add distributed tracing (#2088)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
	 - Fix distributed tracing
  • Loading branch information
nicolasgere authored Apr 30, 2024
1 parent ff93ecd commit e7a9618
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 12 deletions.
4 changes: 2 additions & 2 deletions chromadb/telemetry/opentelemetry/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import grpc
from opentelemetry.trace import StatusCode, SpanKind

from chromadb.telemetry.opentelemetry import tracer


class _ClientCallDetails(
collections.namedtuple(
Expand Down Expand Up @@ -36,6 +34,8 @@ class OtelInterceptor(
grpc.StreamStreamClientInterceptor,
):
def _intercept_call(self, continuation, client_call_details, request_or_iterator):
from chromadb.telemetry.opentelemetry import tracer

if tracer is None:
return continuation(client_call_details, request_or_iterator)
with tracer.start_as_current_span(
Expand Down
11 changes: 10 additions & 1 deletion go/cmd/logservice/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/chroma-core/chroma/go/pkg/proto/logservicepb"
"github.com/chroma-core/chroma/go/pkg/utils"
libs "github.com/chroma-core/chroma/go/shared/libs"
"github.com/chroma-core/chroma/go/shared/otel"
"github.com/pingcap/log"
"github.com/rs/zerolog"
"go.uber.org/automaxprocs/maxprocs"
Expand All @@ -18,6 +19,7 @@ import (

func main() {
ctx := context.Background()

// Configure logger
utils.LogLevel = zerolog.DebugLevel
utils.ConfigureLogger()
Expand All @@ -26,6 +28,13 @@ func main() {
}
log.Info("Starting log service")
config := configuration.NewLogServiceConfiguration()
err := otel.InitTracing(ctx, &otel.TracingConfig{
Service: "log-service",
Endpoint: config.OPTL_TRACING_ENDPOINT,
})
if err != nil {
log.Fatal("failed to initialize tracing", zap.Error(err))
}
conn, err := libs.NewPgConnection(ctx, config)
if err != nil {
log.Fatal("failed to connect to postgres", zap.Error(err))
Expand All @@ -37,7 +46,7 @@ func main() {
if err != nil {
log.Fatal("failed to listen", zap.Error(err))
}
s := grpc.NewServer()
s := grpc.NewServer(grpc.UnaryInterceptor(otel.ServerGrpcInterceptor))
logservicepb.RegisterLogServiceServer(s, server)
log.Info("log service started", zap.String("address", listener.Addr().String()))
if err := s.Serve(listener); err != nil {
Expand Down
9 changes: 9 additions & 0 deletions go/pkg/grpcutils/service.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package grpcutils

import (
"context"
"crypto/tls"
"crypto/x509"
"github.com/chroma-core/chroma/go/shared/otel"
Expand Down Expand Up @@ -72,6 +73,14 @@ func newDefaultGrpcProvider(name string, grpcConfig *GrpcConfig, registerFunc fu
opts = append(opts, grpc.Creds(credentials.NewTLS(tlsConfig)))
}
opts = append(opts, grpc.UnaryInterceptor(otel.ServerGrpcInterceptor))
OPTL_TRACING_ENDPOINT := os.Getenv("OPTL_TRACING_ENDPOINT")
if OPTL_TRACING_ENDPOINT == "" {
OPTL_TRACING_ENDPOINT = "jaeger:4317"
}
otel.InitTracing(context.Background(), &otel.TracingConfig{
Service: "sysdb-service",
Endpoint: OPTL_TRACING_ENDPOINT,
})

c := &defaultGrpcServer{
server: grpc.NewServer(opts...),
Expand Down
10 changes: 6 additions & 4 deletions go/pkg/log/configuration/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package configuration
import "os"

type LogServiceConfiguration struct {
PORT string
DATABASE_URL string
PORT string
DATABASE_URL string
OPTL_TRACING_ENDPOINT string
}

func getEnvWithDefault(key, defaultValue string) string {
Expand All @@ -17,7 +18,8 @@ func getEnvWithDefault(key, defaultValue string) string {

func NewLogServiceConfiguration() *LogServiceConfiguration {
return &LogServiceConfiguration{
PORT: getEnvWithDefault("PORT", "50051"),
DATABASE_URL: getEnvWithDefault("CHROMA_DATABASE_URL", "postgresql://chroma:[email protected]:5432/log"),
PORT: getEnvWithDefault("PORT", "50051"),
DATABASE_URL: getEnvWithDefault("CHROMA_DATABASE_URL", "postgresql://chroma:[email protected]:5432/log"),
OPTL_TRACING_ENDPOINT: getEnvWithDefault("OPTL_TRACING_ENDPOINT", "jaeger:4317"),
}
}
11 changes: 8 additions & 3 deletions go/shared/otel/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"fmt"
"github.com/pingcap/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
otelCode "go.opentelemetry.io/otel/codes"
Expand All @@ -13,6 +14,7 @@ import (
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -71,32 +73,35 @@ func ServerGrpcInterceptor(ctx context.Context, req interface{}, info *grpc.Unar
var span trace.Span
ctx, span = tracer.Start(ctx, "Request "+info.FullMethod)
defer span.End()
span.SetAttributes(attribute.String("rpc.method", info.FullMethod))

// Calls the handler
h, err := handler(ctx, req)
if err != nil {
// Handle and log the error.
handleError(span, err)
handleError(span, info, err)
return nil, err
}

// Set the status to OK upon success.
span.SetStatus(otelCode.Ok, "ok")
span.SetAttributes(attribute.String("rpc.status_code", "ok"))
span.SetAttributes(attribute.String("rpc.method", info.FullMethod))
log.Info("RPC call", zap.String("method", info.FullMethod), zap.String("status", "ok"))

return h, nil
}

// handleError logs and annotates the span with details of the encountered error.
func handleError(span trace.Span, err error) {
func handleError(span trace.Span, info *grpc.UnaryServerInfo, err error) {
st, _ := status.FromError(err)
span.SetStatus(otelCode.Error, "error")
span.SetAttributes(
attribute.String("rpc.status_code", st.Code().String()),
attribute.String("rpc.message", st.Message()),
attribute.String("rpc.error", st.Err().Error()),
)
log.Error("RPC call", zap.String("method", info.FullMethod), zap.String("status", st.Code().String()), zap.String("error", st.Err().Error()), zap.String("message", st.Message()))

}

// decodeMetadataValue safely extracts a value from metadata, allowing for missing keys.
Expand Down
7 changes: 5 additions & 2 deletions k8s/distributed-chroma/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ frontendService:
memberlistProviderImpl: 'value: "chromadb.segment.impl.distributed.segment_directory.MockMemberlistProvider"'
logServiceHost: 'value: "logservice.chroma"'
logServicePort: 'value: "50051"'
otherEnvConfig: ''

otherEnvConfig: |
- name: CHROMA_OTEL_COLLECTION_ENDPOINT
value: "http://jaeger:4317"
- name: CHROMA_OTEL_GRANULARITY
value: all
sysdb:
image:
repository: 'local'
Expand Down

0 comments on commit e7a9618

Please sign in to comment.