diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index bd3fbc59edc0a84..b61295459b4e7c0 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -90,7 +90,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque int64_t base_max_version = _base_tablet->max_version_unlocked(); if (request.alter_version > 1) { // [0-1] is a placeholder rowset, no need to convert - RETURN_IF_ERROR(_base_tablet->capture_rs_readers({2, base_max_version}, &rs_splits)); + RETURN_IF_ERROR(_base_tablet->capture_rs_readers({2, base_max_version}, &rs_splits, false)); } // FIXME(cyx): Should trigger compaction on base_tablet if there are too many rowsets to convert. diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index 1e8a2cb08ecc8fb..10e1390496140a7 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -35,7 +35,7 @@ class CloudTablet final : public BaseTablet { bool vertical) override; Status capture_rs_readers(const Version& spec_version, std::vector* rs_splits, - bool skip_missing_version = false) override; + bool skip_missing_version) override; Status capture_consistent_rowsets_unlocked( const Version& spec_version, std::vector* rowsets) const override; diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java index 01f529d5034fefd..5362b4849e45ebd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java @@ -28,6 +28,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.task.AgentTask; import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; @@ -292,6 +293,8 @@ protected boolean checkTableStable(Database db) throws AlterCancelException { protected abstract void getInfo(List> infos); + protected void ensureCloudClusterExist(List tasks) throws AlterCancelException {} + public abstract void replay(AlterJobV2 replayedJob); public static AlterJobV2 read(DataInput in) throws IOException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java index b7b38d9670048d9..622f71471959f4a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java @@ -80,7 +80,7 @@ public CloudRollupJobV2(String rawSql, long jobId, long dbId, long tableId, Stri } @Override - protected void commitRollupIndex() throws AlterCancelException { + protected void onCreateRollupReplicaDone() throws AlterCancelException { List rollupIndexList = new ArrayList(); rollupIndexList.add(rollupIndexId); try { @@ -91,12 +91,12 @@ protected void commitRollupIndex() throws AlterCancelException { throw new AlterCancelException(e.getMessage()); } - LOG.info("commitRollupIndex finished, dbId:{}, tableId:{}, jobId:{}, rollupIndexList:{}", + LOG.info("onCreateRollupReplicaDone finished, dbId:{}, tableId:{}, jobId:{}, rollupIndexList:{}", dbId, tableId, jobId, rollupIndexList); } @Override - protected void postProcessRollupIndex() { + protected void onCancel() { List rollupIndexList = new ArrayList(); rollupIndexList.add(rollupIndexId); long tryTimes = 1; @@ -106,13 +106,13 @@ protected void postProcessRollupIndex() { .dropMaterializedIndex(tableId, rollupIndexList); break; } catch (Exception e) { - LOG.warn("tryTimes:{}, postProcessRollupIndex exception:", tryTimes, e); + LOG.warn("tryTimes:{}, onCancel exception:", tryTimes, e); } sleepSeveralSeconds(); tryTimes++; } - LOG.info("postProcessRollupIndex finished, dbId:{}, tableId:{}, jobId:{}, rollupIndexList:{}", + LOG.info("onCancel finished, dbId:{}, tableId:{}, jobId:{}, rollupIndexList:{}", dbId, tableId, jobId, rollupIndexList); } @@ -138,33 +138,7 @@ protected void createRollupReplica() throws AlterCancelException { rollupIndexList.add(rollupIndexId); ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) .prepareMaterializedIndex(tbl.getId(), rollupIndexList, expiration); - for (Map.Entry entry : this.partitionIdToRollupIndex.entrySet()) { - long partitionId = entry.getKey(); - Partition partition = tbl.getPartition(partitionId); - if (partition == null) { - continue; - } - TTabletType tabletType = tbl.getPartitionInfo().getTabletType(partitionId); - MaterializedIndex rollupIndex = entry.getValue(); - Cloud.CreateTabletsRequest.Builder requestBuilder = - Cloud.CreateTabletsRequest.newBuilder(); - for (Tablet rollupTablet : rollupIndex.getTablets()) { - OlapFile.TabletMetaCloudPB.Builder builder = - ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) - .createTabletMetaBuilder(tableId, rollupIndexId, - partitionId, rollupTablet, tabletType, rollupSchemaHash, - rollupKeysType, rollupShortKeyColumnCount, tbl.getCopiedBfColumns(), - tbl.getBfFpp(), null, rollupSchema, - tbl.getDataSortInfo(), tbl.getCompressionType(), tbl.getStoragePolicy(), - tbl.isInMemory(), true, - tbl.getName(), tbl.getTTLSeconds(), - tbl.getEnableUniqueKeyMergeOnWrite(), tbl.storeRowColumn(), - tbl.getBaseSchemaVersion()); - requestBuilder.addTabletMetas(builder); - } // end for rollupTablets - ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) - .sendCreateTabletsRpc(requestBuilder); - } + createRollupReplicaForPartition(tbl); } catch (Exception e) { LOG.warn("createCloudShadowIndexReplica Exception:{}", e); throw new AlterCancelException(e.getMessage()); @@ -184,8 +158,38 @@ protected void createRollupReplica() throws AlterCancelException { } } + private void createRollupReplicaForPartition(OlapTable tbl) throws Exception { + for (Map.Entry entry : this.partitionIdToRollupIndex.entrySet()) { + long partitionId = entry.getKey(); + Partition partition = tbl.getPartition(partitionId); + if (partition == null) { + continue; + } + TTabletType tabletType = tbl.getPartitionInfo().getTabletType(partitionId); + MaterializedIndex rollupIndex = entry.getValue(); + Cloud.CreateTabletsRequest.Builder requestBuilder = + Cloud.CreateTabletsRequest.newBuilder(); + for (Tablet rollupTablet : rollupIndex.getTablets()) { + OlapFile.TabletMetaCloudPB.Builder builder = + ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) + .createTabletMetaBuilder(tableId, rollupIndexId, + partitionId, rollupTablet, tabletType, rollupSchemaHash, + rollupKeysType, rollupShortKeyColumnCount, tbl.getCopiedBfColumns(), + tbl.getBfFpp(), null, rollupSchema, + tbl.getDataSortInfo(), tbl.getCompressionType(), tbl.getStoragePolicy(), + tbl.isInMemory(), true, + tbl.getName(), tbl.getTTLSeconds(), + tbl.getEnableUniqueKeyMergeOnWrite(), tbl.storeRowColumn(), + tbl.getBaseSchemaVersion()); + requestBuilder.addTabletMetas(builder); + } // end for rollupTablets + ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) + .sendCreateTabletsRpc(requestBuilder); + } + } + @Override - protected void checkCloudClusterName(List tasks) throws AlterCancelException { + protected void ensureCloudClusterExist(List tasks) throws AlterCancelException { if (((CloudSystemInfoService) Env.getCurrentSystemInfo()) .getCloudClusterIdByName(cloudClusterName) == null) { for (AgentTask task : tasks) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java index efea337214e463f..af9456c1a5b6cc6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java @@ -134,43 +134,7 @@ protected void createShadowIndexReplica() throws AlterCancelException { ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) .prepareMaterializedIndex(tableId, shadowIdxList, expiration); - - for (long partitionId : partitionIndexMap.rowKeySet()) { - Partition partition = tbl.getPartition(partitionId); - if (partition == null) { - continue; - } - Map shadowIndexMap = partitionIndexMap.row(partitionId); - for (Map.Entry entry : shadowIndexMap.entrySet()) { - long shadowIdxId = entry.getKey(); - MaterializedIndex shadowIdx = entry.getValue(); - - short shadowShortKeyColumnCount = indexShortKeyMap.get(shadowIdxId); - List shadowSchema = indexSchemaMap.get(shadowIdxId); - int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash; - int shadowSchemaVersion = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaVersion; - long originIndexId = indexIdMap.get(shadowIdxId); - KeysType originKeysType = tbl.getKeysTypeByIndexId(originIndexId); - - Cloud.CreateTabletsRequest.Builder requestBuilder = - Cloud.CreateTabletsRequest.newBuilder(); - for (Tablet shadowTablet : shadowIdx.getTablets()) { - OlapFile.TabletMetaCloudPB.Builder builder = - ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) - .createTabletMetaBuilder(tableId, shadowIdxId, - partitionId, shadowTablet, tbl.getPartitionInfo().getTabletType(partitionId), - shadowSchemaHash, originKeysType, shadowShortKeyColumnCount, bfColumns, - bfFpp, indexes, shadowSchema, tbl.getDataSortInfo(), tbl.getCompressionType(), - tbl.getStoragePolicy(), tbl.isInMemory(), true, - tbl.getName(), tbl.getTTLSeconds(), - tbl.getEnableUniqueKeyMergeOnWrite(), tbl.storeRowColumn(), - shadowSchemaVersion); - requestBuilder.addTabletMetas(builder); - } // end for rollupTablets - ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) - .sendCreateTabletsRpc(requestBuilder); - } - } + createShadowIndexReplicaForPartition(tbl); } catch (Exception e) { LOG.warn("createCloudShadowIndexReplica Exception:", e); throw new AlterCancelException(e.getMessage()); @@ -191,8 +155,47 @@ protected void createShadowIndexReplica() throws AlterCancelException { } } + private void createShadowIndexReplicaForPartition(OlapTable tbl) throws Exception { + for (long partitionId : partitionIndexMap.rowKeySet()) { + Partition partition = tbl.getPartition(partitionId); + if (partition == null) { + continue; + } + Map shadowIndexMap = partitionIndexMap.row(partitionId); + for (Map.Entry entry : shadowIndexMap.entrySet()) { + long shadowIdxId = entry.getKey(); + MaterializedIndex shadowIdx = entry.getValue(); + + short shadowShortKeyColumnCount = indexShortKeyMap.get(shadowIdxId); + List shadowSchema = indexSchemaMap.get(shadowIdxId); + int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash; + int shadowSchemaVersion = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaVersion; + long originIndexId = indexIdMap.get(shadowIdxId); + KeysType originKeysType = tbl.getKeysTypeByIndexId(originIndexId); + + Cloud.CreateTabletsRequest.Builder requestBuilder = + Cloud.CreateTabletsRequest.newBuilder(); + for (Tablet shadowTablet : shadowIdx.getTablets()) { + OlapFile.TabletMetaCloudPB.Builder builder = + ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) + .createTabletMetaBuilder(tableId, shadowIdxId, + partitionId, shadowTablet, tbl.getPartitionInfo().getTabletType(partitionId), + shadowSchemaHash, originKeysType, shadowShortKeyColumnCount, bfColumns, + bfFpp, indexes, shadowSchema, tbl.getDataSortInfo(), tbl.getCompressionType(), + tbl.getStoragePolicy(), tbl.isInMemory(), true, + tbl.getName(), tbl.getTTLSeconds(), + tbl.getEnableUniqueKeyMergeOnWrite(), tbl.storeRowColumn(), + shadowSchemaVersion); + requestBuilder.addTabletMetas(builder); + } // end for rollupTablets + ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) + .sendCreateTabletsRpc(requestBuilder); + } + } + } + @Override - protected void checkCloudClusterName(List tasks) throws AlterCancelException { + protected void ensureCloudClusterExist(List tasks) throws AlterCancelException { if (((CloudSystemInfoService) Env.getCurrentSystemInfo()) .getCloudClusterIdByName(cloudClusterName) == null) { for (AgentTask task : tasks) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 1caeae70fbb4449..cccaad503c771b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -526,7 +526,7 @@ protected void runRunningJob() throws AlterCancelException { if (!rollupBatchTask.isFinished()) { LOG.info("rollup tasks not finished. job: {}", jobId); List tasks = rollupBatchTask.getUnfinishedTasks(2000); - checkCloudClusterName(tasks); + ensureCloudClusterExist(tasks); for (AgentTask task : tasks) { if (task.getFailedTimes() > 0) { task.setFinished(true); @@ -591,7 +591,7 @@ protected void runRunningJob() throws AlterCancelException { } } // end for tablets } // end for partitions - commitRollupIndex(); + onCreateRollupReplicaDone(); onFinished(tbl); } finally { tbl.writeUnlock(); @@ -649,7 +649,7 @@ protected boolean cancelImpl(String errMsg) { this.finishedTimeMs = System.currentTimeMillis(); Env.getCurrentEnv().getEditLog().logAlterJob(this); // try best to drop roll index, when job is cancelled - postProcessRollupIndex(); + onCancel(); LOG.info("cancel {} job {}, err: {}", this.type, jobId, errMsg); return true; @@ -781,7 +781,7 @@ private void replayRunningJob(RollupJobV2 replayedJob) { private void replayCancelled(RollupJobV2 replayedJob) { cancelInternal(); // try best to drop roll index, when job is cancelled - postProcessRollupIndex(); + onCancel(); this.jobState = JobState.CANCELLED; this.finishedTimeMs = replayedJob.finishedTimeMs; this.errMsg = replayedJob.errMsg; @@ -903,11 +903,9 @@ public void gsonPostProcess() throws IOException { setColumnsDefineExpr(stmt.getMVColumnItemList()); } - protected void commitRollupIndex() throws AlterCancelException {} + protected void onCreateRollupReplicaDone() throws AlterCancelException {} - protected void postProcessRollupIndex() {} - - protected void checkCloudClusterName(List tasks) throws AlterCancelException {} + protected void onCancel() {} @Override public String toJson() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 0e89f029e5a57d4..bd3a0c291daf451 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -2750,7 +2750,8 @@ public void modifyTableLightSchemaChange(String rawSql, Database db, OlapTable o } } - public void replayModifyTableLightSchemaChange(TableAddOrDropColumnsInfo info) throws MetaNotFoundException { + public void replayModifyTableLightSchemaChange(TableAddOrDropColumnsInfo info) + throws MetaNotFoundException, AnalysisException { if (LOG.isDebugEnabled()) { LOG.debug("info:{}", info); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index d63372d3ceef89f..2256aac21218a8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -535,7 +535,7 @@ protected void runRunningJob() throws AlterCancelException { if (!schemaChangeBatchTask.isFinished()) { LOG.info("schema change tasks not finished. job: {}", jobId); List tasks = schemaChangeBatchTask.getUnfinishedTasks(2000); - checkCloudClusterName(tasks); + ensureCloudClusterExist(tasks); for (AgentTask task : tasks) { if (task.getFailedTimes() > 0) { task.setFinished(true); @@ -999,8 +999,6 @@ protected void postProcessShadowIndex() {} protected void postProcessOriginIndex() {} - protected void checkCloudClusterName(List tasks) throws AlterCancelException {} - @Override public void write(DataOutput out) throws IOException { String json = GsonUtils.GSON.toJson(this, AlterJobV2.class);