-
Notifications
You must be signed in to change notification settings - Fork 0
/
pipeline.py
46 lines (35 loc) · 1.19 KB
/
pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from argparse import ArgumentParser
from pathlib import Path
from tempfile import TemporaryDirectory
from zipfile import ZipFile, BadZipfile
# TODO: extracted files are only listed; no further processing is implemented.
def load_archive(path):
    """List a zip archive's contents and extract it to a scratch directory.

    The archive's table of contents is printed, its members are extracted
    into a temporary directory, and every top-level extracted entry is
    printed. The temporary directory is removed before returning.

    Args:
        path: Location of the zip archive as a ``str`` or ``Path``, or an
            object carrying one in a ``path`` attribute (for example parsed
            command-line arguments). Any other value selects the default
            archive under ``data/archives``.

    Returns:
        None. When the file is not a valid zip archive a message is printed
        and nothing is extracted.

    Raises:
        OSError: If the archive file cannot be opened (e.g. it is missing).
    """
    default_archive = Path('data/archives/PA_SURE_2022-10-27.zip')

    # Accept either a bare path or an argparse-style object with ``.path``.
    source = getattr(path, 'path', path)
    if isinstance(source, (str, Path)):
        archive_path = Path(source)
    else:
        archive_path = default_archive

    try:
        archive = ZipFile(archive_path)
    except BadZipfile:
        print('Loading file failed.')
        # Nothing was opened, so there is nothing to inspect or extract.
        return

    # The context manager closes the archive even if extraction fails.
    with archive:
        # Inspect file and report out. printdir() prints the listing itself
        # and returns None, so its result must not be passed to print().
        archive.printdir()

        # Create temporary directory and process archive
        with TemporaryDirectory() as tmpdirname:
            print('created temporary directory', tmpdirname)
            archive.extractall(tmpdirname)
            for f in Path(tmpdirname).iterdir():
                print(f)
def load_directory(path):
    """Print every entry found directly inside a directory.

    Args:
        path: The directory as a ``str`` or ``Path``, or an object carrying
            one in a ``path`` attribute (for example parsed command-line
            arguments, which is how the command-line entry point calls
            this function).

    Raises:
        OSError: If the directory does not exist or is not a directory.
    """
    # Use the argument itself rather than a module-level ``args`` global,
    # so the function also works when called outside the CLI entry point.
    directory = Path(getattr(path, 'path', path))
    for f in directory.iterdir():
        print(f)
# Command-line interface: one subcommand per kind of input. Each subcommand
# stores its handler under ``func``; the entry point below calls that handler
# with the parsed arguments.
parser = ArgumentParser()
subparsers = parser.add_subparsers()

archive_parser = subparsers.add_parser('archive')
# Without a handler, ``archive`` would parse successfully and then fail on
# the missing ``func`` attribute.
archive_parser.set_defaults(func=load_archive)

directory_parser = subparsers.add_parser('directory')
directory_parser.add_argument('path')
directory_parser.set_defaults(func=load_directory)

if __name__ == '__main__':
    args = parser.parse_args()
    # Running with no subcommand leaves ``func`` unset; report a usage error
    # instead of crashing with AttributeError.
    if not hasattr(args, 'func'):
        parser.error('expected a subcommand: archive or directory')
    args.func(args)