Skip to content
This repository has been archived by the owner on Dec 4, 2024. It is now read-only.

#594 Follow redirects from non-leaders in Marathon 1.7 #595

Merged
merged 5 commits into from
Aug 3, 2018
Merged

#594 Follow redirects from non-leaders in Marathon 1.7 #595

merged 5 commits into from
Aug 3, 2018

Conversation

paambaati
Copy link
Contributor

This PR fixes #594. We can land this once d2iq-archive/marathon#6211 lands.

CC: @timcharper

paambaati added 2 commits May 10, 2018 09:30
…thon 1.7.

Marathon 1.7.x returns a 302 redirect response from a non-leader. So we make sure Marathon-lb can follow it to the leader.

Related Marathon JIRA card → https://jira.mesosphere.com/projects/MARATHON/issues/MARATHON-7178?filter=allopenissues

Related Marathon feature branch → https://github.com/mesosphere/marathon/compare/inc/events-redirect
@mesosphere-ci
Copy link

Can one of the admins verify this patch?

3 similar comments
@mesosphere-ci
Copy link

Can one of the admins verify this patch?

@mesosphere-ci
Copy link

Can one of the admins verify this patch?

@mesosphere-ci
Copy link

Can one of the admins verify this patch?

@timcharper
Copy link
Contributor

Thank you @paambaati !

@timcharper timcharper merged commit 797b158 into d2iq-archive:master Aug 3, 2018
@paambaati paambaati deleted the marathon-1.7-support branch August 3, 2018 13:48
@eLvErDe
Copy link
Contributor

eLvErDe commented Apr 3, 2019

Hello,

Can you please re-open ?

It's not working properly: if authorization is enable it returns 401 with this patch, 302 otherwise

@paambaati
Copy link
Contributor Author

@eLvErDe A 401 just means that the credentials are invalid. Are you certain you’re using the correct credentials?

@eLvErDe
Copy link
Contributor

eLvErDe commented Apr 3, 2019

@paambaati
No it's not a credential issue, it's pycurl not re-sending credentials on 302 code.

See: #632 for a "suggestion"
But I don't understand this code at all, it's so complex for nothing... This should be re-written using an async framework, like aiohttp...

For the record, here's my implementation using aiohttp:

"""
Listen to Marathon's SSE event bus, push message
to queue and process them with a queue worker
"""


# pylint: disable=line-too-long


import logging
import asyncio
import itertools
import random
import re
import json

import aiohttp

from consumer import SseEventConsumer


