Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PLATFORM-1085]: Add some event bus specific impl #155

Merged
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
485 changes: 255 additions & 230 deletions .drone.yml

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ members = [
]

[features]
postgres = ["sqlx/postgres"]
postgres = ["sqlx/postgres", "typed-builder"]
kafka = ["rdkafka", "typed-builder"]
rabbit = ["lapin", "typed-builder"]

[dependencies]
# Serialization/Deserialization
Expand All @@ -45,6 +47,13 @@ ouroboros = "0.15"

# Sql library for async impl
sqlx = { version = "0.6.1", features = ["runtime-tokio-native-tls", "uuid", "json", "chrono"], optional = true }
# Kafka library
rdkafka = { version = "0.29.*", features = ["ssl-vendored"], optional = true }
# Rabbit library
lapin = { version = "2.1.1", optional = true }

typed-builder = { version = "0.14.0", optional = true }

# To stream over sqlx results
futures = "0.3"
tracing = "0.1"
Expand Down
16 changes: 11 additions & 5 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
FROM public.ecr.aws/prima/rust:1.66.1

# Serve per avere l'owner dei file scritti dal container uguale all'utente Linux sull'host
USER app
FROM public.ecr.aws/prima/rust:1.68.0-1

WORKDIR /code

ENTRYPOINT ["/bin/bash"]
COPY entrypoint /code/entrypoint

RUN cargo install sqlx-cli --no-default-features --features native-tls,postgres --version 0.6.2

RUN chown -R app:app /code

# Needed to have the same file owner in the container and in Linux host
USER app

ENTRYPOINT ["./entrypoint"]
96 changes: 50 additions & 46 deletions Makefile.toml
Original file line number Diff line number Diff line change
@@ -1,75 +1,79 @@
[env]
DATABASE_URL = "postgresql://postgres:postgres@localhost:5432/postgres"
DATABASE_URL = "postgresql://postgres:postgres@postgres:5432/postgres"
RABBIT_URL = "amqp://rabbit:rabbit@rabbit/rabbit"
KAFKA_BROKERS_URL = "kafka:9092"

[env.drone]
DATABASE_URL = "postgresql://postgres:postgres@postgres:5432/postgres"
RABBIT_URL = "amqp://rabbit:rabbit@rabbit/rabbit"
KAFKA_BROKERS_URL = "kafka:9092"

[config]
default_to_workspace = false

# Check
[tasks.check]
description = "Run checks"
dependencies = ["check-no-features", "check-postgres"]

[tasks.check-no-features]
description = "Run check without features"
command = "cargo"
args = ["check", "--workspace"]
description = "Run checks for each feature"
dependencies = ["check-ci"]

[tasks.check-postgres]
description = "Run check with `postgres` feature on"
command = "cargo"
args = ["check", "--workspace", "--features=postgres"]
[tasks.check-ci]
script = [
"cargo check --workspace",
"cargo check --workspace --features=postgres",
"cargo check --workspace --features=kafka",
"cargo check --workspace --features=rabbit",
"cargo check --workspace --all-features"
]

# Build
[tasks.build-ci]
description = "Build prima_tracing.rs inside CI."
dependencies = ["build-ci-no-features", "build-ci-postgres"]
[tasks.build]
description = "Build the binaries for each feature"
dependencies = ["build-ci"]

[tasks.build-ci-no-features]
description = "Build prima_tracing.rs inside CI."
command = "cargo"
args = ["build", "-j", "2", "--all-features", "--workspace"]

[tasks.build-ci-postgres]
description = "Build prima_tracing.rs inside CI."
command = "cargo"
args = ["build", "-j", "2", "--all-features", "--workspace"]
[tasks.build-ci]
script = [
"cargo build -j 2 --workspace",
"cargo build -j 2 --workspace --features=postgres",
"cargo build -j 2 --workspace --features=kafka",
"cargo build -j 2 --workspace --features=rabbit",
"cargo build -j 2 --workspace --all-features"
]

# Format
[tasks.format-ci]
description = "Runs the cargo rustfmt plugin during CI."
description = "Runs the cargo rustfmt plugin during CI"
command = "cargo"
args = ["fmt", "--all", "--", "--check"]

# Tests
[tasks.test]
description = "Run tests."
dependencies = ["test-no-features", "test-postgres"]
description = "Run tests"
dependencies = ["test-ci"]

[tasks.test-no-features]
description = "Run tests."
command = "cargo"
args = ["test", "${@}", "--workspace"]

[tasks.test-postgres]
description = "Run tests."
command = "cargo"
args = ["test", "${@}", "--workspace", "--features=postgres"]
[tasks.test-ci]
description = "Run tests during CI"
script = [
"cargo test ${@} --workspace",
"cargo test ${@} --workspace --features=postgres",
"cargo test ${@} --workspace --features=kafka",
"cargo test ${@} --workspace --features=rabbit",
"cargo test ${@} --workspace --all-features"
]

# Clippy
[tasks.clippy-ci]
description = "Run clippy linter."
dependencies = ["clippy-ci-no-features", "clippy-ci-postgres"]
[tasks.clippy]
description = "Run clippy linter"
dependencies = ["clippy-ci"]

[tasks.clippy-ci-no-features]
command = "cargo"
args = ["clippy", "--workspace", "--", "-D", "warnings"]

[tasks.clippy-ci-postgres]
command = "cargo"
args = ["clippy", "--workspace", "--features", "postgres", "--", "-D", "warnings"]
[tasks.clippy-ci]
description = "Run clippy linter during CI"
script = [
"cargo clippy --workspace -- -D warnings",
"cargo clippy --workspace --features=postgres -- -D warnings",
"cargo clippy --workspace --features=kafka -- -D warnings",
"cargo clippy --workspace --features=rabbit -- -D warnings",
"cargo clippy --workspace --all-features -- -D warnings"
]

