Skip to content

Commit

Permalink
use udpated grpc instrumentation APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
puckpuck committed Sep 3, 2024
1 parent 3788c31 commit ad8ff9f
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 164 deletions.
178 changes: 65 additions & 113 deletions src/checkoutservice/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
"math"
Expand Down Expand Up @@ -60,12 +61,23 @@ func init() {
}

type checkoutService struct {
productCatalogSvcAddr string
cartSvcAddr string
currencySvcAddr string
shippingSvcAddr string
emailSvcAddr string
paymentSvcAddr string
cartSvcAddr string
cartSvcClient pb.CartServiceClient

currencySvcAddr string
currencySvcClient pb.CurrencyServiceClient

emailSvcAddr string
emailSvcClient pb.EmailServiceClient

paymentSvcAddr string
paymentSvcClient pb.PaymentServiceClient

productCatalogSvcAddr string
productCatalogSvcClient pb.ProductCatalogServiceClient

shippingSvcAddr string
shippingSvcClient pb.ShippingServiceClient
}

func initOtelTracing(ctx context.Context, log logrus.FieldLogger) *sdktrace.TracerProvider {
Expand Down Expand Up @@ -124,12 +136,35 @@ func main() {
}

svc := new(checkoutService)
mustMapEnv(&svc.shippingSvcAddr, "SHIPPING_SERVICE_ADDR")
mustMapEnv(&svc.productCatalogSvcAddr, "PRODUCT_CATALOG_SERVICE_ADDR")
mustMapEnv(&svc.cartSvcAddr, "CART_SERVICE_ADDR")
c := mustCreateClientConn(svc.cartSvcAddr)
svc.cartSvcClient = pb.NewCartServiceClient(c)
defer c.Close()

mustMapEnv(&svc.currencySvcAddr, "CURRENCY_SERVICE_ADDR")
c = mustCreateClientConn(svc.currencySvcAddr)
svc.currencySvcClient = pb.NewCurrencyServiceClient(c)
defer c.Close()

mustMapEnv(&svc.emailSvcAddr, "EMAIL_SERVICE_ADDR")
c = mustCreateClientConn(svc.emailSvcAddr)
svc.emailSvcClient = pb.NewEmailServiceClient(c)
defer c.Close()

mustMapEnv(&svc.paymentSvcAddr, "PAYMENT_SERVICE_ADDR")
c = mustCreateClientConn(svc.paymentSvcAddr)
svc.paymentSvcClient = pb.NewPaymentServiceClient(c)
defer c.Close()

mustMapEnv(&svc.productCatalogSvcAddr, "PRODUCT_CATALOG_SERVICE_ADDR")
c = mustCreateClientConn(svc.productCatalogSvcAddr)
svc.productCatalogSvcClient = pb.NewProductCatalogServiceClient(c)
defer c.Close()

mustMapEnv(&svc.shippingSvcAddr, "SHIPPING_SERVICE_ADDR")
c = mustCreateClientConn(svc.shippingSvcAddr)
svc.shippingSvcClient = pb.NewShippingServiceClient(c)
defer c.Close()

log.Infof("service config: %+v", svc)

Expand All @@ -140,12 +175,11 @@ func main() {

// create gRPC server with OpenTelemetry instrumentation on all incoming requests
srv := grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor(otelgrpc.WithTracerProvider(otel.GetTracerProvider()))),
grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor(otelgrpc.WithTracerProvider(otel.GetTracerProvider()))),
grpc.StatsHandler(otelgrpc.NewServerHandler()),
)

pb.RegisterCheckoutServiceServer(srv, svc)
healthpb.RegisterHealthServer(srv, svc)

log.Infof("starting to listen on tcp: %q", lis.Addr().String())
err = srv.Serve(lis)
log.Fatal(err)
Expand All @@ -159,6 +193,18 @@ func mustMapEnv(target *string, envKey string) {
*target = v
}

func mustCreateClientConn(svcAddr string) *grpc.ClientConn {
c, err := grpc.NewClient(svcAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
)
if err != nil {
log.Fatalf("could not connect to %s service, err: %+v", svcAddr, err)
}

return c
}

func (cs *checkoutService) Check(_ context.Context, _ *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
return &healthpb.HealthCheckResponse{Status: healthpb.HealthCheckResponse_SERVING}, nil
}
Expand Down Expand Up @@ -413,20 +459,7 @@ func (cs *checkoutService) prepareOrderItemsAndShippingQuoteFromCart(ctx context
}

