-
Notifications
You must be signed in to change notification settings - Fork 3.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix some routine load bugs #2093
Conversation
@@ -757,7 +758,10 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String | |||
.add("error_msg", "change job state to paused when task has been aborted with error " + e.getMessage()) | |||
.build(), e); | |||
} finally { | |||
writeUnlock(); | |||
if (lock.isWriteLockedByCurrentThread()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function of afterAborted
could not be call individually. It already explain in comments at the top of this function.
So the case A is illegal. The timeout of txn should not call this function individually.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. It is weird. I will make this function more readable
be/src/common/config.h
Outdated
@@ -434,6 +434,10 @@ namespace config { | |||
// max consumer num in one data consumer group, for routine load | |||
CONF_Int32(max_consumer_num_per_group, "3"); | |||
|
|||
// the size of thread pool for routine load task. | |||
// this should be slightly larger than FE config 'max_concurrent_task_num_per_be' (default 10) | |||
CONF_Int32(routine_load_thread_pool_size, "12"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The concurrency of task per BE has been limited by FE. This config is not useful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just self-protection of BE, in case that FE has some bugs...
But I will simplify the configurations of FE, to make things simple.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the restriction of the task num between FE and BE increase the degree of difficulty
@@ -86,18 +86,6 @@ private void process() throws UserException { | |||
.build()); | |||
continue; | |||
} | |||
int currentTotalTaskNum = routineLoadManager.getSizeOfIdToRoutineLoadTask(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are no limit about the Routine Load Job. The queue of task will continually increase till the memory of FE is going to use out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have job num limit. the config is desired_max_waiting_jobs
of FE
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure about that ?
private boolean submitTask(long beId, TRoutineLoadTask tTask) { | ||
Backend backend = Catalog.getCurrentSystemInfo().getBackend(beId); | ||
if (backend == null) { | ||
LOG.warn("failed to send tasks to backend {} because not exist", beId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use the new LogBuilder
ClientPool.backendPool.invalidateObject(address, client); | ||
} | ||
} catch (Exception e) { | ||
LOG.warn("task send error. backend[{}]", beId, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use the new Log Builder
// just print a log, it does not matter. | ||
LOG.warn("after abort timeout txn failed. txn id: {}", abortedTxn.getTransactionId(), e); | ||
// abort may be failed. it is acceptable. just print a log | ||
LOG.warn("abort timeout txn {} failed. msg: {}", txnId, e.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LOG.warn("abort timeout txn {} failed. msg: {}", txnId, e.getMessage()); | |
LOG.warn("abort timeout txn {} failed. msg: {}", txnId, e.getMessage(), e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
print stack trance is unnecessary here, and will make fe.log ugly
@@ -254,6 +258,13 @@ public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) th | |||
ConnectContext.get().getRemoteIP(), | |||
tableName); | |||
} | |||
|
|||
if (getRoutineLoadJobByState(Sets.newHashSet(RoutineLoadJob.JobState.NEED_SCHEDULE, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The paused job need to be include.
@@ -254,6 +258,13 @@ public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) th | |||
ConnectContext.get().getRemoteIP(), | |||
tableName); | |||
} | |||
|
|||
if (getRoutineLoadJobByState(Sets.newHashSet(RoutineLoadJob.JobState.NEED_SCHEDULE, | |||
RoutineLoadJob.JobState.RUNNING)).size() > Config.desired_max_waiting_jobs) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RoutineLoadJob.JobState.RUNNING)).size() > Config.desired_max_waiting_jobs) { | |
RoutineLoadJob.JobState.RUNNING)).size() > Config.max_routine_load_jobs) { |
@@ -282,7 +280,7 @@ public void testGetMinTaskBeId() throws LoadException { | |||
beIdToConcurrentTaskMap.put(1L, 1); | |||
|
|||
new Expectations(routineLoadManager) {{ | |||
invoke(routineLoadManager, "getBeIdConcurrentTaskMaps"); | |||
invoke(routineLoadManager, "getBeCurrentTasksNumMap"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
invoke(routineLoadManager, "getBeCurrentTasksNumMap"); | |
invoke(routineLoadManager, "getBeCurrentTasksNumMap"); |
@@ -364,10 +363,11 @@ public void testGetTotalIdleTaskNum() { | |||
Map<Long, Integer> beIdToConcurrentTaskMap = Maps.newHashMap(); | |||
beIdToConcurrentTaskMap.put(1L, 1); | |||
new Expectations(routineLoadManager) {{ | |||
invoke(routineLoadManager, "getBeIdConcurrentTaskMaps"); | |||
invoke(routineLoadManager, "getBeCurrentTasksNumMap"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
invoke(routineLoadManager, "getBeCurrentTasksNumMap"); | |
invoke(routineLoadManager, "getBeCurrentTasksNumMap"); |
@@ -496,12 +510,13 @@ public boolean checkTaskInJob(UUID taskId) { | |||
return false; | |||
} | |||
|
|||
public List<RoutineLoadJob> getRoutineLoadJobByState(RoutineLoadJob.JobState jobState) { | |||
public List<RoutineLoadJob> getRoutineLoadJobByState(Set<RoutineLoadJob.JobState> desiredStates) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public List<RoutineLoadJob> getRoutineLoadJobByState(Set<RoutineLoadJob.JobState> desiredStates) { | |
public List<RoutineLoadJob> getRoutineLoadJobByState(RoutineLoadJob.JobState ...states) { |
RoutineLoadJob.JobState.RUNNING)).size() > Config.desired_max_waiting_jobs) { | ||
throw new DdlException("There are more then " + Config.desired_max_waiting_jobs | ||
RoutineLoadJob.JobState.RUNNING, RoutineLoadJob.JobState.PAUSED)).size() > Config.max_routine_load_job_num) { | ||
throw new DdlException("There are more then " + Config.max_routine_load_job_num |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This restriction does not need in here.
LGTM |
Mainly fix the following issues: 1. A null pointer exception is raised when a database or table is dropped. The expected behavior is that the routine load job is stopped. 2. Memory leaks. Batch routine load task submissions are no longer performed, and modifications are submitted separately for each task. 3. Unreasonable task timeout. Routine load tasks should not be queued in the BE thread pool for execution. The task sent to the BE should be executed immediately, otherwise the task in the FE will be timeout first. Eventually leads to constant timeout for all subsequent tasks. 4. All routine load job should be scheduled once it being submitted. Not waiting the available BE slot. Otherwise, all later submitted jobs may not be scheduled forever.
Mainly fix the following issues:
A null pointer exception is raised when a database or table is dropped. The expected behavior is that the routine load job is stopped.
Memory leaks. Batch routine load task submissions are no longer performed, and modifications are submitted separately for each task.
Unreasonable task timeout.
Routine load tasks should not be queued in the BE thread pool for execution. The task sent to the BE should be executed immediately, otherwise the task in the FE will be timeout first. Eventually leads to constant timeout for all subsequent tasks.
All routine load job should be scheduled once it being submitted. Not waiting the available BE slot. Otherwise, all later submitted jobs may not be scheduled forever.
ISSUE #2065