From 5db61050d258212dc5f8efe5faa8ecf390dcebf2 Mon Sep 17 00:00:00 2001 From: Jamie Hewland Date: Wed, 14 Sep 2016 16:00:10 +0200 Subject: [PATCH 01/17] First attempt at a reload endpoint * Add --api-listen parameter --- marathon_lb.py | 87 ++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 74 insertions(+), 13 deletions(-) diff --git a/marathon_lb.py b/marathon_lb.py index df0425b3..2327ce6d 100755 --- a/marathon_lb.py +++ b/marathon_lb.py @@ -1542,6 +1542,11 @@ def get_arg_parser(): help="The HTTP address that Marathon can call this " + "script back at (http://lb1:8080)" ) + parser.add_argument("--api-listen", + help="The address marathon-lb listens on to respond to" + "API requests. Only available in SSE mode. (e.g., " + "0.0.0.0:8080)" + ) parser.add_argument("--haproxy-config", help="Location of haproxy configuration", default="/etc/haproxy/haproxy.cfg" @@ -1636,13 +1641,7 @@ def clear_callbacks(marathon, callback_url): marathon.remove_subscriber(callback_url) -def process_sse_events(marathon, config_file, groups, - bind_http_https, ssl_certs, haproxy_map): - processor = MarathonEventProcessor(marathon, - config_file, - groups, - bind_http_https, - ssl_certs, haproxy_map) +def process_sse_events(marathon, processor): try: events = marathon.get_event_stream() for event in events: @@ -1669,6 +1668,60 @@ def process_sse_events(marathon, config_file, groups, processor.stop() +class MarathonLbApi(object): + """ + This isn't a real API yet, it just contains an endpoint to trigger a reload + of the config: POST /reload + """ + + def __init__(self, listen_addr, processor): + self.__listen_addr = listen_addr + self.__processor = processor + + self.__thread = threading.Thread(target=self.listen) + + def run(self): + self.__thread.run() + + def listen(self): + try: + listen_uri = parse.urlparse(self.__listen_addr) + httpd = make_server(listen_uri.hostname, listen_uri.port, + self.wsgi_app) + httpd.serve_forever() + finally: + self.__processor.stop() + + # TODO(JayH5): Switch to a sane http server + # TODO(JayH5): Good exception catching, etc + def wsgi_app(self, env, start_response): + path = env.get("PATH_INFO", "").lstrip("/") + if re.match(r"reload/?$", path): + if env.get("REQUEST_METHOD") == "POST": + return self.reload(env, start_response) + + return self.method_not_allowed(env, start_response) + + return self.not_found(env, start_response) + + def reload(self, env, start_response): + self.__processor.reset_from_tasks() + start_response("200 OK", [("Content-Type", "text/plain")]) + + return ["Reloading...\n".encode("utf-8")] + + def method_not_allowed(self, env, start_response): + """Called if no method matches.""" + start_response("405 METHOD NOT ALLOWED", + [("Content-Type", "text/plain")]) + return ["Method Not Allowed".encode("utf-8")] + + def not_found(self, env, start_response): + """Called if no path matches.""" + start_response("404 NOT FOUND", [("Content-Type", "text/plain")]) + return ["Not Found".encode("utf-8")] + + if __name__ == '__main__': # Process arguments arg_parser = get_arg_parser() @@ -1689,6 +1742,8 @@ def process_sse_events(marathon, config_file, groups, if args.sse and args.listening: arg_parser.error( 'cannot use --listening and --sse at the same time') + if args.api_listen and not args.sse: + arg_parser.error('--api-listen is only supported with --sse') if bool(args.min_serv_port_ip_per_task) != \ bool(args.max_serv_port_ip_per_task): arg_parser.error( @@ -1741,16 +1796,22 @@ def process_sse_events(marathon, config_file, groups, finally: clear_callbacks(marathon, callback_url) elif args.sse: + processor = MarathonEventProcessor(marathon, + args.haproxy_config, + args.group, + not args.dont_bind_http_https, + args.ssl_certs, + args.haproxy_map) + + if args.api_listen: + api = MarathonLbApi(args.api_listen, processor) + api.run() + backoff = 3 while True: stream_started = time.time() try: - process_sse_events(marathon, - args.haproxy_config, - args.group, - not args.dont_bind_http_https, - args.ssl_certs, - args.haproxy_map) + process_sse_events(marathon, processor) except: logger.exception("Caught exception") backoff = backoff * 1.5 From 3ffc962c0b426b44c230d0bcf4d37d4d87b38df9 Mon Sep 17 00:00:00 2001 From: Jamie Hewland Date: Wed, 14 Sep 2016 16:31:40 +0200 Subject: [PATCH 02/17] Parse API listen URI before launching thread * Catch errors early * Apparently the http:// is important... --- marathon_lb.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/marathon_lb.py b/marathon_lb.py index 2327ce6d..615acfe9 100755 --- a/marathon_lb.py +++ b/marathon_lb.py @@ -1545,7 +1545,7 @@ def get_arg_parser(): parser.add_argument("--api-listen", help="The address marathon-lb listens on to respond to" "API requests. Only available in SSE mode. (e.g., " - "0.0.0.0:8080)" + "http://0.0.0.0:8080)" ) parser.add_argument("--haproxy-config", help="Location of haproxy configuration", @@ -1674,8 +1674,8 @@ class MarathonLbApi(object): of the config: POST /reload """ - def __init__(self, listen_addr, processor): - self.__listen_addr = listen_addr + def __init__(self, listen_uri, processor): + self.__listen_uri = listen_uri self.__processor = processor self.__thread = threading.Thread(target=self.listen) @@ -1685,9 +1685,8 @@ def run(self): def listen(self): try: - listen_uri = parse.urlparse(self.__listen_addr) - httpd = make_server(listen_uri.hostname, listen_uri.port, - self.wsgi_app) + httpd = make_server(self.__listen_uri.hostname, + self.__listen_uri.port, self.wsgi_app) httpd.serve_forever() finally: self.__processor.stop() @@ -1804,7 +1803,8 @@ def not_found(self, env, start_response): args.haproxy_map) if args.api_listen: - api = MarathonLbApi(args.api_listen, processor) + listen_uri = parse.urlparse(args.api_listen) + api = MarathonLbApi(listen_uri, processor) api.run() backoff = 3 From 2e3f69f68eed7bc8dcf3fc97f2876d17c3bfe832 Mon Sep 17 00:00:00 2001 From: Jamie Hewland Date: Wed, 14 Sep 2016 16:33:39 +0200 Subject: [PATCH 03/17] Refactor config validation into separate method * Simplify logic around skipping validation * In preparation for testing existing config... --- marathon_lb.py | 61 +++++++++++++++++++++++++++----------------------- 1 file changed, 33 insertions(+), 28 deletions(-) diff --git a/marathon_lb.py b/marathon_lb.py index 615acfe9..344d90df 100755 --- a/marathon_lb.py +++ b/marathon_lb.py @@ -1056,28 +1056,14 @@ def writeConfigAndValidate( # Change the file paths in the config to (temporarily) point to the # temporary map files so those can also be checked when the config is # validated - if not args.skip_validation: - temp_config = config.replace( - domain_map_file, domain_temp_map_file - ).replace(app_map_file, app_temp_map_file) + temp_config = config.replace( + domain_map_file, domain_temp_map_file + ).replace(app_map_file, app_temp_map_file) # Write the new config to a temporary file haproxyTempConfigFile = writeReplacementTempFile(temp_config, config_file) - # If skip validation flag is provided, don't check. - if args.skip_validation: - logger.debug("skipping validation.") - if haproxy_map: - moveTempFile(domain_temp_map_file, domain_map_file) - moveTempFile(app_temp_map_file, app_map_file) - moveTempFile(haproxyTempConfigFile, config_file) - return True - - # Check that config is valid - cmd = ['haproxy', '-f', haproxyTempConfigFile, '-c'] - logger.debug("checking config with command: " + str(cmd)) - returncode = subprocess.call(args=cmd) - if returncode == 0: + if validateConfig(haproxyTempConfigFile): # Move into place if haproxy_map: moveTempFile(domain_temp_map_file, domain_map_file) @@ -1086,11 +1072,13 @@ def writeConfigAndValidate( # Edit the config file again to point to the actual map paths with open(haproxyTempConfigFile, 'w') as tempConfig: tempConfig.write(config) + else: + truncateMapFileIfExists(domain_map_file) + truncateMapFileIfExists(app_map_file) moveTempFile(haproxyTempConfigFile, config_file) return True else: - logger.error("haproxy returned non-zero when checking config") return False @@ -1114,12 +1102,38 @@ def writeReplacementTempFile(content, file_to_replace): return tempFile +def validateConfig(haproxy_config_file): + # If skip validation flag is provided, don't check. + if args.skip_validation: + logger.debug("skipping validation.") + return True + + # Check that config is valid + cmd = ['haproxy', '-f', haproxy_config_file, '-c'] + logger.debug("checking config with command: " + str(cmd)) + returncode = subprocess.call(args=cmd) + if returncode == 0: + return True + else: + logger.error("haproxy returned non-zero when checking config") + return False + + def moveTempFile(temp_file, dest_file): # Replace the old file with the new from its temporary location logger.debug("moving temp file %s to %s", temp_file, dest_file) move(temp_file, dest_file) +def truncateMapFileIfExists(map_file): + if os.path.isfile(map_file): + logger.debug("Truncating map file as haproxy-map flag " + "is disabled %s", map_file) + fd = os.open(map_file, os.O_RDWR) + os.ftruncate(fd, 0) + os.close(fd) + + def compareWriteAndReloadConfig(config, config_file, domain_map_array, app_map_array, haproxy_map): # See if the last config on disk matches this, and if so don't reload @@ -1198,15 +1212,6 @@ def compareMapFile(map_file, map_string): return runningmap != map_string -def truncateMapFileIfExists(map_file): - if os.path.isfile(map_file): - logger.debug("Truncating map file as haproxy-map flag " - "is disabled %s", map_file) - fd = os.open(map_file, os.O_RDWR) - os.ftruncate(fd, 0) - os.close(fd) - - def get_health_check(app, portIndex): for check in app['healthChecks']: if check.get('port'): From c290531d42c96d1b82cd2046d50301f7f4ff8264 Mon Sep 17 00:00:00 2001 From: Jamie Hewland Date: Wed, 14 Sep 2016 17:07:21 +0200 Subject: [PATCH 04/17] Add the ability to reload existing HAProxy config * /reload?existing --- marathon_lb.py | 95 +++++++++++++++++++++++++++++++++++++------------- 1 file changed, 70 insertions(+), 25 deletions(-) diff --git a/marathon_lb.py b/marathon_lb.py index 344d90df..7472a767 100755 --- a/marathon_lb.py +++ b/marathon_lb.py @@ -34,6 +34,7 @@ import threading import time import traceback +from cgi import parse_qs from itertools import cycle from operator import attrgetter from shutil import move @@ -1463,8 +1464,9 @@ def __init__(self, marathon, config_file, groups, self.__ssl_certs = ssl_certs self.__condition = threading.Condition() - self.__thread = threading.Thread(target=self.do_reset) + self.__thread = threading.Thread(target=self.try_reset) self.__pending_reset = False + self.__pending_reload = False self.__stop = False self.__haproxy_map = haproxy_map self.__thread.start() @@ -1472,39 +1474,69 @@ def __init__(self, marathon, config_file, groups, # Fetch the base data self.reset_from_tasks() - def do_reset(self): + def try_reset(self): with self.__condition: logger.info('starting event processor thread') while True: + pending_reset = False + pending_reload = False + self.__condition.acquire() + if self.__stop: logger.info('stopping event processor thread') return - if not self.__pending_reset: + + pending_reset = self.__pending_reset + pending_reload = self.__pending_reload + if not pending_reset and not pending_reload: if not self.__condition.wait(300): logger.info('condition wait expired') + self.__pending_reset = False + self.__pending_reload = False + self.__condition.release() - try: - start_time = time.time() - - self.__apps = get_apps(self.__marathon) - regenerate_config(self.__apps, - self.__config_file, - self.__groups, - self.__bind_http_https, - self.__ssl_certs, - self.__templater, - self.__haproxy_map) - - logger.debug("updating tasks finished, took %s seconds", - time.time() - start_time) - except requests.exceptions.ConnectionError as e: - logger.error("Connection error({0}): {1}".format( - e.errno, e.strerror)) - except: - logger.exception("Unexpected error!") + # Reset takes precedence over reload + if pending_reset: + self.do_reset() + elif pending_reload: + self.do_reload() + else: + # Timed out waiting on the condition variable, just do a + # full reset for good measure (as was done before). + self.do_reset() + + def do_reset(self): + try: + start_time = time.time() + + self.__apps = get_apps(self.__marathon) + regenerate_config(self.__apps, + self.__config_file, + self.__groups, + self.__bind_http_https, + self.__ssl_certs, + self.__templater, + self.__haproxy_map) + + logger.debug("updating tasks finished, took %s seconds", + time.time() - start_time) + except requests.exceptions.ConnectionError as e: + logger.error("Connection error({0}): {1}".format( + e.errno, e.strerror)) + except: + logger.exception("Unexpected error!") + + def do_reload(self): + try: + # Validate the existing config before reloading + logger.debug("attempting to reload existing config...") + if validateConfig(self.__config_file): + reloadConfig() + except: + logger.exception("Unexpected error!") def stop(self): self.__condition.acquire() @@ -1518,6 +1550,12 @@ def reset_from_tasks(self): self.__condition.notify() self.__condition.release() + def reload_existing_config(self): + self.__condition.acquire() + self.__pending_reload = True + self.__condition.notify() + self.__condition.release() + def handle_event(self, event): if event['eventType'] == 'status_update_event' or \ event['eventType'] == 'health_status_changed_event' or \ @@ -1708,11 +1746,18 @@ def wsgi_app(self, env, start_response): return self.not_found(env, start_response) - def reload(self, env, start_response): - self.__processor.reset_from_tasks() + def reload(self, env, start_response, existing=False): + query_params = parse_qs(env["QUERY_STRING"]) + if "existing" in query_params: + self.__processor.reload_existing_config() + msg = "Reloading existing config...\n" + else: + self.__processor.reset_from_tasks() + msg = "Reloading config...\n" + start_response("200 OK", [("Content-Type", "text/plain")]) - return ["Reloading...\n".encode("utf-8")] + return [msg.encode("utf-8")] def method_not_allowed(self, env, start_response): """Called if no method matches.""" From 55226a76c82effad80dbb3eb98fcb4c418fa2098 Mon Sep 17 00:00:00 2001 From: Jamie Hewland Date: Thu, 15 Sep 2016 15:37:12 +0200 Subject: [PATCH 05/17] Fix notifying bools in event processor --- marathon_lb.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/marathon_lb.py b/marathon_lb.py index 7472a767..e52c975e 100755 --- a/marathon_lb.py +++ b/marathon_lb.py @@ -1478,21 +1478,18 @@ def try_reset(self): with self.__condition: logger.info('starting event processor thread') while True: - pending_reset = False - pending_reload = False - self.__condition.acquire() if self.__stop: logger.info('stopping event processor thread') return - pending_reset = self.__pending_reset - pending_reload = self.__pending_reload - if not pending_reset and not pending_reload: + if not self.__pending_reset and not self.__pending_reload: if not self.__condition.wait(300): logger.info('condition wait expired') + pending_reset = self.__pending_reset + pending_reload = self.__pending_reload self.__pending_reset = False self.__pending_reload = False From 33dcef53b38f2b9d663f025155cdf68aa89ac00a Mon Sep 17 00:00:00 2001 From: Jamie Hewland Date: Thu, 15 Sep 2016 15:41:34 +0200 Subject: [PATCH 06/17] Improve debug logging around config comparisons --- marathon_lb.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/marathon_lb.py b/marathon_lb.py index e52c975e..ace9d65b 100755 --- a/marathon_lb.py +++ b/marathon_lb.py @@ -1169,8 +1169,9 @@ def compareWriteAndReloadConfig(config, config_file, domain_map_array, app_map_string, app_map_file, haproxy_map): reloadConfig() else: - logger.warning("skipping reload: config not valid") - + logger.warning("skipping reload: config/map not valid") + else: + logger.debug("skipping reload: config/map unchanged") else: truncateMapFileIfExists(domain_map_file) truncateMapFileIfExists(app_map_file) @@ -1184,6 +1185,8 @@ def compareWriteAndReloadConfig(config, config_file, domain_map_array, reloadConfig() else: logger.warning("skipping reload: config not valid") + else: + logger.debug("skipping reload: config unchanged") def generateMapString(map_array): From c25789858f8e2141b9959aae68a3a7b003e88736 Mon Sep 17 00:00:00 2001 From: Jamie Hewland Date: Thu, 15 Sep 2016 15:46:00 +0200 Subject: [PATCH 07/17] Tweak response messages for reload API --- marathon_lb.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/marathon_lb.py b/marathon_lb.py index ace9d65b..af80a8cb 100755 --- a/marathon_lb.py +++ b/marathon_lb.py @@ -1750,10 +1750,10 @@ def reload(self, env, start_response, existing=False): query_params = parse_qs(env["QUERY_STRING"]) if "existing" in query_params: self.__processor.reload_existing_config() - msg = "Reloading existing config...\n" + msg = "Triggered reload of existing config...\n" else: self.__processor.reset_from_tasks() - msg = "Reloading config...\n" + msg = "Triggered reload of config...\n" start_response("200 OK", [("Content-Type", "text/plain")]) From 9df0b4f44bedefbb8ddf4f13316b736999facef0 Mon Sep 17 00:00:00 2001 From: Jamie Hewland Date: Thu, 15 Sep 2016 15:57:43 +0200 Subject: [PATCH 08/17] Do a slightly better check on query params --- marathon_lb.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/marathon_lb.py b/marathon_lb.py index af80a8cb..f03cbb04 100755 --- a/marathon_lb.py +++ b/marathon_lb.py @@ -1748,7 +1748,8 @@ def wsgi_app(self, env, start_response): def reload(self, env, start_response, existing=False): query_params = parse_qs(env["QUERY_STRING"]) - if "existing" in query_params: + existing = query_params.get("existing", []) + if existing and existing[0].lower() in ["true", "1"]: self.__processor.reload_existing_config() msg = "Triggered reload of existing config...\n" else: From c5a5c501581a55534e6a66ba04fac22a8182409f Mon Sep 17 00:00:00 2001 From: Jamie Hewland Date: Thu, 15 Sep 2016 16:28:46 +0200 Subject: [PATCH 09/17] Add a README note about the endpoint --- README.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index e340e178..65110808 100644 --- a/README.md +++ b/README.md @@ -162,7 +162,7 @@ Currently it creates a lookup dictionary only for host header (both HTTP and HTT ### API Endpoints -Marathon-lb exposes a few endpoints on port 9090 (by default). They are: +Marathon-lb exposes a few endpoints served by HAProxy on port 9090 (by default). They are: | Endpoint | Description | |-------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| @@ -175,6 +175,11 @@ Marathon-lb exposes a few endpoints on port 9090 (by default). They are: | `:9090/_haproxy_getpids` | Returns the PIDs for all HAProxy instances within the current process namespace. This literally returns `$(pidof haproxy)`. Implemented in [`getpids.lua`](getpids.lua). This is also used by the [`zdd.py`](zdd.py) script to determine if connections have finished draining during a deploy. | +In addition to these, marathon-lb can (optionally) expose an endpoint for performing some useful actions. By setting the `--api-endpoint` parameter to a listening address (e.g. `http://0.0.0.0:8080`), a *very* basic API is made available for managing marathon-lb. Currently the only endpoint is for triggering a configuration reload. This is done via the `/reload` endpoint. You can also specify a query parameter to reload existing config, rather than regenerating the HAProxy config from Marathon: `/reload?existing=true`. This is **only** available in SSE mode. + +This endpoint is useful as a debugging tool: a general way to poke marathon-lb to get it to reload. It may also be useful in other cases, such as when files related to the HAProxy config change (e.g. SSL certificates) but the actual contents of the HAProxy config files haven't changed. + + ## HAProxy Configuration ### App Labels From eef20c87e1dffdb30e9d9e50d32df2332700a9e8 Mon Sep 17 00:00:00 2001 From: Jamie Hewland Date: Thu, 15 Sep 2016 16:30:12 +0200 Subject: [PATCH 10/17] Regenerate Longhelp.md --- Longhelp.md | 5 +++++ marathon_lb.py | 4 ++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/Longhelp.md b/Longhelp.md index 06af3f3e..d4e116e5 100644 --- a/Longhelp.md +++ b/Longhelp.md @@ -20,6 +20,7 @@ which marathon-lb can be reached by Marathon. ``` usage: marathon_lb.py [-h] [--longhelp] [--marathon MARATHON [MARATHON ...]] [--listening LISTENING] [--callback-url CALLBACK_URL] + [--api-listen API_LISTEN] [--haproxy-config HAPROXY_CONFIG] [--group GROUP] [--command COMMAND] [--sse] [--health-check] [--lru-cache-capacity LRU_CACHE_CAPACITY] @@ -51,6 +52,10 @@ optional arguments: --callback-url CALLBACK_URL, -u CALLBACK_URL The HTTP address that Marathon can call this script back at (http://lb1:8080) (default: None) + --api-listen API_LISTEN + The address marathon-lb listens on to respond to API + requests. Only available in SSE mode. (e.g., + http://0.0.0.0:8080) (default: None) --haproxy-config HAPROXY_CONFIG Location of haproxy configuration (default: /etc/haproxy/haproxy.cfg) diff --git a/marathon_lb.py b/marathon_lb.py index f03cbb04..569d9a92 100755 --- a/marathon_lb.py +++ b/marathon_lb.py @@ -1586,8 +1586,8 @@ def get_arg_parser(): "script back at (http://lb1:8080)" ) parser.add_argument("--api-listen", - help="The address marathon-lb listens on to respond to" - "API requests. Only available in SSE mode. (e.g., " + help="The address marathon-lb listens on to respond " + "to API requests. Only available in SSE mode. (e.g., " "http://0.0.0.0:8080)" ) parser.add_argument("--haproxy-config", From 40890e265cafc14bd7d274190556b8d6b7bfe479 Mon Sep 17 00:00:00 2001 From: Jamie Hewland Date: Thu, 22 Sep 2016 10:16:00 +0200 Subject: [PATCH 11/17] Remove API and add signal handling to the event processor --- marathon_lb.py | 117 +++++++++++-------------------------------------- 1 file changed, 25 insertions(+), 92 deletions(-) diff --git a/marathon_lb.py b/marathon_lb.py index 569d9a92..317cfef0 100755 --- a/marathon_lb.py +++ b/marathon_lb.py @@ -28,13 +28,13 @@ import random import re import shlex +import signal import stat import subprocess import sys import threading import time import traceback -from cgi import parse_qs from itertools import cycle from operator import attrgetter from shutil import move @@ -1562,6 +1562,16 @@ def handle_event(self, event): event['eventType'] == 'api_post_event': self.reset_from_tasks() + def handle_signal(self, sig, stack): + if sig == signal.SIGHUP: + logger.debug('received signal SIGHUP - reloading config') + self.reset_from_tasks() + elif sig == signal.SIGUSR1: + logger.debug('received signal SIGUSR1 - reloading existing config') + self.reload_existing_config() + else: + logger.warning('received unknown signal %d' % (sig,)) + def get_arg_parser(): parser = argparse.ArgumentParser( @@ -1585,11 +1595,6 @@ def get_arg_parser(): help="The HTTP address that Marathon can call this " + "script back at (http://lb1:8080)" ) - parser.add_argument("--api-listen", - help="The address marathon-lb listens on to respond " - "to API requests. Only available in SSE mode. (e.g., " - "http://0.0.0.0:8080)" - ) parser.add_argument("--haproxy-config", help="Location of haproxy configuration", default="/etc/haproxy/haproxy.cfg" @@ -1650,13 +1655,7 @@ def get_arg_parser(): return parser -def run_server(marathon, listen_addr, callback_url, config_file, groups, - bind_http_https, ssl_certs, haproxy_map, marathon_ca_cert): - processor = MarathonEventProcessor(marathon, - config_file, - groups, - bind_http_https, - ssl_certs, haproxy_map) +def run_server(marathon, listen_addr, callback_url, processor): try: marathon.add_subscriber(callback_url) @@ -1711,67 +1710,6 @@ def process_sse_events(marathon, processor): processor.stop() -class MarathonLbApi(object): - """ - This isn't a real API yet, it just contains an endpoint to trigger a reload - of the config: POST /reload - """ - - def __init__(self, listen_uri, processor): - self.__listen_uri = listen_uri - self.__processor = processor - - self.__thread = threading.Thread(target=self.listen) - - def run(self): - self.__thread.run() - - def listen(self): - try: - httpd = make_server(self.__listen_uri.hostname, - self.__listen_uri.port, self.wsgi_app) - httpd.serve_forever() - finally: - self.__processor.stop() - - # TODO(JayH5): Switch to a sane http server - # TODO(JayH5): Good exception catching, etc - def wsgi_app(self, env, start_response): - path = env.get("PATH_INFO", "").lstrip("/") - if re.match(r"reload/?$", path): - if env.get("REQUEST_METHOD") == "POST": - return self.reload(env, start_response) - - return self.method_not_allowed(env, start_response) - - return self.not_found(env, start_response) - - def reload(self, env, start_response, existing=False): - query_params = parse_qs(env["QUERY_STRING"]) - existing = query_params.get("existing", []) - if existing and existing[0].lower() in ["true", "1"]: - self.__processor.reload_existing_config() - msg = "Triggered reload of existing config...\n" - else: - self.__processor.reset_from_tasks() - msg = "Triggered reload of config...\n" - - start_response("200 OK", [("Content-Type", "text/plain")]) - - return [msg.encode("utf-8")] - - def method_not_allowed(self, env, start_response): - """Called if no method matches.""" - start_response("405 METHOD NOT ALLOWED", - [("Content-Type", "text/plain")]) - return ["Method Not Allowed".encode("utf-8")] - - def not_found(self, env, start_response): - """Called if no path matches.""" - start_response("404 NOT FOUND", [("Content-Type", "text/plain")]) - return ["Not Found".encode("utf-8")] - - if __name__ == '__main__': # Process arguments arg_parser = get_arg_parser() @@ -1792,8 +1730,6 @@ def not_found(self, env, start_response): if args.sse and args.listening: arg_parser.error( 'cannot use --listening and --sse at the same time') - if args.api_listen and not args.sse: - arg_parser.error('--api-listen is only supported with --sse') if bool(args.min_serv_port_ip_per_task) != \ bool(args.max_serv_port_ip_per_task): arg_parser.error( @@ -1832,6 +1768,18 @@ def not_found(self, env, start_response): get_marathon_auth_params(args), args.marathon_ca_cert) + # If we're going to be handling events, set up the event processor and + # hook it up to the process signals. + if args.listening or args.sse: + processor = MarathonEventProcessor(marathon, + args.haproxy_config, + args.group, + not args.dont_bind_http_https, + args.ssl_certs, + args.haproxy_map) + signal.signal(signal.SIGHUP, processor.handle_signal) + signal.signal(signal.SIGUSR1, processor.handle_signal) + # If in listening mode, spawn a webserver waiting for events. Otherwise # just write the config. if args.listening: @@ -1839,25 +1787,10 @@ def not_found(self, env, start_response): "and will be removed in future releases") callback_url = args.callback_url or args.listening try: - run_server(marathon, args.listening, callback_url, - args.haproxy_config, args.group, - not args.dont_bind_http_https, args.ssl_certs, - args.haproxy_map, args.marathon_ca_cert) + run_server(marathon, args.listening, callback_url, processor) finally: clear_callbacks(marathon, callback_url) elif args.sse: - processor = MarathonEventProcessor(marathon, - args.haproxy_config, - args.group, - not args.dont_bind_http_https, - args.ssl_certs, - args.haproxy_map) - - if args.api_listen: - listen_uri = parse.urlparse(args.api_listen) - api = MarathonLbApi(listen_uri, processor) - api.run() - backoff = 3 while True: stream_started = time.time() From 7b47d2d2de3f661f3807dca2c179d99317432eda Mon Sep 17 00:00:00 2001 From: Jamie Hewland Date: Thu, 22 Sep 2016 11:59:32 +0200 Subject: [PATCH 12/17] Revert "Regenerate Longhelp.md" This reverts commit eef20c87e1dffdb30e9d9e50d32df2332700a9e8. --- Longhelp.md | 5 ----- 1 file changed, 5 deletions(-) diff --git a/Longhelp.md b/Longhelp.md index d4e116e5..06af3f3e 100644 --- a/Longhelp.md +++ b/Longhelp.md @@ -20,7 +20,6 @@ which marathon-lb can be reached by Marathon. ``` usage: marathon_lb.py [-h] [--longhelp] [--marathon MARATHON [MARATHON ...]] [--listening LISTENING] [--callback-url CALLBACK_URL] - [--api-listen API_LISTEN] [--haproxy-config HAPROXY_CONFIG] [--group GROUP] [--command COMMAND] [--sse] [--health-check] [--lru-cache-capacity LRU_CACHE_CAPACITY] @@ -52,10 +51,6 @@ optional arguments: --callback-url CALLBACK_URL, -u CALLBACK_URL The HTTP address that Marathon can call this script back at (http://lb1:8080) (default: None) - --api-listen API_LISTEN - The address marathon-lb listens on to respond to API - requests. Only available in SSE mode. (e.g., - http://0.0.0.0:8080) (default: None) --haproxy-config HAPROXY_CONFIG Location of haproxy configuration (default: /etc/haproxy/haproxy.cfg) From 20453ac5098fb611d4e5443ee6b9d32f3b640667 Mon Sep 17 00:00:00 2001 From: Jamie Hewland Date: Thu, 22 Sep 2016 11:59:44 +0200 Subject: [PATCH 13/17] Revert "Add a README note about the endpoint" This reverts commit c5a5c501581a55534e6a66ba04fac22a8182409f. --- README.md | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/README.md b/README.md index 65110808..e340e178 100644 --- a/README.md +++ b/README.md @@ -162,7 +162,7 @@ Currently it creates a lookup dictionary only for host header (both HTTP and HTT ### API Endpoints -Marathon-lb exposes a few endpoints served by HAProxy on port 9090 (by default). They are: +Marathon-lb exposes a few endpoints on port 9090 (by default). They are: | Endpoint | Description | |-------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| @@ -175,11 +175,6 @@ Marathon-lb exposes a few endpoints served by HAProxy on port 9090 (by default). | `:9090/_haproxy_getpids` | Returns the PIDs for all HAProxy instances within the current process namespace. This literally returns `$(pidof haproxy)`. Implemented in [`getpids.lua`](getpids.lua). This is also used by the [`zdd.py`](zdd.py) script to determine if connections have finished draining during a deploy. | -In addition to these, marathon-lb can (optionally) expose an endpoint for performing some useful actions. By setting the `--api-endpoint` parameter to a listening address (e.g. `http://0.0.0.0:8080`), a *very* basic API is made available for managing marathon-lb. Currently the only endpoint is for triggering a configuration reload. This is done via the `/reload` endpoint. You can also specify a query parameter to reload existing config, rather than regenerating the HAProxy config from Marathon: `/reload?existing=true`. This is **only** available in SSE mode. - -This endpoint is useful as a debugging tool: a general way to poke marathon-lb to get it to reload. It may also be useful in other cases, such as when files related to the HAProxy config change (e.g. SSL certificates) but the actual contents of the HAProxy config files haven't changed. - - ## HAProxy Configuration ### App Labels From fc40264d36ad651b08b13ea038c8b5ac867912f6 Mon Sep 17 00:00:00 2001 From: Jamie Hewland Date: Thu, 22 Sep 2016 15:33:12 +0200 Subject: [PATCH 14/17] Add Lua script and endpoints to send signals to marathon-lb --- config.py | 6 ++++++ signalmlb.lua | 40 +++++++++++++++++++++++++++++++++++++++ tests/test_marathon_lb.py | 6 ++++++ 3 files changed, 52 insertions(+) create mode 100644 signalmlb.lua diff --git a/config.py b/config.py index 0bef6768..a906f61a 100644 --- a/config.py +++ b/config.py @@ -60,6 +60,7 @@ def load(self): lua-load /marathon-lb/getpids.lua lua-load /marathon-lb/getconfig.lua lua-load /marathon-lb/getmaps.lua + lua-load /marathon-lb/signalmlb.lua defaults load-server-state-from-file global log global @@ -91,6 +92,11 @@ def load(self): http-request use-service lua.getappmap if getappmap acl getconfig path /_haproxy_getconfig http-request use-service lua.getconfig if getconfig + + acl signalmlbhup path /_mlb_signal/hup + http-request use-service lua.signalmlbhup if signalmlbhup + acl signalmlbusr1 path /_mlb_signal/usr1 + http-request use-service lua.signalmlbusr1 if signalmlbusr1 ''', overridable=False, description='''\ diff --git a/signalmlb.lua b/signalmlb.lua new file mode 100644 index 00000000..0e0db641 --- /dev/null +++ b/signalmlb.lua @@ -0,0 +1,40 @@ +-- A simple Lua module for HAProxy that sends signals to the marathon-lb process + +function run(cmd) + local file = io.popen(cmd) + local output = file:read('*a') + local success, _, code = file:close() + return output, success, code +end + +function send_response(applet, code, response) + applet:set_status(code) + applet:add_header("content-length", string.len(response)) + applet:add_header("content-type", "text/plain") + applet:start_response() + applet:send(response) +end + +core.register_service("signalmlbhup", "http", function(applet) + local _, success, code = run("pkill -HUP -f '^python.*marathon_lb.py'") + if not success then + send_response(applet, 500, string.format( + "Failed to send SIGHUP signal to marathon-lb (exit code %d). Is \z + marathon-lb running in 'poll' mode?", code)) + return + end + + send_response(applet, 200, "Sent SIGHUP signal to marathon-lb") +end) + +core.register_service("signalmlbusr1", "http", function(applet) + local _, success, code = run("pkill -USR1 -f '^python.*marathon_lb.py'") + if not success then + send_response(applet, 500, string.format( + "Failed to send SIGUSR1 signal to marathon-lb (exit code %d). Is \z + marathon-lb running in 'poll' mode?", code)) + return + end + + send_response(applet, 200, "Sent SIGUSR1 signal to marathon-lb") +end) diff --git a/tests/test_marathon_lb.py b/tests/test_marathon_lb.py index 39a6bb30..1ba4fa0d 100644 --- a/tests/test_marathon_lb.py +++ b/tests/test_marathon_lb.py @@ -44,6 +44,7 @@ def setUp(self): lua-load /marathon-lb/getpids.lua lua-load /marathon-lb/getconfig.lua lua-load /marathon-lb/getmaps.lua + lua-load /marathon-lb/signalmlb.lua defaults load-server-state-from-file global log global @@ -75,6 +76,11 @@ def setUp(self): http-request use-service lua.getappmap if getappmap acl getconfig path /_haproxy_getconfig http-request use-service lua.getconfig if getconfig + + acl signalmlbhup path /_mlb_signal/hup + http-request use-service lua.signalmlbhup if signalmlbhup + acl signalmlbusr1 path /_mlb_signal/usr1 + http-request use-service lua.signalmlbusr1 if signalmlbusr1 ''' def test_config_no_apps(self): From 726580e62621b7b73e4893d91705ad1458ae4952 Mon Sep 17 00:00:00 2001 From: Jamie Hewland Date: Thu, 22 Sep 2016 16:09:11 +0200 Subject: [PATCH 15/17] Write about new endpoints in README --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index e340e178..b635ba49 100644 --- a/README.md +++ b/README.md @@ -173,7 +173,10 @@ Marathon-lb exposes a few endpoints on port 9090 (by default). They are: | `:9090/_haproxy_getvhostmap` | Returns the HAProxy vhost to backend map. This endpoint returns HAProxy map file only when the `--haproxy-map` flag is enabled, it returns an empty string otherwise. Implemented in [`getmaps.lua`](getmaps.lua). | | `:9090/_haproxy_getappmap` | Returns the HAProxy app ID to backend map. Like `_haproxy_getvhostmap`, this requires the `--haproxy-map` flag to be enabled and returns an empty string otherwise. Also implemented in `getmaps.lua`. | | `:9090/_haproxy_getpids` | Returns the PIDs for all HAProxy instances within the current process namespace. This literally returns `$(pidof haproxy)`. Implemented in [`getpids.lua`](getpids.lua). This is also used by the [`zdd.py`](zdd.py) script to determine if connections have finished draining during a deploy. | +| `:9090/_mlb_signal/hup`* | Sends a `SIGHUP` signal to the marathon-lb process, causing it to fetch the running apps from Marathon and reload the HAProxy config as though an event was received from Marathon. | +| `:9090/_mlb_signal/usr1`* | Sends a `SIGUSR1` signal to the marathon-lb process, causing it to restart HAProxy with the existing config, without checking Marathon for changes. | +\* These endpoints won't function when marathon-lb is in `poll` mode as there is no marathon-lb process to be signaled in this mode (marathon-lb exits after each poll). ## HAProxy Configuration From cf17d86a59bbf5ab319e781a4fd6b9b221de5b9a Mon Sep 17 00:00:00 2001 From: Jamie Hewland Date: Thu, 22 Sep 2016 16:11:35 +0200 Subject: [PATCH 16/17] Regenerate Longhelp.md --- Longhelp.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/Longhelp.md b/Longhelp.md index 06af3f3e..a09e6077 100644 --- a/Longhelp.md +++ b/Longhelp.md @@ -382,6 +382,7 @@ global lua-load /marathon-lb/getpids.lua lua-load /marathon-lb/getconfig.lua lua-load /marathon-lb/getmaps.lua + lua-load /marathon-lb/signalmlb.lua defaults load-server-state-from-file global log global @@ -413,6 +414,11 @@ listen stats http-request use-service lua.getappmap if getappmap acl getconfig path /_haproxy_getconfig http-request use-service lua.getconfig if getconfig + + acl signalmlbhup path /_mlb_signal/hup + http-request use-service lua.signalmlbhup if signalmlbhup + acl signalmlbusr1 path /_mlb_signal/usr1 + http-request use-service lua.signalmlbusr1 if signalmlbusr1 ``` ## `HAPROXY_HTTPS_FRONTEND_ACL` *Overridable* From 108a7d6ee5a791562175088e4242b7d3019c6d97 Mon Sep 17 00:00:00 2001 From: Jamie Hewland Date: Tue, 27 Sep 2016 12:53:00 +0200 Subject: [PATCH 17/17] Add note to README about marathon-lb expecting to be in its own PID namespace --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 1b81ca66..75f1543c 100644 --- a/README.md +++ b/README.md @@ -268,6 +268,7 @@ are [documented here](Longhelp.md#templates). > < HTTP/1.1 200 OK ``` + * Some of the features of marathon-lb assume that it is the only instance of itself running in a PID namespace. i.e. marathon-lb assumes that it is running in a container. Certain features like the `/_mlb_signal` endpoints and the `/_haproxy_getpids` endpoint (and by extension, zero-downtime deployments) may behave unexpectedly if more than one instance of marathon-lb is running in the same PID namespace or if there are other HAProxy processes in the same PID namespace. ## Zero-downtime Deployments @@ -355,4 +356,4 @@ PRs are welcome, but here are a few general guidelines: ``` bash /path/to/marathon-lb/scripts/install-git-hooks.sh - ``` \ No newline at end of file + ```