Add record delimiter option
Marko Bastovanovic committed Feb 12, 2019 · 1 parent d24d5c9 · commit 728788a
Showing 2 changed files with 49 additions and 21 deletions.
README.md · 17 changes: 11 additions & 6 deletions
```diff
@@ -46,9 +46,9 @@ s3select uses the same authentication and endpoint configuration as [aws-cli](ht
 First get some help:
 <pre>
 $ s3select -h
-usage: s3select [-h] [-w WHERE] [-d DELIMITER] [-l LIMIT] [-v] [-c] [-H]
-                [-o OUTPUT_FIELDS] [-t THREAD_COUNT] [--profile PROFILE]
-                [-M MAX_RETRIES]
+usage: s3select [-h] [-w WHERE] [-d FIELD_DELIMITER] [-D RECORD_DELIMITER]
+                [-l LIMIT] [-v] [-c] [-H] [-o OUTPUT_FIELDS] [-t THREAD_COUNT]
+                [--profile PROFILE] [-M MAX_RETRIES]
                 prefixes [prefixes ...]
 
 s3select makes s3 select querying API much easier and faster
@@ -61,9 +61,14 @@ optional arguments:
   -h, --help            show this help message and exit
   -w WHERE, --where WHERE
                         WHERE part of the SQL query
-  -d DELIMITER, --delimiter DELIMITER
-                        Delimiter to be used for CSV files. If specified CSV
-                        parsing will be used. By default we expect JSON input
+  -d FIELD_DELIMITER, --field_delimiter FIELD_DELIMITER
+                        Field delimiter to be used for CSV files. If specified
+                        CSV parsing will be used. By default we expect JSON
+                        input
+  -D RECORD_DELIMITER, --record_delimiter RECORD_DELIMITER
+                        Record delimiter to be used for CSV files. If
+                        specified CSV parsing will be used. By default we
+                        expect JSON input
   -l LIMIT, --limit LIMIT
                         Maximum number of results to return
   -v, --verbose         Be more verbose
```
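For context, the two delimiter flags map directly onto S3 Select's CSV serialization settings. Here is a minimal boto3 sketch of the equivalent raw API call; the bucket, key, and query are placeholders, not part of this commit:

```python
import boto3

s3 = boto3.client("s3")

# Placeholder bucket/key/query, for illustration only.
resp = s3.select_object_content(
    Bucket="my-bucket",
    Key="logs/2019/part-0001.csv",
    ExpressionType="SQL",
    Expression="SELECT s._1, s._3 FROM s3object s WHERE s._2 = 'ERROR'",
    InputSerialization={"CSV": {"FieldDelimiter": ",",     # what -d sets
                                "RecordDelimiter": "\n",   # what -D sets
                                "FileHeaderInfo": "NONE"}},
    OutputSerialization={"CSV": {"FieldDelimiter": ","}},
)

# The response payload is an event stream; Records events carry output bytes.
for event in resp["Payload"]:
    if "Records" in event:
        print(event["Records"]["Payload"].decode("utf-8"), end="")
```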
s3select · 53 changes: 38 additions & 15 deletions
```diff
@@ -67,12 +67,14 @@ class S3ListThread(threading.Thread):
 class ScanOneKey(threading.Thread):
     def __init__(
             self, files_queue, events_queue, s3, output_fields=None, count=None,
-            delimiter=None, where=None, limit=None, max_retries=None):
+            field_delimiter=None, record_delimiter=None, where=None, limit=None,
+            max_retries=None):
         threading.Thread.__init__(self)
         self.max_retries = max_retries
         self.limit = limit
         self.where = where
-        self.delimiter = delimiter
+        self.field_delimiter = field_delimiter
+        self.record_delimiter = record_delimiter
         self.count = count
         self.output_fields = output_fields
         self.files_queue = files_queue
@@ -99,10 +101,24 @@ class ScanOneKey(threading.Thread):
             return
         input_ser = {'JSON': {"Type": "Document"}}
         output_ser = {'JSON': {}}
-        if self.delimiter is not None:
-            input_ser = {'CSV': {"FieldDelimiter": self.delimiter,
-                                 "FileHeaderInfo": "NONE"}}
-            output_ser = {'CSV': {"FieldDelimiter": self.delimiter}}
+        if self.field_delimiter is not None or \
+                self.record_delimiter is not None:
+
+            if self.field_delimiter is None:
+                self.field_delimiter = ","
+            if self.record_delimiter is None:
+                self.record_delimiter = "\n"
+
+            input_ser = {
+                'CSV':
+                {
+                    "FieldDelimiter": self.field_delimiter,
+                    "FileHeaderInfo": "NONE",
+                    "RecordDelimiter": self.record_delimiter,
+                    "QuoteCharacter": ''
+                }
+            }
+            output_ser = {'CSV': {"FieldDelimiter": self.field_delimiter}}
 
         if self.count:
             # no need to parse JSON if we are only expecting the count of
```
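The defaulting logic above reads naturally as a small pure function: whichever delimiter the user omits falls back to the conventional CSV default. A standalone sketch (the helper name and asserts are illustrative, not code from the commit):

```python
def csv_input_serialization(field_delimiter=None, record_delimiter=None):
    """Mirror of the branch above: None means JSON mode, a dict means CSV."""
    if field_delimiter is None and record_delimiter is None:
        return None  # neither flag given: stay in JSON mode
    return {
        "CSV": {
            "FieldDelimiter": field_delimiter or ",",     # default field separator
            "FileHeaderInfo": "NONE",
            "RecordDelimiter": record_delimiter or "\n",  # default record separator
            "QuoteCharacter": "",
        }
    }

assert csv_input_serialization() is None
assert csv_input_serialization("|")["CSV"]["RecordDelimiter"] == "\n"
assert csv_input_serialization(record_delimiter="\r\n")["CSV"]["FieldDelimiter"] == ","
```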
```diff
@@ -221,15 +237,15 @@ def refresh_status_bar(
 
 
 def select(prefixes=None, verbose=False, profile=None, thread_count=150,
-           count=False, limit=0, output_fields=None, delimiter=None,
-           where=None, max_retries=20, with_filename=False):
-
+           count=False, limit=0, output_fields=None, field_delimiter=None,
+           record_delimiter=None, where=None, max_retries=20,
+           with_filename=False):
     if prefixes is None:
         raise Exception("S3 path prefix must be defined")
 
     # shortcut, as specifying \t from the command line is a bit tricky with all the escapes
-    if delimiter is not None and "\\t" in delimiter:
-        delimiter = '\t'
+    if field_delimiter is not None and "\\t" in field_delimiter:
+        field_delimiter = '\t'
 
     if profile is not None:
         boto3.setup_default_session(profile_name=profile)
```
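The escaping shortcut deserves a quick illustration: a shell normally delivers `-d "\t"` as the two-character sequence backslash plus `t`, not as a real tab, so the check above rewrites it. A self-contained demonstration:

```python
# What argparse typically receives when the user types: s3select -d "\t" ...
raw = "\\t"                       # backslash + 't', two characters
assert len(raw) == 2 and raw != "\t"

# The shortcut from the diff above, applied to the raw value.
field_delimiter = "\t" if "\\t" in raw else raw
assert field_delimiter == "\t"    # now a single real tab character
```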
```diff
@@ -249,7 +265,8 @@ def select(prefixes=None, verbose=False, profile=None, thread_count=150,
     else:
         thread = ScanOneKey(
             files_queue, events_queue, s3, count=count, limit=limit,
-            output_fields=output_fields, delimiter=delimiter, where=where,
+            output_fields=output_fields, field_delimiter=field_delimiter,
+            record_delimiter=record_delimiter, where=where,
             max_retries=max_retries)
 
     # daemon threads allow for fast exit if max number of records has been
@@ -351,9 +368,15 @@ if __name__ == "__main__":
     )
     parser.add_argument(
         "-d",
-        "--delimiter",
-        help="Delimiter to be used for CSV files. If specified CSV parsing will"
-             " be used. By default we expect JSON input"
+        "--field_delimiter",
+        help="Field delimiter to be used for CSV files. If specified CSV "
+             "parsing will be used. By default we expect JSON input"
+    )
+    parser.add_argument(
+        "-D",
+        "--record_delimiter",
+        help="Record delimiter to be used for CSV files. If specified CSV "
+             "parsing will be used. By default we expect JSON input"
     )
     parser.add_argument(
         "-l",
```
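For completeness, here is a hypothetical sketch of how the parsed flags would feed `select()`. The script's actual `__main__` wiring falls outside this diff, so everything below is assumed rather than shown; argparse derives the attribute names from `--field_delimiter` and `--record_delimiter`:

```python
# Hypothetical continuation of the __main__ block above; not part of the diff.
args = parser.parse_args()
select(
    prefixes=args.prefixes,                  # positional: prefixes [prefixes ...]
    where=args.where,                        # -w / --where
    field_delimiter=args.field_delimiter,    # -d / --field_delimiter
    record_delimiter=args.record_delimiter,  # -D / --record_delimiter
    limit=args.limit,                        # -l / --limit
    profile=args.profile,                    # --profile
)
```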
