Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed messages distribution across partitions #4

Merged
merged 1 commit into from
Nov 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deploy_helm.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/bin/bash

helm install web-api-dev helm
helm upgrade --install --atomic --timeout 300s --wait web-api-dev helm
8 changes: 8 additions & 0 deletions helm/templates/acl/deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Values.acl.label }}-deployment
annotations:
"helm.sh/hook-weight": "{{ .Values.acl.weight }}"
spec:
replicas: {{ .Values.acl.replicas }}
selector:
Expand All @@ -15,6 +17,12 @@ spec:
containers:
- name: {{ .Values.acl.image.name }}
image: "{{ .Values.acl.image.name }}:{{ .Values.acl.image.tag }}"
readinessProbe:
exec:
command:
- sh
- -c
- "nc -z kafka-service 9092 || exit -1"
env:
- name: KAFKA_BROKERS
value: {{ .Values.acl.kafka.brokers }}
Expand Down
8 changes: 8 additions & 0 deletions helm/templates/api/deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Values.api.label }}-deployment
annotations:
"helm.sh/hook-weight": "{{ .Values.api.weight }}"
spec:
selector:
matchLabels:
Expand All @@ -14,6 +16,12 @@ spec:
containers:
- name: {{ .Values.api.image.name }}
image: "{{ .Values.api.image.name }}:{{ .Values.api.image.tag }}"
readinessProbe:
exec:
command:
- sh
- -c
- "nc -z kafka-service 9092 || exit -1"
env:
- name: DB_URI
value: {{ .Values.api.mongodb.uri }}
Expand Down
22 changes: 19 additions & 3 deletions helm/templates/kafka/deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Values.kafka.label }}-deployment
annotations:
"helm.sh/hook-weight": "{{ .Values.kafka.weight }}"
spec:
replicas: {{ .Values.kafka.replicas }}
selector:
Expand All @@ -18,11 +20,25 @@ spec:
value: "{{ .Values.kafka.variables.brokerId }}"
- name: KAFKA_ZOOKEEPER_CONNECT
value: {{ .Values.kafka.variables.zookeeperConnect }}
- name: KAFKA_LISTENERS
value: {{ .Values.kafka.variables.listeners }}
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
value: {{ .Values.kafka.variables.securityProtocolMap }}
- name: KAFKA_ADVERTISED_LISTENERS
value: {{ .Values.kafka.variables.advertisedListeners }}
image: {{ .Values.kafka.image.name }}
- name: KAFKA_LISTENERS
value: {{ .Values.kafka.variables.listeners }}
- name: KAFKA_NUM_PARTITIONS
value: "{{ .Values.kafka.variables.numPartitions }}"
- name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
value: "{{ .Values.kafka.variables.replicationFactor }}"
image: {{ .Values.kafka.image.name }}:{{ .Values.kafka.image.tag }}
readinessProbe:
exec:
command:
- sh
- -c
- "nc -z localhost 9092 || exit -1"
initialDelaySeconds: 5
periodSeconds: 5
imagePullPolicy: {{ .Values.kafka.pullPolicy }}
name: {{ .Values.kafka.label }}
resources:
Expand Down
2 changes: 2 additions & 0 deletions helm/templates/mongodb/deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Values.mongodb.label }}-deployment
annotations:
"helm.sh/hook-weight": "{{ .Values.mongodb.weight }}"
spec:
selector:
matchLabels:
Expand Down
25 changes: 16 additions & 9 deletions helm/templates/zookeeper/deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Values.zookeeper.label }}-deployment
annotations:
"helm.sh/hook-weight": "{{ .Values.zookeeper.weight }}"
spec:
selector:
matchLabels:
Expand All @@ -12,12 +14,17 @@ spec:
app: {{ .Values.zookeeper.label }}
spec:
containers:
- image: {{ .Values.zookeeper.image.name }}
imagePullPolicy: {{ .Values.zookeeper.pullPolicy }}
name: {{ .Values.zookeeper.label }}
ports:
- containerPort: {{ .Values.zookeeper.port }}
resources:
limits:
memory: {{ .Values.zookeeper.resources.limits.memory }}
cpu: {{ .Values.zookeeper.resources.limits.cpu }}
- env:
- name: ZOOKEEPER_CLIENT_PORT
value: "{{ .Values.zookeeper.port }}"
- name: ZOOKEEPER_TICK_TIME
value: "{{ .Values.zookeeper.tickTime }}"
image: {{ .Values.zookeeper.image.name }}:{{ .Values.zookeeper.image.tag }}
imagePullPolicy: {{ .Values.zookeeper.pullPolicy }}
name: {{ .Values.zookeeper.label }}
ports:
- containerPort: {{ .Values.zookeeper.port }}
resources:
limits:
memory: {{ .Values.zookeeper.resources.limits.memory }}
cpu: {{ .Values.zookeeper.resources.limits.cpu }}
23 changes: 16 additions & 7 deletions helm/values.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mongodb:
label: mongodb
replicas: 1
weight: 2
port: 27017
image:
name: mongo
Expand All @@ -15,28 +16,34 @@ mongodb:
kafka:
label: kafka
replicas: 1
weight: 1
port: 9092
image:
name: wurstmeister/kafka
tag: latest
name: confluentinc/cp-kafka
tag: 7.0.1
pullPolicy: IfNotPresent
resources:
limits:
memory: "512Mi"
cpu: "500m"
variables:
listeners: PLAINTEXT://:9092
brokerId: 1
zookeeperConnect: zookeeper-service:2181
securityProtocolMap: PLAINTEXT:PLAINTEXT
advertisedListeners: PLAINTEXT://kafka-service:9092
brokerId: "1"
listeners: PLAINTEXT://:9092
numPartitions: &patitions 2
replicationFactor: 1

