Skip to content

Commit

Permalink
fixed messages distribution across partitions (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdcm authored Nov 26, 2023
1 parent f763faa commit 1c5ace9
Show file tree
Hide file tree
Showing 17 changed files with 100 additions and 35 deletions.
2 changes: 1 addition & 1 deletion deploy_helm.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/bin/bash

helm install web-api-dev helm
helm upgrade --install --atomic --timeout 300s --wait web-api-dev helm
8 changes: 8 additions & 0 deletions helm/templates/acl/deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Values.acl.label }}-deployment
annotations:
"helm.sh/hook-weight": "{{ .Values.acl.weight }}"
spec:
replicas: {{ .Values.acl.replicas }}
selector:
Expand All @@ -15,6 +17,12 @@ spec:
containers:
- name: {{ .Values.acl.image.name }}
image: "{{ .Values.acl.image.name }}:{{ .Values.acl.image.tag }}"
readinessProbe:
exec:
command:
- sh
- -c
- "nc -z kafka-service 9092 || exit -1"
env:
- name: KAFKA_BROKERS
value: {{ .Values.acl.kafka.brokers }}
Expand Down
8 changes: 8 additions & 0 deletions helm/templates/api/deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Values.api.label }}-deployment
annotations:
"helm.sh/hook-weight": "{{ .Values.api.weight }}"
spec:
selector:
matchLabels:
Expand All @@ -14,6 +16,12 @@ spec:
containers:
- name: {{ .Values.api.image.name }}
image: "{{ .Values.api.image.name }}:{{ .Values.api.image.tag }}"
readinessProbe:
exec:
command:
- sh
- -c
- "nc -z kafka-service 9092 || exit -1"
env:
- name: DB_URI
value: {{ .Values.api.mongodb.uri }}
Expand Down
22 changes: 19 additions & 3 deletions helm/templates/kafka/deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Values.kafka.label }}-deployment
annotations:
"helm.sh/hook-weight": "{{ .Values.kafka.weight }}"
spec:
replicas: {{ .Values.kafka.replicas }}
selector:
Expand All @@ -18,11 +20,25 @@ spec:
value: "{{ .Values.kafka.variables.brokerId }}"
- name: KAFKA_ZOOKEEPER_CONNECT
value: {{ .Values.kafka.variables.zookeeperConnect }}
- name: KAFKA_LISTENERS
value: {{ .Values.kafka.variables.listeners }}
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
value: {{ .Values.kafka.variables.securityProtocolMap }}
- name: KAFKA_ADVERTISED_LISTENERS
value: {{ .Values.kafka.variables.advertisedListeners }}
image: {{ .Values.kafka.image.name }}
- name: KAFKA_LISTENERS
value: {{ .Values.kafka.variables.listeners }}
- name: KAFKA_NUM_PARTITIONS
value: "{{ .Values.kafka.variables.numPartitions }}"
- name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
value: "{{ .Values.kafka.variables.replicationFactor }}"
image: {{ .Values.kafka.image.name }}:{{ .Values.kafka.image.tag }}
readinessProbe:
exec:
command:
- sh
- -c
- "nc -z localhost 9092 || exit -1"
initialDelaySeconds: 5
periodSeconds: 5
imagePullPolicy: {{ .Values.kafka.pullPolicy }}
name: {{ .Values.kafka.label }}
resources:
Expand Down
2 changes: 2 additions & 0 deletions helm/templates/mongodb/deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Values.mongodb.label }}-deployment
annotations:
"helm.sh/hook-weight": "{{ .Values.mongodb.weight }}"
spec:
selector:
matchLabels:
Expand Down
25 changes: 16 additions & 9 deletions helm/templates/zookeeper/deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Values.zookeeper.label }}-deployment
annotations:
"helm.sh/hook-weight": "{{ .Values.zookeeper.weight }}"
spec:
selector:
matchLabels:
Expand All @@ -12,12 +14,17 @@ spec:
app: {{ .Values.zookeeper.label }}
spec:
containers:
- image: {{ .Values.zookeeper.image.name }}
imagePullPolicy: {{ .Values.zookeeper.pullPolicy }}
name: {{ .Values.zookeeper.label }}
ports:
- containerPort: {{ .Values.zookeeper.port }}
resources:
limits:
memory: {{ .Values.zookeeper.resources.limits.memory }}
cpu: {{ .Values.zookeeper.resources.limits.cpu }}
- env:
- name: ZOOKEEPER_CLIENT_PORT
value: "{{ .Values.zookeeper.port }}"
- name: ZOOKEEPER_TICK_TIME
value: "{{ .Values.zookeeper.tickTime }}"
image: {{ .Values.zookeeper.image.name }}:{{ .Values.zookeeper.image.tag }}
imagePullPolicy: {{ .Values.zookeeper.pullPolicy }}
name: {{ .Values.zookeeper.label }}
ports:
- containerPort: {{ .Values.zookeeper.port }}
resources:
limits:
memory: {{ .Values.zookeeper.resources.limits.memory }}
cpu: {{ .Values.zookeeper.resources.limits.cpu }}
23 changes: 16 additions & 7 deletions helm/values.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mongodb:
label: mongodb
replicas: 1
weight: 2
port: 27017
image:
name: mongo
Expand All @@ -15,28 +16,34 @@ mongodb:
kafka:
label: kafka
replicas: 1
weight: 1
port: 9092
image:
name: wurstmeister/kafka
tag: latest
name: confluentinc/cp-kafka
tag: 7.0.1
pullPolicy: IfNotPresent
resources:
limits:
memory: "512Mi"
cpu: "500m"
variables:
listeners: PLAINTEXT://:9092
brokerId: 1
zookeeperConnect: zookeeper-service:2181
securityProtocolMap: PLAINTEXT:PLAINTEXT
advertisedListeners: PLAINTEXT://kafka-service:9092
brokerId: "1"
listeners: PLAINTEXT://:9092
numPartitions: &patitions 2
replicationFactor: 1

