Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] GIE Golang Proxy and Docker Compose Support #852

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 103 additions & 4 deletions lib/galaxy/web/base/interactive_environments.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
from galaxy.managers import api_keys
from galaxy.tools.deps.docker_util import DockerVolume

from galaxy import eggs
eggs.require( "Mako" )
from mako.template import Template

import logging
log = logging.getLogger(__name__)

Expand Down Expand Up @@ -74,15 +78,18 @@ def load_deploy_config(self, default_dict={}):
# will need to be recorded here. The ConfigParser doesn't provide a
# .get() that will ignore missing sections, so we must make use of
# their defaults dictionary instead.
default_dict = {
builtin_defaults = {
'command': 'docker {docker_args}',
'compose_command': 'docker-compose up -d',
'command_inject': '--sig-proxy=true -e DEBUG=false',
'docker_hostname': 'localhost',
'wx_tempdir': 'False',
'docker_galaxy_temp_dir': None
}
viz_config = ConfigParser.SafeConfigParser(default_dict)
builtin_defaults.update(default_dict)
viz_config = ConfigParser.SafeConfigParser(builtin_defaults)
conf_path = os.path.join( self.attr.our_config_dir, self.attr.viz_id + ".ini" )
log.debug("Reading GIE configuration from %s", conf_path)
if not os.path.exists( conf_path ):
conf_path = "%s.sample" % conf_path
viz_config.read( conf_path )
Expand Down Expand Up @@ -205,6 +212,48 @@ def docker_cmd(self, env_override={}, volumes=[]):
)
return command

def docker_compose_cmd(self, env_override={}, volumes=[]):
"""Generate and return the docker-compose command to execute.
"""
# Get environment stuff, this will go into the mako template of a docker-compose.yml
env = self.get_conf_dict()
env.update(env_override)

clean_env = {}
for (key, value) in env.items():
clean_env[key.upper()] = value
# volume_str = ' '.join(['-v "%s"' % volume for volume in volumes])
# TODO: this works very poorly. What if we want to mount N volumes? Or
# mount them with non-keys or something? It's not friendly
volume_keyed = {key: volume_path for (key, volume_path) in volumes}

# Get our template docker-compose.yml file.
compose_template = os.path.join(self.attr.our_config_dir, "docker-compose.yml.mako")
compose_output_path = os.path.join(self.temp_dir, "docker-compose.yml")
with open(compose_output_path, 'w') as output_handle, open(compose_template, 'r') as input_handle:
output_handle.write(
Template(input_handle.read()).render(
env=clean_env,
volumes=volume_keyed,
)
)
# This is the basic docker command such as "cd /tmp/dir && sudo -u docker docker-compose {docker_args}"
# or just "cd /tmp/dir && docker-compose {docker_args}"
command = ('cd %s && ' % self.temp_dir) + self.attr.viz_config.get("docker", "compose_command")
# Then we format in the entire docker command in place of
# {docker_args}, so as to let the admin not worry about which args are
# getting passed
#
# Additionally we don't specify which docker-compose.yml file it is as
# we want it to fail horribly if something is wrong. (TODO: this
# comment should be removed once debugging is done.)
command = command.format(docker_args='up -d')
# Once that's available, we format again with all of our arguments
command = command.format(
compose_file=compose_output_path
)
return command

def launch(self, raw_cmd=None, env_override={}, volumes=[]):
if raw_cmd is None:
raw_cmd = self.docker_cmd(env_override=env_override, volumes=volumes)
Expand All @@ -218,8 +267,9 @@ def launch(self, raw_cmd=None, env_override={}, volumes=[]):
log.error( "%s\n%s" % (stdout, stderr) )
return None
else:
log.debug( "Container id: %s" % stdout)
port_mappings = self.get_proxied_ports(stdout)
container_id = stdout.strip()
log.debug( "Container id: %s" % container_id)
port_mappings = self.get_proxied_ports(container_id)
if len(port_mappings) > 1:
log.warning("Don't know how to handle proxies to containers with multiple exposed ports. Arbitrarily choosing first")
elif len(port_mappings) == 0:
Expand All @@ -235,6 +285,8 @@ def launch(self, raw_cmd=None, env_override={}, volumes=[]):
host=self.attr.docker_hostname,
port=host_port,
proxy_prefix=self.attr.proxy_prefix,
route_name='/%s/' % self.attr.viz_id,
container_ids=[container_id],
)
# These variables then become available for use in templating URLs
self.attr.proxy_url = self.attr.proxy_request[ 'proxy_url' ]
Expand All @@ -247,6 +299,53 @@ def launch(self, raw_cmd=None, env_override={}, volumes=[]):
# go through the proxy we ship.
# self.attr.PORT = self.attr.proxy_request[ 'proxied_port' ]