zookeeper:
label: zookeeper
replicas: 1
weight: 0
port: 2181
tickTime: 2000
image:
name: wurstmeister/zookeeper
tag: latest
name: confluentinc/cp-zookeeper
tag: 7.0.1
pullPolicy: IfNotPresent
resources:
limits:
Expand All @@ -45,7 +52,8 @@ zookeeper:

acl:
label: acl
replicas: 5
replicas: *patitions
weight: 3
kafka:
brokers: kafka-service:9092
topic: messages
Expand All @@ -62,6 +70,7 @@ acl:
api:
label: api
replicas: 1
weight: 4
port: 8080
image:
name: api
Expand Down
1 change: 1 addition & 0 deletions src/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion src/acl/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ FROM rust:1.73.0-slim-buster as build
WORKDIR /web-api
COPY . .

# install deps for building librdkafka
# installing deps for building librdkafka
RUN apt-get update && apt-get install -y build-essential \
openssl libssl-dev \
zlib1g \
Expand All @@ -14,5 +14,9 @@ RUN --mount=type=cache,target=/web-api/target cargo build --release && cp target

# acl image
FROM debian:buster-slim as acl

# installing netcat for check kafka readiness
RUN apt-get update && apt-get install -y netcat

COPY --from=build /acl /acl
CMD ["/acl"]
9 changes: 6 additions & 3 deletions src/api/src/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,13 @@ pub async fn track_activity(
) -> HttpResponse {
let request = json.into_inner();

match request {
let result = match request {
Click { x, y } => tracker.track(&ActivityEvent::Click { x, y }).await,
Open { path } => tracker.track(&ActivityEvent::Open { p: path }).await,
}
};

HttpResponse::Ok().json(())
match result {
Some(_) => HttpResponse::Ok().json(()),
None => HttpResponse::BadRequest().json(ErrorResponse { code: 103 })
}
}
1 change: 1 addition & 0 deletions src/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ services:
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://0.0.0.0:9092
KAFKA_NUM_PARTITIONS: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
Expand Down
2 changes: 1 addition & 1 deletion src/domain/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use serde_derive::{Deserialize, Serialize};

