Commit

[Enhancement] Optimize lake compaction scheduler in FE restart scenarios (StarRocks#54881)

Signed-off-by: drake_wang <[email protected]>
wxl24life authored Jan 22, 2025
1 parent 11eaaf1 commit e886985
Showing 10 changed files with 252 additions and 41 deletions.
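At a high level, this commit stops the compaction scheduler from waiting for every pre-restart compaction transaction to finish before scheduling anything. Instead, CompactionMgr tracks the unfinished lake compaction transactions (txnId -> tableId) at startup and excludes only their tables from candidate selection until those transactions are published. The following is a minimal, self-contained sketch of that idea; StartupCompactionTracker and every name in it are simplified stand-ins for illustration, not the actual StarRocks classes.

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for the new tracking logic in CompactionMgr; for illustration only.
public class StartupCompactionTracker {
    // txnId -> tableId of compaction transactions that were still unpublished when the FE restarted
    private final ConcurrentHashMap<Long, Long> remainedActiveCompactionTxnWhenStart = new ConcurrentHashMap<>();

    // Rebuild the map from whatever source lists the unfinished LAKE_COMPACTION transactions.
    public void rebuildOnRestart(Map<Long, Long> activeCompactionTxns) {
        remainedActiveCompactionTxnWhenStart.putAll(activeCompactionTxns);
    }

    // Called when a compaction transaction becomes visible (or is replayed as visible on a follower).
    public void onCompactionFinished(long txnId) {
        remainedActiveCompactionTxnWhenStart.remove(txnId);
    }

    // Tables whose pre-restart compaction is still unpublished must not be scheduled again.
    public Set<Long> tablesToExclude(Set<Long> explicitlyExcluded) {
        Set<Long> excluded = new HashSet<>(explicitlyExcluded);
        excluded.addAll(remainedActiveCompactionTxnWhenStart.values());
        return excluded;
    }

    public static void main(String[] args) {
        StartupCompactionTracker tracker = new StartupCompactionTracker();
        tracker.rebuildOnRestart(Map.of(10001L, 10002L));       // txn 10001 on table 10002 not yet published
        System.out.println(tracker.tablesToExclude(Set.of()));  // [10002] -> table 10002 is skipped
        tracker.onCompactionFinished(10001L);
        System.out.println(tracker.tablesToExclude(Set.of()));  // [] -> table 10002 becomes schedulable again
    }
}

In the real change below, the rebuild happens in CompactionMgr.load(), the removal in handleCompactionFinished(), and the exclusion in choosePartitionsToCompact().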
@@ -56,6 +56,21 @@ public class CompactionMgr implements MemoryTrackable {
private Sorter sorter;
private CompactionScheduler compactionScheduler;

/**
* We use `remainedActiveCompactionTxnWhenStart` to track all lake compaction txns that were not yet published
* when the FE restarted. The key of the map is the transaction id of the compaction task, and the value is the
* table id of the compaction task. Multiple keys may share the same value, because there can be multiple
* compaction jobs on different partitions of the same table.
*
* Note that this prevents all partitions whose tableId is kept in the map from being compacted.
*/
private final ConcurrentHashMap<Long, Long> remainedActiveCompactionTxnWhenStart = new ConcurrentHashMap<>();

@VisibleForTesting
protected ConcurrentHashMap<Long, Long> getRemainedActiveCompactionTxnWhenStart() {
return remainedActiveCompactionTxnWhenStart;
}

public CompactionMgr() {
try {
init();
Expand Down Expand Up @@ -90,6 +105,30 @@ public void start() {
}
}

/**
* Iterate all transactions and find those with the LAKE_COMPACTION source type that were not finished before the FE restart.
*/
public void rebuildActiveCompactionTransactionMapOnRestart() {
Map<Long, Long> activeTxnStates =
GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getLakeCompactionActiveTxnStats();
for (Map.Entry<Long, Long> txnState : activeTxnStates.entrySet()) {
// for lake compaction txn, there can only be one table id for each txn state
remainedActiveCompactionTxnWhenStart.put(txnState.getKey(), txnState.getValue());
LOG.info("Found lake compaction transaction not finished on table {}, txn_id: {}", txnState.getValue(),
txnState.getKey());
}
}

protected void removeFromStartupActiveCompactionTransactionMap(long txnId) {
if (remainedActiveCompactionTxnWhenStart.isEmpty()) {
return;
}
boolean ret = remainedActiveCompactionTxnWhenStart.keySet().removeIf(key -> key == txnId);
if (ret) {
LOG.info("Removed transaction {} from startup active compaction transaction map", txnId);
}
}

public void handleLoadingFinished(PartitionIdentifier partition, long version, long versionTime,
Quantiles compactionScore) {
PartitionVersion currentVersion = new PartitionVersion(version, versionTime);
@@ -107,7 +146,8 @@ public void handleLoadingFinished(PartitionIdentifier partition, long version, long versionTime,
}

public void handleCompactionFinished(PartitionIdentifier partition, long version, long versionTime,
Quantiles compactionScore) {
Quantiles compactionScore, long txnId) {
removeFromStartupActiveCompactionTransactionMap(txnId);
PartitionVersion compactionVersion = new PartitionVersion(version, versionTime);
PartitionStatistics statistics = partitionStatisticsHashMap.compute(partition, (k, v) -> {
if (v == null) {
@@ -125,8 +165,10 @@ public void handleCompactionFinished(PartitionIdentifier partition, long version, long versionTime,

@NotNull
List<PartitionStatisticsSnapshot> choosePartitionsToCompact(@NotNull Set<PartitionIdentifier> excludes,
@NotNull Set<Long> excludeTables) {
return choosePartitionsToCompact(excludeTables)
@NotNull Set<Long> excludeTables) {
Set<Long> copiedExcludeTables = new HashSet<>(excludeTables);
copiedExcludeTables.addAll(remainedActiveCompactionTxnWhenStart.values());
return choosePartitionsToCompact(copiedExcludeTables)
.stream()
.filter(p -> !excludes.contains(p.getPartition()))
.collect(Collectors.toList());
@@ -215,6 +257,16 @@ public void save(ImageWriter imageWriter) throws IOException, SRMetaBlockException
public void load(SRMetaBlockReader reader) throws IOException, SRMetaBlockException, SRMetaBlockEOFException {
CompactionMgr compactionManager = reader.readJson(CompactionMgr.class);
partitionStatisticsHashMap = compactionManager.partitionStatisticsHashMap;

// To ensure that the input rowsets of a compaction still exist when the version is published, compaction
// tasks on the same partition must be executed serially: the next compaction task can start only after the
// previous one has become visible or been canceled.
// Therefore, when the FE restarts, we must make sure every compaction transaction that was still active before
// the restart is tracked, and exclude those tables from being chosen as compaction candidates.
// Note that the map is maintained on both leader and follower FEs; its keys are removed after the compaction
// transaction finishes, and on follower FEs this is done by the replay process.
rebuildActiveCompactionTransactionMapOnRestart();
}

public long getPartitionStatsCount() {
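As a rough illustration of how the new exclusion interacts with choosePartitionsToCompact: every partition of a table that still has an unpublished pre-restart compaction transaction is filtered out, in addition to any explicitly excluded tables. The snippet below is a hypothetical, simplified demo; Partition and CandidateFilterDemo are made-up stand-ins for PartitionIdentifier and the real selection logic.

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class CandidateFilterDemo {
    // Stand-in for PartitionIdentifier
    record Partition(long dbId, long tableId, long partitionId) {}

    static List<Partition> choose(List<Partition> all, Set<Long> excludeTables, Set<Long> startupTxnTables) {
        Set<Long> excluded = new HashSet<>(excludeTables);
        excluded.addAll(startupTxnTables); // tables with unfinished compaction txns from before the restart
        return all.stream()
                .filter(p -> !excluded.contains(p.tableId()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Partition> all = List.of(
                new Partition(1, 100, 1000), new Partition(1, 100, 1001), new Partition(1, 200, 2000));
        // Table 100 still has an active pre-restart compaction txn, so both of its partitions are skipped.
        System.out.println(choose(all, Set.of(), Set.of(100L)));
    }
}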
@@ -79,8 +79,6 @@ public class CompactionScheduler extends Daemon {
private final GlobalStateMgr stateMgr;
private final ConcurrentHashMap<PartitionIdentifier, CompactionJob> runningCompactions;
private final SynchronizedCircularQueue<CompactionRecord> history;
private boolean finishedWaiting = false;
private long waitTxnId = -1;
private long lastPartitionCleanTime;
private Set<Long> disabledTables; // copy-on-write

@@ -105,31 +103,12 @@ protected void runOneCycle() {
cleanPhysicalPartition();

// Schedule compaction tasks only when this is a leader FE and all edit logs have finished replay.
// In order to ensure that the input rowsets of compaction still exists when doing publishing version, it is
// necessary to ensure that the compaction task of the same partition is executed serially, that is, the next
// compaction task can be executed only after the status of the previous compaction task changes to visible or
// canceled.
if (stateMgr.isLeader() && stateMgr.isReady() && allCommittedCompactionsBeforeRestartHaveFinished()) {
if (stateMgr.isLeader() && stateMgr.isReady()) {
schedule();
history.changeMaxSize(Config.lake_compaction_history_size);
}
}

// Returns true if all compaction transactions committed before this restart have finished(i.e., of VISIBLE state).
private boolean allCommittedCompactionsBeforeRestartHaveFinished() {
if (finishedWaiting) {
return true;
}
// Note: must call getMinActiveCompactionTxnId() before getNextTransactionId(), otherwise if there are
// no running transactions waitTxnId <= minActiveTxnId will always be false.
long minActiveTxnId = transactionMgr.getMinActiveCompactionTxnId();
if (waitTxnId < 0) {
waitTxnId = transactionMgr.getTransactionIDGenerator().getNextTransactionId();
}
finishedWaiting = waitTxnId <= minActiveTxnId;
return finishedWaiting;
}

private void schedule() {
// Check whether there are completed compaction jobs.
for (Iterator<Map.Entry<PartitionIdentifier, CompactionJob>> iterator = runningCompactions.entrySet().iterator();
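The deleted allCommittedCompactionsBeforeRestartHaveFinished() check held back scheduling for every table until the oldest pre-restart compaction transaction had become visible; with the per-table map, only the affected tables are held back. A toy comparison of the two gates, using made-up values and names:

public class SchedulingGateDemo {
    public static void main(String[] args) {
        long waitTxnId = 500;                 // next txn id captured at startup (old approach)
        long minActiveCompactionTxnId = 420;  // oldest compaction txn still active after the restart

        boolean oldGateOpen = waitTxnId <= minActiveCompactionTxnId; // false: nothing at all gets scheduled
        boolean newGateOpen = true;           // leader + ready is enough; blocked tables are filtered per candidate

        System.out.println("old gate open: " + oldGateOpen + ", new gate open: " + newGateOpen);
    }
}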
@@ -838,6 +838,20 @@ public List<TransactionState> getCommittedTxnList() {
}
}

public Map<Long, Long> getLakeCompactionActiveTxnMap() {
readLock();
try {
// for lake compaction txn, there can only be one table id for each txn state
Map<Long, Long> txnIdToTableIdMap = new HashMap<>();
idToRunningTransactionState.values().stream()
.filter(state -> state.getSourceType() == TransactionState.LoadJobSourceType.LAKE_COMPACTION)
.forEach(state -> txnIdToTableIdMap.put(state.getTransactionId(), state.getTableIdList().get(0)));
return txnIdToTableIdMap;
} finally {
readUnlock();
}
}

// Check whether there is committed txns on partitionId.
public boolean hasCommittedTxnOnPartition(long tableId, long partitionId) {
readLock();
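getLakeCompactionActiveTxnMap walks the running transaction states of one database under its read lock and keeps only the LAKE_COMPACTION ones, mapping each txn id to the single table it targets. A self-contained sketch of that filtering with stand-in types (TxnStub and SourceType are illustrative, not the real TransactionState API):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ActiveCompactionTxnMapDemo {
    enum SourceType { LAKE_COMPACTION, ROUTINE_LOAD }
    record TxnStub(long txnId, SourceType sourceType, List<Long> tableIds) {}

    static Map<Long, Long> lakeCompactionTxnToTable(List<TxnStub> running) {
        Map<Long, Long> result = new HashMap<>();
        for (TxnStub txn : running) {
            if (txn.sourceType() == SourceType.LAKE_COMPACTION) {
                // A lake compaction txn targets exactly one table, so index 0 is safe.
                result.put(txn.txnId(), txn.tableIds().get(0));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<TxnStub> running = List.of(
                new TxnStub(11L, SourceType.LAKE_COMPACTION, List.of(100L)),
                new TxnStub(12L, SourceType.ROUTINE_LOAD, List.of(200L)));
        System.out.println(lakeCompactionTxnToTable(running)); // {11=100}
    }
}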
@@ -72,6 +72,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -647,6 +648,19 @@ public long getMinActiveCompactionTxnId() {
return minId;
}

/**
* Get the map [txnId -> tableId] of active lake compaction transactions.
* @return the map of active lake compaction transactions, keyed by transaction id with the table id as value.
*/
public Map<Long, Long> getLakeCompactionActiveTxnStats() {
Map<Long, Long> txnIdToTableIdMap = new HashMap<>();
for (Map.Entry<Long, DatabaseTransactionMgr> entry : dbIdToDatabaseTransactionMgrs.entrySet()) {
DatabaseTransactionMgr dbTransactionMgr = entry.getValue();
txnIdToTableIdMap.putAll(dbTransactionMgr.getLakeCompactionActiveTxnMap());
}
return txnIdToTableIdMap;
}

/**
* Get the smallest transaction ID of active transactions in a database.
* If there are no active transactions in the database, return the transaction ID that will be assigned to the
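getLakeCompactionActiveTxnStats simply merges the per-database maps; since transaction ids come from a global generator, keys from different databases do not collide. A tiny worked example with made-up ids:

import java.util.HashMap;
import java.util.Map;

public class ActiveTxnAggregationDemo {
    public static void main(String[] args) {
        Map<Long, Long> db1 = Map.of(11L, 100L);              // db 1: compaction txn 11 on table 100
        Map<Long, Long> db2 = Map.of(21L, 300L, 22L, 300L);   // db 2: two partitions of table 300 compacting

        Map<Long, Long> merged = new HashMap<>();
        merged.putAll(db1);
        merged.putAll(db2);
        System.out.println(merged); // three entries: 11->100, 21->300, 22->300 (iteration order unspecified)
    }
}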
@@ -99,7 +99,8 @@ public void applyVisibleLog(TransactionState txnState, TableCommitInfo commitInfo
PartitionIdentifier partitionIdentifier =
new PartitionIdentifier(txnState.getDbId(), table.getId(), partition.getId());
if (txnState.getSourceType() == TransactionState.LoadJobSourceType.LAKE_COMPACTION) {
compactionManager.handleCompactionFinished(partitionIdentifier, version, versionTime, compactionScore);
compactionManager.handleCompactionFinished(partitionIdentifier, version, versionTime, compactionScore,
txnState.getTransactionId());
} else {
compactionManager.handleLoadingFinished(partitionIdentifier, version, versionTime, compactionScore);
}
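Threading the txn id through handleCompactionFinished means both the leader (when it publishes) and the followers (when they replay the visible log) clear the corresponding startup entry, so the exclusion set stays consistent across FEs. A small stand-alone illustration with simplified names:

import java.util.concurrent.ConcurrentHashMap;

public class ReplayCleanupDemo {
    private final ConcurrentHashMap<Long, Long> startupMap = new ConcurrentHashMap<>();

    void onVisible(long txnId, boolean isLakeCompaction) {
        if (isLakeCompaction) {
            startupMap.remove(txnId); // same effect whether reached via publish on the leader or via replay
        }
    }

    public static void main(String[] args) {
        ReplayCleanupDemo follower = new ReplayCleanupDemo();
        follower.startupMap.put(10001L, 10002L); // rebuilt at startup: txn 10001 on table 10002
        follower.onVisible(10001L, true);
        System.out.println(follower.startupMap.isEmpty()); // true -> table 10002 is schedulable again
    }
}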
@@ -40,4 +40,4 @@ public String toString() {
", reason='" + reason + '\'' +
'}';
}
}
}
@@ -89,6 +89,8 @@ public class GlobalStateMgrTestUtil {
public static String testTxnLable8 = "testTxnLable8";
public static String testTxnLable9 = "testTxnLable9";
public static String testTxnLable10 = "testTxnLable10";
public static String testTxnLableCompaction1 = "testTxnLableCompaction1";
public static String testTxnLableCompaction2 = "testTxnLableCompaction2";
public static String testEsTable1 = "partitionedEsTable1";
public static long testEsTableId1 = 14;

@@ -26,20 +26,31 @@
import com.starrocks.persist.metablock.SRMetaBlockReaderV2;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.common.MetaUtils;
import com.starrocks.transaction.GlobalTransactionMgr;
import com.starrocks.utframe.UtFrameUtils;
import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class CompactionMgrTest {
@Mocked
private GlobalStateMgr globalStateMgr;
@Mocked
private GlobalTransactionMgr globalTransactionMgr;
@Mocked
private Database db;

@Test
public void testChoosePartitionsToCompact() {
@@ -87,6 +98,65 @@ public void testChoosePartitionsToCompact() {
Assert.assertSame(partition2, compactionList.get(0).getPartition());
}

@Test
public void testChoosePartitionsToCompactWithActiveTxnFilter() {
long dbId = 10001L;
long tableId1 = 10002L;
long tableId2 = 10003L;
long partitionId10 = 20001L;
long partitionId11 = 20003L;
long partitionId20 = 20002L;

PartitionIdentifier partition10 = new PartitionIdentifier(dbId, tableId1, partitionId10);
PartitionIdentifier partition11 = new PartitionIdentifier(dbId, tableId1, partitionId11);
PartitionIdentifier partition20 = new PartitionIdentifier(dbId, tableId2, partitionId20);

CompactionMgr compactionManager = new CompactionMgr();
compactionManager.handleLoadingFinished(partition10, 1, System.currentTimeMillis(),
Quantiles.compute(Lists.newArrayList(100d)));
compactionManager.handleLoadingFinished(partition11, 2, System.currentTimeMillis(),
Quantiles.compute(Lists.newArrayList(100d)));
compactionManager.handleLoadingFinished(partition20, 3, System.currentTimeMillis(),
Quantiles.compute(Lists.newArrayList(100d)));

// build active txn on table1
long txnId = 10001L;
Map<Long, Long> txnIdToTableIdMap = new HashMap<>();
txnIdToTableIdMap.put(txnId, tableId1);
new Expectations() {
{
GlobalStateMgr.getCurrentState();
result = globalStateMgr;

globalStateMgr.getGlobalTransactionMgr();
result = globalTransactionMgr;

globalTransactionMgr.getLakeCompactionActiveTxnStats();
result = txnIdToTableIdMap;

}
};
compactionManager.rebuildActiveCompactionTransactionMapOnRestart();

Set<PartitionIdentifier> allPartitions = compactionManager.getAllPartitions();
Assert.assertEquals(3, allPartitions.size());
Assert.assertTrue(allPartitions.contains(partition10));
Assert.assertTrue(allPartitions.contains(partition11));
Assert.assertTrue(allPartitions.contains(partition20));

List<PartitionStatisticsSnapshot> compactionList =
compactionManager.choosePartitionsToCompact(new HashSet<>(), new HashSet<>());
// both partition10 and partition11 are filtered because table1 has active txn
Assert.assertEquals(1, compactionList.size());
Assert.assertSame(partition20, compactionList.get(0).getPartition());

Set<Long> excludeTables = new HashSet<>();
excludeTables.add(tableId2);
compactionList = compactionManager.choosePartitionsToCompact(new HashSet<>(), excludeTables);
// tableId2 is filtered by excludeTables
Assert.assertEquals(0, compactionList.size());
}

@Test
public void testGetMaxCompactionScore() {
double delta = 0.001;
@@ -100,7 +170,7 @@ public void testGetMaxCompactionScore() {
Quantiles.compute(Lists.newArrayList(1d)));
Assert.assertEquals(1, compactionMgr.getMaxCompactionScore(), delta);
compactionMgr.handleCompactionFinished(partition1, 3, System.currentTimeMillis(),
Quantiles.compute(Lists.newArrayList(2d)));
Quantiles.compute(Lists.newArrayList(2d)), 1234);
Assert.assertEquals(2, compactionMgr.getMaxCompactionScore(), delta);

compactionMgr.handleLoadingFinished(partition2, 2, System.currentTimeMillis(),
@@ -205,4 +275,40 @@ public boolean isPhysicalPartitionExist(GlobalStateMgr stateMgr, long dbId, long
compactionMgr2.load(reader);
Assert.assertEquals(1, compactionMgr2.getPartitionStatsCount());
}

@Test
public void testActiveCompactionTransactionMapOnRestart() {
long txnId = 10001L;
long tableId = 10002L;
Map<Long, Long> txnIdToTableIdMap = new HashMap<>();
txnIdToTableIdMap.put(txnId, tableId);
new Expectations() {
{
GlobalStateMgr.getCurrentState();
result = globalStateMgr;

globalStateMgr.getGlobalTransactionMgr();
result = globalTransactionMgr;

globalTransactionMgr.getLakeCompactionActiveTxnStats();
result = txnIdToTableIdMap;

}
};

CompactionMgr compactionMgr = new CompactionMgr();
compactionMgr.rebuildActiveCompactionTransactionMapOnRestart();
ConcurrentHashMap<Long, Long> activeCompactionTransactionMap =
compactionMgr.getRemainedActiveCompactionTxnWhenStart();
Assert.assertEquals(1, activeCompactionTransactionMap.size());
Assert.assertTrue(activeCompactionTransactionMap.containsValue(tableId));

// test for removeFromStartupActiveCompactionTransactionMap
long nonExistedTxnId = 10003L;
compactionMgr.removeFromStartupActiveCompactionTransactionMap(nonExistedTxnId);
Assert.assertEquals(1, activeCompactionTransactionMap.size());

compactionMgr.removeFromStartupActiveCompactionTransactionMap(txnId);
Assert.assertEquals(0, activeCompactionTransactionMap.size());
}
}
@@ -189,7 +189,7 @@ public ConcurrentHashMap<PartitionIdentifier, CompactionJob> getRunningCompactio

@Test
public void testCompactionTaskLimit() {
CompactionScheduler compactionScheduler = new CompactionScheduler(null, null, null, null, "");
CompactionScheduler compactionScheduler = new CompactionScheduler(new CompactionMgr(), null, null, null, "");

int defaultValue = Config.lake_compaction_max_tasks;
// explicitly set config to a value bigger than default -1