From a2da98b1b599834f4144ab22fb4c06a399979311 Mon Sep 17 00:00:00 2001 From: totaam Date: Thu, 25 Aug 2022 21:33:18 +0700 Subject: [PATCH] #3100 'Basic' http authentication handler only enabled via env vars for now --- xpra/net/http_handler.py | 45 +++++++++++++++++++++++++++++++++- xpra/net/websockets/handler.py | 10 +++++--- 2 files changed, 51 insertions(+), 4 deletions(-) diff --git a/xpra/net/http_handler.py b/xpra/net/http_handler.py index 1650b43068..d88a3785fa 100644 --- a/xpra/net/http_handler.py +++ b/xpra/net/http_handler.py @@ -20,6 +20,10 @@ HTTP_ACCEPT_ENCODING = os.environ.get("XPRA_HTTP_ACCEPT_ENCODING", "br,gzip").split(",") DIRECTORY_LISTING = envbool("XPRA_HTTP_DIRECTORY_LISTING", False) +AUTH_REALM = os.environ.get("XPRA_HTTP_AUTH_REALM", "Xpra") +AUTH_USERNAME = os.environ.get("XPRA_HTTP_AUTH_USERNAME") +AUTH_PASSWORD = os.environ.get("XPRA_HTTP_AUTH_PASSWORD") + EXTENSION_TO_MIMETYPE = { ".wasm" : "application/wasm", ".js" : "text/javascript", @@ -63,10 +67,13 @@ class HTTPRequestHandler(BaseHTTPRequestHandler): def __init__(self, sock, addr, web_root="/usr/share/xpra/www/", - http_headers_dirs=("/etc/xpra/http-headers",), script_paths=None): + http_headers_dirs=("/etc/xpra/http-headers",), script_paths=None, + username=AUTH_USERNAME, password=AUTH_PASSWORD): self.web_root = web_root self.http_headers_dirs = http_headers_dirs self.script_paths = script_paths or {} + self.username = username + self.password = password server = AdHocStruct() server.logger = log self.directory_listing = DIRECTORY_LISTING @@ -205,6 +212,33 @@ def do_GET(self): self.handle_request() def handle_request(self): + if self.password: + def auth_err(msg): + self.do_AUTHHEAD() + self.wfile.write(msg.encode("latin1")) + log.warn(f"http authentication failed: {msg}") + auth = self.headers.get("Authorization") + log("handle_request() auth header=%s", auth) + if not auth: + return auth_err("missing authentication header") + #ie: auth = 'Basic dGVzdDp0ZXN0' + if not auth.startswith("Basic "): + return auth_err("invalid authentication header") + b64str = auth.split("Basic ", 1)[1] + import base64 + try: + s = base64.b64decode(b64str).decode("utf8") + except Exception: + s = "" + if s.find(":")<0: + return auth_err("invalid authentication format") + username, password = s.split(":", 1) + if (self.username and username!=self.username) or password!=self.password: + log("http authentication: expected %s:%s but received %s:%s", + self.username or "", self.password, username, password) + return auth_err("invalid credentials") + log("http authentication passed") + content = self.send_head() if content: try: @@ -225,6 +259,15 @@ def handle_request(self): def do_HEAD(self): self.send_head() + def do_AUTHHEAD(self): + self.send_response(401) + if self.password: + self.send_header("WWW-Authenticate", f"Basic realm=\"{AUTH_REALM}\"") + self.send_header("Content-type", "text/html") + self.end_headers() + + + #code taken from MIT licensed code in GzipSimpleHTTPServer.py def send_head(self): path = self.path.split("?",1)[0].split("#",1)[0] diff --git a/xpra/net/websockets/handler.py b/xpra/net/websockets/handler.py index 9fc2a50779..0d7c2c3dab 100644 --- a/xpra/net/websockets/handler.py +++ b/xpra/net/websockets/handler.py @@ -5,7 +5,7 @@ from xpra.util import envbool from xpra.net.websockets.common import make_websocket_accept_hash -from xpra.net.http_handler import HTTPRequestHandler +from xpra.net.http_handler import HTTPRequestHandler, AUTH_USERNAME, AUTH_PASSWORD from xpra.log import Logger log = Logger("network", "websocket") @@ -26,11 +26,15 @@ def __init__(self, sock, addr, new_websocket_client, web_root="/usr/share/xpra/www/", http_headers_dir="/etc/xpra/http-headers", script_paths=None, - redirect_https=False): + redirect_https=False, + username=AUTH_USERNAME, password=AUTH_PASSWORD, + ): self.new_websocket_client = new_websocket_client self.only_upgrade = WEBSOCKET_ONLY_UPGRADE self.redirect_https = redirect_https - super().__init__(sock, addr, web_root, http_headers_dir, script_paths) + super().__init__(sock, addr, + web_root, http_headers_dir, script_paths, + username, password) def handle_websocket(self): log("handle_websocket() calling %s, request=%s (%s)",