From 5b0d345e145a5d430b7d163c43c36b88d76dbd71 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Mon, 19 Aug 2024 20:51:49 +0000
Subject: [PATCH] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 notebooks/examples.ipynb | 563 +++++++++++++++++++++------------------
 1 file changed, 301 insertions(+), 262 deletions(-)

diff --git a/notebooks/examples.ipynb b/notebooks/examples.ipynb
index e77fb9f1..b818301e 100644
--- a/notebooks/examples.ipynb
+++ b/notebooks/examples.ipynb
@@ -1,67 +1,53 @@
 {
- "metadata": {
-  "kernelspec": {
-   "name": "flux",
-   "display_name": "Flux",
-   "language": "python"
-  },
-  "language_info": {
-   "name": "python",
-   "version": "3.12.3",
-   "mimetype": "text/x-python",
-   "codemirror_mode": {
-    "name": "ipython",
-    "version": 3
-   },
-   "pygments_lexer": "ipython3",
-   "nbconvert_exporter": "python",
-   "file_extension": ".py"
-  }
- },
- "nbformat_minor": 5,
- "nbformat": 4,
 "cells": [
  {
   "cell_type": "markdown",
   "id": "c31c95fe-9af4-42fd-be2c-713afa380e09",
   "metadata": {},
   "source": [
    "# Examples\n",
    "The `executorlib.Executor` extends the interface of the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\n",
    "to simplify the up-scaling of individual functions in a given workflow."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a1c6370e-7c8a-4da2-ac7d-42a36e12b27c",
   "metadata": {},
   "source": "## Compatibility\nWe start with the basic example of `1+1=2`. With the `ThreadPoolExecutor` from the [`concurrent.futures`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nstandard library this can be written as: "
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "8b663009-60af-4d71-8ef3-2e9c6cd79cce",
   "metadata": {
    "trusted": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": "2\n"
    }
   ],
   "source": [
    "from concurrent.futures import ThreadPoolExecutor\n",
    "\n",
    "with ThreadPoolExecutor(max_workers=1) as exe:\n",
    "    future = exe.submit(sum, [1, 1])\n",
    "    print(future.result())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "56192fa7-bbd6-43fe-8598-ff764addfbac",
   "metadata": {},
   "source": "In this case `max_workers=1` limits the number of threads used by the `ThreadPoolExecutor` to one. Then the `sum()`\nfunction is submitted to the executor with a list of two ones `[1, 1]` as input. A [`concurrent.futures.Future`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\nobject is returned. The `Future` object allows checking the status of the execution with the `done()` method, which \nreturns `True` or `False` depending on the state of the execution. 
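A minimal sketch of polling `done()`, reusing the `ThreadPoolExecutor` setup from above (whether the first `done()` call already returns `True` depends on timing):\n\n```python\nfrom concurrent.futures import ThreadPoolExecutor\n\nwith ThreadPoolExecutor(max_workers=1) as exe:\n    future = exe.submit(sum, [1, 1])\n    print(future.done())    # may still be False while the call is running\n    print(future.result())  # blocks until the result is available\n    print(future.done())    # True, the execution is completed\n```\n\n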
Alternatively, the main process can wait until the execution is \ncompleted by calling `result()`. \n\nThe result of the calculation is `1+1=2`. "
  },
  {
   "cell_type": "markdown",
   "id": "99aba5f3-5667-450c-b31f-2b53918b1896",
   "metadata": {},
   "source": [
    "The `executorlib.Executor` class extends the interface of the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\n",
    "class by providing more parameters to specify the level of parallelism. In addition to specifying the maximum number \n",
@@ -72,83 +58,87 @@
    "replacement for the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures).\n",
    "\n",
    "The previous example is rewritten for the `executorlib.Executor` as follows:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "559f59cf-f074-4399-846d-a5706797ff64",
   "metadata": {
    "trusted": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": "2\n"
    }
   ],
   "source": [
    "import flux.job\n",
    "from executorlib import Executor\n",
    "\n",
    "with flux.job.FluxExecutor() as flux_exe:\n",
    "    with Executor(max_cores=1, executor=flux_exe) as exe:\n",
    "        future = exe.submit(sum, [1, 1])\n",
    "        print(future.result())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "cbe445ae-9f52-4449-a936-a4ca1acc4500",
   "metadata": {},
   "source": "The result of the calculation is again `1+1=2`."
  },
  {
   "cell_type": "markdown",
   "id": "eb838571-24c6-4516-ab13-66f5943325b9",
   "metadata": {},
   "source": "Beyond pre-defined functions like the `sum()` function, the same functionality can be used to submit user-defined \nfunctions. 
In the next example, a custom summation function is defined:"
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "e80ca2d6-4308-4e39-bec7-b55cfb024e79",
   "metadata": {
    "trusted": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": "[3, 4, 5, 6]\n"
    }
   ],
   "source": [
    "import flux.job\n",
    "from executorlib import Executor\n",
    "\n",
    "\n",
    "def calc(*args):\n",
    "    return sum(*args)\n",
    "\n",
    "\n",
    "with flux.job.FluxExecutor() as flux_exe:\n",
    "    with Executor(max_cores=2, executor=flux_exe) as exe:\n",
    "        fs_1 = exe.submit(calc, [2, 1])\n",
    "        fs_2 = exe.submit(calc, [2, 2])\n",
    "        fs_3 = exe.submit(calc, [2, 3])\n",
    "        fs_4 = exe.submit(calc, [2, 4])\n",
    "        print(\n",
    "            [\n",
    "                fs_1.result(),\n",
    "                fs_2.result(),\n",
    "                fs_3.result(),\n",
    "                fs_4.result(),\n",
    "            ]\n",
    "        )"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "4d97551b-f7c0-416b-bcc3-55392e938ee8",
   "metadata": {},
   "source": [
    "In contrast to the previous example where just a single function was submitted to a single worker, in this case a total\n",
    "of four functions is submitted to a group of two workers (`max_cores=2`). Consequently, the functions are executed as a\n",
@@ -159,55 +149,57 @@
    "the same commands in interactive environments like [Jupyter notebooks](https://jupyter.org). This is achieved by using \n",
    "[cloudpickle](https://github.com/cloudpipe/cloudpickle) to serialize the python function and its parameters rather than\n",
    "the regular pickle package."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a97edc41-1396-48a0-8fb5-98d691a69e90",
   "metadata": {},
   "source": [
    "For backwards compatibility with the [`multiprocessing.Pool`](https://docs.python.org/3/library/multiprocessing.html) \n",
    "class, the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\n",
    "also implements the `map()` function to map a series of inputs to a function. The same `map()` function is also \n",
    "available in the `executorlib.Executor`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "3362afef-265f-4432-88ad-e051e6318c77",
   "metadata": {
    "trusted": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": "[3, 4, 5, 6]\n"
    }
   ],
   "source": [
    "import flux.job\n",
    "from executorlib import Executor\n",
    "\n",
    "\n",
    "def calc(*args):\n",
    "    return sum(*args)\n",
    "\n",
    "\n",
    "with flux.job.FluxExecutor() as flux_exe:\n",
    "    with Executor(max_cores=2, executor=flux_exe) as exe:\n",
    "        print(list(exe.map(calc, [[2, 1], [2, 2], [2, 3], [2, 4]])))"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "27af5cc1-8514-4735-8bba-b4b32444901f",
   "metadata": {},
   "source": "The results remain the same. 
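\n\nSince `exe.submit()` returns standard [`concurrent.futures.Future`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures) objects, the standard library helper `as_completed()` can be combined with `executorlib` as well. A minimal sketch, assuming the same flux setup as in the examples above:\n\n```python\nimport flux.job\nfrom concurrent.futures import as_completed\nfrom executorlib import Executor\n\n\ndef calc(*args):\n    return sum(*args)\n\n\nwith flux.job.FluxExecutor() as flux_exe:\n    with Executor(max_cores=2, executor=flux_exe) as exe:\n        future_lst = [exe.submit(calc, [2, i]) for i in range(1, 5)]\n        # futures are yielded in completion order, not submission order\n        for future in as_completed(future_lst):\n            print(future.result())\n```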
" }, { - "id": "59747b38-64f8-4342-82ad-a771aaf7c4eb", "cell_type": "markdown", + "id": "59747b38-64f8-4342-82ad-a771aaf7c4eb", + "metadata": {}, "source": [ "## Resource Assignment\n", "By default, every submission of a python function results in a flux job (or SLURM job step) depending on the backend. \n", @@ -219,12 +211,22 @@ "resources have to be configured on the **Executor level**, otherwise it can either be defined on the **Executor level**\n", "or on the **Submission level**. The resource defined on the **Submission level** overwrite the resources defined on the \n", "**Executor level**." - ], - "metadata": {} + ] }, { - "id": "d29280d4-c085-47b1-b7fa-602732d60832", "cell_type": "code", + "execution_count": 5, + "id": "d29280d4-c085-47b1-b7fa-602732d60832", + "metadata": { + "trusted": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": "3\n" + } + ], "source": [ "import flux.job\n", "from executorlib import Executor\n", @@ -235,10 +237,10 @@ "\n", "\n", "with flux.job.FluxExecutor() as flux_exe:\n", - " with Executor( \n", + " with Executor(\n", " # Resource definition on the executor level\n", " max_cores=2, # total number of cores available to the Executor\n", - " # Optional resource definition \n", + " # Optional resource definition\n", " cores_per_worker=1,\n", " threads_per_core=1,\n", " gpus_per_worker=0,\n", @@ -247,14 +249,14 @@ " executor=flux_exe,\n", " hostname_localhost=False, # only required on MacOS\n", " backend=\"flux\", # optional in case the backend is not recognized\n", - " block_allocation=False, \n", + " block_allocation=False,\n", " init_function=None, # only available with block_allocation=True\n", " command_line_argument_lst=[], # additional command line arguments for SLURM\n", " ) as exe:\n", " future_obj = exe.submit(\n", - " calc_function, \n", - " 1, # parameter_a\n", - " parameter_b=2, \n", + " calc_function,\n", + " 1, # parameter_a\n", + " parameter_b=2,\n", " # Resource definition on the submission level\n", " resource_dict={\n", " \"cores\": 1,\n", @@ -268,28 +270,28 @@ " },\n", " )\n", " print(future_obj.result())" - ], + ] + }, + { + "cell_type": "markdown", + "id": "5c7055ad-d84d-4afc-9023-b53643c4138a", + "metadata": {}, + "source": "The `max_cores` which defines the total number of cores of the allocation, is the only mandatory parameter. All other\nresource parameters are optional. If none of the submitted Python function uses [mpi4py](https://mpi4py.readthedocs.io)\nor any GPU, then the resources can be defined on the **Executor level** as: `cores_per_worker=1`, `threads_per_core=1` \nand `gpus_per_worker=0`. These are defaults, so they do even have to be specified. In this case it also makes sense to \nenable `block_allocation=True` to continuously use a fixed number of python processes rather than creating a new python\nprocess for each submission. In this case the above example can be reduced to: " + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "cd8f883f-5faf-43bc-b971-354aa9dcbecb", "metadata": { "trusted": true }, "outputs": [ { "name": "stdout", - "text": "3\n", - "output_type": "stream" + "output_type": "stream", + "text": "3\n" } ], - "execution_count": 5 - }, - { - "id": "5c7055ad-d84d-4afc-9023-b53643c4138a", - "cell_type": "markdown", - "source": "The `max_cores` which defines the total number of cores of the allocation, is the only mandatory parameter. All other\nresource parameters are optional. 
If none of the submitted Python functions uses [mpi4py](https://mpi4py.readthedocs.io)\nor any GPU, then the resources can be defined on the **Executor level** as: `cores_per_worker=1`, `threads_per_core=1` \nand `gpus_per_worker=0`. These are the defaults, so they do not even have to be specified. In this case it also makes sense to \nenable `block_allocation=True` to continuously use a fixed number of python processes rather than creating a new python\nprocess for each submission. In this case the above example can be reduced to: "
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "cd8f883f-5faf-43bc-b971-354aa9dcbecb",
   "metadata": {
    "trusted": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": "3\n"
    }
   ],
   "source": [
    "import flux.job\n",
    "from executorlib import Executor\n",
    "\n",
    "\n",
    "def calc_function(parameter_a, parameter_b):\n",
    "    return parameter_a + parameter_b\n",
    "\n",
    "\n",
    "with flux.job.FluxExecutor() as flux_exe:\n",
    "    with Executor(\n",
    "        # Resource definition on the executor level\n",
    "        max_cores=2,  # total number of cores available to the Executor\n",
    "        block_allocation=True,  # reuse python processes\n",
    "        executor=flux_exe,\n",
    "    ) as exe:\n",
    "        future_obj = exe.submit(\n",
    "            calc_function,\n",
    "            1,  # parameter_a\n",
    "            parameter_b=2,\n",
    "        )\n",
    "        print(future_obj.result())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "ea6a2ef1-c5bc-49c2-adb1-60f9f6cc71f3",
   "metadata": {},
   "source": "The working directory parameter `cwd` can be helpful for tasks which interact with the file system, as it defines which task\nis executed in which folder. For most python functions it is not required."
  },
  {
   "cell_type": "markdown",
   "id": "d6be1cc6-f47b-4b85-a0bc-00f9ccd8e2fd",
   "metadata": {},
   "source": [
    "## Data Handling\n",
    "A limitation of many parallel approaches is the overhead in communication when working with large datasets. Instead of\n",
@@ -343,42 +335,50 @@
    "keys of the dictionary can then be used as additional input parameters in each function submitted to the `executorlib.Executor`. 
When block allocation is disabled, this functionality is not available, as each function is executed in a separate process, so no data can be preloaded.\n",
    "\n",
    "This functionality is illustrated below: "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "050c2781-0c8c-436b-949c-580cabf5c63c",
   "metadata": {
    "trusted": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": "10\n"
    }
   ],
   "source": [
    "import flux.job\n",
    "from executorlib import Executor\n",
    "\n",
    "\n",
    "def calc(i, j, k):\n",
    "    return i + j + k\n",
    "\n",
    "\n",
    "def init_function():\n",
    "    return {\"j\": 4, \"k\": 3, \"l\": 2}\n",
    "\n",
    "\n",
    "with flux.job.FluxExecutor() as flux_exe:\n",
    "    with Executor(\n",
    "        max_cores=1,\n",
    "        init_function=init_function,\n",
    "        executor=flux_exe,\n",
    "        block_allocation=True,\n",
    "    ) as exe:\n",
    "        fs = exe.submit(calc, 2, j=5)\n",
    "        print(fs.result())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "8386b4e6-290f-4733-8c50-4312f9ba07e4",
   "metadata": {},
   "source": [
    "The function `calc()` requires three inputs: `i`, `j` and `k`. But when the function is submitted to the executor only \n",
    "two inputs are provided: `fs = exe.submit(calc, 2, j=5)`. In this case the first input parameter is mapped to `i=2`, the\n",
@@ -390,18 +390,18 @@
    "\n",
    "The result is `2+5+3=10` as `i=2` and `j=5` are provided during the submission and `k=3` is defined in the `init_function()`\n",
    "function."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "0d623365-1b84-4c69-97ee-f6718be8ab39",
   "metadata": {},
   "source": "## Up-Scaling \n[flux](https://flux-framework.org) provides fine-grained resource assignment via `libhwloc` and `pmi`."
},
  {
   "cell_type": "markdown",
   "id": "33f9eee3-e327-43e4-8f15-3cf709f3975c",
   "metadata": {},
   "source": [
    "### Thread-based Parallelism\n",
    "The number of threads per core can be controlled with the `threads_per_core` parameter during the initialization of the \n",
@@ -416,28 +416,40 @@
    "\n",
    "At the current stage `executorlib.Executor` does not set these parameters itself, so you have to add them in the function\n",
    "you submit before importing the corresponding library: \n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "a9799f38-b9b8-411e-945d-dae951151d26",
   "metadata": {
    "trusted": true
   },
   "outputs": [],
   "source": [
    "def calc(i):\n",
    "    import os\n",
    "\n",
    "    os.environ[\"OMP_NUM_THREADS\"] = \"2\"\n",
    "    os.environ[\"OPENBLAS_NUM_THREADS\"] = \"2\"\n",
    "    os.environ[\"MKL_NUM_THREADS\"] = \"2\"\n",
    "    os.environ[\"VECLIB_MAXIMUM_THREADS\"] = \"2\"\n",
    "    os.environ[\"NUMEXPR_NUM_THREADS\"] = \"2\"\n",
    "    import numpy as np\n",
    "\n",
    "    return i"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "4d2af8e0-8b49-40cc-a9ed-298d6c68870c",
   "metadata": {},
   "source": "Most modern CPUs use hyper-threading to present the operating system with double the number of virtual cores compared to\nthe number of physical cores available. So unless this functionality is disabled, `threads_per_core=2` is a reasonable \ndefault. Just be careful: if the number of threads is not specified, it is possible that all workers try to access all \ncores at the same time, which can lead to poor performance. So it is typically a good idea to monitor the CPU utilization\nwith an increasing number of workers. \n\nSpecific manycore CPU models like the Intel Xeon Phi processors provide a much higher hyper-threading ratio and require\na higher number of threads per core for optimal performance. \n"
  },
  {
   "cell_type": "markdown",
   "id": "2faf6399-0230-4cdd-b4d2-2508dee66d47",
   "metadata": {},
   "source": [
    "### MPI Parallel Python Functions\n",
    "Beyond thread based parallelism, the message passing interface (MPI) is the de facto standard for parallel execution in \n",
@@ -453,42 +465,47 @@
    "advantage of this approach is that the users can parallelize their workflows one function at a time. 
\n", "\n", "The example in `test_mpi.py` illustrates the submission of a simple MPI parallel python function: " - ], - "metadata": {} + ] }, { - "id": "44e510fc-8897-46a8-bef7-f1a5c47e4fbf", "cell_type": "code", + "execution_count": 9, + "id": "44e510fc-8897-46a8-bef7-f1a5c47e4fbf", + "metadata": { + "trusted": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": "[(3, 2, 0), (3, 2, 1)]\n" + } + ], "source": [ "import flux.job\n", "from executorlib import Executor\n", "\n", + "\n", "def calc(i):\n", " from mpi4py import MPI\n", + "\n", " size = MPI.COMM_WORLD.Get_size()\n", " rank = MPI.COMM_WORLD.Get_rank()\n", " return i, size, rank\n", "\n", + "\n", "with flux.job.FluxExecutor() as flux_exe:\n", - " with Executor(max_cores=2, cores_per_worker=2, executor=flux_exe, pmi=\"pmix\") as exe:\n", + " with Executor(\n", + " max_cores=2, cores_per_worker=2, executor=flux_exe, pmi=\"pmix\"\n", + " ) as exe:\n", " fs = exe.submit(calc, 3)\n", " print(fs.result())" - ], - "metadata": { - "trusted": true - }, - "outputs": [ - { - "name": "stdout", - "text": "[(3, 2, 0), (3, 2, 1)]\n", - "output_type": "stream" - } - ], - "execution_count": 9 + ] }, { - "id": "4fa03544-1dfc-465a-b352-0458b710cbcd", "cell_type": "markdown", + "id": "4fa03544-1dfc-465a-b352-0458b710cbcd", + "metadata": {}, "source": [ "In the example environment OpenMPI version 5 is used, so the `pmi` parameter has to be set to `pmix` rather than `pmi1` or `pmi2` which is the default. For `mpich` it is not necessary to specify the `pmi` interface manually.\n", "The `calc()` function initializes the [`mpi4py`](https://mpi4py.readthedocs.io) library and gathers the size of the \n", @@ -502,18 +519,18 @@ "The response consists of a list of two tuples, one for each MPI parallel process, with the first entry of the tuple \n", "being the parameter `i=3`, followed by the number of MPI parallel processes assigned to the function call `cores_per_worker=2`\n", "and finally the index of the specific process `0` or `1`. " - ], - "metadata": {} + ] }, { - "id": "581e948b-8c66-42fb-b4b2-279cc9e1c1f3", "cell_type": "markdown", - "source": "### GPU Assignment\nWith the rise of machine learning applications, the use of GPUs for scientific application becomes more and more popular.\nConsequently, it is essential to have full control over the assignment of GPUs to specific python functions. In the \n`test_gpu.py` example the `tensorflow` library is used to identify the GPUs and return their configuration: ", - "metadata": {} + "id": "581e948b-8c66-42fb-b4b2-279cc9e1c1f3", + "metadata": {}, + "source": "### GPU Assignment\nWith the rise of machine learning applications, the use of GPUs for scientific application becomes more and more popular.\nConsequently, it is essential to have full control over the assignment of GPUs to specific python functions. 
In the \n`test_gpu.py` example the `tensorflow` library is used to identify the GPUs and return their configuration: "
  },
  {
   "cell_type": "markdown",
   "id": "7d1bca64-14fb-4d40-997d-b58a011508bf",
   "metadata": {},
   "source": [
    "```\n",
    "import socket\n",
@@ -538,12 +555,12 @@
    "        fs_2 = exe.submit(get_available_gpus)\n",
    "        print(fs_1.result(), fs_2.result())\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "23794ff4-916f-4b03-a18a-c232bab68dfa",
   "metadata": {},
   "source": [
    "The additional parameter `gpus_per_worker=1` specifies that one GPU is assigned to each worker. This functionality \n",
    "requires `executorlib` to be connected to a resource manager like the [SLURM workload manager](https://www.schedmd.com)\n",
    "\n",
    "To clarify the execution of such an example on a high performance computing (HPC) cluster using the [SLURM workload manager](https://www.schedmd.com)\n",
    "the submission script is given below: "
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6dea0b84-65fd-4785-b78d-0ad3ff5aaa95",
   "metadata": {},
   "source": "```\n#!/bin/bash\n#SBATCH --nodes=2\n#SBATCH --gpus-per-node=1\n#SBATCH --get-user-env=L\n\npython test_gpu.py\n```"
  },
  {
   "cell_type": "markdown",
   "id": "5f77c45c-7077-4edf-ace7-1922faecd380",
   "metadata": {},
   "source": [
    "The important part is that for using the `executorlib.slurm.PySlurmExecutor` backend the script `test_gpu.py` does not\n",
    "need to be executed with `srun`; rather, it is sufficient to execute it with the python interpreter. `executorlib`\n",
    "within the [SLURM workload manager](https://www.schedmd.com) it is essential that the resources are passed from the \n",
    "[SLURM workload manager](https://www.schedmd.com) to the [flux framework](https://flux-framework.org). 
This is achieved\nby calling `srun flux start` in the submission script: "
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e2cd51d8-8991-42bc-943a-050b6d7c74c3",
   "metadata": {},
   "source": "```\n#!/bin/bash\n#SBATCH --nodes=2\n#SBATCH --gpus-per-node=1\n#SBATCH --get-user-env=L\n\nsrun flux start python test_gpu.py\n```"
  },
  {
   "cell_type": "markdown",
   "id": "87529bb6-1fbb-4416-8edb-0b3124f4dec2",
   "metadata": {},
   "source": "As a result the GPUs available on the two compute nodes are reported: \n```\n>>> [('/device:GPU:0', 'device: 0, name: Tesla V100S-PCIE-32GB, pci bus id: 0000:84:00.0, compute capability: 7.0', 'cn138'),\n>>> ('/device:GPU:0', 'device: 0, name: Tesla V100S-PCIE-32GB, pci bus id: 0000:84:00.0, compute capability: 7.0', 'cn139')]\n```\nIn this case each compute node `cn138` and `cn139` is equipped with one `Tesla V100S-PCIE-32GB`.\n"
  },
  {
   "cell_type": "markdown",
   "id": "0e4a6e73-38b1-4a6c-b567-d6b079a58886",
   "metadata": {},
   "source": [
    "## Coupled Functions \n",
    "For submitting two functions with rather different computing resource requirements it is essential to represent this \n",
@@ -599,56 +616,58 @@
    "input for the second function during the submission. Consequently, this functionality can be used for directed acyclic \n",
    "graphs, but it does not enable cyclic graphs. As a simple example, we can add one to the result of the addition of one\n",
    "and two:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "c84442ee-68e4-4065-97e7-bfad7582acfc",
   "metadata": {
    "trusted": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": "4\n"
    }
   ],
   "source": [
    "import flux.job\n",
    "from executorlib import Executor\n",
    "\n",
    "\n",
    "def calc_function(parameter_a, parameter_b):\n",
    "    return parameter_a + parameter_b\n",
    "\n",
    "\n",
    "with flux.job.FluxExecutor() as flux_exe:\n",
    "    with Executor(max_cores=2, executor=flux_exe) as exe:\n",
    "        future_1 = exe.submit(\n",
    "            calc_function,\n",
    "            1,\n",
    "            parameter_b=2,\n",
    "            resource_dict={\"cores\": 1},\n",
    "        )\n",
    "        future_2 = exe.submit(\n",
    "            calc_function,\n",
    "            1,\n",
    "            parameter_b=future_1,\n",
    "            resource_dict={\"cores\": 1},\n",
    "        )\n",
    "        print(future_2.result())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "bd3e6eea-3a77-49ec-8fec-d88274aeeda5",
   "metadata": {},
   "source": "Here the first addition `1+2` is computed and the output `3` is returned as the result of `future_1.result()`. 
Still, \nbefore the computation of this addition is completed, the next addition is already submitted, which uses the future object\n`future_1` as an input and adds `1`. The result of both additions is `4` as `1+2+1=4`. \n\nTo disable this functionality the parameter `disable_dependencies=True` can be set on the executor level. Still, at the\ncurrent stage the performance improvement of disabling this functionality seems to be minimal. Furthermore, this \nfunctionality introduces the `refresh_rate=0.01` parameter, which defines how frequently (in seconds) the \nqueue of submitted functions is queried. Typically, there is no need to change these default parameters. "
  },
  {
   "cell_type": "markdown",
   "id": "d1086337-5291-4e06-96d1-a6e162d28c58",
   "metadata": {},
   "source": [
    "## SLURM Job Scheduler\n",
    "Using `executorlib` without the [flux framework](https://flux-framework.org) results in one `srun` call per worker in\n",
@@ -658,12 +677,12 @@
    "is recommended as a secondary job scheduler even within the context of the SLURM job manager. \n",
    "\n",
    "Still the general usage of `executorlib` remains similar even with SLURM as a backend:"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "27569937-7d99-4697-b3ee-f68c43b95a10",
   "metadata": {},
   "source": [
    "```\n",
    "from executorlib import Executor\n",
    "\n",
    "with Executor(max_cores=1, backend=\"slurm\") as exe:\n",
    "    future = exe.submit(sum, [1, 1])\n",
    "    print(future.result())\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "ae8dd860-f90f-47b4-b3e5-664f5c949350",
   "metadata": {},
   "source": [
    "The `backend=\"slurm\"` parameter is optional as `executorlib` automatically recognizes whether the [flux framework](https://flux-framework.org)\n",
    "or SLURM is available. \n",
    "\n",
    "In addition, the SLURM backend introduces the `command_line_argument_lst=[]` parameter, which allows the user to provide\n",
    "a list of command line arguments for the `srun` command. "
   ]
  },
  {
   "cell_type": "markdown",
   "id": "449d2c7a-67ba-449e-8e0b-98a228707e1c",
   "metadata": {},
   "source": [
    "## Workstation Support\n",
    "While the high performance computing (HPC) setup is limited to the Linux operating system, `executorlib` can also be used\n",
    "in combination with MacOS and Windows. These setups are limited to a single compute node. 
\n", "\n", "Still the general usage of `executorlib` remains similar:" - ], - "metadata": {} + ] }, { - "id": "fa147b3b-61df-4884-b90c-544362bc95d9", "cell_type": "code", - "source": [ - "from executorlib import Executor\n", - "\n", - "with Executor(max_cores=1, backend=\"local\") as exe:\n", - " future = exe.submit(sum, [1,1], resource_dict={\"cores\": 1})\n", - " print(future.result())" - ], + "execution_count": 11, + "id": "fa147b3b-61df-4884-b90c-544362bc95d9", "metadata": { "trusted": true }, "outputs": [ { "name": "stdout", - "text": "2\n", - "output_type": "stream" + "output_type": "stream", + "text": "2\n" } ], - "execution_count": 11 + "source": [ + "from executorlib import Executor\n", + "\n", + "with Executor(max_cores=1, backend=\"local\") as exe:\n", + " future = exe.submit(sum, [1, 1], resource_dict={\"cores\": 1})\n", + " print(future.result())" + ] }, { - "id": "0370b42d-237b-4169-862a-b0bac4bb858b", "cell_type": "markdown", + "id": "0370b42d-237b-4169-862a-b0bac4bb858b", + "metadata": {}, "source": [ "The `backend=\"local\"` parameter is optional as `executorlib` automatically recognizes if [flux framework](https://flux-framework.org)\n", "or SLURM are available. \n", @@ -732,8 +751,28 @@ "look up of hostnames and communicating with itself via their own hostname. To directly connect to `localhost` rather\n", "than using the hostname which is the default for distributed systems, the `hostname_localhost=True` parameter is \n", "introduced. " - ], - "metadata": {} + ] } - ] + ], + "metadata": { + "kernelspec": { + "display_name": "Flux", + "language": "python", + "name": "flux" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.3" + } + }, + "nbformat": 4, + "nbformat_minor": 5 }