From e8e355386bfd173c08dbe5c9597aed38701c210d Mon Sep 17 00:00:00 2001 From: Lam Tran Date: Mon, 19 Feb 2024 22:59:07 +0700 Subject: [PATCH 1/3] refactor(checkoutservice): reuse grpc service client (#1387) --- src/checkoutservice/main.go | 97 ++++++++++++++++++------------------- 1 file changed, 46 insertions(+), 51 deletions(-) diff --git a/src/checkoutservice/main.go b/src/checkoutservice/main.go index b06bd44c0a..91b3c64da3 100644 --- a/src/checkoutservice/main.go +++ b/src/checkoutservice/main.go @@ -124,7 +124,13 @@ type checkoutService struct { paymentSvcAddr string kafkaBrokerSvcAddr string pb.UnimplementedCheckoutServiceServer - KafkaProducerClient sarama.AsyncProducer + KafkaProducerClient sarama.AsyncProducer + shippingSvcClient pb.ShippingServiceClient + productCatalogSvcClient pb.ProductCatalogServiceClient + cartSvcClient pb.CartServiceClient + currencySvcClient pb.CurrencyServiceClient + emailSvcClient pb.EmailServiceClient + paymentSvcClient pb.PaymentServiceClient } func main() { @@ -153,12 +159,37 @@ func main() { tracer = tp.Tracer("checkoutservice") svc := new(checkoutService) + mustMapEnv(&svc.shippingSvcAddr, "SHIPPING_SERVICE_ADDR") + c := mustCreateClient(context.Background(), svc.shippingSvcAddr) + svc.shippingSvcClient = pb.NewShippingServiceClient(c) + defer c.Close() + mustMapEnv(&svc.productCatalogSvcAddr, "PRODUCT_CATALOG_SERVICE_ADDR") + c = mustCreateClient(context.Background(), svc.productCatalogSvcAddr) + svc.productCatalogSvcClient = pb.NewProductCatalogServiceClient(c) + defer c.Close() + mustMapEnv(&svc.cartSvcAddr, "CART_SERVICE_ADDR") + c = mustCreateClient(context.Background(), svc.cartSvcAddr) + svc.cartSvcClient = pb.NewCartServiceClient(c) + defer c.Close() + mustMapEnv(&svc.currencySvcAddr, "CURRENCY_SERVICE_ADDR") + c = mustCreateClient(context.Background(), svc.currencySvcAddr) + svc.currencySvcClient = pb.NewCurrencyServiceClient(c) + defer c.Close() + mustMapEnv(&svc.emailSvcAddr, "EMAIL_SERVICE_ADDR") + c = mustCreateClient(context.Background(), svc.emailSvcAddr) + svc.emailSvcClient = pb.NewEmailServiceClient(c) + defer c.Close() + mustMapEnv(&svc.paymentSvcAddr, "PAYMENT_SERVICE_ADDR") + c = mustCreateClient(context.Background(), svc.paymentSvcAddr) + svc.paymentSvcClient = pb.NewPaymentServiceClient(c) + defer c.Close() + svc.kafkaBrokerSvcAddr = os.Getenv("KAFKA_SERVICE_ADDR") if svc.kafkaBrokerSvcAddr != "" { @@ -334,21 +365,20 @@ func (cs *checkoutService) prepareOrderItemsAndShippingQuoteFromCart(ctx context return out, nil } -func createClient(ctx context.Context, svcAddr string) (*grpc.ClientConn, error) { - return grpc.DialContext(ctx, svcAddr, +func mustCreateClient(ctx context.Context, svcAddr string) *grpc.ClientConn { + c, err := grpc.DialContext(ctx, svcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(otelgrpc.NewClientHandler()), ) -} - -func (cs *checkoutService) quoteShipping(ctx context.Context, address *pb.Address, items []*pb.CartItem) (*pb.Money, error) { - conn, err := createClient(ctx, cs.shippingSvcAddr) if err != nil { - return nil, fmt.Errorf("could not connect shipping service: %+v", err) + log.Fatalf("could not connect to %s service, err: %+v", svcAddr, err) } - defer conn.Close() - shippingQuote, err := pb.NewShippingServiceClient(conn). + return c +} + +func (cs *checkoutService) quoteShipping(ctx context.Context, address *pb.Address, items []*pb.CartItem) (*pb.Money, error) { + shippingQuote, err := cs.shippingSvcClient. GetQuote(ctx, &pb.GetQuoteRequest{ Address: address, Items: items}) @@ -359,13 +389,7 @@ func (cs *checkoutService) quoteShipping(ctx context.Context, address *pb.Addres } func (cs *checkoutService) getUserCart(ctx context.Context, userID string) ([]*pb.CartItem, error) { - conn, err := createClient(ctx, cs.cartSvcAddr) - if err != nil { - return nil, fmt.Errorf("could not connect cart service: %+v", err) - } - defer conn.Close() - - cart, err := pb.NewCartServiceClient(conn).GetCart(ctx, &pb.GetCartRequest{UserId: userID}) + cart, err := cs.cartSvcClient.GetCart(ctx, &pb.GetCartRequest{UserId: userID}) if err != nil { return nil, fmt.Errorf("failed to get user cart during checkout: %+v", err) } @@ -373,13 +397,7 @@ func (cs *checkoutService) getUserCart(ctx context.Context, userID string) ([]*p } func (cs *checkoutService) emptyUserCart(ctx context.Context, userID string) error { - conn, err := createClient(ctx, cs.cartSvcAddr) - if err != nil { - return fmt.Errorf("could not connect cart service: %+v", err) - } - defer conn.Close() - - if _, err = pb.NewCartServiceClient(conn).EmptyCart(ctx, &pb.EmptyCartRequest{UserId: userID}); err != nil { + if _, err := cs.cartSvcClient.EmptyCart(ctx, &pb.EmptyCartRequest{UserId: userID}); err != nil { return fmt.Errorf("failed to empty user cart during checkout: %+v", err) } return nil @@ -388,15 +406,8 @@ func (cs *checkoutService) emptyUserCart(ctx context.Context, userID string) err func (cs *checkoutService) prepOrderItems(ctx context.Context, items []*pb.CartItem, userCurrency string) ([]*pb.OrderItem, error) { out := make([]*pb.OrderItem, len(items)) - conn, err := createClient(ctx, cs.productCatalogSvcAddr) - if err != nil { - return nil, fmt.Errorf("could not connect product catalog service: %+v", err) - } - defer conn.Close() - cl := pb.NewProductCatalogServiceClient(conn) - for i, item := range items { - product, err := cl.GetProduct(ctx, &pb.GetProductRequest{Id: item.GetProductId()}) + product, err := cs.productCatalogSvcClient.GetProduct(ctx, &pb.GetProductRequest{Id: item.GetProductId()}) if err != nil { return nil, fmt.Errorf("failed to get product #%q", item.GetProductId()) } @@ -412,12 +423,7 @@ func (cs *checkoutService) prepOrderItems(ctx context.Context, items []*pb.CartI } func (cs *checkoutService) convertCurrency(ctx context.Context, from *pb.Money, toCurrency string) (*pb.Money, error) { - conn, err := createClient(ctx, cs.currencySvcAddr) - if err != nil { - return nil, fmt.Errorf("could not connect currency service: %+v", err) - } - defer conn.Close() - result, err := pb.NewCurrencyServiceClient(conn).Convert(ctx, &pb.CurrencyConversionRequest{ + result, err := cs.currencySvcClient.Convert(ctx, &pb.CurrencyConversionRequest{ From: from, ToCode: toCurrency}) if err != nil { @@ -427,13 +433,7 @@ func (cs *checkoutService) convertCurrency(ctx context.Context, from *pb.Money, } func (cs *checkoutService) chargeCard(ctx context.Context, amount *pb.Money, paymentInfo *pb.CreditCardInfo) (string, error) { - conn, err := createClient(ctx, cs.paymentSvcAddr) - if err != nil { - return "", fmt.Errorf("failed to connect payment service: %+v", err) - } - defer conn.Close() - - paymentResp, err := pb.NewPaymentServiceClient(conn).Charge(ctx, &pb.ChargeRequest{ + paymentResp, err := cs.paymentSvcClient.Charge(ctx, &pb.ChargeRequest{ Amount: amount, CreditCard: paymentInfo}) if err != nil { @@ -465,12 +465,7 @@ func (cs *checkoutService) sendOrderConfirmation(ctx context.Context, email stri } func (cs *checkoutService) shipOrder(ctx context.Context, address *pb.Address, items []*pb.CartItem) (string, error) { - conn, err := createClient(ctx, cs.shippingSvcAddr) - if err != nil { - return "", fmt.Errorf("failed to connect email service: %+v", err) - } - defer conn.Close() - resp, err := pb.NewShippingServiceClient(conn).ShipOrder(ctx, &pb.ShipOrderRequest{ + resp, err := cs.shippingSvcClient.ShipOrder(ctx, &pb.ShipOrderRequest{ Address: address, Items: items}) if err != nil { From 6eab783cc005fdfb1db646acef4576d1009624a4 Mon Sep 17 00:00:00 2001 From: Lam Tran Date: Mon, 19 Feb 2024 23:55:10 +0700 Subject: [PATCH 2/3] chore: use semantic convention v1.24.0 (#1385) * chore(accountingservice): use semantic convention v1.24.0 * chore(checkoutservice): use semantic convention v1.24.0 --------- Co-authored-by: Austin Parker --- src/accountingservice/kafka/trace_interceptor.go | 8 ++++---- src/checkoutservice/main.go | 7 +++---- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/accountingservice/kafka/trace_interceptor.go b/src/accountingservice/kafka/trace_interceptor.go index 44110a5c17..16899a5891 100644 --- a/src/accountingservice/kafka/trace_interceptor.go +++ b/src/accountingservice/kafka/trace_interceptor.go @@ -9,7 +9,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/propagation" - semconv "go.opentelemetry.io/otel/semconv/v1.21.0" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" "go.opentelemetry.io/otel/trace" "github.com/IBM/sarama" @@ -27,9 +27,9 @@ func NewOTelInterceptor(groupID string) *OTelInterceptor { oi.tracer = otel.Tracer("github.com/open-telemetry/opentelemetry-demo/accountingservice/sarama") oi.fixedAttrs = []attribute.KeyValue{ - semconv.MessagingSystem("kafka"), + semconv.MessagingSystemKafka, semconv.MessagingKafkaConsumerGroup(groupID), - semconv.NetTransportTCP, + semconv.NetworkTransportTCP, } return &oi } @@ -52,7 +52,7 @@ func (oi *OTelInterceptor) OnConsume(msg *sarama.ConsumerMessage) { trace.WithAttributes( semconv.MessagingDestinationName(msg.Topic), semconv.MessagingKafkaMessageOffset(int(msg.Offset)), - semconv.MessagingMessagePayloadSizeBytes(len(msg.Value)), + semconv.MessagingMessageBodySize(len(msg.Value)), semconv.MessagingOperationReceive, semconv.MessagingKafkaDestinationPartition(int(msg.Partition)), ), diff --git a/src/checkoutservice/main.go b/src/checkoutservice/main.go index 91b3c64da3..687b7e307b 100644 --- a/src/checkoutservice/main.go +++ b/src/checkoutservice/main.go @@ -7,7 +7,7 @@ import ( "context" "encoding/json" "fmt" - semconv "go.opentelemetry.io/otel/semconv/v1.19.0" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" "net" "net/http" "os" @@ -502,9 +502,8 @@ func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace. trace.WithSpanKind(trace.SpanKindProducer), trace.WithAttributes( semconv.PeerService("kafka"), - semconv.NetTransportTCP, - semconv.MessagingSystem("kafka"), - semconv.MessagingDestinationKindTopic, + semconv.NetworkTransportTCP, + semconv.MessagingSystemKafka, semconv.MessagingDestinationName(msg.Topic), semconv.MessagingOperationPublish, semconv.MessagingKafkaDestinationPartition(int(msg.Partition)), From 59e0528f997800c2a4c81999b43c8fc6b78c099a Mon Sep 17 00:00:00 2001 From: Juliano Costa Date: Mon, 19 Feb 2024 18:12:01 +0100 Subject: [PATCH 3/3] bump dependencies (#1390) --- src/quoteservice/Dockerfile | 4 ++-- src/quoteservice/composer.json | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/quoteservice/Dockerfile b/src/quoteservice/Dockerfile index 9f3079969a..f1ab3956be 100644 --- a/src/quoteservice/Dockerfile +++ b/src/quoteservice/Dockerfile @@ -2,7 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 -FROM php:8.2-cli as base +FROM php:8.3-cli as base ADD https://github.com/mlocati/docker-php-extension-installer/releases/latest/download/install-php-extensions /usr/local/bin/ RUN chmod +x /usr/local/bin/install-php-extensions \ @@ -17,7 +17,7 @@ CMD php public/index.php USER www-data EXPOSE ${QUOTE_SERVICE_PORT} -FROM composer:2.6 AS vendor +FROM composer:2.7 AS vendor WORKDIR /tmp/ COPY ./src/quoteservice/composer.json . diff --git a/src/quoteservice/composer.json b/src/quoteservice/composer.json index 130fd1f910..83386dc0ba 100644 --- a/src/quoteservice/composer.json +++ b/src/quoteservice/composer.json @@ -3,17 +3,17 @@ "description": "Quote Service part of OpenTelemetry Demo", "license": "Apache-2.0", "require": { - "php": ">= 8.2", + "php": ">= 8.3", "ext-json": "*", "ext-pcntl": "*", "monolog/monolog": "3.5.0", - "open-telemetry/api": "1.0.0", - "open-telemetry/sdk": "1.0.0", - "open-telemetry/exporter-otlp": "1.0.0", - "open-telemetry/opentelemetry-auto-slim": "1.0.0", + "open-telemetry/api": "1.0.3", + "open-telemetry/sdk": "1.0.8", + "open-telemetry/exporter-otlp": "1.0.3", + "open-telemetry/opentelemetry-auto-slim": "1.0.4", "open-telemetry/detector-container": "1.0.0", "open-telemetry/opentelemetry-logger-monolog": "1.0.0", - "guzzlehttp/guzzle": "7.8.0", + "guzzlehttp/guzzle": "7.8.1", "php-di/php-di": "7.0.6", "php-di/slim-bridge": "3.4.0", "php-http/guzzle7-adapter": "1.0.0",