diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index b3ef6754..6f938471 100644 --- a/.github/workflows/CI.yaml +++ b/.github/workflows/CI.yaml @@ -13,10 +13,12 @@ on: jobs: test: - runs-on: ubuntu-latest + runs-on: ${{matrix.os}} strategy: + fail-fast: false matrix: python-version: ["3.7", "3.8", "3.9", "3.10"] + os: [ubuntu-latest, macos-latest] # [ubuntu-latest, macos-latest, windows-latest] steps: - uses: actions/checkout@v3 @@ -28,7 +30,19 @@ jobs: run: | python -m pip install --upgrade pip build pip install flake8 pytest - pip install -e . + - name: Install package (Ubuntu) + if: startsWith(matrix.os, 'ubuntu') + run: | + pip install -e .[tests] --no-binary pyzmq + - name: Install package (Mac) + if: startsWith(matrix.os, 'macos') + run: | + pip install -e .[tests] + - name: Test with pytest run: | - python -m pytest pytest \ No newline at end of file + python -m pytest --cov=improv pytest + + - name: Coveralls + uses: coverallsapp/github-action@v2 + diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml new file mode 100644 index 00000000..e030e568 --- /dev/null +++ b/.github/workflows/deploy.yml @@ -0,0 +1,39 @@ +name: deploy + +on: + # Trigger the workflow on push to main branch + push: + branches: + - main + +# This job installs dependencies, build the book, and pushes it to `gh-pages` +jobs: + build-and-deploy-book: + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ubuntu-latest] + python-version: [3.8] + steps: + - uses: actions/checkout@v2 + + # Install dependencies + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v1 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + pip install -r docs/requirements.txt + + # Build the book + - name: Build the book + run: | + jupyter-book build docs + + # Deploy the book's HTML to gh-pages branch + - name: GitHub Pages action + uses: peaceiris/actions-gh-pages@v3.6.1 + with: + github_token: ${{ secrets.GITHUB_TOKEN }} + publish_dir: docs/_build/html \ No newline at end of file diff --git a/demos/basic/actors/front_end.py b/demos/basic/actors/front_end.py index 38d0ea74..3f186f73 100644 --- a/demos/basic/actors/front_end.py +++ b/demos/basic/actors/front_end.py @@ -48,13 +48,14 @@ def __init__(self, visual, comm, parent=None): self.rawplot_2.getImageItem().mouseClickEvent = self.mouseClick #Select a neuron self.slider.valueChanged.connect(_call(self.sliderMoved)) #Threshold for magnitude selection + self.update() + def update(self): ''' Update visualization while running ''' t = time.time() #start looking for data to display self.visual.getData() - #logger.info('Did I get something:', self.visual.Cx) if self.draw: #plot lines @@ -116,10 +117,10 @@ def customizePlots(self): # Add polar grid lines polar.addLine(x=0, pen=0.2) polar.addLine(y=0, pen=0.2) - for r in range(0, 4, 1): - circle = pyqtgraph.QtGui.QGraphicsEllipseItem(-r, -r, r*2, r*2) - circle.setPen(pyqtgraph.mkPen(0.1)) - polar.addItem(circle) + # for r in range(0, 4, 1): + # circle = pyqtgraph.QtGui.QGraphicsEllipseItem(-r, -r, r*2, r*2) + # circle.setPen(pyqtgraph.mkPen(0.1)) + # polar.addItem(circle) polar.hideAxis('bottom') polar.hideAxis('left') @@ -128,10 +129,10 @@ def customizePlots(self): self.polar1.setData(x, y) self.polar2.setData(x, y) - for r in range(2, 12, 2): - circle = pyqtgraph.QtGui.QGraphicsEllipseItem(-r, -r, r*2, r*2) - circle.setPen(pyqtgraph.mkPen(0.1)) - polars[2].addItem(circle) + # for r in range(2, 12, 2): + # circle = pyqtgraph.QtGui.QGraphicsEllipseItem(-r, -r, r*2, r*2) + # circle.setPen(pyqtgraph.mkPen(0.1)) + # polars[2].addItem(circle) self.polar3 = polars[2].plot() #sliders diff --git a/demos/basic/actors/visual.py b/demos/basic/actors/visual.py index d3916d60..cdf4d612 100644 --- a/demos/basic/actors/visual.py +++ b/demos/basic/actors/visual.py @@ -75,7 +75,8 @@ def setup(self): self.window=500 def run(self): - pass #NOTE: Special case here, tied to GUI + # #NOTE: Special case here, tied to GUI + pass def getData(self): t = time.time() diff --git a/demos/basic/basic_demo.yaml b/demos/basic/basic_demo.yaml index 7dcce782..b007147c 100644 --- a/demos/basic/basic_demo.yaml +++ b/demos/basic/basic_demo.yaml @@ -7,14 +7,14 @@ actors: Acquirer: package: demos.sample_actors.acquire class: FileAcquirer - filename: data/Tolias_mesoscope_2.hdf5 + filename: demos/basic/data/Tolias_mesoscope_2.hdf5 framerate: 15 Processor: package: actors.basic_processor class: BasicProcessor - init_filename: data/Tolias_mesoscope_2.hdf5 - config_file: basic_caiman_params.txt + init_filename: demos/basic/data/Tolias_mesoscope_2.hdf5 + config_file: demos/basic/basic_caiman_params.txt Visual: package: actors.visual diff --git a/demos/fastplotlib/README.md b/demos/fastplotlib/README.md new file mode 100644 index 00000000..3e9f84f6 --- /dev/null +++ b/demos/fastplotlib/README.md @@ -0,0 +1,22 @@ +# fastplotlib demo + +This demo consists of a **generator** `actor` that generates random frames of size 512 * 512 that are sent via a queue to a **processor** `actor` that can be used to process the frames and send them via `zmq`. The `fastplotlib.ipynb` notebook then receives the most recent frame via `zmq` and displays it using [`fastplotlib`](https://github.com/kushalkolar/fastplotlib/). + +Usage: + +```bash +# cd to this dir +cd .../improv/demos/fastplotlib + +# start improv +improv run ./fastplotlib.yaml + +# call `setup` in the improv TUI +setup + +# Run the cells in the jupyter notebook until you receive +# the dark blue square in the plot + +# once the plot is ready call `run` in the improv TUI +run +``` diff --git a/demos/fastplotlib/actors/sample_generator.py b/demos/fastplotlib/actors/sample_generator.py new file mode 100644 index 00000000..b5e37dfd --- /dev/null +++ b/demos/fastplotlib/actors/sample_generator.py @@ -0,0 +1,47 @@ +from improv.actor import Actor, RunManager +from datetime import date #used for saving +import numpy as np +import time +import logging; logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +class Generator(Actor): + """ + Generate data and puts it in the queue for the processor to take + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.data = None + self.name = "Generator" + self.frame_index = 0 + + def __str__(self): + return f"Name: {self.name}, Data: {self.data}" + + def setup(self): + logger.info('Completed setup for Generator') + + def stop(self): + print("Generator stopping") + return 0 + + def runStep(self): + """ + Generates 512 x 512 frame and puts it in the queue for the processor + """ + + data = np.random.randint(0, 255, size=(512 * 512), dtype=np.uint16).reshape(512, 512) + + frame_ix = np.array([self.frame_index], dtype=np.uint32) + + # there must be a better way to do this + out = np.concatenate( + [data.ravel(), frame_ix], + dtype=np.uint32 + ) + + self.q_out.put(out) + + self.frame_index += 1 diff --git a/demos/fastplotlib/actors/sample_processor.py b/demos/fastplotlib/actors/sample_processor.py new file mode 100644 index 00000000..775fb0d8 --- /dev/null +++ b/demos/fastplotlib/actors/sample_processor.py @@ -0,0 +1,56 @@ +from improv.actor import Actor, RunManager +import numpy as np +from queue import Empty +import logging; logger = logging.getLogger(__name__) +import zmq +logger.setLevel(logging.INFO) + + +class Processor(Actor): + """ + Process data and send it through zmq to be be visualized + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def setup(self): + """ + Creates and binds the socket for zmq + """ + + self.name = "Processor" + + context = zmq.Context() + self.socket = context.socket(zmq.PUB) + self.socket.bind("tcp://127.0.0.1:5555") + + self.frame_index = 0 + + logger.info('Completed setup for Processor') + + def stop(self): + logger.info("Processor stopping") + return 0 + + def runStep(self): + """ + Gets the frame from the queue, take the mean, sends a memoryview + so the zmq subscriber can get the buffer to update the plot + """ + + frame = None + + try: + frame = self.q_in.get(timeout=0.05) + except Empty: + pass + except: + logger.error("Could not get frame!") + + if frame is not None: + self.frame_index += 1 + # do some processing + frame.mean() + # send the buffer + self.socket.send(frame) diff --git a/demos/fastplotlib/fastplotlib.ipynb b/demos/fastplotlib/fastplotlib.ipynb new file mode 100644 index 00000000..12312aaf --- /dev/null +++ b/demos/fastplotlib/fastplotlib.ipynb @@ -0,0 +1,208 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "275928f7-8ed1-496a-b841-c8625d755874", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import zmq\n", + "import numpy as np\n", + "from fastplotlib import Plot" + ] + }, + { + "cell_type": "markdown", + "id": "da757cf2-5de0-426f-a915-434244bbd970", + "metadata": {}, + "source": [ + "### Setup zmq subscriber client" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6d4b4cad-175c-4de7-93fd-a78c00c74ab1", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "context = zmq.Context()\n", + "sub = context.socket(zmq.SUB)\n", + "sub.setsockopt(zmq.SUBSCRIBE, b\"\")\n", + "\n", + "# keep only the most recent message\n", + "sub.setsockopt(zmq.CONFLATE, 1)\n", + "\n", + "# address must match publisher in Processor actor\n", + "sub.connect(\"tcp://127.0.0.1:5555\")" + ] + }, + { + "cell_type": "markdown", + "id": "535c577e-07e3-4862-951d-3e35ee045df4", + "metadata": {}, + "source": [ + "for testing things, benchmark zmq" + ] + }, + { + "cell_type": "raw", + "id": "df251a03-a0fa-4b84-b0f4-25617d90fcc9", + "metadata": { + "tags": [] + }, + "source": [ + "%%timeit\n", + "try:\n", + " a = sub.recv(zmq.NOBLOCK)\n", + "except zmq.Again:\n", + " pass" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e2d059cb-a7e5-46f0-8586-9fd39d6dd6fc", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "def get_buffer():\n", + " \"\"\"\n", + " Gets the buffer from the publisher\n", + " \"\"\"\n", + " try:\n", + " b = sub.recv(zmq.NOBLOCK)\n", + " except zmq.Again:\n", + " pass\n", + " else:\n", + " return b\n", + " \n", + " return None" + ] + }, + { + "cell_type": "markdown", + "id": "5b3f89a1-1a5d-45f2-8af7-56e55e2a0143", + "metadata": {}, + "source": [ + "### Live plot that updates using the most recent message :D " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6bf49577-06be-44ec-93b1-ad50b3d4d794", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "plot = Plot()\n", + "\n", + "# initialize image graphic with zeros\n", + "plot.add_image(\n", + " np.zeros((512, 512), dtype=np.uint16),\n", + " vmin=0,\n", + " vmax=255,\n", + " name=\"img\"\n", + ")\n", + "\n", + "def update_frame(p):\n", + " # recieve memory with buffer\n", + " buff = get_buffer()\n", + " \n", + " if buff is not None:\n", + " # numpy array from buffer\n", + " a = np.frombuffer(buff, dtype=np.uint32)\n", + " ix = a[-1]\n", + " # set graphic data\n", + " p[\"img\"].data = a[:-1].reshape(512, 512)\n", + " p.set_title(f\"frame: {ix}\")\n", + "\n", + "plot.add_animations(update_frame)\n", + "\n", + "plot.show()" + ] + }, + { + "cell_type": "markdown", + "id": "bbb80338-49c3-4233-a98c-4c7f0e88825a", + "metadata": {}, + "source": [ + "## **fastplotlib is non blocking!**" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e25aadce-bd4e-4597-85ca-b46d151009d3", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "plot.canvas.get_stats()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "eae5cd5b-2657-40c9-993b-26052439235e", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "plot[\"img\"].cmap = \"viridis\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b4fb841c-0634-4227-9a3b-aef32a1a3b45", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "plot[\"img\"].vmax=300" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "519f3a42-e684-4ca3-b301-8ef447ffd805", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.5" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/demos/fastplotlib/fastplotlib.yaml b/demos/fastplotlib/fastplotlib.yaml new file mode 100644 index 00000000..e7a9eb8d --- /dev/null +++ b/demos/fastplotlib/fastplotlib.yaml @@ -0,0 +1,12 @@ +actors: + Generator: + package: actors.sample_generator + class: Generator + + Processor: + package: actors.sample_processor + class: Processor + +connections: + Generator.q_out: [Processor.q_in] + \ No newline at end of file diff --git a/demos/minimal/actors/sample_generator.py b/demos/minimal/actors/sample_generator.py index a5452343..0c90edaf 100644 --- a/demos/minimal/actors/sample_generator.py +++ b/demos/minimal/actors/sample_generator.py @@ -1,7 +1,6 @@ from improv.actor import Actor, RunManager from datetime import date #used for saving import numpy as np -import time import logging; logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) @@ -17,9 +16,11 @@ def __init__(self, *args, **kwargs): self.name = "Generator" self.frame_num = 0 + def __str__(self): return f"Name: {self.name}, Data: {self.data}" + def setup(self): """ Generates an array that serves as an initial source of data. @@ -27,28 +28,20 @@ def setup(self): integers from 1-99, inclusive. """ + logger.info('Beginning setup for Generator') self.data = np.asmatrix(np.random.randint(100, size = (100, 5))) logger.info('Completed setup for Generator') - # def run(self): - # """ Send array into the store. - # """ - # self.fcns = {} - # self.fcns['setup'] = self.setup - # self.fcns['run'] = self.runStep - # self.fcns['stop'] = self.stop - - # with RunManager(self.name, self.fcns, self.links) as rm: - # logger.info(rm) def stop(self): """ Save current randint vector to a file. """ - print("Generator stopping") - np.save(f"sample_generator_data_{date.today()}", self.data) #This is not the best example of a save function, will overwrite previous files with the same name. + logger.info("Generator stopping") + np.save("sample_generator_data.npy", self.data) return 0 + def runStep(self): """ Generates additional data after initial setup data is exhausted. @@ -60,11 +53,12 @@ def runStep(self): if(self.frame_num < np.shape(self.data)[0]): data_id = self.client.put(self.data[self.frame_num], str(f"Gen_raw: {self.frame_num}")) + # logger.info('Put data in store') try: self.q_out.put([[data_id, str(self.frame_num)]]) - time.sleep(0.05) + logger.info('Sent message on') self.frame_num += 1 except Exception as e: - logger.error(f"Generator Exception: {e}") + logger.error(f"--------------------------------Generator Exception: {e}") else: self.data = np.concatenate((self.data, np.asmatrix(np.random.randint(10, size=(1, 5)))), axis=0) diff --git a/demos/minimal/actors/sample_processor.py b/demos/minimal/actors/sample_processor.py index ff48822b..1ddc588c 100644 --- a/demos/minimal/actors/sample_processor.py +++ b/demos/minimal/actors/sample_processor.py @@ -1,6 +1,5 @@ -from improv.actor import Actor, RunManager +from improv.actor import Actor import numpy as np -from queue import Empty import logging; logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) @@ -28,23 +27,12 @@ def setup(self): self.frame_num = 1 logger.info('Completed setup for Processor') + def stop(self): """ Trivial stop function for testing purposes. """ logger.info("Processor stopping") - return 0 - - # def run(self): - # """ Send array into the store. - # """ - # self.fcns = {} - # self.fcns['setup'] = self.setup - # self.fcns['run'] = self.runStep - # self.fcns['stop'] = self.stop - - # with RunManager(self.name, self.fcns, self.links) as rm: - # logger.info(rm) def runStep(self): @@ -57,9 +45,8 @@ def runStep(self): frame = None try: - frame = self.q_in.get(timeout=0.05) - except Empty: - pass + frame = self.q_in.get(timeout=0.001) + except: logger.error("Could not get frame!") pass @@ -68,9 +55,10 @@ def runStep(self): self.done = False self.frame = self.client.getID(frame[0][0]) avg = np.mean(self.frame[0]) - print(f"Average: {avg}") + + # print(f"Average: {avg}") self.avg_list.append(avg) - print(f"Overall Average: {np.mean(self.avg_list)}") - print(f"Frame number: {self.frame_num}") + # print(f"Overall Average: {np.mean(self.avg_list)}") + # print(f"Frame number: {self.frame_num}") self.frame_num += 1 \ No newline at end of file diff --git a/demos/sample_actors/acquire.py b/demos/sample_actors/acquire.py index d1c5c4aa..7c67f417 100644 --- a/demos/sample_actors/acquire.py +++ b/demos/sample_actors/acquire.py @@ -68,8 +68,8 @@ def runStep(self): if self.done: pass - elif(self.frame_num < len(self.data)): - frame = self.getFrame(self.frame_num) + elif(self.frame_num < len(self.data)*5): + frame = self.getFrame(self.frame_num % len(self.data)) ## simulate frame-dropping # if self.frame_num > 1500 and self.frame_num < 1800: # frame = None diff --git a/demos/sample_actors/analysis.py b/demos/sample_actors/analysis.py index c3928ae5..01430a44 100644 --- a/demos/sample_actors/analysis.py +++ b/demos/sample_actors/analysis.py @@ -77,7 +77,6 @@ def runStep(self): t = time.time() ids = None try: - print(self.links) sig = self.links['input_stim_queue'].get(timeout=0.0001) self.updateStim_start(sig) except Empty as e: diff --git a/demos/sample_actors/analysis_async.py b/demos/sample_actors/analysis_async.py index b7c47f89..e2349e95 100755 --- a/demos/sample_actors/analysis_async.py +++ b/demos/sample_actors/analysis_async.py @@ -60,11 +60,11 @@ async def get_frame(self): If there's a pile-up in the queue, this spawns new [self.analysis] task. """ - asyncio.ensure_future(self.analysis(), loop=self.loop) + asyncio.create_task(self.analysis(), loop=self.loop) while True: if self.aqueue.qsize() > 0: - asyncio.ensure_future(self.analysis(), loop=self.loop) + asyncio.create_task(self.analysis(), loop=self.loop) obj_id = await self.q_in.get_async() # List if obj_id is not None: diff --git a/demos/sample_actors/process.py b/demos/sample_actors/process.py index 1349214e..64bbee69 100644 --- a/demos/sample_actors/process.py +++ b/demos/sample_actors/process.py @@ -53,7 +53,7 @@ def setup(self): self.opts = CNMFParams(params_dict=self.params) self.onAc = OnACID(params = self.opts) #TODO: Need to rewrite init online as well to receive individual frames. - self.onAc.initialize_online() + self.onAc.initialize_online(T=100000) self.max_shifts_online = self.onAc.params.get('online', 'max_shifts_online') self.fitframe_time = [] diff --git a/docs/_config.yml b/docs/_config.yml new file mode 100644 index 00000000..eab67d8d --- /dev/null +++ b/docs/_config.yml @@ -0,0 +1,37 @@ +####################################################################################### +# A default configuration that will be loaded for all jupyter books +# See the documentation for help and more options: +# https://jupyterbook.org/customize/config.html + +####################################################################################### +# Book settings +title : improv documentation # The title of the book. Will be placed in the left navbar. +author : improv team # The author of the book +copyright : "2022" # Copyright year to be placed in the footer +logo : logo.png # A path to the book logo + +# Force re-execution of notebooks on each build. +# See https://jupyterbook.org/content/execute.html +execute: + execute_notebooks: force + +# Define the name of the latex output file for PDF builds +latex: + latex_documents: + targetname: book.tex + +# Add a bibtex file so that we can create citations +bibtex_bibfiles: + - references.bib + +# Information about where the book exists on the web +repository: + url: https://github.com/project-improv/improv # Online location of your book + path_to_book: docs # Optional path to your book, relative to the repository root + branch: main # Which branch of the repository should be used when creating links (optional) + +# Add GitHub buttons to your book +# See https://jupyterbook.org/customize/config.html#add-a-link-to-your-repository +html: + use_issues_button: true + use_repository_button: true \ No newline at end of file diff --git a/docs/_toc.yml b/docs/_toc.yml new file mode 100644 index 00000000..d4311d41 --- /dev/null +++ b/docs/_toc.yml @@ -0,0 +1,9 @@ +# Table of contents +# Learn more at https://jupyterbook.org/customize/toc.html + +format: jb-book +root: intro +chapters: +- file: markdown +- file: notebooks +- file: markdown-notebooks \ No newline at end of file diff --git a/docs/content.md b/docs/content.md new file mode 100644 index 00000000..0f6aca77 --- /dev/null +++ b/docs/content.md @@ -0,0 +1,5 @@ +Content in Jupyter Book +======================= + +There are many ways to write content in Jupyter Book. This short section +covers a few tips for how to do so. diff --git a/docs/intro.md b/docs/intro.md new file mode 100644 index 00000000..95577c40 --- /dev/null +++ b/docs/intro.md @@ -0,0 +1,11 @@ +# Welcome to your Jupyter Book + +This is a small sample book to give you a feel for how book content is +structured. +It shows off a few of the major file types, as well as some sample content. +It does not go in-depth into any particular topic - check out [the Jupyter Book documentation](https://jupyterbook.org) for more information. + +Check out the content pages bundled with this sample book to see more. + +```{tableofcontents} +``` \ No newline at end of file diff --git a/docs/logo.png b/docs/logo.png new file mode 100644 index 00000000..06d56f40 Binary files /dev/null and b/docs/logo.png differ diff --git a/docs/markdown-notebooks.md b/docs/markdown-notebooks.md new file mode 100644 index 00000000..6d971040 --- /dev/null +++ b/docs/markdown-notebooks.md @@ -0,0 +1,54 @@ +--- +jupytext: + cell_metadata_filter: -all + formats: md:myst + text_representation: + extension: .md + format_name: myst + format_version: 0.13 + jupytext_version: 1.11.5 +kernelspec: + display_name: Python 3 + language: python + name: python3 +--- + +# Notebooks with MyST Markdown + +Jupyter Book also lets you write text-based notebooks using MyST Markdown. +See [the Notebooks with MyST Markdown documentation](https://jupyterbook.org/file-types/myst-notebooks.html) for more detailed instructions. +This page shows off a notebook written in MyST Markdown. + +## An example cell + +With MyST Markdown, you can define code cells with a directive like so: + +```{code-cell} +print(2 + 2) +``` + +When your book is built, the contents of any `{code-cell}` blocks will be +executed with your default Jupyter kernel, and their outputs will be displayed +in-line with the rest of your content. + +```{seealso} +Jupyter Book uses [Jupytext](https://jupytext.readthedocs.io/en/latest/) to convert text-based files to notebooks, and can support [many other text-based notebook files](https://jupyterbook.org/file-types/jupytext.html). +``` + +## Create a notebook with MyST Markdown + +MyST Markdown notebooks are defined by two things: + +1. YAML metadata that is needed to understand if / how it should convert text files to notebooks (including information about the kernel needed). + See the YAML at the top of this page for example. +2. The presence of `{code-cell}` directives, which will be executed with your book. + +That's all that is needed to get started! + +## Quickly add YAML metadata for MyST Notebooks + +If you have a markdown file and you'd like to quickly add YAML metadata to it, so that Jupyter Book will treat it as a MyST Markdown Notebook, run the following command: + +``` +jupyter-book myst init path/to/markdownfile.md +``` \ No newline at end of file diff --git a/docs/markdown.md b/docs/markdown.md new file mode 100644 index 00000000..deaf054e --- /dev/null +++ b/docs/markdown.md @@ -0,0 +1,55 @@ +# Markdown Files + +Whether you write your book's content in Jupyter Notebooks (`.ipynb`) or +in regular markdown files (`.md`), you'll write in the same flavor of markdown +called **MyST Markdown**. +This is a simple file to help you get started and show off some syntax. + +## What is MyST? + +MyST stands for "Markedly Structured Text". It +is a slight variation on a flavor of markdown called "CommonMark" markdown, +with small syntax extensions to allow you to write **roles** and **directives** +in the Sphinx ecosystem. + +For more about MyST, see [the MyST Markdown Overview](https://jupyterbook.org/content/myst.html). + +## Sample Roles and Directives + +Roles and directives are two of the most powerful tools in Jupyter Book. They +are kind of like functions, but written in a markup language. They both +serve a similar purpose, but **roles are written in one line**, whereas +**directives span many lines**. They both accept different kinds of inputs, +and what they do with those inputs depends on the specific role or directive +that is being called. + +Here is a "note" directive: + +```{note} +Here is a note +``` + +It will be rendered in a special box when you build your book. + +Here is an inline directive to refer to a document: {doc}`markdown-notebooks`. + + +## Citations + +You can also cite references that are stored in a `bibtex` file. For example, +the following syntax: `` {cite}`holdgraf_evidence_2014` `` will render like +this: {cite}`holdgraf_evidence_2014`. + +Moreover, you can insert a bibliography into your page with this syntax: +The `{bibliography}` directive must be used for all the `{cite}` roles to +render properly. +For example, if the references for your book are stored in `references.bib`, +then the bibliography is inserted with: + +```{bibliography} +``` + +## Learn more + +This is just a simple starter to get you started. +You can learn a lot more at [jupyterbook.org](https://jupyterbook.org). \ No newline at end of file diff --git a/docs/notebooks.ipynb b/docs/notebooks.ipynb new file mode 100644 index 00000000..fdb7176c --- /dev/null +++ b/docs/notebooks.ipynb @@ -0,0 +1,122 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Content with notebooks\n", + "\n", + "You can also create content with Jupyter Notebooks. This means that you can include\n", + "code blocks and their outputs in your book.\n", + "\n", + "## Markdown + notebooks\n", + "\n", + "As it is markdown, you can embed images, HTML, etc into your posts!\n", + "\n", + "![](https://myst-parser.readthedocs.io/en/latest/_static/logo-wide.svg)\n", + "\n", + "You can also $add_{math}$ and\n", + "\n", + "$$\n", + "math^{blocks}\n", + "$$\n", + "\n", + "or\n", + "\n", + "$$\n", + "\\begin{aligned}\n", + "\\mbox{mean} la_{tex} \\\\ \\\\\n", + "math blocks\n", + "\\end{aligned}\n", + "$$\n", + "\n", + "But make sure you \\$Escape \\$your \\$dollar signs \\$you want to keep!\n", + "\n", + "## MyST markdown\n", + "\n", + "MyST markdown works in Jupyter Notebooks as well. For more information about MyST markdown, check\n", + "out [the MyST guide in Jupyter Book](https://jupyterbook.org/content/myst.html),\n", + "or see [the MyST markdown documentation](https://myst-parser.readthedocs.io/en/latest/).\n", + "\n", + "## Code blocks and outputs\n", + "\n", + "Jupyter Book will also embed your code blocks and output in your book.\n", + "For example, here's some sample Matplotlib code:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from matplotlib import rcParams, cycler\n", + "import matplotlib.pyplot as plt\n", + "import numpy as np\n", + "plt.ion()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Fixing random state for reproducibility\n", + "np.random.seed(19680801)\n", + "\n", + "N = 10\n", + "data = [np.logspace(0, 1, 100) + np.random.randn(100) + ii for ii in range(N)]\n", + "data = np.array(data).T\n", + "cmap = plt.cm.coolwarm\n", + "rcParams['axes.prop_cycle'] = cycler(color=cmap(np.linspace(0, 1, N)))\n", + "\n", + "\n", + "from matplotlib.lines import Line2D\n", + "custom_lines = [Line2D([0], [0], color=cmap(0.), lw=4),\n", + " Line2D([0], [0], color=cmap(.5), lw=4),\n", + " Line2D([0], [0], color=cmap(1.), lw=4)]\n", + "\n", + "fig, ax = plt.subplots(figsize=(10, 5))\n", + "lines = ax.plot(data)\n", + "ax.legend(custom_lines, ['Cold', 'Medium', 'Hot']);" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "There is a lot more that you can do with outputs (such as including interactive outputs)\n", + "with your book. For more information about this, see [the Jupyter Book documentation](https://jupyterbook.org)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.0" + }, + "widgets": { + "application/vnd.jupyter.widget-state+json": { + "state": {}, + "version_major": 2, + "version_minor": 0 + } + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/docs/references.bib b/docs/references.bib new file mode 100644 index 00000000..87e60988 --- /dev/null +++ b/docs/references.bib @@ -0,0 +1,55 @@ +--- +--- + +@inproceedings{holdgraf_evidence_2014, + address = {Brisbane, Australia, Australia}, + title = {Evidence for {Predictive} {Coding} in {Human} {Auditory} {Cortex}}, + booktitle = {International {Conference} on {Cognitive} {Neuroscience}}, + publisher = {Frontiers in Neuroscience}, + author = {Holdgraf, Christopher Ramsay and de Heer, Wendy and Pasley, Brian N. and Knight, Robert T.}, + year = {2014} +} + +@article{holdgraf_rapid_2016, + title = {Rapid tuning shifts in human auditory cortex enhance speech intelligibility}, + volume = {7}, + issn = {2041-1723}, + url = {http://www.nature.com/doifinder/10.1038/ncomms13654}, + doi = {10.1038/ncomms13654}, + number = {May}, + journal = {Nature Communications}, + author = {Holdgraf, Christopher Ramsay and de Heer, Wendy and Pasley, Brian N. and Rieger, Jochem W. and Crone, Nathan and Lin, Jack J. and Knight, Robert T. and Theunissen, Frédéric E.}, + year = {2016}, + pages = {13654}, + file = {Holdgraf et al. - 2016 - Rapid tuning shifts in human auditory cortex enhance speech intelligibility.pdf:C\:\\Users\\chold\\Zotero\\storage\\MDQP3JWE\\Holdgraf et al. - 2016 - Rapid tuning shifts in human auditory cortex enhance speech intelligibility.pdf:application/pdf} +} + +@inproceedings{holdgraf_portable_2017, + title = {Portable learning environments for hands-on computational instruction using container-and cloud-based technology to teach data science}, + volume = {Part F1287}, + isbn = {978-1-4503-5272-7}, + doi = {10.1145/3093338.3093370}, + abstract = {© 2017 ACM. There is an increasing interest in learning outside of the traditional classroom setting. This is especially true for topics covering computational tools and data science, as both are challenging to incorporate in the standard curriculum. These atypical learning environments offer new opportunities for teaching, particularly when it comes to combining conceptual knowledge with hands-on experience/expertise with methods and skills. Advances in cloud computing and containerized environments provide an attractive opportunity to improve the effciency and ease with which students can learn. This manuscript details recent advances towards using commonly-Available cloud computing services and advanced cyberinfrastructure support for improving the learning experience in bootcamp-style events. We cover the benets (and challenges) of using a server hosted remotely instead of relying on student laptops, discuss the technology that was used in order to make this possible, and give suggestions for how others could implement and improve upon this model for pedagogy and reproducibility.}, + author = {Holdgraf, Christopher Ramsay and Culich, A. and Rokem, A. and Deniz, F. and Alegro, M. and Ushizima, D.}, + year = {2017}, + keywords = {Teaching, Bootcamps, Cloud computing, Data science, Docker, Pedagogy} +} + +@article{holdgraf_encoding_2017, + title = {Encoding and decoding models in cognitive electrophysiology}, + volume = {11}, + issn = {16625137}, + doi = {10.3389/fnsys.2017.00061}, + abstract = {© 2017 Holdgraf, Rieger, Micheli, Martin, Knight and Theunissen. Cognitive neuroscience has seen rapid growth in the size and complexity of data recorded from the human brain as well as in the computational tools available to analyze this data. This data explosion has resulted in an increased use of multivariate, model-based methods for asking neuroscience questions, allowing scientists to investigate multiple hypotheses with a single dataset, to use complex, time-varying stimuli, and to study the human brain under more naturalistic conditions. These tools come in the form of “Encoding” models, in which stimulus features are used to model brain activity, and “Decoding” models, in which neural features are used to generated a stimulus output. Here we review the current state of encoding and decoding models in cognitive electrophysiology and provide a practical guide toward conducting experiments and analyses in this emerging field. Our examples focus on using linear models in the study of human language and audition. We show how to calculate auditory receptive fields from natural sounds as well as how to decode neural recordings to predict speech. The paper aims to be a useful tutorial to these approaches, and a practical introduction to using machine learning and applied statistics to build models of neural activity. The data analytic approaches we discuss may also be applied to other sensory modalities, motor systems, and cognitive systems, and we cover some examples in these areas. In addition, a collection of Jupyter notebooks is publicly available as a complement to the material covered in this paper, providing code examples and tutorials for predictive modeling in python. The aimis to provide a practical understanding of predictivemodeling of human brain data and to propose best-practices in conducting these analyses.}, + journal = {Frontiers in Systems Neuroscience}, + author = {Holdgraf, Christopher Ramsay and Rieger, J.W. and Micheli, C. and Martin, S. and Knight, R.T. and Theunissen, F.E.}, + year = {2017}, + keywords = {Decoding models, Encoding models, Electrocorticography (ECoG), Electrophysiology/evoked potentials, Machine learning applied to neuroscience, Natural stimuli, Predictive modeling, Tutorials} +} + +@book{ruby, + title = {The Ruby Programming Language}, + author = {Flanagan, David and Matsumoto, Yukihiro}, + year = {2008}, + publisher = {O'Reilly Media} +} \ No newline at end of file diff --git a/docs/requirements.txt b/docs/requirements.txt new file mode 100644 index 00000000..06c1b40e --- /dev/null +++ b/docs/requirements.txt @@ -0,0 +1,3 @@ +jupyter-book +matplotlib +numpy \ No newline at end of file diff --git a/improv/__init__.py b/improv/__init__.py index f7d81f96..e69de29b 100644 --- a/improv/__init__.py +++ b/improv/__init__.py @@ -1,25 +0,0 @@ -import logging -import sys -import os.path -import click -from improv.nexus import Nexus - -@click.command() -@click.option('-a', '--actor-path', type=click.Path(exists=True, resolve_path=True), multiple=True, default=[''], help="search path to add to sys.path when looking for actors; defaults to the directory containing CONFIGFILE") -@click.argument('configfile', type=click.Path(exists=True, dir_okay=False, resolve_path=True)) -def default_invocation(configfile, actor_path): - """ - Function provided as an entry point for command-line usage. Invoke using - improv - """ - logging.getLogger("matplotlib").setLevel(logging.WARNING) - - if not actor_path: - sys.path.append(os.path.dirname(configfile)) - else: - sys.path.extend(actor_path) - click.echo(configfile) - - nexus = Nexus("Sample") - nexus.createNexus(file=configfile) - nexus.startNexus() \ No newline at end of file diff --git a/improv/actor.py b/improv/actor.py index 760511ef..ed10eaed 100644 --- a/improv/actor.py +++ b/improv/actor.py @@ -9,15 +9,15 @@ logger.setLevel(logging.INFO) class AbstractActor(): - ''' Base class for an actor that Nexus + """ Base class for an actor that Nexus controls and interacts with. Needs to have a store and links for communication Also needs to be responsive to sent Signals (e.g. run, setup, etc) - ''' + """ def __init__(self, name, method='fork'): - ''' Require a name for multiple instances of the same actor/class + """ Require a name for multiple instances of the same actor/class Create initial empty dict of Links for easier referencing - ''' + """ self.q_watchout = None self.name = name self.links = {} @@ -37,8 +37,8 @@ def __repr__(self): Returns: [str]: _description_ """ - ''' Return this instance name and links dict - ''' + """ Return this instance name and links dict + """ return self.name+': '+str(self.links.keys()) def setStore(self, client): @@ -47,8 +47,8 @@ def setStore(self, client): Args: client (improv.nexus.Link): _description_ """ - ''' Set client interface to the store - ''' + """ Set client interface to the store + """ self.client = client def _getStoreInterface(self): @@ -58,49 +58,49 @@ def _getStoreInterface(self): self.setStore(store) def setLinks(self, links): - ''' General full dict set for links - ''' + """ General full dict set for links + """ self.links = links def setCommLinks(self, q_comm, q_sig): - ''' Set explicit communication links to/from Nexus (q_comm, q_sig) + """ Set explicit communication links to/from Nexus (q_comm, q_sig) q_comm is for messages from this actor to Nexus q_sig is signals from Nexus and must be checked first - ''' + """ self.q_comm = q_comm self.q_sig = q_sig self.links.update({'q_comm':self.q_comm, 'q_sig':self.q_sig}) def setLinkIn(self, q_in): - ''' Set the dedicated input queue - ''' + """ Set the dedicated input queue + """ self.q_in = q_in self.links.update({'q_in':self.q_in}) def setLinkOut(self, q_out): - ''' Set the dedicated output queue - ''' + """ Set the dedicated output queue + """ self.q_out = q_out self.links.update({'q_out':self.q_out}) def setLinkWatch(self, q_watch): - ''' - ''' + """ + """ self.q_watchout= q_watch self.links.update({'q_watchout':self.q_watchout}) def addLink(self, name, link): - ''' Function provided to add additional data links by name + """ Function provided to add additional data links by name using same form as q_in or q_out Must be done during registration and not during run - ''' + """ self.links.update({name:link}) # User can then use: self.my_queue = self.links['my_queue'] in a setup fcn, # or continue to reference it using self.links['my_queue'] def getLinks(self): - ''' Returns dictionary of links - ''' + """ Returns dictionary of links + """ return self.links def put(self, idnames, q_out= None, save=None): @@ -122,25 +122,25 @@ def put(self, idnames, q_out= None, save=None): self.q_watchout.put(idnames[i]) def run(self): - ''' Must run in continuous mode + """ Must run in continuous mode Also must check q_sig either at top of a run-loop or as async with the primary function - ''' + """ raise NotImplementedError - ''' Suggested implementation for synchronous running: see RunManager class below - ''' + """ Suggested implementation for synchronous running: see RunManager class below + """ - def stop(): + def stop(self): """ Specify method for momentarily stopping the run and saving data. """ pass def changePriority(self): - ''' Try to lower this process' priority + """ Try to lower this process' priority Only changes priority if lower_priority is set TODO: Only works on unix machines. Add Windows functionality - ''' + """ if self.lower_priority is True: import os, psutil p = psutil.Process(os.getpid()) @@ -165,10 +165,10 @@ def run(self): pass def setup(self): - ''' Essenitally the registration process + """ Essenitally the registration process Can also be an initialization for the actor options is a list of options, can be empty - ''' + """ pass def runStep(self): @@ -193,10 +193,10 @@ def run(self): pass async def setup(self): - ''' Essenitally the registration process + """ Essenitally the registration process Can also be an initialization for the actor options is a list of options, can be empty - ''' + """ pass async def runStep(self): @@ -211,8 +211,8 @@ async def stop(self): class RunManager(): - ''' - ''' + """ + """ def __init__(self, name, actions, links, runStore=None, timeout=1e-6): self.run = False self.stop = False @@ -241,12 +241,11 @@ def __enter__(self): logger.error('Actor '+self.actorName+' exception during run: {}'.format(e)) print(traceback.format_exc()) elif self.stop: - #Read stop codes try: - self.actions['run']() - self.q_comm.put([Signal.ready()]) + self.actions['stop']() except Exception as e: - logger.error(f'Actor {self.actorName} exception during stop: {e}') + logger.error('Actor '+self.actorName+' exception during stop: {}'.format(e)) + print(traceback.format_exc()) self.stop = False #Run once elif self.config: try: @@ -258,12 +257,6 @@ def __enter__(self): logger.error('Actor '+self.actorName+' exception during setup: {}'.format(e)) print(traceback.format_exc()) self.config = False - elif self.stop: - try: - self.actions['stop']() - except Exception as e: - logger.error('Actor '+self.actorName+' exception during stop: {}'.format(e)) - print(traceback.format_exc()) # Check for new Signals received from Nexus try: @@ -287,6 +280,8 @@ def __enter__(self): elif signal == Signal.resume(): #currently treat as same as run logger.warning('Received resume signal, resuming') self.run = True + except KeyboardInterrupt: + break except Empty as e: pass # No signal from Nexus @@ -320,13 +315,17 @@ def __init__(self, name, run_method: Callable[[], Awaitable[None]], setup, self.loop = asyncio.get_event_loop() self.start = time.time() + signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT) + for s in signals: + self.loop.add_signal_handler(s, lambda s=s: self.loop.stop()) + async def __aenter__(self): while True: signal = await self.q_sig.get_async() if signal == Signal.run() or signal == Signal.resume(): if not self.run: self.run = True - asyncio.ensure_future(self.run_method(), loop=self.loop) + asyncio.create_task(self.run_method(), loop=self.loop) print('Received run signal, begin running') elif signal == Signal.setup(): self.setup() @@ -346,10 +345,10 @@ async def __aexit__(self, type, value, traceback): class Signal(): - ''' Class containing definition of signals Nexus uses + """ Class containing definition of signals Nexus uses to communicate with its actors TODO: doc each of these with expected handling behavior - ''' + """ @staticmethod def run(): return 'run' diff --git a/improv/cli.py b/improv/cli.py new file mode 100644 index 00000000..83351ed2 --- /dev/null +++ b/improv/cli.py @@ -0,0 +1,252 @@ +import logging +import os.path +import re +import argparse +import subprocess +import sys +import psutil +import time +import datetime +import re +from zmq import SocketOption +from zmq.log.handlers import PUBHandler +from improv.tui import TUI +from improv.nexus import Nexus + +MAX_PORT = 2**16 - 1 +DEFAULT_CONTROL_PORT = "0" +DEFAULT_OUTPUT_PORT = "0" +DEFAULT_LOGGING_PORT = "0" + +def file_exists(fname): + if not os.path.isfile(fname): + raise argparse.ArgumentTypeError("{} not found".format(fname)) + return fname + +def path_exists(path): + if not os.path.exists(path): + raise argparse.ArgumentTypeError("{} not found".format(path)) + return path + +def is_valid_port(port): + p = int(port) + if 0 <= p < MAX_PORT: + return p + else: + raise argparse.ArgumentTypeError("Port {} invalid. Ports must be in [0, {}).".format(p, MAX_PORT)) + +def is_valid_ip_addr(addr): + regex = r"[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}" + if ':' in addr: + [address, port] = addr.split(':') + match = re.match(regex, address) + part_list = address.split('.') + if not match or len(part_list) != 4 or not all([0 <= int(part) < 256 for part in part_list]): + raise argparse.ArgumentTypeError("{address!r} is not a valid address.".format(address=address)) + else: + ip = address + + else: # assume it's just a port + ip = "127.0.0.1" # localhost + port = addr + + port = str(is_valid_port(port)) + + return(ip + ":" + port) + +def parse_cli_args(args): + parser = argparse.ArgumentParser(description='Command line tool for improv.') + + subparsers = parser.add_subparsers(title="subcommands", help="for launching individual components", required=False) + + run_parser = subparsers.add_parser('run', description="Start the improv client and server together") + run_parser.add_argument('-c', '--control-port', type=is_valid_port, default=DEFAULT_CONTROL_PORT, help="local port on which control are sent to/from server") + run_parser.add_argument('-o', '--output-port', type=is_valid_port, default=DEFAULT_OUTPUT_PORT, help="local port on which server output messages are broadcast") + run_parser.add_argument('-l', '--logging-port', type=is_valid_port, default=DEFAULT_LOGGING_PORT, help="local port on which logging messages are broadcast") + run_parser.add_argument('-f', '--logfile', default="global.log", help="name of log file") + run_parser.add_argument('-a', '--actor-path', type=path_exists, action='append', default=[], help="search path to add to sys.path when looking for actors; defaults to the directory containing configfile") + run_parser.add_argument('configfile', type=file_exists, help="YAML file specifying improv pipeline") + run_parser.set_defaults(func=run) + + client_parser = subparsers.add_parser('client', description="Start the improv client") + client_parser.add_argument('-c', '--control-port', type=is_valid_ip_addr, default=DEFAULT_CONTROL_PORT, help="address on which control signals are sent to the server") + client_parser.add_argument('-s', '--server-port', type=is_valid_ip_addr, default=DEFAULT_OUTPUT_PORT, help="address on which messages from the server are received") + client_parser.add_argument('-l', '--logging-port', type=is_valid_ip_addr, default=DEFAULT_LOGGING_PORT, help="address on which logging messages are broadcast") + client_parser.set_defaults(func=run_client) + + server_parser = subparsers.add_parser('server', description="Start the improv server") + server_parser.add_argument('-c', '--control-port', type=is_valid_port, default=DEFAULT_CONTROL_PORT, help="local port on which control signals are received") + server_parser.add_argument('-o', '--output-port', type=is_valid_port, default=DEFAULT_OUTPUT_PORT, help="local port on which output messages are broadcast") + server_parser.add_argument('-l', '--logging-port', type=is_valid_port, default=DEFAULT_LOGGING_PORT, help="local port on which logging messages are broadcast") + server_parser.add_argument('-f', '--logfile', default="global.log", help="name of log file") + server_parser.add_argument('-a', '--actor-path', type=path_exists, action='append', default=[], help="search path to add to sys.path when looking for actors; defaults to the directory containing configfile") + server_parser.add_argument('configfile', type=file_exists, help="YAML file specifying improv pipeline") + server_parser.set_defaults(func=run_server) + + list_parser = subparsers.add_parser('list', description="List running improv processes") + list_parser.set_defaults(func=run_list) + + cleanup_parser = subparsers.add_parser('cleanup', description="Kill all processes returned by 'improv list'") + cleanup_parser.set_defaults(func=run_cleanup) + + return parser.parse_args(args) + + +def default_invocation(): + """ + Function provided as an entry point for command-line usage. + """ + args = parse_cli_args(sys.argv[1:]) + args.func(args) + + +def run_client(args): + app = TUI(args.control_port, args.server_port, args.logging_port) + + app.run() + +def run_server(args): + """ + Runs the improv server in headless mode. + """ + zmq_log_handler = PUBHandler('tcp://*:%s' % args.logging_port) + + # in case we bound to a random port (default), get port number + logging_port = int(zmq_log_handler.socket.getsockopt_string(SocketOption.LAST_ENDPOINT).split(':')[-1]) + logging.basicConfig(level=logging.DEBUG, + format='%(name)s %(message)s', + handlers=[logging.FileHandler(args.logfile), + zmq_log_handler]) + + if not args.actor_path: + sys.path.append(os.path.dirname(args.configfile)) + else: + sys.path.extend(args.actor_path) + + server = Nexus() + control_port, output_port = server.createNexus(file=args.configfile, control_port=args.control_port, output_port=args.output_port) + curr_dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + print(f"{curr_dt} Server running on (control, output, log) ports ({control_port}, {output_port}, {logging_port}).\nPress Ctrl-C to quit.") + server.startNexus() + + if args.actor_path: + for p in args.actor_path: + sys.path.remove(p) + else: + sys.path.remove(os.path.dirname(args.configfile)) + +def run_list(args, printit=True): + out_list = [] + pattern = re.compile(r'(improv (run|client|server)|plasma_store)') + mp_pattern = re.compile(r'-c from multiprocessing') + for proc in psutil.process_iter(['pid', 'name', 'cmdline']): + if proc.info['cmdline']: + cmdline = ' '.join(proc.info['cmdline']) + if re.search(pattern, cmdline) or re.search(mp_pattern, cmdline): + out_list.append(proc) + if printit: + print(f"{proc.pid} {proc.name()} {cmdline}") + + return out_list + +def run_cleanup(args, headless=False): + proc_list = run_list(args, printit=False) + if proc_list: + if not headless: + print(f"The following {len(proc_list)} processes will be killed:") + for proc in proc_list: + cmdline = ' '.join(proc.info['cmdline']) + print(f"{proc.pid} {proc.name()} {cmdline}") + res = input("Is that okay [y/N]? ") + else: + res = 'y' + + if res.lower() == 'y': + for proc in proc_list: + if not proc.status == 'terminated': + try: + proc.terminate() + except psutil.NoSuchProcess: + pass + gone, alive = psutil.wait_procs(proc_list, timeout=3) + for p in alive: + p.kill() + else: + if not headless: + print("No running processes found.") + +def run(args): + apath_opts = [] + for p in args.actor_path: + if p: + apath_opts.append('-a') + apath_opts.append(p) + + server_opts = ['improv', 'server', + '-c', str(args.control_port), + '-o', str(args.output_port), + '-l', str(args.logging_port), + '-f', args.logfile, + ] + server_opts.extend(apath_opts) + server_opts.append(args.configfile) + + # save current datetime so we can see when server has started up + curr_dt = datetime.datetime.now().replace(microsecond=0) + + with open(args.logfile, mode='a+') as logfile: + server = subprocess.Popen(server_opts, stdout=logfile, stderr=logfile) + + # wait for server to start up + timeout = 10 + increment = 0.05 + time_now = 0 + while time_now < timeout: + server_start_time = _server_start_logged(args.logfile) + if server_start_time and server_start_time >= curr_dt: + control_port, output_port, logging_port = _get_ports(args.logfile) + break + else: + time.sleep(increment) + time_now += increment + + args.logging_port = logging_port + args.control_port = control_port + args.server_port = output_port + run_client(args) + + try: + server.wait(timeout=2) + except subprocess.TimeoutExpired: + print("Cleaning up the hard way. May have exited dirty.") + server.terminate() + server.wait(10) + run_cleanup(args, headless=True) + +def _server_start_logged(logfile): + # read logfile to make sure new session and ports are logged + with open(logfile, mode='r') as logfile: + contents = logfile.read() + pattern = re.compile(r'\d{4}-\d{2}-\d{2} \d{2}\:\d{2}\:\d{2}(?= Server running on)') + matches = pattern.findall(contents) + if matches: + return datetime.datetime.strptime(matches[-1],'%Y-%m-%d %H:%M:%S') + else: + return None + +def _get_ports(logfile): + # read logfile to get ports + with open(logfile, mode='r') as logfile: + contents = logfile.read() + + pattern = re.compile(r'(?<=\(control, output, log\) ports \()\d*, \d*, \d*') + + # get most recent match (log file may contain old runs) + port_str_list = pattern.findall(contents) + if port_str_list: + port_str = port_str_list[-1] + return (int(p) for p in port_str.split(', ')) + else: + return None + + diff --git a/improv/config.py b/improv/config.py index f76c764f..7ed81f0b 100644 --- a/improv/config.py +++ b/improv/config.py @@ -8,9 +8,9 @@ #TODO: Write a save function for Config objects output as YAML configFile but using ConfigModule objects class Config(): - ''' Handles configuration and logs of configs for + """ Handles configuration and logs of configs for the entire server/processing pipeline. - ''' + """ def __init__(self, configFile): if configFile is None: @@ -25,9 +25,9 @@ def __init__(self, configFile): self.hasGUI = False def createConfig(self): - ''' Read yaml config file and create config for Nexus + """ Read yaml config file and create config for Nexus TODO: check for config file compliance, error handle it - ''' + """ with open(self.configFile, 'r') as ymlfile: cfg = yaml.safe_load(ymlfile) @@ -94,12 +94,12 @@ def createConfig(self): def addParams(self, type, param): - ''' Function to add paramter param of type type - ''' + """ Function to add paramter param of type type + """ def saveActors(self): - ''' Saves the config to a specific file. - ''' + """ Saves the config to a specific file. + """ wflag = True @@ -120,8 +120,8 @@ def __init__(self, name, packagename, classname, options=None): self.options = options def saveConfigModules(self, pathName, wflag): - ''' Loops through each actor to save the modules to the config file. - ''' + """ Loops through each actor to save the modules to the config file. + """ if wflag: writeOption = 'w' diff --git a/improv/link.py b/improv/link.py index 56616b27..9a2355fd 100644 --- a/improv/link.py +++ b/improv/link.py @@ -2,13 +2,10 @@ import logging from multiprocessing import Manager, cpu_count, set_start_method from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor +from concurrent.futures._base import CancelledError logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) -logging.basicConfig(level=logging.DEBUG, - format='%(name)s %(message)s', - handlers=[logging.FileHandler("global.log"), - logging.StreamHandler()]) def Link(name, start, end): """ Function to construct a queue that Nexus uses for @@ -176,8 +173,13 @@ async def put_async(self, item): """ loop = asyncio.get_event_loop() - res = await loop.run_in_executor(self._executor, self.put, item) - return res + try: + res = await loop.run_in_executor(self._executor, self.put, item) + return res + except EOFError: + logger.warn('Link probably killed (EOF)') + except FileNotFoundError: + logger.warn('probably killed (file not found)') async def get_async(self): """ Coroutine for an asynchronous get @@ -200,9 +202,14 @@ async def get_async(self): self.result = await loop.run_in_executor(self._executor, self.get) self.status = 'done' return self.result + except CancelledError: + logger.info('Task {} Canceled'.format(self.name)) + except EOFError: + logger.info('probably killed') + except FileNotFoundError: + logger.info('probably killed') except Exception as e: logger.exception('Error in get_async: {}'.format(e)) - pass def cancel_join_thread(self): """ Function wrapper for cancel_join_thread. diff --git a/improv/nexus.py b/improv/nexus.py index c8b869c7..e94de6a2 100644 --- a/improv/nexus.py +++ b/improv/nexus.py @@ -5,6 +5,8 @@ import time import subprocess import logging +import zmq.asyncio as zmq +from zmq import PUB, REP, SocketOption from multiprocessing import Process, get_context from importlib import import_module @@ -19,26 +21,38 @@ logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) -logging.basicConfig(level=logging.DEBUG, - format='%(name)s %(message)s', - handlers=[logging.FileHandler("global.log"), - logging.StreamHandler()]) # TODO: Set up store.notify in async function (?) # TODO: Rename store variables here (not stricly necessary) class Nexus(): - ''' Main server class for handling objects in RASP - ''' - def __init__(self, name): + """ Main server class for handling objects in RASP + """ + def __init__(self, name="Server"): self.name = name def __str__(self): return self.name - def createNexus(self, file=None, use_hdd=False, use_watcher=False, store_size=10000000): + def createNexus(self, file=None, use_hdd=False, use_watcher=False, store_size=10000000, + control_port=0, output_port=0): + + curr_dt = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + logger.info(f"************ new improv server session {curr_dt} ************") + + # set up socket in lieu of printing to stdout + self.zmq_context = zmq.Context() + self.out_socket = self.zmq_context.socket(PUB) + self.out_socket.bind("tcp://*:%s" % output_port) + output_port = int(self.out_socket.getsockopt_string(SocketOption.LAST_ENDPOINT).split(':')[-1]) + + self.in_socket = self.zmq_context.socket(REP) + self.in_socket.bind("tcp://*:%s" % control_port) + control_port = int(self.in_socket.getsockopt_string(SocketOption.LAST_ENDPOINT).split(':')[-1]) + self._startStore(store_size) #default size should be system-dependent; this is 40 GB + self.out_socket.send_string("Store started") #connect to store and subscribe to notifications self.store = Store() @@ -71,8 +85,10 @@ def createNexus(self, file=None, use_hdd=False, use_watcher=False, store_size=10 self.allowStart = False self.stopped = False + return (control_port, output_port) + def loadConfig(self, file): - ''' For each connection: + """ For each connection: create a Link with a name (purpose), start, and end Start links to one actor's name, end to the other. Nexus gives start_actor the Link as a q_in, @@ -82,7 +98,7 @@ def loadConfig(self, file): for communication purposes. OR For each connection, create 2 Links. Nexus acts as intermediary. - ''' + """ #TODO load from file or user input, as in dialogue through FrontEnd? self.config = Config(configFile = file) @@ -146,10 +162,10 @@ def loadConfig(self, file): def startNexus(self): - - ''' Puts all actors in separate processes and begins polling + """ + Puts all actors in separate processes and begins polling to listen to comm queues - ''' + """ for name,m in self.actors.items(): # m accesses the specific actor class instance if 'GUI' not in name: #GUI already started if 'method' in self.config.actors[name].options: @@ -173,25 +189,23 @@ def startNexus(self): # if self.config.hasGUI: loop = asyncio.get_event_loop() - signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT) - for s in signals: - loop.add_signal_handler( - s, lambda s=s: self.stop_polling(s, loop)) #TODO try: + self.out_socket.send_string("Awaiting input:") res = loop.run_until_complete(self.pollQueues()) #TODO: in Link executor, complete all tasks except asyncio.CancelledError: - logging.info("Loop is cancelled") + logger.info("Loop is cancelled") try: - logging.info(f"Result of run_until_complete: {res}") + logger.info(f"Result of run_until_complete: {res}") except: - logging.info("Res failed to await") + logger.info("Res failed to await") - logging.info(f"Current loop: {asyncio.get_event_loop()}") + logger.info(f"Current loop: {asyncio.get_event_loop()}") loop.stop() loop.close() logger.info('Shutdown loop') + self.zmq_context.destroy() def start(self): @@ -199,21 +213,23 @@ def start(self): self.t = time.time() for p in self.processes: - logger.info(p) + logger.info(str(p)) p.start() + + logger.info('All processes started') def destroyNexus(self): - ''' Method that calls the internal method + """ Method that calls the internal method to kill the process running the store (plasma server) - ''' + """ logger.warning('Destroying Nexus') self._closeStore() - logger.warning('Killed the central store') async def pollQueues(self): - """ Listens to links and processes their signals. + """ + Listens to links and processes their signals. For every communications queue connected to Nexus, a task is created that gets from the queue. Throughout runtime, when these @@ -224,7 +240,6 @@ async def pollQueues(self): Returns: "Shutting down" (string): Notifies start() that pollQueues has completed. """ - self.actorStates = dict.fromkeys(self.actors.keys()) if not self.config.hasGUI: # Since Visual is not started, it cannot send a ready signal. try: @@ -235,46 +250,67 @@ async def pollQueues(self): pollingNames = list(self.comm_queues.keys()) self.tasks = [] for q in polling: - self.tasks.append(asyncio.ensure_future(q.get_async())) + self.tasks.append(asyncio.create_task(q.get_async())) - self.tasks.append(asyncio.ensure_future(self.ainput('Awaiting input '))) + self.tasks.append(asyncio.create_task(self.remote_input())) + self.early_exit = False + # add signal handlers + loop = asyncio.get_event_loop() + signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT) + for s in signals: + loop.add_signal_handler( + s, lambda s=s: self.stop_polling_and_quit(s, polling)) + while not self.flags['quit']: - done, pending = await asyncio.wait(self.tasks, return_when=concurrent.futures.FIRST_COMPLETED) + try: + done, pending = await asyncio.wait(self.tasks, return_when=concurrent.futures.FIRST_COMPLETED) + except asyncio.CancelledError: + pass + + # sort through tasks to see where we got input from (so we can choose a handler) for i,t in enumerate(self.tasks): if i < len(polling): if t in done or polling[i].status == 'done': #catch tasks that complete await wait/gather r = polling[i].result - if 'GUI' in pollingNames[i]: - self.processGuiSignal(r, pollingNames[i]) - else: - self.processActorSignal(r, pollingNames[i]) - self.tasks[i] = (asyncio.ensure_future(polling[i].get_async())) - elif t in done: ##cmd line - res = t.result() - self.processGuiSignal([res.rstrip('\n')], 'commandLine_Nexus') - self.tasks[i] = (asyncio.ensure_future(self.ainput('Awaiting input \n'))) - - self.stop_polling("quit", asyncio.get_running_loop(), polling) - logger.warning('Shutting down polling') + if r: + if 'GUI' in pollingNames[i]: + self.processGuiSignal(r, pollingNames[i]) + else: + self.processActorSignal(r, pollingNames[i]) + self.tasks[i] = (asyncio.create_task(polling[i].get_async())) + elif t in done: + logger.debug("t.result = " + str(t.result())) + self.tasks[i] = asyncio.create_task(self.remote_input()) + + if not self.early_exit: # don't run this again if we already have + self.stop_polling("quit", polling) + logger.warning('Shutting down polling') return "Shutting Down" - - - # https://stackoverflow.com/questions/58454190/python-async-waiting-for-stdin-input-while-doing-other-stuff - async def ainput(self, string: str) -> str: - await asyncio.get_event_loop().run_in_executor( - None, lambda s=string: sys.stdout.write(s)) - return await asyncio.get_event_loop().run_in_executor( - None, sys.stdin.readline) - + + def stop_polling_and_quit(self, signal, queues): + logger.warn('Shutting down via signal handler for {}. Steps may be out of order or dirty.'.format(signal)) + self.stop_polling(signal, queues) + self.flags['quit'] = True + self.early_exit = True + self.quit() + + async def remote_input(self): + msg = await self.in_socket.recv_multipart() + command = msg[0].decode('utf-8') + await self.in_socket.send_string("Awaiting input:") + if command == Signal.quit(): + await self.out_socket.send_string("QUIT") + self.processGuiSignal([command], 'TUI_Nexus') def processGuiSignal(self, flag, name): - '''Receive flags from the Front End as user input + """Receive flags from the Front End as user input TODO: Not all needed - ''' + """ + # import pdb; pdb.set_trace() name = name.split('_')[0] - logger.info('Received signal from user: '+flag[0]) - if flag[0]: + if flag: + logger.info('Received signal from user: ' + flag[0]) if flag[0] == Signal.run(): logger.info('Begin run!') #self.flags['run'] = True @@ -294,7 +330,7 @@ def processGuiSignal(self, flag, name): self.loadConfig(flag[1]) elif flag[0] == Signal.pause(): logger.info('Pausing processes') - # TODO. Alsoresume, reset + # TODO. Also resume, reset # temporary WiP elif flag[0] == Signal.kill(): @@ -337,7 +373,7 @@ def processGuiSignal(self, flag, name): elif flag[0] == Signal.stop(): logger.info('Nexus received stop signal') self.stop() - else: + elif flag: logger.error('Signal received from Nexus but cannot identify {}'.format(flag)) def processActorSignal(self, sig, name): @@ -349,7 +385,7 @@ def processActorSignal(self, sig, name): self.allowStart = True #TODO: replace with q_sig to FE/Visual logger.info('Allowing start') - #TODO: Maybe have flag for auto-start, else require explict command + #TODO: Maybe have flag for auto-start, else require explicit command # if not self.config.hasGUI: # self.run() @@ -363,7 +399,7 @@ def processActorSignal(self, sig, name): def setup(self): for q in self.sig_queues.values(): try: - print('telling to setup:', q) + logger.info('Starting setup: ' + str(q)) q.put_nowait(Signal.setup()) except Full: logger.warning('Signal queue'+q.name+'is full') @@ -376,15 +412,20 @@ def run(self): except Full: logger.warning('Signal queue'+q.name+'is full') #queue full, keep going anyway TODO: add repeat trying as async task + else: + logger.error('-- Not all actors are ready yet, please wait and then try again.') def quit(self): logger.warning('Killing child processes') + self.out_socket.send_string("QUIT") for q in self.sig_queues.values(): try: q.put_nowait(Signal.quit()) except Full as f: logger.warning('Signal queue '+q.name+' full, cannot tell it to quit: {}'.format(f)) + except FileNotFoundError: + logger.warning('Queue {} corrupted.'.format(q.name)) if self.config.hasGUI: self.processes.append(self.p_GUI) @@ -415,7 +456,7 @@ def revive(self): logger.warning('Starting revive') - def stop_polling(self, stop_signal, loop, queues): + def stop_polling(self, stop_signal, queues): """ Cancels outstanding tasks and fills their last request. Puts a string into all active queues, then cancels their @@ -424,39 +465,28 @@ def stop_polling(self, stop_signal, loop, queues): Args: stop_signal (signal.signal): Signal for signal handler. - loop (loop): Event loop for signal handler. queues (AsyncQueue): Comm queues for links. """ - logging.info("Received shutdown order") + logger.info("Received shutdown order") - logging.info(f"Stop signal: {stop_signal}") + logger.info(f"Stop signal: {stop_signal}") shutdown_message = "SHUTDOWN" - [q.put(shutdown_message) for q in queues] - logging.info('Canceling outstanding tasks') - try: - asyncio.gather(*self.tasks) - except asyncio.CancelledError: - logging.info("Gather is cancelled") + for q in queues: + try: + q.put(shutdown_message) + except Exception as e: + logger.info("Unable to send shutdown message to {}.".format(q.name)) - [task.cancel() for task in self.tasks] + logger.info('Canceling outstanding tasks') - cur_task = asyncio.current_task() - cur_task.cancel() - tasks = [task for task in self.tasks if not task.done()] - [t.cancel() for t in tasks] - [t.cancel() for t in tasks] #necessary in order to start cancelling tasks other than the first one + [task.cancel() for task in self.tasks] - try: - cur_task.cancel() - except asyncio.CancelledError: - logging.info("cur_task cancelled") - - logging.info('Polling has stopped.') + logger.info('Polling has stopped.') def createStore(self, name): - ''' Creates Store w/ or w/out LMDB functionality based on {self.use_hdd}. - ''' + """ Creates Store w/ or w/out LMDB functionality based on {self.use_hdd}. + """ if not self.use_hdd: return Store(name) else: @@ -465,19 +495,19 @@ def createStore(self, name): return self.store_dict[name] def _startStore(self, size): - ''' Start a subprocess that runs the plasma store + """ Start a subprocess that runs the plasma store Raises a RuntimeError exception size is undefined Raises an Exception if the plasma store doesn't start #TODO: Generalize this to non-plasma stores - ''' + """ if size is None: raise RuntimeError('Server size needs to be specified') try: self.p_Store = subprocess.Popen(['plasma_store', '-s', '/tmp/store', '-m', str(size), - '-e', 'hashtable://test'], + '-e', 'hashtable://test'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) logger.info('Store started successfully') @@ -485,19 +515,20 @@ def _startStore(self, size): logger.exception('Store cannot be started: {0}'.format(e)) def _closeStore(self): - ''' Internal method to kill the subprocess + """ Internal method to kill the subprocess running the store (plasma sever) - ''' + """ try: self.p_Store.kill() + self.p_Store.wait() logger.info('Store closed successfully') except Exception as e: logger.exception('Cannot close store {0}'.format(e)) def createActor(self, name, actor): - ''' Function to instantiate actor, add signal and comm Links, + """ Function to instantiate actor, add signal and comm Links, and update self.actors dictionary - ''' + """ # Instantiate selected class mod = import_module(actor.packagename) clss = getattr(mod, actor.classname) @@ -532,15 +563,15 @@ def createActor(self, name, actor): self.actors.update({name:instance}) def runActor(self, actor): - '''Run the actor continually; used for separate processes + """Run the actor continually; used for separate processes #TODO: hook into monitoring here? - ''' + """ actor.run() def createConnections(self): - ''' Assemble links (multi or other) + """ Assemble links (multi or other) for later assignment - ''' + """ for source,drain in self.config.connections.items(): name = source.split('.')[0] #current assumption is connection goes from q_out to something(s) else @@ -557,13 +588,13 @@ def createConnections(self): self.data_queues.update({d:link}) def assignLink(self, name, link): - ''' Function to set up Links between actors + """ Function to set up Links between actors for data location passing Actor must already be instantiated #NOTE: Could use this for reassigning links if actors crash? #TODO: Adjust to use default q_out and q_in vs being specified - ''' + """ classname = name.split('.')[0] linktype = name.split('.')[1] if linktype == 'q_out': diff --git a/improv/store.py b/improv/store.py index ca6e4723..e411f8f4 100644 --- a/improv/store.py +++ b/improv/store.py @@ -24,8 +24,8 @@ class StoreInterface(): - '''General interface for a store - ''' + """General interface for a store + """ def get(self): raise NotImplementedError @@ -43,10 +43,10 @@ def subscribe(self): class PlasmaStore(StoreInterface): - ''' Basic interface for our specific data store implemented with apache arrow plasma + """ Basic interface for our specific data store implemented with apache arrow plasma Objects are stored with object_ids References to objects are contained in a dict where key is shortname, value is object_id - ''' + """ def __init__(self, name='default', store_loc='/tmp/store', use_lmdb=False, lmdb_path='../outputs/', lmdb_name=None, hdd_maxstore=1e12, @@ -85,11 +85,11 @@ def __init__(self, name='default', store_loc='/tmp/store', commit_freq=commit_freq) def connect_store(self, store_loc): - ''' Connect to the store at store_loc + """ Connect to the store at store_loc Raises exception if can't connect Returns the plasmaclient if successful Updates the client internal - ''' + """ try: self.client = plasma.connect(store_loc, 20) # Is plasma.PlasmaClient necessary? @@ -183,12 +183,12 @@ def put(self, object, object_name, flush_this_immediately=False): # Before get or getID - check if object is present and sealed (client.contains(obj_id)) def get(self, object_name): - ''' Get a single object from the store + """ Get a single object from the store Checks to see if it knows the object first Otherwise throw CannotGetObject to request dict update TODO: update for lists of objects TODO: replace with getID - ''' + """ #print('trying to get ', object_name) # if self.stored.get(object_name) is None: # logger.error('Never recorded storing this object: '+object_name) @@ -225,19 +225,19 @@ def getID(self, obj_id, hdd_only=False): raise ObjectNotFoundError def getList(self, ids): - ''' Get multiple objects from the store - ''' + """ Get multiple objects from the store + """ # self._get() return self.client.get(ids) def get_all(self): - ''' Get a listing of all objects in the store - ''' + """ Get a listing of all objects in the store + """ return self.client.list() def reset(self): - ''' Reset client connection - ''' + """ Reset client connection + """ self.client = self.connect_store(self.store_loc) logger.debug('Reset local connection to store') @@ -246,9 +246,9 @@ def release(self): # Necessary? How to fix for functionality? Subscribe to notifications about sealed objects? def subscribe(self): - ''' Subscribe to a section? of the ds for singals + """ Subscribe to a section? of the ds for singals Throws unknown errors - ''' + """ try: self.client.subscribe() except Exception as e: @@ -276,27 +276,27 @@ def random_ObjectID(self, number=1): return ids def updateStored(self, object_name, object_id): - ''' Update local dict with info we need locally + """ Update local dict with info we need locally Report to Nexus that we updated the store (did a put or delete/replace) - ''' + """ self.stored.update({object_name:object_id}) def getStored(self): - ''' returns its info about what it has stored - ''' + """ returns its info about what it has stored + """ return self.stored def _put(self, obj, id): - ''' Internal put - ''' + """ Internal put + """ return self.client.put(obj, id) def _get(self, object_name): - ''' Get an object from the store using its name + """ Get an object from the store using its name Assumes we know the id for the object_name Raises ObjectNotFound if object_id returns no object from the store - ''' + """ # Most errors not shown to user. # Maintain separation between external and internal function calls. res = self.getID(self.stored.get(object_name)) @@ -318,17 +318,17 @@ def _get(self, object_name): # Delete below! def saveStore(self, fileName='data/store_dump'): - ''' Save the entire store to disk + """ Save the entire store to disk Uses pickle, should extend to mmap, hd5f, ... - ''' + """ raise NotImplementedError def saveConfig(self, config_ids, fileName='data/config_dump'): - ''' Save current Config object containing parameters + """ Save current Config object containing parameters to run the experiment. Config is pickleable TODO: move this to Nexus' domain? - ''' + """ config = self.client.get(config_ids) #for object ID in list of items in config, get from store #and put into dict (?) @@ -336,9 +336,9 @@ def saveConfig(self, config_ids, fileName='data/config_dump'): pickle.dump(config, output, -1) def saveSubstore(self, keys, fileName='data/substore_dump'): - ''' Save portion of store based on keys + """ Save portion of store based on keys to disk - ''' + """ raise NotImplementedError def saveObj(obj, name): @@ -564,8 +564,8 @@ def __str__(self): return self.message class CannotConnectToStoreError(Exception): - '''Raised when failing to connect to store. - ''' + """Raised when failing to connect to store. + """ def __init__(self, store_loc): super().__init__() diff --git a/improv/tui.css b/improv/tui.css new file mode 100644 index 00000000..8c50d60f --- /dev/null +++ b/improv/tui.css @@ -0,0 +1,55 @@ +#main { + grid-size: 1 5; + grid-rows: 1 1fr 1 1fr 5; +} + +#dialog { + grid-size: 2; + grid-gutter: 1 2; + margin: 1 2; +} + +#help_screen { + align: center middle; +} + +SocketLog { + content-align: left top; +} + +#question { + column-span: 2; + content-align: center bottom; + width: 100%; + height: 100%; +} + +Label { + background: $primary; + width: 100%; +} + +#input { + height: 1; + align-vertical: bottom; + margin: 1 0; +} + +#help_table { + content-align: center middle; + width: 100%; + height: 70%; +} + +Button { + width: 100%; +} + +#ok { + width: 100%; + align-horizontal: center; +} + +#ok_btn { + width: 10; +} \ No newline at end of file diff --git a/improv/tui.py b/improv/tui.py new file mode 100644 index 00000000..acff5a7a --- /dev/null +++ b/improv/tui.py @@ -0,0 +1,316 @@ +import asyncio +import zmq.asyncio as zmq +from zmq import PUB, SUB, SUBSCRIBE, REQ, REP, LINGER +from rich.table import Table +from textual.app import App, ComposeResult +from textual.containers import Grid, Container +from textual.screen import Screen +from textual.widgets import Header, Footer, TextLog, Input, Button, Static, Label, Placeholder +from textual.message import Message +import logging; logger = logging.getLogger(__name__) +from zmq.log.handlers import PUBHandler +logger.setLevel(logging.INFO) + +class SocketLog(TextLog): + def __init__(self, port, context, *args, **kwargs): + if 'formatter' in kwargs: + self.format = kwargs['formatter'] + kwargs.pop('formatter') + else: + self.format = self._simple_formatter + + super().__init__(*args, **kwargs) + self.socket = context.socket(SUB) + self.socket.connect("tcp://%s" % str(port)) + self.socket.setsockopt_string(SUBSCRIBE, "") + self.history = [] + self.print_debug = False + + + class Echo(Message): + def __init__(self, sender, value) -> None: + super().__init__() + self.value = value + + def write(self, content, width=None, expand=False, shrink=True): + TextLog.write(self, content, width, expand, shrink) + self.history.append(content) + + @staticmethod + def _simple_formatter(parts): + """ + format messages from zmq + message is a list of message parts + """ + return ' '.join([p.decode('utf-8').replace('\n', ' ') for p in parts]) + + + + async def poll(self): + try: + ready = await self.socket.poll(10) + if ready: + parts = await self.socket.recv_multipart() + msg_type = parts[0].decode('utf-8') + if msg_type != 'DEBUG' or self.print_debug: + msg = self.format(parts) + self.write(msg) + self.post_message(self.Echo(self, msg)) + except asyncio.CancelledError: + pass + + async def on_mount(self) -> None: + """Event handler called when widget is added to the app.""" + self.poller = self.set_interval(1/60, self.poll) + +class QuitScreen(Screen): + def compose(self) -> ComposeResult: + quit_str = ("Are you sure you want to quit? " + "The server has not been stopped, and the process " + "may continue to run in the background. " + "To exit safely, enter 'quit' into the command console. " + ) + + yield Grid( + Static(quit_str, id="question"), + Button("Quit", variant="error", id="quit"), + Button("Cancel", variant="primary", id="cancel"), + id="dialog", + ) + + async def on_key(self, event) -> None: + # make sure button-related key presses don't bubble up to main window + if event.key == "enter": + event.stop() + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "quit": + self.app.exit() + else: + self.app.pop_screen() + + +class HelpScreen(Screen): + def compose(self): + cmd_table = Table() + cmd_table.add_column('Command', justify='left') + cmd_table.add_column('Function', justify='left') + cmd_table.add_row('setup', 'Prepare all actors to run') + cmd_table.add_row('run', 'Start the experiment') + cmd_table.add_row('pause', '???') + cmd_table.add_row('stop', '???') + cmd_table.add_row('quit', 'Stop everything and terminate this client and the server.') + + + yield Container( + Static(cmd_table, id="help_table"), + Container(Button("OK", id="ok_btn"), id="ok"), + id="help_screen", + ) + + def on_button_pressed(self, event: Button.Pressed) -> None: + self.app.pop_screen() + +class TUI(App, inherit_bindings=False): + """ + View class for the text user interface. Implemented as a Textual app. + """ + def __init__(self, control_port, output_port, logging_port): + super().__init__() + self.title = "improv console" + self.control_port = TUI._sanitize_addr(control_port) + self.output_port = TUI._sanitize_addr(output_port) + self.logging_port = TUI._sanitize_addr(logging_port) + + self.context = zmq.Context() + self.control_socket = self.context.socket(REQ) + self.control_socket.connect("tcp://%s" % self.control_port) + + logger.info('Text interface initialized') + + CSS_PATH = "tui.css" + BINDINGS = [ + ("tab", "focus_next", "Focus Next"), + ("ctrl+c", "request_quit", "Emergency Quit"), + ("ctrl+p", "set_debug", "Toggle Debug Info"), + ("question_mark", "help", "Help") + ] + + def action_set_debug(self): + log_window = self.get_widget_by_id("log") + log_window.print_debug = not log_window.print_debug + + @staticmethod + def _sanitize_addr(input): + if isinstance(input, int): + return "localhost:%s" % str(input) + elif ':' in input: + return input + else: + return "localhost:%s" % input + + @staticmethod + def format_log_messages(parts): + type_list = ['debug', 'info', 'warning', 'error', 'critical', 'exception'] + msg_type = parts[0].decode('utf-8') + msg = SocketLog._simple_formatter(parts[1:]) + if msg_type == 'DEBUG': + msg = '[bold black on white]' + msg + '[/]' + elif msg_type == 'INFO': + msg = '[white]' + msg + '[/]' + elif msg_type == 'WARNING': + msg = ':warning-emoji: [yellow]' + msg + '[/]' + elif msg_type == 'ERROR': + msg = ':heavy_exclamation_mark: [#e40000]' + msg + '[/]' + elif msg_type == 'CRITICAL': + msg = ':collision::scream: [bold red]' + msg + '[/]' + + return msg + + def compose(self) -> ComposeResult: + """Create child widgets for the app.""" + yield Grid( + Header("improv console"), + Label("[white]Log Messages[/]"), + SocketLog(self.logging_port, self.context, + formatter=self.format_log_messages, + markup=True, id="log"), + Label("Command History"), + SocketLog(self.output_port, self.context, id="console"), + Input(id='input'), + Footer(), + id="main" + ) + + async def send_to_controller(self, msg): + """ + Safe version of send/receive with controller. + Based on the Lazy Pirate pattern [here](https://zguide.zeromq.org/docs/chapter4/#Client-Side-Reliability-Lazy-Pirate-Pattern) + """ + REQUEST_TIMEOUT = 2500 + REQUEST_RETRIES = 3 + + retries_left = REQUEST_RETRIES + + try: + logger.info(f"Sending {msg} to controller.") + await self.control_socket.send_string(msg) + reply = None + + while True: + ready = await self.control_socket.poll(REQUEST_TIMEOUT) + + if ready: + reply = await self.control_socket.recv_multipart() + reply = reply[0].decode('utf-8') + logger.info(f"Received {reply} from controller.") + break + else: + retries_left -= 1 + logger.warning("No response from server.") + + # try to close and reconnect + self.control_socket.setsockopt(LINGER, 0) + self.control_socket.close() + if retries_left == 0: + logger.error("Server seems to be offline. Giving up.") + break + + logger.info("Attempting to reconnect to server...") + + self.control_socket = self.context.socket(REQ) + self.control_socket.connect("tcp://%s" % self.control_port) + + logger.info(f"Resending {msg} to controller.") + await self.control_socket.send_string(msg) + + except asyncio.CancelledError: + pass + + return reply + + + async def on_mount(self): + self.set_focus(self.query_one(Input)) + + async def on_input_submitted(self, message): + self.query_one(Input).value = "" + self.query_one("#console").write(message.value) + reply = await self.send_to_controller(message.value) + self.query_one("#console").write(reply) + + async def on_socket_log_echo(self, message): + if message.sender.id == 'console' and message.value == 'QUIT': + logger.info("Got QUIT; will try to exit") + self.exit() + + def action_request_quit(self): + self.push_screen(QuitScreen()) + + def action_help(self): + self.push_screen(HelpScreen()) + + + +if __name__ == '__main__': + CONTROL_PORT = "5555" + OUTPUT_PORT = "5556" + LOGGING_PORT = "5557" + + import random + + zmq_log_handler = PUBHandler('tcp://*:%s' % LOGGING_PORT) + logger.addHandler(zmq_log_handler) + logger.setLevel(logging.DEBUG) + + context = zmq.Context() + socket = context.socket(REP) + socket.bind("tcp://*:%s" % CONTROL_PORT) + pubsocket = context.socket(PUB) + pubsocket.bind("tcp://*:%s" % OUTPUT_PORT) + + async def backend(): + """ + Fake program to be controlled by TUI. + """ + while True: + msg = await socket.recv_multipart() + if msg[0].decode('utf-8') == 'quit': + await socket.send_string("QUIT") + else: + await socket.send_string("Awaiting input:") + + + async def publish(): + """ + Set up a fake server to publish messages for testing. + """ + counter = 0 + while True: + await pubsocket.send_string("test " + str(counter)) + await asyncio.sleep(1) + counter += 1 + + async def log(): + """ + Send fake logging events for testing. + """ + counter = 0 + type_list = ['debug', 'info', 'warning', 'error', 'critical', 'exception'] + while True: + this_type = random.choice(type_list) + getattr(logger, this_type)("log message " + str(counter)) + # logger.info("log message " + str(counter)) + await asyncio.sleep(1.2) + counter += 1 + + async def main_loop(): + app = TUI(CONTROL_PORT, OUTPUT_PORT, LOGGING_PORT) + + # the following construct ensures both the (infinite) fake servers are killed once the tui finishes + finished, unfinished = await asyncio.wait([app.run_async(), publish(), backend(), log()], return_when=asyncio.FIRST_COMPLETED) + + for task in unfinished: + task.cancel() + + asyncio.run(main_loop()) diff --git a/improv/utils/reader.py b/improv/utils/reader.py index ec7aa5a3..a5d8f588 100644 --- a/improv/utils/reader.py +++ b/improv/utils/reader.py @@ -9,31 +9,31 @@ class LMDBReader(): def __init__(self, path): - ''' Constructor for the LMDB reader + """ Constructor for the LMDB reader path: Path to LMDB folder - ''' + """ if not os.path.exists(path): raise FileNotFoundError self.path = path def get_all_data(self): - ''' Load all data from LMDB into a dictionary + """ Load all data from LMDB into a dictionary Make sure that the LMDB is small enough to fit in RAM - ''' + """ with LMDBReader._lmdb_cur(self.path) as cur: return {LMDBReader._decode_key(key): pickle.loads(value) for key, value in cur.iternext()} def get_data_types(self): - ''' Return all data types defined as {object_name}, but without number. - ''' + """ Return all data types defined as {object_name}, but without number. + """ num_idx = get_num_length_from_key() with LMDBReader._lmdb_cur(self.path) as cur: return {key[:-12 - num_idx.send(key)] for key in cur.iternext(values=False)} def get_data_by_number(self, t): - ''' Return data at a specific frame number t - ''' + """ Return data at a specific frame number t + """ num_idx = get_num_length_from_key() def check_if_key_equals_t(key): @@ -47,22 +47,22 @@ def check_if_key_equals_t(key): return {LMDBReader._decode_key(key): pickle.loads(cur.get(key)) for key in keys} def get_data_by_type(self, t): - ''' Return data with key that starts with t - ''' + """ Return data with key that starts with t + """ with LMDBReader._lmdb_cur(self.path) as cur: keys = (key for key in cur.iternext(values=False) if key.startswith(t.encode())) return {LMDBReader._decode_key(key): pickle.loads(cur.get(key)) for key in keys} def get_params(self): - ''' Return parameters in a dictionary - ''' + """ Return parameters in a dictionary + """ with LMDBReader._lmdb_cur(self.path) as cur: keys = [key for key in cur.iternext(values=False) if key.startswith(b'params_dict')] return pickle.loads(cur.get(keys[-1])) @staticmethod def _decode_key(key): - ''' Helper method to convert key from byte to str + """ Helper method to convert key from byte to str Example: >>> LMDBReader._decode_key(b'Call0\x80\x03GA\xd7Ky\x06\x9c\xddi.') @@ -70,15 +70,15 @@ def _decode_key(key): key: Encoded key. The last 12 bytes are pickled time.time(). The remaining are encoded object name. - ''' + """ return f'{key[:-12].decode()}_{pickle.loads(key[-12:])}' @staticmethod @contextmanager def _lmdb_cur(path): - ''' Helper context manager to open and ensure proper closure of LMDB - ''' + """ Helper context manager to open and ensure proper closure of LMDB + """ env = lmdb.open(path) txn = env.begin() diff --git a/improv/utils/utils.py b/improv/utils/utils.py index 9b2efc8b..03fd7280 100644 --- a/improv/utils/utils.py +++ b/improv/utils/utils.py @@ -1,7 +1,7 @@ from functools import wraps # def coroutine(func): #FIXME who uses this and why? -# ''' Decorator that primes 'func' by calling first {yield}. ''' +# """ Decorator that primes 'func' by calling first {yield}. """ # @wraps(func) # def primer(*args, **kwargs): @@ -12,7 +12,7 @@ @coroutine def get_num_length_from_key(): - ''' + """ Coroutine that gets the length of digits in LMDB key. Assumes that object name does not have any digits. @@ -25,7 +25,7 @@ def get_num_length_from_key(): >>> num_idx.send(b'acq_raw1\x80\x03GA\xd7L\x1b\x8f\xb0\x1b\xb0.') 1 - ''' + """ max_num_len = 1 # Keep track of largest digit for performance. def worker(): diff --git a/improv/watcher.py b/improv/watcher.py index 747a7912..3477c591 100644 --- a/improv/watcher.py +++ b/improv/watcher.py @@ -18,10 +18,10 @@ import pickle class BasicWatcher(Actor): - ''' + """ Actor that monitors stored objects from the other actors and saves objects that have been flagged by those actors - ''' + """ def __init__(self, *args, inputs= None): super().__init__(*args) @@ -30,20 +30,20 @@ def __init__(self, *args, inputs= None): def setup(self): - ''' + """ set up tasks and polling based on inputs which will be used for asynchronous polling of input queues - ''' + """ self.numSaved= 0 self.tasks= [] self.polling= self.watchin self.setUp= False def run(self): - ''' + """ continually run the watcher to check all of the input queues for objects to save - ''' + """ with RunManager(self.name, self.watchrun, self.setup, self.q_sig, self.q_comm) as rm: @@ -52,22 +52,22 @@ def run(self): print('watcher saved '+ str(self.numSaved)+ ' objects') def watchrun(self): - ''' + """ set up async loop for polling - ''' + """ loop = asyncio.get_event_loop() loop.run_until_complete(self.watch()) async def watch(self): - ''' + """ function for asynchronous polling of input queues loops through each of the queues in watchin and checks if an object is present and then saves the object if found - ''' + """ if self.setUp== False: for q in self.polling: - self.tasks.append(asyncio.ensure_future(q.get_async())) + self.tasks.append(asyncio.create_task(q.get_async())) self.setUp = True done, pending= await asyncio.wait(self.tasks, return_when= concurrent.futures.FIRST_COMPLETED) @@ -82,13 +82,13 @@ async def watch(self): except ObjectNotFoundError as e: logger.info(e.message) pass - self.tasks[i] = (asyncio.ensure_future(self.polling[i].get_async())) + self.tasks[i] = (asyncio.create_task(self.polling[i].get_async())) class Watcher(): - ''' Monitors the store as separate process + """ Monitors the store as separate process TODO: Facilitate Watcher being used in multiple processes (shared list) - ''' + """ # Related to subscribe - could be private, i.e., _subscribe def __init__(self, name, client): self.name = name diff --git a/pyproject.toml b/pyproject.toml index c3450c79..143ee6db 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,18 +21,26 @@ dependencies = [ "PyQt5", "lmdb", "pyyaml", - "click" + "textual==0.15.0", + "pyzmq", + "psutil", + "h5py", ] classifiers = ['Development Status :: 1 - Planning'] [project.optional-dependencies] -tests = ["async-timeout", "pytest-asyncio"] +tests = ["pytest", "async-timeout", "pytest-asyncio", "pytest-cov", "scikit-image",] docs = ["jupyter-book"] caiman = [] [project.scripts] -improv = "improv:default_invocation" +improv = "improv.cli:default_invocation" [tool.setuptools.packages.find] include = ["improv"] exclude = ["test", "pytest", "env", "demos", "figures"] + +[tool.pytest.ini_options] +asyncio_mode = "auto" +filterwarnings = [ ] + diff --git a/pytest/configs/minimal.yaml b/pytest/configs/minimal.yaml new file mode 100644 index 00000000..230282d1 --- /dev/null +++ b/pytest/configs/minimal.yaml @@ -0,0 +1,11 @@ +actors: + Generator: + package: actors.sample_generator + class: Generator + + Processor: + package: actors.sample_processor + class: Processor + +connections: + Generator.q_out: [Processor.q_in] \ No newline at end of file diff --git a/pytest/conftest_with_errors.py b/pytest/conftest_with_errors.py index 59b971e3..398ea9af 100644 --- a/pytest/conftest_with_errors.py +++ b/pytest/conftest_with_errors.py @@ -28,6 +28,7 @@ def tear_down(self): ''' print('Tearing down Plasma store.') self.p.kill() + self.p.wait() class ActorDependentTestCase(): def set_up(self): @@ -42,6 +43,7 @@ def tear_down(self): ''' print('Tearing down Plasma store.') self.p.kill() + self.p.wait() def run_setup(self): print('Set up = True.') diff --git a/pytest/nexus_analog.py b/pytest/nexus_analog.py index d15ef9a5..2127e00d 100644 --- a/pytest/nexus_analog.py +++ b/pytest/nexus_analog.py @@ -37,7 +37,7 @@ def setup_store(): async def pollQueues(links): tasks = [] for link in links: - tasks.append(asyncio.ensure_future(link.get_async())) + tasks.append(asyncio.create_task(link.get_async())) links_cpy = links t_0 = time.perf_counter() @@ -53,7 +53,7 @@ async def pollQueues(links): for i, t in enumerate(tasks): if t in done: pass - tasks[i] = asyncio.ensure_future(links_cpy[i].get_async()) + tasks[i] = asyncio.create_task(links_cpy[i].get_async()) t_1 = time.perf_counter() @@ -65,7 +65,7 @@ async def pollQueues(links): clean_list_print([task for task in tasks]) loop = asyncio.get_running_loop() - return stop_polling(tasks, loop, links) + return stop_polling(tasks, links) @@ -83,7 +83,7 @@ def start(): loop.close() print(f"Loop: {loop}") -def stop_polling(tasks, loop, links): +def stop_polling(tasks, links): #asyncio.gather(*tasks) print("Cancelling") diff --git a/pytest/test_actor.py b/pytest/test_actor.py index 67a21bae..4209dce7 100644 --- a/pytest/test_actor.py +++ b/pytest/test_actor.py @@ -21,6 +21,7 @@ def setup_store(scope="module"): stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) yield p p.kill() + p.wait() @pytest.fixture diff --git a/pytest/test_cli.py b/pytest/test_cli.py new file mode 100644 index 00000000..1d112f24 --- /dev/null +++ b/pytest/test_cli.py @@ -0,0 +1,185 @@ +import pytest +import os +import sys +import subprocess +import asyncio +import signal +import improv.cli as cli + +from test_nexus import ports + +SERVER_WARMUP = 16 +SERVER_TIMEOUT = 16 + +@pytest.fixture +def setdir(): + prev = os.getcwd() + os.chdir(os.path.dirname(__file__)) + yield None + os.chdir(prev) + +@pytest.fixture +async def server(setdir, ports): + """ + Sets up a server using minimal.yaml in the configs folder. + Requires the actor path command line argument and so implicitly + tests that as well. + """ + os.chdir('configs') + + control_port, output_port, logging_port = ports + + #start server + server_opts = ['improv', 'server', + '-c', str(control_port), + '-o', str(output_port), + '-l', str(logging_port), + '-a', '..', + '-f', 'testlog', 'minimal.yaml', + ] + + server = subprocess.Popen(server_opts, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + await asyncio.sleep(SERVER_WARMUP) + yield server + server.wait(SERVER_TIMEOUT) + try: + os.remove('testlog') + except FileNotFoundError: + pass + +def test_configfile_required(setdir): + with pytest.raises(SystemExit): + cli.parse_cli_args(['run']) + + with pytest.raises(SystemExit): + cli.parse_cli_args(['server']) + + with pytest.raises(SystemExit): + cli.parse_cli_args(['server', 'does_not_exist.yaml']) + + +def test_multiple_actor_path(setdir): + args = cli.parse_cli_args(['run', '-a', 'actors', '-a', 'configs', 'configs/blank_file.yaml']) + assert len(args.actor_path) == 2 + + args = cli.parse_cli_args(['server', '-a', 'actors', '-a', 'configs', 'configs/blank_file.yaml']) + assert len(args.actor_path) == 2 + + +@pytest.mark.parametrize("mode,flag,expected", [('run', '-c', "6000"), + ('run', '-o', "6000"), + ('run', '-l', "6000"), + ('server', '-c', "6000"), + ('server', '-o', "6000"), + ('server', '-l', "6000"), + ('client', '-c', "6000"), + ('client', '-s', "6000"), + ('client', '-l', "6000"), + ]) +def test_can_override_ports(mode, flag, expected, setdir): + file = 'configs/blank_file.yaml' + localhost = "127.0.0.1:" + params = {'-c': 'control_port', + '-o': 'output_port', + '-s': 'server_port', + '-l': 'logging_port' + } + + if mode in ['run', 'server']: + args = cli.parse_cli_args([mode, flag, expected, file]) + assert vars(args)[params[flag]] == int(expected) + else: + args = cli.parse_cli_args([mode, flag, expected]) + assert vars(args)[params[flag]] == localhost + expected + +@pytest.mark.parametrize("mode,flag,expected", [('run', '-c', "127.0.0.1:6000"), + ('run', '-o', "-6000"), + ('run', '-l', str(cli.MAX_PORT + 1)), + ('server', '-c', "127.0.0.1:6000"), + ('server', '-o', "-6000"), + ('server', '-l', str(cli.MAX_PORT + 1)), + ]) +def test_non_port_is_error(mode, flag, expected): + file = 'configs/blank_file.yaml' + with pytest.raises(SystemExit): + cli.parse_cli_args([mode, flag, expected, file]) + +@pytest.mark.parametrize("mode,flag,expected", [('client', '-c', "111.127.0.0.1:6000"), + ('client', '-s', "127.0.1:6000"), + ('client', '-s', "-6000"), + ('client', '-l', str(cli.MAX_PORT + 1)), + ('client', '-l', "127.0.0.1"), + ('client', '-l', "127.0.0.1:"), + ('client', '-l', "127.0.0.1:" + str(cli.MAX_PORT)), + ]) +def test_non_ip_is_error(mode, flag, expected): + with pytest.raises(SystemExit): + cli.parse_cli_args([mode, flag, expected]) + +@pytest.mark.parametrize("mode,flag,expected", [('client', '-c', "127.0.0.1:6000"), + ('client', '-s', "155.4.4.3:4000"), + ]) +def test_can_override_ip(mode, flag, expected): + params = {'-c': 'control_port', + '-o': 'output_port', + '-s': 'server_port', + '-l': 'logging_port' + } + args = cli.parse_cli_args([mode, flag, expected]) + assert vars(args)[params[flag]] == expected + +async def test_sigint_kills_server(server): + server.send_signal(signal.SIGINT) + +async def test_improv_list_nonempty(server): + proc_list = cli.run_list('', printit=False) + assert len(proc_list) > 0 + server.send_signal(signal.SIGINT) + +async def test_improv_kill_empties_list(server): + proc_list = cli.run_list('', printit=False) + assert len(proc_list) > 0 + cli.run_cleanup('', headless=True) + proc_list = cli.run_list('', printit=False) + assert len(proc_list) == 0 + +async def test_improv_run_writes_stderr_to_log(setdir, ports): + os.chdir('configs') + control_port, output_port, logging_port = ports + + #start server + server_opts = ['improv', 'run', + '-c', str(control_port), + '-o', str(output_port), + '-l', str(logging_port), + '-a', '..', + '-f', 'testlog', 'blank_file.yaml', + ] + server = subprocess.Popen(server_opts, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + await asyncio.sleep(SERVER_WARMUP) + server.kill() + server.wait(SERVER_TIMEOUT) + with open('testlog') as log: + contents = log.read() + assert 'Traceback' in contents + os.remove('testlog') + cli.run_cleanup('', headless=True) + +async def test_get_ports_from_logfile(setdir): + test_control_port = 53349 + test_output_port = 53350 + test_logging_port = 53351 + + logfile = 'tmp.log' + + with open(logfile, 'w') as log: + log.write("Server running on (control, output, log) ports (53345, 53344, 53343).") + log.write(f"Server running on (control, output, log) ports ({test_control_port}, {test_output_port}, {test_logging_port}).") + + control_port, output_port, logging_port = cli._get_ports(logfile) + + os.remove(logfile) + + assert control_port == test_control_port + assert output_port == test_output_port + assert logging_port == test_logging_port diff --git a/pytest/test_demos.py b/pytest/test_demos.py new file mode 100644 index 00000000..41d8f0b3 --- /dev/null +++ b/pytest/test_demos.py @@ -0,0 +1,95 @@ +import pytest +import os +import asyncio +import subprocess +import improv.tui as tui + +from test_nexus import ports +SERVER_WARMUP = 8 + +@pytest.fixture() +def setdir(): + prev = os.getcwd() + os.chdir(os.path.dirname(__file__)) + os.chdir('../demos') + yield None + os.chdir(prev) + +@pytest.mark.parametrize("dir,configfile,logfile", [('minimal','minimal.yaml', 'testlog')]) +async def test_simple_boot_and_quit(dir, configfile, logfile, setdir, ports): + os.chdir(dir) + + control_port, output_port, logging_port = ports + + #start server + server_opts = ['improv', 'server', + '-c', str(control_port), + '-o', str(output_port), + '-l', str(logging_port), + '-f', logfile, configfile, + ] + + with open(logfile, mode='a+') as log: + server = subprocess.Popen(server_opts, stdout=log, stderr=log) + await asyncio.sleep(SERVER_WARMUP) + + # initialize client + app = tui.TUI(control_port, output_port, logging_port) + + # run client + async with app.run_test() as pilot: + print("running pilot") + await pilot.press(*'setup', 'enter') + await pilot.pause(.5) + await pilot.press(*'quit', 'enter') + await pilot.pause(2) + assert not pilot.app._running + + # wait on server to fully shut down + server.wait(10) + os.remove(logfile) # later, might want to read this file and check for messages + +@pytest.mark.parametrize("dir,configfile,logfile,datafile", [('minimal', 'minimal.yaml', 'testlog', 'sample_generator_data.npy')]) +async def test_stop_output(dir, configfile, logfile, datafile, setdir, ports): + os.chdir(dir) + + control_port, output_port, logging_port = ports + + #start server + server_opts = ['improv', 'server', + '-c', str(control_port), + '-o', str(output_port), + '-l', str(logging_port), + '-f', logfile, configfile, + ] + + with open(logfile, mode='a+') as log: + server = subprocess.Popen(server_opts, stdout=log, stderr=log) + await asyncio.sleep(SERVER_WARMUP) + + + # initialize client + app = tui.TUI(control_port, output_port, logging_port) + + # run client + async with app.run_test() as pilot: + print("running pilot") + await pilot.press(*'setup', 'enter') + await pilot.pause(.5) + await pilot.press(*'run', 'enter') + await pilot.pause(1) + await pilot.press(*'stop', 'enter') + await pilot.pause(2) + await pilot.press(*'quit', 'enter') + await pilot.pause(2) + assert not pilot.app._running + + # wait on server to fully shut down + server.wait(10) + + # check that the file written by Generator's stop function got written + os.path.isfile(datafile) + + # then remove that file and logile + os.remove(datafile) + os.remove(logfile) # later, might want to read this file and check for messages diff --git a/pytest/test_link.py b/pytest/test_link.py index c297d518..fd6860a9 100644 --- a/pytest/test_link.py +++ b/pytest/test_link.py @@ -36,6 +36,7 @@ def setup_store(): store = Store(store_loc = "/tmp/store") yield store p.kill() + p.wait() def init_actors(n = 1): @@ -309,6 +310,7 @@ def test_put_overflow(setup_store, caplog): lnk.put(message) p.kill() + p.wait() setup_store #restore the 10 mb store if caplog.records: diff --git a/pytest/test_nexus.py b/pytest/test_nexus.py index 6e08d83f..691d53f8 100644 --- a/pytest/test_nexus.py +++ b/pytest/test_nexus.py @@ -2,11 +2,24 @@ import os import pytest import logging +import subprocess +import signal from improv.nexus import Nexus from improv.actor import Actor from improv.store import Store +SERVER_COUNTER = 0 + +@pytest.fixture +def ports(): + global SERVER_COUNTER + CONTROL_PORT = 5555 + OUTPUT_PORT = 5556 + LOGGING_PORT = 5557 + yield (CONTROL_PORT + SERVER_COUNTER, OUTPUT_PORT + SERVER_COUNTER, LOGGING_PORT + SERVER_COUNTER) + SERVER_COUNTER += 3 + @pytest.fixture def setdir(): prev = os.getcwd() @@ -15,10 +28,9 @@ def setdir(): os.chdir(prev) @pytest.fixture -def sample_nex(setdir): - setdir +def sample_nex(setdir, ports): nex = Nexus("test") - nex.createNexus(file='good_config.yaml', store_size=4000) + nex.createNexus(file='good_config.yaml', store_size=4000, control_port=ports[0], output_port=ports[1]) yield nex nex.destroyNexus() @@ -45,16 +57,14 @@ def sample_nex(setdir): # p.kill() def test_init(setdir): - setdir # store = setup_store nex = Nexus("test") assert str(nex) == "test" + nex.destroyNexus() - -def test_createNexus(setdir): - setdir +def test_createNexus(setdir, ports): nex = Nexus("test") - nex.createNexus(file = "good_config.yaml") + nex.createNexus(file = "good_config.yaml", control_port=ports[0], output_port=ports[1]) assert list(nex.comm_queues.keys()) == ["GUI_comm", "Acquirer_comm", "Analysis_comm"] assert list(nex.sig_queues.keys()) == ["Acquirer_sig", "Analysis_sig"] assert list(nex.data_queues.keys()) == ["Acquirer.q_out", "Analysis.q_in"] @@ -75,6 +85,7 @@ def test_startNexus(sample_nex): nex = sample_nex nex.startNexus() assert [p.name for p in nex.processes] == ["Acquirer", "Analysis"] + nex.destroyNexus() # @pytest.mark.skip(reason="This test is unfinished") @pytest.mark.parametrize("cfg_name, actor_list, link_list", [ @@ -82,7 +93,7 @@ def test_startNexus(sample_nex): ("simple_graph.yaml", ["Acquirer", "Analysis"], ["Acquirer_sig", "Analysis_sig"]), ("complex_graph.yaml", ["Acquirer", "Analysis", "InputStim"], ["Acquirer_sig", "Analysis_sig", "InputStim_sig"]) ]) -def test_config_construction(cfg_name, actor_list, link_list, setdir): +def test_config_construction(cfg_name, actor_list, link_list, setdir, ports): """ Tests if constructing a nexus based on the provided config has the right structure. After construction based on the config, this @@ -90,10 +101,8 @@ def test_config_construction(cfg_name, actor_list, link_list, setdir): links between them are constructed correctly. """ - setdir - nex = Nexus("test") - nex.createNexus(file = cfg_name) + nex.createNexus(file = cfg_name, control_port=ports[0], output_port=ports[1]) logging.info(cfg_name) # Check for actors @@ -109,26 +118,23 @@ def test_config_construction(cfg_name, actor_list, link_list, setdir): lnk_lst = [] assert True -def test_single_actor(setdir): - setdir +def test_single_actor(setdir, ports): nex = Nexus("test") with pytest.raises(AttributeError): - nex.createNexus(file="single_actor.yaml") + nex.createNexus(file="single_actor.yaml", control_port=ports[0], output_port=ports[1]) nex.destroyNexus() -def test_cyclic_graph(setdir): - setdir +def test_cyclic_graph(setdir, ports): nex = Nexus("test") - nex.createNexus(file="cyclic_config.yaml") + nex.createNexus(file="cyclic_config.yaml", control_port=ports[0], output_port=ports[1]) assert True nex.destroyNexus() -def test_blank_cfg(setdir, caplog): - setdir +def test_blank_cfg(setdir, caplog, ports): nex = Nexus("test") with pytest.raises(TypeError): - nex.createNexus(file="blank_file.yaml") + nex.createNexus(file="blank_file.yaml", control_port=ports[0], output_port=ports[1]) assert any(["The config file is empty" in record.msg for record in list(caplog.records)]) nex.destroyNexus() @@ -146,7 +152,6 @@ def test_blank_cfg(setdir, caplog): @pytest.mark.skip(reason="unfinished") def test_queue_message(setdir, sample_nex): - setdir nex = sample_nex nex.startNexus() time.sleep(20) @@ -205,6 +210,7 @@ def test_startstore(caplog): assert any(["Store started successfully" in record.msg for record in caplog.records]) nex._closeStore() + nex.destroyNexus() assert True def test_closestore(caplog): @@ -220,19 +226,17 @@ def test_closestore(caplog): with pytest.raises(AttributeError): nex.p_Store.put("Message in", "Message in Label") + nex.destroyNexus() assert True @pytest.mark.skip(reason="unfinished") -def test_actor_sub(setdir, capsys, monkeypatch): - - - setdir +def test_actor_sub(setdir, capsys, monkeypatch, ports): monkeypatch.setattr("improv.nexus.input", lambda: "setup\n") cfg_file = "sample_config.yaml" nex = Nexus("test") - nex.createNexus(file = cfg_file, store_size=4000) + nex.createNexus(file = cfg_file, store_size=4000, control_port=ports[0], output_port=ports[1]) print("Nexus Created") nex.startNexus() @@ -247,4 +251,18 @@ def test_actor_sub(setdir, capsys, monkeypatch): nex.destroyNexus() + assert True + +def test_sigint_exits_cleanly(ports, tmp_path): + server_opts = ['improv', 'server', + '-c', str(ports[0]), + '-o', str(ports[1]), + '-f', tmp_path / "global.log", + ] + + server = subprocess.Popen(server_opts, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + + server.send_signal(signal.SIGINT) + + server.wait(10) assert True \ No newline at end of file diff --git a/pytest/test_store_with_errors.py b/pytest/test_store_with_errors.py index 06b5d7bc..b298221d 100644 --- a/pytest/test_store_with_errors.py +++ b/pytest/test_store_with_errors.py @@ -1,4 +1,5 @@ import pytest +import time from improv.store import Store from multiprocessing import Process from pyarrow._plasma import PlasmaObjectExists @@ -12,6 +13,8 @@ import pickle import subprocess +WAIT_TIMEOUT = 10 + # TODO: add docstrings!!! # TODO: clean up syntax - consistent capitalization, function names, etc. # TODO: decide to keep classes @@ -45,6 +48,7 @@ def setup_store(store_loc='/tmp/store'): # ''' # print('Tearing down Plasma store.') p.kill() + p.wait(WAIT_TIMEOUT) def test_connect(setup_store): diff --git a/pytest/test_tui.py b/pytest/test_tui.py new file mode 100644 index 00000000..81f39695 --- /dev/null +++ b/pytest/test_tui.py @@ -0,0 +1,82 @@ +import pytest +import time +import improv.tui as tui +import logging +import zmq.asyncio as zmq +from zmq import PUB, REP +from zmq.log.handlers import PUBHandler + +from test_nexus import ports + +@pytest.fixture +def logger(ports): + logger = logging.getLogger(__name__) + logger.setLevel(logging.DEBUG) + zmq_log_handler = PUBHandler('tcp://*:%s' % ports[2]) + logger.addHandler(zmq_log_handler) + yield logger + logger.removeHandler(zmq_log_handler) + +@pytest.fixture +async def sockets(ports): + with zmq.Context() as context: + ctrl_socket = context.socket(REP) + ctrl_socket.bind("tcp://*:%s" % ports[0]) + out_socket = context.socket(PUB) + out_socket.bind("tcp://*:%s" % ports[1]) + yield (ctrl_socket, out_socket) + +@pytest.fixture +async def app(ports): + mock = tui.TUI(*ports) + yield mock + time.sleep(0.5) + +async def test_console_panel_receives_broadcast(app, sockets, logger): + async with app.run_test() as pilot: + await sockets[1].send_string("received") + await pilot.pause(0.1) + console = pilot.app.get_widget_by_id("console") + console.history[0] == 'received' + +async def test_quit_from_socket(app, sockets): + async with app.run_test() as pilot: + await sockets[1].send_string("QUIT") + await pilot.pause(0.1) + assert not pilot.app._running + +async def test_log_panel_receives_logging(app, logger): + async with app.run_test() as pilot: + logger.info('test') + await pilot.pause(0.1) + log_window = pilot.app.get_widget_by_id("log") + assert 'test' in log_window.history[0] + +async def test_input_box_echoed_to_console(app): + async with app.run_test() as pilot: + await pilot.press(*'foo', 'enter') + console = pilot.app.get_widget_by_id("console") + assert console.history[0] == 'foo' + +async def test_quit_screen(app): + async with app.run_test() as pilot: + await pilot.press('ctrl+c', 'tab', 'tab', 'enter') + assert pilot.app._running + + await pilot.press('ctrl+c', 'tab', 'enter') + await pilot.pause(0.5) + assert not pilot.app._running + +async def test_turn_on_print_debug_msg(app, logger): + async with app.run_test() as pilot: + logger.debug('test debug message') + await pilot.pause(0.1) + log_window = pilot.app.get_widget_by_id("log") + assert len(log_window.history) == 0 + + await pilot.press('ctrl+p') + await pilot.pause(0.1) + logger.debug('test debug message') + await pilot.pause(0.1) + log_window = pilot.app.get_widget_by_id("log") + assert 'debug' in log_window.history[0] diff --git a/test/csc_timings.py b/test/csc_timings.py index 2baf5d36..597822ac 100644 --- a/test/csc_timings.py +++ b/test/csc_timings.py @@ -59,6 +59,7 @@ def time_putget(n, type = "normal"): # Kill the process p.kill() + p.wait() # Return timings diff --git a/test/test_utils.py b/test/test_utils.py index b0135d5a..d208c648 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -23,6 +23,7 @@ def tearDown(self): ''' self.p.kill() + self.p.wait() class ActorDependentTestCase(TestCase): @@ -40,6 +41,7 @@ def tearDown(self): ''' self.p.kill() + self.p.wait() def run_setup(self): self.isSetUp= True