Skip to content

Commit

Permalink
[ISSUE #726] feat: replace fmt to rlog (#756)
Browse files Browse the repository at this point in the history
* Replace fmt to rlog
  • Loading branch information
yuanmoon authored Dec 22, 2021
1 parent f6cfc22 commit de5f561
Show file tree
Hide file tree
Showing 15 changed files with 154 additions and 79 deletions.
34 changes: 18 additions & 16 deletions admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,38 +158,40 @@ func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error {
}

if _, err := a.deleteTopicInBroker(ctx, cfg.Topic, cfg.BrokerAddr); err != nil {
if err != nil {
rlog.Error("delete topic in broker error", map[string]interface{}{
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyBroker: cfg.BrokerAddr,
rlog.LogKeyUnderlayError: err,
})
}
rlog.Error("delete topic in broker error", map[string]interface{}{
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyBroker: cfg.BrokerAddr,
rlog.LogKeyUnderlayError: err,
})
return err
}

//delete topic in nameserver
if len(cfg.NameSrvAddr) == 0 {
a.namesrv.UpdateTopicRouteInfo(cfg.Topic)
_, _, err := a.namesrv.UpdateTopicRouteInfo(cfg.Topic)
if err != nil {
rlog.Error("delete topic in nameserver error", map[string]interface{}{
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyUnderlayError: err,
})
}
cfg.NameSrvAddr = a.namesrv.AddrList()
}

for _, nameSrvAddr := range cfg.NameSrvAddr {
if _, err := a.deleteTopicInNameServer(ctx, cfg.Topic, nameSrvAddr); err != nil {
if err != nil {
rlog.Error("delete topic in name server error", map[string]interface{}{
rlog.LogKeyTopic: cfg.Topic,
"nameServer": nameSrvAddr,
rlog.LogKeyUnderlayError: err,
})
}
rlog.Error("delete topic in nameserver error", map[string]interface{}{
"nameServer": nameSrvAddr,
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyUnderlayError: err,
})
return err
}
}
rlog.Info("delete topic success", map[string]interface{}{
"nameServer": cfg.NameSrvAddr,
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyBroker: cfg.BrokerAddr,
"nameServer": cfg.NameSrvAddr,
})
return nil
}
Expand Down
26 changes: 15 additions & 11 deletions benchmark/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/rlog"
"os"
"os/signal"
"sync"
Expand Down Expand Up @@ -89,10 +90,13 @@ func (s *consumeSnapshots) printStati() {
avgS2CRT := float64(l.store2ConsumerTotalRT-f.store2ConsumerTotalRT) / respSucCount
s.RUnlock()

fmt.Printf(
"Consume TPS: %d Average(B2C) RT: %7.3f Average(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d\n",
int64(consumeTps), avgB2CRT, avgS2CRT, l.born2ConsumerMaxRT, l.store2ConsumerMaxRT,
)
rlog.Info("Benchmark Consumer Snapshot", map[string]interface{}{
"consumeTPS": int64(consumeTps),
"average(B2C)RT": avgB2CRT,
"average(S2C)RT": avgS2CRT,
"max(B2C)RT": l.born2ConsumerMaxRT,
"max(S2C)RT": l.store2ConsumerMaxRT,
})
}

type consumerBenchmark struct {
Expand Down Expand Up @@ -164,7 +168,7 @@ func (bc *consumerBenchmark) consumeMsg(stati *statiBenchmarkConsumerSnapshot, e
return consumer.ConsumeSuccess, nil
})

