Skip to content

Commit

Permalink
update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Nanguage committed May 11, 2023
1 parent 7be8a18 commit ae3e5ca
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 13 deletions.
38 changes: 31 additions & 7 deletions tests/test_condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@

from executor.engine.core import Engine
from executor.engine.job import LocalJob, ThreadJob, ProcessJob
from executor.engine.job.condition import AfterAnother, AfterOthers, AfterTimepoint, AnySatisfied, AllSatisfied
from executor.engine.job.condition import (
AfterAnother, AfterOthers, AfterTimepoint,
AnySatisfied, AllSatisfied
)


def test_after_another_cond():
engine = Engine()

lis = []

def append(x):
lis.append(x)

Expand All @@ -18,14 +22,20 @@ def id_func(x):

async def submit_job():
job1 = LocalJob(id_func, (1,), callback=append)
job2 = ThreadJob(id_func, (2,), callback=append, condition=AfterAnother(job_id=job1.id))
job3 = ProcessJob(id_func, (3,), callback=append, condition=AfterAnother(job_id=job2.id))
job2 = ThreadJob(
id_func, (2,), callback=append,
condition=AfterAnother(job_id=job1.id))
job3 = ProcessJob(
id_func, (3,), callback=append,
condition=AfterAnother(job_id=job2.id))
await engine.submit_async(job3)
await engine.submit_async(job2)
await engine.submit_async(job1)
await engine.join()
assert lis == [1, 2, 3]
job4 = ThreadJob(id_func, (4,), callback=append, condition=AfterAnother(job_id="not_exist"))
job4 = ThreadJob(
id_func, (4,), callback=append,
condition=AfterAnother(job_id="not_exist"))
await engine.submit_async(job4)
await job4.join(timeout=0.1)
assert len(lis) == 3
Expand All @@ -42,15 +52,22 @@ def test_after_others_all_mode():
async def submit_job():
job1 = ThreadJob(lambda: s.add(1))
job2 = ThreadJob(lambda: s.add(2))

def has_1_2():
assert 1 in s
assert 2 in s
job3 = ThreadJob(has_1_2, condition=AfterOthers(job_ids=[job1.id, job2.id], mode='all'))
job3 = ThreadJob(
has_1_2,
condition=AfterOthers(
job_ids=[job1.id, job2.id],
mode='all'))
await engine.submit_async(job3)
await engine.submit_async(job2)
await engine.submit_async(job1)
await engine.join()
job4 = ThreadJob(lambda: s.add(3), condition=AfterOthers(job_ids=["not_exist"]))
job4 = ThreadJob(
lambda: s.add(3),
condition=AfterOthers(job_ids=["not_exist"]))
await engine.submit_async(job4)
await engine.join(timeout=0.1)
assert len(s) == 2
Expand All @@ -67,9 +84,13 @@ def test_after_others_any_mode():
async def submit_job():
job1 = ThreadJob(lambda: s.add(1))
job2 = ThreadJob(lambda: s.add(2))

def has_1_or_2():
assert 1 in s or 2 in s
job3 = ThreadJob(has_1_or_2, condition=AfterOthers(job_ids=[job1.id, job2.id], mode="any"))
job3 = ThreadJob(
has_1_or_2,
condition=AfterOthers(
job_ids=[job1.id, job2.id], mode="any"))
await engine.submit_async(job3)
await engine.submit_async(job2)
await engine.submit_async(job1)
Expand All @@ -85,6 +106,7 @@ async def submit_job():
t1 = datetime.now()
d1 = timedelta(seconds=0.5)
t2 = t1 + d1

def assert_after():
assert datetime.now() > t2
job = ThreadJob(assert_after, condition=AfterTimepoint(timepoint=t2))
Expand All @@ -106,6 +128,7 @@ def run_forever():
async def submit_job():
job1 = ThreadJob(lambda: s.add(1))
job2 = ProcessJob(run_forever)

def has_one_element():
assert len(s) == 1
job3 = ThreadJob(has_one_element, condition=AnySatisfied(conditions=[
Expand All @@ -129,6 +152,7 @@ def test_all_satisfy():
async def submit_job():
job1 = ThreadJob(lambda: s.add(1))
job2 = ThreadJob(lambda: s.add(2))

def has_two_elements():
assert len(s) == 2
job3 = ThreadJob(has_two_elements, condition=AllSatisfied(conditions=[
Expand Down
17 changes: 11 additions & 6 deletions tests/test_engine.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import time
import typing as T
import shutil
import asyncio

import pytest

Expand Down Expand Up @@ -257,12 +258,15 @@ async def test_async_api():
await engine.join()
assert job1.status == "done"
assert job2.result() == 4
job3 = ThreadJob(lambda x: x**2, (2,))
job4 = ThreadJob(lambda x: x**2, (2,))
await engine.submit_async(job3, job4)
await engine.wait_async()
assert job3.status == "done"
assert job4.result() == 4

def sleep_5s():
time.sleep(5)

job5 = ProcessJob(sleep_5s)
await engine.submit_async(job5)
await engine.wait_async(timeout=1.0)
assert job5.status == "running"
await engine.cancel_all_async()


def test_engine_start_stop():
Expand Down Expand Up @@ -296,3 +300,4 @@ def test_corner_case():
engine = Engine()
with pytest.raises(RuntimeError):
engine.submit(job)
engine.loop = asyncio.new_event_loop()

0 comments on commit ae3e5ca

Please sign in to comment.