Skip to content

Commit

Permalink
feature : RocketMQ transaction are supported (#6230)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bughue authored Mar 3, 2024
1 parent e83d49b commit 7ce96e4
Show file tree
Hide file tree
Showing 26 changed files with 785 additions and 1 deletion.
5 changes: 5 additions & 0 deletions all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.seata</groupId>
<artifactId>seata-rocketmq</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seata</groupId>
<artifactId>seata-sqlparser-core</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@
<artifactId>seata-http</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seata</groupId>
<artifactId>seata-rocketmq</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seata</groupId>
<artifactId>seata-rm</artifactId>
Expand Down
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6370](https://github.com/seata/seata/pull/6370)] seata saga decouple spring, optimize architecture.
- [[#6205](https://github.com/apache/incubator-seata/pull/6205)] mock server
- [[#6169](https://github.com/apache/incubator-seata/pull/6169)] full support for states in the refactored state machine designer
- [[#6230](https://github.com/apache/incubator-seata/pull/6230)] RocketMQ transaction are supported

### bugfix:
- [[#6090](https://github.com/apache/incubator-seata/pull/6090)] fix the TCC aspect exception handling process, do not wrapping the internal call exceptions
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- [[#6370](https://github.com/seata/seata/pull/6370)] seata saga spring接耦、架构优化。
- [[#6205](https://github.com/apache/incubator-seata/pull/6205)] 提供mock server
- [[#6169](https://github.com/apache/incubator-seata/pull/6169)] 支持新版本状态机设计器
- [[#6230](https://github.com/apache/incubator-seata/pull/6230)] 支持RocketMQ消息事务

### bugfix:
- [[#6090](https://github.com/apache/incubator-seata/pull/6090)] 修复tcc切面异常处理过程,不对内部调用异常做包装处理,直接向外抛出
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1006,4 +1006,9 @@ public interface ConfigurationKeys {
* The constant SERVER_APPLICATION_DATA_SIZE_CHECK
*/
String SERVER_APPLICATION_DATA_SIZE_CHECK = SERVER_PREFIX + "applicationDataLimitCheck";

/**
* The constant ROCKET_MQ_MSG_TIMEOUT
*/
String ROCKET_MQ_MSG_TIMEOUT = SERVER_PREFIX + "rocketmqMsgTimeout";
}
Original file line number Diff line number Diff line change
Expand Up @@ -312,4 +312,6 @@ public interface DefaultValues {
* Default druid location in classpath
*/
String DRUID_LOCATION = "lib/sqlparser/druid.jar";

int DEFAULT_ROCKET_MQ_MSG_TIMEOUT = 60 * 1000;
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ private RootContext() {
*/
public static final String KEY_XID = "TX_XID";

public static final String KEY_BRANCHID = "TX_BRANCHID";

/**
* The constant HIDDEN_KEY_XID for sofa-rpc integration.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,13 @@ public interface ResourceManager extends ResourceManagerInbound, ResourceManager
* @return The BranchType of ResourceManager.
*/
BranchType getBranchType();

/**
* Get the GlobalStatus.
*
* @param branchType The BranchType of ResourceManager.
* @param xid The xid of transaction.
* @return The GlobalStatus of transaction.
*/
GlobalStatus getGlobalStatus(BranchType branchType, String xid);
}
7 changes: 7 additions & 0 deletions dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@
<!-- for jdbc driver when package -->
<mysql5.version>${mysql.version}</mysql5.version>
<mysql8.version>8.0.27</mysql8.version>
<!-- rocketmq -->
<rocketmq-version>5.0.0</rocketmq-version>

<!-- # for kotlin -->
<kotlin.version>1.4.32</kotlin.version>
Expand Down Expand Up @@ -781,6 +783,11 @@
<artifactId>janino</artifactId>
<version>${janino-version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq-version}</version>
</dependency>

<!-- web -->
<dependency>
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
<module>integration/brpc</module>
<module>rm</module>
<module>rm-datasource</module>
<module>rocketmq</module>
<module>spring</module>
<module>tcc</module>
<module>test</module>
Expand Down
15 changes: 15 additions & 0 deletions rm/src/main/java/org/apache/seata/rm/AbstractResourceManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@
import org.apache.seata.core.exception.TransactionExceptionCode;
import org.apache.seata.core.model.BranchStatus;
import org.apache.seata.core.model.BranchType;
import org.apache.seata.core.model.GlobalStatus;
import org.apache.seata.core.model.Resource;
import org.apache.seata.core.model.ResourceManager;
import org.apache.seata.core.protocol.ResultCode;
import org.apache.seata.core.protocol.transaction.BranchRegisterRequest;
import org.apache.seata.core.protocol.transaction.BranchRegisterResponse;
import org.apache.seata.core.protocol.transaction.BranchReportRequest;
import org.apache.seata.core.protocol.transaction.BranchReportResponse;
import org.apache.seata.core.protocol.transaction.GlobalStatusRequest;
import org.apache.seata.core.protocol.transaction.GlobalStatusResponse;
import org.apache.seata.core.rpc.netty.RmNettyRemotingClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -140,4 +143,16 @@ public void unregisterResource(Resource resource) {
public void registerResource(Resource resource) {
RmNettyRemotingClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId());
}

@Override
public GlobalStatus getGlobalStatus(BranchType branchType, String xid) {
GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
queryGlobalStatus.setXid(xid);
try {
GlobalStatusResponse response = (GlobalStatusResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(queryGlobalStatus);
return response.getGlobalStatus();
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seata.core.exception.TransactionException;
import org.apache.seata.core.model.BranchStatus;
import org.apache.seata.core.model.BranchType;
import org.apache.seata.core.model.GlobalStatus;
import org.apache.seata.core.model.Resource;
import org.apache.seata.core.model.ResourceManager;

Expand Down Expand Up @@ -150,6 +151,11 @@ public BranchType getBranchType() {
throw new FrameworkException("DefaultResourceManager isn't a real ResourceManager");
}

@Override
public GlobalStatus getGlobalStatus(BranchType branchType, String xid) {
return getResourceManager(branchType).getGlobalStatus(branchType, xid);
}

private static class SingletonHolder {
private static DefaultResourceManager INSTANCE = new DefaultResourceManager();
}
Expand Down
49 changes: 49 additions & 0 deletions rocketmq/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.seata</groupId>
<artifactId>seata-parent</artifactId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>seata-rocketmq</artifactId>
<packaging>jar</packaging>
<name>seata-rocketmq ${project.version}</name>
<description>rocketmq integration for Seata built with Maven</description>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>seata-tcc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>


</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.integration.rocketmq;

import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.context.RootContext;
import org.apache.seata.core.model.GlobalStatus;
import org.apache.seata.rm.DefaultResourceManager;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;

/**
* Seata MQ Producer
**/
public class SeataMQProducer extends TransactionMQProducer {

private static final Logger LOGGER = LoggerFactory.getLogger(SeataMQProducer.class);

private static final List<GlobalStatus> COMMIT_STATUSES = Arrays.asList(GlobalStatus.Committed, GlobalStatus.Committing, GlobalStatus.CommitRetrying);
private static final List<GlobalStatus> ROLLBACK_STATUSES = Arrays.asList(GlobalStatus.Rollbacked, GlobalStatus.Rollbacking, GlobalStatus.RollbackRetrying);

public static String PROPERTY_SEATA_XID = RootContext.KEY_XID;
public static String PROPERTY_SEATA_BRANCHID = RootContext.KEY_BRANCHID;
private TransactionListener transactionListener;

private TCCRocketMQ tccRocketMQ;

SeataMQProducer(final String producerGroup) {
this(null, producerGroup, null);
}

SeataMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
super(namespace, producerGroup, rpcHook);
this.transactionListener = new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
return LocalTransactionState.UNKNOW;
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String xid = msg.getProperty(PROPERTY_SEATA_XID);
if (StringUtils.isBlank(xid)) {
LOGGER.error("msg has no xid, msgTransactionId: {}, msg will be rollback", msg.getTransactionId());
return LocalTransactionState.ROLLBACK_MESSAGE;
}
GlobalStatus globalStatus = DefaultResourceManager.get().getGlobalStatus(SeataMQProducerFactory.ROCKET_BRANCH_TYPE, xid);
if (COMMIT_STATUSES.contains(globalStatus)) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (ROLLBACK_STATUSES.contains(globalStatus) || GlobalStatus.isOnePhaseTimeout(globalStatus)) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if (GlobalStatus.Finished.equals(globalStatus)) {
LOGGER.error("global transaction finished, msg will be rollback, xid: {}", xid);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
};
}

@Override
public SendResult send(Message msg) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
return send(msg, this.getSendMsgTimeout());
}

@Override
public SendResult send(Message msg, long timeout) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
if (RootContext.inGlobalTransaction()) {
if (tccRocketMQ == null) {
throw new RuntimeException("TCCRocketMQ is not initialized");
}
return tccRocketMQ.prepare(msg, timeout);
} else {
return super.send(msg, timeout);
}
}

public SendResult doSendMessageInTransaction(final Message msg, long timeout, String xid, long branchId) throws MQClientException {
msg.setTopic(withNamespace(msg.getTopic()));
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
Validators.checkMessage(msg, this);

SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.getProducerGroup());
MessageAccessor.putProperty(msg, PROPERTY_SEATA_XID, xid);
MessageAccessor.putProperty(msg, PROPERTY_SEATA_BRANCHID, String.valueOf(branchId));
try {
sendResult = super.send(msg, timeout);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}

if (SendStatus.SEND_OK != sendResult.getSendStatus()) {
throw new RuntimeException("Message send fail.status=" + sendResult.getSendStatus());
}
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
return sendResult;
}


@Override
public TransactionListener getTransactionListener() {
return transactionListener;
}

public void setTccRocketMQ(TCCRocketMQ tccRocketMQ) {
this.tccRocketMQ = tccRocketMQ;
}
}
Loading

0 comments on commit 7ce96e4

Please sign in to comment.