diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionMgr.java b/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionMgr.java index 686cf01ba3d5d..1b35294675b23 100644 --- a/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionMgr.java @@ -56,6 +56,21 @@ public class CompactionMgr implements MemoryTrackable { private Sorter sorter; private CompactionScheduler compactionScheduler; + /** + * We use `activeCompactionTransactionMap` to track all lake compaction txns that are not published on FE restart. + * The key of the map is the transaction id related to the compaction task, and the value is table id of the + * compaction task. It's possible that multiple keys have the same value, because there might be multiple compaction + * jobs on different partitions with the same table id. + * + * Note that, this will prevent all partitions whose tableId is maintained in the map from being compacted + */ + private final ConcurrentHashMap remainedActiveCompactionTxnWhenStart = new ConcurrentHashMap<>(); + + @VisibleForTesting + protected ConcurrentHashMap getRemainedActiveCompactionTxnWhenStart() { + return remainedActiveCompactionTxnWhenStart; + } + public CompactionMgr() { try { init(); @@ -90,6 +105,30 @@ public void start() { } } + /** + * iterate all transactions and find those with LAKE_COMPACTION labels and are not finished before FE restart. + **/ + public void rebuildActiveCompactionTransactionMapOnRestart() { + Map activeTxnStates = + GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getLakeCompactionActiveTxnStats(); + for (Map.Entry txnState : activeTxnStates.entrySet()) { + // for lake compaction txn, there can only be one table id for each txn state + remainedActiveCompactionTxnWhenStart.put(txnState.getKey(), txnState.getValue()); + LOG.info("Found lake compaction transaction not finished on table {}, txn_id: {}", txnState.getValue(), + txnState.getKey()); + } + } + + protected void removeFromStartupActiveCompactionTransactionMap(long txnId) { + if (remainedActiveCompactionTxnWhenStart.isEmpty()) { + return; + } + boolean ret = remainedActiveCompactionTxnWhenStart.keySet().removeIf(key -> key == txnId); + if (ret) { + LOG.info("Removed transaction {} from startup active compaction transaction map", txnId); + } + } + public void handleLoadingFinished(PartitionIdentifier partition, long version, long versionTime, Quantiles compactionScore) { PartitionVersion currentVersion = new PartitionVersion(version, versionTime); @@ -107,7 +146,8 @@ public void handleLoadingFinished(PartitionIdentifier partition, long version, l } public void handleCompactionFinished(PartitionIdentifier partition, long version, long versionTime, - Quantiles compactionScore) { + Quantiles compactionScore, long txnId) { + removeFromStartupActiveCompactionTransactionMap(txnId); PartitionVersion compactionVersion = new PartitionVersion(version, versionTime); PartitionStatistics statistics = partitionStatisticsHashMap.compute(partition, (k, v) -> { if (v == null) { @@ -125,8 +165,10 @@ public void handleCompactionFinished(PartitionIdentifier partition, long version @NotNull List choosePartitionsToCompact(@NotNull Set excludes, - @NotNull Set excludeTables) { - return choosePartitionsToCompact(excludeTables) + @NotNull Set excludeTables) { + Set copiedExcludeTables = new HashSet<>(excludeTables); + copiedExcludeTables.addAll(remainedActiveCompactionTxnWhenStart.values()); + return choosePartitionsToCompact(copiedExcludeTables) .stream() .filter(p -> !excludes.contains(p.getPartition())) .collect(Collectors.toList()); @@ -215,6 +257,16 @@ public void save(ImageWriter imageWriter) throws IOException, SRMetaBlockExcepti public void load(SRMetaBlockReader reader) throws IOException, SRMetaBlockException, SRMetaBlockEOFException { CompactionMgr compactionManager = reader.readJson(CompactionMgr.class); partitionStatisticsHashMap = compactionManager.partitionStatisticsHashMap; + + // In order to ensure that the input rowsets of compaction still exists when doing publishing version, it is + // necessary to ensure that the compaction task of the same partition is executed serially, that is, the next + // compaction task can be executed only after the status of the previous compaction task changes to visible or + // canceled. + // So when FE restarted, we should make sure all the active compaction transactions before restarting were tracked, + // and exclude them from choosing as candidates for compaction. + // Note here, the map is maintained on leader and follower fe, its keys were removed from the map after compaction + // transaction has finished, and for follower FE, this is done by replay process. + rebuildActiveCompactionTransactionMapOnRestart(); } public long getPartitionStatsCount() { diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionScheduler.java b/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionScheduler.java index 18a9452c138ab..6db3717a9e6e2 100644 --- a/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionScheduler.java +++ b/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionScheduler.java @@ -79,8 +79,6 @@ public class CompactionScheduler extends Daemon { private final GlobalStateMgr stateMgr; private final ConcurrentHashMap runningCompactions; private final SynchronizedCircularQueue history; - private boolean finishedWaiting = false; - private long waitTxnId = -1; private long lastPartitionCleanTime; private Set disabledTables; // copy-on-write @@ -105,31 +103,12 @@ protected void runOneCycle() { cleanPhysicalPartition(); // Schedule compaction tasks only when this is a leader FE and all edit logs have finished replay. - // In order to ensure that the input rowsets of compaction still exists when doing publishing version, it is - // necessary to ensure that the compaction task of the same partition is executed serially, that is, the next - // compaction task can be executed only after the status of the previous compaction task changes to visible or - // canceled. - if (stateMgr.isLeader() && stateMgr.isReady() && allCommittedCompactionsBeforeRestartHaveFinished()) { + if (stateMgr.isLeader() && stateMgr.isReady()) { schedule(); history.changeMaxSize(Config.lake_compaction_history_size); } } - // Returns true if all compaction transactions committed before this restart have finished(i.e., of VISIBLE state). - private boolean allCommittedCompactionsBeforeRestartHaveFinished() { - if (finishedWaiting) { - return true; - } - // Note: must call getMinActiveCompactionTxnId() before getNextTransactionId(), otherwise if there are - // no running transactions waitTxnId <= minActiveTxnId will always be false. - long minActiveTxnId = transactionMgr.getMinActiveCompactionTxnId(); - if (waitTxnId < 0) { - waitTxnId = transactionMgr.getTransactionIDGenerator().getNextTransactionId(); - } - finishedWaiting = waitTxnId <= minActiveTxnId; - return finishedWaiting; - } - private void schedule() { // Check whether there are completed compaction jobs. for (Iterator> iterator = runningCompactions.entrySet().iterator(); diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java index 4522e444ec813..16c1faf01d6cd 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java @@ -838,6 +838,20 @@ public List getCommittedTxnList() { } } + public Map getLakeCompactionActiveTxnMap() { + readLock(); + try { + // for lake compaction txn, there can only be one table id for each txn state + Map txnIdToTableIdMap = new HashMap<>(); + idToRunningTransactionState.values().stream() + .filter(state -> state.getSourceType() == TransactionState.LoadJobSourceType.LAKE_COMPACTION) + .forEach(state -> txnIdToTableIdMap.put(state.getTransactionId(), state.getTableIdList().get(0))); + return txnIdToTableIdMap; + } finally { + readUnlock(); + } + } + // Check whether there is committed txns on partitionId. public boolean hasCommittedTxnOnPartition(long tableId, long partitionId) { readLock(); diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/com/starrocks/transaction/GlobalTransactionMgr.java index 9274436366d7f..9f2a216115d73 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/GlobalTransactionMgr.java @@ -72,6 +72,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -647,6 +648,19 @@ public long getMinActiveCompactionTxnId() { return minId; } + /** + * Get the map of active txn [txnId, tableId] of compaction transactions. + * @return the list of active txn stats of compaction transactions. + */ + public Map getLakeCompactionActiveTxnStats() { + Map txnIdToTableIdMap = new HashMap<>(); + for (Map.Entry entry : dbIdToDatabaseTransactionMgrs.entrySet()) { + DatabaseTransactionMgr dbTransactionMgr = entry.getValue(); + txnIdToTableIdMap.putAll(dbTransactionMgr.getLakeCompactionActiveTxnMap()); + } + return txnIdToTableIdMap; + } + /** * Get the smallest transaction ID of active transactions in a database. * If there are no active transactions in the database, return the transaction ID that will be assigned to the diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/LakeTableTxnLogApplier.java b/fe/fe-core/src/main/java/com/starrocks/transaction/LakeTableTxnLogApplier.java index 03085ae28da83..180621d29bcba 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/LakeTableTxnLogApplier.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/LakeTableTxnLogApplier.java @@ -99,7 +99,8 @@ public void applyVisibleLog(TransactionState txnState, TableCommitInfo commitInf PartitionIdentifier partitionIdentifier = new PartitionIdentifier(txnState.getDbId(), table.getId(), partition.getId()); if (txnState.getSourceType() == TransactionState.LoadJobSourceType.LAKE_COMPACTION) { - compactionManager.handleCompactionFinished(partitionIdentifier, version, versionTime, compactionScore); + compactionManager.handleCompactionFinished(partitionIdentifier, version, versionTime, compactionScore, + txnState.getTransactionId()); } else { compactionManager.handleLoadingFinished(partitionIdentifier, version, versionTime, compactionScore); } diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/TransactionStateSnapshot.java b/fe/fe-core/src/main/java/com/starrocks/transaction/TransactionStateSnapshot.java index 0e0ab4e42a219..9514994801d52 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/TransactionStateSnapshot.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/TransactionStateSnapshot.java @@ -40,4 +40,4 @@ public String toString() { ", reason='" + reason + '\'' + '}'; } -} +} \ No newline at end of file diff --git a/fe/fe-core/src/test/java/com/starrocks/catalog/GlobalStateMgrTestUtil.java b/fe/fe-core/src/test/java/com/starrocks/catalog/GlobalStateMgrTestUtil.java index 83e317fa637d1..84a3e3bca158e 100644 --- a/fe/fe-core/src/test/java/com/starrocks/catalog/GlobalStateMgrTestUtil.java +++ b/fe/fe-core/src/test/java/com/starrocks/catalog/GlobalStateMgrTestUtil.java @@ -89,6 +89,8 @@ public class GlobalStateMgrTestUtil { public static String testTxnLable8 = "testTxnLable8"; public static String testTxnLable9 = "testTxnLable9"; public static String testTxnLable10 = "testTxnLable10"; + public static String testTxnLableCompaction1 = "testTxnLableCompaction1"; + public static String testTxnLableCompaction2 = "testTxnLableCompaction2"; public static String testEsTable1 = "partitionedEsTable1"; public static long testEsTableId1 = 14; diff --git a/fe/fe-core/src/test/java/com/starrocks/lake/compaction/CompactionMgrTest.java b/fe/fe-core/src/test/java/com/starrocks/lake/compaction/CompactionMgrTest.java index 04125deb5fa0f..82d3c50d1588a 100644 --- a/fe/fe-core/src/test/java/com/starrocks/lake/compaction/CompactionMgrTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/lake/compaction/CompactionMgrTest.java @@ -26,20 +26,31 @@ import com.starrocks.persist.metablock.SRMetaBlockReaderV2; import com.starrocks.server.GlobalStateMgr; import com.starrocks.sql.common.MetaUtils; +import com.starrocks.transaction.GlobalTransactionMgr; import com.starrocks.utframe.UtFrameUtils; +import mockit.Expectations; import mockit.Mock; import mockit.MockUp; +import mockit.Mocked; import org.junit.Assert; import org.junit.Test; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; public class CompactionMgrTest { + @Mocked + private GlobalStateMgr globalStateMgr; + @Mocked + private GlobalTransactionMgr globalTransactionMgr; + @Mocked + private Database db; @Test public void testChoosePartitionsToCompact() { @@ -87,6 +98,65 @@ public void testChoosePartitionsToCompact() { Assert.assertSame(partition2, compactionList.get(0).getPartition()); } + @Test + public void testChoosePartitionsToCompactWithActiveTxnFilter() { + long dbId = 10001L; + long tableId1 = 10002L; + long tableId2 = 10003L; + long partitionId10 = 20001L; + long partitionId11 = 20003L; + long partitionId20 = 20002L; + + PartitionIdentifier partition10 = new PartitionIdentifier(dbId, tableId1, partitionId10); + PartitionIdentifier partition11 = new PartitionIdentifier(dbId, tableId1, partitionId11); + PartitionIdentifier partition20 = new PartitionIdentifier(dbId, tableId2, partitionId20); + + CompactionMgr compactionManager = new CompactionMgr(); + compactionManager.handleLoadingFinished(partition10, 1, System.currentTimeMillis(), + Quantiles.compute(Lists.newArrayList(100d))); + compactionManager.handleLoadingFinished(partition11, 2, System.currentTimeMillis(), + Quantiles.compute(Lists.newArrayList(100d))); + compactionManager.handleLoadingFinished(partition20, 3, System.currentTimeMillis(), + Quantiles.compute(Lists.newArrayList(100d))); + + // build active txn on table1 + long txnId = 10001L; + Map txnIdToTableIdMap = new HashMap<>(); + txnIdToTableIdMap.put(txnId, tableId1); + new Expectations() { + { + GlobalStateMgr.getCurrentState(); + result = globalStateMgr; + + globalStateMgr.getGlobalTransactionMgr(); + result = globalTransactionMgr; + + globalTransactionMgr.getLakeCompactionActiveTxnStats(); + result = txnIdToTableIdMap; + + } + }; + compactionManager.rebuildActiveCompactionTransactionMapOnRestart(); + + Set allPartitions = compactionManager.getAllPartitions(); + Assert.assertEquals(3, allPartitions.size()); + Assert.assertTrue(allPartitions.contains(partition10)); + Assert.assertTrue(allPartitions.contains(partition11)); + Assert.assertTrue(allPartitions.contains(partition20)); + + List compactionList = + compactionManager.choosePartitionsToCompact(new HashSet<>(), new HashSet<>()); + // both partition10 and partition11 are filtered because table1 has active txn + Assert.assertEquals(1, compactionList.size()); + Assert.assertSame(partition20, compactionList.get(0).getPartition()); + + Set excludeTables = new HashSet<>(); + excludeTables.add(tableId2); + compactionList = compactionManager.choosePartitionsToCompact(new HashSet<>(), excludeTables); + // tableId2 is filtered by excludeTables + Assert.assertEquals(0, compactionList.size()); + } + @Test public void testGetMaxCompactionScore() { double delta = 0.001; @@ -100,7 +170,7 @@ public void testGetMaxCompactionScore() { Quantiles.compute(Lists.newArrayList(1d))); Assert.assertEquals(1, compactionMgr.getMaxCompactionScore(), delta); compactionMgr.handleCompactionFinished(partition1, 3, System.currentTimeMillis(), - Quantiles.compute(Lists.newArrayList(2d))); + Quantiles.compute(Lists.newArrayList(2d)), 1234); Assert.assertEquals(2, compactionMgr.getMaxCompactionScore(), delta); compactionMgr.handleLoadingFinished(partition2, 2, System.currentTimeMillis(), @@ -205,4 +275,40 @@ public boolean isPhysicalPartitionExist(GlobalStateMgr stateMgr, long dbId, long compactionMgr2.load(reader); Assert.assertEquals(1, compactionMgr2.getPartitionStatsCount()); } + + @Test + public void testActiveCompactionTransactionMapOnRestart() { + long txnId = 10001L; + long tableId = 10002L; + Map txnIdToTableIdMap = new HashMap<>(); + txnIdToTableIdMap.put(txnId, tableId); + new Expectations() { + { + GlobalStateMgr.getCurrentState(); + result = globalStateMgr; + + globalStateMgr.getGlobalTransactionMgr(); + result = globalTransactionMgr; + + globalTransactionMgr.getLakeCompactionActiveTxnStats(); + result = txnIdToTableIdMap; + + } + }; + + CompactionMgr compactionMgr = new CompactionMgr(); + compactionMgr.rebuildActiveCompactionTransactionMapOnRestart(); + ConcurrentHashMap activeCompactionTransactionMap = + compactionMgr.getRemainedActiveCompactionTxnWhenStart(); + Assert.assertEquals(1, activeCompactionTransactionMap.size()); + Assert.assertTrue(activeCompactionTransactionMap.containsValue(tableId)); + + // test for removeFromStartupActiveCompactionTransactionMap + long nonExistedTxnId = 10003L; + compactionMgr.removeFromStartupActiveCompactionTransactionMap(nonExistedTxnId); + Assert.assertEquals(1, activeCompactionTransactionMap.size()); + + compactionMgr.removeFromStartupActiveCompactionTransactionMap(txnId); + Assert.assertEquals(0, activeCompactionTransactionMap.size()); + } } diff --git a/fe/fe-core/src/test/java/com/starrocks/lake/compaction/CompactionSchedulerTest.java b/fe/fe-core/src/test/java/com/starrocks/lake/compaction/CompactionSchedulerTest.java index ae2ddd9fc8bd6..906926d49cbd9 100644 --- a/fe/fe-core/src/test/java/com/starrocks/lake/compaction/CompactionSchedulerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/lake/compaction/CompactionSchedulerTest.java @@ -189,7 +189,7 @@ public ConcurrentHashMap getRunningCompactio @Test public void testCompactionTaskLimit() { - CompactionScheduler compactionScheduler = new CompactionScheduler(null, null, null, null, ""); + CompactionScheduler compactionScheduler = new CompactionScheduler(new CompactionMgr(), null, null, null, ""); int defaultValue = Config.lake_compaction_max_tasks; // explicitly set config to a value bigger than default -1 diff --git a/fe/fe-core/src/test/java/com/starrocks/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/com/starrocks/transaction/DatabaseTransactionMgrTest.java index e177ea12017a2..4667fa303a615 100644 --- a/fe/fe-core/src/test/java/com/starrocks/transaction/DatabaseTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/transaction/DatabaseTransactionMgrTest.java @@ -151,16 +151,7 @@ public Map addTransactionToTransactionMgr() throws StarRocksExcept Assert.assertEquals(idGenerator.peekNextTransactionId(), masterTransMgr.getMinActiveCompactionTxnId()); // commit a transaction - TabletCommitInfo tabletCommitInfo1 = new TabletCommitInfo(GlobalStateMgrTestUtil.testTabletId1, - GlobalStateMgrTestUtil.testBackendId1); - TabletCommitInfo tabletCommitInfo2 = new TabletCommitInfo(GlobalStateMgrTestUtil.testTabletId1, - GlobalStateMgrTestUtil.testBackendId2); - TabletCommitInfo tabletCommitInfo3 = new TabletCommitInfo(GlobalStateMgrTestUtil.testTabletId1, - GlobalStateMgrTestUtil.testBackendId3); - List transTablets = Lists.newArrayList(); - transTablets.add(tabletCommitInfo1); - transTablets.add(tabletCommitInfo2); - transTablets.add(tabletCommitInfo3); + List transTablets = buildTabletCommitInfoList(); masterTransMgr.commitTransaction(GlobalStateMgrTestUtil.testDbId1, transactionId1, transTablets, Lists.newArrayList(), null); DatabaseTransactionMgr masterDbTransMgr = @@ -206,7 +197,7 @@ public Map addTransactionToTransactionMgr() throws StarRocksExcept Lists.newArrayList(GlobalStateMgrTestUtil.testTableId1), GlobalStateMgrTestUtil.testTxnLable5, feTransactionSource, - TransactionState.LoadJobSourceType.LAKE_COMPACTION, + TransactionState.LoadJobSourceType.BACKEND_STREAMING, Config.max_load_timeout_second); // for test batch long transactionId6 = masterTransMgr @@ -249,7 +240,6 @@ public Map addTransactionToTransactionMgr() throws StarRocksExcept lableToTxnId.put(GlobalStateMgrTestUtil.testTxnLable8, transactionId8); Assert.assertEquals(transactionId2, masterTransMgr.getMinActiveTxnId()); - Assert.assertEquals(transactionId5, masterTransMgr.getMinActiveCompactionTxnId()); transactionGraph.add(transactionId6, Lists.newArrayList(GlobalStateMgrTestUtil.testTableId1)); transactionGraph.add(transactionId7, Lists.newArrayList(GlobalStateMgrTestUtil.testTableId1)); @@ -264,6 +254,59 @@ public Map addTransactionToTransactionMgr() throws StarRocksExcept return lableToTxnId; } + private List buildTabletCommitInfoList() { + TabletCommitInfo tabletCommitInfo1 = new TabletCommitInfo(GlobalStateMgrTestUtil.testTabletId1, + GlobalStateMgrTestUtil.testBackendId1); + TabletCommitInfo tabletCommitInfo2 = new TabletCommitInfo(GlobalStateMgrTestUtil.testTabletId1, + GlobalStateMgrTestUtil.testBackendId2); + TabletCommitInfo tabletCommitInfo3 = new TabletCommitInfo(GlobalStateMgrTestUtil.testTabletId1, + GlobalStateMgrTestUtil.testBackendId3); + List transTablets = Lists.newArrayList(); + transTablets.add(tabletCommitInfo1); + transTablets.add(tabletCommitInfo2); + transTablets.add(tabletCommitInfo3); + return transTablets; + } + + @Test + public void getLakeCompactionActiveTxnListTest() throws StarRocksException { + TransactionState.TxnCoordinator feTransactionSource = + new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "fe1"); + long committedCompactionTransactionId = masterTransMgr + .beginTransaction(GlobalStateMgrTestUtil.testDbId1, + Lists.newArrayList(GlobalStateMgrTestUtil.testTableId1), + GlobalStateMgrTestUtil.testTxnLableCompaction1, + feTransactionSource, + TransactionState.LoadJobSourceType.LAKE_COMPACTION, + Config.lake_compaction_default_timeout_second); + + DatabaseTransactionMgr masterDbTransMgr = + masterTransMgr.getDatabaseTransactionMgr(GlobalStateMgrTestUtil.testDbId1); + List transTablets = buildTabletCommitInfoList(); + masterTransMgr.commitTransaction(GlobalStateMgrTestUtil.testDbId1, committedCompactionTransactionId, transTablets, + Lists.newArrayList(), null); + assertEquals(TransactionStatus.COMMITTED, masterDbTransMgr.getTxnState(committedCompactionTransactionId).getStatus()); + + long preparedCompactionTransactionId = masterTransMgr + .beginTransaction(GlobalStateMgrTestUtil.testDbId1, + Lists.newArrayList(GlobalStateMgrTestUtil.testTableId1), + GlobalStateMgrTestUtil.testTxnLableCompaction2, + feTransactionSource, + TransactionState.LoadJobSourceType.LAKE_COMPACTION, + Config.lake_compaction_default_timeout_second); + + Map compactionActiveTxnMap = masterDbTransMgr.getLakeCompactionActiveTxnMap(); + Assert.assertEquals(2, compactionActiveTxnMap.size()); + Assert.assertTrue(compactionActiveTxnMap.containsKey(committedCompactionTransactionId)); + Assert.assertTrue(compactionActiveTxnMap.containsKey(preparedCompactionTransactionId)); + + // global transaction stats check + Map globalCompactionActiveTxnMap = masterTransMgr.getLakeCompactionActiveTxnStats(); + Assert.assertEquals(2, globalCompactionActiveTxnMap.size()); + Assert.assertTrue(globalCompactionActiveTxnMap.containsKey(committedCompactionTransactionId)); + Assert.assertTrue(globalCompactionActiveTxnMap.containsKey(preparedCompactionTransactionId)); + } + @Test public void testNormal() throws StarRocksException { DatabaseTransactionMgr masterDbTransMgr =