diff --git a/chromaterm/__main__.py b/chromaterm/__main__.py index c3a5a260..0218628a 100644 --- a/chromaterm/__main__.py +++ b/chromaterm/__main__.py @@ -29,13 +29,22 @@ # Maximum chuck size per read READ_SIZE = 4096 # 4 KiB -# Sequences upon which ct will split during processing. This includes new lines, -# vertical spaces, form feeds, private control functions (ECMA-035) and C1 set -# (ECMA-048), SCS (G0 through G3 sets), CSI (excluding SGR), and OSC. -SPLIT_RE = re.compile(br'(\r\n?|\n|\v|\f|\x1b[\x30-\x5a\x5c\x5e\x5f]|' - br'\x1b[\x28-\x2b\x2d-\x2f][\x20-\x7e]|' - br'\x1b\x5b[\x30-\x3f]*[\x20-\x2f]*[\x40-\x6c\x6e-\x7e]|' - br'\x1b\x5d[^\x07\x1b]*(?:\x07|\x1b\x5c)?)') +# Sequences upon which ct will split during processing (ECMA 035 and 048): +# * new lines, vertical spaces, form feeds; +# * private control functions, C1 set (excluding control strings); +# * independent control functions (\e#), SCS (G0 through G3 sets); +# * CSI (excluding SGR); and +# * control strings (DCS, SOS, OSC, PM, APC). 
+SPLIT_RE = re.compile( + br'(\r\n?|[\n\v\f]|' + br'\x1b[\x30-\x4f\x51-\x57\x59-\x5a\x5c\x60-\x7e]|' + br'\x1b[\x23\x28-\x2b\x2d-\x2f][\x20-\x7e]|' + br'\x1b\x5b[\x30-\x3f]*[\x20-\x2f]*[\x40-\x6c\x6e-\x7e]|' + br'\x1b[\x50\x58\x5d\x5e\x5f][^\x07\x1b]*(?:\x07|\x1b\x5c)?)') + +# The start of a control string +SPLIT_CONTROL_STRINGS = (b'\x1b\x50', b'\x1b\x58', b'\x1b\x5d', b'\x1b\x5e', + b'\x1b\x5f') def args_init(args=None): @@ -310,8 +319,8 @@ def process_input(config, data_fd, forward_fd=None, max_wait=None): data, separator = chunks[-1] - # Separator is an incomplete OSC; wait for a bit - if data_read and separator.startswith(b'\x1b\x5d'): + # Separator is an incomplete control string; wait for the rest + if data_read and separator.startswith(SPLIT_CONTROL_STRINGS): buffer = data + separator # Zero or one characters indicates keyboard typing; don't highlight # Account for the backspaces added by some shells, like zsh diff --git a/tests/test__main__.py b/tests/test__main__.py index 93144034..4d003214 100644 --- a/tests/test__main__.py +++ b/tests/test__main__.py @@ -482,8 +482,8 @@ def test_process_input_multiline(capsys): assert capsys.readouterr().out == '\nt \x1b[1mhello world\x1b[22m t\n' * 2 -def test_process_input_partial_osc(capsys, monkeypatch): - '''An incomplete OSC should not be printed.''' +def test_process_input_partial_control_string(capsys, monkeypatch): + '''An incomplete control string should not be printed.''' pipe_r, pipe_w = os.pipe() config = chromaterm.__main__.Config() event = threading.Event() @@ -503,23 +503,24 @@ def patched_read_ready(*_1, timeout=None): worker.start() try: - # Data (printed), followed by the first part of the OSC (not printed) - event.clear() - os.write(pipe_w, b'hello\n\x1b\x5dp1') - event.wait() - assert capsys.readouterr().out == 'hello\n' - - # Second part of the OSC (not printed) - event.clear() - os.write(pipe_w, 'p2'.encode()) - event.wait() - assert capsys.readouterr().out == '' - - # Final part of the OSC 
(printed) and some data (printed) - event.clear() - os.write(pipe_w, 'p3\x07world'.encode()) - event.wait() - assert capsys.readouterr().out == '\x1b\x5dp1p2p3\x07world' + for code in ['\x50', '\x58', '\x5d', '\x5e', '\x5f']: + # Data (printed), followed by the first part (not printed) + event.clear() + os.write(pipe_w, b'hello\n\x1b' + code.encode('utf-8') + b'p1') + event.wait() + assert capsys.readouterr().out == 'hello\n' + + # Second part (not printed) + event.clear() + os.write(pipe_w, 'p2'.encode()) + event.wait() + assert capsys.readouterr().out == '' + + # Final part (printed) and some data (printed) + event.clear() + os.write(pipe_w, 'p3\x07world'.encode()) + event.wait() + assert capsys.readouterr().out == '\x1b' + code + 'p1p2p3\x07world' finally: os.close(pipe_w) worker.join() @@ -663,23 +664,30 @@ def test_split_buffer_private_control_functions(): def test_split_buffer_c1_set(): - '''Split based on the ECMA-048 C1 set, excluding CSI and OSC.''' - c1_except_csi_and_osc = itertools.chain( - range(int('40', 16), int('5b', 16)), - [ - int('5c', 16), - int('5e', 16), - int('5f', 16), - ], - ) - - for char_id in c1_except_csi_and_osc: + '''Split based on the ECMA-048 C1 set, excluding CSI and control strings.''' + c1_set = itertools.chain(range(int('40', 16), int('50', 16)), + range(int('51', 16), int('58', 16)), + range(int('59', 16), int('5b', 16)), + (int('5c', 16), )) + + for char_id in c1_set: data = b'Hello \x1b%c World' % char_id expected = ((b'Hello ', b'\x1b%c' % char_id), (b' World', b'')) assert chromaterm.__main__.split_buffer(data) == expected +def test_split_buffer_independent_control_functions(): + '''Split based on the ECMA-048 independent control functions.''' + for escape in (b'\x1b', b'\x1b\x23'): + for char_id in range(int('60', 16), int('7f', 16)): + code = escape + b'%c' % char_id + data = b'Hello ' + code + b' World' + expected = ((b'Hello ', code), (b' World', b'')) + + assert chromaterm.__main__.split_buffer(data) == expected + + 
def test_split_buffer_csi_exclude_sgr(): '''Fail to split based on the ECMA-048 C1 CSI SGR. Added some intermediate characters to prevent matching other CSI codes; strictly checking empty SGR.'''