Skip to content

Commit

Permalink
compression and packet encoding improvements:
Browse files Browse the repository at this point in the history
* add bz2 support
* generalize encoding and compression selection
* ensure we pick at least one encoder
* try to pick at least one compressor (if level>0)

git-svn-id: https://xpra.org/svn/Xpra/trunk@6964 3bb7dfac-3a0b-4e04-842a-767bc560f471
  • Loading branch information
totaam committed Jul 27, 2014
1 parent e33231a commit cd7d37b
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 36 deletions.
11 changes: 4 additions & 7 deletions src/xpra/client/client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from xpra.log import Logger
log = Logger("client")

from xpra.net.protocol import Protocol, use_lz4, use_rencode, use_yaml, get_network_caps
from xpra.net.protocol import Protocol, get_network_caps
from xpra.scripts.config import ENCRYPTION_CIPHERS
from xpra.version_util import version_compat_check, get_version_info, get_platform_info, local_version
from xpra.platform.features import GOT_PASSWORD_PROMPT_SUGGESTION
Expand Down Expand Up @@ -497,12 +497,9 @@ def parse_server_capabilities(self):

def parse_network_capabilities(self):
c = self.server_capabilities
if use_rencode and c.boolget("rencode"):
self._protocol.enable_rencode()
elif use_yaml and c.boolget("yaml"):
self._protocol.enable_yaml()
if use_lz4 and c.boolget("lz4") and self.compression_level==1:
self._protocol.enable_lz4()
self._protocol.enable_encoder_from_caps(c)
self._protocol.enable_compressor_from_caps(c)

if self.encryption:
#server uses a new cipher after second hello:
key = self.get_encryption_key()
Expand Down
26 changes: 23 additions & 3 deletions src/xpra/net/compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

import os
import sys
from zlib import compress
import zlib
import bz2

from xpra.log import Logger
log = Logger("network", "protocol")
Expand All @@ -15,6 +16,7 @@

ZLIB_FLAG = 0x00
LZ4_FLAG = 0x10
BZ2_FLAG = 0x20


try:
Expand All @@ -40,10 +42,26 @@ def lz4_compress(packet, level):
def zcompress(packet, level):
if type(packet)!=bytes:
packet = bytes(packet, 'UTF-8')
return level + ZLIB_FLAG, compress(packet, level)
return level + ZLIB_FLAG, zlib.compress(packet, level)

def bzcompress(packet, level):
if type(packet)!=bytes:
packet = bytes(packet, 'UTF-8')
return level + BZ2_FLAG, bz2.compress(packet, level)

def nocompress(packet, level):
if type(packet)!=bytes:
packet = bytes(packet, 'UTF-8')
return 0, packet
else:
def zcompress(packet, level):
return level + ZLIB_FLAG, compress(str(packet), level)
return level + ZLIB_FLAG, zlib.compress(str(packet), level)
def bzcompress(packet, level):
return level + BZ2_FLAG, bz2.compress(str(packet), level)
def nocompress(packet, level):
return 0, packet
use_zlib = os.environ.get("XPRA_USE_ZLIB", "1")=="1"
use_bz2 = os.environ.get("XPRA_USE_BZ2", "1")=="1"


class Compressed(object):
Expand All @@ -55,6 +73,7 @@ def __len__(self):
def __repr__(self):
return "Compressed(%s: %s bytes)" % (self.datatype, len(self.data))


class LevelCompressed(Compressed):
def __init__(self, datatype, data, level, algo):
Compressed.__init__(self, datatype, data)
Expand All @@ -65,6 +84,7 @@ def __len__(self):
def __repr__(self):
return "LevelCompressed(%s: %s bytes as %s/%s)" % (self.datatype, len(self.data), self.algorithm, self.level)


def compressed_wrapper(datatype, data, level=5, lz4=False):
if lz4:
assert use_lz4, "cannot use lz4"
Expand Down
109 changes: 95 additions & 14 deletions src/xpra/net/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
import threading
import binascii
from threading import Lock
from zlib import decompress
import zlib
import bz2


from xpra.log import Logger
Expand All @@ -23,7 +24,7 @@
from xpra.os_util import Queue, strtobytes
from xpra.util import repr_ellipsized
from xpra.net.bytestreams import ABORT
from xpra.net.compression import zcompress, has_lz4, use_lz4, LZ4_FLAG, lz4_compress, LZ4_uncompress, Compressed, LevelCompressed
from xpra.net.compression import nocompress, zcompress, use_zlib, bzcompress, use_bz2, BZ2_FLAG, has_lz4, use_lz4, LZ4_FLAG, lz4_compress, LZ4_uncompress, Compressed, LevelCompressed
from xpra.net.header import unpack_header, pack_header, pack_header_and_data
from xpra.net.crypto import get_crypto_caps, get_cipher
from xpra.net.packet_encoding import rencode_dumps, rencode_loads, rencode_version, has_rencode, use_rencode, \
Expand Down Expand Up @@ -60,13 +61,15 @@ def get_network_caps(legacy=True):
"bencode" : use_bencode,
"yaml" : use_yaml,
"lz4" : use_lz4,
"bz2" : use_bz2,
"zlib" : use_zlib,
"zlib.version" : zlib.__version__,
"mmap" : mmap,
}
if legacy:
#for backwards compatibility only:
caps.update({
"raw_packets" : True,
"zlib" : True,
"chunked_compression" : True
})
caps.update(get_crypto_caps())
Expand All @@ -93,7 +96,7 @@ class Protocol(object):

