diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/conf/SchedulerConfigurationTest.scala b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/conf/SchedulerConfigurationTest.scala new file mode 100644 index 0000000000..cf36543de8 --- /dev/null +++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/conf/SchedulerConfigurationTest.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.linkis.scheduler.conf + +import org.apache.linkis.common.conf.{CommonVars, TimeType} +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +class SchedulerConfigurationTest { + + @Test + def testSchedulerConfiguration: Unit = { + + val fifoConsumerAutoClearEnabled = CommonVars("wds.linkis.fifo.consumer.auto.clear.enabled", true) + assertEquals(SchedulerConfiguration.FIFO_CONSUMER_AUTO_CLEAR_ENABLED, fifoConsumerAutoClearEnabled) + val fifoConsumerMaxIdleTime = CommonVars("wds.linkis.fifo.consumer.max.idle.time", new TimeType("1h")).getValue.toLong + assertEquals(SchedulerConfiguration.FIFO_CONSUMER_MAX_IDLE_TIME, fifoConsumerMaxIdleTime) + assertEquals(SchedulerConfiguration.FIFO_CONSUMER_IDLE_SCAN_INTERVAL.getValue.toLong, 7200000) + val fifoConsumerIdleScanInitTime = CommonVars("wds.linkis.fifo.consumer.idle.scan.init.time", new TimeType("1s")).getValue.toLong + assertEquals(SchedulerConfiguration.FIFO_CONSUMER_IDLE_SCAN_INIT_TIME.getValue.toLong, fifoConsumerIdleScanInitTime) + } +} diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/event/LogEventTest.scala b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/event/LogEventTest.scala new file mode 100644 index 0000000000..58be521ac6 --- /dev/null +++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/event/LogEventTest.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.linkis.scheduler.event + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +class LogEventTest { + @Test + def testLogEvent: Unit = { + val logEvent = LogEvent + + assertEquals(2, logEvent.write) + assertEquals(1, logEvent.read) + } +} diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/exception/LinkisJobRetryExceptionTest.scala b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/exception/LinkisJobRetryExceptionTest.scala new file mode 100644 index 0000000000..f8ce994e9e --- /dev/null +++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/exception/LinkisJobRetryExceptionTest.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.linkis.scheduler.exception + + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +class LinkisJobRetryExceptionTest { + + @Test + def testLinkisJobRetryException: Unit = { + val des = "testLinkisJobRetryExceptionTest" + val localIp = "127.0.0.1"; + val linkisJobRetryException = new LinkisJobRetryException(des); + linkisJobRetryException.setIp(localIp) + assertEquals(des, linkisJobRetryException.getDesc) + assertEquals(25000, linkisJobRetryException.getErrCode) + assertEquals(localIp, linkisJobRetryException.getIp) + } + +} diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/exception/SchedulerErrorExceptionTest.scala b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/exception/SchedulerErrorExceptionTest.scala new file mode 100644 index 0000000000..f84113df26 --- /dev/null +++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/exception/SchedulerErrorExceptionTest.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.linkis.scheduler.exception + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +class SchedulerErrorExceptionTest { + + @Test + def testSchedulerErrorException: Unit = { + val des = "testSchedulerErrorExceptionTest" + val localIp = "127.0.0.1"; + val errorCode = 25000 + val schedulerErrorException = new SchedulerErrorException(errorCode, des); + schedulerErrorException.setIp(localIp) + assertEquals(des, schedulerErrorException.getDesc) + assertEquals(25000, schedulerErrorException.getErrCode) + assertEquals(localIp, schedulerErrorException.getIp) + } + +} diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/exception/WaitForNextAskExecutorExceptionTest.scala b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/exception/WaitForNextAskExecutorExceptionTest.scala new file mode 100644 index 0000000000..e546a9effd --- /dev/null +++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/exception/WaitForNextAskExecutorExceptionTest.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.linkis.scheduler.exception + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +class WaitForNextAskExecutorExceptionTest { + + @Test + def testWaitForNextAskExecutorException: Unit = { + val des = "testWaitForNextAskExecutorExceptionTest" + val localIp = "127.0.0.1"; + val errorCode = 12111 + val waitForNextAskExecutorException = new WaitForNextAskExecutorException(des); + waitForNextAskExecutorException.setIp(localIp) + assertEquals(des, waitForNextAskExecutorException.getDesc) + assertEquals(errorCode, waitForNextAskExecutorException.getErrCode) + assertEquals(localIp, waitForNextAskExecutorException.getIp) + } + +} diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/future/BDPFutureTaskTest.scala b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/future/BDPFutureTaskTest.scala new file mode 100644 index 0000000000..cedfb2053d --- /dev/null +++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/future/BDPFutureTaskTest.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.linkis.scheduler.future + +import org.apache.linkis.common.utils.Utils +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +import java.util.concurrent.{ExecutorService, Future} + +class BDPFutureTaskTest { + private var future: Future[_] = _ + private var bdpFutureTask: BDPFuture = _ + @Test + def testBDPFutureTask: Unit = { + val executeService: ExecutorService = Utils.newCachedThreadPool(1, "test" + "-ThreadPool-", true) + + val runnable = new Thread(new Runnable { + override def run(): Unit = Thread.sleep(1000) + }) + + future = executeService.submit(runnable) + bdpFutureTask = new BDPFutureTask(future) + if (runnable.isAlive) { + assertEquals(false, runnable.isInterrupted) + bdpFutureTask.cancel() + assertEquals(true, runnable.isInterrupted) + } + + } + +} diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOConsumerManagerTest.scala b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOConsumerManagerTest.scala new file mode 100644 index 0000000000..ffe012ae2f --- /dev/null +++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOConsumerManagerTest.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.linkis.scheduler.queue.fifoqueue + +import org.junit.jupiter.api.Assertions.{assertFalse, assertNotNull, assertNull, assertTrue} +import org.junit.jupiter.api.Test + +class FIFOConsumerManagerTest { + @Test + def testSetSchedulerContext: Unit = { + val schedulerContext = new FIFOSchedulerContextImpl(100) + val consumerManager = new FIFOConsumerManager + consumerManager.setSchedulerContext(schedulerContext) + assertNotNull(consumerManager.listConsumers()) + } + @Test + def testListConsumers: Unit = { + val schedulerContext = new FIFOSchedulerContextImpl(100) + val consumerManager = new FIFOConsumerManager + assertNull(consumerManager.listConsumers()(0)) + consumerManager.setSchedulerContext(schedulerContext) + assertNotNull(consumerManager.listConsumers()(0)) + } + @Test + def testShutdown: Unit = { + val schedulerContext = new FIFOSchedulerContextImpl(100) + val consumerManager = new FIFOConsumerManager + consumerManager.setSchedulerContext(schedulerContext) + assertFalse(consumerManager.listConsumers()(0).terminate) + try { + try + consumerManager.shutdown() + } catch { + case e => e.printStackTrace + } + assertTrue(consumerManager.listConsumers()(0).terminate) + } +} \ No newline at end of file diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOGroupFactoryTest.scala b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOGroupFactoryTest.scala new file mode 100644 index 0000000000..ade90a030f --- /dev/null +++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOGroupFactoryTest.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.linkis.scheduler.queue.fifoqueue + +import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull} +import org.junit.jupiter.api.Test + +class FIFOGroupFactoryTest { + @Test + def testGroupFactory: Unit = { + val schedulerContext = new FIFOSchedulerContextImpl(100) + val groupFactory = new FIFOGroupFactory() + groupFactory.setDefaultMaxRunningJobs(10) + schedulerContext.setGroupFactory(groupFactory) + groupFactory.setDefaultMaxAskExecutorTimes(100L) + assertEquals(10, groupFactory.getDefaultMaxRunningJobs) + assertEquals(100L, groupFactory.getDefaultMaxAskExecutorTimes) + } +} diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOGroupTest.scala b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOGroupTest.scala new file mode 100644 index 0000000000..7c285ad080 --- /dev/null +++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOGroupTest.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.linkis.scheduler.queue.fifoqueue + +import org.apache.linkis.scheduler.queue.GroupStatus +import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull} +import org.junit.jupiter.api.Test + +class FIFOGroupTest { + @Test + def testFIFOGroup: Unit = { + val schedulerContext = new FIFOSchedulerContextImpl(100) + val group = schedulerContext.getOrCreateGroupFactory + .getOrCreateGroup(null).asInstanceOf[FIFOGroup] + group.setMaxAskInterval(10) + group.setMinAskInterval(8) + group.setStatus(GroupStatus.USING) + group.setMaxRunningJobs(10) + group.setMaxAskExecutorTimes(100L) + assertEquals(10, group.getMaxAskInterval) + assertEquals(8, group.getMinAskInterval) + assertEquals(GroupStatus.USING, group.getStatus) + assertEquals(10, group.getMaxRunningJobs) + assertEquals(100L, group.getMaxAskExecutorTimes) + } +} diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOSchedulerContextImplTest.scala b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOSchedulerContextImplTest.scala new file mode 100644 index 0000000000..f2f24ea110 --- /dev/null +++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOSchedulerContextImplTest.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.linkis.scheduler.queue.fifoqueue + +import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull} +import org.junit.jupiter.api.Test + +class FIFOSchedulerContextImplTest { + @Test + def testGetOrCreateGroupFactory: Unit = { + val schedulerContext = new FIFOSchedulerContextImpl(100) + val group = schedulerContext.getOrCreateGroupFactory.getOrCreateGroup(null) + assertNotNull(group) + } + @Test + def testSetGroupFactory: Unit = { + val schedulerContext = new FIFOSchedulerContextImpl(100) + schedulerContext.setGroupFactory(new FIFOGroupFactory) + val groupFactory = schedulerContext.getOrCreateGroupFactory + assertNotNull(groupFactory) + } +} diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOSchedulerTest.scala b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOSchedulerTest.scala new file mode 100644 index 0000000000..9f0e03069e --- /dev/null +++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOSchedulerTest.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.linkis.scheduler.queue.fifoqueue + +import org.apache.linkis.scheduler.SchedulerContext +import org.apache.linkis.scheduler.queue.UserJob +import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull} +import org.junit.jupiter.api.{BeforeEach, Test} + + +class FIFOSchedulerTest { + var schedulerContext: SchedulerContext = _ + var fIFOScheduler: FIFOScheduler = _ + @BeforeEach + def prepareScheduler: Unit = { + schedulerContext = new FIFOSchedulerContextImpl(100) + fIFOScheduler = new FIFOScheduler(schedulerContext) + } + + @Test + def testFIFOSchedulerInit: Unit = { + fIFOScheduler.init() + assertEquals("FIFOScheduler", fIFOScheduler.getName) + } + + @Test + def testFIFOSchedulerSubmit: Unit = { + val userJob = new UserJob() + fIFOScheduler.submit(userJob) + assertNotNull(fIFOScheduler.getSchedulerContext.getOrCreateConsumerManager.getOrCreateConsumer("FIFO-Group")) + } + +} \ No newline at end of file diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumerTest.scala b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumerTest.scala new file mode 100644 index 0000000000..f8a4204cd0 --- /dev/null +++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumerTest.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.linkis.scheduler.queue.fifoqueue + +import org.junit.jupiter.api.Assertions.{assertFalse, assertNotNull, assertTrue} +import org.junit.jupiter.api.Test + +class FIFOUserConsumerTest { + @Test + def testStart: Unit = { + val schedulerContext = new FIFOSchedulerContextImpl(100) + val consumerManager = new FIFOConsumerManager + consumerManager.setSchedulerContext(schedulerContext) + val consumer = consumerManager.listConsumers()(0) + consumer.start() + assertFalse(consumer.terminate) + assertNotNull(consumer.getGroup) + assertNotNull(consumer.getConsumeQueue) + consumer.shutdown() + assertTrue(consumer.terminate) + } +} diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManagerTest.scala b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManagerTest.scala new file mode 100644 index 0000000000..30bfd11cc0 --- /dev/null +++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelConsumerManagerTest.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.linkis.scheduler.queue.parallelqueue +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Test + +class ParallelConsumerManagerTest { + @Test + def testSetSchedulerContext: Unit = { + val schedulerContext = new ParallelSchedulerContextImpl(100) + val consumerManager = new ParallelConsumerManager(10) + consumerManager.setSchedulerContext(schedulerContext) + assertNotNull(consumerManager.listConsumers()) + } +} diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelGroupFactoryTest.scala b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelGroupFactoryTest.scala new file mode 100644 index 0000000000..d172e3e81e --- /dev/null +++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelGroupFactoryTest.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.linkis.scheduler.queue.parallelqueue + +import org.apache.linkis.scheduler.queue.fifoqueue.{FIFOGroupFactory, FIFOSchedulerContextImpl} +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +class ParallelGroupFactoryTest { + @Test + def testGroupFactory: Unit = { + val schedulerContext = new ParallelSchedulerContextImpl(100) + val groupFactory = schedulerContext.getOrCreateGroupFactory.asInstanceOf[ParallelGroupFactory] + groupFactory.setDefaultMaxRunningJobs(10) + schedulerContext.setGroupFactory(groupFactory) + groupFactory.setDefaultMaxAskExecutorTimes(100L) + assertEquals(10, groupFactory.getDefaultMaxRunningJobs) + assertEquals(100L, groupFactory.getDefaultMaxAskExecutorTimes) + } +} diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelGroupTest.scala b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelGroupTest.scala new file mode 100644 index 0000000000..5fca352f95 --- /dev/null +++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelGroupTest.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.linkis.scheduler.queue.parallelqueue + +import org.apache.linkis.scheduler.queue.GroupStatus +import org.apache.linkis.scheduler.queue.fifoqueue.{FIFOGroup, FIFOSchedulerContextImpl} +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +class ParallelGroupTest { + @Test + def testFIFOGroup: Unit = { + val schedulerContext = new ParallelSchedulerContextImpl(100) + val groupFactory = schedulerContext.getOrCreateGroupFactory.asInstanceOf[ParallelGroupFactory] + groupFactory.setParallelism(10) + groupFactory.setDefaultMaxAskExecutorTimes(3) + groupFactory.setDefaultMaxRunningJobs(2) + assertEquals(10, groupFactory.getParallelism) + assertEquals(3, groupFactory.getDefaultMaxAskExecutorTimes) + assertEquals(2, groupFactory.getDefaultMaxRunningJobs) + } +} diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelSchedulerContextImplTest.scala b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelSchedulerContextImplTest.scala new file mode 100644 index 0000000000..c6e3161b58 --- /dev/null +++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelSchedulerContextImplTest.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.linkis.scheduler.queue.parallelqueue +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Test + +class ParallelSchedulerContextImplTest { + @Test + def testGetOrCreateGroupFactory: Unit = { + val schedulerContext = new ParallelSchedulerContextImpl(100) + val groupFactory = schedulerContext.getOrCreateGroupFactory.asInstanceOf[ParallelGroupFactory] + assertNotNull(groupFactory) + } +} diff --git a/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelSchedulerTest.scala b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelSchedulerTest.scala new file mode 100644 index 0000000000..3fe7bbe514 --- /dev/null +++ b/linkis-commons/linkis-scheduler/src/test/scala/org/apache/linkis/scheduler/queue/parallelqueue/ParallelSchedulerTest.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.linkis.scheduler.queue.parallelqueue + +import org.apache.linkis.scheduler.SchedulerContext +import org.apache.linkis.scheduler.queue.UserJob +import org.apache.linkis.scheduler.queue.fifoqueue.{FIFOGroupFactory, FIFOScheduler, FIFOSchedulerContextImpl} +import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull} +import org.junit.jupiter.api.{BeforeEach, Test} + +class ParallelSchedulerTest { + var schedulerContext: SchedulerContext = _ + var parallelScheduler: ParallelScheduler = _ + @BeforeEach + def prepareScheduler: Unit = { + val schedulerContext = new ParallelSchedulerContextImpl(100) + parallelScheduler = new ParallelScheduler(schedulerContext) + } + + @Test + def testParallelSchedulerInit: Unit = { + parallelScheduler.init() + assertEquals("ParallelScheduler", parallelScheduler.getName) + } +}