Skip to content

Commit

Permalink
[opt](Nereids) lock table in ascending order of table IDs
Browse files Browse the repository at this point in the history
  • Loading branch information
morrySnow committed Dec 11, 2024
1 parent 47b2ddd commit 1aedf9d
Show file tree
Hide file tree
Showing 72 changed files with 1,297 additions and 1,540 deletions.
20 changes: 8 additions & 12 deletions fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -278,18 +278,14 @@ public void replayAlterJobV2(AlterJobV2 alterJob) throws AnalysisException {
* there will be OOM if there are too many replicas of the table when schema change.
*/
protected void checkReplicaCount(OlapTable olapTable) throws DdlException {
olapTable.readLock();
try {
long replicaCount = olapTable.getReplicaCount();
long maxReplicaCount = Config.max_replica_count_when_schema_change;
if (replicaCount > maxReplicaCount) {
String msg = String.format("%s have %d replicas reach %d limit when schema change.",
olapTable.getName(), replicaCount, maxReplicaCount);
LOG.warn(msg);
throw new DdlException(msg);
}
} finally {
olapTable.readUnlock();
long replicaCount = olapTable.getReplicaCount();
long maxReplicaCount = Config.max_replica_count_when_schema_change;
if (replicaCount > maxReplicaCount) {
String msg = String.format("%s have %d replicas reach %d limit when schema change.",
olapTable.getName(), replicaCount, maxReplicaCount);
LOG.warn(msg);
throw new DdlException(msg);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,6 @@ public void processCreateMaterializedView(CreateMaterializedViewStmt addMVClause
*/
public void processBatchAddRollup(String rawSql, List<AlterClause> alterClauses, Database db, OlapTable olapTable)
throws DdlException, AnalysisException {
checkReplicaCount(olapTable);

// wait wal delete
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
Expand All @@ -257,6 +255,7 @@ public void processBatchAddRollup(String rawSql, List<AlterClause> alterClauses,
Set<Long> logJobIdSet = new HashSet<>();
olapTable.writeLockOrDdlException();
try {
checkReplicaCount(olapTable);
olapTable.checkNormalStateForAlter();
if (olapTable.existTempPartitions()) {
throw new DdlException("Can not alter table when there are temp partitions in table");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ private void getAllDbStats() throws AnalysisException {
try {
dbInfo.add(String.valueOf(db.getId()));
dbInfo.add(dbName);
Pair<Long, Long> usedSize = ((Database) db).getUsedDataSize();
Pair<Long, Long> usedSize = ((Database) db).getUsedDataSizeWithoutLock();
dbInfo.add(String.valueOf(usedSize.first));
dbInfo.add(String.valueOf(usedSize.second));
totalSize += usedSize.first;
Expand Down
36 changes: 11 additions & 25 deletions fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ private void prepareAndSendSnapshotTask() {
break;
case OLAP:
OlapTable olapTable = (OlapTable) tbl;
checkOlapTable(olapTable, tableRef);
checkOlapTableWithoutLock(olapTable, tableRef);
if (getContent() == BackupContent.ALL) {
prepareSnapshotTaskForOlapTableWithoutLock(db, (OlapTable) tbl, tableRef, batchTask);
}
Expand Down Expand Up @@ -562,22 +562,17 @@ private void prepareAndSendSnapshotTask() {
LOG.info("finished to send snapshot tasks to backend. {}", this);
}

private void checkOlapTable(OlapTable olapTable, TableRef backupTableRef) {
olapTable.readLock();
try {
// check backup table again
if (backupTableRef.getPartitionNames() != null) {
for (String partName : backupTableRef.getPartitionNames().getPartitionNames()) {
Partition partition = olapTable.getPartition(partName);
if (partition == null) {
status = new Status(ErrCode.NOT_FOUND, "partition " + partName
+ " does not exist in table" + backupTableRef.getName().getTbl());
return;
}
private void checkOlapTableWithoutLock(OlapTable olapTable, TableRef backupTableRef) {
// check backup table again
if (backupTableRef.getPartitionNames() != null) {
for (String partName : backupTableRef.getPartitionNames().getPartitionNames()) {
Partition partition = olapTable.getPartition(partName);
if (partition == null) {
status = new Status(ErrCode.NOT_FOUND, "partition " + partName
+ " does not exist in table" + backupTableRef.getName().getTbl());
return;
}
}
} finally {
olapTable.readUnlock();
}
}

Expand All @@ -595,16 +590,7 @@ private void prepareSnapshotTaskForOlapTableWithoutLock(Database db, OlapTable o
properties.put(tableKey, String.valueOf(commitSeq));

// check backup table again
if (backupTableRef.getPartitionNames() != null) {
for (String partName : backupTableRef.getPartitionNames().getPartitionNames()) {
Partition partition = olapTable.getPartition(partName);
if (partition == null) {
status = new Status(ErrCode.NOT_FOUND, "partition " + partName
+ " does not exist in table" + backupTableRef.getName().getTbl());
return;
}
}
}
checkOlapTableWithoutLock(olapTable, backupTableRef);

// create snapshot tasks
List<Partition> partitions = Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1054,10 +1054,7 @@ public boolean equals(Object obj) {
&& isKey == other.isKey
&& isAllowNull == other.isAllowNull
&& isAutoInc == other.isAutoInc
&& getDataType().equals(other.getDataType())
&& getStrLen() == other.getStrLen()
&& getPrecision() == other.getPrecision()
&& getScale() == other.getScale()
&& Objects.equals(type, other.type)
&& Objects.equals(comment, other.comment)
&& visible == other.visible
&& Objects.equals(children, other.children)
Expand Down
66 changes: 36 additions & 30 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,16 +259,11 @@ public DbState getDbState() {
}

public void setTransactionQuotaSize(long newQuota) {
writeLock();
try {
Preconditions.checkArgument(newQuota >= 0L);
LOG.info("database[{}] try to set transaction quota from {} to {}",
fullQualifiedName, transactionQuotaSize, newQuota);
this.transactionQuotaSize = newQuota;
this.dbProperties.put(TRANSACTION_QUOTA_SIZE, String.valueOf(transactionQuotaSize));
} finally {
writeUnlock();
}
Preconditions.checkArgument(newQuota >= 0L);
LOG.info("database[{}] try to set transaction quota from {} to {}",
fullQualifiedName, transactionQuotaSize, newQuota);
this.transactionQuotaSize = newQuota;
this.dbProperties.put(TRANSACTION_QUOTA_SIZE, String.valueOf(transactionQuotaSize));
}

public long getDataQuota() {
Expand All @@ -295,12 +290,7 @@ public void setDbProperties(DatabaseProperty dbProperties) {
}

public long getUsedDataQuotaWithLock() {
return getUsedDataSize().first;
}

public Pair<Long, Long> getUsedDataSize() {
long usedDataSize = 0;
long usedRemoteDataSize = 0;
List<Table> tables = new ArrayList<>();
readLock();
try {
Expand All @@ -309,6 +299,22 @@ public Pair<Long, Long> getUsedDataSize() {
readUnlock();
}

for (Table table : tables) {
if (!table.isManagedTable()) {
continue;
}

OlapTable olapTable = (OlapTable) table;
usedDataSize = usedDataSize + olapTable.getDataSize();

}
return usedDataSize;
}

public Pair<Long, Long> getUsedDataSizeWithoutLock() {
long usedDataSize = 0;
long usedRemoteDataSize = 0;
List<Table> tables = new ArrayList<>(this.idToTable.values());
for (Table table : tables) {
if (!table.isManagedTable()) {
continue;
Expand All @@ -323,26 +329,26 @@ public Pair<Long, Long> getUsedDataSize() {
}

public long getReplicaCount() {
readLock();
try {
long usedReplicaCount = 0;
for (Table table : this.idToTable.values()) {
if (!table.isManagedTable()) {
continue;
}

OlapTable olapTable = (OlapTable) table;
usedReplicaCount = usedReplicaCount + olapTable.getReplicaCount();
long usedReplicaCount = 0;
for (Table table : this.idToTable.values()) {
if (!table.isManagedTable()) {
continue;
}
return usedReplicaCount;
} finally {
readUnlock();

OlapTable olapTable = (OlapTable) table;
usedReplicaCount = usedReplicaCount + olapTable.getReplicaCount();
}
return usedReplicaCount;
}

public long getReplicaQuotaLeftWithLock() {
long leftReplicaQuota = replicaQuotaSize - getReplicaCount();
return Math.max(leftReplicaQuota, 0L);
readLock();
try {
long leftReplicaQuota = replicaQuotaSize - getReplicaCount();
return Math.max(leftReplicaQuota, 0L);
} finally {
readUnlock();
}
}

public void checkDataSizeQuota() throws DdlException {
Expand Down
42 changes: 19 additions & 23 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -6030,32 +6030,28 @@ public void replayBackendTabletsInfo(BackendTabletsInfo backendTabletsInfo) {

// Convert table's distribution type from hash to random.
public void convertDistributionType(Database db, OlapTable tbl) throws DdlException {
tbl.writeLockOrDdlException();
try {
if (tbl.isColocateTable()) {
throw new DdlException("Cannot change distribution type of colocate table.");
}
if (tbl.getKeysType() == KeysType.UNIQUE_KEYS) {
throw new DdlException("Cannot change distribution type of unique keys table.");
}
if (tbl.getKeysType() == KeysType.AGG_KEYS) {
for (Column column : tbl.getBaseSchema()) {
if (column.getAggregationType() == AggregateType.REPLACE
|| column.getAggregationType() == AggregateType.REPLACE_IF_NOT_NULL) {
throw new DdlException("Cannot change distribution type of aggregate keys table which has value"
+ " columns with " + column.getAggregationType() + " type.");
}
if (tbl.isColocateTable()) {
throw new DdlException("Cannot change distribution type of colocate table.");
}
if (tbl.getKeysType() == KeysType.UNIQUE_KEYS) {
throw new DdlException("Cannot change distribution type of unique keys table.");
}
if (tbl.getKeysType() == KeysType.AGG_KEYS) {
for (Column column : tbl.getBaseSchema()) {
if (column.getAggregationType() == AggregateType.REPLACE
|| column.getAggregationType() == AggregateType.REPLACE_IF_NOT_NULL) {
throw new DdlException("Cannot change distribution type of aggregate keys table which has value"
+ " columns with " + column.getAggregationType() + " type.");
}
}
if (!tbl.convertHashDistributionToRandomDistribution()) {
throw new DdlException("Table " + tbl.getName() + " is not hash distributed");
}
TableInfo tableInfo = TableInfo.createForModifyDistribution(db.getId(), tbl.getId());
editLog.logModifyDistributionType(tableInfo);
LOG.info("finished to modify distribution type of table from hash to random : " + tbl.getName());
} finally {
tbl.writeUnlock();
}
if (!tbl.convertHashDistributionToRandomDistribution()) {
throw new DdlException("Table " + tbl.getName() + " is not hash distributed");
}
TableInfo tableInfo = TableInfo.createForModifyDistribution(db.getId(), tbl.getId());
editLog.logModifyDistributionType(tableInfo);
LOG.info("finished to modify distribution type of table from hash to random : " + tbl.getName());

}

public void replayConvertDistributionType(TableInfo info) throws MetaNotFoundException {
Expand Down
8 changes: 4 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public void addTaskResult(MTMVTask task, MTMVRelation relation,
// to connection issues such as S3, so it is directly set to null
if (!isReplay) {
// shouldn't do this while holding mvWriteLock
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true);
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true, true);
}
} catch (Throwable e) {
mtmvCache = null;
Expand Down Expand Up @@ -323,7 +323,7 @@ public MTMVCache getOrGenerateCache(ConnectContext connectionContext) throws Ana
MTMVCache mtmvCache;
try {
// Should new context with ADMIN user
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true);
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true, false);
} finally {
connectionContext.setThreadLocalInfo();
}
Expand Down Expand Up @@ -362,7 +362,7 @@ public MTMVRefreshSnapshot getRefreshSnapshot() {
*
* @return mvPartitionName ==> mvPartitionKeyDesc
*/
public Map<String, PartitionKeyDesc> generateMvPartitionDescs() throws AnalysisException {
public Map<String, PartitionKeyDesc> generateMvPartitionDescs() {
Map<String, PartitionItem> mtmvItems = getAndCopyPartitionItems();
Map<String, PartitionKeyDesc> result = Maps.newHashMap();
for (Entry<String, PartitionItem> entry : mtmvItems.entrySet()) {
Expand Down Expand Up @@ -392,7 +392,7 @@ public Pair<Map<String, Set<String>>, Map<String, String>> calculateDoublyPartit
Map<String, String> baseToMv = Maps.newHashMap();
Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs = MTMVPartitionUtil
.generateRelatedPartitionDescs(mvPartitionInfo, mvProperties);
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItemsWithoutLock();
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems();
for (Entry<String, PartitionItem> entry : mvPartitionItems.entrySet()) {
Set<String> basePartitionNames = relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(),
Sets.newHashSet());
Expand Down
17 changes: 2 additions & 15 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -3305,23 +3304,11 @@ public PartitionType getPartitionType() {
}

@Override
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot)
throws AnalysisException {
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
return getAndCopyPartitionItems();
}

public Map<String, PartitionItem> getAndCopyPartitionItems() throws AnalysisException {
if (!tryReadLock(1, TimeUnit.MINUTES)) {
throw new AnalysisException("get table read lock timeout, database=" + getDBName() + ",table=" + getName());
}
try {
return getAndCopyPartitionItemsWithoutLock();
} finally {
readUnlock();
}
}

public Map<String, PartitionItem> getAndCopyPartitionItemsWithoutLock() throws AnalysisException {
public Map<String, PartitionItem> getAndCopyPartitionItems() {
Map<String, PartitionItem> res = Maps.newHashMap();
for (Entry<Long, PartitionItem> entry : getPartitionInfo().getIdToItem(false).entrySet()) {
Partition partition = idToPartition.get(entry.getKey());
Expand Down
Loading

0 comments on commit 1aedf9d

Please sign in to comment.