From 7ea40418e8e9badfcdfb4462202ca958af62bfde Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Tue, 10 Jan 2023 20:40:05 +0000
Subject: [PATCH 1/2] low latency executor has been broken since at least

commit 674291f5818a960703ae832d62a41b3309597fb4
Author: Ben Clifford
Date:   Wed Aug 25 12:26:38 2021 +0000

    Abstract more block handling from HighThroughputExecutor and share with
    WorkQueue (#2071)

when _get_launch_command was introduced

removes 5-10 errors
---
 docs/reference.rst                          |   1 -
 parsl/executors/__init__.py                 |   2 -
 parsl/executors/low_latency/__init__.py     |   0
 parsl/executors/low_latency/executor.py     | 272 ------------
 parsl/executors/low_latency/interchange.py  | 132 ---------
 .../low_latency/lowlatency_worker.py        | 138 ---------
 parsl/executors/low_latency/zmq_pipes.py    |  80 ------
 parsl/tests/low_latency/__init__.py         |   0
 parsl/tests/low_latency/constants.py        |   2 -
 parsl/tests/low_latency/executor.py         | 189 ------------
 parsl/tests/low_latency/interchange.py      |  69 -----
 parsl/tests/low_latency/utils.py            |  16 --
 parsl/tests/low_latency/worker.py           | 109 -------
 parsl/tests/manual_tests/llex_local.py      |  24 --
 setup.py                                    |   1 -
 15 files changed, 1035 deletions(-)
 delete mode 100644 parsl/executors/low_latency/__init__.py
 delete mode 100644 parsl/executors/low_latency/executor.py
 delete mode 100644 parsl/executors/low_latency/interchange.py
 delete mode 100644 parsl/executors/low_latency/lowlatency_worker.py
 delete mode 100644 parsl/executors/low_latency/zmq_pipes.py
 delete mode 100644 parsl/tests/low_latency/__init__.py
 delete mode 100644 parsl/tests/low_latency/constants.py
 delete mode 100644 parsl/tests/low_latency/executor.py
 delete mode 100644 parsl/tests/low_latency/interchange.py
 delete mode 100644 parsl/tests/low_latency/utils.py
 delete mode 100644 parsl/tests/low_latency/worker.py
 delete mode 100644 parsl/tests/manual_tests/llex_local.py

diff --git a/docs/reference.rst b/docs/reference.rst
index 9485f7b6ff..a0e46442fb 100644
--- a/docs/reference.rst
+++ b/docs/reference.rst
@@ -77,7 +77,6 @@ Executors
    parsl.executors.HighThroughputExecutor
    parsl.executors.WorkQueueExecutor
    parsl.executors.ExtremeScaleExecutor
-   parsl.executors.LowLatencyExecutor
    parsl.executors.FluxExecutor
    parsl.executors.swift_t.TurbineExecutor
 
diff --git a/parsl/executors/__init__.py b/parsl/executors/__init__.py
index 9c61e346f7..e39a8b051e 100644
--- a/parsl/executors/__init__.py
+++ b/parsl/executors/__init__.py
@@ -2,12 +2,10 @@
 from parsl.executors.workqueue.executor import WorkQueueExecutor
 from parsl.executors.high_throughput.executor import HighThroughputExecutor
 from parsl.executors.extreme_scale.executor import ExtremeScaleExecutor
-from parsl.executors.low_latency.executor import LowLatencyExecutor
 from parsl.executors.flux.executor import FluxExecutor
 
 __all__ = ['ThreadPoolExecutor',
            'HighThroughputExecutor',
            'ExtremeScaleExecutor',
-           'LowLatencyExecutor',
            'WorkQueueExecutor',
            'FluxExecutor']
diff --git a/parsl/executors/low_latency/__init__.py b/parsl/executors/low_latency/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/parsl/executors/low_latency/executor.py b/parsl/executors/low_latency/executor.py
deleted file mode 100644
index fd11cc8e75..0000000000
--- a/parsl/executors/low_latency/executor.py
+++ /dev/null
@@ -1,272 +0,0 @@
-"""LowLatencyExecutor for low latency task/lambda-function execution
-"""
-
-from concurrent.futures import Future
-import logging
-import threading
-import queue
-from multiprocessing import Process, Queue
-
-from parsl.serialize import pack_apply_message, deserialize
-from parsl.executors.low_latency import zmq_pipes
-from parsl.executors.low_latency import interchange
-from parsl.executors.errors import ScalingFailed, DeserializationError, BadMessage, UnsupportedFeatureError
-from parsl.executors.status_handling import BlockProviderExecutor
-from parsl.utils import RepresentationMixin
-from parsl.providers import LocalProvider
-
-logger = logging.getLogger(__name__)
-
-
-class LowLatencyExecutor(BlockProviderExecutor, RepresentationMixin):
-    """
-    TODO: docstring for LowLatencyExecutor
-    """
-
-    def __init__(self,
-                 label='LowLatencyExecutor',
-                 provider=LocalProvider(),
-                 launch_cmd=None,
-                 address="127.0.0.1",
-                 worker_port=None,
-                 worker_port_range=(54000, 55000),
-                 interchange_port_range=(55000, 56000),
-                 # storage_access=None,
-                 working_dir=None,
-                 worker_debug=False,
-                 workers_per_node=1,
-                 # cores_per_worker=1.0,
-                 ):
-        logger.debug("Initializing LowLatencyExecutor")
-
-        BlockProviderExecutor.__init__(self, provider=provider)
-        self.label = label
-        self.launch_cmd = launch_cmd
-        self.provider = provider
-        self.worker_debug = worker_debug
-        # self.storage_access = storage_access if storage_access is not None else []
-        # if len(self.storage_access) > 1:
-        #     raise ConfigurationError('Multiple storage access schemes are not supported')
-        self.working_dir = working_dir
-        self.blocks = []
-        self.workers_per_node = workers_per_node
-
-        self._task_counter = 0
-        self.address = address
-        self.worker_port = worker_port
-        self.worker_port_range = worker_port_range
-        self.interchange_port_range = interchange_port_range
-        self.run_dir = '.'
-
-        # TODO: add debugging, logdir, other functionality to workers
-        if not launch_cmd:
-            self.launch_cmd = """lowlatency_worker.py -n {workers_per_node} --task_url={task_url} --logdir={logdir}"""
-
-    def start(self):
-        """Create the Interchange process and connect to it.
- """ - self.outgoing_q = zmq_pipes.TasksOutgoing( - "127.0.0.1", self.interchange_port_range) - self.incoming_q = zmq_pipes.ResultsIncoming( - "127.0.0.1", self.interchange_port_range) - - self.is_alive = True - - self._queue_management_thread = None - self._start_queue_management_thread() - self._start_local_queue_process() - - logger.debug("Created management thread: {}" - .format(self._queue_management_thread)) - - if self.provider: - # debug_opts = "--debug" if self.worker_debug else "" - l_cmd = self.launch_cmd.format( # debug=debug_opts, - task_url=self.worker_task_url, - workers_per_node=self.workers_per_node, - logdir="{}/{}".format(self.run_dir, self.label)) - self.launch_cmd = l_cmd - logger.debug("Launch command: {}".format(self.launch_cmd)) - - logger.debug( - "Starting LowLatencyExecutor with provider:\n%s", self.provider) - if hasattr(self.provider, 'init_blocks'): - try: - for i in range(self.provider.init_blocks): - block = self.provider.submit( - self.launch_cmd, self.workers_per_node) - logger.debug("Launched block {}:{}".format(i, block)) - if not block: - raise(ScalingFailed(self, - "Attempts to provision nodes via provider has failed")) - self.blocks.extend([block]) - - except Exception as e: - logger.error("Scaling out failed: {}".format(e)) - raise e - else: - logger.debug("Starting LowLatencyExecutor with no provider") - - def _start_local_queue_process(self): - """ TODO: docstring """ - - comm_q = Queue(maxsize=10) - self.queue_proc = Process(target=interchange.starter, - args=(comm_q,), - kwargs={"client_ports": (self.outgoing_q.port, - self.incoming_q.port), - "worker_port": self.worker_port, - "worker_port_range": self.worker_port_range - # TODO: logdir and logging level - }) - self.queue_proc.start() - - try: - worker_port = comm_q.get(block=True, timeout=120) - logger.debug( - "Got worker port {} from interchange".format(worker_port)) - except queue.Empty: - logger.error( - "Interchange has not completed initialization in 120s. Aborting") - raise Exception("Interchange failed to start") - - self.worker_task_url = "tcp://{}:{}".format( - self.address, worker_port) - - def _start_queue_management_thread(self): - """ TODO: docstring """ - if self._queue_management_thread is None: - logger.debug("Starting queue management thread") - self._queue_management_thread = threading.Thread( - target=self._queue_management_worker) - self._queue_management_thread.daemon = True - self._queue_management_thread.start() - logger.debug("Started queue management thread") - - else: - logger.debug("Management thread already exists, returning") - - def _queue_management_worker(self): - """ TODO: docstring """ - logger.debug("[MTHREAD] queue management worker starting") - - while not self.bad_state_is_set: - task_id, buf = self.incoming_q.get() # TODO: why does this hang? - msg = deserialize(buf)[0] - # TODO: handle exceptions - task_fut = self.tasks[task_id] - logger.debug("Got response for task id {}".format(task_id)) - - if "result" in msg: - task_fut.set_result(msg["result"]) - - elif "exception" in msg: - # TODO: handle exception - pass - elif 'exception' in msg: - logger.warning("Task: {} has returned with an exception") - try: - s = deserialize(msg['exception']) - exception = ValueError("Remote exception description: {}".format(s)) - task_fut.set_exception(exception) - except Exception as e: - # TODO could be a proper wrapped exception? 
-                    task_fut.set_exception(
-                        DeserializationError("Received exception, but handling also threw an exception: {}".format(e)))
-
-            else:
-                raise BadMessage(
-                    "Message received is neither result nor exception")
-
-            if not self.is_alive:
-                break
-
-        logger.info("[MTHREAD] queue management worker finished")
-
-    def submit(self, func, resource_specification, *args, **kwargs):
-        """ TODO: docstring """
-        if resource_specification:
-            logger.error("Ignoring the resource specification. "
-                         "Parsl resource specification is not supported in LowLatency Executor. "
-                         "Please check WorkQueueExecutor if resource specification is needed.")
-            raise UnsupportedFeatureError('resource specification', 'LowLatency Executor', 'WorkQueue Executor')
-
-        if self.bad_state_is_set:
-            raise self.executor_exception
-
-        self._task_counter += 1
-        task_id = self._task_counter
-
-        logger.debug(
-            "Pushing function {} to queue with args {}".format(func, args))
-
-        self.tasks[task_id] = Future()
-
-        fn_buf = pack_apply_message(func, args, kwargs,
-                                    buffer_threshold=1024 * 1024)
-
-        # Post task to the the outgoing queue
-        self.outgoing_q.put(task_id, fn_buf)
-
-        # Return the future
-        return self.tasks[task_id]
-
-    def scale_out(self, blocks=1):
-        """Scales out the number of active workers by the number of blocks specified.
-
-        Parameters
-        ----------
-
-        blocks : int
-            # of blocks to scale out. Default=1
-        """
-        r = []
-        for i in range(blocks):
-            if self.provider:
-                try:
-                    block = self.provider.submit(
-                        self.launch_cmd, self.workers_per_node)
-                    logger.debug("Launched block {}:{}".format(i, block))
-                    # TODO: use exceptions for this
-                    if not block:
-                        self._fail_job_async(None, "Failed to launch block")
-                    self.blocks.extend([block])
-                except Exception as ex:
-                    self._fail_job_async(None, "Failed to launch block: {}".format(ex))
-            else:
-                logger.error("No execution provider available")
-                r = None
-        return r
-
-    def scale_in(self, blocks):
-        """Scale in the number of active blocks by specified amount.
-
-        The scale in method here is very rude. It doesn't give the workers
-        the opportunity to finish current tasks or cleanup. This is tracked
-        in issue #530
-        """
-        to_kill = self.blocks[:blocks]
-        if self.provider:
-            r = self.provider.cancel(to_kill)
-        return self._filter_scale_in_ids(to_kill, r)
-
-    def _get_job_ids(self):
-        return self.blocks
-
-    def shutdown(self, hub=True, targets='all', block=False):
-        """Shutdown the executor, including all workers and controllers.
-
-        This is not implemented.
-
-        Kwargs:
-            - hub (Bool): Whether the hub should be shutdown, Default:True,
-            - targets (list of ints| 'all'): List of block id's to kill, Default:'all'
-            - block (Bool): To block for confirmations or not
-        """
-
-        logger.warning("Attempting LowLatencyExecutor shutdown")
-        # self.outgoing_q.close()
-        # self.incoming_q.close()
-        self.queue_proc.terminate()
-        logger.warning("Finished LowLatencyExecutor shutdown attempt")
-        return True
diff --git a/parsl/executors/low_latency/interchange.py b/parsl/executors/low_latency/interchange.py
deleted file mode 100644
index 9b7118a66c..0000000000
--- a/parsl/executors/low_latency/interchange.py
+++ /dev/null
@@ -1,132 +0,0 @@
-#!/usr/bin/env python3
-
-import logging
-import zmq
-# import time
-
-
-class Interchange(object):
-    """ TODO: docstring """
-
-    def __init__(self,
-                 client_address="127.0.0.1",
-                 client_ports=(50055, 50056),
-                 worker_port=None,
-                 worker_port_range=(54000, 55000)
-                 ):
-        global logger
-        start_file_logger("interchange.log")
-        logger.info("Init Interchange")
-
-        self.context = zmq.Context()
-        self.task_incoming = self.context.socket(zmq.ROUTER)
-        self.result_outgoing = self.context.socket(zmq.DEALER)
-        self.worker_messages = self.context.socket(zmq.DEALER)
-
-        self.result_outgoing.set_hwm(0)
-
-        task_address = "tcp://{}:{}".format(client_address, client_ports[0])
-        result_address = "tcp://{}:{}".format(client_address, client_ports[1])
-        self.task_incoming.connect(task_address)
-        self.result_outgoing.connect(result_address)
-
-        logger.debug("Client task address: {}".format(task_address))
-        logger.debug("Client result address: {}".format(result_address))
-
-        self.worker_port = worker_port
-        self.worker_port_range = worker_port_range
-
-        if self.worker_port:
-            worker_task_address = "tcp://*:{}".format(self.worker_port)
-            self.worker_messages.bind(worker_task_address)
-            logger.debug("Worker task address: {}".format(worker_task_address))
-
-        else:
-            self.worker_port = self.worker_messages.bind_to_random_port(
-                'tcp://*',
-                min_port=worker_port_range[0],
-                max_port=worker_port_range[1], max_tries=100)
-
-            logger.debug(
-                "Worker task address: tcp://*:{}".format(self.worker_port))
-
-        self.poller = zmq.Poller()
-        self.poller.register(self.task_incoming, zmq.POLLIN)
-        self.poller.register(self.worker_messages, zmq.POLLIN)
-        logger.debug("Init complete")
-
-    def start(self):
-        """ TODO: docstring """
-        logger.info("Starting interchange")
-        # last = time.time()
-
-        while True:
-            # active_flag = False
-            socks = dict(self.poller.poll(1))
-
-            if socks.get(self.task_incoming) == zmq.POLLIN:
-                message = self.task_incoming.recv_multipart()
-                logger.debug("Got new task from client")
-                self.worker_messages.send_multipart(message)
-                logger.debug("Sent task to worker")
-                # active_flag = True
-                # last = time.time()
-
-            if socks.get(self.worker_messages) == zmq.POLLIN:
-                message = self.worker_messages.recv_multipart()
-                logger.debug("Got new result from worker")
-                # self.result_outgoing.send_multipart(message)
-                self.result_outgoing.send_multipart(message[1:])
-
-                logger.debug("Sent result to client")
-                # active_flag = True
-                # last = time.time()
-
-            # if not active_flag and last + 1 < time.time():
-            #     logger.debug("Nothing in the past 1s round")
-            #     last = time.time()
-
-
-def start_file_logger(filename, name='interchange', level=logging.DEBUG, format_string=None):
-    """Add a stream log handler.
-
-    Parameters
-    ---------
-
-    filename: string
-        Name of the file to write logs to. Required.
-    name: string
-        Logger name. Default="parsl.executors.interchange"
-    level: logging.LEVEL
-        Set the logging level. Default=logging.DEBUG
-
-    - format_string (string): Set the format string
-    format_string: string
-        Format string to use.
-
-    Returns
-    -------
-        None.
-    """
-    if format_string is None:
-        format_string = "%(asctime)s %(name)s:%(lineno)d [%(levelname)s] %(message)s"
-
-    global logger
-    logger = logging.getLogger(name)
-    logger.setLevel(level)
-    handler = logging.FileHandler(filename)
-    handler.setLevel(level)
-    formatter = logging.Formatter(format_string, datefmt='%Y-%m-%d %H:%M:%S')
-    handler.setFormatter(formatter)
-    logger.addHandler(handler)
-
-
-def starter(comm_q, *args, **kwargs):
-    """Start the interchange process
-
-    The executor is expected to call this function. The args, kwargs match that of the Interchange.__init__
-    """
-    # logger = multiprocessing.get_logger()
-    ic = Interchange(*args, **kwargs)
-    comm_q.put(ic.worker_port)
-    ic.start()
-    logger.debug("Port information sent back to client")
diff --git a/parsl/executors/low_latency/lowlatency_worker.py b/parsl/executors/low_latency/lowlatency_worker.py
deleted file mode 100644
index b084a71b14..0000000000
--- a/parsl/executors/low_latency/lowlatency_worker.py
+++ /dev/null
@@ -1,138 +0,0 @@
-#!/usr/bin/env python3
-
-import argparse
-import logging
-import os
-import uuid
-# import zmq
-from multiprocessing import Process
-
-from parsl.serialize import unpack_apply_message, serialize
-from parsl.executors.low_latency import zmq_pipes
-
-logger = logging.getLogger(__name__)
-
-
-def execute_task(f, args, kwargs, user_ns):
-    """
-    Deserialize the buffer and execute the task.
-
-    # Returns the result or exception.
-    """
-    fname = getattr(f, '__name__', 'f')
-    prefix = "parsl_"
-    fname = prefix + "f"
-    argname = prefix + "args"
-    kwargname = prefix + "kwargs"
-    resultname = prefix + "result"
-
-    user_ns.update({fname: f,
-                    argname: args,
-                    kwargname: kwargs,
-                    resultname: resultname})
-
-    code = "{0} = {1}(*{2}, **{3})".format(resultname, fname,
-                                           argname, kwargname)
-    try:
-        exec(code, user_ns, user_ns)
-
-    except Exception as e:
-        logger.warning("Caught exception; will raise it: {}".format(e))
-        raise e
-
-    else:
-        return user_ns.get(resultname)
-
-
-def start_file_logger(filename, rank, name='parsl', level=logging.DEBUG, format_string=None):
-    """Add a stream log handler.
-
-    Args:
-        - filename (string): Name of the file to write logs to
-        - name (string): Logger name
-        - level (logging.LEVEL): Set the logging level.
-        - format_string (string): Set the format string
-
-    Returns:
-        - None
-    """
-
-    try:
-        os.makedirs(os.path.dirname(filename), 511, True)
-    except Exception as e:
-        print("Caught exception with trying to make log dirs: {}".format(e))
-
-    if format_string is None:
-        format_string = "%(asctime)s %(name)s:%(lineno)d Rank:{0} [%(levelname)s] %(message)s".format(
-            rank)
-    global logger
-    logger = logging.getLogger(name)
-    logger.setLevel(logging.DEBUG)
-    handler = logging.FileHandler(filename)
-    handler.setLevel(level)
-    formatter = logging.Formatter(format_string, datefmt='%Y-%m-%d %H:%M:%S')
-    handler.setFormatter(formatter)
-    logger.addHandler(handler)
-
-
-def worker(worker_id, task_url, debug=True, logdir="workers", uid="1"):
-    """ TODO: docstring
-
-    TODO : Cleanup debug, logdir and uid to function correctly
-    """
-
-    start_file_logger('{}/{}/worker_{}.log'.format(logdir, uid, worker_id),
-                      0,
-                      level=logging.DEBUG if debug is True else logging.INFO)
-
-    logger.info("Starting worker {}".format(worker_id))
-
-    task_ids_received = []
-
-    message_q = zmq_pipes.WorkerMessages(task_url)
-
-    while True:
-        print("Worker loop iteration starting")
-        task_id, buf = message_q.get()
-        task_ids_received.append(task_id)
-
-        user_ns = locals()
-        user_ns.update({'__builtins__': __builtins__})
-        f, args, kwargs = unpack_apply_message(buf, user_ns, copy=False)
-
-        logger.debug("Worker {} received task {}".format(worker_id, task_id))
-        result = execute_task(f, args, kwargs, user_ns)
-        logger.debug("Worker {} completed task {}".format(worker_id, task_id))
-
-        reply = {"result": result, "worker_id": worker_id}
-        message_q.put(task_id, serialize(reply))
-        logger.debug("Result sent")
-
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser()
-    parser.add_argument("-n", "--workers_per_node", default=1, type=int,
-                        help="Number of workers to kick off. Default=1")
-    parser.add_argument("-l", "--logdir", default="lowlatency_worker_logs",
-                        help="LowLatency worker log directory")
-    parser.add_argument("-t", "--task_url", required=True,
-                        help="REQUIRED: ZMQ url for receiving tasks")
-    parser.add_argument("-u", "--uid", default=str(uuid.uuid4()).split('-')[-1],
-                        help="Unique identifier string for Manager")
-
-    args = parser.parse_args()
-
-    workers = []
-    for i in range(args.workers_per_node):
-        worker = Process(target=worker,
-                         kwargs={"worker_id": i,
-                                 "task_url": args.task_url,
-                                 "logdir": args.logdir,
-                                 "uid": args.uid
-                                 })
-        worker.daemon = True
-        worker.start()
-        workers.append(worker)
-
-    for worker in workers:
-        worker.join()
diff --git a/parsl/executors/low_latency/zmq_pipes.py b/parsl/executors/low_latency/zmq_pipes.py
deleted file mode 100644
index e432f359d8..0000000000
--- a/parsl/executors/low_latency/zmq_pipes.py
+++ /dev/null
@@ -1,80 +0,0 @@
-#!/usr/bin/env python3
-
-import zmq
-import logging
-
-logger = logging.getLogger(__name__)
-
-
-class TasksOutgoing(object):
-    """ TODO: docstring """
-
-    def __init__(self, ip_address, port_range):
-        """ TODO: docstring """
-        self.context = zmq.Context()
-        self.zmq_socket = self.context.socket(zmq.DEALER)
-        self.zmq_socket.set_hwm(0)
-        self.port = self.zmq_socket.bind_to_random_port("tcp://{}".format(ip_address),
-                                                        min_port=port_range[0],
-                                                        max_port=port_range[1])
-        self.poller = zmq.Poller()
-        self.poller.register(self.zmq_socket, zmq.POLLOUT)
-
-    def put(self, task_id, buffer):
-        """ TODO: docstring """
-        task_id_bytes = task_id.to_bytes(4, "little")
-        message = [b"", task_id_bytes] + buffer
-
-        self.zmq_socket.send_multipart(message)
-        logger.debug("Sent task {}".format(task_id))
-
-    def close(self):
-        self.zmq_socket.close()
-        self.context.term()
-
-
-class ResultsIncoming(object):
-    """ TODO: docstring """
-
-    def __init__(self, ip_address, port_range):
-        """ TODO: docstring """
-        self.context = zmq.Context()
-        self.zmq_socket = self.context.socket(zmq.DEALER)
-        self.zmq_socket.set_hwm(0)
-        self.port = self.zmq_socket.bind_to_random_port(
-            "tcp://{}".format(ip_address),
-            min_port=port_range[0],
-            max_port=port_range[1])
-
-    def get(self):
-
-        result = self.zmq_socket.recv_multipart()
-        task_id = int.from_bytes(result[1], "little")
-        buffer = result[2:]
-        return task_id, buffer
-
-    def close(self):
-        self.zmq_socket.close()
-        self.context.term()
-
-
-class WorkerMessages(object):
-    """ TODO: docstring """
-
-    def __init__(self, tasks_url):
-        self.context = zmq.Context()
-        self.zmq_socket = self.context.socket(zmq.REP)
-        self.zmq_socket.connect(tasks_url)
-
-    def get(self):
-        bufs = self.zmq_socket.recv_multipart()
-        task_id = int.from_bytes(bufs[0], "little")
-        return task_id, bufs[1:]
-
-    def put(self, task_id, buffer):
-        task_id_bytes = task_id.to_bytes(4, "little")
-        self.zmq_socket.send_multipart([task_id_bytes] + buffer)
-
-    def close(self):
-        self.zmq_socket.close()
-        self.context.term()
diff --git a/parsl/tests/low_latency/__init__.py b/parsl/tests/low_latency/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/parsl/tests/low_latency/constants.py b/parsl/tests/low_latency/constants.py
deleted file mode 100644
index 7984d90f2e..0000000000
--- a/parsl/tests/low_latency/constants.py
+++ /dev/null
@@ -1,2 +0,0 @@
-CLIENT_IP_FILE = "/scratch/midway2/rohankumar/parsl/client_ip.txt"
-INTERCHANGE_IP_FILE = "/scratch/midway2/rohankumar/parsl/interchange_ip.txt"
diff --git a/parsl/tests/low_latency/executor.py b/parsl/tests/low_latency/executor.py
deleted file mode 100644
index 0c6016ae3a..0000000000
--- a/parsl/tests/low_latency/executor.py
+++ /dev/null
@@ -1,189 +0,0 @@
-import argparse
-import time
-import logging
-from statistics import mean, stdev
-
-import zmq
-from multiprocessing import Process, Manager
-
-from parsl.serialize import ParslSerializer
-parsl_serializer = ParslSerializer()
-pack_apply_message = parsl_serializer.pack_apply_message
-unpack_apply_message = parsl_serializer.unpack_apply_message
-deserialize_object = parsl_serializer.deserialize
-
-from constants import CLIENT_IP_FILE
-from parsl.addresses import address_by_interface
-from worker import execute_task
-
-logger = logging.getLogger(__name__)
-
-
-def simple_executor(f_all, args_all, kwargs_all, num_tasks):
-    serialization_times = []
-    exec_times = []
-    results = []
-
-    for i in range(num_tasks):
-        task_id = i
-        start_time = time.time()
-        buf = pack_apply_message(f=next(f_all), args=next(args_all),
-                                 kwargs=next(kwargs_all),
-                                 buffer_threshold=1024 * 1024)
-        serialization_times.append(time.time() - start_time)
-
-        start_time = time.time()
-        user_ns = locals()
-        user_ns.update({'__builtins__': __builtins__})
-        f, args, kwargs = unpack_apply_message(buf, user_ns, copy=False)
-        result = execute_task(f, args, kwargs, user_ns)
-        exec_times.append(time.time() - start_time)
-
-        results.append(result)
-
-    avg_serialization_time = sum(
-        serialization_times) / len(serialization_times) * 10 ** 6
-    avg_execution_time = sum(exec_times) / len(exec_times) * 10 ** 6
-
-    return {
-        "avg_serialization_time": avg_serialization_time,
-        "avg_execution_time": avg_execution_time,
-        "results": results
-    }
-
-
-def dealer_executor(f_all, args_all, kwargs_all, num_tasks, return_dict,
-                    port=5559, interchange=True, warmup=10):
-    label = "DEALER-INTERCHANGE-REP" if interchange else "DEALER-REP"
-    logger.info("Starting executor:{}".format(label))
-
-    serialization_times = []
-    deserialization_times = []
-    send_times = {}
-    exec_times = {}
-    results = []
-
-    context = zmq.Context()
-    dealer = context.socket(zmq.DEALER)
-    dealer.bind("tcp://*:{}".format(port))
-
-    poller = zmq.Poller()
-    poller.register(dealer, zmq.POLLIN)
-
-    num_send = 0
-    num_recv = 0
-
-    while True:
-        socks = dict(poller.poll(1))
-        if num_send < num_tasks:
-            task_id = num_send
-            task_id_bytes = task_id.to_bytes(4, "little")
-            start_time = time.time()
-            buf = pack_apply_message(f=next(f_all), args=next(args_all),
-                                     kwargs=next(kwargs_all),
-                                     buffer_threshold=1024 * 1024)
-            serialization_times.append(time.time() - start_time)
-
-            logger.debug("Manager sending task {}".format(task_id))
-            send_times[task_id] = time.time()
-            dealer.send_multipart([b"", task_id_bytes] + buf)
-            num_send += 1
-
-        if dealer in socks and socks[dealer] == zmq.POLLIN:
-            buf = dealer.recv_multipart()
-            recv_time = time.time()
-
-            start_time = time.time()
-            msg = deserialize_object(buf[2:])[0]
-            deserialization_times.append(time.time() - start_time)
-
-            logger.debug("Got message {}".format(msg))
-            task_id = int.from_bytes(buf[1], "little")
-            results.append(msg["result"])
-
-            if num_recv >= warmup:
-                # Ignore the first `warmup` tasks
-                exec_times[task_id] = recv_time - send_times[task_id]
-
-            num_recv += 1
-            logger.debug("Dealer received result {}".format(task_id))
-            if num_recv == num_tasks:
-                break
-
-    avg_serialization_time = sum(
-        serialization_times) / len(serialization_times) * 10 ** 6
-    avg_execution_time = sum(exec_times.values()) / len(exec_times) * 10 ** 6
-
-    return_dict["avg_serialization_time"] = avg_serialization_time
-    return_dict["avg_execution_time"] = avg_execution_time
-    return_dict["results"] = results
-
-
-def double(x):
-    return 2 * x
-
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser()
-    parser.add_argument("--num-tasks", default=10000, type=int,
-                        help="Number of tasks to send for benchmark")
-    parser.add_argument("--num-trials", default=10, type=int,
-                        help="Number of trials to run for benchmarking")
-    parser.add_argument("--warmup", default=100, type=int,
-                        help="Number of warmup runs before benchmarking")
-    parser.add_argument("--interchange", action="store_true", default=False,
-                        help="Whether an interchange is being used")
-    parser.add_argument("--client-port", default=5560, type=int,
-                        help="Port for client to communicate on")
-    parser.add_argument("--localhost", default=False, action="store_true",
-                        help="True if communication is on localhost")
-    args = parser.parse_args()
-
-    # Write IP address to file so that workers can access it
-    if not args.localhost:
-        ip = address_by_interface("eth0")
-        with open(CLIENT_IP_FILE, "w") as fh:
-            fh.write(ip)
-        print("Wrote IP address {} to file {}".format(ip, CLIENT_IP_FILE))
-
-    # Parameters for worker requests
-    def f_all():
-        return (double for _ in range(args.num_tasks))
-
-    def args_all():
-        return ([i] for i in range(args.num_tasks))
-
-    def kwargs_all():
-        return ({} for _ in range(args.num_tasks))
-
-    serialization_times = []
-    execution_times = []
-
-    # Every trial sends the same jobs again and benchmarks them
-    for _ in range(args.num_trials):
-        m = Manager()
-        return_dict = m.dict()
-        manager = Process(target=dealer_executor,
-                          kwargs={"f_all": f_all(), "args_all": args_all(),
-                                  "kwargs_all": kwargs_all(),
-                                  "num_tasks": args.num_tasks,
-                                  "port": args.client_port,
-                                  "interchange": args.interchange,
-                                  "warmup": args.warmup,
-                                  "return_dict": return_dict})
-        manager.start()
-        manager.join()
-
-        serialization_times.append(return_dict["avg_serialization_time"])
-        execution_times.append(return_dict["avg_execution_time"])
-
-    # Print stats
-    label = "[DEALER-INTERCHANGE-REP]" if args.interchange else "[DEALER-REP]"
-    s = stdev(serialization_times) if len(serialization_times) > 1 else 0
-    print("{} Avg Serialization Time\n"
-          "Mean = {:=10.4f} us, Stdev = {:=10.4f} us"
-          .format(label, mean(serialization_times), s))
-    s = stdev(execution_times) if len(execution_times) > 1 else 0
-    print("{} Avg Execution Time\n"
-          "Mean = {:=10.4f} us, Stdev = {:=10.4f} us"
-          .format(label, mean(execution_times), s))
diff --git a/parsl/tests/low_latency/interchange.py b/parsl/tests/low_latency/interchange.py
deleted file mode 100644
index 0f83bf7461..0000000000
--- a/parsl/tests/low_latency/interchange.py
+++ /dev/null
@@ -1,69 +0,0 @@
-import logging
-import argparse
-
-from multiprocessing import Process
-import zmq
-
-from constants import CLIENT_IP_FILE, INTERCHANGE_IP_FILE
-from parsl.addresses import address_by_interface
-
-logger = logging.getLogger(__name__)
-
-
-def dealer_interchange(manager_ip="localhost", manager_port=5559,
-                       worker_port=5560):
-    context = zmq.Context()
-    incoming = context.socket(zmq.ROUTER)
-    outgoing = context.socket(zmq.DEALER)
-
-    incoming.connect("tcp://{}:{}".format(manager_ip, manager_port))
-    outgoing.bind("tcp://*:{}".format(worker_port))
-
-    poller = zmq.Poller()
-    poller.register(incoming, zmq.POLLIN)
-    poller.register(outgoing, zmq.POLLIN)
-
-    while True:
-        socks = dict(poller.poll(1))
-
-        if socks.get(incoming) == zmq.POLLIN:
-            message = incoming.recv_multipart()
-            logger.debug("[interchange] New task {}".format(message))
-            outgoing.send_multipart(message)
-
-        if socks.get(outgoing) == zmq.POLLIN:
-            message = outgoing.recv_multipart()
-            logger.debug("[interchange] New result {}".format(message))
-            incoming.send_multipart(message)
-
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser()
-    parser.add_argument("--worker-port", default=5559, type=int,
-                        help="Port for workers to communicate on")
-    parser.add_argument("--client-port", default=5560, type=int,
-                        help="Port for client to communicate on")
-    parser.add_argument("--localhost", default=False, action="store_true",
-                        help="True if communication is on localhost")
-    args = parser.parse_args()
-
-    if not args.localhost:
-        with open(CLIENT_IP_FILE, "r") as fh:
-            client_ip = fh.read().strip()
-        print("Read IP {} from file {}".format(client_ip, CLIENT_IP_FILE))
-
-        interchange_ip = address_by_interface("eth0")
-        with open(INTERCHANGE_IP_FILE, "w") as fh:
-            fh.write(interchange_ip)
-        print("Wrote IP address {} to file {}"
-              .format(interchange_ip, INTERCHANGE_IP_FILE))
-    else:
-        client_ip = "localhost"
-
-    interchange = Process(target=dealer_interchange,
-                          kwargs={"manager_ip": client_ip,
-                                  "manager_port": args.client_port,
-                                  "worker_port": args.worker_port})
-    interchange.daemon = True
-    interchange.start()
-    interchange.join()
diff --git a/parsl/tests/low_latency/utils.py b/parsl/tests/low_latency/utils.py
deleted file mode 100644
index 1c499fc2f8..0000000000
--- a/parsl/tests/low_latency/utils.py
+++ /dev/null
@@ -1,16 +0,0 @@
-import subprocess
-
-
-def ping_time(ip, n=5):
-    """
-    Returns the average ping time in microseconds.
-
-    Note: This function is inherently platform specific.
-    It currently works on Midway.
-    """
-    cmd = "ping {} -c {}".format(ip, n)
-    p = subprocess.Popen(cmd.split(" "), stdout=subprocess.PIPE)
-    output = str(p.communicate()[0])
-    stats = output.split("\n")[-1].split(" = ")[-1].split("/")
-    avg_ping_time = float(stats[1])  # In ms
-    return avg_ping_time * 1000
diff --git a/parsl/tests/low_latency/worker.py b/parsl/tests/low_latency/worker.py
deleted file mode 100644
index 5dea3682b8..0000000000
--- a/parsl/tests/low_latency/worker.py
+++ /dev/null
@@ -1,109 +0,0 @@
-import argparse
-import logging
-
-import zmq
-from multiprocessing import Process
-
-from parsl.serialize import ParslSerializer
-parsl_serializer = ParslSerializer()
-unpack_apply_message = parsl_serializer.unpack_apply_message
-serialize_object = parsl_serializer.serialize
-
-logger = logging.getLogger(__name__)
-logger.setLevel("DEBUG")
-
-from constants import CLIENT_IP_FILE, INTERCHANGE_IP_FILE
-from utils import ping_time
-
-
-def execute_task(f, args, kwargs, user_ns):
-    """
-    Deserialize the buffer and execute the task.
-
-    Returns the result or exception.
- """ - fname = getattr(f, '__name__', 'f') - prefix = "parsl_" - fname = prefix + "f" - argname = prefix + "args" - kwargname = prefix + "kwargs" - resultname = prefix + "result" - - user_ns.update({fname: f, - argname: args, - kwargname: kwargs, - resultname: resultname}) - - code = "{0} = {1}(*{2}, **{3})".format(resultname, fname, - argname, kwargname) - try: - exec(code, user_ns, user_ns) - - except Exception as e: - logger.warning("Caught exception; will raise it: {}".format(e)) - raise e - - else: - return user_ns.get(resultname) - - -def dealer_worker(worker_id, ip="localhost", port=5560): - context = zmq.Context() - socket = context.socket(zmq.REP) - socket.connect("tcp://{}:{}".format(ip, port)) - print("Starting worker {}".format(worker_id)) - - task_ids_received = [] - - while True: - bufs = socket.recv_multipart() - task_id = int.from_bytes(bufs[0], "little") - task_ids_received.append(task_id) - - user_ns = locals() - user_ns.update({'__builtins__': __builtins__}) - f, args, kwargs = unpack_apply_message(bufs[1:], user_ns, copy=False) - - logger.debug("Worker {} received task {}".format(worker_id, task_id)) - result = execute_task(f, args, kwargs, user_ns) - logger.debug("Worker result: {}".format(result)) - reply = {"result": result, "worker_id": worker_id} - socket.send_multipart([bufs[0]] + serialize_object(reply)) - - print("Worker {} received {} tasks" - .format(worker_id, len(task_ids_received))) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--num-workers", default=1, type=int, - help="Number of workers to use for Dealer-Rep") - parser.add_argument("--localhost", default=False, action="store_true", - help="True if communication is on localhost") - parser.add_argument("--worker-port", default=5559, type=int, - help="Port for workers to communicate on") - parser.add_argument("--interchange", action="store_true", default=False, - help="Whether an interchange is being used") - args = parser.parse_args() - - # Read in IP address to communicate to - if not args.localhost: - ip_file = INTERCHANGE_IP_FILE if args.interchange else CLIENT_IP_FILE - with open(ip_file, "r") as fh: - ip = fh.read().strip() - print("Read IP {} from file {}".format(ip, ip_file)) - print("Ping time to IP {}: {} us".format(ip, ping_time(ip))) - else: - ip = "localhost" - - workers = [] - for i in range(args.num_workers): - worker = Process(target=dealer_worker, - kwargs={"worker_id": i, "ip": ip, - "port": args.worker_port}) - worker.daemon = True - worker.start() - workers.append(worker) - - for worker in workers: - worker.join() diff --git a/parsl/tests/manual_tests/llex_local.py b/parsl/tests/manual_tests/llex_local.py deleted file mode 100644 index c305b1a905..0000000000 --- a/parsl/tests/manual_tests/llex_local.py +++ /dev/null @@ -1,24 +0,0 @@ -from parsl.providers import LocalProvider -from parsl.channels import LocalChannel -# from parsl.launchers import SimpleLauncher -from parsl.launchers import SingleNodeLauncher - -from parsl.config import Config -from parsl.executors import LowLatencyExecutor - -config = Config( - executors=[ - LowLatencyExecutor( - label="llex_local", - # worker_debug=True, - workers_per_node=1, - provider=LocalProvider( - channel=LocalChannel(), - init_blocks=1, - max_blocks=1, - launcher=SingleNodeLauncher(), - ), - ) - ], - strategy='none', -) diff --git a/setup.py b/setup.py index 91a9902e7c..f859bc4c6d 100755 --- a/setup.py +++ b/setup.py @@ -49,7 +49,6 @@ install_requires=install_requires, scripts = 
['parsl/executors/high_throughput/process_worker_pool.py', 'parsl/executors/extreme_scale/mpi_worker_pool.py', - 'parsl/executors/low_latency/lowlatency_worker.py', 'parsl/executors/workqueue/exec_parsl_function.py', ], From 7ccce734d16a661227406d4efac4c90baf66191a Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 25 Jan 2023 12:42:51 +0000 Subject: [PATCH 2/2] Move changelog hyperlink to plain text as its destination no longer exists --- docs/devguide/changelog.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/devguide/changelog.rst b/docs/devguide/changelog.rst index 741796f1dd..2478a5c29a 100644 --- a/docs/devguide/changelog.rst +++ b/docs/devguide/changelog.rst @@ -560,7 +560,7 @@ Released on Jan 18th, 2019 New Functionality ^^^^^^^^^^^^^^^^^ -* `parsl.executors.LowLatencyExecutor`: a new executor designed to address use-cases with tight latency requirements +* parsl.executors.LowLatencyExecutor: a new executor designed to address use-cases with tight latency requirements such as model serving (Machine Learning), function serving and interactive analyses is now available. * New options in `parsl.executors.HighThroughputExecutor`: * ``suppress_failure``: Enable suppression of worker rejoin errors.
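
The deleted modules frame every task as a parsl.serialize buffer: pack_apply_message on the client side, unpack_apply_message in the worker. A minimal round-trip sketch of that framing, using only the parsl.serialize facade imported by those modules (the double function mirrors the benchmark task in parsl/tests/low_latency/executor.py):

```python
# Sketch only: round-trips a call through the same parsl.serialize facade
# that executor.py and lowlatency_worker.py used above.
from parsl.serialize import pack_apply_message, unpack_apply_message


def double(x):
    return 2 * x


# Client side: function plus arguments become one byte buffer.
buf = pack_apply_message(double, (21,), {}, buffer_threshold=1024 * 1024)

# Worker side: unpack and invoke.
f, args, kwargs = unpack_apply_message(buf)
assert f(*args, **kwargs) == 42
```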
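The wire protocol being removed is plain ZeroMQ: TasksOutgoing is a DEALER that prefixes an empty delimiter frame and a 4-byte little-endian task id, and WorkerMessages is a REP socket that sees only [task_id, payload]. A self-contained sketch of that DEALER-REP exchange, for anyone studying the removed code paths; the port choice, echo payload, and echo_worker helper are invented for the demo:

```python
# Sketch only: the DEALER<->REP framing used by the deleted zmq_pipes.py.
# The empty first frame is the routing envelope; REP strips it on recv and
# restores it on reply, so the worker sees just [task_id, payload].
import threading

import zmq


def echo_worker(url):
    ctx = zmq.Context.instance()
    rep = ctx.socket(zmq.REP)
    rep.connect(url)
    task_id, body = rep.recv_multipart()  # envelope already stripped
    rep.send_multipart([task_id, b"echo:" + body])
    rep.close()


ctx = zmq.Context.instance()
dealer = ctx.socket(zmq.DEALER)
port = dealer.bind_to_random_port("tcp://127.0.0.1")

t = threading.Thread(target=echo_worker,
                     args=("tcp://127.0.0.1:{}".format(port),))
t.start()

task_id = (1).to_bytes(4, "little")  # same framing as TasksOutgoing.put
dealer.send_multipart([b"", task_id, b"payload"])
empty, tid, result = dealer.recv_multipart()  # delimiter frame comes back
assert int.from_bytes(tid, "little") == 1 and result == b"echo:payload"

t.join()
dealer.close()
ctx.term()
```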
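With LowLatencyExecutor gone, the deleted llex_local.py pattern maps most directly onto the maintained HighThroughputExecutor. A hypothetical equivalent of that config, not part of this patch, assuming the HighThroughputExecutor keyword arguments current at the time of these commits (max_workers standing in for workers_per_node):

```python
# Hypothetical migration sketch: the same LocalProvider block wiring as the
# deleted llex_local.py, driven by the maintained HighThroughputExecutor.
from parsl.providers import LocalProvider
from parsl.channels import LocalChannel
from parsl.launchers import SingleNodeLauncher

from parsl.config import Config
from parsl.executors import HighThroughputExecutor

config = Config(
    executors=[
        HighThroughputExecutor(
            label="htex_local",
            max_workers=1,  # analogous to workers_per_node=1 above
            provider=LocalProvider(
                channel=LocalChannel(),
                init_blocks=1,
                max_blocks=1,
                launcher=SingleNodeLauncher(),
            ),
        )
    ],
    strategy='none',
)
```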