Skip to content

Commit

Permalink
Merge pull request #5 from msfidelis/schema-registry
Browse files Browse the repository at this point in the history
Schema registry
  • Loading branch information
msfidelis authored Jun 27, 2021
2 parents ceb604a + 83b025b commit dd45389
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 33 deletions.
Binary file added .github/workflows/assets/img/hash.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added .github/workflows/assets/img/leastbytes.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ Tests finished in 1.232463918s. Producer mean time 8113.83/s
## Roadmap

* Improve reports output
* Add execution time on reports
* Add execution time on reports - :check:
* Add retry mechanism
* Add message size options
* Add test header
Expand All @@ -57,7 +57,7 @@ Tests finished in 1.232463918s. Producer mean time 8113.83/s
* Add TLS authentication
* Add IAM authentication for Amazon MSK
* Add Unit Tests
* Add consume tests
* Add consume tests - :check:
* Add time based tests
* Add goreleaser pipeline
* Add Docker image build
Expand Down
78 changes: 50 additions & 28 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,47 +1,69 @@
version: '3.2'

version: "3"

services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
image: confluentinc/cp-zookeeper:5.2.5
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 2181:2181
networks:
- developer

kafka:
image: confluentinc/cp-kafka:latest
hostname: kafka
broker:
image: confluentinc/cp-server:5.4.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092,PLAINTEXT_HOST://kafka:9092,
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
extra_hosts:
- "moby:127.0.0.1"
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: "true"
CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"

schema-registry:
image: confluentinc/cp-schema-registry:6.1.2
hostname: schema-registry
container_name: schema-registry
depends_on:
- zookeeper
- broker
ports:
- 9092:9092
- 29092:29092
networks:
- developer
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"

kafdrop:
image: obsidiandynamics/kafdrop:latest
networks:
- developer
control-center:
image: confluentinc/cp-enterprise-control-center:5.4.0
hostname: control-center
container_name: control-center
depends_on:
- kafka
- zookeeper
- broker
- schema-registry
ports:
- 9000:9000
- "9021:9021"
environment:
KAFKA_BROKERCONNECT: kafka:9092

networks:
developer:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
67 changes: 67 additions & 0 deletions docs/DEVELOPMENT.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Setup local environment

### Setup Development Dependencies with Docker

```bash
docker-compose up --force-recreate
```

### Create a test topic


```bash
docker-compose exec broker kafka-topics --create --topic kafka-stress --partitions 3 --replication-factor 1 --if-not-exists --zookeeper zookeeper:2181
```


# Schema Registry

## Utils

[Json to AVRO Converter](https://toolslick.com/generation/metadata/avro-schema-from-json)

## List Schemas

```bash
curl -X GET http://0.0.0.0:8081/subjects
```

## AVRO

Create a Schema in AVRO format

```
curl http://0.0.0.0:8081/subjects/example/versions -X POST \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '
{
"schema":"{\n \"name\":\"Example\",\n \"type\":\"record\",\n \"namespace\":\"com.acme.avro\",\n \"fields\":[\n {\n \"name\":\"name\",\n \"type\":\"string\"\n },\n {\n \"name\":\"age\",\n \"type\":\"int\"\n }\n ]\n }"
}
'
```



# Consumer

## Algorithms

Producing 15000 events in 3 partitions topic

### Hash

```bash
❯ go run main.go --bootstrap-servers 0.0.0.0:9092 --events 15000 --topic brabo --test-mode producer --consumers 3
Sent 15000 messages to topic brabo with 0 errors
Tests finished in 1.406677516s. Producer mean time 10663.42/s
```

### LeastBytes

```bash
❯ go run main.go --bootstrap-servers 0.0.0.0:9092 --events 15000 --topic brabo --test-mode producer --consumers 3
Sent 15000 messages to topic brabo with 0 errors
Tests finished in 885.728755ms. Producer mean time 16935.21/s
```

![LeastBytes](.github/workflows/assets/img/leastbytes.png)
7 changes: 4 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ func main() {
testMode := flag.String("test-mode", "producer", "Test Type; Ex producer;consumer. Default: producer")
bootstrapServers := flag.String("bootstrap-servers", "0.0.0.0:9092", "Kafka Bootstrap Servers Broker Lists")
zookeeperServers := flag.String("zookeeper-servers", "0.0.0.0:2181", "Zookeeper Connection String")
schemaRegistryURL := flag.String("schema-registry", "0.0.0.0:8081", "Schema Registry URL")
schema := flag.String("schema", "", "Schema")
events := flag.Int("events", 10000, "Number of events will be created in topic")
consumers := flag.Int("consumers", 1, "Number of consumers will be used in topic")
consumerGroup := flag.String("consumer-group", "kafka-stress", "Consumer group name")
Expand All @@ -32,7 +34,7 @@ func main() {

switch strings.ToLower(*testMode) {
case "producer":
produce(*bootstrapServers, *topic, *events)
produce(*bootstrapServers, *topic, *events, *schemaRegistryURL, *schema)
break
case "consumer":

Expand All @@ -41,15 +43,14 @@ func main() {
go consume(*bootstrapServers, *topic, *consumerGroup, consumerID)
}


consume(*bootstrapServers, *topic, *consumerGroup, *consumers)
break
default:
return
}
}

func produce(bootstrapServers string, topic string, events int) {
func produce(bootstrapServers string, topic string, events int, schemaRegistryURL string, schema string) {
producer := getProducer(bootstrapServers, topic)

defer producer.Close()
Expand Down

0 comments on commit dd45389

Please sign in to comment.