From 4bc5b7522b30592eb768ee6dcdbef022bcacb1fb Mon Sep 17 00:00:00 2001 From: Timofey Myagkikh Date: Sun, 26 Nov 2023 21:22:42 +0300 Subject: [PATCH] fixed messages distribution across partitions --- deploy_helm.sh | 2 +- helm/templates/acl/deployment.yml | 8 ++++++++ helm/templates/api/deployment.yml | 8 ++++++++ helm/templates/kafka/deployment.yml | 22 +++++++++++++++++++--- helm/templates/mongodb/deployment.yml | 2 ++ helm/templates/zookeeper/deployment.yml | 25 ++++++++++++++++--------- helm/values.yaml | 23 ++++++++++++++++------- src/Cargo.lock | 1 + src/acl/Dockerfile | 6 +++++- src/api/src/endpoints.rs | 9 ++++++--- src/docker-compose.yml | 1 + src/domain/src/events.rs | 2 +- src/host/Dockerfile | 6 +++++- src/infra/Cargo.toml | 1 + src/infra/src/tracker.rs | 6 ++++-- src/messaging/src/fake_kafka.rs | 2 +- src/messaging/src/kafka.rs | 11 +++++------ 17 files changed, 100 insertions(+), 35 deletions(-) diff --git a/deploy_helm.sh b/deploy_helm.sh index dab2841..2013a29 100755 --- a/deploy_helm.sh +++ b/deploy_helm.sh @@ -1,3 +1,3 @@ #!/bin/bash -helm install web-api-dev helm \ No newline at end of file +helm upgrade --install --atomic --timeout 300s --wait web-api-dev helm \ No newline at end of file diff --git a/helm/templates/acl/deployment.yml b/helm/templates/acl/deployment.yml index 717b97c..f5c2744 100644 --- a/helm/templates/acl/deployment.yml +++ b/helm/templates/acl/deployment.yml @@ -2,6 +2,8 @@ apiVersion: apps/v1 kind: Deployment metadata: name: {{ .Values.acl.label }}-deployment + annotations: + "helm.sh/hook-weight": "{{ .Values.acl.weight }}" spec: replicas: {{ .Values.acl.replicas }} selector: @@ -15,6 +17,12 @@ spec: containers: - name: {{ .Values.acl.image.name }} image: "{{ .Values.acl.image.name }}:{{ .Values.acl.image.tag }}" + readinessProbe: + exec: + command: + - sh + - -c + - "nc -z kafka-service 9092 || exit -1" env: - name: KAFKA_BROKERS value: {{ .Values.acl.kafka.brokers }} diff --git a/helm/templates/api/deployment.yml b/helm/templates/api/deployment.yml index 958e38e..b77d9c0 100644 --- a/helm/templates/api/deployment.yml +++ b/helm/templates/api/deployment.yml @@ -2,6 +2,8 @@ apiVersion: apps/v1 kind: Deployment metadata: name: {{ .Values.api.label }}-deployment + annotations: + "helm.sh/hook-weight": "{{ .Values.api.weight }}" spec: selector: matchLabels: @@ -14,6 +16,12 @@ spec: containers: - name: {{ .Values.api.image.name }} image: "{{ .Values.api.image.name }}:{{ .Values.api.image.tag }}" + readinessProbe: + exec: + command: + - sh + - -c + - "nc -z kafka-service 9092 || exit -1" env: - name: DB_URI value: {{ .Values.api.mongodb.uri }} diff --git a/helm/templates/kafka/deployment.yml b/helm/templates/kafka/deployment.yml index b8c8af1..8e0a9f6 100644 --- a/helm/templates/kafka/deployment.yml +++ b/helm/templates/kafka/deployment.yml @@ -2,6 +2,8 @@ apiVersion: apps/v1 kind: Deployment metadata: name: {{ .Values.kafka.label }}-deployment + annotations: + "helm.sh/hook-weight": "{{ .Values.kafka.weight }}" spec: replicas: {{ .Values.kafka.replicas }} selector: @@ -18,11 +20,25 @@ spec: value: "{{ .Values.kafka.variables.brokerId }}" - name: KAFKA_ZOOKEEPER_CONNECT value: {{ .Values.kafka.variables.zookeeperConnect }} - - name: KAFKA_LISTENERS - value: {{ .Values.kafka.variables.listeners }} + - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP + value: {{ .Values.kafka.variables.securityProtocolMap }} - name: KAFKA_ADVERTISED_LISTENERS value: {{ .Values.kafka.variables.advertisedListeners }} - image: {{ .Values.kafka.image.name }} + - name: KAFKA_LISTENERS + value: {{ .Values.kafka.variables.listeners }} + - name: KAFKA_NUM_PARTITIONS + value: "{{ .Values.kafka.variables.numPartitions }}" + - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR + value: "{{ .Values.kafka.variables.replicationFactor }}" + image: {{ .Values.kafka.image.name }}:{{ .Values.kafka.image.tag }} + readinessProbe: + exec: + command: + - sh + - -c + - "nc -z localhost 9092 || exit -1" + initialDelaySeconds: 5 + periodSeconds: 5 imagePullPolicy: {{ .Values.kafka.pullPolicy }} name: {{ .Values.kafka.label }} resources: diff --git a/helm/templates/mongodb/deployment.yml b/helm/templates/mongodb/deployment.yml index 66fc877..8cc9b03 100644 --- a/helm/templates/mongodb/deployment.yml +++ b/helm/templates/mongodb/deployment.yml @@ -2,6 +2,8 @@ apiVersion: apps/v1 kind: Deployment metadata: name: {{ .Values.mongodb.label }}-deployment + annotations: + "helm.sh/hook-weight": "{{ .Values.mongodb.weight }}" spec: selector: matchLabels: diff --git a/helm/templates/zookeeper/deployment.yml b/helm/templates/zookeeper/deployment.yml index f0d843d..570b42c 100644 --- a/helm/templates/zookeeper/deployment.yml +++ b/helm/templates/zookeeper/deployment.yml @@ -2,6 +2,8 @@ apiVersion: apps/v1 kind: Deployment metadata: name: {{ .Values.zookeeper.label }}-deployment + annotations: + "helm.sh/hook-weight": "{{ .Values.zookeeper.weight }}" spec: selector: matchLabels: @@ -12,12 +14,17 @@ spec: app: {{ .Values.zookeeper.label }} spec: containers: - - image: {{ .Values.zookeeper.image.name }} - imagePullPolicy: {{ .Values.zookeeper.pullPolicy }} - name: {{ .Values.zookeeper.label }} - ports: - - containerPort: {{ .Values.zookeeper.port }} - resources: - limits: - memory: {{ .Values.zookeeper.resources.limits.memory }} - cpu: {{ .Values.zookeeper.resources.limits.cpu }} + - env: + - name: ZOOKEEPER_CLIENT_PORT + value: "{{ .Values.zookeeper.port }}" + - name: ZOOKEEPER_TICK_TIME + value: "{{ .Values.zookeeper.tickTime }}" + image: {{ .Values.zookeeper.image.name }}:{{ .Values.zookeeper.image.tag }} + imagePullPolicy: {{ .Values.zookeeper.pullPolicy }} + name: {{ .Values.zookeeper.label }} + ports: + - containerPort: {{ .Values.zookeeper.port }} + resources: + limits: + memory: {{ .Values.zookeeper.resources.limits.memory }} + cpu: {{ .Values.zookeeper.resources.limits.cpu }} diff --git a/helm/values.yaml b/helm/values.yaml index 8b266bd..e05a984 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -1,6 +1,7 @@ mongodb: label: mongodb replicas: 1 + weight: 2 port: 27017 image: name: mongo @@ -15,28 +16,34 @@ mongodb: kafka: label: kafka replicas: 1 + weight: 1 port: 9092 image: - name: wurstmeister/kafka - tag: latest + name: confluentinc/cp-kafka + tag: 7.0.1 pullPolicy: IfNotPresent resources: limits: memory: "512Mi" cpu: "500m" variables: - listeners: PLAINTEXT://:9092 + brokerId: 1 zookeeperConnect: zookeeper-service:2181 + securityProtocolMap: PLAINTEXT:PLAINTEXT advertisedListeners: PLAINTEXT://kafka-service:9092 - brokerId: "1" + listeners: PLAINTEXT://:9092 + numPartitions: &patitions 2 + replicationFactor: 1 zookeeper: label: zookeeper replicas: 1 + weight: 0 port: 2181 + tickTime: 2000 image: - name: wurstmeister/zookeeper - tag: latest + name: confluentinc/cp-zookeeper + tag: 7.0.1 pullPolicy: IfNotPresent resources: limits: @@ -45,7 +52,8 @@ zookeeper: acl: label: acl - replicas: 5 + replicas: *patitions + weight: 3 kafka: brokers: kafka-service:9092 topic: messages @@ -62,6 +70,7 @@ acl: api: label: api replicas: 1 + weight: 4 port: 8080 image: name: api diff --git a/src/Cargo.lock b/src/Cargo.lock index 8a86bcd..69f5983 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -1099,6 +1099,7 @@ dependencies = [ "domain_impl", "messaging", "mongodb", + "uuid", ] [[package]] diff --git a/src/acl/Dockerfile b/src/acl/Dockerfile index 5329287..b2d9da2 100644 --- a/src/acl/Dockerfile +++ b/src/acl/Dockerfile @@ -3,7 +3,7 @@ FROM rust:1.73.0-slim-buster as build WORKDIR /web-api COPY . . -# install deps for building librdkafka +# installing deps for building librdkafka RUN apt-get update && apt-get install -y build-essential \ openssl libssl-dev \ zlib1g \ @@ -14,5 +14,9 @@ RUN --mount=type=cache,target=/web-api/target cargo build --release && cp target # acl image FROM debian:buster-slim as acl + +# installing netcat for check kafka readiness +RUN apt-get update && apt-get install -y netcat + COPY --from=build /acl /acl CMD ["/acl"] \ No newline at end of file diff --git a/src/api/src/endpoints.rs b/src/api/src/endpoints.rs index d2b3544..69cc39b 100644 --- a/src/api/src/endpoints.rs +++ b/src/api/src/endpoints.rs @@ -86,10 +86,13 @@ pub async fn track_activity( ) -> HttpResponse { let request = json.into_inner(); - match request { + let result = match request { Click { x, y } => tracker.track(&ActivityEvent::Click { x, y }).await, Open { path } => tracker.track(&ActivityEvent::Open { p: path }).await, - } + }; - HttpResponse::Ok().json(()) + match result { + Some(_) => HttpResponse::Ok().json(()), + None => HttpResponse::BadRequest().json(ErrorResponse { code: 103 }) + } } diff --git a/src/docker-compose.yml b/src/docker-compose.yml index 04d880a..09acf4e 100644 --- a/src/docker-compose.yml +++ b/src/docker-compose.yml @@ -74,6 +74,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://0.0.0.0:9092 + KAFKA_NUM_PARTITIONS: 1 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 diff --git a/src/domain/src/events.rs b/src/domain/src/events.rs index d3b7670..73c068b 100644 --- a/src/domain/src/events.rs +++ b/src/domain/src/events.rs @@ -3,7 +3,7 @@ use serde_derive::{Deserialize, Serialize}; #[async_trait] pub trait IActivityTracker: Send + Sync { - async fn track(&self, activity: &ActivityEvent); + async fn track(&self, activity: &ActivityEvent) -> Option<()>; } #[derive(Serialize, Deserialize, Debug)] pub enum ActivityEvent { diff --git a/src/host/Dockerfile b/src/host/Dockerfile index c6af9a3..5471332 100644 --- a/src/host/Dockerfile +++ b/src/host/Dockerfile @@ -3,7 +3,7 @@ FROM rust:1.73.0-slim-buster as build WORKDIR /web-api COPY . . -# install deps for building librdkafka +# installing deps for building librdkafka RUN apt-get update && apt-get install -y build-essential \ openssl libssl-dev \ zlib1g \ @@ -14,5 +14,9 @@ RUN --mount=type=cache,target=/web-api/target cargo build --release && cp target # api image FROM debian:buster-slim as api + +# installing netcat for check kafka readiness +RUN apt-get update && apt-get install -y netcat + COPY --from=build /host /host CMD ["/host"] \ No newline at end of file diff --git a/src/infra/Cargo.toml b/src/infra/Cargo.toml index ab866b1..50e037d 100644 --- a/src/infra/Cargo.toml +++ b/src/infra/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] async-trait = { version = "0.1.73", features = [] } mongodb = "2.7.0" +uuid = { version = "1.4.1", features = ["serde", "v4"] } domain = { path = "../domain" } domain_impl = { path = "../domain_impl" } messaging = { path = "../messaging" } \ No newline at end of file diff --git a/src/infra/src/tracker.rs b/src/infra/src/tracker.rs index 5cd1533..e8ade9f 100644 --- a/src/infra/src/tracker.rs +++ b/src/infra/src/tracker.rs @@ -2,6 +2,7 @@ use async_trait::async_trait; use domain::events::{ActivityEvent, IActivityTracker}; use messaging::kafka::IKafkaProducer; use std::sync::Arc; +use uuid::Uuid; #[derive(Clone)] pub struct ActivityTracker { @@ -16,7 +17,8 @@ impl ActivityTracker { #[async_trait] impl IActivityTracker for ActivityTracker { - async fn track(&self, activity: &ActivityEvent) { - self.producer.produce(activity).await; + async fn track(&self, activity: &ActivityEvent) -> Option<()> { + let uuid = Uuid::new_v4().to_string(); + self.producer.produce(&uuid, activity).await.or(None) } } diff --git a/src/messaging/src/fake_kafka.rs b/src/messaging/src/fake_kafka.rs index f1fb3f5..3ae3056 100644 --- a/src/messaging/src/fake_kafka.rs +++ b/src/messaging/src/fake_kafka.rs @@ -42,7 +42,7 @@ impl Deserialize<'a> + Serialize + Send + Sync + Debug> IKafkaFactory #[async_trait] impl IKafkaProducer for FakeKafkaProducer { - async fn produce(&self, message: &T) -> Option<()> { + async fn produce(&self, _key: &str, message: &T) -> Option<()> { let json = serde_json::to_string(message).ok()?; self.sender.send(json).ok()?; Some(()) diff --git a/src/messaging/src/kafka.rs b/src/messaging/src/kafka.rs index f18f600..9118e6a 100644 --- a/src/messaging/src/kafka.rs +++ b/src/messaging/src/kafka.rs @@ -61,7 +61,7 @@ pub trait IKafkaProducer: Sync + Send where T: Serialize + Send + Sync, { - async fn produce(&self, message: &T) -> Option<()>; + async fn produce(&self, key: &str, message: &T) -> Option<()>; } #[async_trait] @@ -91,11 +91,9 @@ impl KafkaProducer { #[async_trait] impl IKafkaProducer for KafkaProducer { - async fn produce(&self, message: &T) -> Option<()> { - let mut buffer: Vec = Vec::new(); - serde_json::to_writer(&mut buffer, message).ok()?; - - let record = FutureRecord::to(&self.topic).key("").payload(&buffer); + async fn produce(&self, key: &str, message: &T) -> Option<()> { + let json = serde_json::to_string(&message).ok()?; + let record = FutureRecord::to(&self.topic).key(key).payload(&json); self.producer .send(record, Duration::from_secs(0)) @@ -117,6 +115,7 @@ impl KafkaConsumer { let consumer: StreamConsumer = ClientConfig::new() .set("group.id", config.group_id) .set("bootstrap.servers", config.brokers) + .set("auto.offset.reset", "earliest") .set("enable.partition.eof", "false") .set("session.timeout.ms", "6000") .set("enable.auto.commit", "true")