Skip to content

Commit

Permalink
#426: fix socket race between proxy server and proxy process:
Browse files Browse the repository at this point in the history
* bytestreams code supports the use of a timeout on blocking socket operations
* allow servers to set a socket timeout (but defaults to None)
* proxy server uses this timeout (default to 0.1) to wait for its network threads to terminate
* we only start the proxy process once all the protocol threads have terminated, which prevents all the earlier races

git-svn-id: https://xpra.org/svn/Xpra/trunk@4614 3bb7dfac-3a0b-4e04-842a-767bc560f471
  • Loading branch information
totaam committed Oct 24, 2013
1 parent 6c307b1 commit 771f428
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 25 deletions.
28 changes: 23 additions & 5 deletions src/xpra/net/bytestreams.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import sys
import os
import errno
import socket

#on some platforms (ie: OpenBSD), reading and writing from sockets
#raises an IOError but we should continue if the error code is EINTR
Expand All @@ -15,10 +16,12 @@
if sys.platform.startswith("win"):
WSAEWOULDBLOCK = 10035
CONTINUE.append(WSAEWOULDBLOCK)
def untilConcludes(f, *a, **kw):
while True:
def untilConcludes(is_active_cb, f, *a, **kw):
while is_active_cb():
try:
return f(*a, **kw)
except socket.timeout:
continue
except (IOError, OSError), e:
if e.args[0] in CONTINUE:
continue
Expand All @@ -34,15 +37,28 @@ def __init__(self, target, info):
self.input_bytecount = 0
self.output_bytecount = 0
self.filename = None #only used for unix domain sockets!
self.active = True

def is_active(self):
return self.active

def set_active(self, active):
self.active = active

def close(self):
self.set_active(False)

def untilConcludes(self, *args):
return untilConcludes(self.is_active, *args)

def _write(self, *args):
w = untilConcludes(*args)
w = self.untilConcludes(*args)
self.output_bytecount += w
return w

def _read(self, *args):
r = untilConcludes(*args)
self.input_bytecount += len(r)
r = self.untilConcludes(*args)
self.input_bytecount += len(r or "")
return r

def get_info(self):
Expand Down Expand Up @@ -75,6 +91,7 @@ def write(self, buf):
return self._write(os.write, self._writeable.fileno(), buf)

def close(self):
Connection.close(self)
try:
self._writeable.close()
self._readable.close()
Expand Down Expand Up @@ -103,6 +120,7 @@ def write(self, buf):
return self._write(self._socket.send, buf)

def close(self):
Connection.close(self)
self._socket.close()

def __str__(self):
Expand Down
18 changes: 14 additions & 4 deletions src/xpra/net/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,16 @@ def restore_state(self, state):
if state.get("rencode", False):
self.enable_rencode()

def wait_for_io_threads_exit(self, timeout=None):
for t in (self._read_thread, self._write_thread):
t.join(timeout)
exited = True
for t in (self._read_thread, self._write_thread):
if t.isAlive():
exited = False
break
return exited

def set_packet_source(self, get_packet_cb):
self._get_packet_cb = get_packet_cb

Expand Down Expand Up @@ -858,7 +868,7 @@ def close(self):
except:
log.error("error closing %s", self._conn, exc_info=True)
self._conn = None
self.terminate_io_threads()
self.terminate_queue_threads()
self.scheduler.idle_add(self.clean)

def steal_connection(self):
Expand All @@ -868,8 +878,7 @@ def steal_connection(self):
conn = self._conn
self._closed = True
self._conn = None
self.terminate_io_threads()
self.scheduler.idle_add(self.clean)
self.terminate_queue_threads()
return conn

def clean(self):
Expand All @@ -881,7 +890,8 @@ def clean(self):
self._read_parser_thread = None
self._process_packet_cb = None

def terminate_io_threads(self):
def terminate_queue_threads(self):
log("terminate_queue_threads()")
#the format thread will exit since closed is set too:
self._source_has_more.set()
#make the threads exit by adding the empty marker:
Expand Down
7 changes: 5 additions & 2 deletions src/xpra/server/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,22 @@ def _copy_loop(self, log_name, from_conn, to_conn):
try:
while not self._closed:
log("%s: waiting for data", log_name)
buf = untilConcludes(from_conn.read, 4096)
buf = untilConcludes(self.is_active, from_conn.read, 4096)
if not buf:
log("%s: connection lost", log_name)
self.quit()
return
while buf and not self._closed:
log("%s: writing %s bytes", log_name, len(buf))
written = untilConcludes(to_conn.write, buf)
written = untilConcludes(self.is_active, to_conn.write, buf)
buf = buf[written:]
except Exception, e:
log("%s: %s", log_name, e)
self.quit()

