
error in parallel documentation #2915

Open
jkitchin opened this issue Oct 15, 2023 · 13 comments

@jkitchin

This example does not work as written:

outputs = pool.map(generate, range(1, 5))

I think it should be either:

with ParslPoolExecutor(config) as exec:
    outputs = exec.map(generate, range(1, 5))

or

with ParslPoolExecutor(config) as pool:
    outputs = pool.map(generate, range(1, 5))

The example also does not work for me on a Mac unless I change the import to from parsl.configs.local_threads import config. For the given import from parsl.configs.htex_local import config I get this error:

      3 from parsl.configs.htex_local import config
----> 5 with ParslPoolExecutor(config) as exec:
      6     outputs = exec.map(generate, range(1, 5))
      8 for result in outputs:

File ~/anaconda3/lib/python3.10/site-packages/parsl/concurrent/__init__.py:31, in ParslPoolExecutor.__init__(self, config)
     25 """Create the executor
     26 
     27 Args:
     28     config: Configuration for the Parsl Data Flow Kernel (DFK)
     29 """
     30 self._config = config
---> 31 self.dfk = DataFlowKernel(self._config)
     32 self._app_cache: Dict[Callable, PythonApp] = {}

File ~/anaconda3/lib/python3.10/site-packages/typeguard/__init__.py:1033, in typechecked.<locals>.wrapper(*args, **kwargs)
   1031 memo = _CallMemo(python_func, _localns, args=args, kwargs=kwargs)
   1032 check_argument_types(memo)
-> 1033 retval = func(*args, **kwargs)
   1034 try:
   1035     check_return_type(retval, memo)

File ~/anaconda3/lib/python3.10/site-packages/parsl/dataflow/dflow.py:185, in DataFlowKernel.__init__(self, config)
    183 self.data_manager = DataManager(self)
    184 parsl_internal_executor = ThreadPoolExecutor(max_threads=config.internal_tasks_max_threads, label='_parsl_internal')
--> 185 self.add_executors(config.executors)
    186 self.add_executors([parsl_internal_executor])
    188 if self.checkpoint_mode == "periodic":

File ~/anaconda3/lib/python3.10/site-packages/parsl/dataflow/dflow.py:1122, in DataFlowKernel.add_executors(self, executors)
   1119             self._create_remote_dirs_over_channel(executor.provider, executor.provider.channel)
   1121 self.executors[executor.label] = executor
-> 1122 block_ids = executor.start()
   1123 if self.monitoring and block_ids:
   1124     new_status = {}

File ~/anaconda3/lib/python3.10/site-packages/parsl/executors/high_throughput/executor.py:365, in HighThroughputExecutor.start(self)
    361 self._start_local_interchange_process()
    363 logger.debug("Created management thread: {}".format(self._queue_management_thread))
--> 365 block_ids = self.initialize_scaling()
    366 return block_ids

File ~/anaconda3/lib/python3.10/site-packages/parsl/executors/high_throughput/executor.py:320, in HighThroughputExecutor.initialize_scaling(self)
    317 if self.worker_logdir_root is not None:
    318     worker_logdir = "{}/{}".format(self.worker_logdir_root, self.label)
--> 320 l_cmd = self.launch_cmd.format(debug=debug_opts,
    321                                prefetch_capacity=self.prefetch_capacity,
    322                                address_probe_timeout_string=address_probe_timeout_string,
    323                                addresses=self.all_addresses,
    324                                task_port=self.worker_task_port,
    325                                result_port=self.worker_result_port,
    326                                cores_per_worker=self.cores_per_worker,
    327                                mem_per_worker=self.mem_per_worker,
    328                                max_workers=max_workers,
    329                                nodes_per_block=self.provider.nodes_per_block,
    330                                heartbeat_period=self.heartbeat_period,
    331                                heartbeat_threshold=self.heartbeat_threshold,
    332                                poll_period=self.poll_period,
    333                                logdir=worker_logdir,
    334                                cpu_affinity=self.cpu_affinity,
    335                                accelerators=" ".join(self.available_accelerators),
    336                                start_method=self.start_method)
    337 self.launch_cmd = l_cmd
    338 logger.debug("Launch command: {}".format(self.launch_cmd))

KeyError: 'block_id'
@benclifford
Collaborator

For the first part, that's probably something for @WardLT.

For the block_id key error, that's something surprising - can you tell me the exact version of parsl that you have installed?

@jkitchin
Author

pip show parsl
Name: parsl
Version: 2023.7.10
Summary: Simple data dependent workflows in Python
Home-page: https://github.com/Parsl/parsl
Author: The Parsl Team
Author-email: [email protected]
License: Apache 2.0
Location: /Users/jkitchin/anaconda3/lib/python3.10/site-packages
Requires: dill, globus-sdk, paramiko, psutil, pyzmq, requests, setproctitle, six, tblib, typeguard, types-paramiko, types-requests, types-six, typing-extensions
Required-by: 

@benclifford
Collaborator

For the block id key error:

That parameter should not be substituted at that point of execution, because the block_id in the default template uses double curly brackets:

"--block_id={{block_id}} "

What that error says is that for some reason the substitution code is treating that as something to substitute.
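
For illustration, here is a minimal sketch of Python's str.format escaping; the template is made up, and only the {{block_id}} part comes from the real launch command:

# "{{...}}" is an escape: the first .format() turns it into the literal
# "{...}". Formatting the *result* a second time treats it as a real
# field, which raises the KeyError seen in the traceback above.
template = "--debug={debug} --block_id={{block_id}}"

once = template.format(debug=True)
print(once)              # --debug=True --block_id={block_id}

once.format(debug=True)  # KeyError: 'block_id'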

Two questions:

  1. Are you setting your own launch command in the htex constructor?
  2. Are you reusing a HighThroughputExecutor in two parsl.load calls? I have a strong suspicion that isn't detected, even though it is behaviour that isn't allowed.

@jkitchin
Author

I see what you mean. Yes, it does appear a double load is the issue there. In the script I had both parsl.load(config) and with ParslPoolExecutor(config) as pool:, which seems to cause that issue. It seems like something else is awry, though.

This code:

import parsl, time
from parsl import python_app
from parsl.concurrent import ParslPoolExecutor

from parsl.configs.local_threads import config

parsl.load(config)

@python_app
def generate(limit):
    """Generate a random integer and return it"""
    from random import randint
    time.sleep(5)
    return randint(1, limit)


with ParslPoolExecutor(config) as pool:
    outputs = pool.map(generate, range(1, 11))

print([x.result() for x in outputs])

does not work if I don't load the config first: it raises RuntimeError: Must first load config.

This code:

import parsl, time
from parsl import python_app
from parsl.concurrent import ParslPoolExecutor
from parsl.configs.htex_local import config


parsl.load(config)

@python_app
def generate(limit):
    """Generate a random integer and return it"""
    from random import randint
    time.sleep(5)
    return randint(1, limit)


with ParslPoolExecutor(config) as pool:
    outputs = pool.map(generate, range(1, 11))

print([x.result() for x in outputs])

raises KeyError: 'block_id' if I have parsl.load(config), but fails differently with TypeError: Cannot subclass special typing classes if I comment that out.

It seems like there are two issues here: the original documentation issue and this one. Should I split this into a second issue?

@benclifford
Collaborator

That second error is a different one I've encountered before and it should be fixed elsewhere - let me go find it. Something to do with package versions...

@jkitchin
Author

Here is the whole traceback:

Traceback (most recent call last):
  File "<stdin>", line 19, in <module>
  File "<stdin>", line 19, in <listcomp>
  File "/Users/jkitchin/anaconda3/lib/python3.10/site-packages/parsl/concurrent/__init__.py", line 74, in result_iterator
    yield fs.pop().result()
  File "/Users/jkitchin/anaconda3/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/Users/jkitchin/anaconda3/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/Users/jkitchin/anaconda3/lib/python3.10/site-packages/parsl/dataflow/dflow.py", line 300, in handle_exec_update
    res = self._unwrap_remote_exception_wrapper(future)
  File "/Users/jkitchin/anaconda3/lib/python3.10/site-packages/parsl/dataflow/dflow.py", line 570, in _unwrap_remote_exception_wrapper
    result = future.result()
  File "/Users/jkitchin/anaconda3/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/Users/jkitchin/anaconda3/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/Users/jkitchin/anaconda3/lib/python3.10/site-packages/parsl/executors/high_throughput/executor.py", line 443, in _queue_management_worker
    s.reraise()
  File "/Users/jkitchin/anaconda3/lib/python3.10/site-packages/parsl/app/errors.py", line 123, in reraise
    reraise(t, v, v.__traceback__)
  File "/Users/jkitchin/anaconda3/lib/python3.10/site-packages/six.py", line 719, in reraise
    raise value
  File "/Users/jkitchin/anaconda3/bin/process_worker_pool.py", line 596, in worker
    result = execute_task(req['buffer'])
  File "/Users/jkitchin/anaconda3/bin/process_worker_pool.py", line 489, in execute_task
    f, args, kwargs = unpack_apply_message(bufs, user_ns, copy=False)
  File "/Users/jkitchin/anaconda3/lib/python3.10/site-packages/parsl/serialize/facade.py", line 52, in unpack_apply_message
    return [deserialize(buf) for buf in unpack_buffers(packed_buffer)]
  File "/Users/jkitchin/anaconda3/lib/python3.10/site-packages/parsl/serialize/facade.py", line 52, in <listcomp>
    return [deserialize(buf) for buf in unpack_buffers(packed_buffer)]
  File "/Users/jkitchin/anaconda3/lib/python3.10/site-packages/parsl/serialize/facade.py", line 95, in deserialize
    result = methods_for_code[header].deserialize(body)
  File "/Users/jkitchin/anaconda3/lib/python3.10/site-packages/parsl/serialize/concretes.py", line 89, in deserialize
    return dill.loads(body)
  File "/Users/jkitchin/anaconda3/lib/python3.10/site-packages/dill/_dill.py", line 286, in loads
    return load(file, ignore, **kwds)
  File "/Users/jkitchin/anaconda3/lib/python3.10/site-packages/dill/_dill.py", line 272, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/Users/jkitchin/anaconda3/lib/python3.10/site-packages/dill/_dill.py", line 419, in load
    obj = StockUnpickler.load(self)
  File "/Users/jkitchin/anaconda3/lib/python3.10/site-packages/dill/_dill.py", line 568, in _create_type
    return typeobj(*args)
  File "/Users/jkitchin/anaconda3/lib/python3.10/typing.py", line 348, in __init_subclass__
    raise TypeError("Cannot subclass special typing classes")
TypeError: Cannot subclass special typing classes

@benclifford
Collaborator

#2678

PRs around the time of the parsl version you are using deal with it, so you might have a version that falls right in the window of being broken. If possible, try a more recent parsl, which should specify tighter version constraints for the troublesome packages.
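
For example, the standard pip upgrade command (nothing parsl-specific beyond the package name):

pip install --upgrade parsl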

@benclifford
Collaborator

Part of this issue highlights the need for parsl to be more aggressive about detecting multiple configuration uses - I'll open a feature request for that.

@jkitchin
Author

jkitchin commented Oct 15, 2023

I updated to parsl-2023.10.9. Now, I do have to load that config to avoid parsl.errors.NoDataFlowKernelError: Must first load config, but when I do load it, I am back to KeyError: 'block_id' (with from parsl.configs.htex_local import config).

from parsl.configs.local_threads import config still works fine.

@benclifford
Collaborator

Here's the automated test case that runs for the ParslExecutor feature:

34fe75f#diff-534bab1124fd7d79ea9fbe4cdcfe7321d646f99e644336e050f8189ac038e049

Are you decorating the function you are trying to pass into ParslExecutor with the python_app decorator? If so, don't do that.

@jkitchin
Author

jkitchin commented Oct 15, 2023

I see. This works.

from parsl.concurrent import ParslPoolExecutor
from parsl.configs.htex_local import config

def generate(limit):
    """Generate a random integer and return it"""
    from random import randint
    return randint(1, limit)


with ParslPoolExecutor(config) as pool:
    outputs = pool.map(generate, range(1, 11))

print(list(outputs))

It is still confusing in the documentation. Here:

def generate(limit):

the function is decorated. Technically, here:

@python_app

@WardLT
Contributor

WardLT commented Oct 16, 2023

Ah ha, that is a problem in the docs. Sorry I missed that earlier.

Yes, generate must be decorated when using it as an app which produces its own futures, as in

for i in range(1,5):

but not when using it as part of the ParslPoolExecutor.

I'll make a quick PR that clears up when to decorate and when not to in that part of the documentation.
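
For contrast with the working undecorated example above, here is a minimal sketch of the decorated pattern, where each call returns a future. It reuses the local_threads config and the generate function from this thread, and should run as its own script, since configs may not be re-used:

import parsl
from parsl import python_app
from parsl.configs.local_threads import config

parsl.load(config)

@python_app
def generate(limit):
    """Generate a random integer and return it"""
    from random import randint
    return randint(1, limit)

# Each call returns an AppFuture; collect the results explicitly.
futures = [generate(i) for i in range(1, 5)]
print([f.result() for f in futures])

parsl.clear()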

@WardLT
Contributor

WardLT commented Oct 16, 2023

I updated to parsl-2023.10.9. Now, I do have to load that config to avoid parsl.errors.NoDataFlowKernelError: Must first load config, but when I do load it, I am back to KeyError: 'block_id' (with from parsl.configs.htex_local import config).

I bet this is a result of re-using a configuration between multiple executors. Check out #2871. I'm going to open an issue with some ideas on how to make the error more informative.
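
A hypothetical workaround sketch along those lines: build a fresh Config per use instead of sharing the module-level object from parsl.configs.htex_local. The fresh_config helper and its parameters are illustrative, not from this thread:

from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.providers import LocalProvider

def fresh_config():
    # Each caller gets its own Config and executor instances, so no
    # executor is started (and its launch command formatted) twice.
    return Config(
        executors=[
            HighThroughputExecutor(
                label="htex_local",
                provider=LocalProvider(),
            )
        ]
    )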

benclifford pushed a commit that referenced this issue Oct 24, 2023
Clarifies problems users found in the documentation:

- error in parallel documentation #2915: Decorators are not needed for ParslPoolExecutor.map
- Reloading default HighThroughputExecutor after parsl.clear() results in KeyError: 'block_id' exception #2871: Configs may not be re-used
- Fixes Docs: Further clarification on how serialization impacts where imports need to be #2918: Explain when functions need imports
- Fixes Allowing for inputs and outputs to take immutable types #2925: Explain mutable types are undesirable

Thanks, @Andrew-S-Rosen