Skip to content
This repository has been archived by the owner on Dec 4, 2024. It is now read-only.

Commit

Permalink
Use PycURL for fetching event streams
Browse files Browse the repository at this point in the history
Earlier issues #35 and #114 made things better in this department. However,
when running marathon-lb with a large (400+ applications) marathon instance,
there are still problems.

These problems can be traced back to Python itself, unfortunately:
http://stackoverflow.com/questions/21797753/efficiently-reading-lines-from-compressed-chunked-http-stream-as-they-arrive

Python requests uses urllib3 under the covers, and there are implicit issues
when 7.5 MB of JSON comes back on a single line, as we're seeing when certain
events are emitted. These events are deployment_info and deployment_success at
a minimum; there may be more.

By switching to PycURL, as noted in the Stack Overflow post, we bypass this
whole issue. We use an HTTP library that handles this particular edge case
well, reducing CPU usage dramatically when a large event comes in. It also
handles gzip compression, which means any 7.5 MB JSON dumps should shrink
significantly.
  • Loading branch information
Dan McGee committed Jan 20, 2017
1 parent 3857b37 commit 65472e4
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 12 deletions.
4 changes: 4 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ FROM debian:stretch
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
iptables \
libcurl3 \
liblua5.3-0 \
libssl1.0.2 \
openssl \
procps \
python3 \
runit \
Expand Down Expand Up @@ -37,6 +40,7 @@ COPY requirements.txt /marathon-lb/
RUN set -x \
&& buildDeps=' \
gcc \
libcurl4-openssl-dev \
libffi-dev \
liblua5.3-dev \
libpcre3-dev \
Expand Down
15 changes: 3 additions & 12 deletions marathon_lb.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@
set_marathon_auth_args, setup_logging)
from config import ConfigTemplater, label_keys
from lrucache import LRUCache
from utils import get_task_ip_and_ports, ServicePortAssigner, set_ip_cache
from utils import (CurlHttpEventStream, get_task_ip_and_ports,
ServicePortAssigner, set_ip_cache)


logger = logging.getLogger('marathon_lb')
Expand Down Expand Up @@ -238,17 +239,7 @@ def get_event_stream(self):
logger.info(
"SSE Active, trying fetch events from {0}".format(url))

headers = {
'Cache-Control': 'no-cache',
'Accept': 'text/event-stream'
}

resp = requests.get(url,
stream=True,
headers=headers,
timeout=(3.05, 46),
auth=self.__auth,
verify=self.__verify)
resp = CurlHttpEventStream(url, self.__auth, self.__verify)

class Event(object):
def __init__(self, data):
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
cryptography
PyJWT==1.4.0
pycurl
python-dateutil
requests
six
93 changes: 93 additions & 0 deletions utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#!/usr/bin/env python3

import hashlib
from io import BytesIO
import logging
import socket

import pycurl

from lrucache import LRUCache

logger = logging.getLogger('utils')
Expand Down Expand Up @@ -142,6 +145,96 @@ def get_service_ports(self, app):
return ports