println("Start")
rlog.Info("Test Start", nil)
c.Start()
select {
case <-exit:
Expand All @@ -176,31 +180,31 @@ func (bc *consumerBenchmark) consumeMsg(stati *statiBenchmarkConsumerSnapshot, e
func (bc *consumerBenchmark) run(args []string) {
bc.flags.Parse(args)
if bc.topic == "" {
println("empty topic")
rlog.Error("Empty Topic", nil)
bc.usage()
return
}

if bc.groupPrefix == "" {
println("empty group prefix")
rlog.Error("Empty Group Prefix", nil)
bc.usage()
return
}

if bc.nameSrv == "" {
println("empty name server")
rlog.Error("Empty Nameserver", nil)
bc.usage()
return
}

if bc.testMinutes <= 0 {
println("test time must be positive integer")
rlog.Error("Test Time Must Be Positive Integer", nil)
bc.usage()
return
}

if bc.instanceCount <= 0 {
println("thread count must be positive integer")
rlog.Error("Thread Count Must Be Positive Integer", nil)
bc.usage()
return
}
Expand Down Expand Up @@ -261,11 +265,11 @@ func (bc *consumerBenchmark) run(args []string) {
case <-signalChan:
}

println("Closed")
close(exitChan)
wg.Wait()
snapshots.takeSnapshot()
snapshots.printStati()
rlog.Info("Test Done", nil)
}

func (bc *consumerBenchmark) usage() {
Expand Down
11 changes: 8 additions & 3 deletions benchmark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main

import (
"fmt"
"github.com/apache/rocketmq-client-go/v2/rlog"
"os"
)

Expand All @@ -45,7 +46,9 @@ func registerCommand(name string, cmd command) {
}

func usage() {
println(os.Args[0] + " commandName [...]")
rlog.Info("Command", map[string]interface{}{
"name": os.Args[0],
})
for _, cmd := range cmds {
cmd.usage()
}
Expand All @@ -54,15 +57,17 @@ func usage() {
// go run *.go [command name] [command args]
func main() {
if len(os.Args) < 2 {
println("error:lack cmd name\n")
rlog.Error("Lack Command Name", nil)
usage()
return
}

name := os.Args[1]
cmd, ok := cmds[name]
if !ok {
fmt.Printf("command %s is not supported\n", name)
rlog.Error("Command Isn't Supported", map[string]interface{}{
"command": name,
})
usage()
return
}
Expand Down
47 changes: 30 additions & 17 deletions benchmark/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ package main
import (
"context"
"flag"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"github.com/apache/rocketmq-client-go/v2/rlog"
"os"
"os/signal"
"sync"
Expand Down Expand Up @@ -91,10 +91,14 @@ func (s *produceSnapshots) printStati() {
maxRT := atomic.LoadInt64(&s.cur.sendMessageMaxRT)
s.RUnlock()

fmt.Printf(
"Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d Total:%d\n",
int64(sendTps), maxRT, avgRT, l.sendRequestFailedCount, l.receiveResponseFailedCount, l.receiveResponseSuccessCount,
)
rlog.Info("Benchmark Producer Snapshot", map[string]interface{}{
"sendTps": int64(sendTps),
"maxRt": maxRT,
"averageRt": avgRT,
"sendFailed": l.sendRequestFailedCount,
"responseFailed": l.receiveResponseFailedCount,
"total": l.receiveResponseSuccessCount,
})
}

type producerBenchmark struct {
Expand Down Expand Up @@ -130,7 +134,9 @@ func (bp *producerBenchmark) produceMsg(stati *statiBenchmarkProducerSnapshot, e
)

if err != nil {
fmt.Printf("new producer error: %s\n", err)
rlog.Error("New Producer Error", map[string]interface{}{
rlog.LogKeyUnderlayError: err.Error(),
})
return
}

Expand All @@ -152,7 +158,9 @@ AGAIN:
r, err := p.SendSync(context.Background(), primitive.NewMessage(topic, []byte(msgStr)))

if err != nil {
fmt.Printf("send message sync error:%s", err)
rlog.Error("Send Message Error", map[string]interface{}{
rlog.LogKeyUnderlayError: err.Error(),
})
goto AGAIN
}

Expand All @@ -170,43 +178,46 @@ AGAIN:
}
goto AGAIN
}

fmt.Printf("%v send message %s:%s error:%s\n", time.Now(), topic, tag, err.Error())
rlog.Error("Send Message Error", map[string]interface{}{
"topic": topic,
"tag": tag,
rlog.LogKeyUnderlayError: err.Error(),
})
goto AGAIN
}

func (bp *producerBenchmark) run(args []string) {
bp.flags.Parse(args)

if bp.topic == "" {
println("empty topic")
rlog.Error("Empty Topic", nil)
bp.flags.Usage()
return
}

if bp.groupID == "" {
println("empty group id")
rlog.Error("Empty Group Id", nil)
bp.flags.Usage()
return
}

if bp.nameSrv == "" {
println("empty namesrv")
rlog.Error("Empty Nameserver", nil)
bp.flags.Usage()
return
}
if bp.instanceCount <= 0 {
println("instance count must be positive integer")
rlog.Error("Instance Count Must Be Positive Integer", nil)
bp.flags.Usage()
return
}
if bp.testMinutes <= 0 {
println("test time must be positive integer")
rlog.Error("Test Time Must Be Positive Integer", nil)
bp.flags.Usage()
return
}
if bp.bodySize <= 0 {
println("body size must be positive integer")
rlog.Error("Body Size Must Be Positive Integer", nil)
bp.flags.Usage()
return
}
Expand All @@ -221,7 +232,9 @@ func (bp *producerBenchmark) run(args []string) {
go func() {
wg.Add(1)
bp.produceMsg(&stati, exitChan)
fmt.Printf("exit of produce %d\n", i)
rlog.Info("Producer Done and Exit", map[string]interface{}{
"id": i,
})
wg.Done()
}()
}
Expand Down Expand Up @@ -269,7 +282,7 @@ func (bp *producerBenchmark) run(args []string) {
wg.Wait()
snapshots.takeSnapshot()
snapshots.printStati()
fmt.Println("TEST DONE")
rlog.Info("Test Done", nil)
}

func (bp *producerBenchmark) usage() {
Expand Down
24 changes: 17 additions & 7 deletions benchmark/stable.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package main

import (
"flag"
"fmt"
"github.com/apache/rocketmq-client-go/v2/errors"
"github.com/apache/rocketmq-client-go/v2/rlog"
"os"
"os/signal"
"syscall"
Expand Down Expand Up @@ -84,11 +84,11 @@ func (st *stableTest) run() {
select {
case <-signalChan:
opTicker.Stop()
fmt.Println("test over")
rlog.Info("Test Done", nil)
return
case <-closeChan:
opTicker.Stop()
fmt.Println("test over")
rlog.Info("Test Done", nil)
return
case <-opTicker.C:
st.op()
Expand Down Expand Up @@ -127,14 +127,19 @@ func (stp *stableTestProducer) usage() {
func (stp *stableTestProducer) run(args []string) {
err := stp.flags.Parse(args)
if err != nil {
fmt.Printf("parse args:%v, error:%s\n", args, err)
rlog.Info("Parse Args Error", map[string]interface{}{
"args": args,
rlog.LogKeyUnderlayError: err.Error(),
})
stp.usage()
return
}

err = stp.checkFlag()
if err != nil {
fmt.Println(err)
rlog.Error("Check Flag Error", map[string]interface{}{
rlog.LogKeyUnderlayError: err.Error(),
})
stp.usage()
return
}
Expand Down Expand Up @@ -199,15 +204,20 @@ func (stc *stableTestConsumer) usage() {
func (stc *stableTestConsumer) run(args []string) {
err := stc.flags.Parse(args)
if err != nil {
fmt.Printf("parse args:%v, error:%s\n", args, err)
rlog.Error("Parse Args Error", map[string]interface{}{
"args": args,
rlog.LogKeyUnderlayError: err.Error(),
})
stc.usage()
return
}

err = stc.checkFlag()
if err != nil {
rlog.Error("Check Flag Error", map[string]interface{}{
rlog.LogKeyUnderlayError: err.Error(),
})
stc.usage()
fmt.Printf("%s\n", err)
return
}
//
Expand Down
Loading

0 comments on commit de5f561

Please sign in to comment.