diff --git a/admin/admin.go b/admin/admin.go index f45f39ae..1957a06f 100644 --- a/admin/admin.go +++ b/admin/admin.go @@ -158,38 +158,40 @@ func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error { } if _, err := a.deleteTopicInBroker(ctx, cfg.Topic, cfg.BrokerAddr); err != nil { - if err != nil { - rlog.Error("delete topic in broker error", map[string]interface{}{ - rlog.LogKeyTopic: cfg.Topic, - rlog.LogKeyBroker: cfg.BrokerAddr, - rlog.LogKeyUnderlayError: err, - }) - } + rlog.Error("delete topic in broker error", map[string]interface{}{ + rlog.LogKeyTopic: cfg.Topic, + rlog.LogKeyBroker: cfg.BrokerAddr, + rlog.LogKeyUnderlayError: err, + }) return err } //delete topic in nameserver if len(cfg.NameSrvAddr) == 0 { - a.namesrv.UpdateTopicRouteInfo(cfg.Topic) + _, _, err := a.namesrv.UpdateTopicRouteInfo(cfg.Topic) + if err != nil { + rlog.Error("delete topic in nameserver error", map[string]interface{}{ + rlog.LogKeyTopic: cfg.Topic, + rlog.LogKeyUnderlayError: err, + }) + } cfg.NameSrvAddr = a.namesrv.AddrList() } for _, nameSrvAddr := range cfg.NameSrvAddr { if _, err := a.deleteTopicInNameServer(ctx, cfg.Topic, nameSrvAddr); err != nil { - if err != nil { - rlog.Error("delete topic in name server error", map[string]interface{}{ - rlog.LogKeyTopic: cfg.Topic, - "nameServer": nameSrvAddr, - rlog.LogKeyUnderlayError: err, - }) - } + rlog.Error("delete topic in nameserver error", map[string]interface{}{ + "nameServer": nameSrvAddr, + rlog.LogKeyTopic: cfg.Topic, + rlog.LogKeyUnderlayError: err, + }) return err } } rlog.Info("delete topic success", map[string]interface{}{ + "nameServer": cfg.NameSrvAddr, rlog.LogKeyTopic: cfg.Topic, rlog.LogKeyBroker: cfg.BrokerAddr, - "nameServer": cfg.NameSrvAddr, }) return nil } diff --git a/benchmark/consumer.go b/benchmark/consumer.go index cada9333..907a1e71 100644 --- a/benchmark/consumer.go +++ b/benchmark/consumer.go @@ -24,6 +24,7 @@ import ( "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" + "github.com/apache/rocketmq-client-go/v2/rlog" "os" "os/signal" "sync" @@ -89,10 +90,13 @@ func (s *consumeSnapshots) printStati() { avgS2CRT := float64(l.store2ConsumerTotalRT-f.store2ConsumerTotalRT) / respSucCount s.RUnlock() - fmt.Printf( - "Consume TPS: %d Average(B2C) RT: %7.3f Average(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d\n", - int64(consumeTps), avgB2CRT, avgS2CRT, l.born2ConsumerMaxRT, l.store2ConsumerMaxRT, - ) + rlog.Info("Benchmark Consumer Snapshot", map[string]interface{}{ + "consumeTPS": int64(consumeTps), + "average(B2C)RT": avgB2CRT, + "average(S2C)RT": avgS2CRT, + "max(B2C)RT": l.born2ConsumerMaxRT, + "max(S2C)RT": l.store2ConsumerMaxRT, + }) } type consumerBenchmark struct { @@ -164,7 +168,7 @@ func (bc *consumerBenchmark) consumeMsg(stati *statiBenchmarkConsumerSnapshot, e return consumer.ConsumeSuccess, nil }) - println("Start") + rlog.Info("Test Start", nil) c.Start() select { case <-exit: @@ -176,31 +180,31 @@ func (bc *consumerBenchmark) consumeMsg(stati *statiBenchmarkConsumerSnapshot, e func (bc *consumerBenchmark) run(args []string) { bc.flags.Parse(args) if bc.topic == "" { - println("empty topic") + rlog.Error("Empty Topic", nil) bc.usage() return } if bc.groupPrefix == "" { - println("empty group prefix") + rlog.Error("Empty Group Prefix", nil) bc.usage() return } if bc.nameSrv == "" { - println("empty name server") + rlog.Error("Empty Nameserver", nil) bc.usage() return } if bc.testMinutes <= 0 { - println("test time must be positive integer") + rlog.Error("Test Time Must Be Positive Integer", nil) bc.usage() return } if bc.instanceCount <= 0 { - println("thread count must be positive integer") + rlog.Error("Thread Count Must Be Positive Integer", nil) bc.usage() return } @@ -261,11 +265,11 @@ func (bc *consumerBenchmark) run(args []string) { case <-signalChan: } - println("Closed") close(exitChan) wg.Wait() snapshots.takeSnapshot() snapshots.printStati() + rlog.Info("Test Done", nil) } func (bc *consumerBenchmark) usage() { diff --git a/benchmark/main.go b/benchmark/main.go index 080a9480..79eca8f7 100644 --- a/benchmark/main.go +++ b/benchmark/main.go @@ -19,6 +19,7 @@ package main import ( "fmt" + "github.com/apache/rocketmq-client-go/v2/rlog" "os" ) @@ -45,7 +46,9 @@ func registerCommand(name string, cmd command) { } func usage() { - println(os.Args[0] + " commandName [...]") + rlog.Info("Command", map[string]interface{}{ + "name": os.Args[0], + }) for _, cmd := range cmds { cmd.usage() } @@ -54,7 +57,7 @@ func usage() { // go run *.go [command name] [command args] func main() { if len(os.Args) < 2 { - println("error:lack cmd name\n") + rlog.Error("Lack Command Name", nil) usage() return } @@ -62,7 +65,9 @@ func main() { name := os.Args[1] cmd, ok := cmds[name] if !ok { - fmt.Printf("command %s is not supported\n", name) + rlog.Error("Command Isn't Supported", map[string]interface{}{ + "command": name, + }) usage() return } diff --git a/benchmark/producer.go b/benchmark/producer.go index 537ffbe3..7516352c 100644 --- a/benchmark/producer.go +++ b/benchmark/producer.go @@ -20,10 +20,10 @@ package main import ( "context" "flag" - "fmt" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" + "github.com/apache/rocketmq-client-go/v2/rlog" "os" "os/signal" "sync" @@ -91,10 +91,14 @@ func (s *produceSnapshots) printStati() { maxRT := atomic.LoadInt64(&s.cur.sendMessageMaxRT) s.RUnlock() - fmt.Printf( - "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d Total:%d\n", - int64(sendTps), maxRT, avgRT, l.sendRequestFailedCount, l.receiveResponseFailedCount, l.receiveResponseSuccessCount, - ) + rlog.Info("Benchmark Producer Snapshot", map[string]interface{}{ + "sendTps": int64(sendTps), + "maxRt": maxRT, + "averageRt": avgRT, + "sendFailed": l.sendRequestFailedCount, + "responseFailed": l.receiveResponseFailedCount, + "total": l.receiveResponseSuccessCount, + }) } type producerBenchmark struct { @@ -130,7 +134,9 @@ func (bp *producerBenchmark) produceMsg(stati *statiBenchmarkProducerSnapshot, e ) if err != nil { - fmt.Printf("new producer error: %s\n", err) + rlog.Error("New Producer Error", map[string]interface{}{ + rlog.LogKeyUnderlayError: err.Error(), + }) return } @@ -152,7 +158,9 @@ AGAIN: r, err := p.SendSync(context.Background(), primitive.NewMessage(topic, []byte(msgStr))) if err != nil { - fmt.Printf("send message sync error:%s", err) + rlog.Error("Send Message Error", map[string]interface{}{ + rlog.LogKeyUnderlayError: err.Error(), + }) goto AGAIN } @@ -170,8 +178,11 @@ AGAIN: } goto AGAIN } - - fmt.Printf("%v send message %s:%s error:%s\n", time.Now(), topic, tag, err.Error()) + rlog.Error("Send Message Error", map[string]interface{}{ + "topic": topic, + "tag": tag, + rlog.LogKeyUnderlayError: err.Error(), + }) goto AGAIN } @@ -179,34 +190,34 @@ func (bp *producerBenchmark) run(args []string) { bp.flags.Parse(args) if bp.topic == "" { - println("empty topic") + rlog.Error("Empty Topic", nil) bp.flags.Usage() return } if bp.groupID == "" { - println("empty group id") + rlog.Error("Empty Group Id", nil) bp.flags.Usage() return } if bp.nameSrv == "" { - println("empty namesrv") + rlog.Error("Empty Nameserver", nil) bp.flags.Usage() return } if bp.instanceCount <= 0 { - println("instance count must be positive integer") + rlog.Error("Instance Count Must Be Positive Integer", nil) bp.flags.Usage() return } if bp.testMinutes <= 0 { - println("test time must be positive integer") + rlog.Error("Test Time Must Be Positive Integer", nil) bp.flags.Usage() return } if bp.bodySize <= 0 { - println("body size must be positive integer") + rlog.Error("Body Size Must Be Positive Integer", nil) bp.flags.Usage() return } @@ -221,7 +232,9 @@ func (bp *producerBenchmark) run(args []string) { go func() { wg.Add(1) bp.produceMsg(&stati, exitChan) - fmt.Printf("exit of produce %d\n", i) + rlog.Info("Producer Done and Exit", map[string]interface{}{ + "id": i, + }) wg.Done() }() } @@ -269,7 +282,7 @@ func (bp *producerBenchmark) run(args []string) { wg.Wait() snapshots.takeSnapshot() snapshots.printStati() - fmt.Println("TEST DONE") + rlog.Info("Test Done", nil) } func (bp *producerBenchmark) usage() { diff --git a/benchmark/stable.go b/benchmark/stable.go index cd5fb9b0..2659bc5f 100644 --- a/benchmark/stable.go +++ b/benchmark/stable.go @@ -19,8 +19,8 @@ package main import ( "flag" - "fmt" "github.com/apache/rocketmq-client-go/v2/errors" + "github.com/apache/rocketmq-client-go/v2/rlog" "os" "os/signal" "syscall" @@ -84,11 +84,11 @@ func (st *stableTest) run() { select { case <-signalChan: opTicker.Stop() - fmt.Println("test over") + rlog.Info("Test Done", nil) return case <-closeChan: opTicker.Stop() - fmt.Println("test over") + rlog.Info("Test Done", nil) return case <-opTicker.C: st.op() @@ -127,14 +127,19 @@ func (stp *stableTestProducer) usage() { func (stp *stableTestProducer) run(args []string) { err := stp.flags.Parse(args) if err != nil { - fmt.Printf("parse args:%v, error:%s\n", args, err) + rlog.Info("Parse Args Error", map[string]interface{}{ + "args": args, + rlog.LogKeyUnderlayError: err.Error(), + }) stp.usage() return } err = stp.checkFlag() if err != nil { - fmt.Println(err) + rlog.Error("Check Flag Error", map[string]interface{}{ + rlog.LogKeyUnderlayError: err.Error(), + }) stp.usage() return } @@ -199,15 +204,20 @@ func (stc *stableTestConsumer) usage() { func (stc *stableTestConsumer) run(args []string) { err := stc.flags.Parse(args) if err != nil { - fmt.Printf("parse args:%v, error:%s\n", args, err) + rlog.Error("Parse Args Error", map[string]interface{}{ + "args": args, + rlog.LogKeyUnderlayError: err.Error(), + }) stc.usage() return } err = stc.checkFlag() if err != nil { + rlog.Error("Check Flag Error", map[string]interface{}{ + rlog.LogKeyUnderlayError: err.Error(), + }) stc.usage() - fmt.Printf("%s\n", err) return } // diff --git a/consumer/push_consumer_test.go b/consumer/push_consumer_test.go index e67b2db5..78bc1f7b 100644 --- a/consumer/push_consumer_test.go +++ b/consumer/push_consumer_test.go @@ -19,7 +19,7 @@ package consumer import ( "context" - "fmt" + "github.com/apache/rocketmq-client-go/v2/rlog" "testing" "github.com/apache/rocketmq-client-go/v2/internal" @@ -48,7 +48,9 @@ func TestStart(t *testing.T) { err := c.Subscribe("TopicTest", MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (ConsumeResult, error) { - fmt.Printf("subscribe callback: %v \n", msgs) + rlog.Info("Subscribe Callback", map[string]interface{}{ + "msgs": msgs, + }) return ConsumeSuccess, nil }) @@ -62,7 +64,9 @@ func TestStart(t *testing.T) { err = c.Subscribe("TopicTest", MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (ConsumeResult, error) { - fmt.Printf("subscribe callback: %v \n", msgs) + rlog.Info("Subscribe Callback", map[string]interface{}{ + "msgs": msgs, + }) return ConsumeSuccess, nil }) diff --git a/consumer/strategy_test.go b/consumer/strategy_test.go index e66b15cf..d521b4b4 100644 --- a/consumer/strategy_test.go +++ b/consumer/strategy_test.go @@ -18,7 +18,7 @@ limitations under the License. package consumer import ( - "fmt" + "github.com/apache/rocketmq-client-go/v2/rlog" "testing" "github.com/apache/rocketmq-client-go/v2/primitive" @@ -476,7 +476,11 @@ func TestAllocateByConsistentHash(t *testing.T) { Convey("observe the result of AllocateByMachineRoom", func() { for _, value := range cases { result := strategy("testGroup", value.currentCid, value.mqAll, value.cidAll) - fmt.Printf("\n\n currentCid:%s, cidAll:%s, \n allocateResult:%+v \n", value.currentCid, value.cidAll, result) + rlog.Info("Result Of AllocateByMachineRoom", map[string]interface{}{ + "currentCid": value.currentCid, + "cidAll": value.cidAll, + "allocateResult": result, + }) } }) }) diff --git a/docs/Introduction.md b/docs/Introduction.md index a4e954e8..011603a4 100644 --- a/docs/Introduction.md +++ b/docs/Introduction.md @@ -91,7 +91,9 @@ c, err := rocketmq.NewPushConsumer( ``` err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx *consumer.ConsumeMessageContext, msgs []*primitive.MessageExt) (consumer.ConsumeResult, error) { - fmt.Printf("subscribe callback: %v \n", msgs) + rlog.Info("Subscribe Callback", map[string]interface{}{ + "msgs": msgs, + }) return consumer.ConsumeSuccess, nil }) ``` diff --git a/errors/errors.go b/errors/errors.go index 43b49ca9..195984e7 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -20,7 +20,7 @@ package errors import "errors" var ( - ErrRequestTimeout = errors.New("equest timeout") + ErrRequestTimeout = errors.New("request timeout") ErrMQEmpty = errors.New("MessageQueue is nil") ErrOffset = errors.New("offset < 0") ErrNumbers = errors.New("numbers < 0") diff --git a/internal/model_test.go b/internal/model_test.go index 56eeb24e..a505f3ec 100644 --- a/internal/model_test.go +++ b/internal/model_test.go @@ -19,7 +19,7 @@ package internal import ( "encoding/json" - "fmt" + "github.com/apache/rocketmq-client-go/v2/rlog" "strings" "testing" @@ -46,7 +46,9 @@ func TestHeartbeatData(t *testing.T) { v, err := json.Marshal(set) So(err, ShouldBeNil) - fmt.Printf("json producer set: %s", string(v)) + rlog.Info("Json Producer", map[string]interface{}{ + "result": string(v), + }) }) Convey("producer heatbeat", func() { @@ -64,7 +66,9 @@ func TestHeartbeatData(t *testing.T) { v, err := json.Marshal(hbt) So(err, ShouldBeNil) - fmt.Printf("json producer: %s\n", string(v)) + rlog.Info("Json Producer", map[string]interface{}{ + "result": string(v), + }) }) Convey("consumer heartbeat", func() { @@ -81,7 +85,9 @@ func TestHeartbeatData(t *testing.T) { v, err := json.Marshal(hbt) So(err, ShouldBeNil) - fmt.Printf("json consumer: %s\n", string(v)) + rlog.Info("Json Consumer", map[string]interface{}{ + "result": string(v), + }) }) Convey("producer & consumer heartbeat", func() { @@ -109,7 +115,9 @@ func TestHeartbeatData(t *testing.T) { v, err := json.Marshal(hbt) So(err, ShouldBeNil) - fmt.Printf("json producer & consumer: %s\n", string(v)) + rlog.Info("Json Producer and Consumer", map[string]interface{}{ + "result": string(v), + }) }) }) @@ -374,7 +382,9 @@ func TestConsumeMessageDirectlyResult_MarshalJSON(t *testing.T) { consumeMessageDirectlyResult.ConsumeResult = ConsumeSuccess data, err := consumeMessageDirectlyResult.Encode() So(err, ShouldBeNil) - fmt.Printf("json consumeMessageDirectlyResult: %s\n", string(data)) + rlog.Info("Json consumeMessageDirectlyResult", map[string]interface{}{ + "result": string(data), + }) }) Convey("test consume timeout", func() { @@ -386,7 +396,9 @@ func TestConsumeMessageDirectlyResult_MarshalJSON(t *testing.T) { consumeResult.ConsumeResult = ReturnNull data, err := consumeResult.Encode() So(err, ShouldBeNil) - fmt.Printf("json consumeMessageDirectlyResult: %s\n", string(data)) + rlog.Info("Json consumeMessageDirectlyResult", map[string]interface{}{ + "result": string(data), + }) }) Convey("test consume exception", func() { @@ -399,7 +411,9 @@ func TestConsumeMessageDirectlyResult_MarshalJSON(t *testing.T) { consumeResult.Remark = "Unknown Exception" data, err := consumeResult.Encode() So(err, ShouldBeNil) - fmt.Printf("json consumeMessageDirectlyResult: %s\n", string(data)) + rlog.Info("Json consumeMessageDirectlyResult", map[string]interface{}{ + "result": string(data), + }) }) }) } diff --git a/internal/namesrv_test.go b/internal/namesrv_test.go index e58dc291..b047a077 100644 --- a/internal/namesrv_test.go +++ b/internal/namesrv_test.go @@ -19,6 +19,7 @@ package internal import ( "fmt" + "github.com/apache/rocketmq-client-go/v2/rlog" "net" "net/http" "os" @@ -91,7 +92,9 @@ func TestUpdateNameServerAddress(t *testing.T) { port := listener.Addr().(*net.TCPAddr).Port nameServerDommain := fmt.Sprintf("http://127.0.0.1:%d/nameserver/addrs", port) - fmt.Println("temporary name server domain: ", nameServerDommain) + rlog.Info("Temporary Nameserver", map[string]interface{}{ + "domain": nameServerDommain, + }) resolver := primitive.NewHttpResolver("DEFAULT", nameServerDommain) ns := &namesrvs{ diff --git a/internal/remote/codec_test.go b/internal/remote/codec_test.go index 8fb8a608..07174514 100644 --- a/internal/remote/codec_test.go +++ b/internal/remote/codec_test.go @@ -18,7 +18,7 @@ package remote import ( "encoding/json" - "fmt" + "github.com/apache/rocketmq-client-go/v2/rlog" "math/rand" "reflect" "testing" @@ -350,19 +350,29 @@ func TestCommandJsonIter(t *testing.T) { cmd := NewRemotingCommand(192, h, []byte("Hello RocketMQCodecs")) cmdData, err := json.Marshal(cmd) assert.Nil(t, err) - fmt.Printf("cmd data from json: %v\n", *(*string)(unsafe.Pointer(&cmdData))) + rlog.Info("Command Data From Json", map[string]interface{}{ + "data": *(*string)(unsafe.Pointer(&cmdData)), + }) data, err := jsoniter.Marshal(cmd) assert.Nil(t, err) - fmt.Printf("cmd data from jsoniter: %v\n", *(*string)(unsafe.Pointer(&data))) + rlog.Info("Command Data From Jsoniter", map[string]interface{}{ + "data": *(*string)(unsafe.Pointer(&data)), + }) var cmdResp RemotingCommand err = json.Unmarshal(cmdData, &cmdResp) assert.Nil(t, err) - fmt.Printf("cmd: %#v language: %v\n", cmdResp, cmdResp.Language) + rlog.Info("Json Decode Success", map[string]interface{}{ + "cmd": cmdResp, + "language": cmdResp.Language, + }) var cmdResp2 RemotingCommand err = json.Unmarshal(data, &cmdResp2) assert.Nil(t, err) - fmt.Printf("cmd: %#v language: %v\n", cmdResp2, cmdResp2.Language) + rlog.Info("Json Decode Success", map[string]interface{}{ + "cmd": cmdResp2, + "language": cmdResp2.Language, + }) } diff --git a/primitive/ctx.go b/primitive/ctx.go index 4481dcd5..936b54c7 100644 --- a/primitive/ctx.go +++ b/primitive/ctx.go @@ -22,7 +22,6 @@ package primitive import ( "context" - "fmt" "math" "github.com/apache/rocketmq-client-go/v2/rlog" @@ -47,7 +46,9 @@ func (c ConsumeReturnType) Ordinal() int { case FailedReturn: return 4 default: - rlog.Error(fmt.Sprintf("illegal ConsumeReturnType: %v", c), nil) + rlog.Error("Illegal Consumer Return Type", map[string]interface{}{ + "type": c, + }) return 0 } } diff --git a/primitive/nsresolver_test.go b/primitive/nsresolver_test.go index 98d839af..d42d2c6b 100644 --- a/primitive/nsresolver_test.go +++ b/primitive/nsresolver_test.go @@ -18,6 +18,7 @@ package primitive import ( "fmt" + "github.com/apache/rocketmq-client-go/v2/rlog" "io/ioutil" "net" "net/http" @@ -65,7 +66,9 @@ func TestHttpResolverWithGet(t *testing.T) { port := listener.Addr().(*net.TCPAddr).Port nameServerDommain := fmt.Sprintf("http://127.0.0.1:%d/nameserver/addrs2", port) - fmt.Println("temporary name server domain: ", nameServerDommain) + rlog.Info("Temporary Nameserver", map[string]interface{}{ + "domain": nameServerDommain, + }) resolver := NewHttpResolver("DEFAULT", nameServerDommain) resolver.Resolve() diff --git a/rlog/log.go b/rlog/log.go index 382f5aad..037cfcfa 100644 --- a/rlog/log.go +++ b/rlog/log.go @@ -97,7 +97,7 @@ func (l *defaultLogger) Error(msg string, fields map[string]interface{}) { if msg == "" && len(fields) == 0 { return } - l.logger.WithFields(fields).WithFields(fields).Error(msg) + l.logger.WithFields(fields).Error(msg) } func (l *defaultLogger) Fatal(msg string, fields map[string]interface{}) {