Skip to content

Commit

Permalink
Add Json producer support
Browse files Browse the repository at this point in the history
  • Loading branch information
msfidelis committed Feb 22, 2022
1 parent 1bb0871 commit d40abf1
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 10 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ kafka-stress --bootstrap-servers 0.0.0.0:9092 --events 10000 --topic kafka-stres

### Customize consumer-group name

Use `--ssl` to enable ssl authentication.
Use `--consumer-group` to change customer group name used by workers.

```bash
kafka-stress --bootstrap-servers 0.0.0.0:9092 --topic kafka-stress --test-mode consumer --consumer-group custom-consumer-group
Expand All @@ -155,10 +155,10 @@ kafka-stress --bootstrap-servers 0.0.0.0:9092 --topic kafka-stress --test-mode c

### SSL

Use `--ssl` to enable ssl authentication.
Use `--ssl-enabled` to enable ssl authentication.

```bash
kafka-stress --bootstrap-servers 0.0.0.0:9092 --topic kafka-stress --test-mode consumer --ssl
kafka-stress --bootstrap-servers 0.0.0.0:9092 --topic kafka-stress --test-mode consumer --ssl-enabled
```

## Roadmap
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module kafka-stress
go 1.15

require (
github.com/bxcodec/faker/v3 v3.7.0
github.com/google/uuid v1.2.0 // indirect
github.com/segmentio/kafka-go v0.4.16
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
github.com/bxcodec/faker v1.5.0 h1:RIWOeAcM3ZHye1i8bQtHU2LfNOaLmHuRiCo60mNMOcQ=
github.com/bxcodec/faker v2.0.1+incompatible h1:P0KUpUw5w6WJXwrPfv35oc91i4d8nf40Nwln+M/+faA=
github.com/bxcodec/faker/v3 v3.7.0 h1:qWAFFwcyVS0ukF0UoJju1wBLO0cuPQ7JdVBPggM8kNo=
github.com/bxcodec/faker/v3 v3.7.0/go.mod h1:gF31YgnMSMKgkvl+fyEo1xuSMbEuieyqfeslGYFjneM=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
Expand Down
20 changes: 16 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"kafka-stress/pkg/clients"
"kafka-stress/pkg/stringgenerator"
"kafka-stress/pkg/fakejson"

guuid "github.com/google/uuid"
kafka "github.com/segmentio/kafka-go"
Expand All @@ -31,6 +32,7 @@ func main() {
events := flag.Int("events", 10000, "Numer of events will be created in topic")
consumers := flag.Int("consumers", 1, "Number of consumers will be used in topic")
consumerGroup := flag.String("consumer-group", "kafka-stress", "Consumer group name")
format := flag.String("format", "string", "Events Format; ex string,json,avro")

flag.Parse()

Expand All @@ -40,7 +42,7 @@ func main() {

switch strings.ToLower(*testMode) {
case "producer":
produce(*bootstrapServers, *topic, *events, *size, *batchSize, *acks, *schemaRegistryURL, *schema, *ssl)
produce(*bootstrapServers, *topic, *events, *size, *batchSize, *acks, *schemaRegistryURL, *schema, *ssl, *format)
break
case "consumer":
consume(*bootstrapServers, *topic, *consumerGroup, *consumers, *ssl)
Expand All @@ -49,21 +51,32 @@ func main() {
}
}

func produce(bootstrapServers string, topic string, events int, size int, batchSize int, acks int, schemaRegistryURL string, schema string, ssl bool) {
func produce(bootstrapServers string, topic string, events int, size int, batchSize int, acks int, schemaRegistryURL string, schema string, ssl bool, format string) {

var wg sync.WaitGroup
var executions uint64
var errors uint64
var message string

producer := clients.GetProducer(bootstrapServers, topic, batchSize, acks, ssl)
defer producer.Close()

start := time.Now()
message := stringgenerator.RandStringBytes(size)

for i := 0; i < events; i++ {
wg.Add(1)

switch format {
case "string":
message = stringgenerator.RandStringBytes(size)
break;
case "json":
message = fakejson.RandJsonPayload()
break;
default:
message = stringgenerator.RandStringBytes(size)
}

go func() {
msg := kafka.Message{
Key: []byte(guuid.New().String()),
Expand All @@ -72,7 +85,6 @@ func produce(bootstrapServers string, topic string, events int, size int, batchS

err := producer.WriteMessages(context.Background(), msg)
if err != nil {
// fmt.Println(err)
atomic.AddUint64(&errors, 1)
} else {
atomic.AddUint64(&executions, 1)
Expand Down
49 changes: 49 additions & 0 deletions pkg/fakejson/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package fakejson

import (
"fmt"
"github.com/bxcodec/faker/v3"
"encoding/json"
)

type fake struct {
UserName string `faker:"username" json:"username"`
PhoneNumber string `faker:"phone_number" json:"phone_number"`
IPV4 string `faker:"ipv4" json:"ipv4"`
IPV6 string `faker:"ipv6" json:"ipv6"`
MacAddress string `faker:"mac_address" json:"mac_address"`
URL string `faker:"url" json: "url"`
DayOfWeek string `faker:"day_of_week" json: "day_of_week"`
DayOfMonth string `faker:"day_of_month" json: "day_of_month"`
Timestamp string `faker:"timestamp" json: "timestamp"`
Century string `faker:"century" json: "century"`
TimeZone string `faker:"timezone", json:"timezone"`
TimePeriod string `faker:"time_period" json:"time_period"`
Word string `faker:"word" json:"word"`
Sentence string `faker:"sentence" json:"sentence"`
Paragraph string `faker:"paragraph" json:"paragraph"`
Currency string `faker:"currency" json:"currency"`
Amount float64 `faker:"amount" json:"amount" `
AmountWithCurrency string `faker:"amount_with_currency" json:"amount_with_currency"`
UUIDHypenated string `faker:"uuid_hyphenated" json:"uuid_hyphenated"`
UUID string `faker:"uuid_digit" json:"uuid_digit"`
PaymentMethod string `faker:"oneof: cc, paypal, check, money order"`
}

func RandJsonPayload() string {

a := fake{}
err := faker.FakeData(&a)
if err != nil {
fmt.Println(err)
return "{}"
}

b, err := json.Marshal(a)
if err != nil {
fmt.Println(err)
return "{}"
}
return string(b)

}
3 changes: 0 additions & 3 deletions pkg/stringgenerator/main.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package stringgenerator

import (
"fmt"
"math/rand"
"unsafe"
)

const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
Expand All @@ -14,6 +12,5 @@ func RandStringBytes(n int) string {
for i := range b {
b[i] = letterBytes[rand.Intn(len(letterBytes))]
}
fmt.Println(unsafe.Sizeof(b))
return string(b)
}

0 comments on commit d40abf1

Please sign in to comment.