Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add raw_socket_proxy to directly proxy websockets to TCP/unix sockets #447

Merged
merged 7 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/source/server-process.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,18 @@ One of:

(server-process:callable-arguments)=

### `websockify`
dylex marked this conversation as resolved.
Show resolved Hide resolved

_True_ to proxy only websocket connections into raw stream connections.
_False_ (default) if the proxied server speaks full HTTP.

If _True_, the proxied server is treated a raw TCP (or unix socket) server that
does not use HTTP.
In this mode, only websockets are handled, and messages are sent to the backend
server as stream data. This is equivalent to running a
[websockify](https://github.com/novnc/websockify) wrapper.
All other HTTP requests return 405.

#### Callable arguments

Any time you specify a callable in the config, it can ask for any arguments it needs
Expand Down
81 changes: 41 additions & 40 deletions jupyter_server_proxy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from traitlets.config import Configurable

from .handlers import AddSlashHandler, NamedLocalProxyHandler, SuperviseAndProxyHandler
from .websockify import WebsockifyHandler, SuperviseAndWebsockifyHandler

try:
# Traitlets >= 4.3.3
Expand All @@ -41,52 +42,55 @@
"new_browser_tab",
"request_headers_override",
"rewrite_response",
"websockify",
],
)


def _make_namedproxy_handler(sp: ServerProcess):
class _Proxy(NamedLocalProxyHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.name = sp.name
self.proxy_base = sp.name
self.absolute_url = sp.absolute_url
self.port = sp.port
self.unix_socket = sp.unix_socket
self.mappath = sp.mappath
self.rewrite_response = sp.rewrite_response

def get_request_headers_override(self):
return self._realize_rendered_template(sp.request_headers_override)

return _Proxy


def _make_supervisedproxy_handler(sp: ServerProcess):
def _make_proxy_handler(sp: ServerProcess):
"""
Create a SuperviseAndProxyHandler subclass with given parameters
Create an appropriate handler with given parameters
"""
if sp.command:
cls = SuperviseAndWebsockifyHandler if sp.websockify else SuperviseAndProxyHandler
args = dict(state={})
elif not (sp.port or isinstance(sp.unix_socket, str)):
warn(
f"Server proxy {sp.name} does not have a command, port "
f"number or unix_socket path. At least one of these is "
f"required."
)
return
else:
cls = WebsockifyHandler if sp.websockify else NamedLocalProxyHandler
args = {}

# FIXME: Set 'name' properly
class _Proxy(SuperviseAndProxyHandler):
class _Proxy(cls):
kwargs = args

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.name = sp.name
self.command = sp.command
self.proxy_base = sp.name
self.absolute_url = sp.absolute_url
self.requested_port = sp.port
self.requested_unix_socket = sp.unix_socket
if sp.command:
self.requested_port = sp.port
self.requested_unix_socket = sp.unix_socket
else:
self.port = sp.port
self.unix_socket = sp.unix_socket
self.mappath = sp.mappath
self.rewrite_response = sp.rewrite_response

def get_env(self):
return self._realize_rendered_template(sp.environment)

def get_request_headers_override(self):
return self._realize_rendered_template(sp.request_headers_override)

# these two methods are only used in supervise classes, but do no harm otherwise
def get_env(self):
return self._realize_rendered_template(sp.environment)

def get_timeout(self):
return sp.timeout

Expand All @@ -108,24 +112,14 @@ def make_handlers(base_url, server_processes):
"""
handlers = []
for sp in server_processes:
if sp.command:
handler = _make_supervisedproxy_handler(sp)
kwargs = dict(state={})
else:
if not (sp.port or isinstance(sp.unix_socket, str)):
warn(
f"Server proxy {sp.name} does not have a command, port "
f"number or unix_socket path. At least one of these is "
f"required."
)
continue
handler = _make_namedproxy_handler(sp)
kwargs = {}
handler = _make_proxy_handler(sp)
if not handler:
continue
handlers.append(
(
ujoin(base_url, sp.name, r"(.*)"),
handler,
kwargs,
handler.kwargs
)
)
handlers.append((ujoin(base_url, sp.name), AddSlashHandler))
Expand Down Expand Up @@ -157,6 +151,7 @@ def make_server_process(name, server_process_config, serverproxy_config):
"rewrite_response",
tuple(),
),
websockify=server_process_config.get("websockify", False),
)


Expand Down Expand Up @@ -273,6 +268,12 @@ def cats_only(response, path):
instead of "dogs not allowed".

Defaults to the empty tuple ``tuple()``.

websockify
Proxy websocket requests as a TCP (or unix socket) stream.
In this mode, only websockets are handled, and messages are sent to the backend,
equivalent to running a websockify layer (https://github.com/novnc/websockify).
All other HTTP requests return 405.
""",
config=True,
)
Expand Down
88 changes: 88 additions & 0 deletions jupyter_server_proxy/websockify.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""
A simple translation layer between tornado websockets and asyncio stream
connections.

