-
Notifications
You must be signed in to change notification settings - Fork 14.4k
/
test_compression.py
87 lines (75 loc) · 3.16 KB
/
test_compression.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import bz2
import errno
import filecmp
import gzip
import shutil
import tempfile
import pytest
from airflow.utils import compression
class TestCompression:
    """Tests for ``airflow.utils.compression.uncompress_file``."""

    @pytest.fixture(autouse=True)
    def setup_attrs(self):
        """Create sample txt, gz and bz2 files in a temp dir; remove the dir afterwards.

        The three files hold the same three lines, so an uncompressed archive
        can be compared byte-for-byte against the plain text file.
        """
        # Maps a file extension (".txt", ".gz", ".bz2") to the sample file path.
        self.file_names = {}
        header = b"Sno\tSome,Text \n"
        line1 = b"1\tAirflow Test\n"
        line2 = b"2\tCompressionUtil\n"
        lines = [header, line1, line2]
        self.tmp_dir = tempfile.mkdtemp(prefix="test_utils_compression_")

        # delete=False keeps the text file on disk after the handle is closed,
        # so the tests can compare against it later.
        with tempfile.NamedTemporaryFile(mode="wb+", dir=self.tmp_dir, delete=False) as f_txt:
            self._set_fn(f_txt.name, ".txt")
            f_txt.writelines(lines)

        fn_gz = self._get_fn(".txt") + ".gz"
        with gzip.GzipFile(filename=fn_gz, mode="wb") as f_gz:
            self._set_fn(fn_gz, ".gz")
            f_gz.writelines(lines)

        fn_bz2 = self._get_fn(".txt") + ".bz2"
        with bz2.BZ2File(filename=fn_bz2, mode="wb") as f_bz2:
            self._set_fn(fn_bz2, ".bz2")
            f_bz2.writelines(lines)

        yield

        try:
            shutil.rmtree(self.tmp_dir)
        except OSError as e:
            # ENOENT - no such file or directory; an already-removed
            # directory is fine, anything else is a real failure.
            if e.errno != errno.ENOENT:
                raise

    def _set_fn(self, fn, ext):
        """Record ``fn`` as the sample file for extension ``ext``."""
        self.file_names[ext] = fn

    def _get_fn(self, ext):
        """Return the sample file recorded for extension ``ext``.

        Raises ``KeyError`` if no file was recorded for that extension.
        """
        return self.file_names[ext]

    def test_uncompress_file(self):
        """Check the unsupported-format error and the gz / bz2 round trips."""
        # Testing txt file type: unsupported extensions must be rejected.
        # Raw string with escaped dots so "." only matches a literal dot.
        with pytest.raises(NotImplementedError, match=r"^Received \.txt format\. Only gz and bz2.*"):
            compression.uncompress_file(
                input_file_name=None,
                file_extension=".txt",
                dest_dir=None,
            )

        fn_txt = self._get_fn(".txt")

        # Testing gz file type
        fn_gz = self._get_fn(".gz")
        txt_gz = compression.uncompress_file(fn_gz, ".gz", self.tmp_dir)
        # shallow=False forces a content comparison, not just an os.stat() one.
        assert filecmp.cmp(txt_gz, fn_txt, shallow=False), "Uncompressed file doesn't match original"

        # Testing bz2 file type
        fn_bz2 = self._get_fn(".bz2")
        txt_bz2 = compression.uncompress_file(fn_bz2, ".bz2", self.tmp_dir)
        assert filecmp.cmp(txt_bz2, fn_txt, shallow=False), "Uncompressed file doesn't match original"