class CurlHttpEventStream(object):
    """Stream an HTTP response via PycURL, yielding it line by line.

    Drop-in replacement for ``requests.get(..., stream=True)`` when reading
    server-sent event streams: it sends ``Accept: text/event-stream``,
    requests gzip compression, and exposes ``iter_lines()`` with the same
    line-splitting behaviour as requests' ``Response.iter_lines()``.
    """

    # Seconds to block in curl_multi select() waiting for more data.
    SELECT_TIMEOUT = 10

    def __init__(self, url, auth, verify):
        """Set up a streaming GET request to *url*.

        auth   -- (user, password) tuple for HTTP basic auth, or a falsy
                  value for no authentication.
        verify -- path to a CA bundle used to verify the server's TLS
                  certificate, or a falsy value to use libcurl's default.
                  NOTE(review): a requests-style bare ``True`` would be
                  passed straight through to libcurl here -- confirm
                  callers always pass a path or a falsy value.
        """
        self.url = url
        self.received_buffer = BytesIO()

        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.URL, url)
        self.curl.setopt(pycurl.HTTPHEADER, [
            'Cache-Control: no-cache',
            'Accept: text/event-stream',
        ])
        # Accept (and transparently decompress) gzip-encoded responses.
        self.curl.setopt(pycurl.ENCODING, 'gzip')
        self.curl.setopt(pycurl.CONNECTTIMEOUT, 5)
        self.curl.setopt(pycurl.WRITEDATA, self.received_buffer)
        if auth:
            self.curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
            self.curl.setopt(pycurl.USERPWD, '%s:%s' % auth)
        if verify:
            # Bug fix: the PycURL constant is CAINFO -- the earlier
            # CA_INFO spelling does not exist and raised AttributeError
            # whenever a CA bundle was supplied.
            self.curl.setopt(pycurl.CAINFO, verify)

        self.curlmulti = pycurl.CurlMulti()
        self.curlmulti.add_handle(self.curl)

        # 0 means the HTTP status line has not been received yet.
        self.status_code = 0

    def close(self):
        """Release the libcurl handles; the stream is unusable afterwards."""
        self.curlmulti.remove_handle(self.curl)
        self.curl.close()
        self.curlmulti.close()

    def _any_data_received(self):
        # The write callback appends to the buffer, so a non-zero position
        # means at least one byte arrived since the last drain.
        return self.received_buffer.tell() != 0

    def _get_received_data(self):
        """Return everything buffered so far and reset the buffer."""
        result = self.received_buffer.getvalue()
        self.received_buffer.truncate(0)
        self.received_buffer.seek(0)
        return result

    def _check_status_code(self):
        """Raise if the server answered with anything other than 200 OK."""
        if self.status_code == 0:
            # HTTP_CODE reports 0 until the status line has been parsed.
            self.status_code = self.curl.getinfo(pycurl.HTTP_CODE)
        if self.status_code != 0 and self.status_code != 200:
            raise Exception(str(self.status_code) + ' ' + self.url)

    def _perform_on_curl(self):
        # Drive the transfer until libcurl has no immediate work left;
        # return the number of handles still running.
        while True:
            ret, num_handles = self.curlmulti.perform()
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        return num_handles

    def _iter_chunks(self):
        """Yield raw byte chunks as they arrive until the transfer ends."""
        while True:
            remaining = self._perform_on_curl()
            if self._any_data_received():
                self._check_status_code()
                yield self._get_received_data()
            if remaining == 0:
                break
            # Sleep until the socket is readable (or the timeout expires).
            self.curlmulti.select(self.SELECT_TIMEOUT)

        self._check_status_code()
        self._check_curl_errors()

    def _check_curl_errors(self):
        # info_read() returns (num_queued, ok_list, error_list); re-raise
        # the first transfer failure, if any, as a pycurl.error.
        for f in self.curlmulti.info_read()[2]:
            raise pycurl.error(*f[1:])

    def iter_lines(self):
        """Yield complete lines (as bytes) from the streaming response."""
        chunks = self._iter_chunks()
        return self._split_lines_from_chunks(chunks)

    @staticmethod
    def _split_lines_from_chunks(chunks):
        """Re-split an iterable of byte chunks into whole lines.

        Same behaviour as requests' ``Response.iter_lines(...)``: a line
        cut in half at a chunk boundary is held back ("pending") and
        completed by the following chunk.
        """
        pending = None
        for chunk in chunks:

            if pending is not None:
                chunk = pending + chunk
            lines = chunk.splitlines()

            # If the chunk did not end with a line terminator, the last
            # element of splitlines() is an incomplete line: its final
            # byte equals the chunk's final byte. Hold it back until the
            # rest arrives.
            if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]:
                pending = lines.pop()
            else:
                pending = None

            for line in lines:
                yield line

        # Flush whatever is left once the stream ends.
        if pending is not None:
            yield pending


def resolve_ip(host):
cached_ip = ip_cache.get(host, None)
if cached_ip:
Expand Down

0 comments on commit 65472e4

Please sign in to comment.