Skip to content

Commit

Permalink
[BugFix] Fix dynamic overwrite replay fail
Browse files Browse the repository at this point in the history
Signed-off-by: meegoo <[email protected]>
  • Loading branch information
meegoo committed Dec 12, 2024
1 parent c07f7b6 commit d96a2fe
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ public static <T extends Comparable> boolean isRangeEqual(Range<T> l, Range<T> r
*/
public static void checkRangeListsMatch(List<Range<PartitionKey>> list1, List<Range<PartitionKey>> list2)
throws DdlException {
if (list1.isEmpty() && list2.isEmpty()) {
return;
}
Collections.sort(list1, RangeUtils.RANGE_COMPARATOR);
Collections.sort(list2, RangeUtils.RANGE_COMPARATOR);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public void replayStateChange(InsertOverwriteStateChangeInfo info) {
LOG.info("replay state change:{}", info);
// If the final status is failure, then GC must be done
if (info.getToState() == OVERWRITE_FAILED) {
job.setTmpPartitionIds(info.getTmpPartitionIds());
job.setJobState(OVERWRITE_FAILED);
LOG.info("replay insert overwrite job:{} to FAILED", job.getJobId());
gc(true);
Expand All @@ -193,6 +194,7 @@ public void replayStateChange(InsertOverwriteStateChangeInfo info) {
job.setJobState(InsertOverwriteJobState.OVERWRITE_RUNNING);
break;
case OVERWRITE_SUCCESS:
job.setTmpPartitionIds(info.getTmpPartitionIds());
job.setJobState(InsertOverwriteJobState.OVERWRITE_SUCCESS);
doCommit(true);
LOG.info("replay insert overwrite job:{} to SUCCESS", job.getJobId());
Expand Down Expand Up @@ -429,19 +431,26 @@ private void gc(boolean isReplay) {
}
// if dynamic overwrite, drop all runtime created partitions
if (job.isDynamicOverwrite()) {
TransactionState txnState = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr()
.getTransactionState(dbId, insertStmt.getTxnId());
if (txnState == null) {
throw new DmlException("transaction state is null for dbId:%s, txnId:%s", dbId, insertStmt.getTxnId());
List<String> tmpPartitionNames = Lists.newArrayList();
if (!isReplay) {
TransactionState txnState = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr()
.getTransactionState(dbId, insertStmt.getTxnId());
if (txnState == null) {
throw new DmlException("transaction state is null for dbId:%s, txnId:%s", dbId, insertStmt.getTxnId());
}
tmpPartitionNames = txnState.getCreatedPartitionNames();
job.setTmpPartitionIds(tmpPartitionNames.stream()
.map(name -> targetTable.getPartition(name, true).getId())
.collect(Collectors.toList()));
}
List<String> tmpPartitionNames = txnState.getCreatedPartitionNames();
for (String partitionName : tmpPartitionNames) {
Partition partition = targetTable.getPartition(partitionName, true);
if (partition != null) {
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
sourceTablets.addAll(index.getTablets());
}
targetTable.dropTempPartition(partitionName, true);
LOG.info("drop temp partition {} from dynamic overwrite job {}", partitionName, job.getJobId());
}
}
}
Expand Down Expand Up @@ -508,15 +517,23 @@ private void doCommit(boolean isReplay) {
.sum();
stats.setSourceRows(sumSourceRows);

LOG.info("overwrite job {} replace source partitions:{} to tmp partitions:{}", job.getJobId(),
sourcePartitionNames, tmpPartitionNames);
PartitionInfo partitionInfo = targetTable.getPartitionInfo();
if (partitionInfo.isRangePartition() || partitionInfo.getType() == PartitionType.LIST) {
if (job.isDynamicOverwrite()) {
TransactionState txnState = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr()
.getTransactionState(dbId, insertStmt.getTxnId());
if (txnState == null) {
throw new DmlException("transaction state is null for dbId:%s, txnId:%s", dbId, insertStmt.getTxnId());
if (!isReplay) {
TransactionState txnState = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr()
.getTransactionState(dbId, insertStmt.getTxnId());
if (txnState == null) {
throw new DmlException("transaction state is null dbId:%s, txnId:%s", dbId, insertStmt.getTxnId());
}
tmpPartitionNames = txnState.getCreatedPartitionNames();
job.setTmpPartitionIds(tmpPartitionNames.stream()
.map(name -> targetTable.getPartition(name, true).getId())
.collect(Collectors.toList()));
}
tmpPartitionNames = txnState.getCreatedPartitionNames();
LOG.info("dynamic overwrite job {} replace tmpPartitionNames:{}", job.getJobId(), tmpPartitionNames);
targetTable.replaceMatchPartitions(tmpPartitionNames);
} else {
targetTable.replaceTempPartitions(sourcePartitionNames, tmpPartitionNames, true, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public CreateInsertOverwriteJobLog(long jobId, long dbId, long tableId,
this.dbId = dbId;
this.tableId = tableId;
this.targetPartitionIds = targetPartitionIds;
this.dynamicOverwrite = dynamicOverwrite;
}

public long getJobId() {
Expand Down Expand Up @@ -78,6 +79,7 @@ public String toString() {
", dbId=" + dbId +
", tableId=" + tableId +
", targetPartitionIds=" + targetPartitionIds +
", dynamicOverwrite=" + dynamicOverwrite +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2279,7 +2279,7 @@ private static TCreatePartitionResult createPartitionProcess(TCreatePartitionReq
}
state.getLocalMetastore().addPartitions(ctx, db, olapTable.getName(), addPartitionClause);
} catch (Exception e) {
LOG.warn("failed to cancel alter operation", e);
LOG.warn("failed to add partitions", e);
errorStatus.setError_msgs(Lists.newArrayList(
String.format("automatic create partition failed. error:%s", e.getMessage())));
result.setStatus(errorStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1103,7 +1103,7 @@ public Void visitAddPartitionClause(AddPartitionClause clause, ConnectContext co
upgradeDeprecatedSingleItemListPartitionDesc(olapTable, partitionDescList, clause, partitionInfo);
analyzeAddPartition(olapTable, partitionDescList, clause, partitionInfo);
} catch (DdlException | AnalysisException | NotImplementedException e) {
throw new SemanticException(e.getMessage());
throw new SemanticException(e.getMessage(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1377,9 +1377,6 @@ private static AddPartitionClause getAddPartitionClauseForRangePartition(
long interval = measure.getInterval();
Type firstPartitionColumnType = expressionRangePartitionInfo.getPartitionColumns(olapTable.getIdToColumn())
.get(0).getType();
if (partitionPrefix == null) {
partitionPrefix = DEFAULT_PARTITION_NAME_PREFIX;
}
Short replicationNum = olapTable.getTableProperty().getReplicationNum();
DistributionDesc distributionDesc = olapTable.getDefaultDistributionInfo()
.toDistributionDesc(olapTable.getIdToColumn());
Expand Down Expand Up @@ -1407,27 +1404,27 @@ private static AddPartitionClause getAddPartitionClauseForRangePartition(
switch (granularity.toLowerCase()) {
case "minute":
beginTime = beginTime.withSecond(0).withNano(0);
partitionName = partitionPrefix + beginTime.format(DateUtils.MINUTE_FORMATTER_UNIX);
partitionName = DEFAULT_PARTITION_NAME_PREFIX + beginTime.format(DateUtils.MINUTE_FORMATTER_UNIX);
endTime = beginTime.plusMinutes(interval);
break;
case "hour":
beginTime = beginTime.withMinute(0).withSecond(0).withNano(0);
partitionName = partitionPrefix + beginTime.format(DateUtils.HOUR_FORMATTER_UNIX);
partitionName = DEFAULT_PARTITION_NAME_PREFIX + beginTime.format(DateUtils.HOUR_FORMATTER_UNIX);
endTime = beginTime.plusHours(interval);
break;
case "day":
beginTime = beginTime.withHour(0).withMinute(0).withSecond(0).withNano(0);
partitionName = partitionPrefix + beginTime.format(DateUtils.DATEKEY_FORMATTER_UNIX);
partitionName = DEFAULT_PARTITION_NAME_PREFIX + beginTime.format(DateUtils.DATEKEY_FORMATTER_UNIX);
endTime = beginTime.plusDays(interval);
break;
case "month":
beginTime = beginTime.withDayOfMonth(1);
partitionName = partitionPrefix + beginTime.format(DateUtils.MONTH_FORMATTER_UNIX);
partitionName = DEFAULT_PARTITION_NAME_PREFIX + beginTime.format(DateUtils.MONTH_FORMATTER_UNIX);
endTime = beginTime.plusMonths(interval);
break;
case "year":
beginTime = beginTime.withDayOfYear(1);
partitionName = partitionPrefix + beginTime.format(DateUtils.YEAR_FORMATTER_UNIX);
partitionName = DEFAULT_PARTITION_NAME_PREFIX + beginTime.format(DateUtils.YEAR_FORMATTER_UNIX);
endTime = beginTime.plusYears(interval);
break;
default:
Expand All @@ -1436,6 +1433,13 @@ private static AddPartitionClause getAddPartitionClauseForRangePartition(
PartitionKeyDesc partitionKeyDesc =
createPartitionKeyDesc(firstPartitionColumnType, beginTime, endTime);

if (partitionPrefix != null) {
if (partitionPrefix.contains(PARTITION_NAME_PREFIX_SPLIT)) {
throw new AnalysisException("partition name prefix can not contain " + PARTITION_NAME_PREFIX_SPLIT);
}
partitionName = partitionPrefix + PARTITION_NAME_PREFIX_SPLIT + partitionName;
}

if (!partitionColNames.contains(partitionName)) {
SingleRangePartitionDesc singleRangePartitionDesc =
new SingleRangePartitionDesc(true, partitionName, partitionKeyDesc, partitionProperties);
Expand Down
43 changes: 43 additions & 0 deletions test/sql/test_dynamic_overwrite/R/test_overwrite
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ dynamic_overwrite true





-- name: test_dynamic_overwrite
create table t(k int, v int) partition by (k);
-- result:
Expand Down Expand Up @@ -52,6 +54,8 @@ select * from t;





-- name: test_dynamic_overwrite_on_other_table
create table t(k int) partition by range(k)
(
Expand Down Expand Up @@ -122,4 +126,43 @@ insert overwrite t1 values(0);
select * from t1;
-- result:
0
-- !result

-- name: test_overwrite_range_table
create table fv(k datetime, v int) partition by date_trunc('day', k);
-- result:
-- !result
set dynamic_overwrite=false;
-- result:
-- !result
insert overwrite fv values('2020-01-01', 4),('2020-01-02', 2);
-- result:
-- !result
select * from fv;
-- result:
2020-01-02 00:00:00 2
2020-01-01 00:00:00 4
-- !result
insert overwrite fv values('2020-01-03', 4),('2020-01-02', 2);
-- result:
-- !result
select * from fv;
-- result:
2020-01-02 00:00:00 2
2020-01-03 00:00:00 4
-- !result
set dynamic_overwrite=true;
-- result:
-- !result
insert overwrite fv values('2020-01-04', 3),('2020-01-02', 3);
-- result:
-- !result
select * from fv;
-- result:
2020-01-03 00:00:00 4
2020-01-04 00:00:00 3
2020-01-02 00:00:00 3
-- !result
set dynamic_overwrite=false;
-- result:
-- !result
12 changes: 12 additions & 0 deletions test/sql/test_dynamic_overwrite/T/test_overwrite
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,15 @@ select * from t1;
set dynamic_overwrite=true;
insert overwrite t1 values(0);
select * from t1;

-- name: test_overwrite_range_table
create table fv(k datetime, v int) partition by date_trunc('day', k);
set dynamic_overwrite=false;
insert overwrite fv values('2020-01-01', 4),('2020-01-02', 2);
select * from fv;
insert overwrite fv values('2020-01-03', 4),('2020-01-02', 2);
select * from fv;
set dynamic_overwrite=true;
insert overwrite fv values('2020-01-04', 3),('2020-01-02', 3);
select * from fv;
set dynamic_overwrite=false;

0 comments on commit d96a2fe

Please sign in to comment.