Skip to content

Commit

Permalink
[ISSUE #1018] Fix unintended shutdown of shared rmqClient
Browse files Browse the repository at this point in the history
caused by missing ref count
  • Loading branch information
wgdzlh authored Apr 28, 2023
1 parent 1c6598f commit 8ea107c
Showing 1 changed file with 2 additions and 5 deletions.
7 changes: 2 additions & 5 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package internal

import (
"context"
"errors"
"fmt"
"net"
"os"
Expand Down Expand Up @@ -385,10 +384,8 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R
func (c *rmqClient) Start() {
//ctx, cancel := context.WithCancel(context.Background())
//c.cancel = cancel
atomic.AddInt32(&c.instanceCount, 1)
c.once.Do(func() {

atomic.AddInt32(&c.instanceCount, 1)

if !c.option.Credentials.IsEmpty() {
c.remoteClient.RegisterInterceptor(remote.ACLInterceptor(c.option.Credentials))
}
Expand Down Expand Up @@ -704,7 +701,7 @@ func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC
case ResSuccess:
status = primitive.SendOK
default:
return errors.New(fmt.Sprintf("CODE: %d, DESC: %s", cmd.Code, cmd.Remark))
return fmt.Errorf("CODE: %d, DESC: %s", cmd.Code, cmd.Remark)
}

msgIDs := make([]string, 0)
Expand Down

0 comments on commit 8ea107c

Please sign in to comment.