diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index f13d5e23..8fb46371 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -291,6 +291,59 @@ func (pc *pushConsumer) GetWhere() string { } +func (pc *pushConsumer) ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) *internal.ConsumeMessageDirectlyResult { + var msgs = []*primitive.MessageExt{msg} + var mq = &primitive.MessageQueue{ + Topic: msg.Topic, + BrokerName: brokerName, + QueueId: msg.Queue.QueueId, + } + + beginTime := time.Now() + pc.resetRetryAndNamespace(msgs) + var result ConsumeResult + + var err error + msgCtx := &primitive.ConsumeMessageContext{ + Properties: make(map[string]string), + ConsumerGroup: pc.consumerGroup, + MQ: mq, + Msgs: msgs, + } + ctx := context.Background() + ctx = primitive.WithConsumerCtx(ctx, msgCtx) + ctx = primitive.WithMethod(ctx, primitive.ConsumerPush) + concurrentCtx := primitive.NewConsumeConcurrentlyContext() + concurrentCtx.MQ = *mq + ctx = primitive.WithConcurrentlyCtx(ctx, concurrentCtx) + + result, err = pc.consumeInner(ctx, msgs) + + consumeRT := time.Now().Sub(beginTime) + + res := &internal.ConsumeMessageDirectlyResult{ + Order: false, + AutoCommit: true, + SpentTimeMills: int64(consumeRT / time.Millisecond), + } + + if err != nil { + msgCtx.Properties[primitive.PropCtxType] = string(primitive.ExceptionReturn) + res.ConsumeResult = internal.ThrowException + res.Remark = err.Error() + } else if result == ConsumeSuccess { + msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn) + res.ConsumeResult = internal.ConsumeSuccess + } else if result == ConsumeRetryLater { + msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn) + res.ConsumeResult = internal.ConsumeRetryLater + } + + increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond)) + + return res +} + func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo { info := internal.NewConsumerRunningInfo() diff --git a/internal/client.go b/internal/client.go index e2264a72..bc523ab4 100644 --- a/internal/client.go +++ b/internal/client.go @@ -84,6 +84,7 @@ type InnerConsumer interface { Rebalance() IsUnitMode() bool GetConsumerRunningInfo() *ConsumerRunningInfo + ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) *ConsumeMessageDirectlyResult GetcType() string GetModel() string GetWhere() string @@ -252,6 +253,36 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R } return res }) + + client.remoteClient.RegisterRequestFunc(ReqConsumeMessageDirectly, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand { + rlog.Info("receive consume message directly request...", nil) + header := new(ConsumeMessageDirectlyHeader) + header.Decode(req.ExtFields) + val, exist := clientMap.Load(header.clientID) + res := remote.NewRemotingCommand(ResError, nil, nil) + if !exist { + res.Remark = fmt.Sprintf("Can't find specified client instance of: %s", header.clientID) + } else { + cli, ok := val.(*rmqClient) + msg := primitive.DecodeMessage(req.Body)[0] + var consumeMessageDirectlyResult *ConsumeMessageDirectlyResult + if ok { + consumeMessageDirectlyResult = cli.consumeMessageDirectly(msg, header.consumerGroup, header.brokerName) + } + if consumeMessageDirectlyResult != nil { + res.Code = ResSuccess + data, err := consumeMessageDirectlyResult.Encode() + if err != nil { + res.Remark = fmt.Sprintf("json marshal error: %s", err.Error()) + } else { + res.Body = data + } + } else { + res.Remark = "there is unexpected error when consume message directly, please check log" + } + } + return res + }) } return actual.(*rmqClient) } @@ -744,6 +775,15 @@ func (c *rmqClient) getConsumerRunningInfo(group string) *ConsumerRunningInfo { return info } +func (c *rmqClient) consumeMessageDirectly(msg *primitive.MessageExt, group string, brokerName string) *ConsumeMessageDirectlyResult { + consumer, exist := c.consumerMap.Load(group) + if !exist { + return nil + } + res := consumer.(InnerConsumer).ConsumeMessageDirectly(msg, brokerName) + return res +} + func routeData2SubscribeInfo(topic string, data *TopicRouteData) []*primitive.MessageQueue { list := make([]*primitive.MessageQueue, 0) for idx := range data.QueueDataList { diff --git a/internal/model.go b/internal/model.go index 0ee9ccc1..934c6100 100644 --- a/internal/model.go +++ b/internal/model.go @@ -261,3 +261,30 @@ func NewConsumerRunningInfo() *ConsumerRunningInfo { StatusTable: make(map[string]ConsumeStatus), } } + +type ConsumeMessageDirectlyResult struct { + Order bool `json:"order"` + AutoCommit bool `json:"autoCommit"` + ConsumeResult ConsumeResult `json:"consumeResult"` + Remark string `json:"remark"` + SpentTimeMills int64 `json:"spentTimeMills"` +} + +type ConsumeResult int + +const ( + ConsumeSuccess ConsumeResult = iota + ConsumeRetryLater + Rollback + Commit + ThrowException + ReturnNull +) + +func (result ConsumeMessageDirectlyResult) Encode() ([]byte, error) { + data, err := json.Marshal(result) + if err != nil { + return nil, err + } + return data, nil +} diff --git a/internal/model_test.go b/internal/model_test.go index 0d8dcd78..57ff0af9 100644 --- a/internal/model_test.go +++ b/internal/model_test.go @@ -362,3 +362,45 @@ func TestConsumerRunningInfo_MarshalJSON(t *testing.T) { }) }) } + +func TestConsumeMessageDirectlyResult_MarshalJSON(t *testing.T) { + Convey("test ConsumeMessageDirectlyResult MarshalJson", t, func() { + Convey("test consume success", func() { + consumeMessageDirectlyResult := ConsumeMessageDirectlyResult{ + Order: false, + AutoCommit: true, + SpentTimeMills: 2, + } + consumeMessageDirectlyResult.ConsumeResult = ConsumeSuccess + data, err := consumeMessageDirectlyResult.Encode() + So(err, ShouldBeNil) + fmt.Printf("json consumeMessageDirectlyResult: %s\n", string(data)) + }) + + Convey("test consume timeout", func() { + consumeResult := ConsumeMessageDirectlyResult{ + Order: false, + AutoCommit: true, + SpentTimeMills: 2, + } + consumeResult.ConsumeResult = ReturnNull + data, err := consumeResult.Encode() + So(err, ShouldBeNil) + fmt.Printf("json consumeMessageDirectlyResult: %s\n", string(data)) + }) + + Convey("test consume exception", func() { + consumeResult := ConsumeMessageDirectlyResult{ + Order: false, + AutoCommit: true, + SpentTimeMills: 5, + } + consumeResult.ConsumeResult = ThrowException + consumeResult.Remark = "Unknown Exception" + data, err := consumeResult.Encode() + So(err, ShouldBeNil) + fmt.Printf("json consumeMessageDirectlyResult: %s\n", string(data)) + }) + }) + +} diff --git a/internal/request.go b/internal/request.go index fa88efe4..ed3de33b 100644 --- a/internal/request.go +++ b/internal/request.go @@ -407,3 +407,41 @@ func (request *DeleteTopicRequestHeader) Encode() map[string]string { return maps } + +type ConsumeMessageDirectlyHeader struct { + consumerGroup string + clientID string + msgId string + brokerName string +} + +func (request *ConsumeMessageDirectlyHeader) Encode() map[string]string { + maps := make(map[string]string) + maps["consumerGroup"] = request.consumerGroup + maps["clientId"] = request.clientID + maps["msgId"] = request.msgId + maps["brokerName"] = request.brokerName + return maps +} + +func (request *ConsumeMessageDirectlyHeader) Decode(properties map[string]string) { + if len(properties) == 0 { + return + } + + if v, existed := properties["consumerGroup"]; existed { + request.consumerGroup = v + } + + if v, existed := properties["clientId"]; existed { + request.clientID = v + } + + if v, existed := properties["msgId"]; existed { + request.msgId = v + } + + if v, existed := properties["brokerName"]; existed { + request.brokerName = v + } +}