# Run example
[tasks.example]
Expand Down
14 changes: 4 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,22 @@ A tracing span is produced every time a projector is used or a policy is applied

## Run examples, tests and linting

Start a Postgres instance.
Start the docker-compose stack

```shell
docker run -p 5432:5432 --name postgres -e POSTGRES_PASSWORD=postgres -d postgres:11-alpine
```

Export DATABASE_URL environment variable with freshly new created database.

```shell
export DATABASE_URL=postgres://postgres:postgres@localhost:5432/postgres
docker compose run --service-ports web bash
```

Run tests.

```shell
cargo test --all-targets --all-features
cargo make test
```

Run linters.

```
cargo clippy --all-targets --all-features -- -W clippy::nursery
cargo make clippy
```

Finally eventually unset `DATABASE_URL` environment variable.
Expand Down
89 changes: 89 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
version: "3"

services:
web:
build: .
container_name: esrs_web
volumes:
- .:/code
- "app:/home/app/"
- "~/.ssh:/home/app/.ssh"
- "~/.aws:/home/app/.aws"
- "~/.gitconfig:/home/app/.gitconfig"
- "~/.gitignore:/home/app/.gitignore"
working_dir: /code
depends_on:
- postgres
- rabbit
- zookeeper
- kafka
environment:
BUILD_ENV: dev
CARGO_HOME: /home/app/.cargo
CARGO_TARGET_DIR: /home/app/target
CARGO_MAKE_DISABLE_UPDATE_CHECK: 1
networks:
- default

postgres:
image: public.ecr.aws/bitnami/postgresql:11
ports:
- "5542:5432"
environment:
POSTGRES_PASSWORD: postgres
POSTGRES_USER: postgres

rabbit:
image: rabbitmq:3.7.4-management
ports:
- "15676:15672"
- "5676:5672"
environment:
RABBITMQ_DEFAULT_VHOST: rabbit
RABBITMQ_DEFAULT_USER: rabbit
RABBITMQ_DEFAULT_PASS: rabbit

zookeeper:
container_name: zookeeper
image: public.ecr.aws/prima/zookeeper:7.3.0-1
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=ruok"
ports:
- "2181:2181"
healthcheck:
test: "[ $$(echo ruok | nc localhost 2181 ) == imok ]"
interval: 1s
timeout: 10s
retries: 100

kafka:
container_name: kafka
image: public.ecr.aws/prima/kafka:7.3.0-1
depends_on:
zookeeper:
condition: service_healthy
ports:
- "9092:9092"
- "9997:9997"
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT_HOST://host.docker.internal:29092, PLAINTEXT://kafka:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONFLUENT_REPORTERS_TELEMETRY_AUTO_ENABLE: "false"
# KAFKA_SCHEMA_REGISTRY_URL: "schema-registry:8085"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
healthcheck:
test: [ "CMD", "kafka-topics", "--bootstrap-server", "kafka:9092", "--list" ]
interval: 1s
timeout: 10s
retries: 100

volumes:
app:
9 changes: 1 addition & 8 deletions entrypoint
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
#!/usr/bin/env bash

source /setup_common.sh
source /decrypt_secrets.sh

if [ -n "$1" ]; then
sh -c "$@"
else
cargo watch -x 'run'
fi
sh -c "$@"
8 changes: 7 additions & 1 deletion src/esrs/event_bus.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
use crate::{Aggregate, StoreEvent};
use async_trait::async_trait;

use crate::{Aggregate, StoreEvent};

#[cfg(feature = "kafka")]
pub mod kafka;
#[cfg(feature = "rabbit")]
pub mod rabbit;

/// The `EventBus` trait is responsible of the publishing an event on a given bus implementation.
#[async_trait]
pub trait EventBus<A>: Sync
Expand Down
24 changes: 24 additions & 0 deletions src/esrs/event_bus/kafka/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use rdkafka::ClientConfig;
use typed_builder::TypedBuilder;

use crate::event_bus::kafka::error::KafkaEventBusError;

#[derive(TypedBuilder)]
pub struct KafkaEventBusConfig<'a> {
pub(crate) broker_url_list: &'a str,
pub(crate) topic: &'a str,
#[builder(default, setter(strip_option))]
pub(crate) security: Option<Security<'a>>,
#[builder(default = 5000)]
pub(crate) request_timeout: u64,
#[builder(default, setter(strip_option))]
pub(crate) client_config: Option<ClientConfig>,
#[builder(default = Box::new(|_| ()))]
pub(crate) error_handler: Box<dyn Fn(KafkaEventBusError) + Send + Sync>,
}

pub struct Security<'a> {
pub(crate) username: &'a str,
pub(crate) password: &'a str,
pub(crate) sasl_mechanism: &'a str,
}
27 changes: 27 additions & 0 deletions src/esrs/event_bus/kafka/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use rdkafka::error::KafkaError;
use rdkafka::message::OwnedMessage;
use serde_json::Error;

#[derive(Debug)]
pub enum KafkaEventBusError {
Json(Error),
Kafka(KafkaError),
}

impl From<Error> for KafkaEventBusError {
fn from(value: Error) -> Self {
Self::Json(value)
}
}

impl From<KafkaError> for KafkaEventBusError {
fn from(value: KafkaError) -> Self {
Self::Kafka(value)
}
}

impl From<(KafkaError, OwnedMessage)> for KafkaEventBusError {
fn from((error, _): (KafkaError, OwnedMessage)) -> Self {
Self::Kafka(error)
}
}
Loading