zookeeper:
label: zookeeper
replicas: 1
weight: 0
port: 2181
tickTime: 2000
image:
name: wurstmeister/zookeeper
tag: latest
name: confluentinc/cp-zookeeper
tag: 7.0.1
pullPolicy: IfNotPresent
resources:
limits:
Expand All @@ -45,7 +52,8 @@ zookeeper:

acl:
label: acl
replicas: 5
replicas: *patitions
weight: 3
kafka:
brokers: kafka-service:9092
topic: messages
Expand All @@ -62,6 +70,7 @@ acl:
api:
label: api
replicas: 1
weight: 4
port: 8080
image:
name: api
Expand Down
1 change: 1 addition & 0 deletions src/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion src/acl/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ FROM rust:1.73.0-slim-buster as build
WORKDIR /web-api
COPY . .

# install deps for building librdkafka
# installing deps for building librdkafka
RUN apt-get update && apt-get install -y build-essential \
openssl libssl-dev \
zlib1g \
Expand All @@ -14,5 +14,9 @@ RUN --mount=type=cache,target=/web-api/target cargo build --release && cp target

# acl image
FROM debian:buster-slim as acl

# installing netcat for check kafka readiness
RUN apt-get update && apt-get install -y netcat

COPY --from=build /acl /acl
CMD ["/acl"]
9 changes: 6 additions & 3 deletions src/api/src/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,13 @@ pub async fn track_activity(
) -> HttpResponse {
let request = json.into_inner();

match request {
let result = match request {
Click { x, y } => tracker.track(&ActivityEvent::Click { x, y }).await,
Open { path } => tracker.track(&ActivityEvent::Open { p: path }).await,
}
};

HttpResponse::Ok().json(())
match result {
Some(_) => HttpResponse::Ok().json(()),
None => HttpResponse::BadRequest().json(ErrorResponse { code: 103 })
}
}
1 change: 1 addition & 0 deletions src/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ services:
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://0.0.0.0:9092
KAFKA_NUM_PARTITIONS: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
Expand Down
2 changes: 1 addition & 1 deletion src/domain/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use serde_derive::{Deserialize, Serialize};