This subsumes the functionality of websockify
(https://github.com/novnc/websockify) without needing an extra proxy hop
or process through with all messages pass for translation.
"""

import asyncio

from .handlers import NamedLocalProxyHandler, SuperviseAndProxyHandler

class WebsockifyProtocol(asyncio.Protocol):
"""
A protocol handler for the proxied stream connection.
Sends any received blocks directly as websocket messages.
"""
def __init__(self, handler):
self.handler = handler

def data_received(self, data):
"Send the buffer as a websocket message."
self.handler._record_activity()
self.handler.write_message(data, binary=True) # async, no wait
dylex marked this conversation as resolved.
Show resolved Hide resolved

def connection_lost(self, exc):
"Close the websocket connection."
self.handler.log.info(f"Websockify {self.handler.name} connection lost: {exc}")
self.handler.close()

class WebsockifyHandler(NamedLocalProxyHandler):
"""
HTTP handler that proxies websocket connections into a backend stream.
All other HTTP requests return 405.
"""
def _create_ws_connection(self, proto: asyncio.BaseProtocol):
"Create the appropriate backend asyncio connection"
loop = asyncio.get_running_loop()
if self.unix_socket is not None:
self.log.info(f"Websockify {self.name} connecting to {self.unix_socket}")
return loop.create_unix_connection(proto, self.unix_socket)
else:
self.log.info(f"Websockify {self.name} connecting to port {self.port}")
return loop.create_connection(proto, 'localhost', self.port)

async def proxy(self, port, path):
raise web.HTTPError(405, "websockets only")
dylex marked this conversation as resolved.
Show resolved Hide resolved

async def proxy_open(self, host, port, proxied_path=""):
"""
Open the backend connection. host and port are ignored (as they are in
the parent for unix sockets) since they are always passed known values.
"""
transp, proto = await self._create_ws_connection(lambda: WebsockifyProtocol(self))
self.ws_transp = transp
self.ws_proto = proto
self._record_activity()
self.log.info(f"Websockify {self.name} connected")

def on_message(self, message):
"Send websocket messages as stream writes, encoding if necessary."
self._record_activity()
if hasattr(self, "ws_transp"):
dylex marked this conversation as resolved.
Show resolved Hide resolved
if isinstance(message, str):
message = message.encode('utf-8')
self.ws_transp.write(message) # buffered non-blocking. should block?

def on_ping(self, message):
"No-op"
self._record_activity()

def on_close(self):
"Close the backend connection."
self.log.info(f"Websockify {self.name} connection closed")
if hasattr(self, "ws_transp"):
self.ws_transp.close()

class SuperviseAndWebsockifyHandler(SuperviseAndProxyHandler, WebsockifyHandler):
async def _http_ready_func(self, p):
# not really HTTP here, just try an empty connection
try:
transp, _ = await self._create_ws_connection(asyncio.Protocol)
except OSError as exc:
self.log.debug(f"Websockify {self.name} connection check failed: {exc}")
return False
transp.close()
return True