def launch_multi(self, raw_cmd=None, env_override={}, volumes=[]):
if raw_cmd is None:
raw_cmd = self.docker_compose_cmd(env_override=env_override, volumes=volumes)
log.info("Starting docker-compose container(s) for IE {0} with command [{1}]".format(
self.attr.viz_id,
raw_cmd
))
p = Popen( raw_cmd, stdout=PIPE, stderr=PIPE, close_fds=True, shell=True)
stdout, stderr = p.communicate()
if p.returncode != 0:
log.error("===\n%s\n===\n%s\n===" % (stdout, stderr))
return None
else:
# Container ID is NOT available, so we have to do this another way.
# We make a hard requirement that a single service be named
# "external" and listen on port 8080. TODO: make this nicer for people.
port_cmd = "cd %s && docker-compose port external 80" % self.temp_dir
find_ports = Popen(port_cmd, stdout=PIPE, stderr=PIPE, close_fds=True, shell=True)
stdout, stderr = find_ports.communicate()
if find_ports.returncode != 0:
log.error("===\n%s\n===\n%s\n===" % (stdout, stderr))
else:
# Watch this fail horrifically for anyone on IPv6
(proxy_host, proxy_port) = stdout.strip().split(':')

container_id_cmd = "cd %s && docker-compose ps -q" % self.temp_dir
find_ids = Popen(container_id_cmd, stdout=PIPE, stderr=PIPE, close_fds=True, shell=True)
stdout, stderr = find_ids.communicate()
if find_ids.returncode != 0:
log.error("===\n%s\n===\n%s\n===" % (stdout, stderr))
else:
# Pick out and strip container IDs
ids = [container_id.strip() for container_id in stdout.strip().split('\n')]

# Now we configure our proxy_requst object and we manually specify
# the port to map to and ensure the proxy is available.
self.attr.proxy_request = self.trans.app.proxy_manager.setup_proxy(
self.trans,
host=self.attr.docker_hostname,
port=proxy_port,
proxy_prefix=self.attr.proxy_prefix,
route_name='/%s/' % self.attr.viz_id,
container_ids=ids,
)
# These variables then become available for use in templating URLs
self.attr.proxy_url = self.attr.proxy_request[ 'proxy_url' ]

