From f97ac67c6a1e4035b1d9e8783d854a55c4d3b833 Mon Sep 17 00:00:00 2001 From: Brenden Matthews Date: Sun, 22 May 2016 08:39:19 -0700 Subject: [PATCH] Rename bluegreen_deploy -> zdd. --- README.md | 15 +- bluegreen_deploy.py | 673 +---------------- tests/test_marathon_lb.py | 8 +- .../{test_bluegreen_deploy.py => test_zdd.py} | 61 +- ...egreen_app_blue.json => zdd_app_blue.json} | 0 tests/{bluegreen_apps.json => zdd_apps.json} | 0 zdd.py | 674 ++++++++++++++++++ 7 files changed, 722 insertions(+), 709 deletions(-) mode change 100755 => 120000 bluegreen_deploy.py rename tests/{test_bluegreen_deploy.py => test_zdd.py} (77%) rename tests/{bluegreen_app_blue.json => zdd_app_blue.json} (100%) rename tests/{bluegreen_apps.json => zdd_apps.json} (100%) create mode 100755 zdd.py diff --git a/README.md b/README.md index 928f8523..ec55e3b4 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ fast, efficient, battle-tested, highly available load balancer with many advance * **Real-time LB updates**, via [Marathon's event bus](https://mesosphere.github.io/marathon/docs/event-bus.html) * Support for Marathon's **health checks** * **Multi-cert TLS/SSL** support + * [Zero-downtime deployments](#zero-downtime-deployments) * Per-service **HAProxy templates** * DC/OS integration * Automated Docker image builds ([mesosphere/marathon-lb](https://hub.docker.com/r/mesosphere/marathon-lb)) @@ -156,10 +157,10 @@ Marathon-lb exposes a few endpoints on port 9090 (by default). They are: | Endpoint | Description | |-------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | `:9090/haproxy?stats` | HAProxy stats endpoint. This produces an HTML page which can be viewed in your browser, providing various statistics about the current HAProxy instance. | -| `:9090/haproxy?stats;csv` | This is a CSV version of the stats above, which can be consumed by other tools. For example, it's used in the [`bluegreen_deploy.py`](bluegreen_deploy.py) script. | +| `:9090/haproxy?stats;csv` | This is a CSV version of the stats above, which can be consumed by other tools. For example, it's used in the [`zdd.py`](zdd.py) script. | | `:9090/_haproxy_health_check` | HAProxy health check endpoint. Returns `200 OK` if HAProxy is healthy. | | `:9090/_haproxy_getconfig` | Returns the HAProxy config file as it was when HAProxy was started. Implemented in [`getconfig.lua`](getconfig.lua). | -| `:9090/_haproxy_getpids` | Returns the PIDs for all HAProxy instances within the current process namespace. This literally returns `$(pidof haproxy)`. Implemented in [`getpids.lua`](getpids.lua). This is also used by the [`bluegreen_deploy.py`](bluegreen_deploy.py) script to determine if connections have finished draining during a deploy. | +| `:9090/_haproxy_getpids` | Returns the PIDs for all HAProxy instances within the current process namespace. This literally returns `$(pidof haproxy)`. Implemented in [`getpids.lua`](getpids.lua). This is also used by the [`zdd.py`](zdd.py) script to determine if connections have finished draining during a deploy. | ## HAProxy configuration @@ -254,29 +255,31 @@ are [documented here](Longhelp.md#templates). 
< HTTP/1.1 200 OK
```

-## Zero downtime deployments
+## Zero-downtime deployments

Marathon-lb can perform canary-style blue/green deployments with zero downtime. To execute such deployments, you must follow certain patterns when using Marathon.

-The deployment method is described [in this Marathon document](https://mesosphere.github.io/marathon/docs/blue-green-deploy.html). Marathon-lb provides an implementation of the aforementioned deployment method with the script [`bluegreen_deploy.py`](bluegreen_deploy.py). To perform a zero downtime deploy using `bluegreen_deploy.py`, you must:
+The deployment method is described [in this Marathon document](https://mesosphere.github.io/marathon/docs/blue-green-deploy.html). Marathon-lb provides an implementation of this deployment method with the script [`zdd.py`](zdd.py). To perform a zero-downtime deploy using `zdd.py`, you must:

- Specify the `HAPROXY_DEPLOYMENT_GROUP` and `HAPROXY_DEPLOYMENT_ALT_PORT` labels in your app template
  - `HAPROXY_DEPLOYMENT_GROUP`: This label uniquely identifies a pair of apps belonging to a blue/green deployment, and is used as the app name in the HAProxy configuration
  - `HAPROXY_DEPLOYMENT_ALT_PORT`: An alternate service port is required because Marathon requires service ports to be unique across all apps
- Use only one service port: multiple ports are not yet implemented
-- Use the provided `bluegreen_deploy.py` script to orchestrate the deploy: the script will make API calls to Marathon, and use the HAProxy stats endpoint to gracefully terminate instances
+- Use the provided `zdd.py` script to orchestrate the deploy: the script makes API calls to Marathon and uses the HAProxy stats endpoint to gracefully terminate instances
- Run the marathon-lb container in privileged mode (required to execute `iptables` commands) due to the issues outlined in [this excellent blog post by the Yelp engineering team](http://engineeringblog.yelp.com/2015/04/true-zero-downtime-haproxy-reloads.html)
- Note that long-lived TCP connections on the same HAProxy instances may cause the deploy to take longer than necessary. The script waits up to 5 minutes (by default) for connections to drain from HAProxy between steps, but any long-lived TCP connections will cause old instances of HAProxy to stick around.

An example minimal configuration for a [test instance of nginx is included here](tests/1-nginx.json). You might execute a deployment from a CI tool like Jenkins with:

```
-./bluegreen_deploy.py -j 1-nginx.json -m http://master.mesos:8080 -f -l http://marathon-lb.marathon.mesos:9090 --syslog-socket /dev/null
+./zdd.py -j 1-nginx.json -m http://master.mesos:8080 -f -l http://marathon-lb.marathon.mesos:9090 --syslog-socket /dev/null
```

Zero-downtime deployments are accomplished through a Lua module, which reports the PIDs of all currently running HAProxy processes via the `/_haproxy_getpids` endpoint. After a restart, there will be multiple HAProxy PIDs until all remaining connections have gracefully terminated. By waiting for all connections to complete, you can safely and deterministically drain tasks. One caveat, however: any long-lived connections on the same LB will keep the old HAProxy process running and serving them until they complete, which defeats this technique.

+The ZDD script includes the ability to specify a pre-kill hook, which is executed before draining tasks are terminated.
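A minimal hook might look like the following sketch (hypothetical; the three JSON arguments and the exit-code contract match the `--pre-kill-hook` help text in `zdd.py`):

```python
#!/usr/bin/env python3
# Hypothetical pre-kill hook: zdd.py calls it with three JSON arguments
# (old app definition, tasks about to be killed, new app definition) and
# aborts the deploy if the hook exits non-zero.
import json
import sys

old_app = json.loads(sys.argv[1])
tasks_to_kill = json.loads(sys.argv[2])
new_app = json.loads(sys.argv[3])

# Example check (an assumption, not part of marathon-lb): never kill more
# than half of the old app's remaining instances in a single step.
if len(tasks_to_kill) > max(1, old_app.get('instances', 0) // 2):
    print("Refusing to kill {} tasks at once".format(len(tasks_to_kill)))
    sys.exit(1)  # non-zero: the deploy stops, an operator must intervene

sys.exit(0)  # zero: the deploy may continue
```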
This allows you to run your own automated checks against the old and new app before the deploy continues. + ## Mesos with IP-per-task support Marathon-lb supports load balancing for applications that use the Mesos IP-per-task diff --git a/bluegreen_deploy.py b/bluegreen_deploy.py deleted file mode 100755 index 0dea57b6..00000000 --- a/bluegreen_deploy.py +++ /dev/null @@ -1,672 +0,0 @@ -#!/usr/bin/env python3 - -from common import * -from datetime import datetime -from collections import namedtuple, defaultdict -from itertools import groupby - -import argparse -import json -import requests -import csv -import time -import re -import math -import six.moves.urllib as urllib -import socket -import sys -import subprocess - - -logger = logging.getLogger('bluegreen_deploy') - - -def query_yes_no(question, default="yes"): - # Thanks stackoverflow: - # https://stackoverflow.com/questions/3041986/python-command-line-yes-no-input - """Ask a yes/no question via input() and return their answer. - - "question" is a string that is presented to the user. - "default" is the presumed answer if the user just hits . - It must be "yes" (the default), "no" or None (meaning - an answer is required of the user). - - The "answer" return value is True for "yes" or False for "no". - """ - valid = {"yes": True, "y": True, "ye": True, - "no": False, "n": False} - if default is None: - prompt = " [y/n] " - elif default == "yes": - prompt = " [Y/n] " - elif default == "no": - prompt = " [y/N] " - else: - raise ValueError("invalid default answer: '%s'" % default) - - while True: - sys.stdout.write(question + prompt) - choice = input().lower() - if default is not None and choice == '': - return valid[default] - elif choice in valid: - return valid[choice] - else: - sys.stdout.write("Please respond with 'yes' or 'no' " - "(or 'y' or 'n').\n") - - -def marathon_get_request(args, path): - url = args.marathon + path - response = requests.get(url, auth=get_marathon_auth_params(args)) - response.raise_for_status() - return response - - -def list_marathon_apps(args): - response = marathon_get_request(args, "/v2/apps") - return response.json()['apps'] - - -def fetch_marathon_app(args, app_id): - response = marathon_get_request(args, "/v2/apps" + app_id) - return response.json()['app'] - - -def _get_alias_records(hostname): - """Return all IPv4 A records for a given hostname - """ - return socket.gethostbyname_ex(hostname)[2] - - -def _unparse_url_alias(url, addr): - """Reassemble a url object into a string but with a new address - """ - return urllib.parse.urlunparse((url[0], - addr + ":" + str(url.port), - url[2], - url[3], - url[4], - url[5])) - - -def get_marathon_lb_urls(args): - """Return a list of urls for all Aliases of the - marathon_lb url passed in as an argument - """ - url = urllib.parse.urlparse(args.marathon_lb) - addrs = _get_alias_records(url.hostname) - return [_unparse_url_alias(url, addr) for addr in addrs] - - -def fetch_haproxy_pids(haproxy_url): - try: - response = requests.get(haproxy_url + "/_haproxy_getpids") - response.raise_for_status() - except requests.exceptions.RequestException: - logger.exception("Caught exception when retrieving HAProxy" - " pids from " + haproxy_url) - raise - - return response.text.split() - - -def check_haproxy_reloading(haproxy_url): - """Return False if haproxy has only one pid, it is not reloading. 
- Return True if we catch an exception while making a request to - haproxy or if more than one pid is returned - """ - try: - pids = fetch_haproxy_pids(haproxy_url) - except requests.exceptions.RequestException: - # Assume reloading on any error, this should be caught with a timeout - return True - - if len(pids) > 1: - logger.info("Waiting for {} pids on {}".format(len(pids), haproxy_url)) - return True - - return False - - -def any_marathon_lb_reloading(marathon_lb_urls): - return any([check_haproxy_reloading(url) for url in marathon_lb_urls]) - - -def fetch_haproxy_stats(haproxy_url): - try: - response = requests.get(haproxy_url + "/haproxy?stats;csv") - response.raise_for_status() - except requests.exceptions.RequestException: - logger.exception("Caught exception when retrieving HAProxy" - " stats from " + haproxy_url) - raise - - return response.text - - -def fetch_combined_haproxy_stats(marathon_lb_urls): - raw = ''.join([fetch_haproxy_stats(url) for url in marathon_lb_urls]) - return parse_haproxy_stats(raw) - - -def parse_haproxy_stats(csv_data): - rows = csv_data.splitlines() - headings = rows.pop(0).lstrip('# ').rstrip(',\n').split(',') - csv_reader = csv.reader(rows, delimiter=',', quotechar="'") - - Row = namedtuple('Row', headings) - - return [Row(*row[0:-1]) for row in csv_reader if row[0][0] != '#'] - - -def get_deployment_label(app): - return get_deployment_group(app) + "_" + app['labels']['HAPROXY_0_PORT'] - - -def _if_app_listener(app, listener): - return (listener.pxname == get_deployment_label(app) and - listener.svname not in ['BACKEND', 'FRONTEND']) - - -def fetch_app_listeners(app, marathon_lb_urls): - haproxy_stats = fetch_combined_haproxy_stats(marathon_lb_urls) - return [l for l in haproxy_stats if _if_app_listener(app, l)] - - -def waiting_for_listeners(new_app, old_app, listeners, haproxy_count): - listener_count = (len(listeners) / haproxy_count) - return listener_count != new_app['instances'] + old_app['instances'] - - -def get_deployment_target(app): - return int(app['labels']['HAPROXY_DEPLOYMENT_TARGET_INSTANCES']) - - -def waiting_for_up_listeners(app, listeners, haproxy_count): - up_listeners = [l for l in listeners if l.status == 'UP'] - up_listener_count = (len(up_listeners) / haproxy_count) - - return up_listener_count < get_deployment_target(app) - - -def select_draining_listeners(listeners): - return [l for l in listeners if l.status == 'MAINT'] - - -def select_drained_listeners(listeners): - draining_listeners = select_draining_listeners(listeners) - return [l for l in draining_listeners if not _has_pending_requests(l)] - - -def get_svnames_from_task(task): - prefix = task['host'].replace('.', '_') - for port in task['ports']: - yield(prefix + '_{}'.format(port)) - - -def get_svnames_from_tasks(tasks): - svnames = [] - for task in tasks: - svnames += get_svnames_from_task(task) - return svnames - - -def _has_pending_requests(listener): - return int(listener.qcur or 0) > 0 or int(listener.scur or 0) > 0 - - -def find_drained_task_ids(app, listeners, haproxy_count): - """Return app tasks which have all haproxy listeners down and draining - of any pending sessions or connections - """ - tasks = zip(get_svnames_from_tasks(app['tasks']), app['tasks']) - drained_listeners = select_drained_listeners(listeners) - - drained_task_ids = [] - for svname, task in tasks: - task_listeners = [l for l in drained_listeners if l.svname == svname] - if len(task_listeners) == haproxy_count: - drained_task_ids.append(task['id']) - - return drained_task_ids - - -def 
find_draining_task_ids(app, listeners, haproxy_count): - """Return app tasks which have all haproxy listeners draining - """ - tasks = zip(get_svnames_from_tasks(app['tasks']), app['tasks']) - draining_listeners = select_draining_listeners(listeners) - - draining_task_ids = [] - for svname, task in tasks: - task_listeners = [l for l in draining_listeners if l.svname == svname] - if len(task_listeners) == haproxy_count: - draining_task_ids.append(task['id']) - - return draining_task_ids - - -def max_wait_not_exceeded(max_wait, timestamp): - return (time.time() - timestamp < max_wait) - - -def find_tasks_to_kill(args, new_app, old_app, timestamp): - marathon_lb_urls = get_marathon_lb_urls(args) - haproxy_count = len(marathon_lb_urls) - listeners = [] - - while max_wait_not_exceeded(args.max_wait, timestamp): - time.sleep(args.step_delay) - - logger.info("Existing app running {} instances, " - "new app running {} instances" - .format(old_app['instances'], new_app['instances'])) - - if any_marathon_lb_reloading(marathon_lb_urls): - continue - - try: - listeners = fetch_app_listeners(new_app, marathon_lb_urls) - except requests.exceptions.RequestException: - # Restart loop if we hit an exception while loading listeners, - # this may be normal behaviour - continue - - logger.info("Found {} app listeners across {} HAProxy instances" - .format(len(listeners), haproxy_count)) - - if waiting_for_listeners(new_app, old_app, listeners, haproxy_count): - continue - - if waiting_for_up_listeners(new_app, listeners, haproxy_count): - continue - - if waiting_for_drained_listeners(listeners): - continue - - return find_drained_task_ids(old_app, listeners, haproxy_count) - - logger.info('Timed out waiting for tasks to fully drain, find any draining' - ' tasks and continue with deployment...') - - return find_draining_task_ids(old_app, listeners, haproxy_count) - - -def deployment_in_progress(app): - return len(app['deployments']) > 0 - - -def execute_pre_kill_hook(args, old_app, tasks_to_kill): - if args.pre_kill_hook is not None: - logger.info("Calling pre-kill hook '{}'".format(args.pre_kill_hook)) - - subprocess.check_call([args.pre_kill_hook, - json.dumps(old_app), - json.dumps(tasks_to_kill)]) - - -def swap_bluegreen_apps(args, new_app, old_app): - old_app = fetch_marathon_app(args, old_app['id']) - new_app = fetch_marathon_app(args, new_app['id']) - - if deployment_in_progress(new_app): - time.sleep(args.step_delay) - return swap_bluegreen_apps(args, new_app, old_app) - - tasks_to_kill = find_tasks_to_kill(args, new_app, old_app, time.time()) - - if ready_to_delete_old_app(new_app, old_app, tasks_to_kill): - return safe_delete_app(args, old_app) - - if len(tasks_to_kill) > 0: - execute_pre_kill_hook(args, old_app, tasks_to_kill) - - logger.info("There are {} draining listeners, " - "about to kill the following tasks:\n - {}" - .format(len(tasks_to_kill), - "\n - ".join(tasks_to_kill))) - - if args.force or query_yes_no("Continue?"): - logger.info("Scaling down old app by {} instances" - .format(len(tasks_to_kill))) - kill_marathon_tasks(args, tasks_to_kill) - else: - return False - - if new_app['instances'] < get_deployment_target(new_app): - scale_new_app_instances(args, new_app, old_app) - - return swap_bluegreen_apps(args, new_app, old_app) - - -def ready_to_delete_old_app(new_app, old_app, draining_task_ids): - return (int(new_app['instances']) == get_deployment_target(new_app) and - len(draining_task_ids) == int(old_app['instances'])) - - -def waiting_for_drained_listeners(listeners): - return 
len(select_drained_listeners(listeners)) < 1 - - -def scale_new_app_instances(args, new_app, old_app): - """Scale the app by 50% of its existing instances until we - meet or surpass instances deployed for old_app. - At which point go right to the new_app deployment target - """ - instances = (math.floor(new_app['instances'] + - (new_app['instances'] + 1) / 2)) - if instances >= old_app['instances']: - instances = get_deployment_target(new_app) - - logger.info("Scaling new app up to {} instances".format(instances)) - return scale_marathon_app_instances(args, new_app, instances) - - -def safe_delete_app(args, app): - logger.info("About to delete old app {}".format(app['id'])) - if args.force or query_yes_no("Continue?"): - delete_marathon_app(args, app) - return True - else: - return False - - -def delete_marathon_app(args, app): - url = args.marathon + '/v2/apps' + app['id'] - response = requests.delete(url, - auth=get_marathon_auth_params(args)) - response.raise_for_status() - return response - - -def kill_marathon_tasks(args, ids): - data = json.dumps({'ids': ids}) - url = args.marathon + "/v2/tasks/delete?scale=true" - headers = {'Content-Type': 'application/json'} - response = requests.post(url, headers=headers, data=data, - auth=get_marathon_auth_params(args)) - response.raise_for_status() - return response - - -def scale_marathon_app_instances(args, app, instances): - url = args.marathon + "/v2/apps" + app['id'] - data = json.dumps({'instances': instances}) - headers = {'Content-Type': 'application/json'} - - response = requests.put(url, headers=headers, data=data, - auth=get_marathon_auth_params(args)) - - response.raise_for_status() - return response - - -def deploy_marathon_app(args, app): - url = args.marathon + "/v2/apps" - data = json.dumps(app) - headers = {'Content-Type': 'application/json'} - response = requests.post(url, headers=headers, data=data, - auth=get_marathon_auth_params(args)) - response.raise_for_status() - return response - - -def get_service_port(app): - try: - return \ - int(app['container']['docker']['portMappings'][0]['servicePort']) - except KeyError: - return int(app['ports'][0]) - - -def set_service_port(app, servicePort): - try: - app['container']['docker']['portMappings'][0]['servicePort'] = \ - int(servicePort) - except KeyError: - app['ports'][0] = int(servicePort) - - return app - - -def validate_app(app): - if app['id'] is None: - raise Exception("App doesn't contain a valid App ID") - if 'labels' not in app: - raise Exception("No labels found. 
Please define the" - "HAPROXY_DEPLOYMENT_GROUP label") - if 'HAPROXY_DEPLOYMENT_GROUP' not in app['labels']: - raise Exception("Please define the " - "HAPROXY_DEPLOYMENT_GROUP label") - if 'HAPROXY_DEPLOYMENT_ALT_PORT' not in app['labels']: - raise Exception("Please define the " - "HAPROXY_DEPLOYMENT_ALT_PORT label") - - -def set_app_ids(app, colour): - app['labels']['HAPROXY_APP_ID'] = app['id'] - app['id'] = app['id'] + '-' + colour - - if app['id'][0] != '/': - app['id'] = '/' + app['id'] - - return app - - -def set_service_ports(app, servicePort): - app['labels']['HAPROXY_0_PORT'] = str(get_service_port(app)) - try: - app['container']['docker']['portMappings'][0]['servicePort'] = \ - int(servicePort) - return app - except KeyError: - app['ports'][0] = int(servicePort) - return app - - -def select_next_port(app): - alt_port = int(app['labels']['HAPROXY_DEPLOYMENT_ALT_PORT']) - if int(app['ports'][0]) == alt_port: - return int(app['labels']['HAPROXY_0_PORT']) - else: - return alt_port - - -def select_next_colour(app): - if app['labels'].get('HAPROXY_DEPLOYMENT_COLOUR') == 'blue': - return 'green' - else: - return 'blue' - - -def sort_deploys(apps): - return sorted(apps, key=lambda a: a.get('labels', {}) - .get('HAPROXY_DEPLOYMENT_STARTED_AT', '0')) - - -def select_last_deploy(apps): - return sort_deploys(apps).pop() - - -def select_last_two_deploys(apps): - return sort_deploys(apps)[:-3:-1] - - -def get_deployment_group(app): - return app.get('labels', {}).get('HAPROXY_DEPLOYMENT_GROUP') - - -def fetch_previous_deploys(args, app): - apps = list_marathon_apps(args) - app_deployment_group = get_deployment_group(app) - return [a for a in apps if get_deployment_group(a) == app_deployment_group] - - -def prepare_deploy(args, previous_deploys, app): - """ Return a blue or a green version of `app` based on prexisting deploys - """ - if len(previous_deploys) > 0: - last_deploy = select_last_deploy(previous_deploys) - - app['instances'] = args.initial_instances - next_colour = select_next_colour(last_deploy) - next_port = select_next_port(last_deploy) - deployment_target_instances = last_deploy['instances'] - else: - next_colour = 'blue' - next_port = get_service_port(app) - deployment_target_instances = app['instances'] - - app = set_app_ids(app, next_colour) - app = set_service_ports(app, next_port) - app['labels']['HAPROXY_DEPLOYMENT_TARGET_INSTANCES'] = \ - str(deployment_target_instances) - app['labels']['HAPROXY_DEPLOYMENT_COLOUR'] = next_colour - app['labels']['HAPROXY_DEPLOYMENT_STARTED_AT'] = datetime.now().isoformat() - - return app - - -def load_app_json(args): - with open(args.json) as content_file: - return json.load(content_file) - - -def safe_resume_deploy(args, previous_deploys): - if args.resume: - logger.info("Found previous deployment, resuming") - new_app, old_app = select_last_two_deploys(previous_deploys) - return swap_bluegreen_apps(args, new_app, old_app) - else: - raise Exception("There appears to be an" - " existing deployment in progress") - - -def do_bluegreen_deploy(args, out=sys.stdout): - app = load_app_json(args) - validate_app(app) - - previous_deploys = fetch_previous_deploys(args, app) - - if len(previous_deploys) > 1: - # There is a stuck deploy - return safe_resume_deploy(args, previous_deploys) - - new_app = prepare_deploy(args, previous_deploys, app) - - logger.info('Final app definition:') - out.write(json.dumps(new_app, sort_keys=True, indent=2)) - out.write("\n") - - if args.dry_run: - return True - - if args.force or query_yes_no("Continue with 
deployment?"): - deploy_marathon_app(args, new_app) - - if len(previous_deploys) == 0: - # This was the first deploy, nothing to swap - return True - else: - # This is a standard blue/green deploy, swap new app with old - old_app = select_last_deploy(previous_deploys) - return swap_bluegreen_apps(args, new_app, old_app) - - -def get_arg_parser(): - parser = argparse.ArgumentParser( - description="Marathon HAProxy Load Balancer", - formatter_class=argparse.ArgumentDefaultsHelpFormatter) - parser.add_argument("--longhelp", - help="Print out configuration details", - action="store_true" - ) - parser.add_argument("--marathon", "-m", - help="[required] Marathon endpoint, eg. -m " + - "http://marathon1:8080" - ) - parser.add_argument("--marathon-lb", "-l", - help="[required] Marathon-lb stats endpoint, eg. -l " + - "http://marathon-lb.marathon.mesos:9090" - ) - - parser.add_argument("--json", "-j", - help="[required] App JSON" - ) - parser.add_argument("--dry-run", "-d", - help="Perform a dry run", - action="store_true" - ) - parser.add_argument("--force", "-f", - help="Perform deployment un-prompted", - action="store_true" - ) - parser.add_argument("--step-delay", "-s", - help="Delay (in seconds) between each successive" - " deployment step", - type=int, default=5 - ) - parser.add_argument("--initial-instances", "-i", - help="Initial number of app instances to launch", - type=int, default=1 - ) - parser.add_argument("--resume", "-r", - help="Resume from a previous deployment", - action="store_true" - ) - parser.add_argument("--max-wait", "-w", - help="Maximum amount of time (in seconds) to wait" - " for HAProxy to drain connections", - type=int, default=300 - ) - parser.add_argument("--pre-kill-hook", - help="A path to an executable (such as a script) " - "which will be called before killing any tasks marked " - "for draining at each step. The script will be called " - "with 2 arguments: the old app definition (in JSON), " - "and the list of tasks which will be killed. An exit " - "code of 0 indicates the deploy may continue. " - "If the hook returns a non-zero exit code, the deploy " - "will stop, and an operator must intervene." 
- ) - parser = set_logging_args(parser) - parser = set_marathon_auth_args(parser) - return parser - - -def set_request_retries(): - s = requests.Session() - a = requests.adapters.HTTPAdapter(max_retries=3) - s.mount('http://', a) - - -def process_arguments(): - # Process arguments - arg_parser = get_arg_parser() - args = arg_parser.parse_args() - - if args.longhelp: - print(__doc__) - sys.exit() - # otherwise make sure that a Marathon URL was specified - else: - if args.marathon is None: - arg_parser.error('argument --marathon/-m is required') - if args.marathon_lb is None: - arg_parser.error('argument --marathon-lb/-l is required') - if args.json is None: - arg_parser.error('argument --json/-j is required') - - return args - - -if __name__ == '__main__': - args = process_arguments() - set_request_retries() - setup_logging(logger, args.syslog_socket, args.log_format) - - if do_bluegreen_deploy(args): - sys.exit(0) - else: - sys.exit(1) diff --git a/bluegreen_deploy.py b/bluegreen_deploy.py new file mode 120000 index 00000000..7d890a96 --- /dev/null +++ b/bluegreen_deploy.py @@ -0,0 +1 @@ +zdd.py \ No newline at end of file diff --git a/tests/test_marathon_lb.py b/tests/test_marathon_lb.py index d12de412..cb1f85cc 100644 --- a/tests/test_marathon_lb.py +++ b/tests/test_marathon_lb.py @@ -1178,9 +1178,9 @@ def test_config_simple_app_balance(self): ''' self.assertMultiLineEqual(config, expected) - def test_bluegreen_app(self): - with open('tests/bluegreen_apps.json') as data_file: - bluegreen_apps = json.load(data_file) + def test_zdd_app(self): + with open('tests/zdd_apps.json') as data_file: + zdd_apps = json.load(data_file) class Marathon: def __init__(self, data): @@ -1196,7 +1196,7 @@ def health_check(self): bind_http_https = True ssl_certs = "" templater = marathon_lb.ConfigTemplater() - apps = marathon_lb.get_apps(Marathon(bluegreen_apps['apps'])) + apps = marathon_lb.get_apps(Marathon(zdd_apps['apps'])) config = marathon_lb.config(apps, groups, bind_http_https, ssl_certs, templater) expected = self.base_config + ''' diff --git a/tests/test_bluegreen_deploy.py b/tests/test_zdd.py similarity index 77% rename from tests/test_bluegreen_deploy.py rename to tests/test_zdd.py index ffa4cc24..a1153cd8 100644 --- a/tests/test_bluegreen_deploy.py +++ b/tests/test_zdd.py @@ -1,5 +1,5 @@ import unittest -import bluegreen_deploy +import zdd import mock import json import time @@ -31,12 +31,12 @@ def json(self): def _load_listeners(): with open('tests/haproxy_stats.csv') as f: - return bluegreen_deploy.parse_haproxy_stats(f.read()) + return zdd.parse_haproxy_stats(f.read()) class TestBluegreenDeploy(unittest.TestCase): - @mock.patch('bluegreen_deploy.scale_marathon_app_instances') + @mock.patch('zdd.scale_marathon_app_instances') def test_scale_new_app_instances_up_50_percent(self, mock): """When scaling new_app instances, increase instances by 50% of existing instances if we have not yet met or surpassed the amount @@ -51,11 +51,11 @@ def test_scale_new_app_instances_up_50_percent(self, mock): old_app = {'instances': 30} args = Arguments() args.initial_instances = 5 - bluegreen_deploy.scale_new_app_instances(args, new_app, old_app) + zdd.scale_new_app_instances(args, new_app, old_app) mock.assert_called_with( args, new_app, 15) - @mock.patch('bluegreen_deploy.scale_marathon_app_instances') + @mock.patch('zdd.scale_marathon_app_instances') def test_scale_new_app_instances_to_target(self, mock): """When scaling new instances up, if we have met or surpassed the amount of instances deployed for 
old_app, go right to our @@ -70,20 +70,20 @@ def test_scale_new_app_instances_to_target(self, mock): old_app = {'instances': 8} args = Arguments() args.initial_instances = 5 - bluegreen_deploy.scale_new_app_instances(args, new_app, old_app) + zdd.scale_new_app_instances(args, new_app, old_app) mock.assert_called_with( args, new_app, 30) def test_find_drained_task_ids(self): listeners = _load_listeners() haproxy_instance_count = 2 - apps = json.loads(open('tests/bluegreen_app_blue.json').read()) + apps = json.loads(open('tests/zdd_app_blue.json').read()) app = apps['apps'][0] results = \ - bluegreen_deploy.find_drained_task_ids(app, - listeners, - haproxy_instance_count) + zdd.find_drained_task_ids(app, + listeners, + haproxy_instance_count) assert app['tasks'][0]['id'] in results # 2 l's down, no sessions assert app['tasks'][1]['id'] not in results # 1 l up, 1 down @@ -92,30 +92,30 @@ def test_find_drained_task_ids(self): def test_find_draining_task_ids(self): listeners = _load_listeners() haproxy_instance_count = 2 - apps = json.loads(open('tests/bluegreen_app_blue.json').read()) + apps = json.loads(open('tests/zdd_app_blue.json').read()) app = apps['apps'][0] results = \ - bluegreen_deploy.find_draining_task_ids(app, - listeners, - haproxy_instance_count) + zdd.find_draining_task_ids(app, + listeners, + haproxy_instance_count) assert app['tasks'][0]['id'] in results # 2 l's down, no sessions assert app['tasks'][1]['id'] not in results # 1 l up, 1 down assert app['tasks'][2]['id'] in results # 2 l's down, 1 w/ scur/qcur def test_get_svnames_from_tasks(self): - apps = json.loads(open('tests/bluegreen_app_blue.json').read()) + apps = json.loads(open('tests/zdd_app_blue.json').read()) tasks = apps['apps'][0]['tasks'] - task_svnames = bluegreen_deploy.get_svnames_from_tasks(tasks) + task_svnames = zdd.get_svnames_from_tasks(tasks) assert '10_0_6_25_16916' in task_svnames assert '10_0_6_25_31184' in task_svnames def test_parse_haproxy_stats(self): with open('tests/haproxy_stats.csv') as f: - results = bluegreen_deploy.parse_haproxy_stats(f.read()) + results = zdd.parse_haproxy_stats(f.read()) assert results[1].pxname == 'http-in' assert results[1].svname == 'IPv4-direct' @@ -128,18 +128,25 @@ def test_pre_kill_hook(self, mock): # TODO(BM): This test is naive. An end-to-end test would be nice. 
args = Arguments() args.pre_kill_hook = 'myhook' - app = { - 'id': 'myApp' + old_app = { + 'id': 'oldApp' + } + new_app = { + 'id': 'newApp' } tasks_to_kill = ['task1', 'task2'] - bluegreen_deploy.execute_pre_kill_hook(args, app, tasks_to_kill) + zdd.execute_pre_kill_hook(args, + old_app, + tasks_to_kill, + new_app) mock.assert_called_with([args.pre_kill_hook, - '{"id": "myApp"}', - '["task1", "task2"]']) + '{"id": "oldApp"}', + '["task1", "task2"]', + '{"id": "newApp"}']) - @mock.patch('bluegreen_deploy.fetch_combined_haproxy_stats', + @mock.patch('zdd.fetch_combined_haproxy_stats', mock.Mock(side_effect=lambda _: _load_listeners())) def test_fetch_app_listeners(self): app = { @@ -149,7 +156,7 @@ def test_fetch_app_listeners(self): } } - app_listeners = bluegreen_deploy.fetch_app_listeners(app, []) + app_listeners = zdd.fetch_app_listeners(app, []) assert app_listeners[0].pxname == 'foobar_8080' assert len(app_listeners) == 1 @@ -158,7 +165,7 @@ def test_fetch_app_listeners(self): mock.Mock(side_effect=lambda hostname: (hostname, [], ['127.0.0.1', '127.0.0.2']))) def test_get_marathon_lb_urls(self): - marathon_lb_urls = bluegreen_deploy.get_marathon_lb_urls(Arguments()) + marathon_lb_urls = zdd.get_marathon_lb_urls(Arguments()) assert 'http://127.0.0.1:9090' in marathon_lb_urls assert 'http://127.0.0.2:9090' in marathon_lb_urls @@ -166,14 +173,14 @@ def test_get_marathon_lb_urls(self): @mock.patch('requests.get', mock.Mock(side_effect=lambda k, auth: - MyResponse('tests/bluegreen_app_blue.json'))) + MyResponse('tests/zdd_app_blue.json'))) def test_simple(self): # This test just checks the output of the program against # some expected output from six import StringIO out = StringIO() - bluegreen_deploy.do_bluegreen_deploy(Arguments(), out) + zdd.do_zdd(Arguments(), out) output = json.loads(out.getvalue()) output['labels']['HAPROXY_DEPLOYMENT_STARTED_AT'] = "" diff --git a/tests/bluegreen_app_blue.json b/tests/zdd_app_blue.json similarity index 100% rename from tests/bluegreen_app_blue.json rename to tests/zdd_app_blue.json diff --git a/tests/bluegreen_apps.json b/tests/zdd_apps.json similarity index 100% rename from tests/bluegreen_apps.json rename to tests/zdd_apps.json diff --git a/zdd.py b/zdd.py new file mode 100755 index 00000000..2f1213f6 --- /dev/null +++ b/zdd.py @@ -0,0 +1,674 @@ +#!/usr/bin/env python3 + +from common import * +from datetime import datetime +from collections import namedtuple, defaultdict +from itertools import groupby + +import argparse +import json +import requests +import csv +import time +import re +import math +import six.moves.urllib as urllib +import socket +import sys +import subprocess + + +logger = logging.getLogger('zdd') + + +def query_yes_no(question, default="yes"): + # Thanks stackoverflow: + # https://stackoverflow.com/questions/3041986/python-command-line-yes-no-input + """Ask a yes/no question via input() and return their answer. + + "question" is a string that is presented to the user. + "default" is the presumed answer if the user just hits . + It must be "yes" (the default), "no" or None (meaning + an answer is required of the user). + + The "answer" return value is True for "yes" or False for "no". 
+ """ + valid = {"yes": True, "y": True, "ye": True, + "no": False, "n": False} + if default is None: + prompt = " [y/n] " + elif default == "yes": + prompt = " [Y/n] " + elif default == "no": + prompt = " [y/N] " + else: + raise ValueError("invalid default answer: '%s'" % default) + + while True: + sys.stdout.write(question + prompt) + choice = input().lower() + if default is not None and choice == '': + return valid[default] + elif choice in valid: + return valid[choice] + else: + sys.stdout.write("Please respond with 'yes' or 'no' " + "(or 'y' or 'n').\n") + + +def marathon_get_request(args, path): + url = args.marathon + path + response = requests.get(url, auth=get_marathon_auth_params(args)) + response.raise_for_status() + return response + + +def list_marathon_apps(args): + response = marathon_get_request(args, "/v2/apps") + return response.json()['apps'] + + +def fetch_marathon_app(args, app_id): + response = marathon_get_request(args, "/v2/apps" + app_id) + return response.json()['app'] + + +def _get_alias_records(hostname): + """Return all IPv4 A records for a given hostname + """ + return socket.gethostbyname_ex(hostname)[2] + + +def _unparse_url_alias(url, addr): + """Reassemble a url object into a string but with a new address + """ + return urllib.parse.urlunparse((url[0], + addr + ":" + str(url.port), + url[2], + url[3], + url[4], + url[5])) + + +def get_marathon_lb_urls(args): + """Return a list of urls for all Aliases of the + marathon_lb url passed in as an argument + """ + url = urllib.parse.urlparse(args.marathon_lb) + addrs = _get_alias_records(url.hostname) + return [_unparse_url_alias(url, addr) for addr in addrs] + + +def fetch_haproxy_pids(haproxy_url): + try: + response = requests.get(haproxy_url + "/_haproxy_getpids") + response.raise_for_status() + except requests.exceptions.RequestException: + logger.exception("Caught exception when retrieving HAProxy" + " pids from " + haproxy_url) + raise + + return response.text.split() + + +def check_haproxy_reloading(haproxy_url): + """Return False if haproxy has only one pid, it is not reloading. 
+ Return True if we catch an exception while making a request to + haproxy or if more than one pid is returned + """ + try: + pids = fetch_haproxy_pids(haproxy_url) + except requests.exceptions.RequestException: + # Assume reloading on any error, this should be caught with a timeout + return True + + if len(pids) > 1: + logger.info("Waiting for {} pids on {}".format(len(pids), haproxy_url)) + return True + + return False + + +def any_marathon_lb_reloading(marathon_lb_urls): + return any([check_haproxy_reloading(url) for url in marathon_lb_urls]) + + +def fetch_haproxy_stats(haproxy_url): + try: + response = requests.get(haproxy_url + "/haproxy?stats;csv") + response.raise_for_status() + except requests.exceptions.RequestException: + logger.exception("Caught exception when retrieving HAProxy" + " stats from " + haproxy_url) + raise + + return response.text + + +def fetch_combined_haproxy_stats(marathon_lb_urls): + raw = ''.join([fetch_haproxy_stats(url) for url in marathon_lb_urls]) + return parse_haproxy_stats(raw) + + +def parse_haproxy_stats(csv_data): + rows = csv_data.splitlines() + headings = rows.pop(0).lstrip('# ').rstrip(',\n').split(',') + csv_reader = csv.reader(rows, delimiter=',', quotechar="'") + + Row = namedtuple('Row', headings) + + return [Row(*row[0:-1]) for row in csv_reader if row[0][0] != '#'] + + +def get_deployment_label(app): + return get_deployment_group(app) + "_" + app['labels']['HAPROXY_0_PORT'] + + +def _if_app_listener(app, listener): + return (listener.pxname == get_deployment_label(app) and + listener.svname not in ['BACKEND', 'FRONTEND']) + + +def fetch_app_listeners(app, marathon_lb_urls): + haproxy_stats = fetch_combined_haproxy_stats(marathon_lb_urls) + return [l for l in haproxy_stats if _if_app_listener(app, l)] + + +def waiting_for_listeners(new_app, old_app, listeners, haproxy_count): + listener_count = (len(listeners) / haproxy_count) + return listener_count != new_app['instances'] + old_app['instances'] + + +def get_deployment_target(app): + return int(app['labels']['HAPROXY_DEPLOYMENT_TARGET_INSTANCES']) + + +def waiting_for_up_listeners(app, listeners, haproxy_count): + up_listeners = [l for l in listeners if l.status == 'UP'] + up_listener_count = (len(up_listeners) / haproxy_count) + + return up_listener_count < get_deployment_target(app) + + +def select_draining_listeners(listeners): + return [l for l in listeners if l.status == 'MAINT'] + + +def select_drained_listeners(listeners): + draining_listeners = select_draining_listeners(listeners) + return [l for l in draining_listeners if not _has_pending_requests(l)] + + +def get_svnames_from_task(task): + prefix = task['host'].replace('.', '_') + for port in task['ports']: + yield(prefix + '_{}'.format(port)) + + +def get_svnames_from_tasks(tasks): + svnames = [] + for task in tasks: + svnames += get_svnames_from_task(task) + return svnames + + +def _has_pending_requests(listener): + return int(listener.qcur or 0) > 0 or int(listener.scur or 0) > 0 + + +def find_drained_task_ids(app, listeners, haproxy_count): + """Return app tasks which have all haproxy listeners down and draining + of any pending sessions or connections + """ + tasks = zip(get_svnames_from_tasks(app['tasks']), app['tasks']) + drained_listeners = select_drained_listeners(listeners) + + drained_task_ids = [] + for svname, task in tasks: + task_listeners = [l for l in drained_listeners if l.svname == svname] + if len(task_listeners) == haproxy_count: + drained_task_ids.append(task['id']) + + return drained_task_ids + + +def 
find_draining_task_ids(app, listeners, haproxy_count): + """Return app tasks which have all haproxy listeners draining + """ + tasks = zip(get_svnames_from_tasks(app['tasks']), app['tasks']) + draining_listeners = select_draining_listeners(listeners) + + draining_task_ids = [] + for svname, task in tasks: + task_listeners = [l for l in draining_listeners if l.svname == svname] + if len(task_listeners) == haproxy_count: + draining_task_ids.append(task['id']) + + return draining_task_ids + + +def max_wait_not_exceeded(max_wait, timestamp): + return (time.time() - timestamp < max_wait) + + +def find_tasks_to_kill(args, new_app, old_app, timestamp): + marathon_lb_urls = get_marathon_lb_urls(args) + haproxy_count = len(marathon_lb_urls) + listeners = [] + + while max_wait_not_exceeded(args.max_wait, timestamp): + time.sleep(args.step_delay) + + logger.info("Existing app running {} instances, " + "new app running {} instances" + .format(old_app['instances'], new_app['instances'])) + + if any_marathon_lb_reloading(marathon_lb_urls): + continue + + try: + listeners = fetch_app_listeners(new_app, marathon_lb_urls) + except requests.exceptions.RequestException: + # Restart loop if we hit an exception while loading listeners, + # this may be normal behaviour + continue + + logger.info("Found {} app listeners across {} HAProxy instances" + .format(len(listeners), haproxy_count)) + + if waiting_for_listeners(new_app, old_app, listeners, haproxy_count): + continue + + if waiting_for_up_listeners(new_app, listeners, haproxy_count): + continue + + if waiting_for_drained_listeners(listeners): + continue + + return find_drained_task_ids(old_app, listeners, haproxy_count) + + logger.info('Timed out waiting for tasks to fully drain, find any draining' + ' tasks and continue with deployment...') + + return find_draining_task_ids(old_app, listeners, haproxy_count) + + +def deployment_in_progress(app): + return len(app['deployments']) > 0 + + +def execute_pre_kill_hook(args, old_app, tasks_to_kill, new_app): + if args.pre_kill_hook is not None: + logger.info("Calling pre-kill hook '{}'".format(args.pre_kill_hook)) + + subprocess.check_call([args.pre_kill_hook, + json.dumps(old_app), + json.dumps(tasks_to_kill), + json.dumps(new_app)]) + + +def swap_zdd_apps(args, new_app, old_app): + old_app = fetch_marathon_app(args, old_app['id']) + new_app = fetch_marathon_app(args, new_app['id']) + + if deployment_in_progress(new_app): + time.sleep(args.step_delay) + return swap_zdd_apps(args, new_app, old_app) + + tasks_to_kill = find_tasks_to_kill(args, new_app, old_app, time.time()) + + if ready_to_delete_old_app(new_app, old_app, tasks_to_kill): + return safe_delete_app(args, old_app) + + if len(tasks_to_kill) > 0: + execute_pre_kill_hook(args, old_app, tasks_to_kill, new_app) + + logger.info("There are {} draining listeners, " + "about to kill the following tasks:\n - {}" + .format(len(tasks_to_kill), + "\n - ".join(tasks_to_kill))) + + if args.force or query_yes_no("Continue?"): + logger.info("Scaling down old app by {} instances" + .format(len(tasks_to_kill))) + kill_marathon_tasks(args, tasks_to_kill) + else: + return False + + if new_app['instances'] < get_deployment_target(new_app): + scale_new_app_instances(args, new_app, old_app) + + return swap_zdd_apps(args, new_app, old_app) + + +def ready_to_delete_old_app(new_app, old_app, draining_task_ids): + return (int(new_app['instances']) == get_deployment_target(new_app) and + len(draining_task_ids) == int(old_app['instances'])) + + +def 
waiting_for_drained_listeners(listeners): + return len(select_drained_listeners(listeners)) < 1 + + +def scale_new_app_instances(args, new_app, old_app): + """Scale the app by 50% of its existing instances until we + meet or surpass instances deployed for old_app. + At which point go right to the new_app deployment target + """ + instances = (math.floor(new_app['instances'] + + (new_app['instances'] + 1) / 2)) + if instances >= old_app['instances']: + instances = get_deployment_target(new_app) + + logger.info("Scaling new app up to {} instances".format(instances)) + return scale_marathon_app_instances(args, new_app, instances) + + +def safe_delete_app(args, app): + logger.info("About to delete old app {}".format(app['id'])) + if args.force or query_yes_no("Continue?"): + delete_marathon_app(args, app) + return True + else: + return False + + +def delete_marathon_app(args, app): + url = args.marathon + '/v2/apps' + app['id'] + response = requests.delete(url, + auth=get_marathon_auth_params(args)) + response.raise_for_status() + return response + + +def kill_marathon_tasks(args, ids): + data = json.dumps({'ids': ids}) + url = args.marathon + "/v2/tasks/delete?scale=true" + headers = {'Content-Type': 'application/json'} + response = requests.post(url, headers=headers, data=data, + auth=get_marathon_auth_params(args)) + response.raise_for_status() + return response + + +def scale_marathon_app_instances(args, app, instances): + url = args.marathon + "/v2/apps" + app['id'] + data = json.dumps({'instances': instances}) + headers = {'Content-Type': 'application/json'} + + response = requests.put(url, headers=headers, data=data, + auth=get_marathon_auth_params(args)) + + response.raise_for_status() + return response + + +def deploy_marathon_app(args, app): + url = args.marathon + "/v2/apps" + data = json.dumps(app) + headers = {'Content-Type': 'application/json'} + response = requests.post(url, headers=headers, data=data, + auth=get_marathon_auth_params(args)) + response.raise_for_status() + return response + + +def get_service_port(app): + try: + return \ + int(app['container']['docker']['portMappings'][0]['servicePort']) + except KeyError: + return int(app['ports'][0]) + + +def set_service_port(app, servicePort): + try: + app['container']['docker']['portMappings'][0]['servicePort'] = \ + int(servicePort) + except KeyError: + app['ports'][0] = int(servicePort) + + return app + + +def validate_app(app): + if app['id'] is None: + raise Exception("App doesn't contain a valid App ID") + if 'labels' not in app: + raise Exception("No labels found. 
Please define the" + "HAPROXY_DEPLOYMENT_GROUP label") + if 'HAPROXY_DEPLOYMENT_GROUP' not in app['labels']: + raise Exception("Please define the " + "HAPROXY_DEPLOYMENT_GROUP label") + if 'HAPROXY_DEPLOYMENT_ALT_PORT' not in app['labels']: + raise Exception("Please define the " + "HAPROXY_DEPLOYMENT_ALT_PORT label") + + +def set_app_ids(app, colour): + app['labels']['HAPROXY_APP_ID'] = app['id'] + app['id'] = app['id'] + '-' + colour + + if app['id'][0] != '/': + app['id'] = '/' + app['id'] + + return app + + +def set_service_ports(app, servicePort): + app['labels']['HAPROXY_0_PORT'] = str(get_service_port(app)) + try: + app['container']['docker']['portMappings'][0]['servicePort'] = \ + int(servicePort) + return app + except KeyError: + app['ports'][0] = int(servicePort) + return app + + +def select_next_port(app): + alt_port = int(app['labels']['HAPROXY_DEPLOYMENT_ALT_PORT']) + if int(app['ports'][0]) == alt_port: + return int(app['labels']['HAPROXY_0_PORT']) + else: + return alt_port + + +def select_next_colour(app): + if app['labels'].get('HAPROXY_DEPLOYMENT_COLOUR') == 'blue': + return 'green' + else: + return 'blue' + + +def sort_deploys(apps): + return sorted(apps, key=lambda a: a.get('labels', {}) + .get('HAPROXY_DEPLOYMENT_STARTED_AT', '0')) + + +def select_last_deploy(apps): + return sort_deploys(apps).pop() + + +def select_last_two_deploys(apps): + return sort_deploys(apps)[:-3:-1] + + +def get_deployment_group(app): + return app.get('labels', {}).get('HAPROXY_DEPLOYMENT_GROUP') + + +def fetch_previous_deploys(args, app): + apps = list_marathon_apps(args) + app_deployment_group = get_deployment_group(app) + return [a for a in apps if get_deployment_group(a) == app_deployment_group] + + +def prepare_deploy(args, previous_deploys, app): + """ Return a blue or a green version of `app` based on prexisting deploys + """ + if len(previous_deploys) > 0: + last_deploy = select_last_deploy(previous_deploys) + + app['instances'] = args.initial_instances + next_colour = select_next_colour(last_deploy) + next_port = select_next_port(last_deploy) + deployment_target_instances = last_deploy['instances'] + else: + next_colour = 'blue' + next_port = get_service_port(app) + deployment_target_instances = app['instances'] + + app = set_app_ids(app, next_colour) + app = set_service_ports(app, next_port) + app['labels']['HAPROXY_DEPLOYMENT_TARGET_INSTANCES'] = \ + str(deployment_target_instances) + app['labels']['HAPROXY_DEPLOYMENT_COLOUR'] = next_colour + app['labels']['HAPROXY_DEPLOYMENT_STARTED_AT'] = datetime.now().isoformat() + + return app + + +def load_app_json(args): + with open(args.json) as content_file: + return json.load(content_file) + + +def safe_resume_deploy(args, previous_deploys): + if args.resume: + logger.info("Found previous deployment, resuming") + new_app, old_app = select_last_two_deploys(previous_deploys) + return swap_zdd_apps(args, new_app, old_app) + else: + raise Exception("There appears to be an" + " existing deployment in progress") + + +def do_zdd(args, out=sys.stdout): + app = load_app_json(args) + validate_app(app) + + previous_deploys = fetch_previous_deploys(args, app) + + if len(previous_deploys) > 1: + # There is a stuck deploy + return safe_resume_deploy(args, previous_deploys) + + new_app = prepare_deploy(args, previous_deploys, app) + + logger.info('Final app definition:') + out.write(json.dumps(new_app, sort_keys=True, indent=2)) + out.write("\n") + + if args.dry_run: + return True + + if args.force or query_yes_no("Continue with deployment?"): + 
deploy_marathon_app(args, new_app) + + if len(previous_deploys) == 0: + # This was the first deploy, nothing to swap + return True + else: + # This is a standard blue/green deploy, swap new app with old + old_app = select_last_deploy(previous_deploys) + return swap_zdd_apps(args, new_app, old_app) + + +def get_arg_parser(): + parser = argparse.ArgumentParser( + description="Marathon HAProxy Load Balancer", + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument("--longhelp", + help="Print out configuration details", + action="store_true" + ) + parser.add_argument("--marathon", "-m", + help="[required] Marathon endpoint, eg. -m " + + "http://marathon1:8080" + ) + parser.add_argument("--marathon-lb", "-l", + help="[required] Marathon-lb stats endpoint, eg. -l " + + "http://marathon-lb.marathon.mesos:9090" + ) + + parser.add_argument("--json", "-j", + help="[required] App JSON" + ) + parser.add_argument("--dry-run", "-d", + help="Perform a dry run", + action="store_true" + ) + parser.add_argument("--force", "-f", + help="Perform deployment un-prompted", + action="store_true" + ) + parser.add_argument("--step-delay", "-s", + help="Delay (in seconds) between each successive" + " deployment step", + type=int, default=5 + ) + parser.add_argument("--initial-instances", "-i", + help="Initial number of app instances to launch", + type=int, default=1 + ) + parser.add_argument("--resume", "-r", + help="Resume from a previous deployment", + action="store_true" + ) + parser.add_argument("--max-wait", "-w", + help="Maximum amount of time (in seconds) to wait" + " for HAProxy to drain connections", + type=int, default=300 + ) + parser.add_argument("--pre-kill-hook", + help="A path to an executable (such as a script) " + "which will be called before killing any tasks marked " + "for draining at each step. The script will be called " + "with 3 arguments (in JSON): the old app definition, " + "the list of tasks which will be killed, " + "and the new app definition. An exit " + "code of 0 indicates the deploy may continue. " + "If the hook returns a non-zero exit code, the deploy " + "will stop, and an operator must intervene." + ) + parser = set_logging_args(parser) + parser = set_marathon_auth_args(parser) + return parser + + +def set_request_retries(): + s = requests.Session() + a = requests.adapters.HTTPAdapter(max_retries=3) + s.mount('http://', a) + + +def process_arguments(): + # Process arguments + arg_parser = get_arg_parser() + args = arg_parser.parse_args() + + if args.longhelp: + print(__doc__) + sys.exit() + # otherwise make sure that a Marathon URL was specified + else: + if args.marathon is None: + arg_parser.error('argument --marathon/-m is required') + if args.marathon_lb is None: + arg_parser.error('argument --marathon-lb/-l is required') + if args.json is None: + arg_parser.error('argument --json/-j is required') + + return args + + +if __name__ == '__main__': + args = process_arguments() + set_request_retries() + setup_logging(logger, args.syslog_socket, args.log_format) + + if do_zdd(args): + sys.exit(0) + else: + sys.exit(1)
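A few sketches may help illustrate the behaviour described above; none of them are part of the patch. First, the drain detection that `zdd.py` implements in `fetch_haproxy_pids()` and `check_haproxy_reloading()` amounts to polling `/_haproxy_getpids` until only one PID remains. A minimal standalone version, assuming the marathon-lb address used elsewhere in the README:

```python
import time

import requests

MARATHON_LB = "http://marathon-lb.marathon.mesos:9090"  # assumed address


def haproxy_draining(url=MARATHON_LB):
    # /_haproxy_getpids returns the PIDs of every HAProxy process; more
    # than one PID means an old process is still draining connections.
    pids = requests.get(url + "/_haproxy_getpids").text.split()
    return len(pids) > 1


deadline = time.time() + 300  # zdd.py's default --max-wait is 300 seconds
while haproxy_draining() and time.time() < deadline:
    print("HAProxy still draining, waiting...")
    time.sleep(5)  # zdd.py's default --step-delay is 5 seconds
```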
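Second, the scaling arithmetic in `scale_new_app_instances()` is easy to sanity-check by hand. The snippet below replays the formula from `zdd.py` against the values exercised in `test_zdd.py`:

```python
import math


def next_instance_count(new_instances, old_instances, target):
    # Formula from scale_new_app_instances(): grow the new app by roughly
    # 50% per step until it meets or surpasses the old app, then jump
    # straight to the deployment target.
    instances = math.floor(new_instances + (new_instances + 1) / 2)
    if instances >= old_instances:
        instances = target
    return instances


print(next_instance_count(10, 30, 30))  # -> 15, as asserted in test_zdd.py
print(next_instance_count(10, 8, 30))   # -> 30, straight to the target
```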
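Finally, an app definition taking part in a zero-downtime deploy needs only the two deployment labels and a single service port; `validate_app()` rejects anything missing them. A minimal hypothetical fragment (field values are assumptions; see the linked tests/1-nginx.json for a complete example):

```python
# Hypothetical minimal app definition accepted by validate_app().
app = {
    "id": "nginx",
    "instances": 3,
    "ports": [10000],  # exactly one service port
    "labels": {
        "HAPROXY_DEPLOYMENT_GROUP": "nginx",
        "HAPROXY_DEPLOYMENT_ALT_PORT": "10001",
    },
}
```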