diff --git a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/AbstractJobFacade.java b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/AbstractJobFacade.java index cee06ed49..44a1c169c 100644 --- a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/AbstractJobFacade.java +++ b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/AbstractJobFacade.java @@ -48,22 +48,22 @@ */ @Slf4j abstract class AbstractJobFacade implements JobFacade { - - protected final ConfigurationService configService; - - protected final ShardingService shardingService; - - protected final ExecutionContextService executionContextService; - - protected final ExecutionService executionService; - - protected final FailoverService failoverService; - - protected final Collection elasticJobListeners; - - protected final JobTracingEventBus jobTracingEventBus; - - public AbstractJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final Collection elasticJobListeners, final TracingConfiguration tracingConfig) { + + private final ConfigurationService configService; + + private final ShardingService shardingService; + + private final ExecutionContextService executionContextService; + + private final ExecutionService executionService; + + private final FailoverService failoverService; + + private final Collection elasticJobListeners; + + private final JobTracingEventBus jobTracingEventBus; + + AbstractJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final Collection elasticJobListeners, final TracingConfiguration tracingConfig) { configService = new ConfigurationService(regCenter, jobName); shardingService = new ShardingService(regCenter, jobName); executionContextService = new ExecutionContextService(regCenter, jobName); diff --git a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/JobFacade.java b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/JobFacade.java index 2dbb77402..72b638b5d 100644 --- a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/JobFacade.java +++ b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/JobFacade.java @@ -37,7 +37,7 @@ public interface JobFacade { * @param fromCache load from cache or not * @return job configuration */ - JobConfiguration loadJobConfiguration(final boolean fromCache); + JobConfiguration loadJobConfiguration(boolean fromCache); /** * Check job execution environment. @@ -121,7 +121,7 @@ public interface JobFacade { * * @param jobExecutionEvent job execution event */ - void postJobExecutionEvent(final JobExecutionEvent jobExecutionEvent); + void postJobExecutionEvent(JobExecutionEvent jobExecutionEvent); /** * Post job status trace event. @@ -131,7 +131,7 @@ public interface JobFacade { * @param message job message */ void postJobStatusTraceEvent(String taskId, JobStatusTraceEvent.State state, String message); - + /** * Get job runtime service. * diff --git a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/ShardingJobFacade.java b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/ShardingJobFacade.java index 040fd3e44..3f4f3cc90 100644 --- a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/ShardingJobFacade.java +++ b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/ShardingJobFacade.java @@ -18,13 +18,22 @@ package org.apache.shardingsphere.elasticjob.kernel.executor.facade; import lombok.extern.slf4j.Slf4j; + +import org.apache.shardingsphere.elasticjob.kernel.internal.config.ConfigurationService; +import org.apache.shardingsphere.elasticjob.kernel.internal.failover.FailoverService; +import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionContextService; +import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionService; +import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ShardingService; +import org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobTracingEventBus; import org.apache.shardingsphere.elasticjob.spi.listener.ElasticJobListener; import org.apache.shardingsphere.elasticjob.spi.listener.param.ShardingContexts; import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; import org.apache.shardingsphere.elasticjob.kernel.tracing.config.TracingConfiguration; import java.util.Collection; +import java.util.Comparator; import java.util.List; +import java.util.stream.Collectors; /** * Sharding Job facade. @@ -32,10 +41,32 @@ @Slf4j public final class ShardingJobFacade extends AbstractJobFacade { + private final ConfigurationService configService; + + private final ShardingService shardingService; + + private final ExecutionContextService executionContextService; + + private final ExecutionService executionService; + + private final FailoverService failoverService; + + private final Collection elasticJobListeners; + + private final JobTracingEventBus jobTracingEventBus; + public ShardingJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final Collection elasticJobListeners, final TracingConfiguration tracingConfig) { super(regCenter, jobName, elasticJobListeners, tracingConfig); + + configService = new ConfigurationService(regCenter, jobName); + shardingService = new ShardingService(regCenter, jobName); + executionContextService = new ExecutionContextService(regCenter, jobName); + executionService = new ExecutionService(regCenter, jobName); + failoverService = new FailoverService(regCenter, jobName); + this.elasticJobListeners = elasticJobListeners.stream().sorted(Comparator.comparingInt(ElasticJobListener::order)).collect(Collectors.toList()); + this.jobTracingEventBus = null == tracingConfig ? new JobTracingEventBus() : new JobTracingEventBus(tracingConfig); } - + /** * Get sharding contexts. * @@ -58,5 +89,5 @@ public ShardingContexts getShardingContexts() { shardingItems.removeAll(executionService.getDisabledItems(shardingItems)); return executionContextService.getJobShardingContext(shardingItems); } - + } diff --git a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/SingleShardingJobFacade.java b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/SingleShardingJobFacade.java index 44e5c0144..1ce6954ba 100644 --- a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/SingleShardingJobFacade.java +++ b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/SingleShardingJobFacade.java @@ -20,15 +20,23 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.List; +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.shardingsphere.elasticjob.api.JobConfiguration; +import org.apache.shardingsphere.elasticjob.kernel.internal.config.ConfigurationService; +import org.apache.shardingsphere.elasticjob.kernel.internal.failover.FailoverService; import org.apache.shardingsphere.elasticjob.kernel.internal.instance.InstanceService; import org.apache.shardingsphere.elasticjob.kernel.internal.schedule.JobRegistry; +import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionContextService; +import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionService; import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.JobInstance; +import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ShardingService; import org.apache.shardingsphere.elasticjob.kernel.internal.storage.JobNodeStorage; import org.apache.shardingsphere.elasticjob.kernel.tracing.config.TracingConfiguration; +import org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobTracingEventBus; import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; import org.apache.shardingsphere.elasticjob.spi.listener.ElasticJobListener; import org.apache.shardingsphere.elasticjob.spi.listener.param.ShardingContexts; @@ -40,21 +48,44 @@ */ @Slf4j public final class SingleShardingJobFacade extends AbstractJobFacade { - + + private final ConfigurationService configService; + + private final ShardingService shardingService; + + private final ExecutionContextService executionContextService; + + private final ExecutionService executionService; + + private final FailoverService failoverService; + + private final Collection elasticJobListeners; + + private final JobTracingEventBus jobTracingEventBus; + private final JobNodeStorage jobNodeStorage; + private final InstanceService instanceService; - - public SingleShardingJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final Collection elasticJobListeners, final TracingConfiguration tracingConfig) { + + public SingleShardingJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final Collection elasticJobListeners, + final TracingConfiguration tracingConfig) { super(regCenter, jobName, elasticJobListeners, tracingConfig); - + + configService = new ConfigurationService(regCenter, jobName); + shardingService = new ShardingService(regCenter, jobName); + executionContextService = new ExecutionContextService(regCenter, jobName); + executionService = new ExecutionService(regCenter, jobName); + failoverService = new FailoverService(regCenter, jobName); + this.elasticJobListeners = elasticJobListeners.stream().sorted(Comparator.comparingInt(ElasticJobListener::order)).collect(Collectors.toList()); + this.jobTracingEventBus = null == tracingConfig ? new JobTracingEventBus() : new JobTracingEventBus(tracingConfig); jobNodeStorage = new JobNodeStorage(regCenter, jobName); instanceService = new InstanceService(regCenter, jobName); } - + @Override public void registerJobCompleted(final ShardingContexts shardingContexts) { super.registerJobCompleted(shardingContexts); - + JobConfiguration jobConfig = configService.load(true); JobInstance jobInst = JobRegistry.getInstance().getJobInstance(jobConfig.getJobName()); if (null == jobInst) { @@ -66,21 +97,21 @@ public void registerJobCompleted(final ShardingContexts shardingContexts) { for (int i = 0; i < availJobInst.size(); i++) { JobInstance temp = availJobInst.get(i); if (temp.getServerIp().equals(jobInst.getServerIp())) { - nextIndex = i + 1; // find the current running job instance, and set next one to current index + 1 + nextIndex = i + 1; break; } } - if (nextIndex != null) { // the normal case that can find the next index, exclude the bounded scenarios - nextIndex = nextIndex >= availJobInst.size() ? 0 : nextIndex; // Round Robin Loop + if (nextIndex != null) { + nextIndex = nextIndex >= availJobInst.size() ? 0 : nextIndex; jobNodeStorage.fillEphemeralJobNode("next-job-instance-ip", availJobInst.get(nextIndex).getServerIp()); } - + if (log.isDebugEnabled()) { log.debug("job name: {}, next index: {}, sharding total count: {}", - jobConfig.getJobName(), nextIndex, jobConfig.getShardingTotalCount()); + jobConfig.getJobName(), nextIndex, jobConfig.getShardingTotalCount()); } } - + /** * Get sharding contexts. * @@ -96,32 +127,32 @@ public ShardingContexts getShardingContexts() { return executionContextService.getJobShardingContext(failoverShardingItems); } } - + List shardingItems; String nextJobInstIP = null; - if (isNeedSharding()) { // the first initialization or reconcile case + if (isNeedSharding()) { shardingService.shardingIfNecessary(); shardingItems = shardingService.getLocalShardingItems(); } else { nextJobInstIP = jobNodeStorage.getJobNodeDataDirectly("next-job-instance-ip"); - if (StringUtils.isBlank(nextJobInstIP)) { // if there is no next job instance ip + if (StringUtils.isBlank(nextJobInstIP)) { shardingService.shardingIfNecessary(); shardingItems = shardingService.getLocalShardingItems(); - } else { // when next job instance is specified under normal case + } else { JobInstance jobInst = JobRegistry.getInstance().getJobInstance(jobConfig.getJobName()); shardingItems = nextJobInstIP.equals(jobInst.getServerIp()) ? Collections.singletonList(0) : new ArrayList<>(); } } if (log.isDebugEnabled()) { log.debug("job name: {}, sharding items: {}, nextJobInstIP: {}, sharding total count: {}, isFailover: {}", - jobConfig.getJobName(), shardingItems, nextJobInstIP, jobConfig.getShardingTotalCount(), isFailover); + jobConfig.getJobName(), shardingItems, nextJobInstIP, jobConfig.getShardingTotalCount(), isFailover); } - + if (isFailover) { shardingItems.removeAll(failoverService.getLocalTakeOffItems()); } shardingItems.removeAll(executionService.getDisabledItems(shardingItems)); return executionContextService.getJobShardingContext(shardingItems); } - + } diff --git a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java index 29bf9ea9e..636b68225 100644 --- a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java +++ b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java @@ -82,14 +82,15 @@ public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob Collection jobListeners = getElasticJobListeners(this.jobConfig); setUpFacade = new SetUpFacade(regCenter, this.jobConfig.getJobName(), jobListeners); schedulerFacade = new SchedulerFacade(regCenter, this.jobConfig.getJobName()); - - if (1 == this.jobConfig.getShardingTotalCount() // the single sharding scenario - && "SINGLE_SHARDING_BALANCE".equals(this.jobConfig.getJobShardingStrategyType())) { // the specified SINGLE_SHARDING_BALANCE strategy + + // the single sharding scenario and specified SINGLE_SHARDING_BALANCE strategy + if (1 == this.jobConfig.getShardingTotalCount() + && "SINGLE_SHARDING_BALANCE".equals(this.jobConfig.getJobShardingStrategyType())) { jobFacade = new SingleShardingJobFacade(regCenter, this.jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null)); } else { jobFacade = new ShardingJobFacade(regCenter, this.jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null)); } - + validateJobProperties(); jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig, jobFacade); setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners); @@ -103,14 +104,15 @@ public JobScheduler(final CoordinatorRegistryCenter regCenter, final String elas Collection jobListeners = getElasticJobListeners(this.jobConfig); setUpFacade = new SetUpFacade(regCenter, this.jobConfig.getJobName(), jobListeners); schedulerFacade = new SchedulerFacade(regCenter, this.jobConfig.getJobName()); - - if (1 == this.jobConfig.getShardingTotalCount() // the single sharding scenario - && "SINGLE_SHARDING_BALANCE".equals(this.jobConfig.getJobShardingStrategyType())) { // the specified SINGLE_SHARDING_BALANCE strategy + + // the single sharding scenario and specified SINGLE_SHARDING_BALANCE strategy + if (1 == this.jobConfig.getShardingTotalCount() + && "SINGLE_SHARDING_BALANCE".equals(this.jobConfig.getJobShardingStrategyType())) { jobFacade = new SingleShardingJobFacade(regCenter, this.jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null)); } else { jobFacade = new ShardingJobFacade(regCenter, this.jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null)); } - + validateJobProperties(); jobExecutor = new ElasticJobExecutor(elasticJobType, this.jobConfig, jobFacade); setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners); diff --git a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/sharding/strategy/type/SingleShardingBalanceJobShardingStrategy.java b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/sharding/strategy/type/SingleShardingBalanceJobShardingStrategy.java index 3be60fa93..d4368bdac 100644 --- a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/sharding/strategy/type/SingleShardingBalanceJobShardingStrategy.java +++ b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/sharding/strategy/type/SingleShardingBalanceJobShardingStrategy.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.shardingsphere.elasticjob.kernel.internal.sharding.strategy.type; import java.util.ArrayList; @@ -22,26 +39,26 @@ * @since 2024-12-03 19:19 */ public class SingleShardingBalanceJobShardingStrategy implements JobShardingStrategy { - + private final AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy(); - + @Override public Map> sharding(final List jobInstances, final String jobName, final int shardingTotalCount) { int shardingUnitsSize = jobInstances.size(); - int offset = Math.abs(jobName.hashCode() + ((Long)System.currentTimeMillis()).intValue()) % shardingUnitsSize; - + int offset = Math.abs(jobName.hashCode() + ((Long) System.currentTimeMillis()).intValue()) % shardingUnitsSize; + List result = new ArrayList<>(shardingUnitsSize); for (int i = 0; i < shardingUnitsSize; i++) { int index = (i + offset) % shardingUnitsSize; result.add(jobInstances.get(index)); } - + return averageAllocationJobShardingStrategy.sharding(result, jobName, shardingTotalCount); } - + @Override public String getType() { return "SINGLE_SHARDING_BALANCE"; } - + } diff --git a/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/ShardingJobFacadeTest.java b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/ShardingJobFacadeTest.java index 61bde8f03..31afd41ee 100644 --- a/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/ShardingJobFacadeTest.java +++ b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/ShardingJobFacadeTest.java @@ -84,6 +84,12 @@ void setUp() { ReflectionUtils.setSuperclassFieldValue(shardingJobFacade, "executionService", executionService); ReflectionUtils.setSuperclassFieldValue(shardingJobFacade, "failoverService", failoverService); ReflectionUtils.setSuperclassFieldValue(shardingJobFacade, "jobTracingEventBus", jobTracingEventBus); + ReflectionUtils.setFieldValue(shardingJobFacade, "configService", configService); + ReflectionUtils.setFieldValue(shardingJobFacade, "shardingService", shardingService); + ReflectionUtils.setFieldValue(shardingJobFacade, "executionContextService", executionContextService); + ReflectionUtils.setFieldValue(shardingJobFacade, "executionService", executionService); + ReflectionUtils.setFieldValue(shardingJobFacade, "failoverService", failoverService); + ReflectionUtils.setFieldValue(shardingJobFacade, "jobTracingEventBus", jobTracingEventBus); } @Test diff --git a/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/SingleShardingJobFacadeTest.java b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/SingleShardingJobFacadeTest.java index dc919c360..2f5027a94 100644 --- a/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/SingleShardingJobFacadeTest.java +++ b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/SingleShardingJobFacadeTest.java @@ -75,10 +75,10 @@ class SingleShardingJobFacadeTest { @Mock private ElasticJobListenerCaller caller; - + @Mock private JobNodeStorage jobNodeStorage; - + @Mock private InstanceService instanceService; @@ -97,6 +97,12 @@ void setUp() { ReflectionUtils.setSuperclassFieldValue(singleShardingJobFacade, "executionService", executionService); ReflectionUtils.setSuperclassFieldValue(singleShardingJobFacade, "failoverService", failoverService); ReflectionUtils.setSuperclassFieldValue(singleShardingJobFacade, "jobTracingEventBus", jobTracingEventBus); + ReflectionUtils.setFieldValue(singleShardingJobFacade, "configService", configService); + ReflectionUtils.setFieldValue(singleShardingJobFacade, "shardingService", shardingService); + ReflectionUtils.setFieldValue(singleShardingJobFacade, "executionContextService", executionContextService); + ReflectionUtils.setFieldValue(singleShardingJobFacade, "executionService", executionService); + ReflectionUtils.setFieldValue(singleShardingJobFacade, "failoverService", failoverService); + ReflectionUtils.setFieldValue(singleShardingJobFacade, "jobTracingEventBus", jobTracingEventBus); ReflectionUtils.setFieldValue(singleShardingJobFacade, "jobNodeStorage", jobNodeStorage); ReflectionUtils.setFieldValue(singleShardingJobFacade, "instanceService", instanceService); } @@ -152,10 +158,10 @@ void assertRegisterJobCompletedWhenFailoverEnabled() { verify(executionService).registerJobCompleted(shardingContexts); verify(failoverService).updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet()); } - + @Test void assertRegisterJobCompletedWhenRunningOnCurrentHost() { - ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap()); + final ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap()); when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 1).cron("0/1 * * * * ?").failover(true).monitorExecution(true).build()); JobInstance jobInstance = new JobInstance(); jobInstance.setServerIp("192.168.1.2"); @@ -167,17 +173,17 @@ void assertRegisterJobCompletedWhenRunningOnCurrentHost() { jobInstance2.setServerIp("192.168.1.3"); availJobInst.add(jobInstance2); when(instanceService.getAvailableJobInstances()).thenReturn(availJobInst); - + singleShardingJobFacade.registerJobCompleted(shardingContexts); - + verify(executionService).registerJobCompleted(shardingContexts); verify(failoverService).updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet()); verify(jobNodeStorage).fillEphemeralJobNode("next-job-instance-ip", availJobInst.get(1).getServerIp()); } - + @Test void assertRegisterJobCompletedWhenRunningOnOtherHost() { - ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap()); + final ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap()); when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 1).cron("0/1 * * * * ?").failover(true).monitorExecution(true).build()); JobInstance jobInstance = new JobInstance(); jobInstance.setServerIp("192.168.1.2"); @@ -188,14 +194,14 @@ void assertRegisterJobCompletedWhenRunningOnOtherHost() { jobInstance2.setServerIp("192.168.1.3"); availJobInst.add(jobInstance2); when(instanceService.getAvailableJobInstances()).thenReturn(availJobInst); - + singleShardingJobFacade.registerJobCompleted(shardingContexts); - + verify(executionService).registerJobCompleted(shardingContexts); verify(failoverService).updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet()); verify(jobNodeStorage, times(0)).fillEphemeralJobNode("next-job-instance-ip", availJobInst.get(0).getServerIp()); } - + @Test void assertGetShardingContextWhenIsFailoverEnableAndFailover() { ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap()); @@ -238,7 +244,7 @@ void assertGetShardingContextWhenHasDisabledItems() { assertThat(singleShardingJobFacade.getShardingContexts(), is(shardingContexts)); verify(shardingService).shardingIfNecessary(); } - + @Test void assertGetShardingContextWhenIsFailoverDisableAndNoNeedShardingWithoutNextIP() { ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap()); @@ -247,15 +253,15 @@ void assertGetShardingContextWhenIsFailoverDisableAndNoNeedShardingWithoutNextIP when(jobNodeStorage.getJobNodeDataDirectly("next-job-instance-ip")).thenReturn(null); when(shardingService.getLocalShardingItems()).thenReturn(Collections.singletonList(0)); when(executionContextService.getJobShardingContext(Collections.singletonList(0))).thenReturn(shardingContexts); - + assertThat(singleShardingJobFacade.getShardingContexts(), is(shardingContexts)); - + verify(shardingService, times(1)).shardingIfNecessary(); } - + @Test void assertGetShardingContextWhenIsFailoverDisableAndNoNeedShardingWithNextIP() { - ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap()); + final ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap()); when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 1).cron("0/1 * * * * ?").failover(false).build()); when(shardingService.isNeedSharding()).thenReturn(false); when(jobNodeStorage.getJobNodeDataDirectly("next-job-instance-ip")).thenReturn("192.168.1.2"); @@ -264,12 +270,12 @@ void assertGetShardingContextWhenIsFailoverDisableAndNoNeedShardingWithNextIP() JobRegistry jobRegistry = JobRegistry.getInstance(); jobRegistry.addJobInstance("test_job", jobInstance); when(executionContextService.getJobShardingContext(Collections.singletonList(0))).thenReturn(shardingContexts); - + assertThat(singleShardingJobFacade.getShardingContexts(), is(shardingContexts)); - + verify(shardingService, times(0)).shardingIfNecessary(); } - + @Test void assertGetShardingContextWhenIsFailoverDisableAndNeedSharding() { ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap()); @@ -277,12 +283,12 @@ void assertGetShardingContextWhenIsFailoverDisableAndNeedSharding() { when(shardingService.getLocalShardingItems()).thenReturn(Collections.singletonList(0)); when(shardingService.isNeedSharding()).thenReturn(true); when(executionContextService.getJobShardingContext(Collections.singletonList(0))).thenReturn(shardingContexts); - + assertThat(singleShardingJobFacade.getShardingContexts(), is(shardingContexts)); - + verify(shardingService).shardingIfNecessary(); } - + @Test void assertMisfireIfRunning() { when(executionService.misfireIfHasRunningItems(Arrays.asList(0, 1))).thenReturn(true); diff --git a/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/internal/sharding/strategy/type/SingleShardingBalanceJobShardingStrategyTest.java b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/internal/sharding/strategy/type/SingleShardingBalanceJobShardingStrategyTest.java index d04d43820..6f016ce72 100644 --- a/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/internal/sharding/strategy/type/SingleShardingBalanceJobShardingStrategyTest.java +++ b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/internal/sharding/strategy/type/SingleShardingBalanceJobShardingStrategyTest.java @@ -34,8 +34,8 @@ class SingleShardingBalanceJobShardingStrategyTest { @Test void assertSharding() { Map> sharding = singleShardingBalanceJobShardingStrategy.sharding( - Arrays.asList(new JobInstance("host0@-@0"), new JobInstance("host1@-@0"), new JobInstance("host2@-@0")), - "JobName", 1); + Arrays.asList(new JobInstance("host0@-@0"), new JobInstance("host1@-@0"), new JobInstance("host2@-@0")), + "JobName", 1); int sum = sharding.values().stream().mapToInt(List::size).sum(); assertThat(sum, is(1)); }