# s3_json_to_parquet_job.py
import sys
from typing import Any

from awsglue import context, utils
from pyspark import context as spark_context


class S3JSONToParquetJob:
    """
    Transforms a given JSON file into a Parquet table.

    The source file and target table reside in S3.

    Job parameters:
        source-bucket-name: The source bucket name.
        source-file-path: The source file path.
        target-bucket-name: The target bucket name.
        target-table-path: The target table path.
    """

    def __init__(self):
        # getResolvedOptions parses sys.argv with argparse, which converts
        # hyphens in option names to underscores in the returned keys; hence
        # the hyphenated names here and the underscored lookups below.
        args = utils.getResolvedOptions(
            sys.argv,
            [
                "source-bucket-name",
                "source-file-path",
                "target-bucket-name",
                "target-table-path",
            ],
        )
        self._source_bucket_name = args["source_bucket_name"]
        self._source_file_path = args["source_file_path"]
        self._target_bucket_name = args["target_bucket_name"]
        self._target_table_path = args["target_table_path"]

        # Reuse the running SparkContext (Glue provides one per job run) and
        # wrap it in a GlueContext to get DynamicFrame readers and writers.
        self._glue_context = context.GlueContext(
            spark_context.SparkContext.getOrCreate()
        )

    def run(self) -> None:
        # Read the source JSON into a DynamicFrame, then write it back out
        # to S3 as a Parquet table.
        dynamic_frame = self._read_json(
            self._source_bucket_name, self._source_file_path
        )
        self._write_parquet(
            dynamic_frame, self._target_bucket_name, self._target_table_path
        )

    def _read_json(self, source_bucket_name: str, source_file_path: str) -> Any:
        # "recurse": True makes the reader descend into subdirectories of the
        # given path, so a prefix works as well as a single file.
        return self._glue_context.create_dynamic_frame.from_options(
            connection_type="s3",
            connection_options={
                "paths": [f"s3://{source_bucket_name}/{source_file_path}"],
                "recurse": True,
            },
            format="json",
        )
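
    # For local experimentation without the Glue libraries (an assumption,
    # not part of this job), roughly the same read can be done with plain
    # PySpark, e.g.:
    #   spark.read.option("recursiveFileLookup", "true").json(path)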

    def _write_parquet(
        self, dynamic_frame: Any, target_bucket_name: str, target_table_path: str
    ) -> Any:
        # Writes the DynamicFrame under the target path; Glue emits one or
        # more Parquet part files there rather than a single file.
        return self._glue_context.write_dynamic_frame.from_options(
            frame=dynamic_frame,
            connection_type="s3",
            connection_options={
                "path": f"s3://{target_bucket_name}/{target_table_path}"
            },
            format="parquet",
        )
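

def _example_start_job_run() -> None:
    """Usage sketch, not part of the original job.

    Shows how the parameters resolved in ``__init__`` could be supplied when
    starting the job from outside Glue via boto3. The job name and the
    bucket/path values are assumptions for illustration only.
    """
    import boto3

    glue = boto3.client("glue")
    glue.start_job_run(
        JobName="s3-json-to-parquet",  # hypothetical Glue job name
        Arguments={
            "--source-bucket-name": "my-source-bucket",
            "--source-file-path": "input/events.json",
            "--target-bucket-name": "my-target-bucket",
            "--target-table-path": "tables/events",
        },
    )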


if __name__ == "__main__":
    S3JSONToParquetJob().run()
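
# Deployment sketch (an assumption, not from the repository): upload the
# script to S3 and register it as a Spark ETL job, for example:
#
#   aws glue create-job \
#       --name s3-json-to-parquet \
#       --role <glue-service-role-arn> \
#       --command Name=glueetl,ScriptLocation=s3://<script-bucket>/s3_json_to_parquet_job.py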