-
-
Notifications
You must be signed in to change notification settings - Fork 171
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* server tests with --use-display * x11 client tests using a vfb git-svn-id: https://xpra.org/svn/Xpra/trunk@13902 3bb7dfac-3a0b-4e04-842a-767bc560f471
- Loading branch information
Showing
5 changed files
with
292 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
#!/usr/bin/env python | ||
# This file is part of Xpra. | ||
# Copyright (C) 2016 Antoine Martin <[email protected]> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
#!/usr/bin/env python | ||
# This file is part of Xpra. | ||
# Copyright (C) 2016 Antoine Martin <[email protected]> | ||
# Xpra is released under the terms of the GNU GPL v2, or, at your option, any | ||
# later version. See the file COPYING for details. | ||
|
||
import os | ||
import unittest | ||
from tests.unit.server_test_util import ServerTestUtil, log | ||
|
||
|
||
class X11ClientTest(ServerTestUtil): | ||
|
||
|
||
def run_client(self, *args): | ||
client_display = self.find_free_display() | ||
xvfb = self.start_Xvfb(client_display) | ||
from xpra.scripts.server import xauth_add | ||
xauth_add(client_display) | ||
env = self.run_env() | ||
env["DISPLAY"] = client_display | ||
log("starting test client on Xvfb %s", client_display) | ||
|
||
return xvfb, self.run_xpra(["xpra", "attach"] + list(args) , env) | ||
|
||
def do_test_connect(self, sharing=False): | ||
display = self.find_free_display() | ||
log("starting test server on %s", display) | ||
server = self.check_start_server(display, "--start=xterm", "--sharing=%s" % sharing) | ||
xvfb1, client1 = self.run_client(display, "--sharing=%s" % sharing) | ||
assert self.pollwait(client1, 2) is None | ||
xvfb2, client2 = self.run_client(display, "--sharing=%s" % sharing) | ||
assert self.pollwait(client2, 2) is None | ||
if not sharing: | ||
#starting a second client should disconnect the first when not sharing | ||
assert self.pollwait(client1, 2) is not None, "the first client should have been disconnected (sharing off)" | ||
#killing the Xvfb should kill the client | ||
xvfb1.terminate() | ||
xvfb2.terminate() | ||
assert self.pollwait(xvfb1, 2) is not None | ||
assert self.pollwait(xvfb2, 2) is not None | ||
assert self.pollwait(client1, 2) is not None | ||
assert self.pollwait(client2, 2) is not None | ||
server.terminate() | ||
|
||
|
||
def test_connect(self): | ||
self.do_test_connect(False) | ||
|
||
def test_sharing(self): | ||
self.do_test_connect(True) | ||
|
||
|
||
def main(): | ||
if os.name=="posix": | ||
unittest.main() | ||
|
||
|
||
if __name__ == '__main__': | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
#!/usr/bin/env python | ||
# This file is part of Xpra. | ||
# Copyright (C) 2016 Antoine Martin <[email protected]> | ||
# Xpra is released under the terms of the GNU GPL v2, or, at your option, any | ||
# later version. See the file COPYING for details. | ||
|
||
import unittest | ||
from tests.unit.server_test_util import ServerTestUtil, log | ||
|
||
|
||
class ProxyServerTest(ServerTestUtil): | ||
|
||
def test_proxy_start_stop(self): | ||
display = self.find_free_display() | ||
log("using free display=%s" % display) | ||
proxy = self.run_command(["xpra", "proxy", display, "--no-daemon"]) | ||
assert self.pollwait(proxy, 5) is None, "proxy failed to start" | ||
assert display in self.dotxpra.displays(), "proxy display not found" | ||
self.check_stop_server(proxy, "stop", display) | ||
|
||
|
||
def main(): | ||
unittest.main() | ||
|
||
|
||
if __name__ == '__main__': | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
#!/usr/bin/env python | ||
# This file is part of Xpra. | ||
# Copyright (C) 2016 Antoine Martin <[email protected]> | ||
# Xpra is released under the terms of the GNU GPL v2, or, at your option, any | ||
# later version. See the file COPYING for details. | ||
|
||
import os | ||
import time | ||
import unittest | ||
from tests.unit.server_test_util import ServerTestUtil, log | ||
|
||
|
||
class ProxyServerTest(ServerTestUtil): | ||
|
||
|
||
def test_display_reuse(self): | ||
display = self.find_free_display() | ||
log("starting test server on %s", display) | ||
server = self.check_start_server(display) | ||
assert display in self.find_X11_displays() | ||
#make sure we cannot start another server on the same display: | ||
try: | ||
log("should not be able to start another test server on %s", display) | ||
self.check_start_server(display) | ||
except: | ||
pass | ||
else: | ||
raise Exception("server using the same display should have failed to start") | ||
assert server.poll() is None, "server should not have terminated" | ||
#tell the server to exit and leave the display behind: | ||
log("asking the server to exit") | ||
self.check_stop_server(server, "exit", display) | ||
del server | ||
assert display not in self.dotxpra.displays(), "server socket for display should have been removed" | ||
#now we can start it again using "--use-display" | ||
log("start a new server on the same display") | ||
server = self.check_start_server(display, "--use-display") | ||
assert display in self.dotxpra.displays(), "server display not found" | ||
#shut it down now | ||
self.check_stop_server(server, "stop", display) | ||
assert display not in self.find_X11_displays(), "the display %s should have been killed" % display | ||
|
||
|
||
def test_existing_Xvfb(self): | ||
display = self.find_free_display() | ||
xvfb = self.start_Xvfb(display) | ||
time.sleep(1) | ||
assert display in self.find_X11_displays() | ||
#start server using this display: | ||
server = self.check_start_server(display, "--use-display") | ||
self.check_stop_server(server, "stop", display) | ||
time.sleep(1) | ||
assert self.pollwait(xvfb, 2) is None, "the Xvfb should not have been killed by xpra shutting down!" | ||
xvfb.terminate() | ||
|
||
|
||
def main(): | ||
if os.name=="posix": | ||
unittest.main() | ||
|
||
|
||
if __name__ == '__main__': | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
#!/usr/bin/env python | ||
# This file is part of Xpra. | ||
# Copyright (C) 2016 Antoine Martin <[email protected]> | ||
# Xpra is released under the terms of the GNU GPL v2, or, at your option, any | ||
# later version. See the file COPYING for details. | ||
|
||
import os | ||
import time | ||
import unittest | ||
import subprocess | ||
from xpra.platform.dotxpra import DotXpra | ||
from xpra.platform.paths import get_socket_dirs | ||
|
||
from xpra.log import Logger | ||
log = Logger("test") | ||
|
||
|
||
class ServerTestUtil(unittest.TestCase): | ||
|
||
@classmethod | ||
def setUpClass(cls): | ||
dirs = get_socket_dirs() | ||
cls.display_start = 100 | ||
cls.dotxpra = DotXpra(dirs[0], dirs) | ||
cls.default_xpra_args = ["--systemd-run=no"] | ||
ServerTestUtil.existing_displays = cls.dotxpra.displays() | ||
ServerTestUtil.processes = [] | ||
|
||
@classmethod | ||
def tearDownClass(cls): | ||
for x in ServerTestUtil.processes: | ||
try: | ||
if x.poll() is None: | ||
x.terminate() | ||
except: | ||
log.error("failed to stop subprocess %s", x) | ||
displays = set(cls.dotxpra.displays()) | ||
new_displays = displays - set(ServerTestUtil.existing_displays) | ||
if new_displays: | ||
for x in list(new_displays): | ||
log("stopping display %s" % x) | ||
proc = cls.run_xpra(["xpra", "stop", x]) | ||
proc.communicate(None) | ||
|
||
|
||
@classmethod | ||
def run_env(self): | ||
return dict((k,v) for k,v in os.environ.items() if k in ("HOME", "HOSTNAME", "SHELL", "TERM", "USER", "USERNAME", "PATH", "PWD", "XAUTHORITY", )) | ||
|
||
|
||
@classmethod | ||
def run_xpra(cls, command, env=None): | ||
cmd = command + cls.default_xpra_args | ||
return cls.run_command(cmd, env) | ||
|
||
@classmethod | ||
def run_command(cls, command, env=None): | ||
if env is None: | ||
env = cls.run_env() | ||
log("run_command(%s, %s)", command, env) | ||
proc = subprocess.Popen(args=command, env=env) | ||
ServerTestUtil.processes.append(proc) | ||
return proc | ||
|
||
|
||
@classmethod | ||
def find_X11_display_numbers(cls): | ||
#use X11 sockets: | ||
X11_displays = set() | ||
if os.name=="posix": | ||
for x in os.listdir("/tmp/.X11-unix"): | ||
if x.startswith("X"): | ||
try: | ||
X11_displays.add(int(x[1:])) | ||
except: | ||
pass | ||
return X11_displays | ||
|
||
@classmethod | ||
def find_X11_displays(cls): | ||
return [":%i" % x for x in cls.find_X11_display_numbers()] | ||
|
||
|
||
@classmethod | ||
def find_free_display_no(cls): | ||
#X11 sockets: | ||
X11_displays = cls.find_X11_display_numbers() | ||
displays = cls.dotxpra.displays() | ||
start = cls.display_start % 10000 | ||
for i in range(start, 20000): | ||
display = ":%i" % i | ||
if display not in displays and display not in X11_displays: | ||
cls.display_start += 100 | ||
return i | ||
raise Exception("failed to find any free displays!") | ||
|
||
@classmethod | ||
def find_free_display(cls): | ||
return ":%i" % cls.find_free_display_no() | ||
|
||
|
||
@classmethod | ||
def start_Xvfb(cls, display=None, screens=[(1024,768)]): | ||
assert os.name=="posix" | ||
if display is None: | ||
display = cls.find_free_display_no() | ||
XAUTHORITY = os.environ.get("XAUTHORITY", os.path.expanduser("~/.Xauthority")) | ||
cmd = ["Xvfb", "+extension", "Composite", "-nolisten", "tcp", "-noreset", | ||
"-auth", XAUTHORITY] | ||
for i, screen in enumerate(screens): | ||
(w, h) = screen | ||
cmd += ["-screen", "%i" % i, "%ix%ix24+32" % (w, h)] | ||
cmd.append(display) | ||
return cls.run_command(cmd) | ||
|
||
|
||
@classmethod | ||
def pollwait(cls, proc, timeout=5): | ||
start = time.time() | ||
while time.time()-start<timeout: | ||
v = proc.poll() | ||
if v is not None: | ||
return v | ||
time.sleep(0.1) | ||
return None | ||
|
||
@classmethod | ||
def check_start_server(cls, display, *args): | ||
server_proc = cls.run_xpra(["xpra", "start", display, "--no-daemon"]+list(args)) | ||
time.sleep(5) | ||
assert server_proc.poll() is None, "server failed to start" | ||
assert display in cls.dotxpra.displays(), "server display not found" | ||
return server_proc | ||
|
||
@classmethod | ||
def check_stop_server(cls, server_proc, subcommand="stop", display=":99999"): | ||
stopit = cls.run_xpra(["xpra", subcommand, display]) | ||
assert cls.pollwait(stopit) is not None, "server failed to exit" | ||
assert display not in cls.dotxpra.displays(), "server socket for display %s should have been removed" % display |