Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
julianocosta89 authored Feb 20, 2024
2 parents 14d2f15 + 59e0528 commit 0f8cc86
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 67 deletions.
8 changes: 4 additions & 4 deletions src/accountingservice/kafka/trace_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
"go.opentelemetry.io/otel/trace"

"github.com/IBM/sarama"
Expand All @@ -27,9 +27,9 @@ func NewOTelInterceptor(groupID string) *OTelInterceptor {
oi.tracer = otel.Tracer("github.com/open-telemetry/opentelemetry-demo/accountingservice/sarama")

oi.fixedAttrs = []attribute.KeyValue{
semconv.MessagingSystem("kafka"),
semconv.MessagingSystemKafka,
semconv.MessagingKafkaConsumerGroup(groupID),
semconv.NetTransportTCP,
semconv.NetworkTransportTCP,
}
return &oi
}
Expand All @@ -52,7 +52,7 @@ func (oi *OTelInterceptor) OnConsume(msg *sarama.ConsumerMessage) {
trace.WithAttributes(
semconv.MessagingDestinationName(msg.Topic),
semconv.MessagingKafkaMessageOffset(int(msg.Offset)),
semconv.MessagingMessagePayloadSizeBytes(len(msg.Value)),
semconv.MessagingMessageBodySize(len(msg.Value)),
semconv.MessagingOperationReceive,
semconv.MessagingKafkaDestinationPartition(int(msg.Partition)),
),
Expand Down
104 changes: 49 additions & 55 deletions src/checkoutservice/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"context"
"encoding/json"
"fmt"
semconv "go.opentelemetry.io/otel/semconv/v1.19.0"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -124,7 +124,13 @@ type checkoutService struct {
paymentSvcAddr string
kafkaBrokerSvcAddr string
pb.UnimplementedCheckoutServiceServer
KafkaProducerClient sarama.AsyncProducer
KafkaProducerClient sarama.AsyncProducer
shippingSvcClient pb.ShippingServiceClient
productCatalogSvcClient pb.ProductCatalogServiceClient
cartSvcClient pb.CartServiceClient
currencySvcClient pb.CurrencyServiceClient
emailSvcClient pb.EmailServiceClient
paymentSvcClient pb.PaymentServiceClient
}

func main() {
Expand Down Expand Up @@ -153,12 +159,37 @@ func main() {
tracer = tp.Tracer("checkoutservice")

svc := new(checkoutService)

mustMapEnv(&svc.shippingSvcAddr, "SHIPPING_SERVICE_ADDR")
c := mustCreateClient(context.Background(), svc.shippingSvcAddr)
svc.shippingSvcClient = pb.NewShippingServiceClient(c)
defer c.Close()

mustMapEnv(&svc.productCatalogSvcAddr, "PRODUCT_CATALOG_SERVICE_ADDR")
c = mustCreateClient(context.Background(), svc.productCatalogSvcAddr)
svc.productCatalogSvcClient = pb.NewProductCatalogServiceClient(c)
defer c.Close()

mustMapEnv(&svc.cartSvcAddr, "CART_SERVICE_ADDR")
c = mustCreateClient(context.Background(), svc.cartSvcAddr)
svc.cartSvcClient = pb.NewCartServiceClient(c)
defer c.Close()

mustMapEnv(&svc.currencySvcAddr, "CURRENCY_SERVICE_ADDR")
c = mustCreateClient(context.Background(), svc.currencySvcAddr)
svc.currencySvcClient = pb.NewCurrencyServiceClient(c)
defer c.Close()

mustMapEnv(&svc.emailSvcAddr, "EMAIL_SERVICE_ADDR")
c = mustCreateClient(context.Background(), svc.emailSvcAddr)
svc.emailSvcClient = pb.NewEmailServiceClient(c)
defer c.Close()

mustMapEnv(&svc.paymentSvcAddr, "PAYMENT_SERVICE_ADDR")
c = mustCreateClient(context.Background(), svc.paymentSvcAddr)
svc.paymentSvcClient = pb.NewPaymentServiceClient(c)
defer c.Close()

svc.kafkaBrokerSvcAddr = os.Getenv("KAFKA_SERVICE_ADDR")

if svc.kafkaBrokerSvcAddr != "" {
Expand Down Expand Up @@ -334,21 +365,20 @@ func (cs *checkoutService) prepareOrderItemsAndShippingQuoteFromCart(ctx context
return out, nil
}

func createClient(ctx context.Context, svcAddr string) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, svcAddr,
func mustCreateClient(ctx context.Context, svcAddr string) *grpc.ClientConn {
c, err := grpc.DialContext(ctx, svcAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
)
}

func (cs *checkoutService) quoteShipping(ctx context.Context, address *pb.Address, items []*pb.CartItem) (*pb.Money, error) {
conn, err := createClient(ctx, cs.shippingSvcAddr)
if err != nil {
return nil, fmt.Errorf("could not connect shipping service: %+v", err)
log.Fatalf("could not connect to %s service, err: %+v", svcAddr, err)
}
defer conn.Close()

shippingQuote, err := pb.NewShippingServiceClient(conn).
return c
}

func (cs *checkoutService) quoteShipping(ctx context.Context, address *pb.Address, items []*pb.CartItem) (*pb.Money, error) {
shippingQuote, err := cs.shippingSvcClient.
GetQuote(ctx, &pb.GetQuoteRequest{
Address: address,
Items: items})
Expand All @@ -359,27 +389,15 @@ func (cs *checkoutService) quoteShipping(ctx context.Context, address *pb.Addres
}

func (cs *checkoutService) getUserCart(ctx context.Context, userID string) ([]*pb.CartItem, error) {
conn, err := createClient(ctx, cs.cartSvcAddr)
if err != nil {
return nil, fmt.Errorf("could not connect cart service: %+v", err)
}
defer conn.Close()

cart, err := pb.NewCartServiceClient(conn).GetCart(ctx, &pb.GetCartRequest{UserId: userID})
cart, err := cs.cartSvcClient.GetCart(ctx, &pb.GetCartRequest{UserId: userID})
if err != nil {
return nil, fmt.Errorf("failed to get user cart during checkout: %+v", err)
}
return cart.GetItems(), nil
}

func (cs *checkoutService) emptyUserCart(ctx context.Context, userID string) error {
conn, err := createClient(ctx, cs.cartSvcAddr)
if err != nil {
return fmt.Errorf("could not connect cart service: %+v", err)
}
defer conn.Close()

if _, err = pb.NewCartServiceClient(conn).EmptyCart(ctx, &pb.EmptyCartRequest{UserId: userID}); err != nil {
if _, err := cs.cartSvcClient.EmptyCart(ctx, &pb.EmptyCartRequest{UserId: userID}); err != nil {
return fmt.Errorf("failed to empty user cart during checkout: %+v", err)
}
return nil
Expand All @@ -388,15 +406,8 @@ func (cs *checkoutService) emptyUserCart(ctx context.Context, userID string) err
func (cs *checkoutService) prepOrderItems(ctx context.Context, items []*pb.CartItem, userCurrency string) ([]*pb.OrderItem, error) {
out := make([]*pb.OrderItem, len(items))

conn, err := createClient(ctx, cs.productCatalogSvcAddr)
if err != nil {
return nil, fmt.Errorf("could not connect product catalog service: %+v", err)
}
defer conn.Close()
cl := pb.NewProductCatalogServiceClient(conn)

for i, item := range items {
product, err := cl.GetProduct(ctx, &pb.GetProductRequest{Id: item.GetProductId()})
product, err := cs.productCatalogSvcClient.GetProduct(ctx, &pb.GetProductRequest{Id: item.GetProductId()})
if err != nil {
return nil, fmt.Errorf("failed to get product #%q", item.GetProductId())
}
Expand All @@ -412,12 +423,7 @@ func (cs *checkoutService) prepOrderItems(ctx context.Context, items []*pb.CartI
}

func (cs *checkoutService) convertCurrency(ctx context.Context, from *pb.Money, toCurrency string) (*pb.Money, error) {
conn, err := createClient(ctx, cs.currencySvcAddr)
if err != nil {
return nil, fmt.Errorf("could not connect currency service: %+v", err)
}
defer conn.Close()
result, err := pb.NewCurrencyServiceClient(conn).Convert(ctx, &pb.CurrencyConversionRequest{
result, err := cs.currencySvcClient.Convert(ctx, &pb.CurrencyConversionRequest{
From: from,
ToCode: toCurrency})
if err != nil {
Expand All @@ -427,13 +433,7 @@ func (cs *checkoutService) convertCurrency(ctx context.Context, from *pb.Money,
}

func (cs *checkoutService) chargeCard(ctx context.Context, amount *pb.Money, paymentInfo *pb.CreditCardInfo) (string, error) {
conn, err := createClient(ctx, cs.paymentSvcAddr)
if err != nil {
return "", fmt.Errorf("failed to connect payment service: %+v", err)
}
defer conn.Close()

paymentResp, err := pb.NewPaymentServiceClient(conn).Charge(ctx, &pb.ChargeRequest{
paymentResp, err := cs.paymentSvcClient.Charge(ctx, &pb.ChargeRequest{
Amount: amount,
CreditCard: paymentInfo})
if err != nil {
Expand Down Expand Up @@ -465,12 +465,7 @@ func (cs *checkoutService) sendOrderConfirmation(ctx context.Context, email stri
}

func (cs *checkoutService) shipOrder(ctx context.Context, address *pb.Address, items []*pb.CartItem) (string, error) {
conn, err := createClient(ctx, cs.shippingSvcAddr)
if err != nil {
return "", fmt.Errorf("failed to connect email service: %+v", err)
}
defer conn.Close()
resp, err := pb.NewShippingServiceClient(conn).ShipOrder(ctx, &pb.ShipOrderRequest{
resp, err := cs.shippingSvcClient.ShipOrder(ctx, &pb.ShipOrderRequest{
Address: address,
Items: items})
if err != nil {
Expand Down Expand Up @@ -507,9 +502,8 @@ func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.
trace.WithSpanKind(trace.SpanKindProducer),
trace.WithAttributes(
semconv.PeerService("kafka"),
semconv.NetTransportTCP,
semconv.MessagingSystem("kafka"),
semconv.MessagingDestinationKindTopic,
semconv.NetworkTransportTCP,
semconv.MessagingSystemKafka,
semconv.MessagingDestinationName(msg.Topic),
semconv.MessagingOperationPublish,
semconv.MessagingKafkaDestinationPartition(int(msg.Partition)),
Expand Down
4 changes: 2 additions & 2 deletions src/quoteservice/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# SPDX-License-Identifier: Apache-2.0


FROM php:8.2-cli as base
FROM php:8.3-cli as base

ADD https://github.com/mlocati/docker-php-extension-installer/releases/latest/download/install-php-extensions /usr/local/bin/
RUN chmod +x /usr/local/bin/install-php-extensions \
Expand All @@ -17,7 +17,7 @@ CMD php public/index.php
USER www-data
EXPOSE ${QUOTE_SERVICE_PORT}

FROM composer:2.6 AS vendor
FROM composer:2.7 AS vendor

WORKDIR /tmp/
COPY ./src/quoteservice/composer.json .
Expand Down
12 changes: 6 additions & 6 deletions src/quoteservice/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@
"description": "Quote Service part of OpenTelemetry Demo",
"license": "Apache-2.0",
"require": {
"php": ">= 8.2",
"php": ">= 8.3",
"ext-json": "*",
"ext-pcntl": "*",
"monolog/monolog": "3.5.0",
"open-telemetry/api": "1.0.0",
"open-telemetry/sdk": "1.0.0",
"open-telemetry/exporter-otlp": "1.0.0",
"open-telemetry/opentelemetry-auto-slim": "1.0.0",
"open-telemetry/api": "1.0.3",
"open-telemetry/sdk": "1.0.8",
"open-telemetry/exporter-otlp": "1.0.3",
"open-telemetry/opentelemetry-auto-slim": "1.0.4",
"open-telemetry/detector-container": "1.0.0",
"open-telemetry/opentelemetry-logger-monolog": "1.0.0",
"guzzlehttp/guzzle": "7.8.0",
"guzzlehttp/guzzle": "7.8.1",
"php-di/php-di": "7.0.6",
"php-di/slim-bridge": "3.4.0",
"php-http/guzzle7-adapter": "1.0.0",
Expand Down

0 comments on commit 0f8cc86

Please sign in to comment.