Skip to content
This repository was archived by the owner on Feb 3, 2021. It is now read-only.

Commit

Permalink
Feature: pure python ssh (#577)
Browse files Browse the repository at this point in the history
* forward multiple ports

* plumb through cli

* continue cli implementation

* fixes

* pylint ignore

* spacing

* remove debug stuff, fix bug

* add --internal support

* add to init

* add comment

* remove nesting

* add logging

* add some docs
  • Loading branch information
jafreck authored Jun 5, 2018
1 parent af449dc commit f16aac0
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 27 deletions.
17 changes: 17 additions & 0 deletions aztk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,23 @@ def __cluster_copy(self, cluster_id, source_path, destination_path, container_na
finally:
self.__delete_user_on_pool(generated_username, pool.id, nodes)

def __ssh_into_node(self, pool_id, node_id, username, ssh_key=None, password=None, port_forward_list=None, internal=False):
    """Open a pure-python SSH session to a single compute node.

    When ``internal`` is set, connect to the node's internal IP on port 22
    (assumes the caller is on the same virtual network); otherwise ask the
    Batch service for the node's public remote-login endpoint.
    """
    if internal:
        node = self.batch_client.compute_node.get(pool_id=pool_id, node_id=node_id)
        remote_login = models.RemoteLogin(ip_address=node.ip_address, port="22")
    else:
        settings = self.batch_client.compute_node.get_remote_login_settings(pool_id, node_id)
        remote_login = models.RemoteLogin(
            ip_address=settings.remote_login_ip_address, port=str(settings.remote_login_port))

    ssh_lib.node_ssh(
        username=username,
        hostname=remote_login.ip_address,
        port=remote_login.port,
        ssh_key=ssh_key,
        password=password,
        port_forward_list=port_forward_list,
    )

def __submit_job(self,
job_configuration,
start_task,
Expand Down
1 change: 1 addition & 0 deletions aztk/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@
from .software import Software
from .cluster import Cluster
from .scheduling_target import SchedulingTarget
from .port_forward_specification import PortForwardingSpecification
from .plugins import *
5 changes: 5 additions & 0 deletions aztk/models/port_forward_specification.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from aztk.core.models import Model, fields

class PortForwardingSpecification(Model):
    """Describes a single SSH port-forwarding rule as a pair of ports.

    NOTE(review): which side listens vs. which side is the destination is not
    enforced here -- confirm against aztk.utils.ssh.forward_ports, which
    currently passes remote_port as the local listener.
    """
    remote_port = fields.Integer()  # port on the remote node -- TODO confirm direction
    local_port = fields.Integer()   # port on the local machine -- TODO confirm direction
6 changes: 6 additions & 0 deletions aztk/spark/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,12 @@ def cluster_download(self, cluster_id: str, source_path: str, destination_path:
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))

def cluster_ssh_into_master(self, cluster_id, node_id, username, ssh_key=None, password=None, port_forward_list=None, internal=False):
    """Open a pure-python SSH session (with optional port forwards) to the
    given node, translating Batch service errors into AztkError.
    """
    try:
        self.__ssh_into_node(
            cluster_id,
            node_id,
            username,
            ssh_key,
            password,
            port_forward_list,
            internal)
    except batch_error.BatchErrorException as batch_exception:
        raise error.AztkError(helpers.format_batch_exception(batch_exception))

'''
job submission
'''
Expand Down
3 changes: 3 additions & 0 deletions aztk/spark/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ def __get_master_node_id(self):
class RemoteLogin(aztk.models.RemoteLogin):
    """Spark-level alias of the base aztk RemoteLogin (ip_address/port pair)."""
    pass

class PortForwardingSpecification(aztk.models.PortForwardingSpecification):
    """Spark-level alias of the base aztk PortForwardingSpecification."""
    pass

class File(aztk.models.File):
    """Spark-level alias of the base aztk File model."""
    pass
Expand Down Expand Up @@ -94,6 +96,7 @@ class SharedKeyConfiguration(aztk.models.SharedKeyConfiguration):
class DockerConfiguration(aztk.models.DockerConfiguration):
    """Spark-level alias of the base aztk DockerConfiguration model."""
    pass


class PluginConfiguration(aztk.models.PluginConfiguration):
    """Spark-level alias of the base aztk PluginConfiguration model."""
    pass

Expand Down
94 changes: 92 additions & 2 deletions aztk/utils/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,65 @@
'''
import asyncio
import io
import logging
import os
import select
import socket
import socketserver as SocketServer
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor

from aztk.error import AztkError
from . import helpers


class ForwardServer(SocketServer.ThreadingTCPServer):
    """Local TCP listener that spawns a Handler thread per connection.

    daemon_threads lets the process exit without joining handler threads;
    allow_reuse_address avoids bind failures on quick restarts.
    """
    daemon_threads = True
    allow_reuse_address = True

# pylint: disable=no-member
class Handler(SocketServer.BaseRequestHandler):
    """Relays bytes between an accepted local TCP connection and a
    'direct-tcpip' channel opened on an SSH transport (paramiko-style API).

    chain_host, chain_port and ssh_transport are deliberately not defined
    here: forward_tunnel() injects them as class attributes on a subclass,
    which is why the pylint no-member warning is suppressed above.
    """
    def handle(self):
        # Ask the SSH server to open a channel to chain_host:chain_port; the
        # getpeername() tuple identifies the originating client to the server.
        try:
            channel = self.ssh_transport.open_channel('direct-tcpip',
                                                      (self.chain_host, self.chain_port),
                                                      self.request.getpeername())
        except Exception as e:
            logging.debug('Incoming request to %s:%d failed: %s', self.chain_host,
                          self.chain_port,
                          repr(e))
            return
        # open_channel can also signal rejection by returning None.
        if channel is None:
            logging.debug('Incoming request to %s:%d was rejected by the SSH server.', self.chain_host, self.chain_port)
            return

        logging.debug('Connected! Tunnel open %r -> %r -> %r', self.request.getpeername(), channel.getpeername(), (self.chain_host, self.chain_port))
        # Pump data in both directions until either side signals EOF
        # (recv returning zero bytes).
        while True:
            r, w, x = select.select([self.request, channel], [], [])
            if self.request in r:
                data = self.request.recv(1024)
                if len(data) == 0:
                    break
                channel.send(data)
            if channel in r:
                data = channel.recv(1024)
                if len(data) == 0:
                    break
                self.request.send(data)

        # Capture the peer address before closing so it can still be logged.
        peername = self.request.getpeername()
        channel.close()
        self.request.close()
        logging.debug('Tunnel closed from %r', peername)


def forward_tunnel(local_port, remote_host, remote_port, transport):
    """Listen on ``local_port`` and tunnel each connection over ``transport``
    to ``remote_host:remote_port``; return the started daemon server thread.
    """
    # Handler cannot take constructor arguments (socketserver instantiates
    # it), so the tunnel endpoints are baked in as class attributes.
    class TunnelHandler(Handler):
        chain_host = remote_host
        chain_port = remote_port
        ssh_transport = transport

    server = ForwardServer(('', local_port), TunnelHandler)
    tunnel_thread = threading.Thread(target=server.serve_forever, daemon=True)
    tunnel_thread.start()
    return tunnel_thread


def connect(hostname,
Expand Down Expand Up @@ -39,6 +89,23 @@ def connect(hostname,
return client


def forward_ports(client, port_forward_list):
    """Start one SSH tunnel per PortForwardingSpecification.

    Each tunnel listens on the specification's local_port and forwards to
    remote_port on the SSH server's side (127.0.0.1 is resolved on the
    remote node). Returns the list of daemon threads serving the tunnels;
    an empty or falsy port_forward_list yields an empty list.
    """
    threads = []
    if not port_forward_list:
        return threads

    for spec in port_forward_list:
        # BUGFIX: the arguments were previously swapped -- remote_port was
        # used as the local listener and local_port as the remote
        # destination. That was invisible for the default tunnels (both
        # ports equal) but inverted plugin port forwards.
        threads.append(
            forward_tunnel(
                spec.local_port,
                "127.0.0.1",
                spec.remote_port,
                client.get_transport()
            )
        )
    return threads


def node_exec_command(node_id, command, username, hostname, port, ssh_key=None, password=None, container_name=None, timeout=None):
try:
client = connect(hostname=hostname, port=port, username=username, password=password, pkey=ssh_key, timeout=timeout)
Expand Down Expand Up @@ -133,3 +200,26 @@ async def clus_copy(username, nodes, source_path, destination_path, ssh_key=None
container_name,
timeout) for node, node_rls in nodes]
)


def node_ssh(username, hostname, port, ssh_key=None, password=None, port_forward_list=None, timeout=None):
    """Connect to a node over SSH, open the requested port forwards, then
    block until the user interrupts with Ctrl+C.

    :param username: SSH login name on the node
    :param hostname: node IP address or hostname
    :param port: SSH port (string or int, passed through to connect())
    :param ssh_key: private key for key-based auth, or None
    :param password: password for password auth, or None
    :param port_forward_list: optional list of PortForwardingSpecification
    :param timeout: connection timeout forwarded to connect()
    """
    client = connect(
        hostname=hostname,
        port=port,
        username=username,
        password=password,
        pkey=ssh_key,
        timeout=timeout
    )
    forward_ports(client=client, port_forward_list=port_forward_list)

    # Keep the process (and the daemon tunnel threads) alive until the user
    # interrupts; catch KeyboardInterrupt so no stacktrace is printed.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Close the SSH connection (and its tunnels) on the way out.
        client.close()
89 changes: 65 additions & 24 deletions aztk_cli/spark/endpoints/cluster/cluster_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
from aztk_cli import config, log, utils
from aztk_cli.config import SshConfig

from aztk.spark.models import PortForwardingSpecification


def setup_parser(parser: argparse.ArgumentParser):
parser.add_argument('--id', dest="cluster_id", help='The unique id of your spark cluster')
parser.add_argument('--webui', help='Local port to port spark\'s master UI to')
parser.add_argument('--jobui', help='Local port to port spark\'s job UI to')
parser.add_argument('--jobhistoryui', help='Local port to port spark\'s job history UI to')
parser.add_argument('-u', '--username', help='Username to spark cluster')
parser.add_argument('--password', help='Password for the specified ssh user')
parser.add_argument('--host', dest="host", action='store_true', help='Connect to the host of the Spark container')
parser.add_argument('--no-connect', dest="connect", action='store_false',
help='Do not create the ssh session. Only print out the command to run.')
Expand Down Expand Up @@ -52,29 +55,11 @@ def execute(args: typing.NamedTuple):
utils.log_property("connect", ssh_conf.connect)
log.info("-------------------------------------------")

# get ssh command
try:
ssh_cmd = utils.ssh_in_master(
client=spark_client,
cluster_id=ssh_conf.cluster_id,
webui=ssh_conf.web_ui_port,
jobui=ssh_conf.job_ui_port,
jobhistoryui=ssh_conf.job_history_ui_port,
username=ssh_conf.username,
host=ssh_conf.host,
connect=ssh_conf.connect,
internal=ssh_conf.internal)

if not ssh_conf.connect:
log.info("")
log.info("Use the following command to connect to your spark head node:")
log.info("\t%s", ssh_cmd)

except batch_error.BatchErrorException as e:
if e.error.code == "PoolNotFound":
raise aztk.error.AztkError("The cluster you are trying to connect to does not exist.")
else:
raise
shell_out_ssh(spark_client, ssh_conf)
except OSError:
# no ssh client is found, falling back to pure python
native_python_ssh_into_master(spark_client, cluster, ssh_conf, args.password)


def print_plugin_ports(cluster_config: ClusterConfiguration):
Expand All @@ -88,16 +73,72 @@ def print_plugin_ports(cluster_config: ClusterConfiguration):
if port.expose_publicly:
has_ports = True
plugin_ports[plugin.name].append(port)

if has_ports:
log.info("plugins:")

for plugin in plugin_ports:
if plugin_ports[plugin]:
log.info(" " + plugin)
log.info(" %s ", plugin)
for port in plugin_ports[plugin]:
label = " - open"
if port.name:
label += " {}".format(port.name)
url = "{0}{1}".format(http_prefix, port.public_port)
utils.log_property(label, url)


def native_python_ssh_into_master(spark_client, cluster, ssh_conf, password):
    """Fallback used when no native ssh client is installed: open a pure
    python SSH tunnel to the cluster master, forwarding the Spark UI ports
    plus any publicly exposed plugin ports. Blocks until Ctrl+C.
    """
    if not ssh_conf.connect:
        # --no-connect asks for the ssh command line to be printed, but with
        # no native client there is no command to print; warn and bail out.
        log.warning("No ssh client found, using pure python connection.")
        return

    configuration = spark_client.get_cluster_config(cluster.id)
    plugin_ports = []
    if configuration and configuration.plugins:
        # BUGFIX/consistency: build the specifications with keyword
        # arguments like every other call site, instead of positionals.
        ports = [
            PortForwardingSpecification(remote_port=port.internal, local_port=port.public_port)
            for plugin in configuration.plugins
            for port in plugin.ports
            if port.expose_publicly
        ]
        plugin_ports.extend(ports)

    print("Press ctrl+c to exit...")
    spark_client.cluster_ssh_into_master(
        cluster.id,
        cluster.master_node_id,
        ssh_conf.username,
        ssh_key=None,
        password=password,
        port_forward_list=[
            PortForwardingSpecification(remote_port=8080, local_port=8080),  # web ui
            PortForwardingSpecification(remote_port=4040, local_port=4040),  # job ui
            PortForwardingSpecification(remote_port=18080, local_port=18080),  # job history ui
        ] + plugin_ports,
        internal=ssh_conf.internal
    )


def shell_out_ssh(spark_client, ssh_conf):
    """Build the native ssh command for the master node, run it when
    ssh_conf.connect is set, and otherwise print it for the user.
    """
    try:
        ssh_command = utils.ssh_in_master(
            client=spark_client,
            cluster_id=ssh_conf.cluster_id,
            webui=ssh_conf.web_ui_port,
            jobui=ssh_conf.job_ui_port,
            jobhistoryui=ssh_conf.job_history_ui_port,
            username=ssh_conf.username,
            host=ssh_conf.host,
            connect=ssh_conf.connect,
            internal=ssh_conf.internal)

        if not ssh_conf.connect:
            log.info("")
            log.info("Use the following command to connect to your spark head node:")
            log.info("\t%s", ssh_command)
    except batch_error.BatchErrorException as e:
        # Anything other than a missing pool is unexpected: re-raise as-is.
        if e.error.code != "PoolNotFound":
            raise
        raise aztk.error.AztkError("The cluster you are trying to connect to does not exist.")
9 changes: 8 additions & 1 deletion aztk_cli/utils.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import datetime
import getpass
import subprocess
import sys
import threading
import time
import yaml
from subprocess import call
from typing import List

import azure.batch.models as batch_models

from aztk import error, utils
from aztk.utils import get_ssh_key, helpers
from aztk.models import ClusterConfiguration
from aztk.spark import models
from aztk.utils import get_ssh_key, helpers

from . import log


Expand Down Expand Up @@ -153,6 +157,8 @@ def ssh_in_master(
:param ports: an list of local and remote ports
:type ports: [[<local-port>, <remote-port>]]
"""
# check if ssh is available, this throws OSError if ssh is not present
subprocess.call(["ssh"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

# Get master node id from task (job and task are both named pool_id)
cluster = client.get_cluster(cluster_id)
Expand Down Expand Up @@ -213,6 +219,7 @@ def ssh_in_master(

if connect:
call(command, shell=True)

return '\n\t{}\n'.format(command)

def print_batch_exception(batch_exception):
Expand Down
8 changes: 8 additions & 0 deletions docs/10-clusters.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,14 @@ Now that you're in, you can change directory to your familiar `$SPARK_HOME`
cd $SPARK_HOME
```

To view the SSH command being called, pass the `--no-connect` flag:
```
aztk spark cluster ssh --id spark --no-connect
```

Note that an SSH tunnel and shell will be opened with the default SSH client if one is present. Otherwise, a pure python SSH tunnel is created to forward the necessary ports. The pure python SSH tunnel will not open a shell.


### Debugging your Spark Cluster

If your cluster is in an unknown or unusable state, you can debug by running:
Expand Down

0 comments on commit f16aac0

Please sign in to comment.