Skip to content

Commit

Permalink
Implement message queue (#457)
Browse files Browse the repository at this point in the history
  • Loading branch information
secustor authored Dec 7, 2022
1 parent 48892f5 commit 780172d
Show file tree
Hide file tree
Showing 33 changed files with 1,666 additions and 37 deletions.
5 changes: 5 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,9 @@ GRAFANA_SERVICE_HOST=grafana
JAEGER_SERVICE_PORT=16686
JAEGER_SERVICE_HOST=jaeger

# Kafka
KAFKA_SERVICE_PORT=9092
KAFKA_SERVICE_ADDR=kafka:${KAFKA_SERVICE_PORT}
ZOOKEEPER_SERVICE_PORT=2181

ENV_PLATFORM=local
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,5 @@ significant modifications will be credited to OpenTelemetry Authors.
([#600](https://github.com/open-telemetry/opentelemetry-demo/pull/600))
* Add HTTP client instrumentation to shippingservice
([#610](https://github.com/open-telemetry/opentelemetry-demo/pull/610))
* Added Kafka, accountingservice and frauddetectionservice for async workflows
([#512](https://github.com/open-telemetry/opentelemetry-demo/pull/457))
117 changes: 106 additions & 11 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,34 @@ networks:
driver: bridge

services:
# Ad service
# Accounting service
accountingservice:
image: ${IMAGE_NAME}:${IMAGE_VERSION}-accountingservice
container_name: accounting-service
build:
context: ./
dockerfile: ./src/accountingservice/Dockerfile
cache_from:
- ${IMAGE_NAME}:${IMAGE_VERSION}-accountingservice
deploy:
resources:
limits:
memory: 300M
restart: always
environment:
- KAFKA_SERVICE_ADDR
- OTEL_EXPORTER_OTLP_TRACES_ENDPOINT
- OTEL_EXPORTER_OTLP_METRICS_ENDPOINT
- OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE
- OTEL_SERVICE_NAME=accountingservice
depends_on:
otelcol:
condition: service_started
kafka:
condition: service_healthy
logging: *logging

# AdService
adservice:
image: ${IMAGE_NAME}:${IMAGE_VERSION}-adservice
container_name: ad-service
Expand Down Expand Up @@ -102,18 +129,28 @@ services:
- PAYMENT_SERVICE_ADDR
- PRODUCT_CATALOG_SERVICE_ADDR
- SHIPPING_SERVICE_ADDR
- KAFKA_SERVICE_ADDR
- OTEL_EXPORTER_OTLP_TRACES_ENDPOINT
- OTEL_EXPORTER_OTLP_METRICS_ENDPOINT
- OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE
- OTEL_SERVICE_NAME=checkoutservice
depends_on:
- cartservice
- currencyservice
- emailservice
- paymentservice
- productcatalogservice
- shippingservice
- otelcol
cartservice:
condition: service_started
currencyservice:
condition: service_started
emailservice:
condition: service_started
paymentservice:
condition: service_started
productcatalogservice:
condition: service_started
shippingservice:
condition: service_started
otelcol:
condition: service_started
kafka:
condition: service_healthy
logging: *logging

# Currency service
Expand Down Expand Up @@ -196,6 +233,33 @@ services:
condition: service_healthy
logging: *logging

# Fraud Detection service
frauddetectionservice:
image: ${IMAGE_NAME}:${IMAGE_VERSION}-frauddetectionservice
container_name: frauddetection-service
build:
context: ./
dockerfile: ./src/frauddetectionservice/Dockerfile
cache_from:
- ${IMAGE_NAME}:${IMAGE_VERSION}-frauddetectionservice
deploy:
resources:
limits:
memory: 200M
restart: always
environment:
- KAFKA_SERVICE_ADDR
- OTEL_EXPORTER_OTLP_TRACES_ENDPOINT
- OTEL_EXPORTER_OTLP_METRICS_ENDPOINT
- OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE
- OTEL_SERVICE_NAME=frauddetectionservice
depends_on:
otelcol:
condition: service_started
kafka:
condition: service_healthy
logging: *logging

# Frontend
frontend:
image: ${IMAGE_NAME}:${IMAGE_VERSION}-frontend
Expand Down Expand Up @@ -240,7 +304,7 @@ services:
- shippingservice
logging: *logging

# Frontend
# Frontend Proxy (Envoy)
frontendproxy:
image: ${IMAGE_NAME}:${IMAGE_VERSION}-frontendproxy
build:
Expand Down Expand Up @@ -444,7 +508,7 @@ services:
deploy:
resources:
limits:
memory: 120M
memory: 200M
restart: always
environment:
- POSTGRES_USER=ffs
Expand All @@ -457,6 +521,38 @@ services:
timeout: 5s
retries: 5

# Kafka used by Checkout, Accounting, and Fraud Detection services
kafka:
image: confluentinc/cp-kafka:7.2.2-1-ubi8
hostname: kafka
container_name: kafka
environment:
KAFKA_LISTENERS: PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_PROCESS_ROLES: 'controller,broker'
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:9093'
KAFKA_METADATA_LOG_SEGMENT_MS: 15000
KAFKA_METADATA_MAX_RETENTION_MS: 1200000
KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS: 2800
KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
volumes:
- ./src/kafka/update_run.sh:/tmp/update_run.sh
- ./src/kafka/clusterID:/tmp/clusterID
command: "bash -c 'ls -lh /tmp && /tmp/update_run.sh && /etc/confluent/docker/run'"
logging: *logging
healthcheck:
test: nc -z kafka 9092
start_period: 10s
interval: 5s
timeout: 10s
retries: 10

# Jaeger
jaeger:
image: jaegertracing/all-in-one
Expand Down Expand Up @@ -538,7 +634,6 @@ services:
- "${REDIS_PORT}"
logging: *logging


# Frontend Tests
frontendTests:
image: ${IMAGE_NAME}:${IMAGE_VERSION}-frontend-tests
Expand Down
25 changes: 13 additions & 12 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,19 @@ Want to deploy the demo and see it in action? Start here.
Want to understand how a particular language's instrumentation works? Start
here.

| Language | Auto Instrumentation | Manual Instrumentation |
|---------------|---------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------|
| .NET | [Cart Service](./services/cartservice.md) | [Cart Service](./services/cartservice.md) |
| C++ | | [Currency Service](./services/currencyservice.md) |
| Erlang/Elixir | [Feature Flag Service](./services/featureflagservice.md) | [Feature Flag Service](./services/featureflagservice.md) |
| Go | [Checkout Service](./services/checkoutservice.md), [Product Catalog Service]( ./services/productcatalogservice.md ) | [Checkout Service](./services/checkoutservice.md), [Product Catalog Service]( ./services/productcatalogservice.md ) |
| Java | [Ad Service](./services/adservice.md) | [Ad Service](./services/adservice.md) |
| JavaScript | [Frontend]( ./services/frontend.md ) | [Frontend](./services/frontend.md), [Payment Service](./services/paymentservice.md) |
| PHP | [Quote Service](./services/quoteservice.md) | [Quote Service](./services/quoteservice.md) |
| Python | [Recommendation Service](./services/recommendationservice.md) | [Recommendation Service](./services/recommendationservice.md) |
| Ruby | [Email Service](./services/emailservice.md) | [Email Service](./services/emailservice.md) |
| Rust | [Shipping Service](./services/shippingservice.md) | [Shipping Service](./services/shippingservice.md) |
| Language | Auto Instrumentation | Manual Instrumentation |
|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------|
| .NET | [Cart Service](./services/cartservice.md) | [Cart Service](./services/cartservice.md) |
| C++ | | [Currency Service](./services/currencyservice.md) |
| Erlang/Elixir | [Feature Flag Service](./services/featureflagservice.md) | [Feature Flag Service](./services/featureflagservice.md) |
| Go | [Accounting Service](./services/accountingservice.md), [Checkout Service](./services/checkoutservice.md), [Product Catalog Service]( ./services/productcatalogservice.md ) | [Checkout Service](./services/checkoutservice.md), [Product Catalog Service]( ./services/productcatalogservice.md ) |
| Java | [Ad Service](./services/adservice.md) | [Ad Service](./services/adservice.md) |
| JavaScript | [Frontend]( ./services/frontend.md ) | [Frontend](./services/frontend.md), [Payment Service](./services/paymentservice.md) |
| Kotlin | [Fraud Detection Service]( ./services/frauddetectionservice.md ) | |
| PHP | [Quote Service](./services/quoteservice.md) | [Quote Service](./services/quoteservice.md) |
| Python | [Recommendation Service](./services/recommendationservice.md) | [Recommendation Service](./services/recommendationservice.md) |
| Ruby | [Email Service](./services/emailservice.md) | [Email Service](./services/emailservice.md) |
| Rust | [Shipping Service](./services/shippingservice.md) | [Shipping Service](./services/shippingservice.md) |

### Service Documentation

Expand Down
11 changes: 11 additions & 0 deletions docs/current_architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ uses [Locust](https://locust.io/) to fake user traffic.
graph TD
subgraph Service Diagram
adservice(Ad Service):::java
accountingservice(Accounting Service):::golang
cache[(Cache<br/>&#40redis&#41)]
cartservice(Cart Service):::dotnet
checkoutservice(Checkout Service):::golang
currencyservice(Currency Service):::cpp
emailservice(Email Service):::ruby
frauddetectionservice(Fraud Detection Service):::kotlin
frontend(Frontend):::javascript
frontendproxy(Frontend Proxy <br/>&#40Envoy&#41):::cpp
loadgenerator([Load Generator]):::python
Expand All @@ -23,18 +25,22 @@ recommendationservice(Recommendation Service):::python
shippingservice(Shipping Service):::rust
featureflagservice(Feature Flag Service):::erlang
featureflagstore[(Feature Flag Store<br/>&#40PostgreSQL DB&#41)]
queue[(queue<br/>&#40Kafka&#41)]
Internet -->|HTTP| frontendproxy
frontendproxy -->|HTTP| frontend
frontendproxy -->|HTTP| featureflagservice
loadgenerator -->|HTTP| frontend
accountingservice -->|TCP| queue
checkoutservice --->|gRPC| cartservice --> cache
checkoutservice --->|gRPC| productcatalogservice
checkoutservice --->|gRPC| currencyservice
checkoutservice --->|HTTP| emailservice
checkoutservice --->|gRPC| paymentservice
checkoutservice -->|gRPC| shippingservice
checkoutservice -->|TCP| queue
frontend -->|gRPC| adservice
frontend -->|gRPC| cartservice
Expand All @@ -44,6 +50,8 @@ frontend -->|gRPC| currencyservice
frontend -->|gRPC| recommendationservice -->|gRPC| productcatalogservice
frontend -->|gRPC| shippingservice -->|HTTP| quoteservice
frauddetectionservice -->|TCP| queue
productcatalogservice -->|gRPC| featureflagservice
shippingservice -->|gRPC| featureflagservice
Expand All @@ -53,6 +61,7 @@ featureflagservice --> featureflagstore
end
classDef java fill:#b07219,color:white;
classDef kotlin fill:#560ba1,color:white;
classDef dotnet fill:#178600,color:white;
classDef golang fill:#00add8,color:black;
classDef cpp fill:#f34b7d,color:white;
Expand All @@ -77,9 +86,11 @@ subgraph Service Legend
rustsvc(Rust):::rust
erlangsvc(Erlang/Elixir):::erlang
phpsvc(PHP):::php
kotlinsvc(Kotlin):::kotlin
end
classDef java fill:#b07219,color:white;
classDef kotlin fill:#560ba1,color:white;
classDef dotnet fill:#178600,color:white;
classDef golang fill:#00add8,color:black;
classDef cpp fill:#f34b7d,color:white;
Expand Down
2 changes: 1 addition & 1 deletion docs/docker_deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

- Docker
- [Docker Compose](https://docs.docker.com/compose/install/#install-compose) v2.0.0+
- 4 GB of RAM
- 5 GB of RAM

## Clone Repo

Expand Down
2 changes: 2 additions & 0 deletions docs/service_table.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ View [Service Graph](./current_architecture.md) to visualize request flows.

| Service | Language | Description |
|--------------------------------------------------------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------|
| [accountingservice](./services/accountingservice.md) | Go | Processes incoming orders and count the sum of all orders (mock). |
| [adservice](./services/adservice.md) | Java | Provides text ads based on given context words. |
| [cartservice](./services/cartservice.md) | DotNet | Stores the items in the user's shopping cart in Redis and retrieves it. |
| [checkoutservice](./services/checkoutservice.md) | Go | Retrieves user cart, prepares order and orchestrates the payment, shipping and the email notification. |
| [currencyservice](./services/currencyservice.md) | C++ | Converts one money amount to another currency. Uses real values fetched from European Central Bank. It's the highest QPS service. |
| [emailservice](./services/emailservice.md) | Ruby | Sends users an order confirmation email (mock). |
| [frauddetectionservice](./services/frauddetectionservice.md) | Kotlin | Analyzes incoming orders and detects fraud attempts (mock). |
| [featureflagservice](./services/featureflagservice.md) | Erlang/Elixir | CRUD feature flag service to demonstrate various scenarios like fault injection & how to emit telemetry from a feature flag reliant service. |
| [frontend](./services/frontend.md) | JavaScript | Exposes an HTTP server to serve the website. Does not require signup/login and generates session IDs for all users automatically. |
| [loadgenerator](./services/loadgenerator.md) | Python/Locust | Continuously sends requests imitating realistic user shopping flows to the frontend. |
Expand Down
58 changes: 58 additions & 0 deletions docs/services/accountingservice.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Accounting Service

This service calculates the total amount of sold products.
This is only mocked and received orders are printed out.

[Accounting Service](../../src/accountingservice/)

## Traces

### Initializing Tracing

The OpenTelemetry SDK is initialized from `main` using the `initTracerProvider`
function.

```go
func initTracerProvider() (*sdktrace.TracerProvider, error) {
ctx := context.Background()

exporter, err := otlptracegrpc.New(ctx)
if err != nil {
return nil, err
}
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
)
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return tp, nil
}
```

You should call `TracerProvider.Shutdown()` when your service is shutdown to
ensure all spans are exported. This service makes that call as part of a
deferred function in main

```go
tp, err := initTracerProvider()
if err != nil {
log.Fatal(err)
}
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
log.Printf("Error shutting down tracer provider: %v", err)
}
}()
```

### Adding Kafka ( Sarama ) auto-instrumentation

This service will receive the processed results of the Checkout Service via a
Kafka topic.
To instrument the Kafka client the ConsumerHandler implemented by the developer
has to be wrapped.

```go
handler := groupHandler{} // implements sarama.ConsumerGroupHandler
wrappedHandler := otelsarama.WrapConsumerGroupHandler(&handler)
```
Loading

0 comments on commit 780172d

Please sign in to comment.