diff --git a/Dockerfile b/Dockerfile index 230c0f36..80e56fe1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,7 +4,10 @@ FROM debian:stretch RUN apt-get update && apt-get install -y --no-install-recommends \ ca-certificates \ iptables \ + libcurl3 \ liblua5.3-0 \ + libssl1.0.2 \ + openssl \ procps \ python3 \ runit \ @@ -37,6 +40,7 @@ COPY requirements.txt /marathon-lb/ RUN set -x \ && buildDeps=' \ gcc \ + libcurl4-openssl-dev \ libffi-dev \ liblua5.3-dev \ libpcre3-dev \ diff --git a/marathon_lb.py b/marathon_lb.py index 015c502b..51c8bba3 100755 --- a/marathon_lb.py +++ b/marathon_lb.py @@ -50,7 +50,8 @@ set_marathon_auth_args, setup_logging) from config import ConfigTemplater, label_keys from lrucache import LRUCache -from utils import get_task_ip_and_ports, ServicePortAssigner, set_ip_cache +from utils import (CurlHttpEventStream, get_task_ip_and_ports, + ServicePortAssigner, set_ip_cache) logger = logging.getLogger('marathon_lb') @@ -238,17 +239,7 @@ def get_event_stream(self): logger.info( "SSE Active, trying fetch events from {0}".format(url)) - headers = { - 'Cache-Control': 'no-cache', - 'Accept': 'text/event-stream' - } - - resp = requests.get(url, - stream=True, - headers=headers, - timeout=(3.05, 46), - auth=self.__auth, - verify=self.__verify) + resp = CurlHttpEventStream(url, self.__auth, self.__verify) class Event(object): def __init__(self, data): diff --git a/requirements.txt b/requirements.txt index 0dd54028..c632941f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ cryptography PyJWT==1.4.0 +pycurl python-dateutil requests six diff --git a/utils.py b/utils.py index b54a3985..4f993561 100644 --- a/utils.py +++ b/utils.py @@ -1,9 +1,12 @@ #!/usr/bin/env python3 import hashlib +from io import BytesIO import logging import socket +import pycurl + from lrucache import LRUCache logger = logging.getLogger('utils') @@ -142,6 +145,96 @@ def get_service_ports(self, app): return ports +class CurlHttpEventStream(object): + def __init__(self, url, auth, verify): + self.url = url + self.received_buffer = BytesIO() + + self.curl = pycurl.Curl() + self.curl.setopt(pycurl.URL, url) + self.curl.setopt(pycurl.HTTPHEADER, ['Cache-Control: no-cache', 'Accept: text/event-stream']) + self.curl.setopt(pycurl.ENCODING, 'gzip') + self.curl.setopt(pycurl.CONNECTTIMEOUT, 5) + self.curl.setopt(pycurl.WRITEDATA, self.received_buffer) + if auth: + self.curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC) + self.curl.setopt(pycurl.USERPWD, '%s:%s' % auth) + if verify: + self.curl.setopt(pycurl.CA_INFO, verify) + + self.curlmulti = pycurl.CurlMulti() + self.curlmulti.add_handle(self.curl) + + self.status_code = 0 + + SELECT_TIMEOUT = 10 + + def _any_data_received(self): + return self.received_buffer.tell() != 0 + + def _get_received_data(self): + result = self.received_buffer.getvalue() + self.received_buffer.truncate(0) + self.received_buffer.seek(0) + return result + + def _check_status_code(self): + if self.status_code == 0: + self.status_code = self.curl.getinfo(pycurl.HTTP_CODE) + if self.status_code != 0 and self.status_code != 200: + raise Exception(str(self.status_code) + ' ' + self.url) + + def _perform_on_curl(self): + while True: + ret, num_handles = self.curlmulti.perform() + if ret != pycurl.E_CALL_MULTI_PERFORM: + break + return num_handles + + def _iter_chunks(self): + while True: + remaining = self._perform_on_curl() + if self._any_data_received(): + self._check_status_code() + yield self._get_received_data() + if remaining == 0: + break + self.curlmulti.select(self.SELECT_TIMEOUT) + + self._check_status_code() + self._check_curl_errors() + + def _check_curl_errors(self): + for f in self.curlmulti.info_read()[2]: + raise pycurl.error(*f[1:]) + + def iter_lines(self): + chunks = self._iter_chunks() + return self._split_lines_from_chunks(chunks) + + @staticmethod + def _split_lines_from_chunks(chunks): + #same behaviour as requests' Response.iter_lines(...) + + pending = None + for chunk in chunks: + + if pending is not None: + chunk = pending + chunk + lines = chunk.splitlines() + + if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]: + pending = lines.pop() + else: + pending = None + + for line in lines: + yield line + + if pending is not None: + yield pending + + def resolve_ip(host): cached_ip = ip_cache.get(host, None) if cached_ip: