Skip to content

Commit

Permalink
branch-2.1: [fix](Export) Fix the problem of exporting stuck apache#4…
Browse files Browse the repository at this point in the history
…4944 (apache#45094)

Cherry-picked from apache#44944

Co-authored-by: Tiewei Fang <[email protected]>
  • Loading branch information
github-actions[bot] and BePPPower authored Dec 6, 2024
1 parent 3018e9a commit bea9564
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 23 deletions.
30 changes: 14 additions & 16 deletions fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,26 +102,24 @@ public void addExportJobAndRegisterTask(ExportJob job) throws Exception {
throw new LabelAlreadyUsedException(job.getLabel());
}
unprotectAddJob(job);
// delete existing files
if (Config.enable_delete_existing_files && Boolean.parseBoolean(job.getDeleteExistingFiles())) {
if (job.getBrokerDesc() == null) {
throw new AnalysisException("Local file system does not support delete existing files");
}
String fullPath = job.getExportPath();
BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, fullPath.lastIndexOf('/') + 1),
job.getBrokerDesc());
}
Env.getCurrentEnv().getEditLog().logExportCreate(job);
// ATTN: Must add task after edit log, otherwise the job may finish before adding job.
job.getCopiedTaskExecutors().forEach(executor -> {
Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor);
});
LOG.info("add export job. {}", job);

} finally {
writeUnlock();
}

// delete existing files
if (Config.enable_delete_existing_files && Boolean.parseBoolean(job.getDeleteExistingFiles())) {
if (job.getBrokerDesc() == null) {
throw new AnalysisException("Local file system does not support delete existing files");
}
String fullPath = job.getExportPath();
BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, fullPath.lastIndexOf('/') + 1),
job.getBrokerDesc());
}
// ATTN: Must add task after edit log, otherwise the job may finish before adding job.
for (int i = 0; i < job.getCopiedTaskExecutors().size(); i++) {
Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(job.getCopiedTaskExecutors().get(i));
}
LOG.info("add export job. {}", job);
}

public void cancelExportJob(CancelExportStmt stmt) throws DdlException, AnalysisException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.scheduler.constants.TaskType;
import org.apache.doris.scheduler.exception.JobException;

import com.lmax.disruptor.EventTranslatorThreeArg;
import com.lmax.disruptor.LiteTimeoutBlockingWaitStrategy;
Expand Down Expand Up @@ -119,15 +120,17 @@ public void tryPublish(Long jobId, Long taskId, TaskType taskType) {
*
* @param taskId task id
*/
public void tryPublishTask(Long taskId) {
public void tryPublishTask(Long taskId) throws JobException {
if (isClosed) {
log.info("tryPublish failed, disruptor is closed, taskId: {}", taskId);
return;
}
try {
// We reserve two slots in the ring buffer
// to prevent it from becoming stuck due to competition between producers and consumers.
if (disruptor.getRingBuffer().hasAvailableCapacity(2)) {
disruptor.publishEvent(TRANSLATOR, taskId, 0L, TaskType.TRANSIENT_TASK);
} catch (Exception e) {
log.warn("tryPublish failed, taskId: {}", taskId, e);
} else {
throw new JobException("There is not enough available capacity in the RingBuffer.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.executor.TransientTaskExecutor;

import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -41,7 +40,6 @@ public class TransientTaskManager {
* disruptor is used to handle task
* disruptor will start a thread pool to handle task
*/
@Setter
private TaskDisruptor disruptor;

public TransientTaskManager() {
Expand All @@ -56,7 +54,7 @@ public TransientTaskExecutor getMemoryTaskExecutor(Long taskId) {
return taskExecutorMap.get(taskId);
}

public Long addMemoryTask(TransientTaskExecutor executor) {
public Long addMemoryTask(TransientTaskExecutor executor) throws JobException {
Long taskId = executor.getId();
taskExecutorMap.put(taskId, executor);
disruptor.tryPublishTask(taskId);
Expand Down

0 comments on commit bea9564

Please sign in to comment.