Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support generator for LocalJob & ThreadJob & DaskJob #6

Merged
merged 4 commits into from
Sep 27, 2024

Conversation

liunux4odoo
Copy link
Contributor

@liunux4odoo liunux4odoo commented Sep 24, 2024

known problem:
DaskJob will logs StopIteration error even runs success.

example code:

import typing as t
import asyncio

from executor.engine import Engine, ProcessJob, LocalJob, ThreadJob
from executor.engine.job.dask import DaskJob


def gen(n: int) -> t.Generator[int, None, None]:
    import time

    for i in range(n):
        time.sleep(0.5)
        print(f"sync yield from executor: {i}")
        yield i


async def gen_async(n: int) -> t.AsyncGenerator[int, None]:
    import asyncio

    for i in range(n):
        await asyncio.sleep(0.5)
        print(f"async yield from executor: {i}")
        yield i


async def main():
    with Engine() as engine:
        for job_cls in [LocalJob, ThreadJob, ProcessJob, DaskJob]:
            job1 = job_cls(gen, (5,))
            engine.submit(job1)
            for x in job1.result():
                print(f"main received: {x}")

            job2 = job_cls(gen_async, (5,))
            engine.submit(job2)
            async for x in job2.result():
                print(f"main received: {x}")
            print(job2)


if __name__ == "__main__":
    asyncio.run(main())

@liunux4odoo liunux4odoo marked this pull request as ready for review September 24, 2024 07:26
@Nanguage
Copy link
Owner

Thanks again for your contribution. I see you are trying Dask. If you are ready, please remind me to merge.

@liunux4odoo
Copy link
Contributor Author

Thanks again for your contribution. I see you are trying Dask. If you are ready, please remind me to merge.

I have make DaskJob support generator. But the previous commit was override by mistake, I will fix it tonight.

@liunux4odoo liunux4odoo changed the title support generator for LocalJob & ThreadJob support generator for LocalJob & ThreadJob & DaskJob Sep 25, 2024
@liunux4odoo
Copy link
Contributor Author

Thanks again for your contribution. I see you are trying Dask. If you are ready, please remind me to merge.

@Nanguage All four Job types support generator now, plz review it.

@Nanguage
Copy link
Owner

Would you be interested in adding some commits to ensure that the CI tests pass? If you'd prefer not to write that code, that's perfectly fine—I can go ahead and merge as is.

@liunux4odoo
Copy link
Contributor Author

Would you be interested in adding some commits to ensure that the CI tests pass? If you'd prefer not to write that code, that's perfectly fine—I can go ahead and merge as is.

These are lint errors:

executor/engine/job/utils.py:56: error: Name "_thread_locals" is not defined  [name-defined]
executor/engine/job/utils.py:5[8](https://github.com/Nanguage/executor-engine/actions/runs/11024907333/job/30618876230?pr=6#step:6:9): error: No overload variant of "next" matches argument type "Future[Any]"  [call-overload]
executor/engine/job/utils.py:58: note: Possible overload variants:
executor/engine/job/utils.py:58: note:     def [_T] next(SupportsNext[_T], /) -> _T
executor/engine/job/utils.py:58: note:     def [_T, _VT] next(SupportsNext[_T], _VT, /) -> Union[_T, _VT]
executor/engine/job/utils.py:64: error: Name "_thread_locals" is not defined  [name-defined]
executor/engine/job/utils.py:66: error: "Future[Any]" has no attribute "__anext__"  [attr-defined]
Found 4 errors in 1 file (checked 22 source files)
  1. the _thread_locals variable is dynamically defined because I don't want to mix it in the main thread.
  2. the Future.__anext__ method exists only for future returned by DaskJob, I'm not sure howto fix this lint error.

@Nanguage Nanguage merged commit 264bfb8 into Nanguage:master Sep 27, 2024
0 of 3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants