diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 5aff53bda7..2a3332aaaa 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -1256,7 +1256,39 @@ def cleanup(self) -> None: executor.shutdown() logger.info(f"Shut down executor {executor.label}") + if hasattr(executor, 'provider'): + logger.info(f"Closing channel(s) for {executor.label}") + + # The reasoning about which of .channel or .channels should + # be present is not well described (or describable) in the + # type system, so there are a lot of asserts here that + # attempt to describe the non-checked assumptions. + + # This logic is based on the logic in add_executors. + + # These two asserts could be an XOR but the 'and' and 'or' + # components of the XOR are separated here to give different + # error text. + assert hasattr(executor.provider, 'channel') or hasattr(executor.provider, 'channels'), \ + "The provider model assumes a provider has channel(s)" + assert not (hasattr(executor.provider, 'channel') and hasattr(executor.provider, 'channels')), \ + "The provider model assumes a provider does not have .channel and .channels" + + if hasattr(executor.provider, 'channels'): + for channel in executor.provider.channels: + channel.close() + else: + assert hasattr(executor.provider, 'channel'), "If provider has no .channels, it must have .channel" + executor.provider.channel.close() + + logger.info(f"Closed executor channel(s) for {executor.label}") + logger.info("Terminated executors") + + logger.info("Closing channels") + + logger.info("Closed channels") + self.time_completed = datetime.datetime.now() if self.monitoring: