
Improve the CLI interface, use API for uploading files. (#45)
* only save `MODEL` in settings, split the model name and version in Manager.

* remove repeated env var `UPLOAD_PREFIX`

* change order of Manager init for clarity.

* add `setup.py`

* add all env vars from `settings.py`

* test against 2.7 and 3.8 for more robust CLI support

* `model_name` and `model_version` refactored to `model`

* move `_strip` lambda into utils as `strip_bucket_prefix`, use on init for jobs and managers.

* use `defer.returnValue(result)` in `job.restart`

* use `pytest.tmpdir` instead of `tempfile.TemporaryDirectory`

* `JSONDecodeError` is a `ValueError`, but does not exist in python 2.x (see the compatibility sketch after this list)

* DEBUG env var is unused and undocumented.

* add storage_bucket as an optional manager kwarg

* only include GPUs in output filename if NUM_GPUS is set.

* add upload_results attribute to optionally upload the final file to the bucket.

* define all env vars as argparse options and include in entrypoint.

* move uploading logic into Job. use API for file uploads instead of `gcloud`.

* add calculate_cost to conditionally use the Grafana API.

* log times in each Job HTTP request

* allow `scale` to be an empty string, but otherwise, cast to float.

* promote `job_type` to primary input, and `model` as override option.
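
For reference, the `JSONDecodeError` bullet above relies on this pattern (a minimal sketch; the function name is illustrative and not part of this commit):

```python
import json

def parse_json(text):
    """Parse JSON on both Python 2 and 3.

    Python 3 raises json.JSONDecodeError, a subclass of ValueError;
    Python 2's json module raises ValueError directly, so catching
    ValueError covers both interpreters.
    """
    try:
        return json.loads(text)
    except ValueError as err:
        raise ValueError('Invalid JSON: {0}'.format(err))
```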
willgraf authored May 29, 2020
1 parent 086b48c commit 7161082
Showing 14 changed files with 623 additions and 262 deletions.
57 changes: 51 additions & 6 deletions .env.example
@@ -1,9 +1,54 @@
# API to add/monitor jobs in Redis
API_HOST=

NUM_GPUS=

# Google credentials
STORAGE_BUCKET=

# Batch API Host (IP Address or FQDN)
HOST=

# Grafana resources for cost estimation
GRAFANA_HOST=
GRAFANA_USER=
GRAFANA_PASSWORD=

# TensorFlow Servable
MODEL=
BACKOFF=

# Job Type
JOB_TYPE=
SCALE=
LABEL=

# Pre- and Post-Processing functions
PREPROCESS=
POSTPROCESS=

# How frequently Jobs update their statuses
UPDATE_INTERVAL=

# Time to wait between starting jobs (for staggering redis entries)
START_DELAY=

# Time interval between Manager status checks
MANAGER_REFRESH_RATE=

# Time in seconds to expire the completed jobs.
EXPIRE_TIME=

# Name of upload folder in storage bucket.
UPLOAD_PREFIX=

# HTTP Settings
CONCURRENT_REQUESTS_PER_HOST=

# Log settings
LOG_ENABLED=
LOG_LEVEL=
LOG_FILE=

# Overwrite directories with environment variables
DOWNLOAD_DIR=
OUTPUT_DIR=
LOG_DIR=
2 changes: 2 additions & 0 deletions .travis.yml
@@ -7,9 +7,11 @@ git:
language: python

python:
- 2.7
- 3.5
- 3.6
- 3.7
- 3.8

cache: pip

45 changes: 34 additions & 11 deletions README.md
@@ -8,7 +8,7 @@

This repository is part of the [DeepCell Kiosk](https://github.com/vanvalenlab/kiosk-console). More information about the Kiosk project is available through [Read the Docs](https://deepcell-kiosk.readthedocs.io/en/master) and our [FAQ](http://www.deepcell.org/faq) page.

## Installation

First, clone the git repository and install the required dependencies.

@@ -23,38 +23,61 @@ cd kiosk-benchmarking
pip install -r requirements.txt
```

## Usage

The only inputs the CLI needs are the image file to process, the type of job, and the IP address or FQDN of the DeepCell Kiosk.

```bash
# from within the kiosk-benchmarking repository
python benchmarking path/to/image.png \
--job-type segmentation \
--host 123.456.789.012
```

It is also possible to override the default model and post-processing function for a given job type.

```bash
# from within the kiosk-benchmarking repository
python benchmarking path/to/image.png \
--job-type segmentation \
--host 123.456.789.012 \
--model ModelName:0 \
--post deep_watershed
```

### Benchmark Mode

The CLI can also be used to benchmark the cluster with high-volume jobs.
It is a prerequisite that the `FILE` exist in the `STORAGE_BUCKET` inside `UPLOAD_PREFIX` (e.g. `/uploads/image.png`).
There are also a number of other benchmarking options, including `--upload-results` and `--calculate-cost`.
A new job is created every `START_DELAY` seconds up to `COUNT` jobs.
The upload time can be simulated by changing the start delay.

```bash
# from within the kiosk-benchmarking repository
python benchmarking path/to/image.png \
--job-type segmentation \
--host 123.456.789.012 \
--model ModelName:0 \
--post deep_watershed \
--start-delay 0.5 \
--count 1000 \
--calculate-cost \
--upload-results
```

_It is easiest to run a benchmarking job from within the DeepCell Kiosk._

## Configuration

Each job can be configured using environment variables in a `.env` file. Most of these environment variables can be overridden with command line options. Use `python benchmarking --help` for a detailed list of options.
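
For example (hypothetical values), a `.env` entry supplies the default and the command line flag takes precedence for a single run:

```bash
# .env supplies the default
echo "SCALE=1" >> .env

# the command line flag overrides it for this run
python benchmarking path/to/image.png \
    --job-type segmentation \
    --host 123.456.789.012 \
    --scale 1.5
```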

| Name | Description | Default Value |
| :--- | :--- | :--- |
| `API_HOST` | **REQUIRED**: Hostname and port for the *kiosk-frontend* API server. | `""` |
| `STORAGE_BUCKET` | **REQUIRED**: Cloud storage bucket address (e.g. `"gs://bucket-name"`). | `""` |
| `JOB_TYPE` | **REQUIRED**: Name of job workflow. | `"segmentation"` |
| `MODEL` | Name and version of the model hosted by TensorFlow Serving (e.g. `"modelname:0"`). Overrides the default model for the given `JOB_TYPE`. | `"modelname:0"` |
| `SCALE` | Rescale data by this float value for model compatibility. | `1` |
| `LABEL` | Integer value of label type. | `""` |
| `PREPROCESS` | Name of the preprocessing function to use (e.g. `"normalize"`). | `""` |
166 changes: 132 additions & 34 deletions benchmarking/__main__.py
@@ -40,28 +40,116 @@
from benchmarking import settings


def valid_filepath(parser, arg):
    """File validation for argparsing.
    https://stackoverflow.com/a/11541450
    """
    if not os.path.exists(arg):
        # Argparse uses the ArgumentTypeError to give a rejection message like:
        # error: argument input: x does not exist
        raise argparse.ArgumentTypeError('{0} does not exist'.format(arg))
    return arg
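# A validator like this is typically wired into argparse through `type=`,
# e.g. (hypothetical usage; the hunk below registers `file` with type=str):
#
#     parser.add_argument('file',
#                         type=lambda arg: valid_filepath(parser, arg))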

def get_arg_parser():
    parser = argparse.ArgumentParser(
        prog='kioskcli',
        description='The Kiosk-CLI is a Command Line Interface (CLI) for '
                    'interacting with the DeepCell Kiosk.'
    )

    # Job definition
    parser.add_argument('file', type=str, metavar="FILE",
                        help='File to process in many duplicated jobs. '
                             '(Must exist in the cloud storage bucket if '
                             'using benchmark mode.)')

    parser.add_argument('--benchmark', action='store_true',
                        help='Benchmarking mode. Manage COUNT simulated jobs. '
                             'The FILE must exist in the STORAGE_BUCKET.')

    parser.add_argument('-j', '--job-type', type=str, required=True,
                        help='Type of job (name of Redis work queue).')

    parser.add_argument('-t', '--host', type=str, required=True,
                        help='IP or FQDN of the DeepCell Kiosk API.')

    parser.add_argument('-m', '--model', type=str,
                        default=settings.MODEL,
                        help='Name and version of model hosted by TensorFlow '
                             'Serving. Overrides the default model defined by '
                             'the JOB_TYPE.')

    parser.add_argument('-b', '--storage-bucket', type=str,
                        default=settings.STORAGE_BUCKET,
                        help='Cloud storage bucket (e.g. gs://storage-bucket). '
                             'Only required if using `--upload-results`.')

    parser.add_argument('-c', '--count', default=1, type=int,
                        help='Number of times to process the given file. '
                             'Only used in `benchmark` mode.')

    parser.add_argument('--pre', '--preprocess', type=str,
                        default=settings.PREPROCESS,
                        help='Preprocessing function to use before model '
                             'prediction. Overrides default preprocessing '
                             'function for the JOB_TYPE.')

    parser.add_argument('--post', '--postprocess', type=str,
                        default=settings.POSTPROCESS,
                        help='Postprocessing function to use after model '
                             'prediction. Overrides default postprocessing '
                             'function for the JOB_TYPE.')

    parser.add_argument('-s', '--scale', type=str,
                        default=settings.SCALE,
                        help='Scale of the data. Data will be scaled up or '
                             'down for the best model compatibility.')

    parser.add_argument('-l', '--label', type=str,
                        default=settings.LABEL, choices=['', '0', '1', '2'],
                        help='Data type (e.g. nuclear, cytoplasmic, etc.).')

    parser.add_argument('-U', '--upload', action='store_true',
                        help='If provided, uploads the file before creating '
                             'a new job. '
                             '(Only applicable in `benchmark` mode.)')

    parser.add_argument('--upload-results', action='store_true',
                        help='Upload the final output file to the bucket.')

    parser.add_argument('--calculate-cost', action='store_true',
                        help='Use the Grafana API to calculate the cost of '
                             'the job.')

    # Timing / interval settings
    parser.add_argument('--start-delay', type=float,
                        default=settings.START_DELAY,
                        help='Time between each job creation '
                             '(0.5s is a typical file upload time).')

    parser.add_argument('--update-interval', type=float,
                        default=settings.UPDATE_INTERVAL,
                        help='Seconds between each job status refresh.')

    parser.add_argument('--refresh-rate', type=float,
                        default=settings.MANAGER_REFRESH_RATE,
                        help='Seconds between each manager status check.')

    parser.add_argument('-x', '--expire-time', type=float,
                        default=settings.EXPIRE_TIME,
                        help='Finished jobs expire after this many seconds.')

    # Logging options
    parser.add_argument('-L', '--log-level', default=settings.LOG_LEVEL,
                        choices=('DEBUG', 'INFO', 'WARN', 'ERROR', 'CRITICAL'),
                        help='Only log the given level and above.')

    # optional arguments
    parser.add_argument('--upload-prefix', type=str,
                        default=settings.UPLOAD_PREFIX,
                        help='Name of the upload folder in the storage '
                             'bucket.')

    return parser

@@ -70,21 +158,21 @@ def initialize_logger(log_level):
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)

    log_level = getattr(logging, log_level)

    formatter = logging.Formatter(fmt=settings.LOG_FORMAT)

    console = logging.StreamHandler(stream=sys.stdout)
    console.setFormatter(formatter)
    console.setLevel(log_level)
    logger.addHandler(console)

    fh = logging.handlers.RotatingFileHandler(
        filename=settings.LOG_FILE,
        maxBytes=10000000,
        backupCount=1)
    fh.setFormatter(formatter)
    fh.setLevel(log_level)
    logger.addHandler(fh)


@@ -94,29 +182,39 @@ def initialize_logger(log_level):
if settings.LOG_ENABLED:
    initialize_logger(log_level=args.log_level)

if args.scale:  # optional, but if provided should be a float
    try:
        args.scale = float(args.scale)
    except ValueError:
        raise argparse.ArgumentTypeError(
            '{0} is not a float'.format(args.scale))

mgr_kwargs = {
    'host': args.host,
    'model': args.model,
    'job_type': args.job_type,
    'update_interval': args.update_interval,
    'start_delay': args.start_delay,
    'refresh_rate': args.refresh_rate,
    'postprocess': args.post,
    'preprocess': args.pre,
    'upload_prefix': args.upload_prefix,
    'expire_time': args.expire_time,
    'data_scale': args.scale,
    'data_label': args.label,
    'storage_bucket': args.storage_bucket,
    'upload_results': args.upload_results,
    'calculate_cost': args.calculate_cost,
}

if not os.path.exists(args.file) and not args.benchmark and args.upload:
    raise FileNotFoundError('%s could not be found.' % args.file)

if args.benchmark:
    mgr = manager.BenchmarkingJobManager(**mgr_kwargs)
    mgr.run(filepath=args.file, count=args.count, upload=args.upload)

else:
    mgr = manager.BatchProcessingJobManager(**mgr_kwargs)
    mgr.run(filepath=args.file)

9 changes: 7 additions & 2 deletions benchmarking/cost.py
@@ -30,7 +30,7 @@

import logging
import time
import urllib

import requests

@@ -143,12 +143,17 @@ def get_query_data(self, query, step=None):

def get_url(self, data):
    """Return a formatted URL for the Grafana API"""
    # check python2 vs python3
    if hasattr(urllib, 'parse'):
        url_encode = urllib.parse.urlencode  # pylint: disable=E1101
    else:
        url_encode = urllib.urlencode  # pylint: disable=E1101
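    # either way, url_encode({'query': 'up'}) -> 'query=up'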
    return 'http://{user}:{passwd}@{host}{route}?{querystring}'.format(
        user=self.grafana_user,
        passwd=self.grafana_password,
        host=self.grafana_host,
        route='/api/datasources/proxy/1/api/v1/query_range',
        querystring=url_encode(data))

def send_grafana_api_request(self, query, step=None):
    """Send an HTTP GET request with the data url-encoded"""
