Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Improve quoting options; Restore API-default quoting; Add bz2 support #2

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 52 additions & 15 deletions s3select
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,18 @@ class S3ListThread(threading.Thread):
class ScanOneKey(threading.Thread):
def __init__(
self, files_queue, events_queue, s3, output_fields=None, count=None,
field_delimiter=None, record_delimiter=None, where=None, limit=None,
max_retries=None):
field_delimiter=None, record_delimiter=None, quote_character=None,
quote_escape_character=None, allow_quoted_record_delimiter=False,
where=None, limit=None, max_retries=None):
threading.Thread.__init__(self)
self.max_retries = max_retries
self.limit = limit
self.where = where
self.field_delimiter = field_delimiter
self.record_delimiter = record_delimiter
self.quote_character = quote_character
self.quote_escape_character = quote_escape_character
self.allow_quoted_record_delimiter = allow_quoted_record_delimiter
self.count = count
self.output_fields = output_fields
self.files_queue = files_queue
Expand All @@ -101,21 +105,31 @@ class ScanOneKey(threading.Thread):
return
input_ser = {'JSON': {"Type": "Document"}}
output_ser = {'JSON': {}}
if self.field_delimiter is not None or \
self.record_delimiter is not None:

if (
self.field_delimiter is not None or
self.record_delimiter is not None or
self.quote_character is not None or
self.quote_escape_character is not None or
self.allow_quoted_record_delimiter
):
if self.field_delimiter is None:
self.field_delimiter = "\n"
self.field_delimiter = ","
if self.record_delimiter is None:
self.record_delimiter = ","
self.record_delimiter = "\n"
if self.quote_character is None:
self.quote_character = "\""
if self.quote_escape_character is None:
self.quote_escape_character = "\\"

input_ser = {
'CSV':
{
"FieldDelimiter": self.field_delimiter,
"FileHeaderInfo": "NONE",
"RecordDelimiter": self.record_delimiter,
"QuoteCharacter": ''
"QuoteCharacter": self.quote_character,
"QuoteEscapeCharacter": self.quote_escape_character,
"AllowQuotedRecordDelimiter": self.allow_quoted_record_delimiter,
}
}
output_ser = {'CSV': {"FieldDelimiter": self.field_delimiter}}
Expand Down Expand Up @@ -143,8 +157,9 @@ class ScanOneKey(threading.Thread):

if s3_key.lower().endswith(".gz"):
input_ser['CompressionType'] = 'GZIP'

if s3_key.lower().endswith(".gz.parquet") or s3_key.lower().endswith(".parquet"):
elif s3_key.lower().endswith(".bz2"):
input_ser['CompressionType'] = 'BZIP2'
elif s3_key.lower().endswith(".gz.parquet") or s3_key.lower().endswith(".parquet"):
input_ser = {'Parquet': {}}

current_try = 0
Expand Down Expand Up @@ -241,7 +256,8 @@ def refresh_status_bar(

def select(prefixes=None, verbose=False, profile=None, thread_count=150,
count=False, limit=0, output_fields=None, field_delimiter=None,
record_delimiter=None, where=None, max_retries=20,
record_delimiter=None, quote_character=None, quote_escape_character=None,
allow_quoted_record_delimiter=False, where=None, max_retries=20,
with_filename=False):
if prefixes is None:
raise Exception("S3 path prefix must be defined")
Expand Down Expand Up @@ -269,8 +285,10 @@ def select(prefixes=None, verbose=False, profile=None, thread_count=150,
thread = ScanOneKey(
files_queue, events_queue, s3, count=count, limit=limit,
output_fields=output_fields, field_delimiter=field_delimiter,
record_delimiter=record_delimiter, where=where,
max_retries=max_retries)
record_delimiter=record_delimiter, quote_character=quote_character,
quote_escape_character=quote_escape_character,
allow_quoted_record_delimiter=allow_quoted_record_delimiter,
where=where, max_retries=max_retries)

# daemon threads allow for fast exit if max number of records has been
# specified
Expand Down Expand Up @@ -373,13 +391,32 @@ if __name__ == "__main__":
"-d",
"--field_delimiter",
help="Field delimiter to be used for CSV files. If specified CSV "
"parsing will be used. By default we expect JSON input"
"parsing will be used. By default we expect JSON input."
)
parser.add_argument(
"-D",
"--record_delimiter",
help="Record delimiter to be used for CSV files. If specified CSV "
"parsing will be used. By default we expect JSON input"
"parsing will be used. By default we expect JSON input."
)
parser.add_argument(
"-q",
"--quote_character",
help="Quote character to be used for CSV files. If specified CSV "
"parsing will be used. By default we expect JSON input."
)
parser.add_argument(
"-Q",
"--quote_escape_character",
help="Quote escape character to be used for CSV files. If specified "
"CSV parsing will be used. By default we expect JSON input."
)
parser.add_argument(
"-A",
"--allow_quoted_record_delimiter",
action='store_true',
help="Allow record delimiter to be quoted in CSV files. If specified "
"CSV parsing will be used. By default we expect JSON input."
)
parser.add_argument(
"-l",
Expand Down