func (cs *checkoutService) quoteShipping(ctx context.Context, address *pb.Address, items []*pb.CartItem) (*pb.Money, error) {
// add OpenTelemetry instrumentation to outgoing gRPC requests
conn, err := grpc.DialContext(ctx, cs.shippingSvcAddr,
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(otel.GetTracerProvider()))),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(otelgrpc.WithTracerProvider(otel.GetTracerProvider()))))

if err != nil {
return nil, fmt.Errorf("could not connect shipping service: %+v", err)
}
defer func(conn *grpc.ClientConn) {
_ = conn.Close()
}(conn)

shippingQuote, err := pb.NewShippingServiceClient(conn).
shippingQuote, err := cs.shippingSvcClient.
GetQuote(ctx, &pb.GetQuoteRequest{
Address: address,
Items: items})
Expand All @@ -438,26 +471,14 @@ func (cs *checkoutService) quoteShipping(ctx context.Context, address *pb.Addres
}

func (cs *checkoutService) getUserCart(ctx context.Context, userID string) ([]*pb.CartItem, error) {
// add OpenTelemetry instrumentation to outgoing gRPC requests
conn, err := grpc.DialContext(ctx, cs.cartSvcAddr, grpc.WithInsecure(),
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(otel.GetTracerProvider()))),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(otelgrpc.WithTracerProvider(otel.GetTracerProvider()))))

var (
userIDKey = attribute.Key("app.user_id")
)

span := trace.SpanFromContext(ctx)
span.SetAttributes(userIDKey.String(userID))

if err != nil {
return nil, fmt.Errorf("could not connect cart service: %+v", err)
}
defer func(conn *grpc.ClientConn) {
_ = conn.Close()
}(conn)

cart, err := pb.NewCartServiceClient(conn).GetCart(ctx, &pb.GetCartRequest{UserId: userID})
cart, err := cs.cartSvcClient.GetCart(ctx, &pb.GetCartRequest{UserId: userID})
if err != nil {
return nil, fmt.Errorf("failed to get user cart during checkout: %+v", err)
}
Expand All @@ -466,43 +487,19 @@ func (cs *checkoutService) getUserCart(ctx context.Context, userID string) ([]*p
}

func (cs *checkoutService) emptyUserCart(ctx context.Context, userID string) error {
// add OpenTelemetry instrumentation to outgoing gRPC requests
conn, err := grpc.DialContext(ctx, cs.cartSvcAddr, grpc.WithInsecure(),
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(otel.GetTracerProvider()))),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(otelgrpc.WithTracerProvider(otel.GetTracerProvider()))))

if err != nil {
return fmt.Errorf("could not connect cart service: %+v", err)
}
defer func(conn *grpc.ClientConn) {
_ = conn.Close()
}(conn)

if _, err = pb.NewCartServiceClient(conn).EmptyCart(ctx, &pb.EmptyCartRequest{UserId: userID}); err != nil {
if _, err := cs.cartSvcClient.EmptyCart(ctx, &pb.EmptyCartRequest{UserId: userID}); err != nil {
return fmt.Errorf("failed to empty user cart during checkout: %+v", err)
}
sleepRandom(20)
return nil
}

func (cs *checkoutService) prepOrderItems(ctx context.Context, items []*pb.CartItem, userCurrency string) ([]*pb.OrderItem, error) {
// add OpenTelemetry instrumentation to outgoing gRPC requests
conn, err := grpc.DialContext(ctx, cs.productCatalogSvcAddr, grpc.WithInsecure(),
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(otel.GetTracerProvider()))),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(otelgrpc.WithTracerProvider(otel.GetTracerProvider()))))

out := make([]*pb.OrderItem, len(items))

if err != nil {
return nil, fmt.Errorf("could not connect product catalog service: %+v", err)
}
defer func(conn *grpc.ClientConn) {
_ = conn.Close()
}(conn)
cl := pb.NewProductCatalogServiceClient(conn)

for i, item := range items {
product, err := cl.GetProduct(ctx, &pb.GetProductRequest{Id: item.GetProductId()})
product, err := cs.productCatalogSvcClient.GetProduct(ctx, &pb.GetProductRequest{Id: item.GetProductId()})
if err != nil {
return nil, fmt.Errorf("failed to get product #%q", item.GetProductId())
}
Expand All @@ -519,18 +516,7 @@ func (cs *checkoutService) prepOrderItems(ctx context.Context, items []*pb.CartI
}

func (cs *checkoutService) convertCurrency(ctx context.Context, from *pb.Money, toCurrency string) (*pb.Money, error) {
// add OpenTelemetry instrumentation to outgoing gRPC requests
conn, err := grpc.DialContext(ctx, cs.currencySvcAddr, grpc.WithInsecure(),
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(otel.GetTracerProvider()))),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(otelgrpc.WithTracerProvider(otel.GetTracerProvider()))))

