Skip to content

Commit

Permalink
Optimize Tests (apache#12560)
Browse files Browse the repository at this point in the history
  • Loading branch information
liangyepianzhou authored and eolivelli committed Nov 29, 2021
1 parent 1f48191 commit 91cfa8f
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 327 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.pulsar.broker.transaction;

import com.google.common.collect.Sets;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
Expand Down Expand Up @@ -55,10 +53,7 @@
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
Expand All @@ -75,8 +70,6 @@
@Slf4j
public class TopicTransactionBufferRecoverTest extends TransactionTestBase {

private static final String TENANT = "tnx";
private static final String NAMESPACE1 = TENANT + "/ns1";
private static final String RECOVER_COMMIT = NAMESPACE1 + "/recover-commit";
private static final String RECOVER_ABORT = NAMESPACE1 + "/recover-abort";
private static final String SUBSCRIPTION_NAME = "test-recover";
Expand All @@ -85,36 +78,9 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
private static final int NUM_PARTITIONS = 16;
@BeforeMethod
protected void setup() throws Exception {
setBrokerCount(1);
internalSetup();

String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1];
admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
admin.tenants().createTenant(TENANT,
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NAMESPACE1);
admin.topics().createNonPartitionedTopic(RECOVER_COMMIT);
setUpBase(1, NUM_PARTITIONS, RECOVER_COMMIT, 0);
admin.topics().createNonPartitionedTopic(RECOVER_ABORT);
admin.topics().createNonPartitionedTopic(TAKE_SNAPSHOT);

admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);

if (pulsarClient != null) {
pulsarClient.shutdown();
}
pulsarClient = PulsarClient.builder()
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
.statsInterval(0, TimeUnit.SECONDS)
.enableTransaction(true)
.build();


// wait tc init success to ready state
waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
}

@AfterMethod(alwaysRun = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,13 @@
*/
package org.apache.pulsar.broker.transaction;

import com.google.common.collect.Sets;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
Expand All @@ -49,33 +43,12 @@

public class TransactionClientReconnectTest extends TransactionTestBase {

private static final String RECONNECT_TOPIC = "persistent://public/txn/txn-client-reconnect-test";
private static final String RECONNECT_TOPIC = NAMESPACE1 + "/txn-client-reconnect-test";
private static final int NUM_PARTITIONS = 1;
@BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
setBrokerCount(1);
super.internalSetup();

String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1];
admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
admin.tenants().createTenant("public",
new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace("public/txn", 10);
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
admin.topics().createNonPartitionedTopic(RECONNECT_TOPIC);
setUpBase(1, NUM_PARTITIONS, RECONNECT_TOPIC, 0);
admin.topics().createSubscription(RECONNECT_TOPIC, "test", MessageId.latest);
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);

pulsarClient = PulsarClient.builder()
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
.statsInterval(0, TimeUnit.SECONDS)
.enableTransaction(true)
.build();
// wait tc init success to ready state
waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
}

@AfterMethod(alwaysRun = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.transaction;

import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashSet;
Expand Down Expand Up @@ -50,10 +49,7 @@
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.api.proto.MarkerType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.awaitility.Awaitility;
import org.testng.Assert;
Expand All @@ -69,47 +65,17 @@
public class TransactionProduceTest extends TransactionTestBase {

private static final int TOPIC_PARTITION = 3;

private static final String TENANT = "tnx";
private static final String NAMESPACE1 = TENANT + "/ns1";
private static final String PRODUCE_COMMIT_TOPIC = NAMESPACE1 + "/produce-commit";
private static final String PRODUCE_ABORT_TOPIC = NAMESPACE1 + "/produce-abort";
private static final String ACK_COMMIT_TOPIC = NAMESPACE1 + "/ack-commit";
private static final String ACK_ABORT_TOPIC = NAMESPACE1 + "/ack-abort";
private static final int NUM_PARTITIONS = 16;
@BeforeMethod
protected void setup() throws Exception {
setBrokerCount(1);
internalSetup();

String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1];
admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
admin.tenants().createTenant(TENANT,
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NAMESPACE1);
admin.topics().createPartitionedTopic(PRODUCE_COMMIT_TOPIC, 3);
admin.topics().createPartitionedTopic(PRODUCE_ABORT_TOPIC, 3);
admin.topics().createPartitionedTopic(ACK_COMMIT_TOPIC, 3);
admin.topics().createPartitionedTopic(ACK_ABORT_TOPIC, 3);

admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);

if (pulsarClient != null) {
pulsarClient.shutdown();
}
pulsarClient = PulsarClient.builder()
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
.statsInterval(0, TimeUnit.SECONDS)
.enableTransaction(true)
.build();


// wait tc init success to ready state
waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
setUpBase(1, NUM_PARTITIONS, PRODUCE_COMMIT_TOPIC, TOPIC_PARTITION);
admin.topics().createPartitionedTopic(PRODUCE_ABORT_TOPIC, TOPIC_PARTITION);
admin.topics().createPartitionedTopic(ACK_COMMIT_TOPIC, TOPIC_PARTITION);
admin.topics().createPartitionedTopic(ACK_ABORT_TOPIC, TOPIC_PARTITION);
}

