Skip to content

Commit

Permalink
[opt](Nereids) lock table in ascending order of table IDs
Browse files Browse the repository at this point in the history
  • Loading branch information
morrySnow committed Dec 12, 2024
1 parent e2c24e3 commit 3b9418a
Show file tree
Hide file tree
Showing 53 changed files with 942 additions and 1,182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1054,10 +1054,7 @@ public boolean equals(Object obj) {
&& isKey == other.isKey
&& isAllowNull == other.isAllowNull
&& isAutoInc == other.isAutoInc
&& getDataType().equals(other.getDataType())
&& getStrLen() == other.getStrLen()
&& getPrecision() == other.getPrecision()
&& getScale() == other.getScale()
&& Objects.equals(type, other.type)
&& Objects.equals(comment, other.comment)
&& visible == other.visible
&& Objects.equals(children, other.children)
Expand Down
8 changes: 4 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public void addTaskResult(MTMVTask task, MTMVRelation relation,
// to connection issues such as S3, so it is directly set to null
if (!isReplay) {
// shouldn't do this while holding mvWriteLock
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true);
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true, true);
}
} catch (Throwable e) {
mtmvCache = null;
Expand Down Expand Up @@ -323,7 +323,7 @@ public MTMVCache getOrGenerateCache(ConnectContext connectionContext) throws Ana
MTMVCache mtmvCache;
try {
// Should new context with ADMIN user
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true);
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true, false);
} finally {
connectionContext.setThreadLocalInfo();
}
Expand Down Expand Up @@ -362,7 +362,7 @@ public MTMVRefreshSnapshot getRefreshSnapshot() {
*
* @return mvPartitionName ==> mvPartitionKeyDesc
*/
public Map<String, PartitionKeyDesc> generateMvPartitionDescs() throws AnalysisException {
public Map<String, PartitionKeyDesc> generateMvPartitionDescs() {
Map<String, PartitionItem> mtmvItems = getAndCopyPartitionItems();
Map<String, PartitionKeyDesc> result = Maps.newHashMap();
for (Entry<String, PartitionItem> entry : mtmvItems.entrySet()) {
Expand Down Expand Up @@ -392,7 +392,7 @@ public Pair<Map<String, Set<String>>, Map<String, String>> calculateDoublyPartit
Map<String, String> baseToMv = Maps.newHashMap();
Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs = MTMVPartitionUtil
.generateRelatedPartitionDescs(mvPartitionInfo, mvProperties);
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItemsWithoutLock();
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems();
for (Entry<String, PartitionItem> entry : mvPartitionItems.entrySet()) {
Set<String> basePartitionNames = relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(),
Sets.newHashSet());
Expand Down
17 changes: 2 additions & 15 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -3313,23 +3312,11 @@ public PartitionType getPartitionType() {
}

@Override
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot)
throws AnalysisException {
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
return getAndCopyPartitionItems();
}

public Map<String, PartitionItem> getAndCopyPartitionItems() throws AnalysisException {
if (!tryReadLock(1, TimeUnit.MINUTES)) {
throw new AnalysisException("get table read lock timeout, database=" + getDBName() + ",table=" + getName());
}
try {
return getAndCopyPartitionItemsWithoutLock();
} finally {
readUnlock();
}
}

public Map<String, PartitionItem> getAndCopyPartitionItemsWithoutLock() throws AnalysisException {
public Map<String, PartitionItem> getAndCopyPartitionItems() {
Map<String, PartitionItem> res = Maps.newHashMap();
for (Entry<Long, PartitionItem> entry : getPartitionInfo().getIdToItem(false).entrySet()) {
Partition partition = idToPartition.get(entry.getKey());
Expand Down
134 changes: 48 additions & 86 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,55 +213,43 @@ default Map<String, Constraint> getConstraintsMapUnsafe() {
}

default Set<ForeignKeyConstraint> getForeignKeyConstraints() {
readLock();
try {
return getConstraintsMapUnsafe().values().stream()
.filter(ForeignKeyConstraint.class::isInstance)
.map(ForeignKeyConstraint.class::cast)
.collect(ImmutableSet.toImmutableSet());
} catch (Exception ignored) {
return ImmutableSet.of();
} finally {
readUnlock();
}
}

default Map<String, Constraint> getConstraintsMap() {
readLock();
try {
return ImmutableMap.copyOf(getConstraintsMapUnsafe());
} catch (Exception ignored) {
return ImmutableMap.of();
} finally {
readUnlock();
}
}

default Set<PrimaryKeyConstraint> getPrimaryKeyConstraints() {
readLock();
try {
return getConstraintsMapUnsafe().values().stream()
.filter(PrimaryKeyConstraint.class::isInstance)
.map(PrimaryKeyConstraint.class::cast)
.collect(ImmutableSet.toImmutableSet());
} catch (Exception ignored) {
return ImmutableSet.of();
} finally {
readUnlock();
}
}

default Set<UniqueConstraint> getUniqueConstraints() {
readLock();
try {
return getConstraintsMapUnsafe().values().stream()
.filter(UniqueConstraint.class::isInstance)
.map(UniqueConstraint.class::cast)
.collect(ImmutableSet.toImmutableSet());
} catch (Exception ignored) {
return ImmutableSet.of();
} finally {
readUnlock();
}
}

Expand All @@ -280,34 +268,24 @@ default void checkConstraintNotExistenceUnsafe(String name, Constraint primaryKe
}

default void addUniqueConstraint(String name, ImmutableList<String> columns, boolean replay) {
writeLock();
try {
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
UniqueConstraint uniqueConstraint = new UniqueConstraint(name, ImmutableSet.copyOf(columns));
checkConstraintNotExistenceUnsafe(name, uniqueConstraint, constraintMap);
constraintMap.put(name, uniqueConstraint);
if (!replay) {
Env.getCurrentEnv().getEditLog().logAddConstraint(
new AlterConstraintLog(uniqueConstraint, this));
}
} finally {
writeUnlock();
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
UniqueConstraint uniqueConstraint = new UniqueConstraint(name, ImmutableSet.copyOf(columns));
checkConstraintNotExistenceUnsafe(name, uniqueConstraint, constraintMap);
constraintMap.put(name, uniqueConstraint);
if (!replay) {
Env.getCurrentEnv().getEditLog().logAddConstraint(
new AlterConstraintLog(uniqueConstraint, this));
}
}

default void addPrimaryKeyConstraint(String name, ImmutableList<String> columns, boolean replay) {
writeLock();
try {
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
PrimaryKeyConstraint primaryKeyConstraint = new PrimaryKeyConstraint(name, ImmutableSet.copyOf(columns));
checkConstraintNotExistenceUnsafe(name, primaryKeyConstraint, constraintMap);
constraintMap.put(name, primaryKeyConstraint);
if (!replay) {
Env.getCurrentEnv().getEditLog().logAddConstraint(
new AlterConstraintLog(primaryKeyConstraint, this));
}
} finally {
writeUnlock();
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
PrimaryKeyConstraint primaryKeyConstraint = new PrimaryKeyConstraint(name, ImmutableSet.copyOf(columns));
checkConstraintNotExistenceUnsafe(name, primaryKeyConstraint, constraintMap);
constraintMap.put(name, primaryKeyConstraint);
if (!replay) {
Env.getCurrentEnv().getEditLog().logAddConstraint(
new AlterConstraintLog(primaryKeyConstraint, this));
}
}

Expand All @@ -326,26 +304,19 @@ default PrimaryKeyConstraint tryGetPrimaryKeyForForeignKeyUnsafe(

default void addForeignConstraint(String name, ImmutableList<String> columns,
TableIf referencedTable, ImmutableList<String> referencedColumns, boolean replay) {
writeLock();
referencedTable.writeLock();
try {
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
ForeignKeyConstraint foreignKeyConstraint =
new ForeignKeyConstraint(name, columns, referencedTable, referencedColumns);
checkConstraintNotExistenceUnsafe(name, foreignKeyConstraint, constraintMap);
PrimaryKeyConstraint requirePrimaryKeyName = new PrimaryKeyConstraint(name,
foreignKeyConstraint.getReferencedColumnNames());
PrimaryKeyConstraint primaryKeyConstraint =
tryGetPrimaryKeyForForeignKeyUnsafe(requirePrimaryKeyName, referencedTable);
primaryKeyConstraint.addForeignTable(this);
constraintMap.put(name, foreignKeyConstraint);
if (!replay) {
Env.getCurrentEnv().getEditLog().logAddConstraint(
new AlterConstraintLog(foreignKeyConstraint, this));
}
} finally {
referencedTable.writeUnlock();
writeUnlock();
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
ForeignKeyConstraint foreignKeyConstraint =
new ForeignKeyConstraint(name, columns, referencedTable, referencedColumns);
checkConstraintNotExistenceUnsafe(name, foreignKeyConstraint, constraintMap);
PrimaryKeyConstraint requirePrimaryKeyName = new PrimaryKeyConstraint(name,
foreignKeyConstraint.getReferencedColumnNames());
PrimaryKeyConstraint primaryKeyConstraint =
tryGetPrimaryKeyForForeignKeyUnsafe(requirePrimaryKeyName, referencedTable);
primaryKeyConstraint.addForeignTable(this);
constraintMap.put(name, foreignKeyConstraint);
if (!replay) {
Env.getCurrentEnv().getEditLog().logAddConstraint(
new AlterConstraintLog(foreignKeyConstraint, this));
}
}

Expand Down Expand Up @@ -381,40 +352,31 @@ default void replayDropConstraint(String name) {
}

default void dropConstraint(String name, boolean replay) {
writeLock();
try {
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
if (!constraintMap.containsKey(name)) {
throw new AnalysisException(
String.format("Unknown constraint %s on table %s.", name, this.getName()));
}
Constraint constraint = constraintMap.get(name);
constraintMap.remove(name);
if (constraint instanceof PrimaryKeyConstraint) {
((PrimaryKeyConstraint) constraint).getForeignTables()
.forEach(t -> t.dropFKReferringPK(this, (PrimaryKeyConstraint) constraint));
}
if (!replay) {
Env.getCurrentEnv().getEditLog().logDropConstraint(new AlterConstraintLog(constraint, this));
}
} finally {
writeUnlock();
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
if (!constraintMap.containsKey(name)) {
throw new AnalysisException(
String.format("Unknown constraint %s on table %s.", name, this.getName()));
}
Constraint constraint = constraintMap.get(name);
constraintMap.remove(name);
if (constraint instanceof PrimaryKeyConstraint) {
((PrimaryKeyConstraint) constraint).getForeignTables()
.forEach(t -> t.dropFKReferringPK(this, (PrimaryKeyConstraint) constraint));
}
if (!replay) {
Env.getCurrentEnv().getEditLog().logDropConstraint(new AlterConstraintLog(constraint, this));
}
}

default void dropFKReferringPK(TableIf table, PrimaryKeyConstraint constraint) {
writeLock();
try {
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
Set<String> fkName = constraintMap.entrySet().stream()
.filter(e -> e.getValue() instanceof ForeignKeyConstraint
&& ((ForeignKeyConstraint) e.getValue()).isReferringPK(table, constraint))
.map(Entry::getKey)
.collect(Collectors.toSet());
fkName.forEach(constraintMap::remove);
} finally {
writeUnlock();
}
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
Set<String> fkName = constraintMap.entrySet().stream()
.filter(e -> e.getValue() instanceof ForeignKeyConstraint
&& ((ForeignKeyConstraint) e.getValue()).isReferringPK(table, constraint))
.map(Entry::getKey)
.collect(Collectors.toSet());
fkName.forEach(constraintMap::remove);

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ private Optional<LogicalSqlCache> tryParseSqlWithoutCheckVariable(
SqlCacheContext sqlCacheContext, UserIdentity currentUserIdentity) {
Env env = connectContext.getEnv();

if (!tryLockTables(connectContext, env, sqlCacheContext)) {
return invalidateCache(key);
}

// check table and view and their columns authority
if (privilegeChanged(connectContext, env, sqlCacheContext)) {
return invalidateCache(key);
Expand Down Expand Up @@ -378,16 +382,38 @@ private boolean dataMaskPoliciesChanged(
return false;
}

private boolean privilegeChanged(ConnectContext connectContext, Env env, SqlCacheContext sqlCacheContext) {
/**
* Execute table locking operations in ascending order of table IDs.
*
* @return true if obtain all tables lock.
*/
private boolean tryLockTables(ConnectContext connectContext, Env env, SqlCacheContext sqlCacheContext) {
StatementContext currentStatementContext = connectContext.getStatementContext();
for (FullTableName fullTableName : sqlCacheContext.getUsedTables()) {
TableIf tableIf = findTableIf(env, fullTableName);
if (tableIf == null) {
return false;
}
currentStatementContext.getTables().put(fullTableName.toList(), tableIf);
}
for (FullTableName fullTableName : sqlCacheContext.getUsedViews().keySet()) {
TableIf tableIf = findTableIf(env, fullTableName);
if (tableIf == null) {
return false;
}
currentStatementContext.getTables().put(fullTableName.toList(), tableIf);
}
currentStatementContext.lock();
return true;
}

private boolean privilegeChanged(ConnectContext connectContext, Env env, SqlCacheContext sqlCacheContext) {
for (Entry<FullTableName, Set<String>> kv : sqlCacheContext.getCheckPrivilegeTablesOrViews().entrySet()) {
Set<String> usedColumns = kv.getValue();
TableIf tableIf = findTableIf(env, kv.getKey());
if (tableIf == null) {
return true;
}
// release when close statementContext
currentStatementContext.addTableReadLock(tableIf);
try {
UserAuthentication.checkPermission(tableIf, connectContext, usedColumns);
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,21 @@

package org.apache.doris.common.lock;

import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.qe.ConnectContext;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* A monitored version of ReentrantReadWriteLock that provides additional
* monitoring capabilities for read and write locks.
*/
public class MonitoredReentrantReadWriteLock extends ReentrantReadWriteLock {

private static final Logger LOG = LogManager.getLogger(MonitoredReentrantReadWriteLock.class);
// Monitored read and write lock instances
private final ReadLock readLock = new ReadLock(this);
private final WriteLock writeLock = new WriteLock(this);
Expand Down Expand Up @@ -97,6 +105,11 @@ protected WriteLock(ReentrantReadWriteLock lock) {
public void lock() {
super.lock();
monitor.afterLock();
if (isFair() && getReadHoldCount() > 0) {
LOG.warn(" read lock count is {}, write lock count is {}, stack is {}, query id is {}",
getReadHoldCount(), getWriteHoldCount(), Thread.currentThread().getStackTrace(),
ConnectContext.get() == null ? "" : DebugUtil.printId(ConnectContext.get().queryId()));
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ private List<Pair<List<Comparable>, TRow>> getPartitionInfosInrernal() throws An
List<Pair<List<Comparable>, TRow>> partitionInfos = new ArrayList<Pair<List<Comparable>, TRow>>();
Map<Long, List<String>> partitionsUnSyncTables = null;
String mtmvPartitionSyncErrorMsg = null;
olapTable.readLock();
if (olapTable instanceof MTMV) {
try {
partitionsUnSyncTables = MTMVPartitionUtil
Expand All @@ -251,7 +252,6 @@ private List<Pair<List<Comparable>, TRow>> getPartitionInfosInrernal() throws An
mtmvPartitionSyncErrorMsg = e.getMessage();
}
}
olapTable.readLock();
try {
List<Long> partitionIds;
PartitionInfo tblPartitionInfo = olapTable.getPartitionInfo();
Expand Down
Loading

0 comments on commit 3b9418a

Please sign in to comment.