Skip to content

Commit

Permalink
modify review
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiqiang-hhhh committed Sep 6, 2023
1 parent 9635b21 commit 68b74d4
Showing 1 changed file with 45 additions and 67 deletions.
112 changes: 45 additions & 67 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1262,81 +1262,59 @@ public boolean shouldCancel(List<Backend> currentBackends) {
for (Backend be : currentBackends) {
curBeMap.put(be.getId(), be);
}
lock();

for (Long id : idToBackend.keySet()) {
Backend be = curBeMap.get(id);

if (be == null) {
// Be not exists in latest be snapshot
unlock();
LOG.warn("Backend {} not exists, query {} should be cancelled",
id, DebugUtil.printId(queryId));
return true;
} else if (!be.isAlive()) {
unlock();
LOG.warn("Backend {} dead, coordinator {} should be cancelled.",
be.getId(), DebugUtil.printId(queryId));
return true;
}
}
try {
lock();

if (queryOptions.isEnablePipelineEngine()) {
for (PipelineExecContext pipelineExecContext : pipelineExecContexts.values()) {
Backend be = curBeMap.get(pipelineExecContext.backend.getId());
if (be == null || !be.isAlive()) {
LOG.warn("Backend {} not exists or dead, query {} should be cancelled",
pipelineExecContext.backend.toString(), DebugUtil.printId(queryId));
return true;
}

// Usually, if any be dead or restarted, this function should have returned true
// in above for loop.
// But there is a situation where a backend has finished restart, and hb has already
// updated before this function is involved, so we need to compare the backend process epoch
// still.
if (queryOptions.isEnablePipelineEngine()) {
for (PipelineExecContext pipelineExecContext : pipelineExecContexts.values()) {
// Here we have an implicit consumption that pipelineExecContexts and idToBackend
// are logical correct, so be could not be null.
Backend be = curBeMap.get(pipelineExecContext.backend.getId());
if (be == null || !be.isAlive()) {
// Should not happen.
unlock();
LOG.error("Logical error in Coordinator, backend: {}", pipelineExecContext.backend.toString());
return true;
// Backend process epoch changed, indicates that this be restarts, query should be cancelled.
// Check zero since during upgrading, older version oplog will not persistent be start time
// so newer version follower will get zero epoch when replaying oplog or snapshot
if (pipelineExecContext.beProcessEpoch != be.getProcessEpoch() && be.getProcessEpoch() != 0) {
LOG.warn("Backend process epoch changed, previous {} now {}, "
+ "means this be has already restarted, should cancel this coordinator,"
+ " query id {}",
pipelineExecContext.beProcessEpoch, be.getProcessEpoch(),
DebugUtil.printId(queryId));
return true;
} else if (be.getProcessEpoch() == 0) {
LOG.warn("Backend {} has zero process epoch, maybe we are upgrading cluster?",
be.toString());
}
}
} else {
// beToExecStates will be updated only in non-pipeline query.
for (BackendExecStates beExecState : beToExecStates.values()) {
Backend be = curBeMap.get(beExecState.beId);
if (be == null || !be.isAlive()) {
LOG.warn("Backend {} not exists or dead, query {} should be cancelled.",
beExecState.beId, DebugUtil.printId(queryId));
return true;
}

// Backend process epoch changed, indicates that this be restarts, query should be cancelled.
// Check zero since during upgrading, older version oplog will not persistent be start time
// so newer version follower will get zero epoch when replaying oplog or snapshot
if (pipelineExecContext.beProcessEpoch != be.getProcessEpoch() && be.getProcessEpoch() != 0) {
unlock();
LOG.warn("Backend process epoch changed, previous {} now {}, means this be has already restarted, "
+ "should cancel this coordinator, query id {}",
pipelineExecContext.beProcessEpoch, be.getProcessEpoch(), DebugUtil.printId(queryId));
return true;
} else if (be.getProcessEpoch() == 0) {
LOG.warn("Backend {} has zero process epoch, maybe we are upgrading cluster?", be.toString());
if (beExecState.beProcessEpoch != be.getProcessEpoch() && be.getProcessEpoch() != 0) {
LOG.warn("Process epoch changed, previous {} now {}, means this be has already restarted, "
+ "should cancel this coordinator, query id {}",
beExecState.beProcessEpoch, be.getProcessEpoch(), DebugUtil.printId(queryId));
return true;
} else if (be.getProcessEpoch() == 0) {
LOG.warn("Backend {} has zero process epoch, maybe we are upgrading cluster?", be.toString());
}
}
}
} else {
// beToExecStates will be updated only in non-pipeline query.
for (BackendExecStates beExecState : beToExecStates.values()) {
Backend be = curBeMap.get(beExecState.beId);
if (be == null || !be.isAlive()) {
// Should not happen.
unlock();
LOG.error("Logical error in Coordinator, backendId: {}, beProcessEpoch: {}",
beExecState.beId, beExecState.beProcessEpoch);
return true;
}

if (beExecState.beProcessEpoch != be.getProcessEpoch() && be.getProcessEpoch() != 0) {
unlock();
LOG.warn("Process epoch changed, previous {} now {}, means this be has already restarted, "
+ "should cancel this coordinator, query id {}",
beExecState.beProcessEpoch, be.getProcessEpoch(), DebugUtil.printId(queryId));
return true;
} else if (be.getProcessEpoch() == 0) {
LOG.warn("Backend {} has zero process epoch, maybe we are upgrading cluster?", be.toString());
}
}
return false;
} finally {
unlock();
}

unlock();
return false;
}

// Cancel execution of query. This includes the execution of the local plan
Expand Down

0 comments on commit 68b74d4

Please sign in to comment.