def is_active(self):
return not self._closed

def quit(self, *args):
log("closing proxy connections")
self._closed = True
Expand Down
47 changes: 33 additions & 14 deletions src/xpra/server/proxy_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
from xpra.scripts.main import parse_display_name, connect_to
from xpra.scripts.server import deadly_signal
from xpra.net.protocol import Protocol, Compressed, compressed_wrapper, new_cipher_caps, get_network_caps
from xpra.net.bytestreams import SocketConnection
from xpra.os_util import Queue, SIGNAMES
from xpra.util import typedict
from xpra.daemon_thread import make_daemon_thread
from xpra.scripts.config import parse_number, parse_bool


Expand Down Expand Up @@ -54,6 +56,7 @@ def __init__(self):
self.idle_add = gobject.idle_add
self.timeout_add = gobject.timeout_add
self.source_remove = gobject.source_remove
self._socket_timeout = 0.1

def init(self, opts):
log("ProxyServer.init(%s)", opts)
Expand Down Expand Up @@ -143,7 +146,6 @@ def parse_error(*args):
return
debug("server connection=%s", server_conn)

#grab client connection so we can pass it to the ProxyProcess:
client_conn = client_proto.steal_connection()
client_state = client_proto.save_state()
cipher = None
Expand All @@ -155,18 +157,35 @@ def parse_error(*args):
debug("start_proxy(..) client connection=%s", client_conn)
debug("start_proxy(..) client state=%s", client_state)

assert uid!=0 and gid!=0
try:
process = ProxyProcess(uid, gid, env_options, session_options, client_conn, client_state, cipher, encryption_key, server_conn, c, self.proxy_ended)
debug("starting %s from pid=%s", process, os.getpid())
process.start()
debug("process started")
#FIXME: remove processes that have terminated
self.processes.append(process)
finally:
#now we can close our handle on the connection:
client_conn.close()
server_conn.close()
#this may block, so run it in a thread:
def do_start_proxy():
debug("do_start_proxy()")
try:
#stop IO in proxy:
#(it may take up to _socket_timeout until the thread exits)
client_conn.set_active(False)
ioe = client_proto.wait_for_io_threads_exit(0.1+self._socket_timeout)
if not ioe:
log.error("IO threads have failed to terminate!")
return
#now we can go back to using blocking sockets:
#FIXME: this is a bit ugly, but less intrusive than the alternative?
if isinstance(client_conn, SocketConnection):
client_conn._socket.settimeout(None)
client_conn.set_active(True)

assert uid!=0 and gid!=0
process = ProxyProcess(uid, gid, env_options, session_options, client_conn, client_state, cipher, encryption_key, server_conn, c, self.proxy_ended)
debug("starting %s from pid=%s", process, os.getpid())
process.start()
debug("process started")
#FIXME: remove processes that have terminated
self.processes.append(process)
finally:
#now we can close our handle on the connection:
client_conn.close()
server_conn.close()
make_daemon_thread(do_start_proxy, "start_proxy(%s)" % client_conn).start()

def proxy_ended(self, proxy_process):
debug("proxy_ended(%s)", proxy_process)
Expand Down Expand Up @@ -337,7 +356,7 @@ def run_queue(self):
while True:
debug("run_queue() size=%s", self.main_queue.qsize())
v = self.main_queue.get()
debug("item=%s", v)
debug("run_queue() item=%s", v)
if v is None:
break
fn, args, kwargs = v
Expand Down
2 changes: 2 additions & 0 deletions src/xpra/server/server_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def __init__(self):
self._reverse_aliases = {}
self.socket_types = {}
self._max_connections = MAX_CONCURRENT_CONNECTIONS
self._socket_timeout = None

self.session_name = "Xpra"

Expand Down Expand Up @@ -275,6 +276,7 @@ def _new_connection(self, listener, *args):
peername = str(address)
sockname = sock.getsockname()
target = peername or sockname
sock.settimeout(self._socket_timeout)
log("new_connection(%s) sock=%s, sockname=%s, address=%s, peername=%s", args, sock, sockname, address, peername)
sc = SocketConnection(sock, sockname, address, target, socktype)
log.info("New connection received: %s", sc)
Expand Down

0 comments on commit 771f428

Please sign in to comment.