Skip to content

Commit

Permalink
[ISSUE #490]Support service invocation
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed Aug 24, 2021
1 parent aeed6f5 commit 6d54e3a
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ public interface MeshMQProducer extends Producer {

void send(Message message, SendCallback sendCallback) throws Exception;

void request(Message message, SendCallback sendCallback, RRCallback rrCallback, long timeout) throws Exception;
void request(Message message, RRCallback rrCallback, long timeout) throws Exception;

Message request(Message message, long timeout) throws Exception;
// Message request(Message message, long timeout) throws Exception;

boolean reply(final Message message, final SendCallback sendCallback) throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Properties;
import java.util.concurrent.ExecutorService;

import com.alibaba.fastjson.JSONObject;
import io.openmessaging.api.Message;
import io.openmessaging.api.MessageBuilder;
import io.openmessaging.api.OnExceptionContext;
Expand All @@ -28,14 +29,26 @@
import io.openmessaging.api.SendResult;
import io.openmessaging.api.exception.OMSRuntimeException;

import org.apache.eventmesh.api.RRCallback;
import org.apache.eventmesh.connector.rocketmq.utils.OMSUtil;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.RequestCallback;
import org.apache.rocketmq.client.producer.RequestResponseFuture;
import org.apache.rocketmq.client.utils.MessageUtil;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ProducerImpl extends AbstractOMSProducer implements Producer {

public static final int eventMeshServerAsyncAccumulationThreshold = 1000;

private final Logger logger = LoggerFactory.getLogger(ProducerImpl.class);

public ProducerImpl(final Properties properties) {
super(properties);
}
Expand Down Expand Up @@ -100,6 +113,49 @@ public void sendAsync(Message message, SendCallback sendCallback) {
}
}

public void request(Message message, RRCallback rrCallback, long timeout)
throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
// try {
this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
org.apache.rocketmq.common.message.Message msgRMQ = OMSUtil.msgConvert(message);
rocketmqProducer.request(msgRMQ, rrCallbackConvert(message, rrCallback), timeout);

// }catch (Exception e){
// String topic = message.getTopic();
// String msgId = message.getMsgID();
// OMSRuntimeException onsEx = ProducerImpl.this.checkProducerException(topic, msgId, e);
// OnExceptionContext context = new OnExceptionContext();
// context.setTopic(topic);
// context.setMessageId(msgId);
// context.setException(onsEx);
// sendCallback.onException(context);
// }

}

private RequestCallback rrCallbackConvert(final Message message, final RRCallback rrCallback){
return new RequestCallback() {
@Override
public void onSuccess(org.apache.rocketmq.common.message.Message message) {
Message openMessage = OMSUtil.msgConvert((MessageExt) message);
rrCallback.onSuccess(openMessage);
}

@Override
public void onException(Throwable e) {
String topic = message.getTopic();
String msgId = message.getMsgID();
OMSRuntimeException onsEx = ProducerImpl.this.checkProducerException(topic, msgId, e);
OnExceptionContext context = new OnExceptionContext();
context.setTopic(topic);
context.setMessageId(msgId);
context.setException(onsEx);
rrCallback.onException(e);

}
};
}

private org.apache.rocketmq.client.producer.SendCallback sendCallbackConvert(final Message message, final SendCallback sendCallback) {
org.apache.rocketmq.client.producer.SendCallback rmqSendCallback = new org.apache.rocketmq.client.producer.SendCallback() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.eventmesh.connector.rocketmq.config.ConfigurationWrapper;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -98,19 +100,16 @@ public void send(Message message, SendCallback sendCallback) throws Exception {
}

@Override
public void request(Message message, SendCallback sendCallback, RRCallback rrCallback, long timeout)
public void request(Message message, RRCallback rrCallback, long timeout)
throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
throw new UnsupportedOperationException("not support request-reply mode when eventstore=rocketmq");
}

@Override
public Message request(Message message, long timeout) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
throw new UnsupportedOperationException("not support request-reply mode when eventstore=rocketmq");
producer.request(message, rrCallback, timeout);
}

@Override
public boolean reply(final Message message, final SendCallback sendCallback) throws Exception {
throw new UnsupportedOperationException("not support request-reply mode when eventstore=rocketmq");
message.putSystemProperties(MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG);
producer.sendAsync(message, sendCallback);
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,13 @@ public static Message msgConvert(MessageExt rmqMsg) {
}
}

systemProperties.put(Constants.PROPERTY_MESSAGE_MESSAGE_ID, rmqMsg.getMsgId());
if (rmqMsg.getMsgId() != null){
systemProperties.put(Constants.PROPERTY_MESSAGE_MESSAGE_ID, rmqMsg.getMsgId());
}

systemProperties.put(Constants.PROPERTY_MESSAGE_DESTINATION, rmqMsg.getTopic());
if (rmqMsg.getTopic() != null){
systemProperties.put(Constants.PROPERTY_MESSAGE_DESTINATION, rmqMsg.getTopic());
}

