Skip to content

Commit

Permalink
Include s3_path when printing an exception
Browse files Browse the repository at this point in the history
  • Loading branch information
Marko Bastovanovic committed Feb 12, 2019
1 parent f678937 commit d24d5c9
Showing 1 changed file with 15 additions and 11 deletions.
26 changes: 15 additions & 11 deletions s3select
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import time
_sentinel = object()
max_result_limit_reached = False
total_files = 0
clear_line = '\r\033[K'


class S3ListThread(threading.Thread):
Expand Down Expand Up @@ -142,7 +143,8 @@ class ScanOneKey(threading.Thread):
except Exception as e:
self.events_queue.put(S3SelectEventResult(
exception=e,
max_retries_reached=current_try >= self.max_retries))
max_retries_reached=current_try >= self.max_retries,
s3_path=s3_path))
time.sleep(0.4)
current_try = current_try + 1

Expand Down Expand Up @@ -181,7 +183,8 @@ class ScanOneKey(threading.Thread):
exception=Exception(
"End event not received data is corrupted. Please "
"retry"),
max_retries_reached=True))
max_retries_reached=True,
s3_path=s3_path))


class S3SelectEventResult:
Expand Down Expand Up @@ -211,9 +214,8 @@ def format_bytes(bytes_count):
def refresh_status_bar(
files_processed, records_matched, bytes_scanned, verbose):
if verbose:
print('\r\033[KFiles processed: {}/{} Records matched: {} '
'Bytes scanned: {}'
.format(files_processed, total_files, records_matched,
print('{}Files processed: {}/{} Records matched: {} Bytes scanned: {}'
.format(clear_line, files_processed, total_files, records_matched,
format_bytes(bytes_scanned)),
file=sys.stderr, end="")

Expand Down Expand Up @@ -272,12 +274,16 @@ def select(prefixes=None, verbose=False, profile=None, thread_count=150,

event = events_queue.get()

matched_s3_path = event.s3_path

if event.exception is not None:
if event.max_retries_reached:
raise event.exception
elif verbose:
print('\r\033[K' + "Exception caught (will retry): " +
str(event.exception), file=sys.stderr)
print("{}Exception caught while processing {} (will retry). "
"Exception: {}"
.format(clear_line, event.s3_path, str(event.exception)),
file=sys.stderr)

bytes_returned = bytes_returned + event.bytes_returned
bytes_scanned = bytes_scanned + event.bytes_scanned
Expand All @@ -286,15 +292,13 @@ def select(prefixes=None, verbose=False, profile=None, thread_count=150,
refresh_status_bar(
files_processed, records_matched, bytes_scanned, verbose)

matched_s3_path = event.s3_path

for record in event.records:
if count:
records_matched = records_matched + int(record)
else:
records_matched = records_matched + 1
if verbose:
print('\r\033[K', file=sys.stderr, end="")
print(clear_line, file=sys.stderr, end="")
if with_filename:
print(matched_s3_path + "\t" + record)
else:
Expand All @@ -307,7 +311,7 @@ def select(prefixes=None, verbose=False, profile=None, thread_count=150,

if count:
if verbose:
print('\r\033[K', file=sys.stderr, end="")
print(clear_line, file=sys.stderr, end="")

print(records_matched)

Expand Down

0 comments on commit d24d5c9

Please sign in to comment.