Skip to content

Commit

Permalink
[opt](Nereids) lock table in ascending order of table IDs
Browse files Browse the repository at this point in the history
  • Loading branch information
morrySnow committed Dec 5, 2024
1 parent ecd8af2 commit d030993
Show file tree
Hide file tree
Showing 37 changed files with 671 additions and 662 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1054,10 +1054,7 @@ public boolean equals(Object obj) {
&& isKey == other.isKey
&& isAllowNull == other.isAllowNull
&& isAutoInc == other.isAutoInc
&& getDataType().equals(other.getDataType())
&& getStrLen() == other.getStrLen()
&& getPrecision() == other.getPrecision()
&& getScale() == other.getScale()
&& Objects.equals(type, other.type)
&& Objects.equals(comment, other.comment)
&& visible == other.visible
&& Objects.equals(children, other.children)
Expand Down
4 changes: 2 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public void addTaskResult(MTMVTask task, MTMVRelation relation,
// to connection issues such as S3, so it is directly set to null
if (!isReplay) {
// shouldn't do this while holding mvWriteLock
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true);
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true, true);
}
} catch (Throwable e) {
mtmvCache = null;
Expand Down Expand Up @@ -323,7 +323,7 @@ public MTMVCache getOrGenerateCache(ConnectContext connectionContext) throws Ana
MTMVCache mtmvCache;
try {
// Should new context with ADMIN user
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true);
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true, false);
} finally {
connectionContext.setThreadLocalInfo();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ private Optional<LogicalSqlCache> tryParseSqlWithoutCheckVariable(
SqlCacheContext sqlCacheContext, UserIdentity currentUserIdentity) {
Env env = connectContext.getEnv();

if (!tryLockTables(connectContext, env, sqlCacheContext)) {
return invalidateCache(key);
}

// check table and view and their columns authority
if (privilegeChanged(connectContext, env, sqlCacheContext)) {
return invalidateCache(key);
Expand Down Expand Up @@ -378,16 +382,38 @@ private boolean dataMaskPoliciesChanged(
return false;
}

private boolean privilegeChanged(ConnectContext connectContext, Env env, SqlCacheContext sqlCacheContext) {
/**
* Execute table locking operations in ascending order of table IDs.
*
* @return true if obtain all tables lock.
*/
private boolean tryLockTables(ConnectContext connectContext, Env env, SqlCacheContext sqlCacheContext) {
StatementContext currentStatementContext = connectContext.getStatementContext();
for (FullTableName fullTableName : sqlCacheContext.getUsedTables()) {
TableIf tableIf = findTableIf(env, fullTableName);
if (tableIf == null) {
return false;
}
currentStatementContext.getTables().put(fullTableName.toList(), tableIf);
}
for (FullTableName fullTableName : sqlCacheContext.getUsedViews().keySet()) {
TableIf tableIf = findTableIf(env, fullTableName);
if (tableIf == null) {
return false;
}
currentStatementContext.getTables().put(fullTableName.toList(), tableIf);
}
currentStatementContext.lock();
return true;
}

private boolean privilegeChanged(ConnectContext connectContext, Env env, SqlCacheContext sqlCacheContext) {
for (Entry<FullTableName, Set<String>> kv : sqlCacheContext.getCheckPrivilegeTablesOrViews().entrySet()) {
Set<String> usedColumns = kv.getValue();
TableIf tableIf = findTableIf(env, kv.getKey());
if (tableIf == null) {
return true;
}
// release when close statementContext
currentStatementContext.addTableReadLock(tableIf);
try {
UserAuthentication.checkPermission(tableIf, connectContext, usedColumns);
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public class SummaryProfile {
public static final String GET_TABLE_VERSION_COUNT = "Get Table Version Count";

public static final String PARSE_SQL_TIME = "Parse SQL Time";
public static final String NEREIDS_LOCK_TABLE_TIME = "Nereids Lock Table Time";
public static final String NEREIDS_ANALYSIS_TIME = "Nereids Analysis Time";
public static final String NEREIDS_REWRITE_TIME = "Nereids Rewrite Time";
public static final String NEREIDS_OPTIMIZE_TIME = "Nereids Optimize Time";
Expand Down Expand Up @@ -136,6 +137,7 @@ public class SummaryProfile {
// The display order of execution summary items.
public static final ImmutableList<String> EXECUTION_SUMMARY_KEYS = ImmutableList.of(
PARSE_SQL_TIME,
NEREIDS_LOCK_TABLE_TIME,
NEREIDS_ANALYSIS_TIME,
NEREIDS_REWRITE_TIME,
NEREIDS_OPTIMIZE_TIME,
Expand Down Expand Up @@ -224,6 +226,8 @@ public class SummaryProfile {
private long parseSqlStartTime = -1;
@SerializedName(value = "parseSqlFinishTime")
private long parseSqlFinishTime = -1;
@SerializedName(value = "nereidsLockTableFinishTime")
private long nereidsLockTableFinishTime = -1;
@SerializedName(value = "nereidsAnalysisFinishTime")
private long nereidsAnalysisFinishTime = -1;
@SerializedName(value = "nereidsRewriteFinishTime")
Expand Down Expand Up @@ -410,6 +414,7 @@ private void updateSummaryProfile(Map<String, String> infos) {

private void updateExecutionSummaryProfile() {
executionSummaryProfile.addInfoString(PARSE_SQL_TIME, getPrettyParseSqlTime());
executionSummaryProfile.addInfoString(NEREIDS_LOCK_TABLE_TIME, getPrettyNereidsLockTableTime());
executionSummaryProfile.addInfoString(NEREIDS_ANALYSIS_TIME, getPrettyNereidsAnalysisTime());
executionSummaryProfile.addInfoString(NEREIDS_REWRITE_TIME, getPrettyNereidsRewriteTime());
executionSummaryProfile.addInfoString(NEREIDS_OPTIMIZE_TIME, getPrettyNereidsOptimizeTime());
Expand Down Expand Up @@ -506,6 +511,10 @@ public void setParseSqlFinishTime(long parseSqlFinishTime) {
this.parseSqlFinishTime = parseSqlFinishTime;
}

public void setNereidsLockTableFinishTime() {
this.nereidsLockTableFinishTime = TimeUtils.getStartTimeMs();
}

public void setNereidsAnalysisTime() {
this.nereidsAnalysisFinishTime = TimeUtils.getStartTimeMs();
}
Expand Down Expand Up @@ -766,8 +775,12 @@ public String getPrettyParseSqlTime() {
return getPrettyTime(parseSqlFinishTime, parseSqlStartTime, TUnit.TIME_MS);
}

public String getPrettyNereidsLockTableTime() {
return getPrettyTime(nereidsLockTableFinishTime, parseSqlStartTime, TUnit.TIME_MS);
}

public String getPrettyNereidsAnalysisTime() {
return getPrettyTime(nereidsAnalysisFinishTime, queryBeginTime, TUnit.TIME_MS);
return getPrettyTime(nereidsAnalysisFinishTime, nereidsLockTableFinishTime, TUnit.TIME_MS);
}

public String getPrettyNereidsRewriteTime() {
Expand Down
34 changes: 24 additions & 10 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,23 +87,37 @@ public StructInfo getStructInfo() {
return structInfo;
}

public static MTMVCache from(MTMV mtmv, ConnectContext connectContext, boolean needCost) {
public static MTMVCache from(MTMV mtmv, ConnectContext connectContext, boolean needCost, boolean needLock) {
StatementContext mvSqlStatementContext = new StatementContext(connectContext,
new OriginStatement(mtmv.getQuerySql(), 0));
if (mvSqlStatementContext.getConnectContext().getStatementContext() == null) {
mvSqlStatementContext.getConnectContext().setStatementContext(mvSqlStatementContext);
}
LogicalPlan unboundMvPlan = new NereidsParser().parseSingle(mtmv.getQuerySql());
NereidsPlanner planner = new NereidsPlanner(mvSqlStatementContext);
boolean originalRewriteFlag = connectContext.getSessionVariable().enableMaterializedViewRewrite;
connectContext.getSessionVariable().enableMaterializedViewRewrite = false;
try {
// Can not convert to table sink, because use the same column from different table when self join
// the out slot is wrong
if (needCost) {
// Only in mv rewrite, we need plan with eliminated cost which is used for mv chosen
if (needLock) {
planner.planWithLock(unboundMvPlan, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
} else {
planner.planWithoutLock(unboundMvPlan, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN, false);
}
} else {
// No need cost for performance
if (needLock) {
planner.planWithLock(unboundMvPlan, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);

// Can not convert to table sink, because use the same column from different table when self join
// the out slot is wrong
if (needCost) {
// Only in mv rewrite, we need plan with eliminated cost which is used for mv chosen
planner.planWithLock(unboundMvPlan, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
} else {
// No need cost for performance
planner.planWithLock(unboundMvPlan, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);
} else {
planner.planWithoutLock(unboundMvPlan, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN, false);
}
}
} finally {
connectContext.getSessionVariable().enableMaterializedViewRewrite = originalRewriteFlag;
}
Plan originPlan = planner.getCascadesContext().getRewritePlan();
// Eliminate result sink because sink operator is useless in query rewrite by materialized view
Expand All @@ -128,6 +142,6 @@ public Plan visitLogicalResultSink(LogicalResultSink<? extends Plan> logicalResu
new BitSet());
return new MTMVCache(mvPlan, originPlan, planner.getAnalyzedPlan(), needCost
? planner.getCascadesContext().getMemo().getRoot().getStatistics() : null,
structInfoOptional.orElseGet(() -> null));
structInfoOptional.orElse(null));
}
}
Loading

0 comments on commit d030993

Please sign in to comment.