Skip to content
This repository has been archived by the owner on Nov 29, 2022. It is now read-only.

Commit

Permalink
Fixes #97: Detect complete ANSI control strings if they fall at the e…
Browse files Browse the repository at this point in the history
…nd of the data
  • Loading branch information
hSaria committed Jan 14, 2022
1 parent c15935a commit 0cb5be6
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 23 deletions.
12 changes: 7 additions & 5 deletions chromaterm/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@
br'\x1b\x5b[\x30-\x3f]*[\x20-\x2f]*[\x40-\x6c\x6e-\x7e]|'
br'\x1b[\x50\x58\x5d\x5e\x5f][^\x07\x1b]*(?:\x07|\x1b\x5c)?)')

# The start of a control string
SPLIT_CONTROL_STRINGS = (b'\x1b\x50', b'\x1b\x58', b'\x1b\x5d', b'\x1b\x5e',
b'\x1b\x5f')
# Control strings that have arbitrary lengths
ANSI_CONTROL_STRINGS_START = (b'\x1b\x50', b'\x1b\x58', b'\x1b\x5d',
b'\x1b\x5e', b'\x1b\x5f')
ANSI_CONTROL_STRINGS_END = (b'\x07', b'\x1b\x5c')


def args_init(args=None):
Expand Down Expand Up @@ -319,8 +320,9 @@ def process_input(config, data_fd, forward_fd=None, max_wait=None):

data, separator = chunks[-1]

# Separator is an incomplete control strings; wait for the rest
if data_read and separator.startswith(SPLIT_CONTROL_STRINGS):
# Separator is an incomplete control string; wait for the rest
if data_read and separator.startswith(ANSI_CONTROL_STRINGS_START) \
and not separator.endswith(ANSI_CONTROL_STRINGS_END):
buffer = data + separator
# A single character indicates keyboard typing; don't highlight
# Account for backspaces added by some shells, like zsh
Expand Down
39 changes: 21 additions & 18 deletions tests/test__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,24 +503,27 @@ def patched_read_ready(*_1, timeout=None):
worker.start()

try:
for code in ['\x50', '\x58', '\x5d', '\x5e', '\x5f']:
# Data (printed), followed by the first part (not printed)
event.clear()
os.write(pipe_w, b'hello\n\x1b' + code.encode() + b'p1')
event.wait()
assert capsys.readouterr().out == 'hello\n'

# Second part (not printed)
event.clear()
os.write(pipe_w, 'p2'.encode())
event.wait()
assert capsys.readouterr().out == ''

# Final part (printed) and some data (printed)
event.clear()
os.write(pipe_w, 'p3\x07world'.encode())
event.wait()
assert capsys.readouterr().out == '\x1b' + code + 'p1p2p3\x07world'
for end in ['\x07', '\x1b\x5c']:
for code in ['\x50', '\x58', '\x5d', '\x5e', '\x5f']:
start = '\x1b' + code

# Data (printed), followed by the first part (not printed)
event.clear()
os.write(pipe_w, b'hello\n' + start.encode() + b'p1')
event.wait()
assert capsys.readouterr().out == 'hello\n'

# Second part (not printed)
event.clear()
os.write(pipe_w, b'p2')
event.wait()
assert capsys.readouterr().out == ''

# Final part (printed)
event.clear()
os.write(pipe_w, b'p3' + end.encode())
event.wait()
assert capsys.readouterr().out == start + 'p1p2p3' + end
finally:
os.close(pipe_w)
worker.join()
Expand Down

0 comments on commit 0cb5be6

Please sign in to comment.