class SseEventListener(object):  # pylint: disable=too-many-instance-attributes
    """
    Listen to Marathon's SSE event bus, push message
    to queue and process them with a queue worker
    """


    def __init__(self, urls, registry_api_url, registry_api_key, username=None, password=None, timeout=15):  # pylint: disable=too-many-arguments

        assert isinstance(urls, list) and urls, 'urls must be list of non-empty string'
        assert isinstance(registry_api_url, str) and registry_api_url, 'registry_api_url must be a non-empty string'
        assert isinstance(registry_api_key, str) and registry_api_key, 'registry_api_key must be a non-empty string'
        for url in urls:
            assert isinstance(url, str) and url, 'urls must be list of non-empty string'
        if username is not None or password is not None:
            assert isinstance(username, str) and username, 'username and passord must be a non-empty strings o both None'
            assert isinstance(password, str) and password, 'password and passord must be a non-empty strings o both None'

        # Turn host list into a randomize never ending list
        # Access later with url property
        random.shuffle(urls)
        self.url_cycle = itertools.cycle(urls)
        self.active_url = None

        self.username = username
        self.password = password

        self.headers = {'Accept': 'text/event-stream'}
        self.auth = aiohttp.BasicAuth(self.username, self.password) if self.username else None

        self.logger = logging.getLogger(self.__class__.__name__)
        self.stopping = False
        self.timeout = timeout  # default value 15s, marathon seems to send empty line every 10

        self.sse_event = None
        self.sse_line_pattern = re.compile('(?P<name>[^:]*):?( ?(?P<value>.*))?')

        self.sse_event_queue = asyncio.Queue()
        self.sse_event_consumer_coro = None
        self.sse_event_consumer_task = SseEventConsumer(
            self.sse_event_queue,
            registry_api_url,
            registry_api_key,
        ).consume_forever()


    @property
    def url(self):
        """
        Property returning another url from a never ending shuffle list
        Also set self.active_url for logging purpose (ie)
        """

        self.active_url = self.url_cycle.__next__()
        return self.active_url


    def handle_sse_line(self, line):
        """
        Receive each SSE line and decide what to do
        """

        try:

            self.logger.debug('New message: %r', line)

            line = str(line, 'utf-8').strip()
            if not line:
                return

            sse_match = self.sse_line_pattern.match(line)

            if not sse_match:
                return

            name = sse_match.group('name')
            value = sse_match.group('value')

            if name == 'event':
                self.sse_event = value
            elif name == 'data':
                value = json.loads(sse_match.group('value'))
                self.logger.debug('SSE message %r with data %r', self.sse_event, value)
                if self.sse_event is not None:
                    self.sse_event_queue.put_nowait((self.sse_event, value))
                else:
                    self.logger.warning('Got a data message without an event one before, discarding')
            else:
                self.logger.warning('Unknown SSE message type %r', name)

        except Exception as exc:  # pylint: disable=broad-except
            self.logger.exception('SSE line handler crashed: %s: %s', exc.__class__.__name__, exc)


    async def listen_forever(self):
        """
        Listen SSE bus forever and switch server on error
        Read message and push them to self.
        """

        self.sse_event_consumer_coro = asyncio.ensure_future(self.sse_event_consumer_task)

        while not self.stopping:

            try:

                async with aiohttp.ClientSession(read_timeout=0) as session:
                    async with session.get(self.url, auth=self.auth, headers=self.headers) as response:
                        self.logger.info('Connecting to %s', self.active_url)
                        # Insane shit to override aiohttp.streams.DEFAULT_LIMIT
                        # It seems Marathon sends insanely long message with all apps definition sometimes
                        stream_reader = response.content
                        stream_reader._limit = 2 ** 20  # pylint: disable=protected-access
                        while not self.stopping:
                            # Replace async for so I can add a timeout on readline() here
                            # break patterns come from aiohttp code in streams.py
                            try:
                                line = await asyncio.wait_for(stream_reader.readline(), timeout=self.timeout)
                            except aiohttp.streams.EofStream:
                                self.logger.info('Received EofStream, server is closing')
                                break
                                # aiohttp issue I guess when closing
                            except AssertionError as exc:
                                if str(exc) == """yield from wasn't used with future""":
                                    self.logger.info('Aiohttp AssertionError catched  when closing, looks like a bug')
                                    break
                                raise
                            except asyncio.TimeoutError:
                                self.logger.error('Aiohttp readline() timed out after %d seconds', self.timeout)
                                break
                            if line == b'':
                                self.logger.info('Received empty line, server is closing')
                                break
                            self.handle_sse_line(line)

            except asyncio.CancelledError:
                pass

            except Exception as exc:  # pylint: disable=broad-except
                self.logger.exception('SSE bus listen crashed: %s: %s', exc.__class__.__name__, exc)

            finally:
                if not self.stopping:
                    self.sse_event = None
                    self.logger.info('Sleeping 1s before reconnecting to next host')
                    await asyncio.sleep(1)

        self.logger.info('Closing')


    def close(self):
        """ Stop processing """

        self.stopping = True
        self.sse_event_consumer_coro.cancel()

@paambaati
Copy link
Contributor Author

@eLvErDe Ah, thanks for the detailed info. A quick fix would be to include —

self.curl.setopt(pycurl.UNRESTRICTED_AUTH, True)

Can you test this? I’m not near a computer for a few days, so if you can confirm that this works, I can send a PR as soon as I’m available.

@paambaati
Copy link
Contributor Author

Oh, I see that @jkoelker already has #633 with this fix!

@eLvErDe Can you test that branch out?

@eLvErDe
Copy link
Contributor

eLvErDe commented Apr 3, 2019

Yup, that seem to work correctly :-)

cybernaut613 pushed a commit to inbaytech/marathon-lb that referenced this pull request Apr 23, 2020
Marathon 1.7.x returns a 302 redirect response from a non-leader. So we make sure Marathon-lb can follow it to the leader.

Related Marathon JIRA card: https://jira.mesosphere.com/projects/MARATHON/issues/MARATHON-7178

Fixes d2iq-archive#594
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Follow standby redirects
4 participants