Skip to content
This repository has been archived by the owner on Dec 4, 2024. It is now read-only.

Commit

Permalink
Fix return type of get_app_port_mappings (#494)
Browse files Browse the repository at this point in the history
* Fix return type of get_app_port_mappings
* Remove None from util.py helpers (#497)
* Fix ip cache usage, add cleanup_json
  • Loading branch information
drewkerrigan authored Sep 15, 2017
1 parent b6c75dc commit 0a5906f
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 48 deletions.
2 changes: 1 addition & 1 deletion marathon_lb.py
Original file line number Diff line number Diff line change
Expand Up @@ -1412,7 +1412,7 @@ def get_apps(marathon):
continue

task_ip, task_ports = get_task_ip_and_ports(app, task)
if not task_ip:
if task_ip is None:
logger.warning("Task has no resolvable IP address - skip")
continue

Expand Down
57 changes: 56 additions & 1 deletion tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from mock import Mock, patch

from common import cleanup_json

import utils
from utils import ServicePortAssigner

Expand Down Expand Up @@ -363,6 +365,32 @@ def test_ip_per_task_marathon15(self):
self.assertEquals(self.assigner.get_service_ports(app),
[10000, 10001])

def test_ip_per_task_portMappings_empty(self):
app = {
'ipAddress': {
'networkName': 'testnet',
'discovery': {
'ports': []
}
},
'container': {
'type': 'DOCKER',
'docker': {
'network': 'USER',
'portMappings': [],
}
},
'tasks': [
{
'id': 'testtaskid',
'ipAddresses': [{'ipAddress': '1.2.3.4'}],
'ports': [],
'host': '4.3.2.1'
}
]
}
self.assertEquals(self.assigner.get_service_ports(app), [])

def test_ip_per_task_portMappings_null(self):
app = {
'ipAddress': {},
Expand All @@ -386,9 +414,36 @@ def test_ip_per_task_portMappings_null(self):
},
],
}
self.assertEquals(self.assigner.get_service_ports(app),
# Calling cleanup_json because all entrypoints to get_service_ports
# also call cleanup_json, so None isn't expected at runtime
self.assertEquals(self.assigner.get_service_ports(cleanup_json(app)),
[10000, 10001])

def test_ip_per_task_portMappings_null_marathon15(self):
app = {
'container': {
'type': 'DOCKER',
'docker': {
'image': 'nginx'
},
'portMappings': None
},
'networks': [
{
'mode': 'container',
'name': 'dcos'
}
],
'tasks': [{
"id": "testtaskid",
"ipAddresses": [{"ipAddress": "1.2.3.4"}]
}],
}
# Calling cleanup_json because all entrypoints to get_service_ports
# also call cleanup_json, so None isn't expected at runtime
self.assertEquals(self.assigner.get_service_ports(cleanup_json(app)),
[])


def _get_app(idx=1, num_ports=3, num_tasks=1, ip_per_task=True,
inc_service_ports=False):
Expand Down
102 changes: 58 additions & 44 deletions utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def get_service_ports(self, app):
if mode == "container" or mode == "container/bridge":
# Here we must use portMappings
portMappings = get_app_port_mappings(app)
if portMappings:
if len(portMappings) > 0:
ports = filter(lambda p: p is not None,
map(lambda p: p.get('servicePort', None),
portMappings))
Expand All @@ -137,8 +137,8 @@ def get_service_ports(self, app):
if not ports and mode == "container" and self.can_assign \
and len(app['tasks']) > 0:
task = app['tasks'][0]
_, task_ports = get_task_ip_and_ports(app, task)
if task_ports is not None:
task_ports = get_app_task_ports(app, task, mode)
if len(task_ports) > 0:
ports = [self._get_service_port(app, task_port)
for task_port in task_ports]
logger.debug("Service ports: %r", ports)
Expand Down Expand Up @@ -257,8 +257,11 @@ def _split_lines_from_chunks(chunks):


def resolve_ip(host):
cached_ip = ip_cache.get().get(host, None)
if cached_ip:
"""
:return: string, an empty string indicates that no ip was found.
"""
cached_ip = ip_cache.get().get(host, "")
if cached_ip != "":
return cached_ip
else:
try:
Expand All @@ -267,7 +270,7 @@ def resolve_ip(host):
ip_cache.get().set(host, ip)
return ip
except socket.gaierror:
return None
return ""


class LRUCacheSingleton(object):
Expand Down Expand Up @@ -309,95 +312,105 @@ def get_app_networking_mode(app):


def get_task_ip(task, mode):
"""
:return: string, an empty string indicates that no ip was found.
"""
if mode == 'container':
task_ip_addresses = task.get('ipAddresses', [])
if not task_ip_addresses:
if len(task_ip_addresses) == 0:
logger.warning("Task %s does not yet have an ip address allocated",
task['id'])
return None
task_ip = task_ip_addresses[0].get('ipAddress')
if not task_ip:
return ""
task_ip = task_ip_addresses[0].get('ipAddress', "")
if task_ip == "":
logger.warning("Task %s does not yet have an ip address allocated",
task['id'])
return None
return ""
return task_ip
else:
host = task.get('host')
if not host:
host = task.get('host', "")
if host == "":
logger.warning("Could not find task host, ignoring")
return ""
task_ip = resolve_ip(host)
if not task_ip:
if task_ip == "":
logger.warning("Could not resolve ip for host %s, ignoring",
host)
return ""
return task_ip


def get_app_port_mappings(app):
"""
:return: list
"""
portMappings = app.get('container', {})\
.get('docker', {})\
.get('portMappings')
if portMappings:
.get('portMappings', [])
if len(portMappings) > 0:
return portMappings

portMappings = app.get('container', {})\
.get('portMappings')
return portMappings
return app.get('container', {})\
.get('portMappings', [])


def get_task_ports(task):
return task.get('ports')
"""
:return: list
"""
return task.get('ports', [])


def get_port_definition_ports(app):
"""
:return: list
"""
port_definitions = app.get('portDefinitions', [])
task_ports = [p['port']
for p in port_definitions
if 'port' in p]
if len(task_ports) == 0:
return None
return task_ports
return [p['port'] for p in port_definitions if 'port' in p]


def get_ip_address_discovery_ports(app):
"""
:return: list
"""
ip_address = app.get('ipAddress', {})
if ip_address:
discovery = app.get('ipAddress', {}).get('discovery', {})
task_ports = [int(p['number'])
for p in discovery.get('ports', [])
if 'number' in p]
if len(task_ports) > 0:
return task_ports
return None
if len(ip_address) == 0:
return []
discovery = app.get('ipAddress', {}).get('discovery', {})
return [int(p['number'])
for p in discovery.get('ports', [])
if 'number' in p]


def get_port_mapping_ports(app):
"""
:return: list
"""
port_mappings = get_app_port_mappings(app)
task_ports = [p['containerPort']
for p in port_mappings
if 'containerPort' in p]
if len(task_ports) == 0:
return None
return task_ports
return [p['containerPort'] for p in port_mappings if 'containerPort' in p]


def get_app_task_ports(app, task, mode):
"""
:return: list
"""
if mode == 'host':
task_ports = get_task_ports(task)
if task_ports:
if len(task_ports) > 0:
return task_ports
return get_port_definition_ports(app)
elif mode == 'container/bridge':
task_ports = get_task_ports(task)
if task_ports:
if len(task_ports) > 0:
return task_ports
# Will only work for Marathon < 1.5
task_ports = get_port_definition_ports(app)
if task_ports:
if len(task_ports) > 0:
return task_ports
return get_port_mapping_ports(app)
else:
task_ports = get_ip_address_discovery_ports(app)
if task_ports:
if len(task_ports) > 0:
return task_ports
return get_port_mapping_ports(app)

Expand All @@ -417,6 +430,7 @@ def get_task_ip_and_ports(app, task):
mode = get_app_networking_mode(app)
task_ip = get_task_ip(task, mode)
task_ports = get_app_task_ports(app, task, mode)
# The overloading of empty string, and empty list as False is intentional.
if not (task_ip and task_ports):
return None, None
logger.debug("Returning: %r, %r", task_ip, task_ports)
Expand Down
4 changes: 2 additions & 2 deletions zdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def select_drained_listeners(listeners):

def get_svnames_from_task(app, task):
prefix = task['host'].replace('.', '_')
task_ip, task_port = get_task_ip_and_ports(app, task)
task_ip, _ = get_task_ip_and_ports(app, task)
if task['host'] == task_ip:
for port in task['ports']:
yield('{}_{}'.format(prefix, port))
Expand Down Expand Up @@ -645,7 +645,7 @@ def prepare_deploy(args, previous_deploys, app):

def load_app_json(args):
with open(args.json) as content_file:
return json.load(content_file)
return cleanup_json(json.load(content_file))


def safe_resume_deploy(args, previous_deploys):
Expand Down

0 comments on commit 0a5906f

Please sign in to comment.