diff --git a/.github/workflows/cluster-test/mysql_with_mysql_registry/dolphinscheduler_env.sh b/.github/workflows/cluster-test/mysql_with_mysql_registry/dolphinscheduler_env.sh index 58937e740c14..8536eb0905a9 100755 --- a/.github/workflows/cluster-test/mysql_with_mysql_registry/dolphinscheduler_env.sh +++ b/.github/workflows/cluster-test/mysql_with_mysql_registry/dolphinscheduler_env.sh @@ -28,7 +28,6 @@ export SPRING_DATASOURCE_PASSWORD=123456 # DolphinScheduler server related configuration export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none} export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC} -export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10} # Registry center configuration, determines the type and link of the registry center export REGISTRY_TYPE=${REGISTRY_TYPE:-jdbc} diff --git a/.github/workflows/cluster-test/mysql_with_zookeeper_registry/dolphinscheduler_env.sh b/.github/workflows/cluster-test/mysql_with_zookeeper_registry/dolphinscheduler_env.sh index 671c70a5bba5..f64e59b768c5 100755 --- a/.github/workflows/cluster-test/mysql_with_zookeeper_registry/dolphinscheduler_env.sh +++ b/.github/workflows/cluster-test/mysql_with_zookeeper_registry/dolphinscheduler_env.sh @@ -28,7 +28,6 @@ export SPRING_DATASOURCE_PASSWORD=123456 # DolphinScheduler server related configuration export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none} export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC} -export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10} # Registry center configuration, determines the type and link of the registry center export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper} diff --git a/.github/workflows/cluster-test/postgresql_with_postgresql_registry/dolphinscheduler_env.sh b/.github/workflows/cluster-test/postgresql_with_postgresql_registry/dolphinscheduler_env.sh index e7fd1b7204a5..29f8570319b1 100644 --- a/.github/workflows/cluster-test/postgresql_with_postgresql_registry/dolphinscheduler_env.sh +++ b/.github/workflows/cluster-test/postgresql_with_postgresql_registry/dolphinscheduler_env.sh @@ -28,7 +28,6 @@ export SPRING_DATASOURCE_PASSWORD=postgres # DolphinScheduler server related configuration export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none} export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC} -export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10} # Registry center configuration, determines the type and link of the registry center export REGISTRY_TYPE=jdbc diff --git a/.github/workflows/cluster-test/postgresql_with_zookeeper_registry/dolphinscheduler_env.sh b/.github/workflows/cluster-test/postgresql_with_zookeeper_registry/dolphinscheduler_env.sh index 1dbd63254eee..685171605850 100644 --- a/.github/workflows/cluster-test/postgresql_with_zookeeper_registry/dolphinscheduler_env.sh +++ b/.github/workflows/cluster-test/postgresql_with_zookeeper_registry/dolphinscheduler_env.sh @@ -28,7 +28,6 @@ export SPRING_DATASOURCE_PASSWORD=postgres # DolphinScheduler server related configuration export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none} export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC} -export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10} # Registry center configuration, determines the type and link of the registry center export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper} diff --git a/docs/docs/en/architecture/configuration.md b/docs/docs/en/architecture/configuration.md index 13d89329439b..fe0b7851bae1 100644 --- a/docs/docs/en/architecture/configuration.md +++ b/docs/docs/en/architecture/configuration.md @@ -286,7 +286,6 @@ Location: `master-server/conf/application.yaml` | Parameters | Default value | Description | |-----------------------------------------------------------------------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | master.listen-port | 5678 | master listen port | -| master.fetch-command-num | 10 | the number of commands fetched by master | | master.pre-exec-threads | 10 | master prepare execute thread number to limit handle commands in parallel | | master.exec-threads | 100 | master execute thread number to limit process instances in parallel | | master.dispatch-task-number | 3 | master dispatch task number per batch | @@ -305,6 +304,9 @@ Location: `master-server/conf/application.yaml` | master.registry-disconnect-strategy.strategy | stop | Used when the master disconnect from registry, default value: stop. Optional values include stop, waiting | | master.registry-disconnect-strategy.max-waiting-time | 100s | Used when the master disconnect from registry, and the disconnect strategy is waiting, this config means the master will waiting to reconnect to registry in given times, and after the waiting times, if the master still cannot connect to registry, will stop itself, if the value is 0s, the Master will wait infinitely | | master.worker-group-refresh-interval | 10s | The interval to refresh worker group from db to memory | +| master.command-fetch-strategy.type | ID_SLOT_BASED | The command fetch strategy, only support `ID_SLOT_BASED` | +| master.command-fetch-strategy.config.id-step | 1 | The id auto incremental step of t_ds_command in db | +| master.command-fetch-strategy.config.fetch-size | 10 | The number of commands fetched by master | ### Worker Server related configuration diff --git a/docs/docs/en/guide/installation/pseudo-cluster.md b/docs/docs/en/guide/installation/pseudo-cluster.md index e63436f203bb..7a3b43b00ef2 100644 --- a/docs/docs/en/guide/installation/pseudo-cluster.md +++ b/docs/docs/en/guide/installation/pseudo-cluster.md @@ -123,7 +123,6 @@ export SPRING_DATASOURCE_PASSWORD={password} # DolphinScheduler server related configuration export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none} export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC} -export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10} # Registry center configuration, determines the type and link of the registry center export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper} diff --git a/docs/docs/zh/architecture/configuration.md b/docs/docs/zh/architecture/configuration.md index 08fded19e069..d8d1d42d1eb8 100644 --- a/docs/docs/zh/architecture/configuration.md +++ b/docs/docs/zh/architecture/configuration.md @@ -281,29 +281,30 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId 位置:`master-server/conf/application.yaml` -| 参数 | 默认值 | 描述 | -|-----------------------------------------------------------------------------|--------------|-----------------------------------------------------------------------------------------| -| master.listen-port | 5678 | master监听端口 | -| master.fetch-command-num | 10 | master拉取command数量 | -| master.pre-exec-threads | 10 | master准备执行任务的数量,用于限制并行的command | -| master.exec-threads | 100 | master工作线程数量,用于限制并行的流程实例数量 | -| master.dispatch-task-number | 3 | master每个批次的派发任务数量 | -| master.host-selector | lower_weight | master host选择器,用于选择合适的worker执行任务,可选值: random, round_robin, lower_weight | -| master.max-heartbeat-interval | 10s | master最大心跳间隔 | -| master.task-commit-retry-times | 5 | 任务重试次数 | -| master.task-commit-interval | 1000 | 任务提交间隔,单位为毫秒 | -| master.state-wheel-interval | 5 | 轮询检查状态时间 | -| master.server-load-protection.enabled | true | 是否开启系统保护策略 | -| master.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.7 | master最大系统cpu使用值,只有当前系统cpu使用值低于最大系统cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统CPU | -| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.7 | master最大JVM cpu使用值,只有当前JVM cpu使用值低于最大JVM cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的JVM CPU | -| master.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.7 | master最大系统 内存使用值,只有当前系统内存使用值低于最大系统内存使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统内存 | -| master.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | master最大系统磁盘使用值,只有当前系统磁盘使用值低于最大系统磁盘使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统磁盘空间 | -| master.failover-interval | 10 | failover间隔,单位为分钟 | -| master.kill-application-when-task-failover | true | 当任务实例failover时,是否kill掉yarn或k8s application | -| master.registry-disconnect-strategy.strategy | stop | 当Master与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting | -| master.registry-disconnect-strategy.max-waiting-time | 100s | 当Master与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 该值表示当Master与注册中心失联时会在给定时间之内进行重连, | -| 在给定时间之内重连失败将会停止自己,在重连时,Master会丢弃目前正在执行的工作流,值为0表示会无限期等待 | -| master.master.worker-group-refresh-interval | 10s | 定期将workerGroup从数据库中同步到内存的时间间隔 | +| 参数 | 默认值 | 描述 | +|-----------------------------------------------------------------------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------| +| master.listen-port | 5678 | master监听端口 | +| master.pre-exec-threads | 10 | master准备执行任务的数量,用于限制并行的command | +| master.exec-threads | 100 | master工作线程数量,用于限制并行的流程实例数量 | +| master.dispatch-task-number | 3 | master每个批次的派发任务数量 | +| master.host-selector | lower_weight | master host选择器,用于选择合适的worker执行任务,可选值: random, round_robin, lower_weight | +| master.max-heartbeat-interval | 10s | master最大心跳间隔 | +| master.task-commit-retry-times | 5 | 任务重试次数 | +| master.task-commit-interval | 1000 | 任务提交间隔,单位为毫秒 | +| master.state-wheel-interval | 5 | 轮询检查状态时间 | +| master.server-load-protection.enabled | true | 是否开启系统保护策略 | +| master.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.7 | master最大系统cpu使用值,只有当前系统cpu使用值低于最大系统cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统CPU | +| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.7 | master最大JVM cpu使用值,只有当前JVM cpu使用值低于最大JVM cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的JVM CPU | +| master.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.7 | master最大系统 内存使用值,只有当前系统内存使用值低于最大系统内存使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统内存 | +| master.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | master最大系统磁盘使用值,只有当前系统磁盘使用值低于最大系统磁盘使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的操作系统磁盘空间 | +| master.failover-interval | 10 | failover间隔,单位为分钟 | +| master.kill-application-when-task-failover | true | 当任务实例failover时,是否kill掉yarn或k8s application | +| master.registry-disconnect-strategy.strategy | stop | 当Master与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting | +| master.registry-disconnect-strategy.max-waiting-time | 100s | 当Master与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 该值表示当Master与注册中心失联时会在给定时间之内进行重连, 在给定时间之内重连失败将会停止自己,在重连时,Master会丢弃目前正在执行的工作流,值为0表示会无限期等待 | +| master.master.worker-group-refresh-interval | 10s | 定期将workerGroup从数据库中同步到内存的时间间隔 | +| master.command-fetch-strategy.type | ID_SLOT_BASED | Command拉取策略, 目前仅支持 `ID_SLOT_BASED` | +| master.command-fetch-strategy.config.id-step | 1 | 数据库中t_ds_command的id自增步长 | +| master.command-fetch-strategy.config.fetch-size | 10 | master拉取command数量 | ## Worker Server相关配置 diff --git a/docs/docs/zh/guide/installation/pseudo-cluster.md b/docs/docs/zh/guide/installation/pseudo-cluster.md index 13479e0d9e05..a199167e0445 100644 --- a/docs/docs/zh/guide/installation/pseudo-cluster.md +++ b/docs/docs/zh/guide/installation/pseudo-cluster.md @@ -118,7 +118,6 @@ export SPRING_DATASOURCE_PASSWORD={password} # DolphinScheduler server related configuration export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none} export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC} -export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10} # Registry center configuration, determines the type and link of the registry center export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java index 9fb664322764..8c8314e7ccaa 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java @@ -52,14 +52,10 @@ List countCommandState( */ List queryCommandPage(@Param("limit") int limit, @Param("offset") int offset); - /** - * query command page by slot - * - * @return command list - */ - List queryCommandPageBySlot(@Param("limit") int limit, - @Param("masterCount") int masterCount, - @Param("thisMasterSlot") int thisMasterSlot); + List queryCommandByIdSlot(@Param("currentSlotIndex") int currentSlotIndex, + @Param("totalSlot") int totalSlot, + @Param("idStep") int idStep, + @Param("fetchNumber") int fetchNum); void deleteByWorkflowInstanceIds(@Param("workflowInstanceIds") List workflowInstanceIds); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/BaseDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/BaseDao.java index 2937957dbdc4..664b56ee472c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/BaseDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/BaseDao.java @@ -56,6 +56,11 @@ public List queryByIds(Collection ids) { return mybatisMapper.selectBatchIds(ids); } + @Override + public List queryAll() { + return mybatisMapper.selectList(null); + } + @Override public List queryByCondition(ENTITY queryCondition) { if (queryCondition == null) { diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java new file mode 100644 index 000000000000..daa52b83181d --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository; + +import org.apache.dolphinscheduler.dao.entity.Command; + +import java.util.List; + +public interface CommandDao extends IDao { + + /** + * Query command by command id and server slot, return the command which match (commandId / step) %s totalSlot = currentSlotIndex + * + * @param currentSlotIndex current slot index + * @param totalSlot total slot number + * @param idStep id step in db + * @param fetchNum fetch number + * @return command list + */ + List queryCommandByIdSlot(int currentSlotIndex, + int totalSlot, + int idStep, + int fetchNum); +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IDao.java index c566d9b90402..ab774196003f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IDao.java @@ -41,6 +41,11 @@ public interface IDao { */ List queryByIds(Collection ids); + /** + * Query all entities. + */ + List queryAll(); + /** * Query the entity by condition. */ diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java new file mode 100644 index 000000000000..0b510d15b51d --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository.impl; + +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.mapper.CommandMapper; +import org.apache.dolphinscheduler.dao.repository.BaseDao; +import org.apache.dolphinscheduler.dao.repository.CommandDao; + +import java.util.List; + +import org.springframework.stereotype.Repository; + +@Repository +public class CommandDaoImpl extends BaseDao implements CommandDao { + + public CommandDaoImpl(CommandMapper commandMapper) { + super(commandMapper); + } + + @Override + public List queryCommandByIdSlot(int currentSlotIndex, int totalSlot, int idStep, int fetchNum) { + return mybatisMapper.queryCommandByIdSlot(currentSlotIndex, totalSlot, idStep, fetchNum); + } + +} diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml index 56db890ef07a..16f7c05f2534 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml @@ -40,12 +40,12 @@ limit #{limit} offset #{offset} - select * from t_ds_command - where id % #{masterCount} = #{thisMasterSlot} + where (id / #{idStep}) % #{totalSlot} = #{currentSlotIndex} order by process_instance_priority, id asc - limit #{limit} + limit #{fetchNumber} delete from t_ds_command diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java index 2d367e46e4c4..560b68754a58 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java @@ -187,7 +187,7 @@ private boolean toTestQueryCommandPageBySlot(int masterCount, int thisMasterSlot Command command = createCommand(); Integer id = command.getId(); boolean hit = id % masterCount == thisMasterSlot; - List commandList = commandMapper.queryCommandPageBySlot(1, masterCount, thisMasterSlot); + List commandList = commandMapper.queryCommandByIdSlot(thisMasterSlot, masterCount, 1, 1); if (hit) { Assertions.assertEquals(id, commandList.get(0).getId()); } else { @@ -201,8 +201,9 @@ private boolean toTestQueryCommandPageBySlot(int masterCount, int thisMasterSlot /** * create command map - * @param count map count - * @param commandType comman type + * + * @param count map count + * @param commandType comman type * @param processDefinitionCode process definition code * @return command map */ @@ -223,7 +224,8 @@ private CommandCount createCommandMap( } /** - * create process definition + * create process definition + * * @return process definition */ private ProcessDefinition createProcessDefinition() { @@ -243,6 +245,7 @@ private ProcessDefinition createProcessDefinition() { /** * create command map + * * @param count map count * @return command map */ @@ -258,6 +261,7 @@ private Map createCommandMap(Integer count) { /** * create command + * * @return */ private Command createCommand() { @@ -266,6 +270,7 @@ private Command createCommand() { /** * create command + * * @return Command */ private Command createCommand(CommandType commandType, long processDefinitionCode) { diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java new file mode 100644 index 000000000000..85867ef3b591 --- /dev/null +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository.impl; + +import static com.google.common.truth.Truth.assertThat; + +import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.dao.BaseDaoTest; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.repository.CommandDao; + +import org.apache.commons.lang3.RandomUtils; + +import java.util.List; + +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; + +class CommandDaoImplTest extends BaseDaoTest { + + @Autowired + private CommandDao commandDao; + + @Test + void fetchCommandByIdSlot() { + int commandSize = RandomUtils.nextInt(1, 1000); + for (int i = 0; i < commandSize; i++) { + createCommand(CommandType.START_PROCESS, 0); + } + int totalSlot = RandomUtils.nextInt(1, 10); + int currentSlotIndex = RandomUtils.nextInt(0, totalSlot); + int fetchSize = RandomUtils.nextInt(10, 100); + for (int i = 1; i < 5; i++) { + int idStep = i; + List commands = commandDao.queryCommandByIdSlot(currentSlotIndex, totalSlot, idStep, fetchSize); + assertThat(commands.size()).isGreaterThan(0); + assertThat(commands.size()) + .isEqualTo(commandDao.queryAll() + .stream() + .filter(command -> (command.getId() / idStep) % totalSlot == currentSlotIndex) + .limit(fetchSize) + .count()); + + } + + } + + private void createCommand(CommandType commandType, int processDefinitionCode) { + Command command = new Command(); + command.setCommandType(commandType); + command.setProcessDefinitionCode(processDefinitionCode); + command.setExecutorId(4); + command.setCommandParam("test command param"); + command.setTaskDependType(TaskDependType.TASK_ONLY); + command.setFailureStrategy(FailureStrategy.CONTINUE); + command.setWarningType(WarningType.ALL); + command.setWarningGroupId(1); + command.setScheduleTime(DateUtils.stringToDate("2019-12-29 12:10:00")); + command.setProcessInstancePriority(Priority.MEDIUM); + command.setStartTime(DateUtils.stringToDate("2019-12-29 10:10:00")); + command.setUpdateTime(DateUtils.stringToDate("2019-12-29 10:10:00")); + command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP); + command.setProcessInstanceId(0); + command.setProcessDefinitionVersion(0); + commandDao.insert(command); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/CommandFetcherConfiguration.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/CommandFetcherConfiguration.java new file mode 100644 index 000000000000..4a4d3c1efc65 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/CommandFetcherConfiguration.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.command; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.dolphinscheduler.dao.repository.CommandDao; +import org.apache.dolphinscheduler.server.master.config.CommandFetchStrategy; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.registry.MasterSlotManager; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class CommandFetcherConfiguration { + + @Bean + public ICommandFetcher commandFetcher(MasterConfig masterConfig, + MasterSlotManager masterSlotManager, + CommandDao commandDao) { + CommandFetchStrategy commandFetchStrategy = + checkNotNull(masterConfig.getCommandFetchStrategy(), "command fetch strategy is null"); + switch (commandFetchStrategy.getType()) { + case ID_SLOT_BASED: + CommandFetchStrategy.IdSlotBasedFetchConfig idSlotBasedFetchConfig = + (CommandFetchStrategy.IdSlotBasedFetchConfig) commandFetchStrategy.getConfig(); + return new IdSlotBasedCommandFetcher(idSlotBasedFetchConfig, masterSlotManager, commandDao); + default: + throw new IllegalArgumentException( + "unsupported command fetch strategy type: " + commandFetchStrategy.getType()); + } + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/ICommandFetcher.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/ICommandFetcher.java new file mode 100644 index 000000000000..c315a9b29497 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/ICommandFetcher.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.command; + +import org.apache.dolphinscheduler.dao.entity.Command; + +import java.util.List; + +/** + * The command fetcher used to fetch commands + */ +public interface ICommandFetcher { + + /** + * Fetch commands + * + * @return command list which need to be handled + */ + List fetchCommands(); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/IdSlotBasedCommandFetcher.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/IdSlotBasedCommandFetcher.java new file mode 100644 index 000000000000..a4178200938e --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/IdSlotBasedCommandFetcher.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.command; + +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.repository.CommandDao; +import org.apache.dolphinscheduler.server.master.config.CommandFetchStrategy; +import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; +import org.apache.dolphinscheduler.server.master.registry.MasterSlotManager; + +import java.util.Collections; +import java.util.List; + +import lombok.extern.slf4j.Slf4j; + +/** + * The command fetcher which is fetch commands by command id and slot. + */ +@Slf4j +public class IdSlotBasedCommandFetcher implements ICommandFetcher { + + private final CommandFetchStrategy.IdSlotBasedFetchConfig idSlotBasedFetchConfig; + + private final CommandDao commandDao; + + private final MasterSlotManager masterSlotManager; + + public IdSlotBasedCommandFetcher(CommandFetchStrategy.IdSlotBasedFetchConfig idSlotBasedFetchConfig, + MasterSlotManager masterSlotManager, + CommandDao commandDao) { + this.idSlotBasedFetchConfig = idSlotBasedFetchConfig; + this.masterSlotManager = masterSlotManager; + this.commandDao = commandDao; + } + + @Override + public List fetchCommands() { + long scheduleStartTime = System.currentTimeMillis(); + int currentSlotIndex = masterSlotManager.getSlot(); + int totalSlot = masterSlotManager.getMasterSize(); + if (totalSlot <= 0 || currentSlotIndex < 0) { + log.warn("Slot is validated, current master slots: {}, the current slot index is {}", totalSlot, + currentSlotIndex); + return Collections.emptyList(); + } + List commands = commandDao.queryCommandByIdSlot( + currentSlotIndex, + totalSlot, + idSlotBasedFetchConfig.getIdStep(), + idSlotBasedFetchConfig.getFetchSize()); + long cost = System.currentTimeMillis() - scheduleStartTime; + log.info("Fetch commands: {} success, cost: {}ms, totalSlot: {}, currentSlotIndex: {}", commands.size(), cost, + totalSlot, currentSlotIndex); + ProcessInstanceMetrics.recordCommandQueryTime(cost); + return commands; + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/CommandFetchStrategy.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/CommandFetchStrategy.java new file mode 100644 index 000000000000..e61941677c65 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/CommandFetchStrategy.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.config; + +import lombok.Data; + +import org.springframework.validation.Errors; + +@Data +public class CommandFetchStrategy { + + private CommandFetchStrategyType type = CommandFetchStrategyType.ID_SLOT_BASED; + + private CommandFetchConfig config = new IdSlotBasedFetchConfig(); + + public void validate(Errors errors) { + config.validate(errors); + } + + public enum CommandFetchStrategyType { + ID_SLOT_BASED, + ; + } + + public interface CommandFetchConfig { + + void validate(Errors errors); + + } + + @Data + public static class IdSlotBasedFetchConfig implements CommandFetchConfig { + + private int idStep = 1; + private int fetchSize = 10; + + @Override + public void validate(Errors errors) { + if (idStep <= 0) { + errors.rejectValue("step", null, "step must be greater than 0"); + } + if (fetchSize <= 0) { + errors.rejectValue("fetchSize", null, "fetchSize must be greater than 0"); + } + } + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index 02c0dcb819d6..20d3cccef3e2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -48,10 +48,6 @@ public class MasterConfig implements Validator { * The master RPC server listen port. */ private int listenPort = 5678; - /** - * The max batch size used to fetch command from database. - */ - private int fetchCommandNum = 10; /** * The thread number used to prepare processInstance. This number shouldn't bigger than fetchCommandNum. */ @@ -98,6 +94,8 @@ public class MasterConfig implements Validator { private Duration workerGroupRefreshInterval = Duration.ofSeconds(10L); + private CommandFetchStrategy commandFetchStrategy = new CommandFetchStrategy(); + // ip:listenPort private String masterAddress; @@ -115,9 +113,6 @@ public void validate(Object target, Errors errors) { if (masterConfig.getListenPort() <= 0) { errors.rejectValue("listen-port", null, "is invalidated"); } - if (masterConfig.getFetchCommandNum() <= 0) { - errors.rejectValue("fetch-command-num", null, "should be a positive value"); - } if (masterConfig.getPreExecThreads() <= 0) { errors.rejectValue("per-exec-threads", null, "should be a positive value"); } @@ -149,6 +144,7 @@ public void validate(Object target, Errors errors) { if (StringUtils.isEmpty(masterConfig.getMasterAddress())) { masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort())); } + commandFetchStrategy.validate(errors); masterConfig.setMasterRegistryPath( RegistryNodeType.MASTER.getRegistryPath() + "/" + masterConfig.getMasterAddress()); @@ -159,7 +155,6 @@ private void printConfig() { String config = "\n****************************Master Configuration**************************************" + "\n listen-port -> " + listenPort + - "\n fetch-command-num -> " + fetchCommandNum + "\n pre-exec-threads -> " + preExecThreads + "\n exec-threads -> " + execThreads + "\n dispatch-task-number -> " + dispatchTaskNumber + @@ -175,6 +170,7 @@ private void printConfig() { "\n master-address -> " + masterAddress + "\n master-registry-path: " + masterRegistryPath + "\n worker-group-refresh-interval: " + workerGroupRefreshInterval + + "\n command-fetch-strategy: " + commandFetchStrategy + "\n****************************Master Configuration**************************************"; log.info(config); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java index 2fddd9438474..c1b5d0ffabfd 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java @@ -26,21 +26,18 @@ import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; +import org.apache.dolphinscheduler.server.master.command.ICommandFetcher; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterServerLoadProtection; import org.apache.dolphinscheduler.server.master.event.WorkflowEvent; import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue; import org.apache.dolphinscheduler.server.master.event.WorkflowEventType; -import org.apache.dolphinscheduler.server.master.exception.MasterException; import org.apache.dolphinscheduler.server.master.exception.WorkflowCreateException; import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics; -import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; -import org.apache.dolphinscheduler.server.master.registry.MasterSlotManager; import org.apache.dolphinscheduler.service.command.CommandService; import org.apache.commons.collections4.CollectionUtils; -import java.util.Collections; import java.util.List; import java.util.Optional; @@ -56,6 +53,9 @@ @Slf4j public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCloseable { + @Autowired + private ICommandFetcher commandFetcher; + @Autowired private CommandService commandService; @@ -74,9 +74,6 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl @Autowired private WorkflowEventLooper workflowEventLooper; - @Autowired - private MasterSlotManager masterSlotManager; - @Autowired private MasterTaskExecutorBootstrap masterTaskExecutorBootstrap; @@ -125,7 +122,7 @@ public void run() { Thread.sleep(Constants.SLEEP_TIME_MILLIS); continue; } - List commands = findCommands(); + List commands = commandFetcher.fetchCommands(); if (CollectionUtils.isEmpty(commands)) { // indicate that no command ,sleep for 1s Thread.sleep(Constants.SLEEP_TIME_MILLIS); @@ -170,29 +167,4 @@ public void run() { } } - private List findCommands() throws MasterException { - try { - long scheduleStartTime = System.currentTimeMillis(); - int thisMasterSlot = masterSlotManager.getSlot(); - int masterCount = masterSlotManager.getMasterSize(); - if (masterCount <= 0) { - log.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot); - return Collections.emptyList(); - } - int pageSize = masterConfig.getFetchCommandNum(); - final List result = - commandService.findCommandPageBySlot(pageSize, masterCount, thisMasterSlot); - if (CollectionUtils.isNotEmpty(result)) { - long cost = System.currentTimeMillis() - scheduleStartTime; - log.info( - "Master schedule bootstrap loop command success, fetch command size: {}, cost: {}ms, current slot: {}, total slot size: {}", - result.size(), cost, thisMasterSlot, masterCount); - ProcessInstanceMetrics.recordCommandQueryTime(cost); - } - return result; - } catch (Exception ex) { - throw new MasterException("Master loop command from database error", ex); - } - } - } diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml index f18c6ef61db3..17b1e41a7109 100644 --- a/dolphinscheduler-master/src/main/resources/application.yaml +++ b/dolphinscheduler-master/src/main/resources/application.yaml @@ -83,8 +83,6 @@ registry: master: listen-port: 5678 - # master fetch command num - fetch-command-num: 10 # master prepare execute thread number to limit handle commands in parallel pre-exec-threads: 10 # master execute thread number to limit process instances in parallel @@ -121,6 +119,13 @@ master: # The max waiting time to reconnect to registry if you set the strategy to waiting max-waiting-time: 100s worker-group-refresh-interval: 10s + command-fetch-strategy: + type: ID_SLOT_BASED + config: + # The incremental id step + id-step: 1 + # master fetch command num + fetch-size: 10 server: port: 5679 diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java index faab44cf854c..9d26aa81f4bb 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.config; +import static com.google.common.truth.Truth.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -47,6 +48,17 @@ public void getServerLoadProtection() { assertEquals(0.77, serverLoadProtection.getMaxJvmCpuUsagePercentageThresholds()); assertEquals(0.77, serverLoadProtection.getMaxSystemMemoryUsagePercentageThresholds()); assertEquals(0.77, serverLoadProtection.getMaxDiskUsagePercentageThresholds()); + } + + @Test + public void getCommandFetchStrategy() { + CommandFetchStrategy commandFetchStrategy = masterConfig.getCommandFetchStrategy(); + assertThat(commandFetchStrategy.getType()) + .isEqualTo(CommandFetchStrategy.CommandFetchStrategyType.ID_SLOT_BASED); + CommandFetchStrategy.IdSlotBasedFetchConfig idSlotBasedFetchConfig = + (CommandFetchStrategy.IdSlotBasedFetchConfig) commandFetchStrategy.getConfig(); + assertThat(idSlotBasedFetchConfig.getIdStep()).isEqualTo(3); + assertThat(idSlotBasedFetchConfig.getFetchSize()).isEqualTo(11); } } diff --git a/dolphinscheduler-master/src/test/resources/application.yaml b/dolphinscheduler-master/src/test/resources/application.yaml index f4827d4b3c56..15f91996090a 100644 --- a/dolphinscheduler-master/src/test/resources/application.yaml +++ b/dolphinscheduler-master/src/test/resources/application.yaml @@ -89,8 +89,6 @@ registry: master: listen-port: 5678 - # master fetch command num - fetch-command-num: 10 # master prepare execute thread number to limit handle commands in parallel pre-exec-threads: 10 # master execute thread number to limit process instances in parallel @@ -127,6 +125,13 @@ master: # The max waiting time to reconnect to registry if you set the strategy to waiting max-waiting-time: 100s worker-group-refresh-interval: 10s + command-fetch-strategy: + type: ID_SLOT_BASED + config: + # The incremental id step + id-step: 3 + # master fetch command num + fetch-size: 11 server: port: 5679 diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java index cff73c503f2c..43b81c4e5c45 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java @@ -22,8 +22,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import java.util.List; - /** * Command Service */ @@ -44,15 +42,6 @@ public interface CommandService { */ int createCommand(Command command); - /** - * Get command page - * @param pageSize page size - * @param masterCount master count - * @param thisMasterSlot master slot - * @return command page - */ - List findCommandPageBySlot(int pageSize, int masterCount, int thisMasterSlot); - /** * check the input command exists in queue list * diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java index 483899446b2a..ee833a80b045 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java @@ -57,7 +57,6 @@ import org.springframework.stereotype.Component; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; import io.micrometer.core.annotation.Counted; /** @@ -107,14 +106,6 @@ public int createCommand(Command command) { return result; } - @Override - public List findCommandPageBySlot(int pageSize, int masterCount, int thisMasterSlot) { - if (masterCount <= 0) { - return Lists.newArrayList(); - } - return commandMapper.queryCommandPageBySlot(pageSize, masterCount, thisMasterSlot); - } - @Override public boolean verifyIsNeedCreateCommand(Command command) { boolean isNeedCreate = true; diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java index 0cde76bdfe88..f60320fc63e6 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java @@ -214,14 +214,4 @@ public void testCreateCommand() { Mockito.verify(commandMapper, Mockito.times(1)).insert(command); } - @Test - public void testFindCommandPageBySlot() { - int pageSize = 1; - int masterCount = 0; - int thisMasterSlot = 2; - List commandList = - commandService.findCommandPageBySlot(pageSize, masterCount, thisMasterSlot); - Assertions.assertEquals(0, commandList.size()); - } - } diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml index 5122eea2a17c..6757718929de 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml +++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml @@ -160,8 +160,6 @@ casdoor: master: listen-port: 5678 - # master fetch command num - fetch-command-num: 10 # master prepare execute thread number to limit handle commands in parallel pre-exec-threads: 10 # master execute thread number to limit process instances in parallel @@ -192,6 +190,13 @@ master: # kill yarn/k8s application when failover taskInstance, default true kill-application-when-task-failover: true worker-group-refresh-interval: 10s + command-fetch-strategy: + type: ID_SLOT_BASED + config: + # The incremental id step + id-step: 1 + # master fetch command num + fetch-size: 10 worker: # worker listener port