def get_proxied_ports(self, container_id):
"""Run docker inspect on a container to figure out which ports were
mapped where.
Expand Down
130 changes: 48 additions & 82 deletions lib/galaxy/web/proxy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import logging
import os
import json
import urllib2
import time

from .filelock import FileLock
from galaxy.util import sockets
from galaxy.util.lazy_process import LazyProcess, NoOpLazyProcess
from galaxy.util import sqlite

log = logging.getLogger( __name__ )


DEFAULT_PROXY_TO_HOST = "localhost"
SECURE_COOKIE = "galaxysession"
API_KEY = "test"


class ProxyManager(object):
Expand All @@ -26,19 +26,21 @@ def __init__( self, config ):
self.lazy_process = self.__setup_lazy_process( config )
else:
self.lazy_process = NoOpLazyProcess()
self.proxy_ipc = proxy_ipc(config)

def shutdown( self ):
self.lazy_process.shutdown()

def setup_proxy( self, trans, host=DEFAULT_PROXY_TO_HOST, port=None, proxy_prefix="" ):
def setup_proxy( self, trans, host=DEFAULT_PROXY_TO_HOST, port=None, proxy_prefix="", route_name="", container_ids=None ):
if self.manage_dynamic_proxy:
log.info("Attempting to start dynamic proxy process")
self.lazy_process.start_process()

if container_ids is None:
container_ids = []

authentication = AuthenticationToken(trans)
proxy_requests = ProxyRequests(host=host, port=port)
self.proxy_ipc.handle_requests(authentication, proxy_requests)
self.register_proxy_route(authentication, proxy_requests, route_name, container_ids)
# TODO: These shouldn't need to be request.host and request.scheme -
# though they are reasonable defaults.
host = trans.request.host
Expand All @@ -55,14 +57,45 @@ def setup_proxy( self, trans, host=DEFAULT_PROXY_TO_HOST, port=None, proxy_prefi
'proxied_host': proxy_requests.host,
}

def register_proxy_route( self, auth, proxy_requests, route_name, containerIds, sleep=1 ):
"""Make a POST request to the GO proxy to register a route
"""
url = 'http://127.0.0.1:%s/api?api_key=%s' % (self.dynamic_proxy_bind_port, API_KEY)
values = {
'FrontendPath': route_name,
'BackendAddr': "%s:%s" % ( proxy_requests.host, proxy_requests.port ),
'AuthorizedCookie': auth.cookie_value,
'ContainerIds': containerIds,
}

log.debug(values)
log.debug(url)

req = urllib2.Request(url)
req.add_header('Content-Type', 'application/json')

# Sometimes it takes our poor little proxy a second or two to get
# going, so if this fails, re-call ourselves with an increased timeout.
try:
urllib2.urlopen(req, json.dumps(values))
except urllib2.URLError, err:
log.debug(err)
if sleep > 5:
excp = "Could not contact proxy after %s seconds" % sum(range(sleep + 1))
raise Exception(excp)
time.sleep(sleep)
self.register_proxy_route(auth, proxy_requests, route_name, containerIds, sleep=sleep + 1)

def __setup_lazy_process( self, config ):
launcher = proxy_launcher(self)
command = launcher.launch_proxy_command(config)
print ' '.join(command)
return LazyProcess(command)


def proxy_launcher(config):
return NodeProxyLauncher()
return GoProxyLauncher()
# return NodeProxyLauncher()


class ProxyLauncher(object):
Expand All @@ -71,21 +104,18 @@ def launch_proxy_command(self, config):
raise NotImplementedError()


class NodeProxyLauncher(object):
class GoProxyLauncher(object):

def launch_proxy_command(self, config):
args = [
"--sessions", config.proxy_session_map,
"--ip", config.dynamic_proxy_bind_ip,
"--port", str(config.dynamic_proxy_bind_port),
'gxproxy',
'-api_key', API_KEY,
'-cookie_name', SECURE_COOKIE,
'-listen', '%s:%s' % (config.dynamic_proxy_bind_ip, config.dynamic_proxy_bind_port),
'-storage', config.proxy_session_map,
'-listen_path', '/galaxy/gie_proxy',
]
if config.dynamic_proxy_debug:
args.append("--verbose")

parent_directory = os.path.dirname( __file__ )
path_to_application = os.path.join( parent_directory, "js", "lib", "main.js" )
command = [ path_to_application ] + args
return command
return args


class AuthenticationToken(object):
Expand All @@ -107,68 +137,4 @@ def __init__(self, host=None, port=None):
self.port = port


def proxy_ipc(config):
proxy_session_map = config.proxy_session_map
if proxy_session_map.endswith(".sqlite"):
return SqliteProxyIpc(proxy_session_map)
else:
return JsonFileProxyIpc(proxy_session_map)


class ProxyIpc(object):

def handle_requests(self, cookie, host, port):
raise NotImplementedError()


class JsonFileProxyIpc(object):

def __init__(self, proxy_session_map):
self.proxy_session_map = proxy_session_map

def handle_requests(self, authentication, proxy_requests):
key = "%s:%s" % ( proxy_requests.host, proxy_requests.port )
secure_id = authentication.cookie_value
with FileLock( self.proxy_session_map ):
if not os.path.exists( self.proxy_session_map ):
open( self.proxy_session_map, "w" ).write( "{}" )
json_data = open( self.proxy_session_map, "r" ).read()
session_map = json.loads( json_data )
to_remove = []
for k, value in session_map.items():
if value == secure_id:
to_remove.append( k )
for k in to_remove:
del session_map[ k ]
session_map[ key ] = secure_id
new_json_data = json.dumps( session_map )
open( self.proxy_session_map, "w" ).write( new_json_data )


class SqliteProxyIpc(object):

def __init__(self, proxy_session_map):
self.proxy_session_map = proxy_session_map

def handle_requests(self, authentication, proxy_requests):
key = "%s:%s" % ( proxy_requests.host, proxy_requests.port )
secure_id = authentication.cookie_value
with FileLock( self.proxy_session_map ):
conn = sqlite.connect(self.proxy_session_map)
try:
c = conn.cursor()
try:
# Create table
c.execute('''CREATE TABLE gxproxy
(key text PRIMARY_KEY, secret text)''')
except Exception:
pass
insert_tmpl = '''INSERT INTO gxproxy (key, secret) VALUES ('%s', '%s');'''
insert = insert_tmpl % (key, secure_id)
c.execute(insert)
conn.commit()
finally:
conn.close()

# TODO: RESTful API driven proxy?
# TODO: MQ diven proxy?
Loading