Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Race condition in resource_tracker in multithreaded access to a shared reusable executor #368

Open
ogrisel opened this issue Sep 14, 2022 · 0 comments

Comments

@ogrisel
Copy link
Collaborator

ogrisel commented Sep 14, 2022

Found on the joblib CI:

================================== FAILURES ===================================
_______ test_multithreaded_parallel_termination_resource_tracker_silent _______

    @with_numpy
    @with_multiprocessing
    def test_multithreaded_parallel_termination_resource_tracker_silent():
        # test that concurrent termination attempts of a same executor does not
        # emit any spurious error from the resource_tracker. We test various
        # situations making 0, 1 or both parallel call sending a task that will
        # make the worker (and thus the whole Parallel call) error out.
        cmd = '''if 1:
            import os
            import numpy as np
            from joblib import Parallel, delayed
            from joblib.externals.loky.backend import resource_tracker
            from concurrent.futures import ThreadPoolExecutor, wait
    
            resource_tracker.VERBOSE = 0
    
            array = np.arange(int(1e2))
    
            temp_dirs_thread_1 = set()
            temp_dirs_thread_2 = set()
    
    
            def raise_error(array):
                raise ValueError
    
    
            def parallel_get_filename(array, temp_dirs):
                with Parallel(backend="loky", n_jobs=2, max_nbytes=10) as p:
                    for i in range(10):
                        [filename] = p(
                            delayed(getattr)(array, "filename") for _ in range(1)
                        )
                        temp_dirs.add(os.path.dirname(filename))
    
    
            def parallel_raise(array, temp_dirs):
                with Parallel(backend="loky", n_jobs=2, max_nbytes=10) as p:
                    for i in range(10):
                        [filename] = p(
                            delayed(raise_error)(array) for _ in range(1)
                        )
                        temp_dirs.add(os.path.dirname(filename))
    
    
            executor = ThreadPoolExecutor(max_workers=2)
    
            # both function calls will use the same loky executor, but with a
            # different Parallel object.
            future_1 = executor.submit({f1}, array, temp_dirs_thread_1)
            future_2 = executor.submit({f2}, array, temp_dirs_thread_2)
    
            # Wait for both threads to terminate their backend
            wait([future_1, future_2])
    
            future_1.result()
            future_2.result()
        '''
        functions_and_returncodes = [
            ("parallel_get_filename", "parallel_get_filename", 0),
            ("parallel_get_filename", "parallel_raise", 1),
            ("parallel_raise", "parallel_raise", 1)
        ]
    
        for f1, f2, returncode in functions_and_returncodes:
            p = subprocess.Popen([sys.executable, '-c', cmd.format(f1=f1, f2=f2)],
                                 stderr=subprocess.PIPE, stdout=subprocess.PIPE)
            p.wait()
            out, err = p.communicate()
            assert p.returncode == returncode, out.decode()
>           assert b"resource_tracker" not in err, err.decode()
E           AssertionError: Traceback (most recent call last):

E               File "D:\a\1\s\joblib\externals\loky\backend\resource_tracker.py", line 271, in main

E                 del registry[rtype][name]

E             KeyError: 'C:\\Users\\VSSADM~1\\AppData\\Local\\Temp\\joblib_memmapping_folder_5004_ddaa41808b9d41aab90347433a6b6a61_66fdc615c3aa4ca8b521b721c7605b04\\5004-2017530151456-b29a7e35c0604f13bdf92bca990a7613.pkl'

E             Traceback (most recent call last):

E               File "<string>", line 48, in <module>

E               File "C:\Miniconda\envs\testenv\lib\concurrent\futures\_base.py", line 451, in result

E                 return self.__get_result()

E               File "C:\Miniconda\envs\testenv\lib\concurrent\futures\_base.py", line 403, in __get_result

E                 raise self._exception

E               File "C:\Miniconda\envs\testenv\lib\concurrent\futures\thread.py", line 58, in run

E                 result = self.fn(*self.args, **self.kwargs)

E               File "<string>", line 23, in parallel_get_filename

E               File "D:\a\1\s\joblib\parallel.py", line 1085, in __call__

E                 if self.dispatch_one_batch(iterator):

E               File "D:\a\1\s\joblib\parallel.py", line 901, in dispatch_one_batch

E                 self._dispatch(tasks)

E               File "D:\a\1\s\joblib\parallel.py", line 819, in _dispatch

