Skip to content

Commit

Permalink
Merge pull request #223 from WeBankFinTech/1.2.0
Browse files Browse the repository at this point in the history
[ISSUE #222]When onChange is NEW, eventMesh occurred NullPointException
  • Loading branch information
xwm1992 authored Feb 20, 2021
2 parents e6a498d + 6cd07ad commit 37405ce
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 8 deletions.
12 changes: 12 additions & 0 deletions docker/centos7-jdk8/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
FROM docker.io/centos:7

MAINTAINER mikexue <[email protected]>

RUN yum update -y && yum install net-tools -y && yum install lrzsz -y && yum install vim -y
ADD jdk-8u281-linux-x64.tar.gz /usr/local/src/
RUN ln -s /usr/local/src/jdk1.8.0_281/ /usr/local/jdk

ENV JAVA_HOME /usr/local/jdk
ENV JRE_HOME $JAVA_HOME/jre
ENV CLASSPATH .:$JAVA_HOME/lib/:$JRE_HOME/lib/
ENV PATH $PATH:$JAVA_HOME/bin
15 changes: 15 additions & 0 deletions docker/eventmesh-defibus/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
FROM centos7-jdk8:latest

MAINTAINER mikexue [email protected]

WORKDIR /data
RUN mkdir /data/app
ADD eventmesh-runtime_1.2.0-SNAPSHOT.tar.gz /data/app/eventmesh
COPY eventmesh-connector-defibus-1.2.0-SNAPSHOT.jar /data/app/eventmesh/apps
WORKDIR /data/app/eventmesh/bin

EXPOSE 10000
EXPOSE 10105

ENV DOCKER true
CMD sh start.sh
16 changes: 16 additions & 0 deletions docker/eventmesh-rocketmq/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM centos7-jdk8:latest

MAINTAINER mikexue [email protected]

WORKDIR /data
RUN mkdir /data/app
ADD eventmesh-runtime_1.2.0-SNAPSHOT.tar.gz /data/app/eventmesh
COPY eventmesh-connector-rocketmq-1.2.0.jar /data/app/eventmesh/apps
WORKDIR /data/app/eventmesh/bin

EXPOSE 10000
EXPOSE 10105

ENV DOCKER true

CMD sh start.sh
7 changes: 6 additions & 1 deletion eventmesh-runtime/bin/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ echo "using jdk[$JAVA]" >> ${PROXY_LOG_HOME}/proxy.out


PROXY_MAIN=com.webank.eventmesh.runtime.boot.ProxyStartup
$JAVA $JAVA_OPT -classpath ${PROXY_HOME}/conf:${PROXY_HOME}/apps/*:${PROXY_HOME}/lib/* $PROXY_MAIN >> ${PROXY_LOG_HOME}/proxy.out 2>&1 &
if [ $DOCKER ]
then
$JAVA $JAVA_OPT -classpath ${PROXY_HOME}/conf:${PROXY_HOME}/apps/*:${PROXY_HOME}/lib/* $PROXY_MAIN >> ${PROXY_LOG_HOME}/proxy.out
else
$JAVA $JAVA_OPT -classpath ${PROXY_HOME}/conf:${PROXY_HOME}/apps/*:${PROXY_HOME}/lib/* $PROXY_MAIN >> ${PROXY_LOG_HOME}/proxy.out 2>&1 &
echo $!>pid.file
fi
exit 0
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,15 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
}
synchronized (proxyHTTPServer.localClientInfoMapping){
for (Map.Entry<String, List<Client>> groupTopicClientMapping : tmp.entrySet()) {
proxyHTTPServer.localClientInfoMapping.put(groupTopicClientMapping.getKey(), groupTopicClientMapping.getValue());
List<Client> localClientList = proxyHTTPServer.localClientInfoMapping.get(groupTopicClientMapping.getKey());
if (CollectionUtils.isEmpty(localClientList)){
proxyHTTPServer.localClientInfoMapping.put(groupTopicClientMapping.getKey(), groupTopicClientMapping.getValue());
}else {
List<Client> tmpClientList = groupTopicClientMapping.getValue();
supplyClientInfoList(tmpClientList, localClientList);
proxyHTTPServer.localClientInfoMapping.put(groupTopicClientMapping.getKey(), localClientList);
}

}
}

Expand Down Expand Up @@ -184,6 +192,22 @@ public void onResponse(HttpCommand httpCommand) {

}

private void supplyClientInfoList(List<Client> tmpClientList, List<Client> localClientList) {
for (Client tmpClient : tmpClientList){
boolean isContains = false;
for (Client localClient : localClientList){
if (StringUtils.equals(localClient.url, tmpClient.url)){
isContains = true;
localClient.lastUpTime = tmpClient.lastUpTime;
break;
}
}
if (!isContains){
localClientList.add(tmpClient);
}
}
}

@Override
public boolean rejectRequest() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ public synchronized void initClientGroupBroadcastConsumer() throws Exception {
keyValue.put("isBroadcast", "true");
keyValue.put("consumerGroup", groupName);
keyValue.put("proxyIDC", accessConfiguration.proxyIDC);
keyValue.put("instanceName", ProxyUtil.buildProxyTcpClientID(sysId, dcn, "SUB", accessConfiguration.proxyCluster));
broadCastMsgConsumer.init(keyValue);
// broadCastMsgConsumer.registerMessageListener(new ProxyMessageListenerConcurrently() {
// @Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,14 @@ private void retryHandle(DownStreamMsgContext downStreamMsgContext){

private boolean isRetryMsgTimeout(DownStreamMsgContext downStreamMsgContext){
boolean flag =false;
long ttl = Long.valueOf(downStreamMsgContext.msgExt.sysHeaders().getString(ProxyConstants.PROPERTY_MESSAGE_TTL));
long ttl = Long.parseLong(downStreamMsgContext.msgExt.userHeaders().getString(ProxyConstants.PROPERTY_MESSAGE_TTL));
//TODO 关注是否能取到
long storeTimestamp = Long.valueOf(downStreamMsgContext.msgExt.sysHeaders().getString(DeFiBusConstant.STORE_TIME));
String leaveTimeStr = downStreamMsgContext.msgExt.sysHeaders().getString(DeFiBusConstant.LEAVE_TIME);
long brokerCost = StringUtils.isNumeric(leaveTimeStr) ? Long.valueOf(leaveTimeStr) - storeTimestamp : 0;
long storeTimestamp = Long.parseLong(downStreamMsgContext.msgExt.userHeaders().getString(DeFiBusConstant.STORE_TIME));
String leaveTimeStr = downStreamMsgContext.msgExt.userHeaders().getString(DeFiBusConstant.LEAVE_TIME);
long brokerCost = StringUtils.isNumeric(leaveTimeStr) ? Long.parseLong(leaveTimeStr) - storeTimestamp : 0;

String arriveTimeStr = downStreamMsgContext.msgExt.sysHeaders().getString(DeFiBusConstant.ARRIVE_TIME);
long accessCost = StringUtils.isNumeric(arriveTimeStr) ? System.currentTimeMillis() - Long.valueOf(arriveTimeStr) : 0;
String arriveTimeStr = downStreamMsgContext.msgExt.userHeaders().getString(DeFiBusConstant.ARRIVE_TIME);
long accessCost = StringUtils.isNumeric(arriveTimeStr) ? System.currentTimeMillis() - Long.parseLong(arriveTimeStr) : 0;
double elapseTime = brokerCost + accessCost;
if (elapseTime >= ttl) {
logger.warn("discard the retry because timeout, seq:{}, retryTimes:{}, bizSeq:{}", downStreamMsgContext.seq, downStreamMsgContext.retryTimes, ProxyUtil.getMessageBizSeq(downStreamMsgContext.msgExt));
Expand Down

0 comments on commit 37405ce

Please sign in to comment.