if err != nil {
return nil, fmt.Errorf("could not connect currency service: %+v", err)
}
defer func(conn *grpc.ClientConn) {
_ = conn.Close()
}(conn)
result, err := pb.NewCurrencyServiceClient(conn).Convert(ctx, &pb.CurrencyConversionRequest{
result, err := cs.currencySvcClient.Convert(ctx, &pb.CurrencyConversionRequest{
From: from,
ToCode: toCurrency})
if err != nil {
Expand All @@ -541,19 +527,7 @@ func (cs *checkoutService) convertCurrency(ctx context.Context, from *pb.Money,
}

func (cs *checkoutService) chargeCard(ctx context.Context, amount *pb.Money, paymentInfo *pb.CreditCardInfo) (string, error) {
// add OpenTelemetry instrumentation to outgoing gRPC requests
conn, err := grpc.DialContext(ctx, cs.paymentSvcAddr, grpc.WithInsecure(),
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(otel.GetTracerProvider()))),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(otelgrpc.WithTracerProvider(otel.GetTracerProvider()))))

if err != nil {
return "", fmt.Errorf("failed to connect payment service: %+v", err)
}
defer func(conn *grpc.ClientConn) {
_ = conn.Close()
}(conn)

paymentResp, err := pb.NewPaymentServiceClient(conn).Charge(ctx, &pb.ChargeRequest{
paymentResp, err := cs.paymentSvcClient.Charge(ctx, &pb.ChargeRequest{
Amount: amount,
CreditCard: paymentInfo})
if err != nil {
Expand All @@ -564,37 +538,15 @@ func (cs *checkoutService) chargeCard(ctx context.Context, amount *pb.Money, pay
}

func (cs *checkoutService) sendOrderConfirmation(ctx context.Context, email string, order *pb.OrderResult) error {
// add OpenTelemetry instrumentation to outgoing gRPC requests
conn, err := grpc.DialContext(ctx, cs.emailSvcAddr, grpc.WithInsecure(),
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(otel.GetTracerProvider()))),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(otelgrpc.WithTracerProvider(otel.GetTracerProvider()))))

if err != nil {
return fmt.Errorf("failed to connect email service: %+v", err)
}
defer func(conn *grpc.ClientConn) {
_ = conn.Close()
}(conn)
_, err = pb.NewEmailServiceClient(conn).SendOrderConfirmation(ctx, &pb.SendOrderConfirmationRequest{
_, err := cs.emailSvcClient.SendOrderConfirmation(ctx, &pb.SendOrderConfirmationRequest{
Email: email,
Order: order})
sleepRandom(30)
return err
}

func (cs *checkoutService) shipOrder(ctx context.Context, address *pb.Address, items []*pb.CartItem) (string, error) {
// add OpenTelemetry instrumentation to outgoing gRPC requests
conn, err := grpc.DialContext(ctx, cs.shippingSvcAddr, grpc.WithInsecure(),
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(otel.GetTracerProvider()))),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(otelgrpc.WithTracerProvider(otel.GetTracerProvider()))))

if err != nil {
return "", fmt.Errorf("failed to connect email service: %+v", err)
}
defer func(conn *grpc.ClientConn) {
_ = conn.Close()
}(conn)
resp, err := pb.NewShippingServiceClient(conn).ShipOrder(ctx, &pb.ShipOrderRequest{
resp, err := cs.shippingSvcClient.ShipOrder(ctx, &pb.ShipOrderRequest{
Address: address,
Items: items})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (fe *frontendServer) addToCartHandler(w http.ResponseWriter, r *http.Reques
renderHTTPError(log, r, w, errors.Wrap(err, "could not retrieve product"), http.StatusInternalServerError)
return
}

if err := fe.insertCart(r.Context(), sessionID(r), p.GetId(), int32(quantity)); err != nil {
renderHTTPError(log, r, w, errors.Wrap(err, "failed to add to cart"), http.StatusInternalServerError)
return
Expand Down Expand Up @@ -314,7 +314,7 @@ func (fe *frontendServer) placeOrderHandler(w http.ResponseWriter, r *http.Reque
// Get current span and set additional attributes to it
span := trace.SpanFromContext(ctx)

order, err := pb.NewCheckoutServiceClient(fe.checkoutSvcConn).
order, err := fe.checkoutSvcClient.
PlaceOrder(ctx, &pb.PlaceOrderRequest{
Email: email,
CreditCard: &pb.CreditCardInfo{
Expand Down
Loading

0 comments on commit ad8ff9f

Please sign in to comment.