From 8da11191ed145a532d56ec46ac79da068fdc05f1 Mon Sep 17 00:00:00 2001
From: Lewis Crawford
Date: Wed, 24 Feb 2021 17:51:54 +0000
Subject: [PATCH] added Ignore errors option to keep processing records

---
 s3select | 43 ++++++++++++++++++++++++++++++++-----------
 1 file changed, 32 insertions(+), 11 deletions(-)

diff --git a/s3select b/s3select
index ed6ee17..5ddd599 100755
--- a/s3select
+++ b/s3select
@@ -68,8 +68,9 @@ class ScanOneKey(threading.Thread):
     def __init__(
             self, files_queue, events_queue, s3, output_fields=None, count=None,
             field_delimiter=None, record_delimiter=None, where=None, limit=None,
-            max_retries=None):
+            max_retries=None, ignore_errors=None):
         threading.Thread.__init__(self)
+        self.ignore_errors = ignore_errors
         self.max_retries = max_retries
         self.limit = limit
         self.where = where
@@ -141,10 +142,14 @@ class ScanOneKey(threading.Thread):
         if self.limit > 0:
             query += " LIMIT " + str(self.limit)
 
-        if '.gz' == s3_key.lower()[-3:]:
+        if s3_key.lower().endswith(".gz"):
             input_ser['CompressionType'] = 'GZIP'
 
+        if s3_key.lower().endswith(".gz.parquet") or s3_key.lower().endswith(".parquet"):
+            input_ser = {'Parquet': {}}
+
         current_try = 0
+        ignoreRec = False
         while True:
             try:
                 response = self.s3.select_object_content(
@@ -157,15 +162,26 @@ class ScanOneKey(threading.Thread):
                 )
                 break
             except Exception as e:
-                self.events_queue.put(S3SelectEventResult(
-                    exception=e,
-                    max_retries_reached=current_try >= self.max_retries,
-                    s3_path=s3_path))
-                time.sleep(0.4)
-                current_try = current_try + 1
+                if self.ignore_errors:
+                    print("Ignore: " + s3_key + " due to " + str(e))
+                    ignoreRec = True
+                    break
+                else:
+                    self.events_queue.put(S3SelectEventResult(
+                        exception=e,
+                        max_retries_reached=current_try >= self.max_retries,
+                        s3_path=s3_path))
+                    time.sleep(0.4)
+                    current_try = current_try + 1
+
+
 
         payload_from_previous_event = ""
         end_event_received = False
+        if ignoreRec:
+            response = {}
+            response['Payload'] = {}
+            end_event_received = True
         for event in response['Payload']:
             if max_result_limit_reached:
                 self.handled = True
@@ -239,7 +255,7 @@ def refresh_status_bar(
 def select(prefixes=None, verbose=False, profile=None, thread_count=150,
            count=False, limit=0, output_fields=None, field_delimiter=None,
            record_delimiter=None, where=None, max_retries=20,
-           with_filename=False):
+           with_filename=False, ignore_errors=False):
     if prefixes is None:
         raise Exception("S3 path prefix must be defined")
 
@@ -267,7 +283,7 @@ def select(prefixes=None, verbose=False, profile=None, thread_count=150,
             files_queue, events_queue, s3, count=count, limit=limit,
             output_fields=output_fields, field_delimiter=field_delimiter,
             record_delimiter=record_delimiter, where=where,
-            max_retries=max_retries)
+            max_retries=max_retries, ignore_errors=ignore_errors)
         # daemon threads allow for fast exit if max number of records has been
         # specified
@@ -430,6 +446,11 @@ if __name__ == "__main__":
         help="Maximum number of retries per queried S3 object in case API "
              "request fails"
     )
-
+    parser.add_argument(
+        "-I",
+        "--ignore_errors",
+        action='store_true',
+        help="Output error to stdout but keep processing records. This will Not Retry."
+    )
     args = parser.parse_args()
     select(**vars(args))
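
Reviewer note (illustrative, not part of the patch): the option is opt-in. With ignore_errors left at its default the retry path is unchanged; with it set, a key that select_object_content rejects is printed to stdout and skipped with no retries, and an empty Payload is substituted so the event loop simply falls through. A minimal sketch of calling the select() entry point this patch extends; the bucket prefix is a placeholder, and the assumption that prefixes takes a list of S3 paths follows only from the plural parameter name:

    # Illustrative only -- roughly equivalent to running the CLI with -I/--ignore_errors.
    # Keys that fail SelectObjectContent are reported as "Ignore: <key> due to <error>"
    # and skipped; all other objects are processed as before.
    select(prefixes=["s3://my-bucket/logs/2021/"], ignore_errors=True)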