From edd4594b1a6f84c75e628450946b4e120ba8fc5d Mon Sep 17 00:00:00 2001 From: yanang007 Date: Tue, 29 Nov 2022 15:46:07 +0800 Subject: [PATCH] =?UTF-8?q?:sparkles:=20dask=E5=8D=95=E7=BA=BF=E7=A8=8Bwor?= =?UTF-8?q?ker=E5=90=AF=E5=8A=A8=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- misc/startup.py | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 misc/startup.py diff --git a/misc/startup.py b/misc/startup.py new file mode 100644 index 0000000..6a3ef7a --- /dev/null +++ b/misc/startup.py @@ -0,0 +1,46 @@ +""" +用于启动一个单一主线程的dask worker,防止一些存在线程安全问题的库出现问题 +""" + +import click +from distributed import Worker +from distributed.cli.dask_worker import main +from concurrent.futures import Future, Executor + + +class DummyWorker(Worker): + def __init__(self, *args, **kwargs): + print('Dummy啦') + super().__init__(*args, executor=DummyLinearExecutor(), **kwargs) + + +class DummyLinearExecutor(Executor): + def __init__(self): + pass + + def submit(self, fn, /, *args, **kwargs): + fut = Future() + result = fn(*args, **kwargs) + fut.set_result(result) + + return fut + + +if __name__ == "__main__": + use_dummy_executor = True # 是否使用上文实现的单线程执行器,否则使用dask的offload执行器 + if use_dummy_executor: + click.option( + "--worker-class", + type=str, + default='startup.DummyWorker', + show_default=True, + help="Default executor.", + )(main)() + else: + click.option( + "--executor", + type=str, + default="offload", + show_default=True, + help="Default executor.", + )(main)()