Skip to content

Commit

Permalink
support client passing trace
Browse files Browse the repository at this point in the history
Signed-off-by: tinswzy <[email protected]>
  • Loading branch information
tinswzy committed Dec 27, 2024
1 parent 5a960e9 commit a6abbf1
Show file tree
Hide file tree
Showing 18 changed files with 636 additions and 12 deletions.
8 changes: 8 additions & 0 deletions client/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@ package client
import (
"context"

"go.opentelemetry.io/otel"

"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-sdk-go/v2/entity"
)

// GetVersion returns milvus server version information.
func (c *GrpcClient) GetVersion(ctx context.Context) (string, error) {
method := "GetVersion"
ctx, span := otel.Tracer("client").Start(ctx, method)
defer span.End()
if c.Service == nil {
return "", ErrClientNotReady
}
Expand All @@ -32,6 +37,9 @@ func (c *GrpcClient) GetVersion(ctx context.Context) (string, error) {

// CheckHealth returns milvus state
func (c *GrpcClient) CheckHealth(ctx context.Context) (*entity.MilvusState, error) {
method := "CheckHealth"
ctx, span := otel.Tracer("client").Start(ctx, method)
defer span.End()
if c.Service == nil {
return nil, ErrClientNotReady
}
Expand Down
21 changes: 21 additions & 0 deletions client/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@ package client

import (
"context"
"log"

"go.opentelemetry.io/otel"

"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
)

// CreateAlias creates an alias for collection
func (c *GrpcClient) CreateAlias(ctx context.Context, collName string, alias string) error {
method := "CreateAlias"
ctx, span := otel.Tracer("client").Start(ctx, method)
defer span.End()
traceID := span.SpanContext().TraceID().String()
if c.Service == nil {
return ErrClientNotReady
}
Expand All @@ -36,17 +43,23 @@ func (c *GrpcClient) CreateAlias(ctx context.Context, collName string, alias str

resp, err := c.Service.CreateAlias(ctx, req)
if err != nil {
log.Printf("create alias failed, collName:%s, alias:%s, traceID:%s err: %v", collName, alias, traceID, err)
return err
}
err = handleRespStatus(resp)
if err != nil {
log.Printf("create alias failed, collName:%s, alias:%s, traceID:%s err: %v", collName, alias, traceID, err)
return err
}
return nil
}

// DropAlias drops the specified Alias
func (c *GrpcClient) DropAlias(ctx context.Context, alias string) error {
method := "DropAlias"
ctx, span := otel.Tracer("client").Start(ctx, method)
defer span.End()
traceID := span.SpanContext().TraceID().String()
if c.Service == nil {
return ErrClientNotReady
}
Expand All @@ -58,17 +71,23 @@ func (c *GrpcClient) DropAlias(ctx context.Context, alias string) error {

resp, err := c.Service.DropAlias(ctx, req)
if err != nil {
log.Printf("drop alias failed, alias:%s, traceID:%s err: %v", alias, traceID, err)
return err
}
err = handleRespStatus(resp)
if err != nil {
log.Printf("drop alias failed, alias:%s, traceID:%s err: %v", alias, traceID, err)
return err
}
return nil
}

// AlterAlias changes collection alias to provided alias
func (c *GrpcClient) AlterAlias(ctx context.Context, collName string, alias string) error {
method := "AlterAlias"
ctx, span := otel.Tracer("client").Start(ctx, method)
defer span.End()
traceID := span.SpanContext().TraceID().String()
if c.Service == nil {
return ErrClientNotReady
}
Expand All @@ -81,10 +100,12 @@ func (c *GrpcClient) AlterAlias(ctx context.Context, collName string, alias stri

resp, err := c.Service.AlterAlias(ctx, req)
if err != nil {
log.Printf("alter alias failed, collName:%s, alias:%s, traceID:%s err: %v", collName, alias, traceID, err)
return err
}
err = handleRespStatus(resp)
if err != nil {
log.Printf("alter alias failed, collName:%s, alias:%s, traceID:%s err: %v", collName, alias, traceID, err)
return err
}
return nil
Expand Down
27 changes: 27 additions & 0 deletions client/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,20 @@ package client

import (
"context"
"log"

"go.opentelemetry.io/otel"

"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-sdk-go/v2/internal/utils/crypto"
)

// CreateCredential create new user and password
func (c *GrpcClient) CreateCredential(ctx context.Context, username string, password string) error {
method := "CreateCredential"
ctx, span := otel.Tracer("client").Start(ctx, method)
defer span.End()
traceID := span.SpanContext().TraceID().String()
if c.Service == nil {
return ErrClientNotReady
}
Expand All @@ -18,17 +25,23 @@ func (c *GrpcClient) CreateCredential(ctx context.Context, username string, pass
}
resp, err := c.Service.CreateCredential(ctx, req)
if err != nil {
log.Printf("create credential failed, traceID:%s err: %v", traceID, err)
return err
}
err = handleRespStatus(resp)
if err != nil {
log.Printf("create credential failed, traceID:%s err: %v", traceID, err)
return err
}
return nil
}

// UpdateCredential update password for a user
func (c *GrpcClient) UpdateCredential(ctx context.Context, username string, oldPassword string, newPassword string) error {
method := "UpdateCredential"
ctx, span := otel.Tracer("client").Start(ctx, method)
defer span.End()
traceID := span.SpanContext().TraceID().String()
if c.Service == nil {
return ErrClientNotReady
}
Expand All @@ -39,17 +52,23 @@ func (c *GrpcClient) UpdateCredential(ctx context.Context, username string, oldP
}
resp, err := c.Service.UpdateCredential(ctx, req)
if err != nil {
log.Printf("update credential failed, traceID:%s err: %v", traceID, err)
return err
}
err = handleRespStatus(resp)
if err != nil {
log.Printf("update credential failed, traceID:%s err: %v", traceID, err)
return err
}
return nil
}

// DeleteCredential delete a user
func (c *GrpcClient) DeleteCredential(ctx context.Context, username string) error {
method := "DeleteCredential"
ctx, span := otel.Tracer("client").Start(ctx, method)
defer span.End()
traceID := span.SpanContext().TraceID().String()
if c.Service == nil {
return ErrClientNotReady
}
Expand All @@ -58,27 +77,35 @@ func (c *GrpcClient) DeleteCredential(ctx context.Context, username string) erro
}
resp, err := c.Service.DeleteCredential(ctx, req)
if err != nil {
log.Printf("delete credential failed, traceID:%s err: %v", traceID, err)
return err
}
err = handleRespStatus(resp)
if err != nil {
log.Printf("delete credential failed, traceID:%s err: %v", traceID, err)
return err
}
return nil
}

// ListCredUsers list all usernames
func (c *GrpcClient) ListCredUsers(ctx context.Context) ([]string, error) {
method := "ListCredUsers"
ctx, span := otel.Tracer("client").Start(ctx, method)
defer span.End()
traceID := span.SpanContext().TraceID().String()
if c.Service == nil {
return nil, ErrClientNotReady
}
req := &milvuspb.ListCredUsersRequest{}
resp, err := c.Service.ListCredUsers(ctx, req)
if err != nil {
log.Printf("list credential users failed, traceID:%s err: %v", traceID, err)
return nil, err
}
err = handleRespStatus(resp.Status)
if err != nil {
log.Printf("list credential users failed, traceID:%s err: %v", traceID, err)
return nil, err
}
return resp.Usernames, nil
Expand Down
50 changes: 49 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,19 @@ package client

import (
"context"
"sync"
"time"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"

"github.com/milvus-io/milvus-sdk-go/v2/entity"
)

Expand Down Expand Up @@ -261,6 +268,46 @@ type Client interface {
HybridSearch(ctx context.Context, collName string, partitions []string, limit int, outputFields []string, reranker Reranker, subRequests []*ANNSearchRequest, opts ...SearchQueryOptionFunc) ([]SearchResult, error)
}

var (
initClientOnce sync.Once
)

func initTracerOnce() {
initClientOnce.Do(func() {
// init trace noop provider
tp := sdk.NewTracerProvider(
sdk.WithBatcher(nil),
sdk.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String("Client"),
)),
sdk.WithSampler(sdk.ParentBased(
sdk.TraceIDRatioBased(1),
)),
)
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
})
}

func getUnaryClientInterceptor() grpc.UnaryClientInterceptor {
initTracerOnce()
return otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(otel.GetTracerProvider()))

}

func StartTrace(ctx context.Context, name string, spanName string) (context.Context, trace.Span, string) {
initTracerOnce()
ctx, span := otel.Tracer(name).Start(ctx, spanName)
return ctx, span, span.SpanContext().TraceID().String()
}

func StartNewTrace(name string, spanName string) (context.Context, trace.Span, string) {
initTracerOnce()
ctx, span := otel.Tracer(name).Start(context.Background(), spanName)
return ctx, span, span.SpanContext().TraceID().String()
}

// NewClient create a client connected to remote milvus cluster.
// More connect option can be modified by Config.
func NewClient(ctx context.Context, config Config) (Client, error) {
Expand All @@ -277,6 +324,7 @@ func NewClient(ctx context.Context, config Config) (Client, error) {

// Parse grpc options
options := c.config.getDialOption()
options = append(options, grpc.WithChainUnaryInterceptor(getUnaryClientInterceptor()))

// Connect the grpc server.
if err := c.connect(ctx, addr, options...); err != nil {
Expand Down
Loading

0 comments on commit a6abbf1

Please sign in to comment.