diff --git a/README.md b/README.md index ba8c7d1..34458c5 100644 --- a/README.md +++ b/README.md @@ -145,7 +145,7 @@ kafka-stress --bootstrap-servers 0.0.0.0:9092 --events 10000 --topic kafka-stres ### Customize consumer-group name -Use `--ssl` to enable ssl authentication. +Use `--consumer-group` to change customer group name used by workers. ```bash kafka-stress --bootstrap-servers 0.0.0.0:9092 --topic kafka-stress --test-mode consumer --consumer-group custom-consumer-group @@ -155,10 +155,10 @@ kafka-stress --bootstrap-servers 0.0.0.0:9092 --topic kafka-stress --test-mode c ### SSL -Use `--ssl` to enable ssl authentication. +Use `--ssl-enabled` to enable ssl authentication. ```bash -kafka-stress --bootstrap-servers 0.0.0.0:9092 --topic kafka-stress --test-mode consumer --ssl +kafka-stress --bootstrap-servers 0.0.0.0:9092 --topic kafka-stress --test-mode consumer --ssl-enabled ``` ## Roadmap diff --git a/go.mod b/go.mod index cc1d38c..db03f0f 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module kafka-stress go 1.15 require ( + github.com/bxcodec/faker/v3 v3.7.0 github.com/google/uuid v1.2.0 // indirect github.com/segmentio/kafka-go v0.4.16 golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect diff --git a/go.sum b/go.sum index b2eb760..7303558 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,7 @@ +github.com/bxcodec/faker v1.5.0 h1:RIWOeAcM3ZHye1i8bQtHU2LfNOaLmHuRiCo60mNMOcQ= +github.com/bxcodec/faker v2.0.1+incompatible h1:P0KUpUw5w6WJXwrPfv35oc91i4d8nf40Nwln+M/+faA= +github.com/bxcodec/faker/v3 v3.7.0 h1:qWAFFwcyVS0ukF0UoJju1wBLO0cuPQ7JdVBPggM8kNo= +github.com/bxcodec/faker/v3 v3.7.0/go.mod h1:gF31YgnMSMKgkvl+fyEo1xuSMbEuieyqfeslGYFjneM= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= diff --git a/main.go b/main.go index 9da02b3..1845bab 100644 --- a/main.go +++ b/main.go @@ -11,6 +11,7 @@ import ( "kafka-stress/pkg/clients" "kafka-stress/pkg/stringgenerator" + "kafka-stress/pkg/fakejson" guuid "github.com/google/uuid" kafka "github.com/segmentio/kafka-go" @@ -31,6 +32,7 @@ func main() { events := flag.Int("events", 10000, "Numer of events will be created in topic") consumers := flag.Int("consumers", 1, "Number of consumers will be used in topic") consumerGroup := flag.String("consumer-group", "kafka-stress", "Consumer group name") + format := flag.String("format", "string", "Events Format; ex string,json,avro") flag.Parse() @@ -40,7 +42,7 @@ func main() { switch strings.ToLower(*testMode) { case "producer": - produce(*bootstrapServers, *topic, *events, *size, *batchSize, *acks, *schemaRegistryURL, *schema, *ssl) + produce(*bootstrapServers, *topic, *events, *size, *batchSize, *acks, *schemaRegistryURL, *schema, *ssl, *format) break case "consumer": consume(*bootstrapServers, *topic, *consumerGroup, *consumers, *ssl) @@ -49,21 +51,32 @@ func main() { } } -func produce(bootstrapServers string, topic string, events int, size int, batchSize int, acks int, schemaRegistryURL string, schema string, ssl bool) { +func produce(bootstrapServers string, topic string, events int, size int, batchSize int, acks int, schemaRegistryURL string, schema string, ssl bool, format string) { var wg sync.WaitGroup var executions uint64 var errors uint64 + var message string producer := clients.GetProducer(bootstrapServers, topic, batchSize, acks, ssl) defer producer.Close() start := time.Now() - message := stringgenerator.RandStringBytes(size) for i := 0; i < events; i++ { wg.Add(1) + switch format { + case "string": + message = stringgenerator.RandStringBytes(size) + break; + case "json": + message = fakejson.RandJsonPayload() + break; + default: + message = stringgenerator.RandStringBytes(size) + } + go func() { msg := kafka.Message{ Key: []byte(guuid.New().String()), @@ -72,7 +85,6 @@ func produce(bootstrapServers string, topic string, events int, size int, batchS err := producer.WriteMessages(context.Background(), msg) if err != nil { - // fmt.Println(err) atomic.AddUint64(&errors, 1) } else { atomic.AddUint64(&executions, 1) diff --git a/pkg/fakejson/main.go b/pkg/fakejson/main.go new file mode 100644 index 0000000..fd88650 --- /dev/null +++ b/pkg/fakejson/main.go @@ -0,0 +1,49 @@ +package fakejson + +import ( + "fmt" + "github.com/bxcodec/faker/v3" + "encoding/json" +) + +type fake struct { + UserName string `faker:"username" json:"username"` + PhoneNumber string `faker:"phone_number" json:"phone_number"` + IPV4 string `faker:"ipv4" json:"ipv4"` + IPV6 string `faker:"ipv6" json:"ipv6"` + MacAddress string `faker:"mac_address" json:"mac_address"` + URL string `faker:"url" json: "url"` + DayOfWeek string `faker:"day_of_week" json: "day_of_week"` + DayOfMonth string `faker:"day_of_month" json: "day_of_month"` + Timestamp string `faker:"timestamp" json: "timestamp"` + Century string `faker:"century" json: "century"` + TimeZone string `faker:"timezone", json:"timezone"` + TimePeriod string `faker:"time_period" json:"time_period"` + Word string `faker:"word" json:"word"` + Sentence string `faker:"sentence" json:"sentence"` + Paragraph string `faker:"paragraph" json:"paragraph"` + Currency string `faker:"currency" json:"currency"` + Amount float64 `faker:"amount" json:"amount" ` + AmountWithCurrency string `faker:"amount_with_currency" json:"amount_with_currency"` + UUIDHypenated string `faker:"uuid_hyphenated" json:"uuid_hyphenated"` + UUID string `faker:"uuid_digit" json:"uuid_digit"` + PaymentMethod string `faker:"oneof: cc, paypal, check, money order"` +} + +func RandJsonPayload() string { + + a := fake{} + err := faker.FakeData(&a) + if err != nil { + fmt.Println(err) + return "{}" + } + + b, err := json.Marshal(a) + if err != nil { + fmt.Println(err) + return "{}" + } + return string(b) + +} \ No newline at end of file diff --git a/pkg/stringgenerator/main.go b/pkg/stringgenerator/main.go index 25dacd4..26ae172 100644 --- a/pkg/stringgenerator/main.go +++ b/pkg/stringgenerator/main.go @@ -1,9 +1,7 @@ package stringgenerator import ( - "fmt" "math/rand" - "unsafe" ) const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" @@ -14,6 +12,5 @@ func RandStringBytes(n int) string { for i := range b { b[i] = letterBytes[rand.Intn(len(letterBytes))] } - fmt.Println(unsafe.Sizeof(b)) return string(b) }