E                 job = self._backend.apply_async(batch, callback=cb)

E               File "D:\a\1\s\joblib\_parallel_backends.py", line 556, in apply_async

E                 future = self._workers.submit(SafeFunction(func))

E               File "D:\a\1\s\joblib\externals\loky\reusable_executor.py", line 176, in submit

E                 return super().submit(fn, *args, **kwargs)

E               File "D:\a\1\s\joblib\externals\loky\process_executor.py", line 1116, in submit

E                 raise ShutdownExecutorError(

E             joblib.externals.loky.process_executor.ShutdownExecutorError: cannot schedule new futures after shutdown

E             
E           assert b'resource_tracker' not in b'Traceback (most recent call last):\r\n  File "D:\\a\\1\\s\\joblib\\externals\\loky\\backend\\resource_tracker.py", line 271, in main\r\n    del registry[rtype][name]\r\nKeyError: \'C:\\\\Users\\\\VSSADM~1\\\\AppData\\\\Local\\\\Temp\\\\joblib_memmapping_folder_5004_ddaa41808b9d41aab90347433a6b6a61_66fdc615c3aa4ca8b521b721c7605b04\\\\5004-2017530151456-b29a7e35c0604f13bdf92bca990a7613.pkl\'\r\nTraceback (most recent call last):\r\n  File "<string>", line 48, in <module>\r\n  File "C:\\Miniconda\\envs\\testenv\\lib\\concurrent\\futures\\_base.py", line 451, in result\r\n    return self.__get_result()\r\n  File "C:\\Miniconda\\envs\\testenv\\lib\\concurrent\\futures\\_base.py", line 403, in __get_result\r\n    raise self._exception\r\n  File "C:\\Miniconda\\envs\\testenv\\lib\\concurrent\\futures\\thread.py", line 58, in run\r\n    result = self.fn(*self.args, **self.kwargs)\r\n  File "<string>", line 23, in parallel_get_filename\r\n  File "D:\\a\\1\\s\\joblib\\parallel.py", line 1085, in __call__\r\n    if self.dispatch_one_batch(iterator):\r\n  File "D:\\a\\1\\s\\joblib\\parallel.py", line 901, in dispatch_one_batch\r\n    self._dispatch(tasks)\r\n  File "D:\\a\\1\\s\\joblib\\parallel.py", line 819, in _dispatch\r\n    job = self._backend.apply_async(batch, callback=cb)\r\n  File "D:\\a\\1\\s\\joblib\\_parallel_backends.py", line 556, in apply_async\r\n    future = self._workers.submit(SafeFunction(func))\r\n  File "D:\\a\\1\\s\\joblib\\externals\\loky\\reusable_executor.py", line 176, in submit\r\n    return super().submit(fn, *args, **kwargs)\r\n  File "D:\\a\\1\\s\\joblib\\externals\\loky\\process_executor.py", line 1116, in submit\r\n    raise ShutdownExecutorError(\r\njoblib.externals.loky.process_executor.ShutdownExecutorError: cannot schedule new futures after shutdown\r\n'

cmd        = 'if 1:\n        import os\n        import numpy as np\n        from joblib import Parallel, delayed\n        from jobl...minate their backend\n        wait([future_1, future_2])\n\n        future_1.result()\n        future_2.result()\n    '
err        = b'Traceback (most recent call last):\r\n  File "D:\\a\\1\\s\\joblib\\externals\\loky\\backend\\resource_tracker.py", l...Error(\r\njoblib.externals.loky.process_executor.ShutdownExecutorError: cannot schedule new futures after shutdown\r\n'
f1         = 'parallel_get_filename'
f2         = 'parallel_raise'
functions_and_returncodes = [('parallel_get_filename', 'parallel_get_filename', 0), ('parallel_get_filename', 'parallel_raise', 1), ('parallel_raise', 'parallel_raise', 1)]
out        = b''
p          = <Popen: returncode: 1 args: ['C:\\Miniconda\\envs\\testenv\\python.exe', '-c...>
returncode = 1

joblib\test\test_memmapping.py:586: AssertionError

This happened on a Windows host, but it is probably not strictly related to a specific OS.

https://dev.azure.com/joblib/joblib/_build/results?buildId=2240&view=logs&j=8008bb8c-6b4f-56d5-66f4-b1d037aba1bf&t=7af8808d-3494-56e0-c1dc-6fbf5bb2bc40

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant