Skip to content

Commit

Permalink
Use PycURL for fetching event streams
Browse files Browse the repository at this point in the history
Earlier issues d2iq-archive#35 and d2iq-archive#114 made things better in this department. However,
when running marathon-lb with a large (400+ applications) marathon instance,
there are still problems.

These problems can be traced back to Python itself, unfortunately:
http://stackoverflow.com/questions/21797753/efficiently-reading-lines-from-compressed-chunked-http-stream-as-they-arrive

Python requests uses urllib3 under the covers, and there are implicit issues
when 7.5 MB of JSON comes back on a single line, as we're seeing when certain
events are emitted. These events include deployment_info and deployment_success
at a minimum; there may be more.

By switching to PycURL, as noted in the Stack Overflow post, we bypass this
whole issue. We use an HTTP library that handles this particular edge case
well, reducing CPU usage dramatically when a large event comes in. It also
handles gzip compression, which means any 7.5 MB JSON dumps should shrink
significantly.

One unsolved problem remains here: the addition of DC/OS authentication support
in d2iq-archive#285 is extremely tightly coupled to internal implementation details of the
python requests module. This simply won't work with this code, and I have zero
ability to fix or test it as we don't use DC/OS.
  • Loading branch information
Dan McGee committed Dec 14, 2016
1 parent d2d8b5c commit a628b63
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 12 deletions.
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ FROM debian:stretch
# runtime dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
iptables \
libcurl3 \
openssl \
procps \
python3 \
Expand All @@ -17,6 +18,7 @@ RUN set -x \
&& buildDeps=' \
gcc \
libc6-dev \
libcurl4-openssl-dev \
libffi-dev \
libpcre3-dev \
libreadline-dev \
Expand Down
15 changes: 3 additions & 12 deletions marathon_lb.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@
set_marathon_auth_args, setup_logging)
from config import ConfigTemplater, label_keys
from lrucache import LRUCache
from utils import get_task_ip_and_ports, ServicePortAssigner, set_ip_cache
from utils import (CurlHttpEventStream, get_task_ip_and_ports,
ServicePortAssigner, set_ip_cache)


logger = logging.getLogger('marathon_lb')
Expand Down Expand Up @@ -238,17 +239,7 @@ def get_event_stream(self):
logger.info(
"SSE Active, trying fetch events from {0}".format(url))

headers = {
'Cache-Control': 'no-cache',
'Accept': 'text/event-stream'
}

resp = requests.get(url,
stream=True,
headers=headers,
timeout=(3.05, 46),
auth=self.__auth,
verify=self.__verify)
resp = CurlHttpEventStream(url, self.__auth, self.__verify)

class Event(object):
def __init__(self, data):
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
cryptography
PyJWT==1.4.0
pycurl
python-dateutil
requests
six
93 changes: 93 additions & 0 deletions utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#!/usr/bin/env python3

import hashlib
from io import BytesIO
import logging
import socket

import pycurl

from lrucache import LRUCache

logger = logging.getLogger('utils')
Expand Down Expand Up @@ -142,6 +145,96 @@ def get_service_ports(self, app):
return ports


class CurlHttpEventStream(object):
    """Streaming HTTP client for server-sent events, built on PycURL.

    Drop-in replacement for the ``requests.get(stream=True)`` call it
    supersedes: ``iter_lines()`` yields one event-stream line (as bytes)
    at a time, and — unlike the urllib3-backed requests path — handles
    multi-megabyte single-line payloads without pathological CPU usage.
    """

    # Maximum time (seconds) to block in curlmulti.select() waiting for
    # more data to arrive on the socket.
    SELECT_TIMEOUT = 10

    def __init__(self, url, auth, verify):
        """
        url    -- event-stream URL to connect to.
        auth   -- (user, password) tuple for HTTP basic auth, or None.
        verify -- requests-style TLS setting: a string is a path to a CA
                  bundle, False disables verification, any other truthy
                  value keeps curl's default (system CA store).
        """
        self.url = url
        self.received_buffer = BytesIO()

        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.URL, url)
        self.curl.setopt(pycurl.HTTPHEADER, ['Cache-Control: no-cache', 'Accept: text/event-stream'])
        # Let libcurl negotiate and transparently decode gzip responses.
        self.curl.setopt(pycurl.ENCODING, 'gzip')
        self.curl.setopt(pycurl.CONNECTTIMEOUT, 5)
        self.curl.setopt(pycurl.WRITEDATA, self.received_buffer)
        if auth:
            self.curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
            self.curl.setopt(pycurl.USERPWD, '%s:%s' % auth)
        # FIX: the pycurl constant is CAINFO; pycurl.CA_INFO does not
        # exist, so the original code raised AttributeError whenever a
        # truthy `verify` was supplied.  Also mirror requests' `verify`
        # semantics instead of passing a bool as a CA-bundle path.
        if isinstance(verify, str):
            self.curl.setopt(pycurl.CAINFO, verify)
        elif not verify:
            self.curl.setopt(pycurl.SSL_VERIFYPEER, 0)
            self.curl.setopt(pycurl.SSL_VERIFYHOST, 0)

        self.curlmulti = pycurl.CurlMulti()
        self.curlmulti.add_handle(self.curl)

        # 0 until the first response bytes arrive, then the HTTP status.
        self.status_code = 0

    def _any_data_received(self):
        """Return True if curl has written bytes since the last drain."""
        return self.received_buffer.tell() != 0

    def _get_received_data(self):
        """Return everything accumulated in the buffer and reset it."""
        result = self.received_buffer.getvalue()
        self.received_buffer.truncate(0)
        self.received_buffer.seek(0)
        return result

    def _check_status_code(self):
        """Latch the HTTP status once known; raise on anything but 200."""
        if self.status_code == 0:
            self.status_code = self.curl.getinfo(pycurl.HTTP_CODE)
        if self.status_code != 0 and self.status_code != 200:
            raise Exception(str(self.status_code) + ' ' + self.url)

    def _perform_on_curl(self):
        """Drive the transfer; return the number of still-active handles."""
        while True:
            ret, num_handles = self.curlmulti.perform()
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        return num_handles

    def _iter_chunks(self):
        """Yield raw byte chunks as they arrive until the transfer ends."""
        while True:
            remaining = self._perform_on_curl()
            if self._any_data_received():
                self._check_status_code()
                yield self._get_received_data()
            if remaining == 0:
                break
            self.curlmulti.select(self.SELECT_TIMEOUT)

        self._check_status_code()
        self._check_curl_errors()

    def _check_curl_errors(self):
        """Re-raise any transfer error recorded by the multi handle."""
        for f in self.curlmulti.info_read()[2]:
            raise pycurl.error(*f[1:])

    def iter_lines(self):
        """Yield complete lines (bytes, terminators stripped), matching
        the contract of requests' Response.iter_lines()."""
        chunks = self._iter_chunks()
        return self._split_lines_from_chunks(chunks)

    @staticmethod
    def _split_lines_from_chunks(chunks):
        """Re-chunk an iterator of byte chunks into complete lines.

        Same behaviour as requests' Response.iter_lines(): if a chunk
        ends mid-line, the trailing fragment is held back ("pending")
        until the next chunk — or end of stream — completes it.
        """
        pending = None
        for chunk in chunks:

            if pending is not None:
                chunk = pending + chunk
            lines = chunk.splitlines()

            # splitlines() drops terminators, so the only way to know the
            # last piece is incomplete is that the chunk did not end with
            # a line terminator (its final byte survived the split).
            if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]:
                pending = lines.pop()
            else:
                pending = None

            for line in lines:
                yield line

        if pending is not None:
            yield pending


def resolve_ip(host):
cached_ip = ip_cache.get(host, None)
if cached_ip:
Expand Down

0 comments on commit a628b63

Please sign in to comment.