Skip to content

Commit

Permalink
[BugFix] Fix dynamic partition table unexpectly stop scheduling (back…
Browse files Browse the repository at this point in the history
…port #45235) (#45313)

Signed-off-by: yiming <[email protected]>
Signed-off-by: Dejun Xia <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: yiming <[email protected]>
(cherry picked from commit 5ef58ac)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/clone/DynamicPartitionScheduler.java
#	fe/fe-core/src/main/java/com/starrocks/common/util/DynamicPartitionUtil.java
#	fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java
  • Loading branch information
mergify[bot] committed May 9, 2024
1 parent b34435a commit 3e8a534
Show file tree
Hide file tree
Showing 12 changed files with 324 additions and 46 deletions.
5 changes: 5 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.starrocks.persist.CreateTableInfo;
import com.starrocks.persist.DropInfo;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.statistic.StatsConstants;
import com.starrocks.system.SystemInfoService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -863,6 +864,10 @@ public boolean isInfoSchemaDb() {
return fullQualifiedName.equalsIgnoreCase(InfoSchemaDb.DATABASE_NAME);
}

public boolean isStatisticsDatabase() {
return fullQualifiedName.equalsIgnoreCase(StatsConstants.STATISTICS_DB_NAME);
}

// the invoker should hold db's writeLock
public void setExist(boolean exist) {
this.exist = exist;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ public class DynamicPartitionProperty {
public static final int NOT_SET_HISTORY_PARTITION_NUM = 0;
public static final String NOT_SET_PREFIX = "p";

private boolean exist;
private final boolean exists;

private boolean enable;
private boolean enabled;
private String timeUnit;
private int start;
private int end;
Expand All @@ -64,8 +64,8 @@ public class DynamicPartitionProperty {
private int historyPartitionNum;
public DynamicPartitionProperty(Map<String, String> properties) {
if (properties != null && !properties.isEmpty()) {
this.exist = true;
this.enable = Boolean.parseBoolean(properties.get(ENABLE));
this.exists = true;
this.enabled = Boolean.parseBoolean(properties.get(ENABLE));
this.timeUnit = properties.get(TIME_UNIT);
this.tz = TimeUtils.getOrSystemTimeZone(properties.get(TIME_ZONE));
// In order to compatible dynamic add partition version
Expand All @@ -79,7 +79,7 @@ public DynamicPartitionProperty(Map<String, String> properties) {
HISTORY_PARTITION_NUM, String.valueOf(NOT_SET_HISTORY_PARTITION_NUM)));
createStartOfs(properties);
} else {
this.exist = false;
this.exists = false;
}
}

Expand All @@ -99,8 +99,8 @@ private void createStartOfs(Map<String, String> properties) {
}
}

public boolean isExist() {
return exist;
public boolean isExists() {
return exists;
}

public String getTimeUnit() {
Expand All @@ -123,8 +123,8 @@ public int getBuckets() {
return buckets;
}

public boolean getEnable() {
return enable;
public boolean isEnabled() {
return enabled;
}

public StartOfDate getStartOfWeek() {
Expand Down Expand Up @@ -161,7 +161,7 @@ public int getHistoryPartitionNum() {
public String getPropString() {
StringBuilder sb = new StringBuilder();
sb.append("{");
sb.append(ENABLE + ":" + enable + ",");
sb.append(ENABLE + ":" + enabled + ",");
sb.append(TIME_UNIT + ":" + timeUnit + ",");
sb.append(TIME_ZONE + ":" + tz.getID() + ",");
sb.append(START + ":" + start + ",");
Expand Down Expand Up @@ -190,7 +190,7 @@ public void setTimeUnit(String timeUnit) {

@Override
public String toString() {
String res = ",\n\"" + ENABLE + "\" = \"" + enable + "\""
String res = ",\n\"" + ENABLE + "\" = \"" + enabled + "\""
+ ",\n\"" + TIME_UNIT + "\" = \"" + timeUnit + "\""
+ ",\n\"" + TIME_ZONE + "\" = \"" + tz.getID() + "\""
+ ",\n\"" + START + "\" = \"" + start + "\""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ public TableProperty getTableProperty() {
public boolean dynamicPartitionExists() {
return tableProperty != null
&& tableProperty.getDynamicPartitionProperty() != null
&& tableProperty.getDynamicPartitionProperty().isExist();
&& tableProperty.getDynamicPartitionProperty().isExists();
}

public void setBaseIndexId(long baseIndexId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,18 @@ public class DynamicPartitionScheduler extends LeaderDaemon {
// (DbId, TableId) for a collection of objects marked with partition_ttl_number > 0 on the table
private final Set<Pair<Long, Long>> ttlPartitionInfo = Sets.newConcurrentHashSet();

private boolean initialize;
private long lastFindingTime = -1;

public enum State {
NORMAL, ERROR
}

public boolean isInScheduler(long dbId, long tableId) {
return dynamicPartitionTableInfo.contains(new Pair<>(dbId, tableId));
}

public DynamicPartitionScheduler(String name, long intervalMs) {
super(name, intervalMs);
this.initialize = false;
}

public void registerDynamicPartitionTable(Long dbId, Long tableId) {
Expand Down Expand Up @@ -318,7 +321,7 @@ private ArrayList<DropPartitionClause> getDropPartitionClause(Database db, OlapT
return dropPartitionClauses;
}

private void executeDynamicPartition() {
private void scheduleDynamicPartition() {
Iterator<Pair<Long, Long>> iterator = dynamicPartitionTableInfo.iterator();
OUTER: while (iterator.hasNext()) {
Pair<Long, Long> tableInfo = iterator.next();
Expand Down Expand Up @@ -608,7 +611,7 @@ public boolean executeDynamicPartitionForTable(Long dbId, Long tableId) {
olapTable = (OlapTable) db.getTable(tableId);
// Only OlapTable has DynamicPartitionProperty
if (olapTable == null || !olapTable.dynamicPartitionExists() ||
!olapTable.getTableProperty().getDynamicPartitionProperty().getEnable()) {
!olapTable.getTableProperty().getDynamicPartitionProperty().isEnabled()) {
if (olapTable == null) {
LOG.warn("Automatically removes the schedule because table does not exist, " +
"tableId: {}", tableId);
Expand Down Expand Up @@ -687,21 +690,227 @@ public boolean executeDynamicPartitionForTable(Long dbId, Long tableId) {
return false;
}

<<<<<<< HEAD
=======
private void scheduleTTLPartition() {
Iterator<Pair<Long, Long>> iterator = ttlPartitionInfo.iterator();
while (iterator.hasNext()) {
Pair<Long, Long> tableInfo = iterator.next();
Long dbId = tableInfo.first;
Long tableId = tableInfo.second;
Database db = GlobalStateMgr.getCurrentState().getDb(dbId);
if (db == null) {
iterator.remove();
LOG.warn("Could not get database={} info. remove it from scheduler", dbId);
continue;
}
Table table = db.getTable(tableId);
OlapTable olapTable;
if (table instanceof OlapTable) {
olapTable = (OlapTable) table;
} else {
iterator.remove();
LOG.warn("database={}-{}, table={}. is not olap table. remove it from scheduler",
db.getFullName(), dbId, tableId);
continue;
}

PartitionInfo partitionInfo = olapTable.getPartitionInfo();
RangePartitionInfo rangePartitionInfo;
if (partitionInfo instanceof RangePartitionInfo) {
rangePartitionInfo = (RangePartitionInfo) olapTable.getPartitionInfo();
} else {
LOG.warn("currently only support range partition." +
"remove database={}, table={} from scheduler", dbId, tableId);
continue;
}

if (rangePartitionInfo.getPartitionColumns().size() != 1) {
iterator.remove();
LOG.warn("currently only support partition with single column. " +
"remove database={}, table={} from scheduler", dbId, tableId);
continue;
}

int ttlNumber = olapTable.getTableProperty().getPartitionTTLNumber();
if (Objects.equals(ttlNumber, INVALID)) {
iterator.remove();
LOG.warn("database={}, table={} have no ttl. remove it from scheduler", dbId, tableId);
continue;
}

ArrayList<DropPartitionClause> dropPartitionClauses = null;
try {
dropPartitionClauses = getDropPartitionClauseByTTL(olapTable, ttlNumber);
} catch (AnalysisException e) {
LOG.warn("database={}-{}, table={}-{} failed to build drop partition statement.",
db.getFullName(), dbId, table.getName(), tableId, e);
}
if (dropPartitionClauses == null) {
continue;
}

String tableName = olapTable.getName();
for (DropPartitionClause dropPartitionClause : dropPartitionClauses) {
db.writeLock();
try {
GlobalStateMgr.getCurrentState().dropPartition(db, olapTable, dropPartitionClause);
clearDropPartitionFailedMsg(tableName);
} catch (DdlException e) {
recordDropPartitionFailedMsg(db.getOriginName(), tableName, e.getMessage());
} finally {
db.writeUnlock();
}
}

}
}


private ArrayList<DropPartitionClause> getDropPartitionClauseByTTL(OlapTable olapTable, int ttlNumber)
throws AnalysisException {
ArrayList<DropPartitionClause> dropPartitionClauses = new ArrayList<>();
RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) (olapTable.getPartitionInfo());
List<Column> partitionColumns = rangePartitionInfo.getPartitionColumns();

// Currently, materialized views and automatically created partition tables
// only support single-column partitioning.
Preconditions.checkArgument(partitionColumns.size() == 1);
Type partitionType = partitionColumns.get(0).getType();
List<Map.Entry<Long, Range<PartitionKey>>> candidatePartitionList = Lists.newArrayList();

if (partitionType.isDateType()) {
LocalDateTime currentDateTime = LocalDateTime.now();
PartitionValue currentPartitionValue = new PartitionValue(currentDateTime.format(DateUtils.DATE_FORMATTER_UNIX));
PartitionKey currentPartitionKey = PartitionKey.createPartitionKey(
ImmutableList.of(currentPartitionValue), partitionColumns);
// For expr partitioning table, always has a shadow partition, we should avoid deleting it.
PartitionKey shadowPartitionKey = PartitionKey.createShadowPartitionKey(partitionColumns);

Map<Long, Range<PartitionKey>> idToRange = rangePartitionInfo.getIdToRange(false);
for (Map.Entry<Long, Range<PartitionKey>> partitionRange : idToRange.entrySet()) {
PartitionKey lowerPartitionKey = partitionRange.getValue().lowerEndpoint();

if (lowerPartitionKey.compareTo(shadowPartitionKey) == 0) {
continue;
}

if (lowerPartitionKey.compareTo(currentPartitionKey) <= 0) {
candidatePartitionList.add(partitionRange);
}
}
} else if (partitionType.isNumericType()) {
candidatePartitionList = new ArrayList<>(rangePartitionInfo.getIdToRange(false).entrySet());
} else {
throw new AnalysisException("Partition ttl does not support type:" + partitionType);
}

candidatePartitionList.sort(Comparator.comparing(o -> o.getValue().upperEndpoint()));

int allPartitionNumber = candidatePartitionList.size();
if (allPartitionNumber > ttlNumber) {
int dropSize = allPartitionNumber - ttlNumber;
for (int i = 0; i < dropSize; i++) {
Long checkDropPartitionId = candidatePartitionList.get(i).getKey();
Partition partition = olapTable.getPartition(checkDropPartitionId);
if (partition != null) {
String dropPartitionName = partition.getName();
dropPartitionClauses.add(new DropPartitionClause(false, dropPartitionName,
false, true));
}
}
}
return dropPartitionClauses;
}

private void recordCreatePartitionFailedMsg(String dbName, String tableName, String msg) {
LOG.warn("dynamic add partition failed: {}, db: {}, table: {}", msg, dbName, tableName);
createOrUpdateRuntimeInfo(tableName, DYNAMIC_PARTITION_STATE, State.ERROR.toString());
createOrUpdateRuntimeInfo(tableName, CREATE_PARTITION_MSG, msg);
}

private void clearCreatePartitionFailedMsg(String tableName) {
createOrUpdateRuntimeInfo(tableName, DYNAMIC_PARTITION_STATE, State.NORMAL.toString());
createOrUpdateRuntimeInfo(tableName, CREATE_PARTITION_MSG, DEFAULT_RUNTIME_VALUE);
}

private void recordDropPartitionFailedMsg(String dbName, String tableName, String msg) {
LOG.warn("dynamic drop partition failed: {}, db: {}, table: {}", msg, dbName, tableName);
createOrUpdateRuntimeInfo(tableName, DYNAMIC_PARTITION_STATE, State.ERROR.toString());
createOrUpdateRuntimeInfo(tableName, DROP_PARTITION_MSG, msg);
}

private void clearDropPartitionFailedMsg(String tableName) {
createOrUpdateRuntimeInfo(tableName, DYNAMIC_PARTITION_STATE, State.NORMAL.toString());
createOrUpdateRuntimeInfo(tableName, DROP_PARTITION_MSG, DEFAULT_RUNTIME_VALUE);
}

private void findSchedulableTables() {
Map<String, List<String>> dynamicPartitionTables = new HashMap<>();
Map<String, List<String>> ttlPartitionTables = new HashMap<>();
long start = System.currentTimeMillis();
for (Long dbId : GlobalStateMgr.getCurrentState().getDbIds()) {
Database db = GlobalStateMgr.getCurrentState().getDb(dbId);
if (db == null) {
continue;
}
if (db.isSystemDatabase() || db.isStatisticsDatabase()) {
continue;
}

db.readLock();
try {
for (Table table : GlobalStateMgr.getCurrentState().getDb(dbId).getTables()) {
if (DynamicPartitionUtil.isDynamicPartitionTable(table)) {
registerDynamicPartitionTable(db.getId(), table.getId());
dynamicPartitionTables.computeIfAbsent(db.getFullName(), k -> new ArrayList<>())
.add(table.getName());
} else if (DynamicPartitionUtil.isTTLPartitionTable(table)) {
// Table(MV) with dynamic partition enabled should not specify partition_ttl_number(MV) or
// partition_live_number property.
registerTtlPartitionTable(db.getId(), table.getId());
ttlPartitionTables.computeIfAbsent(db.getFullName(), k -> new ArrayList<>())
.add(table.getName());
}
}
} finally {
db.readUnlock();
}
}
LOG.info("finished to find all schedulable tables, cost: {}ms, " +
"dynamic partition tables: {}, ttl partition tables: {}, scheduler enabled: {}, scheduler interval: {}s",
System.currentTimeMillis() - start, dynamicPartitionTables, ttlPartitionTables,
Config.dynamic_partition_enable, Config.dynamic_partition_check_interval_seconds);
lastFindingTime = System.currentTimeMillis();
}

>>>>>>> 5ef58ac7d6 ([BugFix] Fix dynamic partition table unexpectly stop scheduling (backport #45235) (#45313))
@VisibleForTesting
public void runOnceForTest() {
runAfterCatalogReady();
}

@Override
protected void runAfterCatalogReady() {
if (!initialize) {
// check Dynamic Partition tables only when FE start
initDynamicPartitionTable();
// Find all tables that need to be scheduled.
long now = System.currentTimeMillis();
if ((now - lastFindingTime) > Math.max(300000, Config.dynamic_partition_check_interval_seconds)) {
findSchedulableTables();
}

// Update scheduler interval.
setInterval(Config.dynamic_partition_check_interval_seconds * 1000L);

// Schedule tables with dynamic partition enabled(only works for base table).
if (Config.dynamic_partition_enable) {
executeDynamicPartition();
scheduleDynamicPartition();
}
executePartitionTimeToLive();

// Schedule tables(mvs) with ttl partition enabled.
// For now, partition_live_number works for base table with
// single column range partitioning(including expr partitioning, e.g. ... partition by date_trunc('month', col).
// partition_ttl_number and partition_ttl work for mv with
// single column range partitioning(including expr partitioning).
scheduleTTLPartition();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public boolean register(String name, ProcNodeInterface node) {

@Override
public ProcNodeInterface lookup(String name) throws AnalysisException {
throw new AnalysisException(name + " does't exist.");
throw new AnalysisException(name + " doesn't exist.");
}

@Override
Expand Down
Loading

0 comments on commit 3e8a534

Please sign in to comment.