@AfterMethod(alwaysRun = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,36 +77,12 @@
@Test(groups = "broker")
public class TransactionTest extends TransactionTestBase {

private static final String TENANT = "tnx";
private static final String NAMESPACE1 = TENANT + "/ns1";
private static final int NUM_BROKERS = 1;
private static final int NUM_PARTITIONS = 1;

@BeforeMethod
protected void setup() throws Exception {
this.setBrokerCount(NUM_BROKERS);
this.internalSetup();

String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length - 1];
admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder()
.serviceUrl("http://localhost:" + webServicePort).build());
admin.tenants().createTenant(TENANT,
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NAMESPACE1);

admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);
pulsarClient.close();
pulsarClient = PulsarClient.builder()
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
.statsInterval(0, TimeUnit.SECONDS)
.enableTransaction(true)
.build();
// wait tc init success to ready state
waitForCoordinatorToBeAvailable(NUM_PARTITIONS);
setUpBase(NUM_BROKERS, NUM_PARTITIONS, NAMESPACE1 + "/test", 0);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.EventLoopGroup;
import java.util.ArrayList;
Expand All @@ -45,10 +46,10 @@
import org.apache.pulsar.broker.auth.SameThreadOrderedSafeExecutor;
import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
Expand All @@ -61,6 +62,7 @@
import org.apache.zookeeper.MockZooKeeperSession;
import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;
import org.testng.Assert;

@Slf4j
public abstract class TransactionTestBase extends TestRetrySupport {
Expand All @@ -83,6 +85,9 @@ public abstract class TransactionTestBase extends TestRetrySupport {
private OrderedExecutor bkExecutor;
private NonClosableMockBookKeeper mockBookKeeper;

public static final String TENANT = "tnx";
protected static final String NAMESPACE1 = TENANT + "/ns1";

public void internalSetup() throws Exception {
incrementSetupNumber();
init();
Expand All @@ -108,6 +113,40 @@ private void init() throws Exception {
mockBookKeeper = createMockBookKeeper(bkExecutor);
startBroker();
}
protected void setUpBase(int numBroker,int numPartitionsOfTC, String topic, int numPartitions) throws Exception{
setBrokerCount(numBroker);
internalSetup();

String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1];
admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:"
+ webServicePort).build());

admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), numPartitionsOfTC);
if (topic != null) {
admin.tenants().createTenant(TENANT,
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NAMESPACE1);
if (numPartitions == 0) {
admin.topics().createNonPartitionedTopic(topic);
} else {
admin.topics().createPartitionedTopic(topic, numPartitions);
}
}
if (pulsarClient != null) {
pulsarClient.shutdown();
}
pulsarClient = PulsarClient.builder()
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
.statsInterval(0, TimeUnit.SECONDS)
.enableTransaction(true)
.build();
// wait tc init success to ready state
waitForCoordinatorToBeAvailable(numPartitionsOfTC);
}

protected void startBroker() throws Exception {
for (int i = 0; i < brokerCount; i++) {
Expand Down Expand Up @@ -295,20 +334,12 @@ protected final void internalCleanup() {
}
public void waitForCoordinatorToBeAvailable(int numOfTCPerBroker){
// wait tc init success to ready state
Awaitility.await().until(() -> {
Map<TransactionCoordinatorID, TransactionMetadataStore> stores =
getPulsarServiceList().get(brokerCount-1).getTransactionMetadataStoreService().getStores();
if (stores.size() == numOfTCPerBroker) {
for (TransactionCoordinatorID transactionCoordinatorID : stores.keySet()) {
if (((MLTransactionMetadataStore) stores.get(transactionCoordinatorID)).getState()
!= TransactionMetadataStoreState.State.Ready) {
return false;
}
}
return true;
} else {
return false;
}
});
Awaitility.await()
.untilAsserted(() -> {
int transactionMetaStoreCount = pulsarServiceList.stream()
.mapToInt(pulsarService -> pulsarService.getTransactionMetadataStoreService().getStores().size())
.sum();
Assert.assertEquals(transactionMetaStoreCount, numOfTCPerBroker);
});
}
}
Loading

0 comments on commit 91cfa8f

Please sign in to comment.