#[async_trait]
pub trait IActivityTracker: Send + Sync {
async fn track(&self, activity: &ActivityEvent);
async fn track(&self, activity: &ActivityEvent) -> Option<()>;
}
#[derive(Serialize, Deserialize, Debug)]
pub enum ActivityEvent {
Expand Down
6 changes: 5 additions & 1 deletion src/host/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ FROM rust:1.73.0-slim-buster as build
WORKDIR /web-api
COPY . .

# install deps for building librdkafka
# installing deps for building librdkafka
RUN apt-get update && apt-get install -y build-essential \
openssl libssl-dev \
zlib1g \
Expand All @@ -14,5 +14,9 @@ RUN --mount=type=cache,target=/web-api/target cargo build --release && cp target

# api image
FROM debian:buster-slim as api

# installing netcat for check kafka readiness
RUN apt-get update && apt-get install -y netcat

COPY --from=build /host /host
CMD ["/host"]
1 change: 1 addition & 0 deletions src/infra/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
async-trait = { version = "0.1.73", features = [] }
mongodb = "2.7.0"
uuid = { version = "1.4.1", features = ["serde", "v4"] }
domain = { path = "../domain" }
domain_impl = { path = "../domain_impl" }
messaging = { path = "../messaging" }
6 changes: 4 additions & 2 deletions src/infra/src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use async_trait::async_trait;
use domain::events::{ActivityEvent, IActivityTracker};
use messaging::kafka::IKafkaProducer;
use std::sync::Arc;
use uuid::Uuid;

#[derive(Clone)]
pub struct ActivityTracker {
Expand All @@ -16,7 +17,8 @@ impl ActivityTracker {

#[async_trait]
impl IActivityTracker for ActivityTracker {
async fn track(&self, activity: &ActivityEvent) {
self.producer.produce(activity).await;
async fn track(&self, activity: &ActivityEvent) -> Option<()> {
let uuid = Uuid::new_v4().to_string();
self.producer.produce(&uuid, activity).await.or(None)
}
}
2 changes: 1 addition & 1 deletion src/messaging/src/fake_kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<T: for<'a> Deserialize<'a> + Serialize + Send + Sync + Debug> IKafkaFactory

#[async_trait]
impl<T: Serialize + Send + Sync> IKafkaProducer<T> for FakeKafkaProducer {
async fn produce(&self, message: &T) -> Option<()> {
async fn produce(&self, _key: &str, message: &T) -> Option<()> {
let json = serde_json::to_string(message).ok()?;
self.sender.send(json).ok()?;
Some(())
Expand Down
11 changes: 5 additions & 6 deletions src/messaging/src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub trait IKafkaProducer<T>: Sync + Send
where
T: Serialize + Send + Sync,
{
async fn produce(&self, message: &T) -> Option<()>;
async fn produce(&self, key: &str, message: &T) -> Option<()>;
}

#[async_trait]
Expand Down Expand Up @@ -91,11 +91,9 @@ impl KafkaProducer {

#[async_trait]
impl<T: Serialize + Send + Sync> IKafkaProducer<T> for KafkaProducer {
async fn produce(&self, message: &T) -> Option<()> {
let mut buffer: Vec<u8> = Vec::new();
serde_json::to_writer(&mut buffer, message).ok()?;

let record = FutureRecord::to(&self.topic).key("").payload(&buffer);
async fn produce(&self, key: &str, message: &T) -> Option<()> {
let json = serde_json::to_string(&message).ok()?;
let record = FutureRecord::to(&self.topic).key(key).payload(&json);

self.producer
.send(record, Duration::from_secs(0))
Expand All @@ -117,6 +115,7 @@ impl KafkaConsumer {
let consumer: StreamConsumer<CustomContext> = ClientConfig::new()
.set("group.id", config.group_id)
.set("bootstrap.servers", config.brokers)
.set("auto.offset.reset", "earliest")
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "true")
Expand Down

0 comments on commit 1c5ace9

Please sign in to comment.