#[async_trait]
pub trait IActivityTracker: Send + Sync {
async fn track(&self, activity: &ActivityEvent);
async fn track(&self, activity: &ActivityEvent) -> Option<()>;
}
#[derive(Serialize, Deserialize, Debug)]
pub enum ActivityEvent {
Expand Down
6 changes: 5 additions & 1 deletion src/host/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ FROM rust:1.73.0-slim-buster as build
WORKDIR /web-api
COPY . .

# install deps for building librdkafka
# installing deps for building librdkafka
RUN apt-get update && apt-get install -y build-essential \
openssl libssl-dev \
zlib1g \
Expand All @@ -14,5 +14,9 @@ RUN --mount=type=cache,target=/web-api/target cargo build --release && cp target

# api image
FROM debian:buster-slim as api

# installing netcat for check kafka readiness
RUN apt-get update && apt-get install -y netcat

COPY --from=build /host /host
CMD ["/host"]
1 change: 1 addition & 0 deletions src/infra/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
async-trait = { version = "0.1.73", features = [] }
mongodb = "2.7.0"
uuid = { version = "1.4.1", features = ["serde", "v4"] }
domain = { path = "../domain" }
domain_impl = { path = "../domain_impl" }
messaging = { path = "../messaging" }
6 changes: 4 additions & 2 deletions src/infra/src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use async_trait::async_trait;
use domain::events::{ActivityEvent, IActivityTracker};
use messaging::kafka::IKafkaProducer;
use std::sync::Arc;
use uuid::Uuid;

#[derive(Clone)]
pub struct ActivityTracker {
Expand All @@ -16,7 +17,8 @@ impl ActivityTracker {

#[async_trait]
impl IActivityTracker for ActivityTracker {
async fn track(&self, activity: &ActivityEvent) {
self.producer.produce(activity).await;
async fn track(&self, activity: &ActivityEvent) -> Option<()> {
let uuid = Uuid::new_v4().to_string();
self.producer.produce(&uuid, activity).await.or(None)
}
}
2 changes: 1 addition & 1 deletion src/messaging/src/fake_kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<T: for<'a> Deserialize<'a> + Serialize + Send + Sync + Debug> IKafkaFactory

#[async_trait]
impl<T: Serialize + Send + Sync> IKafkaProducer<T> for FakeKafkaProducer {
async fn produce(&self, message: &T) -> Option<()> {
async fn produce(&self, _key: &str, message: &T) -> Option<()> {
let json = serde_json::to_string(message).ok()?;
self.sender.send(json).ok()?;
Some(())
Expand Down
11 changes: 5 additions & 6 deletions src/messaging/src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub trait IKafkaProducer<T>: Sync + Send
where
T: Serialize + Send + Sync,
{
async fn produce(&self, message: &T) -> Option<()>;
async fn produce(&self, key: &str, message: &T) -> Option<()>;
}

#[async_trait]
Expand Down Expand Up @@ -91,11 +91,9 @@ impl KafkaProducer {

#[async_trait]
impl<T: Serialize + Send + Sync> IKafkaProducer<T> for KafkaProducer {
async fn produce(&self, message: &T) -> Option<()> {
let mut buffer: Vec<u8> = Vec::new();
serde_json::to_writer(&mut buffer, message).ok()?;

let record = FutureRecord::to(&self.topic).key("").payload(&buffer);
async fn produce(&self, key: &str, message: &T) -> Option<()> {
let json = serde_json::to_string(&message).ok()?;
let record = FutureRecord::to(&self.topic).key(key).payload(&json);

self.producer
.send(record, Duration::from_secs(0))
Expand All @@ -117,6 +115,7 @@ impl KafkaConsumer {
let consumer: StreamConsumer<CustomContext> = ClientConfig::new()
.set("group.id", config.group_id)
.set("bootstrap.servers", config.brokers)
.set("auto.offset.reset", "earliest")
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "true")
Expand Down