// omsMsg.putSysHeaders(BuiltinKeys.SEARCH_KEYS, rmqMsg.getKeys());
systemProperties.put(Constants.PROPERTY_MESSAGE_BORN_HOST, String.valueOf(rmqMsg.getBornHost()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ public void send(Message message, SendCallback sendCallback) throws Exception {
meshMQProducer.send(message, sendCallback);
}

public void request(Message message, SendCallback sendCallback, RRCallback rrCallback, long timeout)
public void request(Message message, RRCallback rrCallback, long timeout)
throws Exception {
meshMQProducer.request(message, sendCallback, rrCallback, timeout);
meshMQProducer.request(message, rrCallback, timeout);
}

public Message request(Message message, long timeout) throws Exception {
return meshMQProducer.request(message, timeout);
}
// public Message request(Message message, long timeout) throws Exception {
// return meshMQProducer.request(message, timeout);
// }

public boolean reply(final Message message, final SendCallback sendCallback) throws Exception {
return meshMQProducer.reply(message, sendCallback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,53 +205,7 @@ public void onResponse(HttpCommand httpCommand) {
.setProp(sendMessageRequestBody.getExtFields());

try {
eventMeshProducer.request(sendMessageContext, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
long endTime = System.currentTimeMillis();
eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgCost(endTime - startTime);
messageLogger.info("message|eventMesh2mq|REQ|SYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
endTime - startTime,
sendMessageRequestBody.getTopic(),
sendMessageRequestBody.getBizSeqNo(),
sendMessageRequestBody.getUniqueId());
}

@Override
public void onException(OnExceptionContext context) {
HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getRetCode(),
EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(context.getException(), 2)));
asyncContext.onComplete(err, handler);
long endTime = System.currentTimeMillis();
eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgFailed();
eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgCost(endTime - startTime);
messageLogger.error("message|eventMesh2mq|REQ|SYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
endTime - startTime,
sendMessageRequestBody.getTopic(),
sendMessageRequestBody.getBizSeqNo(),
sendMessageRequestBody.getUniqueId(), context.getException());
}
// }
//
// @Override
// public void onException(Throwable e) {
// HttpCommand err = asyncContext.getRequest().createHttpCommandResponse(
// sendMessageResponseHeader,
// SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getRetCode(),
// EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2)));
// asyncContext.onComplete(err, handler);
// long endTime = System.currentTimeMillis();
// eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgFailed();
// eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgCost(endTime - startTime);
// messageLogger.error("message|eventMesh2mq|REQ|SYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
// endTime - startTime,
// sendMessageRequestBody.getTopic(),
// sendMessageRequestBody.getBizSeqNo(),
// sendMessageRequestBody.getUniqueId(), e);
// }
}, new RRCallback() {
eventMeshProducer.request(sendMessageContext, new RRCallback() {
@Override
public void onSuccess(Message omsMsg) {
omsMsg.getUserProperties().put(Constants.PROPERTY_MESSAGE_BORN_TIMESTAMP, omsMsg.getSystemProperties("BORN_TIMESTAMP"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ public void send(SendMessageContext sendMsgContext, SendCallback sendCallback) t
mqProducerWrapper.send(sendMsgContext.getMsg(), sendCallback);
}

public void request(SendMessageContext sendMsgContext, SendCallback sendCallback, RRCallback rrCallback, long timeout)
public void request(SendMessageContext sendMsgContext, RRCallback rrCallback, long timeout)
throws Exception {
mqProducerWrapper.request(sendMsgContext.getMsg(), sendCallback, rrCallback, timeout);
mqProducerWrapper.request(sendMsgContext.getMsg(), rrCallback, timeout);
}

public Message request(SendMessageContext sendMessageContext, long timeout) throws Exception {
return mqProducerWrapper.request(sendMessageContext.getMsg(), timeout);
}
// public Message request(SendMessageContext sendMessageContext, long timeout) throws Exception {
// return mqProducerWrapper.request(sendMessageContext.getMsg(), timeout);
// }

public boolean reply(final SendMessageContext sendMsgContext, final SendCallback sendCallback) throws Exception {
mqProducerWrapper.reply(sendMsgContext.getMsg(), sendCallback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ public boolean send(UpStreamMsgContext upStreamMsgContext, SendCallback sendCall
return true;
}

public void request(UpStreamMsgContext upStreamMsgContext, SendCallback sendCallback, RRCallback rrCallback, long timeout)
public void request(UpStreamMsgContext upStreamMsgContext, RRCallback rrCallback, long timeout)
throws Exception {
mqProducerWrapper.request(upStreamMsgContext.getMsg(), sendCallback, rrCallback, timeout);
mqProducerWrapper.request(upStreamMsgContext.getMsg(), rrCallback, timeout);
}

public boolean reply(UpStreamMsgContext upStreamMsgContext) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,15 @@ public EventMeshTcpSendResult send(Header header, Message msg, SendCallback send
if (Command.REQUEST_TO_SERVER == cmd) {
long ttl = msg.getSystemProperties(EventMeshConstants.PROPERTY_MESSAGE_TTL) != null ? Long.parseLong(msg.getSystemProperties(EventMeshConstants.PROPERTY_MESSAGE_TTL)) : EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS;
upStreamMsgContext = new UpStreamMsgContext(header.getSeq(), session, msg);
session.getClientGroupWrapper().get().request(upStreamMsgContext, sendCallback, initSyncRRCallback(header, startTime, taskExecuteTime), ttl);
session.getClientGroupWrapper().get().request(upStreamMsgContext, initSyncRRCallback(header, startTime, taskExecuteTime), ttl);
upstreamBuff.release();
} else if (Command.RESPONSE_TO_SERVER == cmd) {
String cluster = msg.getUserProperties(EventMeshConstants.PROPERTY_MESSAGE_CLUSTER);
if (!StringUtils.isEmpty(cluster)) {
String replyTopic = EventMeshConstants.RR_REPLY_TOPIC;
replyTopic = cluster + "-" + replyTopic;
msg.getSystemProperties().put(Constants.PROPERTY_MESSAGE_DESTINATION, replyTopic);
msg.setTopic(replyTopic);
}

// //for rocketmq support
Expand Down

0 comments on commit 6d54e3a

Please sign in to comment.