Commit 9250c49

tmp

Lchangliang committed Feb 20, 2024
1 parent 8f97843 commit 9250c49
Showing 8 changed files with 92 additions and 85 deletions.
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_schema_change_job.cpp
@@ -90,7 +90,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& request)
     int64_t base_max_version = _base_tablet->max_version_unlocked();
     if (request.alter_version > 1) {
         // [0-1] is a placeholder rowset, no need to convert
-        RETURN_IF_ERROR(_base_tablet->capture_rs_readers({2, base_max_version}, &rs_splits));
+        RETURN_IF_ERROR(_base_tablet->capture_rs_readers({2, base_max_version}, &rs_splits, false));
     }
     // FIXME(cyx): Should trigger compaction on base_tablet if there are too many rowsets to convert.

2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablet.h
@@ -35,7 +35,7 @@ class CloudTablet final : public BaseTablet {
                               bool vertical) override;

     Status capture_rs_readers(const Version& spec_version, std::vector<RowSetSplits>* rs_splits,
-                              bool skip_missing_version = false) override;
+                              bool skip_missing_version) override;

     Status capture_consistent_rowsets_unlocked(
             const Version& spec_version, std::vector<RowsetSharedPtr>* rowsets) const override;

fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
@@ -28,6 +28,7 @@
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.task.AgentTask;

 import com.google.gson.annotations.SerializedName;
 import org.apache.logging.log4j.LogManager;
@@ -292,6 +293,8 @@ protected boolean checkTableStable(Database db) throws AlterCancelException {

     protected abstract void getInfo(List<List<Comparable>> infos);

+    protected void ensureCloudClusterExist(List<AgentTask> tasks) throws AlterCancelException {}
+
     public abstract void replay(AlterJobV2 replayedJob);

     public static AlterJobV2 read(DataInput in) throws IOException {
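
The empty ensureCloudClusterExist body added here is the hook half of a template-method arrangement: the base AlterJobV2 supplies a do-nothing default, and the cloud subclasses later in this commit override it to abort when the target compute cluster has been dropped. A minimal sketch of that shape, with hypothetical Job/CloudJob/Task names standing in for the real classes:

import java.util.List;

// Base job: the hook is a deliberate no-op, so non-cloud jobs are unaffected.
abstract class Job {
    protected void ensureCloudClusterExist(List<Task> tasks) throws Exception {}

    // Every subclass inherits this call site; only cloud jobs give it behavior.
    void run(List<Task> tasks) throws Exception {
        ensureCloudClusterExist(tasks); // no-op unless overridden
        // ... drive the unfinished tasks ...
    }
}

// Cloud job: fail fast when the target cluster no longer exists.
class CloudJob extends Job {
    private final String clusterName;

    CloudJob(String clusterName) {
        this.clusterName = clusterName;
    }

    @Override
    protected void ensureCloudClusterExist(List<Task> tasks) throws Exception {
        if (lookupClusterId(clusterName) == null) {
            throw new Exception("cluster " + clusterName + " has been dropped");
        }
    }

    // Placeholder for the real catalog lookup.
    private String lookupClusterId(String name) {
        return null;
    }
}

class Task {}
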
CloudRollupJobV2.java
@@ -80,7 +80,7 @@ public CloudRollupJobV2(String rawSql, long jobId, long dbId, long tableId, Stri
     }

     @Override
-    protected void commitRollupIndex() throws AlterCancelException {
+    protected void onCreateRollupReplicaDone() throws AlterCancelException {
         List<Long> rollupIndexList = new ArrayList<Long>();
         rollupIndexList.add(rollupIndexId);
         try {
@@ -91,12 +91,12 @@ protected void commitRollupIndex() throws AlterCancelException {
             throw new AlterCancelException(e.getMessage());
         }

-        LOG.info("commitRollupIndex finished, dbId:{}, tableId:{}, jobId:{}, rollupIndexList:{}",
+        LOG.info("onCreateRollupReplicaDone finished, dbId:{}, tableId:{}, jobId:{}, rollupIndexList:{}",
                 dbId, tableId, jobId, rollupIndexList);
     }

     @Override
-    protected void postProcessRollupIndex() {
+    protected void onCancel() {
         List<Long> rollupIndexList = new ArrayList<Long>();
         rollupIndexList.add(rollupIndexId);
         long tryTimes = 1;
@@ -106,13 +106,13 @@ protected void postProcessRollupIndex() {
                         .dropMaterializedIndex(tableId, rollupIndexList);
                 break;
             } catch (Exception e) {
-                LOG.warn("tryTimes:{}, postProcessRollupIndex exception:", tryTimes, e);
+                LOG.warn("tryTimes:{}, onCancel exception:", tryTimes, e);
             }
             sleepSeveralSeconds();
             tryTimes++;
         }

-        LOG.info("postProcessRollupIndex finished, dbId:{}, tableId:{}, jobId:{}, rollupIndexList:{}",
+        LOG.info("onCancel finished, dbId:{}, tableId:{}, jobId:{}, rollupIndexList:{}",
                 dbId, tableId, jobId, rollupIndexList);
     }

@@ -138,33 +138,7 @@ protected void createRollupReplica() throws AlterCancelException {
             rollupIndexList.add(rollupIndexId);
             ((CloudInternalCatalog) Env.getCurrentInternalCatalog())
                     .prepareMaterializedIndex(tbl.getId(), rollupIndexList, expiration);
-            for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) {
-                long partitionId = entry.getKey();
-                Partition partition = tbl.getPartition(partitionId);
-                if (partition == null) {
-                    continue;
-                }
-                TTabletType tabletType = tbl.getPartitionInfo().getTabletType(partitionId);
-                MaterializedIndex rollupIndex = entry.getValue();
-                Cloud.CreateTabletsRequest.Builder requestBuilder =
-                        Cloud.CreateTabletsRequest.newBuilder();
-                for (Tablet rollupTablet : rollupIndex.getTablets()) {
-                    OlapFile.TabletMetaCloudPB.Builder builder =
-                            ((CloudInternalCatalog) Env.getCurrentInternalCatalog())
-                                    .createTabletMetaBuilder(tableId, rollupIndexId,
-                                    partitionId, rollupTablet, tabletType, rollupSchemaHash,
-                                    rollupKeysType, rollupShortKeyColumnCount, tbl.getCopiedBfColumns(),
-                                    tbl.getBfFpp(), null, rollupSchema,
-                                    tbl.getDataSortInfo(), tbl.getCompressionType(), tbl.getStoragePolicy(),
-                                    tbl.isInMemory(), true,
-                                    tbl.getName(), tbl.getTTLSeconds(),
-                                    tbl.getEnableUniqueKeyMergeOnWrite(), tbl.storeRowColumn(),
-                                    tbl.getBaseSchemaVersion());
-                    requestBuilder.addTabletMetas(builder);
-                } // end for rollupTablets
-                ((CloudInternalCatalog) Env.getCurrentInternalCatalog())
-                        .sendCreateTabletsRpc(requestBuilder);
-            }
+            createRollupReplicaForPartition(tbl);
         } catch (Exception e) {
             LOG.warn("createCloudShadowIndexReplica Exception:{}", e);
             throw new AlterCancelException(e.getMessage());
@@ -184,8 +158,38 @@ protected void createRollupReplica() throws AlterCancelException {
         }
     }

+    private void createRollupReplicaForPartition(OlapTable tbl) throws Exception {
+        for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) {
+            long partitionId = entry.getKey();
+            Partition partition = tbl.getPartition(partitionId);
+            if (partition == null) {
+                continue;
+            }
+            TTabletType tabletType = tbl.getPartitionInfo().getTabletType(partitionId);
+            MaterializedIndex rollupIndex = entry.getValue();
+            Cloud.CreateTabletsRequest.Builder requestBuilder =
+                    Cloud.CreateTabletsRequest.newBuilder();
+            for (Tablet rollupTablet : rollupIndex.getTablets()) {
+                OlapFile.TabletMetaCloudPB.Builder builder =
+                        ((CloudInternalCatalog) Env.getCurrentInternalCatalog())
+                                .createTabletMetaBuilder(tableId, rollupIndexId,
+                                partitionId, rollupTablet, tabletType, rollupSchemaHash,
+                                rollupKeysType, rollupShortKeyColumnCount, tbl.getCopiedBfColumns(),
+                                tbl.getBfFpp(), null, rollupSchema,
+                                tbl.getDataSortInfo(), tbl.getCompressionType(), tbl.getStoragePolicy(),
+                                tbl.isInMemory(), true,
+                                tbl.getName(), tbl.getTTLSeconds(),
+                                tbl.getEnableUniqueKeyMergeOnWrite(), tbl.storeRowColumn(),
+                                tbl.getBaseSchemaVersion());
+                requestBuilder.addTabletMetas(builder);
+            } // end for rollupTablets
+            ((CloudInternalCatalog) Env.getCurrentInternalCatalog())
+                    .sendCreateTabletsRpc(requestBuilder);
+        }
+    }
+
     @Override
-    protected void checkCloudClusterName(List<AgentTask> tasks) throws AlterCancelException {
+    protected void ensureCloudClusterExist(List<AgentTask> tasks) throws AlterCancelException {
         if (((CloudSystemInfoService) Env.getCurrentSystemInfo())
                 .getCloudClusterIdByName(cloudClusterName) == null) {
             for (AgentTask task : tasks) {

CloudSchemaChangeJobV2.java
@@ -134,43 +134,7 @@ protected void createShadowIndexReplica() throws AlterCancelException {
             ((CloudInternalCatalog) Env.getCurrentInternalCatalog())
                     .prepareMaterializedIndex(tableId, shadowIdxList,
                             expiration);
-
-            for (long partitionId : partitionIndexMap.rowKeySet()) {
-                Partition partition = tbl.getPartition(partitionId);
-                if (partition == null) {
-                    continue;
-                }
-                Map<Long, MaterializedIndex> shadowIndexMap = partitionIndexMap.row(partitionId);
-                for (Map.Entry<Long, MaterializedIndex> entry : shadowIndexMap.entrySet()) {
-                    long shadowIdxId = entry.getKey();
-                    MaterializedIndex shadowIdx = entry.getValue();
-
-                    short shadowShortKeyColumnCount = indexShortKeyMap.get(shadowIdxId);
-                    List<Column> shadowSchema = indexSchemaMap.get(shadowIdxId);
-                    int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash;
-                    int shadowSchemaVersion = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaVersion;
-                    long originIndexId = indexIdMap.get(shadowIdxId);
-                    KeysType originKeysType = tbl.getKeysTypeByIndexId(originIndexId);
-
-                    Cloud.CreateTabletsRequest.Builder requestBuilder =
-                            Cloud.CreateTabletsRequest.newBuilder();
-                    for (Tablet shadowTablet : shadowIdx.getTablets()) {
-                        OlapFile.TabletMetaCloudPB.Builder builder =
-                                ((CloudInternalCatalog) Env.getCurrentInternalCatalog())
-                                        .createTabletMetaBuilder(tableId, shadowIdxId,
-                                        partitionId, shadowTablet, tbl.getPartitionInfo().getTabletType(partitionId),
-                                        shadowSchemaHash, originKeysType, shadowShortKeyColumnCount, bfColumns,
-                                        bfFpp, indexes, shadowSchema, tbl.getDataSortInfo(), tbl.getCompressionType(),
-                                        tbl.getStoragePolicy(), tbl.isInMemory(), true,
-                                        tbl.getName(), tbl.getTTLSeconds(),
-                                        tbl.getEnableUniqueKeyMergeOnWrite(), tbl.storeRowColumn(),
-                                        shadowSchemaVersion);
-                        requestBuilder.addTabletMetas(builder);
-                    } // end for rollupTablets
-                    ((CloudInternalCatalog) Env.getCurrentInternalCatalog())
-                            .sendCreateTabletsRpc(requestBuilder);
-                }
-            }
+            createShadowIndexReplicaForPartition(tbl);
         } catch (Exception e) {
             LOG.warn("createCloudShadowIndexReplica Exception:", e);
             throw new AlterCancelException(e.getMessage());
@@ -191,8 +155,47 @@ protected void createShadowIndexReplica() throws AlterCancelException {
         }
     }

+    private void createShadowIndexReplicaForPartition(OlapTable tbl) throws Exception {
+        for (long partitionId : partitionIndexMap.rowKeySet()) {
+            Partition partition = tbl.getPartition(partitionId);
+            if (partition == null) {
+                continue;
+            }
+            Map<Long, MaterializedIndex> shadowIndexMap = partitionIndexMap.row(partitionId);
+            for (Map.Entry<Long, MaterializedIndex> entry : shadowIndexMap.entrySet()) {
+                long shadowIdxId = entry.getKey();
+                MaterializedIndex shadowIdx = entry.getValue();
+
+                short shadowShortKeyColumnCount = indexShortKeyMap.get(shadowIdxId);
+                List<Column> shadowSchema = indexSchemaMap.get(shadowIdxId);
+                int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash;
+                int shadowSchemaVersion = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaVersion;
+                long originIndexId = indexIdMap.get(shadowIdxId);
+                KeysType originKeysType = tbl.getKeysTypeByIndexId(originIndexId);
+
+                Cloud.CreateTabletsRequest.Builder requestBuilder =
+                        Cloud.CreateTabletsRequest.newBuilder();
+                for (Tablet shadowTablet : shadowIdx.getTablets()) {
+                    OlapFile.TabletMetaCloudPB.Builder builder =
+                            ((CloudInternalCatalog) Env.getCurrentInternalCatalog())
+                                    .createTabletMetaBuilder(tableId, shadowIdxId,
+                                    partitionId, shadowTablet, tbl.getPartitionInfo().getTabletType(partitionId),
+                                    shadowSchemaHash, originKeysType, shadowShortKeyColumnCount, bfColumns,
+                                    bfFpp, indexes, shadowSchema, tbl.getDataSortInfo(), tbl.getCompressionType(),
+                                    tbl.getStoragePolicy(), tbl.isInMemory(), true,
+                                    tbl.getName(), tbl.getTTLSeconds(),
+                                    tbl.getEnableUniqueKeyMergeOnWrite(), tbl.storeRowColumn(),
+                                    shadowSchemaVersion);
+                    requestBuilder.addTabletMetas(builder);
+                } // end for rollupTablets
+                ((CloudInternalCatalog) Env.getCurrentInternalCatalog())
+                        .sendCreateTabletsRpc(requestBuilder);
+            }
+        }
+    }
+
     @Override
-    protected void checkCloudClusterName(List<AgentTask> tasks) throws AlterCancelException {
+    protected void ensureCloudClusterExist(List<AgentTask> tasks) throws AlterCancelException {
         if (((CloudSystemInfoService) Env.getCurrentSystemInfo())
                 .getCloudClusterIdByName(cloudClusterName) == null) {
             for (AgentTask task : tasks) {

14 changes: 6 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -526,7 +526,7 @@ protected void runRunningJob() throws AlterCancelException {
         if (!rollupBatchTask.isFinished()) {
             LOG.info("rollup tasks not finished. job: {}", jobId);
             List<AgentTask> tasks = rollupBatchTask.getUnfinishedTasks(2000);
-            checkCloudClusterName(tasks);
+            ensureCloudClusterExist(tasks);
             for (AgentTask task : tasks) {
                 if (task.getFailedTimes() > 0) {
                     task.setFinished(true);
@@ -591,7 +591,7 @@ protected void runRunningJob() throws AlterCancelException {
                 }
             } // end for tablets
         } // end for partitions
-        onCreateRollupReplicaDone();
+        onCreateRollupReplicaDone();
         onFinished(tbl);
     } finally {
         tbl.writeUnlock();
@@ -649,7 +649,7 @@ protected boolean cancelImpl(String errMsg) {
         this.finishedTimeMs = System.currentTimeMillis();
         Env.getCurrentEnv().getEditLog().logAlterJob(this);
         // try best to drop roll index, when job is cancelled
-        postProcessRollupIndex();
+        onCancel();

         LOG.info("cancel {} job {}, err: {}", this.type, jobId, errMsg);
         return true;
@@ -781,7 +781,7 @@ private void replayRunningJob(RollupJobV2 replayedJob) {
     private void replayCancelled(RollupJobV2 replayedJob) {
         cancelInternal();
         // try best to drop roll index, when job is cancelled
-        postProcessRollupIndex();
+        onCancel();
         this.jobState = JobState.CANCELLED;
         this.finishedTimeMs = replayedJob.finishedTimeMs;
         this.errMsg = replayedJob.errMsg;
@@ -903,11 +903,9 @@ public void gsonPostProcess() throws IOException {
         setColumnsDefineExpr(stmt.getMVColumnItemList());
     }

-    protected void commitRollupIndex() throws AlterCancelException {}
+    protected void onCreateRollupReplicaDone() throws AlterCancelException {}

-    protected void postProcessRollupIndex() {}
-
-    protected void checkCloudClusterName(List<AgentTask> tasks) throws AlterCancelException {}
+    protected void onCancel() {}

     @Override
     public String toJson() {

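Read together, the RollupJobV2 hunks above turn the cloud extension points into lifecycle callbacks: ensureCloudClusterExist runs before the unfinished tasks are inspected, onCreateRollupReplicaDone fires once all rollup tasks have finished, and onCancel fires from both cancelImpl and replayCancelled. A condensed, hypothetical sketch of the call flow as this diff leaves it (locking, version checks, and edit-log writes elided; unfinishedTasks stands in for rollupBatchTask.getUnfinishedTasks(2000)):

import java.util.ArrayList;
import java.util.List;

abstract class RollupJobSketch {
    // Base implementations are empty; CloudRollupJobV2 overrides them to talk
    // to the cloud meta-service (commit index / drop index / check cluster).
    protected void onCreateRollupReplicaDone() throws Exception {}
    protected void onCancel() {}
    protected void ensureCloudClusterExist(List<Object> tasks) throws Exception {}

    protected void runRunningJob() throws Exception {
        List<Object> tasks = unfinishedTasks();
        ensureCloudClusterExist(tasks);  // new call site before inspecting tasks
        // ... wait for tasks, verify replica versions per partition ...
        onCreateRollupReplicaDone();     // was commitRollupIndex()
        // onFinished(tbl);
    }

    protected boolean cancelImpl(String errMsg) {
        // ... mark the job cancelled, write the edit log ...
        onCancel();                      // was postProcessRollupIndex(): best-effort drop
        return true;
    }

    private List<Object> unfinishedTasks() {
        return new ArrayList<>();
    }
}
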
fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -2750,7 +2750,8 @@ public void modifyTableLightSchemaChange(String rawSql, Database db, OlapTable o
         }
     }

-    public void replayModifyTableLightSchemaChange(TableAddOrDropColumnsInfo info) throws MetaNotFoundException {
+    public void replayModifyTableLightSchemaChange(TableAddOrDropColumnsInfo info)
+            throws MetaNotFoundException, AnalysisException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("info:{}", info);
         }

fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -535,7 +535,7 @@ protected void runRunningJob() throws AlterCancelException {
         if (!schemaChangeBatchTask.isFinished()) {
             LOG.info("schema change tasks not finished. job: {}", jobId);
             List<AgentTask> tasks = schemaChangeBatchTask.getUnfinishedTasks(2000);
-            checkCloudClusterName(tasks);
+            ensureCloudClusterExist(tasks);
             for (AgentTask task : tasks) {
                 if (task.getFailedTimes() > 0) {
                     task.setFinished(true);
@@ -999,8 +999,6 @@ protected void postProcessShadowIndex() {}

     protected void postProcessOriginIndex() {}

-    protected void checkCloudClusterName(List<AgentTask> tasks) throws AlterCancelException {}
-
     @Override
     public void write(DataOutput out) throws IOException {
         String json = GsonUtils.GSON.toJson(this, AlterJobV2.class);