FLAGS_RENCODE = 0x1
FLAGS_CIPHER = 0x2
FLAGS_JSON = 0x4
FLAGS_YAML = 0x4
FLAGS_NOHEADER = 0x40

def __init__(self, scheduler, conn, process_packet_cb, get_packet_cb=None):
Expand Down Expand Up @@ -131,7 +134,7 @@ def __init__(self, scheduler, conn, process_packet_cb, get_packet_cb=None):
self._log_stats = None #None here means auto-detect
self._closed = False
self._encoder = self.noencode
self._compress = zcompress
self._compress = nocompress
self.compression_level = 0
self.cipher_in = None
self.cipher_in_name = None
Expand All @@ -147,6 +150,7 @@ def __init__(self, scheduler, conn, process_packet_cb, get_packet_cb=None):
self._write_format_thread = make_daemon_thread(self._write_format_thread_loop, "format")
self._source_has_more = threading.Event()
self.enable_default_encoder()
self.enable_default_compressor()

STATE_FIELDS = ("max_packet_size", "large_packets", "send_aliases", "receive_aliases",
"cipher_in", "cipher_in_name", "cipher_in_block_size",
Expand All @@ -155,6 +159,7 @@ def __init__(self, scheduler, conn, process_packet_cb, get_packet_cb=None):
def save_state(self):
state = {
"zlib" : self._compress==zcompress,
"bz2" : self._compress==bzcompress,
"lz4" : lz4_compress and self._compress==lz4_compress,
"bencode" : self._encoder == self.bencode,
"rencode" : self._encoder == self.rencode,
Expand All @@ -169,12 +174,23 @@ def restore_state(self, state):
for x in Protocol.STATE_FIELDS:
assert x in state, "field %s is missing" % x
setattr(self, x, state[x])
if state.get("lz4", False):
if state.get("lz4"):
self.enable_lz4()
if state.get("rencode", False):
elif state.get("bz2"):
self.enable_bz2()
elif state.get("zlib"):
self.enable_zlib()
else:
self.enable_nocompress()

if state.get("rencode"):
self.enable_rencode()
elif state.get("yaml", False):
elif state.get("yaml"):
self.enable_yaml()
elif state.get("bencode"):
self.enable_bencode()
else:
raise Exception("invalid state: no encoder specified!")

def wait_for_io_threads_exit(self, timeout=None):
for t in (self._read_thread, self._write_thread):
Expand Down Expand Up @@ -229,6 +245,10 @@ def get_info(self):
info["compression"] = "zlib"
elif self._compress==lz4_compress:
info["compression"] = "lz4"
elif self._compress==bzcompress:
info["compression"] = "bz2"
elif self._compress==nocompress:
info["compression"] = "none"
for k,v in self.send_aliases.items():
info["send_alias." + str(k)] = v
info["send_alias." + str(v)] = k
Expand Down Expand Up @@ -366,11 +386,25 @@ def new_tree(append):
self.do_verify_packet(new_tree("key for value='%s'" % str(v)), k)
self.do_verify_packet(new_tree("value for key='%s'" % str(k)), v)


def enable_default_encoder(self):
if has_bencode:
self.enable_bencode()
elif has_rencode:
self.enable_rencode()
else:
assert has_yaml, "no packet encoders available!"
self.enable_yaml()

def enable_encoder_from_caps(self, caps):
if use_rencode and caps.boolget("rencode"):
self.enable_rencode()
elif use_yaml and caps.boolget("yaml"):
self.enable_yaml()
elif use_bencode and caps.boolget("bencode", True):
self.enable_bencode()
else:
raise Exception("no matching packet encoder found!")

def enable_bencode(self):
assert has_bencode, "bencode cannot be enabled: the module failed to load!"
Expand All @@ -388,6 +422,37 @@ def enable_yaml(self):
self._encoder = self.yaml


def enable_default_compressor(self):
if use_zlib:
self.enable_zlib()
elif use_lz4:
self.enable_lz4()
elif use_bz2:
self.enable_yaml()
else:
self.enable_nocompress()

def enable_compressor_from_caps(self, caps):
if self.compression_level==0:
self.enable_nocompress()
return
if caps.boolget("lz4") and use_lz4 and self.compression_level==1:
self.enable_lz4()
elif caps.boolget("zlib") and use_zlib:
self.enable_zlib()
elif caps.boolget("bz2") and use_bz2:
self.enable_bz2()
#retry lz4 (without level check)
elif caps.boolget("lz4") and use_lz4:
self.enable_lz4()
else:
log.error("no matching compressor found!")
self.enable_nocompress()

def enable_nocompress(self):
log("nocompress()")
self._compress = nocompress

def enable_zlib(self):
log("enable_zlib()")
self._compress = zcompress
Expand All @@ -397,6 +462,12 @@ def enable_lz4(self):
log("enable_lz4()")
self._compress = lz4_compress

def enable_bz2(self):
log("enable_bz2()")
self._compress = bzcompress



def noencode(self, data):
#just send data as a string for clients that don't understand xpra packet format:
return ": ".join([str(x) for x in data])+"\n", Protocol.FLAGS_NOHEADER
Expand All @@ -408,7 +479,7 @@ def rencode(self, data):
return rencode_dumps(data), Protocol.FLAGS_RENCODE

def yaml(self, data):
return yaml_encode(data), Protocol.FLAGS_JSON
return yaml_encode(data), Protocol.FLAGS_YAML


def encode(self, packet_in):
Expand Down Expand Up @@ -486,6 +557,7 @@ def encode(self, packet_in):

def set_compression_level(self, level):
#this may be used next time encode() is called
assert level>=0 and level<=10, "invalid compression level: %s (must be between 0 and 10" % level
self.compression_level = level

def _io_thread_loop(self, name, callback):
Expand Down Expand Up @@ -697,14 +769,23 @@ def debug_str(s):
if compression_level>0:
try:
if compression_level & LZ4_FLAG:
assert has_lz4
ctype = "lz4"
assert has_lz4, "lz4 is not available"
assert use_lz4, "lz4 is not enabled"
data = LZ4_uncompress(data)
elif compression_level & BZ2_FLAG:
ctype = "bz2"
assert use_bz2, "bz2 is not enabled"
data = bz2.decompress(data)
else:
data = decompress(data)
ctype = "zlib"
assert use_zlib, "zlib is not enabled"
data = zlib.decompress(data)
except Exception, e:
log("%s packet decompression failed", ctype, exc_info=True)
if self.cipher_in:
return self._call_connection_lost("decompression failed (invalid encryption key?): %s" % e)
return self._call_connection_lost("decompression failed: %s" % e)
return self._call_connection_lost("%s packet decompression failed (invalid encryption key?): %s" % (ctype, e))
return self._call_connection_lost("%s packet decompression failed: %s" % (ctype, e))

if self.cipher_in and not (protocol_flags & Protocol.FLAGS_CIPHER):
return self._call_connection_lost("unencrypted packet dropped: %s" % repr_ellipsized(data))
Expand All @@ -724,7 +805,7 @@ def debug_str(s):
if protocol_flags & Protocol.FLAGS_RENCODE:
assert has_rencode, "we don't support rencode mode but the other end sent us a rencoded packet! not an xpra client?"
packet = list(rencode_loads(data))
elif protocol_flags & Protocol.FLAGS_JSON:
elif protocol_flags & Protocol.FLAGS_YAML:
assert has_yaml, "we don't support yaml mode but the other end sent us a yaml packet! not an xpra client?"
packet = list(yaml_decode(data))
else:
Expand Down
7 changes: 6 additions & 1 deletion src/xpra/server/server_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ def debug_usage():
if len(args)!=1:
return argn_err(1)
compression = args[0].lower()
opts = ("lz4", "zlib")
opts = ("lz4", "zlib", "bz2")
if compression=="lz4":
for cproto in protos:
cproto.enable_lz4()
Expand All @@ -796,6 +796,11 @@ def debug_usage():
cproto.enable_zlib()
forward_all_clients(["enable_zlib"])
return success()
elif compression=="bz2":
for cproto in protos:
cproto.enable_zlib()
forward_all_clients(["enable_bz2"])
return success()
return arg_err("must be one of: %s" % (", ".join(opts)))
elif command=="encoder":
if len(args)!=1:
Expand Down
14 changes: 3 additions & 11 deletions src/xpra/server/server_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@
from xpra.platform import set_application_name
from xpra.os_util import load_binary_file, get_machine_id, get_user_uuid, SIGNAMES
from xpra.version_util import version_compat_check, get_version_info, get_platform_info, get_host_info, local_version
from xpra.net.protocol import Protocol, use_lz4, use_rencode, use_yaml, get_network_caps
from xpra.net.protocol import Protocol, get_network_caps
from xpra.net.crypto import new_cipher_caps
from xpra.server.background_worker import stop_worker
from xpra.daemon_thread import make_daemon_thread
from xpra.server.proxy import XpraProxy
from xpra.util import typedict, updict, repr_ellipsized



MAX_CONCURRENT_CONNECTIONS = 20


Expand Down Expand Up @@ -410,15 +409,8 @@ def _process_hello(self, proto, packet):
capabilities = packet[1]
c = typedict(capabilities)
proto.set_compression_level(c.intget("compression_level", self.compression_level))
if use_rencode and c.boolget("rencode"):
proto.enable_rencode()
elif use_yaml and c.boolget("yaml"):
proto.enable_yaml()
else:
proto.enable_bencode()

if c.boolget("lz4") and use_lz4 and self.compression_level==1:
proto.enable_lz4()
proto.enable_encoder_from_caps(c)
proto.enable_compressor_from_caps(c)

log("process_hello: capabilities=%s", capabilities)
if c.boolget("version_request"):
Expand Down

0 comments on commit cd7d37b

Please sign in to comment.