diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index fb979d9..9a8fa29 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -28,12 +28,12 @@ jobs: python -m pip install --upgrade pip pip install -r requirements.txt pip install -e . - -# - name: Run flake8 -# run: | -# pip install flake8 -# # stop the build if there are flake8 errors -# flake8 . --count --show-source --statistics + + - name: Run flake8 + run: | + pip install flake8 + # stop the build if there are flake8 errors + flake8 . --count --show-source --statistics - name: Run unit tests run: | @@ -47,21 +47,23 @@ jobs: python -m coverage combine python -m coverage report -m --skip-covered python -m coverage xml - -# # Fetch base branch for comparison (e.g., main) -# - name: Fetch base branch -# run: git fetch origin main - -# # Compare coverage with the base branch -# - name: Compare coverage -# run: | -# git checkout main -# python -m coverage run -p -m pytest src/jp2_remediator/tests/unit -# python -m coverage xml -o coverage-base.xml -# git checkout - -# python -m diff-cover --compare-branch=main coverage.xml - -# # Fail if coverage decreases -# - name: Fail if coverage decreases -# run: python -m diff-cover --compare-branch=main coverage.xml --fail-under=100 + + # Fetch base branch for comparison (e.g., main) + - name: Fetch base branch + run: git fetch origin main + + # Compare coverage with the base branch + - name: Compare coverage + run: | + pip install diff-cover + git checkout main + python -m coverage run -p -m pytest src/jp2_remediator/tests/unit + python -m coverage xml -o coverage-base.xml + git checkout - + python diff-cover --compare-branch=main coverage.xml + + # Fail if coverage decreases + - name: Fail if coverage decreases + run: | + python diff-cover --compare-branch=main coverage.xml --fail-under=100 diff --git a/src/jp2_remediator/box_reader.py b/src/jp2_remediator/box_reader.py index 53b22c7..63f252e 100644 --- a/src/jp2_remediator/box_reader.py +++ b/src/jp2_remediator/box_reader.py @@ -1,11 +1,14 @@ -import sys +# import sys import os import argparse import boto3 import datetime -from jpylyzer import jpylyzer + +# from jpylyzer import jpylyzer from jpylyzer import boxvalidator -from jpylyzer import byteconv + +# from jpylyzer import byteconv + class BoxReader: def __init__(self, file_path): @@ -17,7 +20,7 @@ def __init__(self, file_path): def read_file(self, file_path): """Reads the file content from the given path.""" try: - with open(file_path, 'rb') as file: + with open(file_path, "rb") as file: return file.read() except IOError as e: print(f"Error reading file {file_path}: {e}") @@ -25,8 +28,14 @@ def read_file(self, file_path): def initialize_validator(self): """Initializes the jpylyzer BoxValidator for JP2 file validation.""" - options = {'validationFormat': 'jp2', 'verboseFlag': True, 'nullxmlFlag': False, 'packetmarkersFlag': False} - self.validator = boxvalidator.BoxValidator(options, 'JP2', self.file_contents) + options = { + "validationFormat": "jp2", + "verboseFlag": True, + "nullxmlFlag": False, + "packetmarkersFlag": False, + } + self.validator = boxvalidator.BoxValidator( + options, "JP2", self.file_contents) self.validator.validate() return self.validator @@ -35,14 +44,18 @@ def find_box_position(self, box_hex): return self.file_contents.find(box_hex) def check_boxes(self): - """Checks for presence of 'jp2h' and 'colr' boxes in the file contents.""" - jp2h_position = self.find_box_position(b'\x6a\x70\x32\x68') # search hex for 'jp2h' + """Checks 
for presence of 'jp2h' and 'colr' boxes in file contents.""" + jp2h_position = self.find_box_position( + b"\x6a\x70\x32\x68" + ) # search hex for 'jp2h' if jp2h_position != -1: print(f"'jp2h' found at byte position: {jp2h_position}") else: print("'jp2h' not found in the file.") - colr_position = self.find_box_position(b'\x63\x6f\x6c\x72') # search hex for 'colr' + colr_position = self.find_box_position( + b"\x63\x6f\x6c\x72" + ) # search hex for 'colr' if colr_position != -1: print(f"'colr' found at byte position: {colr_position}") else: @@ -56,18 +69,48 @@ def process_colr_box(self, colr_position): """Processes the 'colr' box to determine header offset position.""" if colr_position != -1: print(f"'colr' found at byte position: {colr_position}") - meth_byte_position = colr_position + 4 # ISO/IEC 15444-1:2019(E) Figure I.10 colr specification box, byte position of METH value after 'colr' + meth_byte_position = ( + colr_position + 4 + ) + """ ISO/IEC 15444-1:2019(E) Figure I.10 colr specification box + byte position of METH value after 'colr' """ meth_value = self.file_contents[meth_byte_position] - print(f"'meth' value: {meth_value} at byte position: {meth_byte_position}") - + print( + f"""'meth' value: {meth_value} at byte position: { + meth_byte_position + }""" + ) if meth_value == 1: - header_offset_position = meth_byte_position + 7 # ISO/IEC 15444-1:2019(E) Table I.11 colr specification box, if meth is 1 then color profile starts at byte position 7 after 'colr' - print(f"'meth' is 1, setting header_offset_position to: {header_offset_position}") + header_offset_position = ( + meth_byte_position + 7 + ) + """ ISO/IEC 15444-1:2019(E) Table I.11 colr specification box + if meth is 1 then color profile starts + at byte position 7 after 'colr' """ + print( + f"""'meth' is 1, setting header_offset_position to: { + header_offset_position + }""" + ) elif meth_value == 2: - header_offset_position = meth_byte_position + 3 # ISO/IEC 15444-1:2019(E) Table I.11 colr specification box, if meth is 2 then color profile (ICC profile) starts at byte position 3 after 'colr' - print(f"'meth' is 2, setting header_offset_position to: {header_offset_position} (start of ICC profile)") + header_offset_position = ( + meth_byte_position + 3 + ) + """ ISO/IEC 15444-1:2019(E) Table I.11 colr specification box + if meth is 2 then color profile (ICC profile) starts + at byte position 3 after 'colr' """ + + print( + f"""'meth' is 2, setting header_offset_position to: { + header_offset_position + } (start of ICC profile)""" + ) else: - print(f"'meth' value {meth_value} is not recognized (must be 1 or 2).") + print( + f"""'meth' value { + meth_value + } is not recognized (must be 1 or 2).""" + ) header_offset_position = None else: print("'colr' not found in the file.") @@ -75,7 +118,11 @@ def process_colr_box(self, colr_position): return header_offset_position - def process_trc_tag(self, trc_hex, trc_name, new_contents, header_offset_position): + def process_trc_tag(self, + trc_hex, + trc_name, + new_contents, + header_offset_position): """Processes the TRC tag and modifies contents if necessary.""" trc_position = new_contents.find(trc_hex) if trc_position == -1: @@ -83,53 +130,101 @@ def process_trc_tag(self, trc_hex, trc_name, new_contents, header_offset_positio return new_contents print(f"'{trc_name}' found at byte position: {trc_position}") - trc_tag_entry = new_contents[trc_position:trc_position + 12] # 12-byte tag entry length + trc_tag_entry = new_contents[trc_position: trc_position + 12] + # 12-byte tag entry 
length if len(trc_tag_entry) != 12: - print(f"Could not extract the full 12-byte '{trc_name}' tag entry.") + print( + f"Could not extract the full 12-byte '{trc_name}' tag entry." + ) return new_contents - trc_tag_signature = trc_tag_entry[0:4] # ICC.1:2022 Table 24 tag signature, e.g. 'rTRC' - trc_tag_offset = int.from_bytes(trc_tag_entry[4:8], byteorder='big') # ICC.1:2022 Table 24 tag offset - trc_tag_size = int.from_bytes(trc_tag_entry[8:12], byteorder='big') # ICC.1:2022 Table 24 tag size + trc_tag_signature = trc_tag_entry[ + 0:4 + ] # ICC.1:2022 Table 24 tag signature, e.g. 'rTRC' + trc_tag_offset = int.from_bytes( + trc_tag_entry[4:8], byteorder="big" + ) # ICC.1:2022 Table 24 tag offset + trc_tag_size = int.from_bytes( + trc_tag_entry[8:12], byteorder="big" + ) # ICC.1:2022 Table 24 tag size print(f"'{trc_name}' Tag Signature: {trc_tag_signature}") print(f"'{trc_name}' Tag Offset: {trc_tag_offset}") print(f"'{trc_name}' Tag Size: {trc_tag_size}") if header_offset_position is None: - print(f"Cannot calculate 'curv_{trc_name}_position' due to an unrecognized 'meth' value.") + print( + f"""Cannot calculate 'curv_{ + trc_name + }_position' due to an unrecognized 'meth' value.""" + ) return new_contents - curv_trc_position = trc_tag_offset + header_offset_position # start of curv profile data - curv_profile = new_contents[curv_trc_position:curv_trc_position + 12] # 12-byte curv profile data length + curv_trc_position = ( + trc_tag_offset + header_offset_position + ) # start of curv profile data + curv_profile = new_contents[ + curv_trc_position: curv_trc_position + 12 + ] # 12-byte curv profile data length if len(curv_profile) < 12: - print(f"Could not read the full 'curv' profile data for {trc_name}.") + print( + f"Could not read the full 'curv' profile data for {trc_name}." + ) return new_contents - curv_signature = curv_profile[0:4].decode('utf-8') # ICC.1:2022 Table 35 tag signature - curv_reserved = int.from_bytes(curv_profile[4:8], byteorder='big') # ICC.1:2022 Table 35 reserved 0's - curv_trc_gamma_n = int.from_bytes(curv_profile[8:12], byteorder='big') # # ICC.1:2022 Table 35 n value + curv_signature = curv_profile[0:4].decode( + "utf-8" + ) # ICC.1:2022 Table 35 tag signature + curv_reserved = int.from_bytes( + curv_profile[4:8], byteorder="big" + ) # ICC.1:2022 Table 35 reserved 0's + curv_trc_gamma_n = int.from_bytes( + curv_profile[8:12], byteorder="big" + ) # # ICC.1:2022 Table 35 n value print(f"'curv' Profile Signature for {trc_name}: {curv_signature}") print(f"'curv' Reserved Value: {curv_reserved}") print(f"'curv_{trc_name}_gamma_n' Value: {curv_trc_gamma_n}") - curv_trc_field_length = curv_trc_gamma_n * 2 + 12 # ICC.1:2022 Table 35 2n field length + curv_trc_field_length = ( + curv_trc_gamma_n * 2 + 12 + ) # ICC.1:2022 Table 35 2n field length print(f"'curv_{trc_name}_field_length': {curv_trc_field_length}") - # Check if curv_trc_gamma_n is not 1 and ask for confirmation to proceed, loops through all TRC tags + """Check if curv_trc_gamma_n is not 1 and ask + for confirmation to proceed, loops through all TRC tags""" if curv_trc_gamma_n != 1: - print(f"Warning: 'curv_{trc_name}_gamma_n' value is {curv_trc_gamma_n}, expected 1.") - proceed = input(f"Do you want to proceed with fixing the file {self.file_path}? 
(y/n): ").lower() - if proceed != 'y': + print( + f"""Warning: 'curv_{trc_name}_gamma_n' value is { + curv_trc_gamma_n + }, expected 1.""" + ) + proceed = input( + f"""Do you want to proceed with fixing the file { + self.file_path + } (y/n): """ + ).lower() + if proceed != "y": print(f"Skipping fixing for {self.file_path}") return new_contents if trc_tag_size != curv_trc_field_length: - print(f"'{trc_name}' Tag Size ({trc_tag_size}) does not match 'curv_{trc_name}_field_length' ({curv_trc_field_length}). Modifying the size...") - new_trc_size_bytes = curv_trc_field_length.to_bytes(4, byteorder='big') - new_contents[trc_position + 8: trc_position + 12] = new_trc_size_bytes + print( + f"""'{trc_name}' Tag Size ({ + trc_tag_size + }) does not match 'curv_{ + trc_name + }_field_length' ({ + curv_trc_field_length + }). Modifying size-""" + ) + new_trc_size_bytes = curv_trc_field_length.to_bytes( + 4, + byteorder="big") + new_contents[ + trc_position + 8: trc_position + 12 + ] = new_trc_size_bytes return new_contents @@ -137,22 +232,27 @@ def process_all_trc_tags(self, header_offset_position): """Function to process 'TRC' tags (rTRC, gTRC, bTRC).""" new_file_contents = bytearray(self.file_contents) trc_tags = { - b'\x72\x54\x52\x43': 'rTRC', # search hex for 'rTRC' - b'\x67\x54\x52\x43': 'gTRC', # search hex for 'gTRC' - b'\x62\x54\x52\x43': 'bTRC' # search hex for 'bTRC' + b"\x72\x54\x52\x43": "rTRC", # search hex for 'rTRC' + b"\x67\x54\x52\x43": "gTRC", # search hex for 'gTRC' + b"\x62\x54\x52\x43": "bTRC", # search hex for 'bTRC' } for trc_hex, trc_name in trc_tags.items(): - new_file_contents = self.process_trc_tag(trc_hex, trc_name, new_file_contents, header_offset_position) + new_file_contents = self.process_trc_tag( + trc_hex, trc_name, new_file_contents, header_offset_position + ) return new_file_contents def write_modified_file(self, new_file_contents): - """Writes the modified file contents to a new file if changes were made.""" + """Writes modified file contents to new file if changes were made.""" if new_file_contents != self.file_contents: - timestamp = datetime.datetime.now().strftime("%Y%m%d") # use "%Y%m%d_%H%M%S" for more precision - new_file_path = self.file_path.replace(".jp2", f"_modified_{timestamp}.jp2") - with open(new_file_path, 'wb') as new_file: + timestamp = datetime.datetime.now().strftime( + "%Y%m%d" + ) # use "%Y%m%d_%H%M%S" for more precision + new_file_path = self.file_path.replace( + ".jp2", f"_modified_{timestamp}.jp2") + with open(new_file_path, "wb") as new_file: new_file.write(new_file_contents) print(f"New JP2 file created with modifications: {new_file_path}") else: @@ -172,40 +272,59 @@ def read_jp2_file(self): self.write_modified_file(new_file_contents) + def process_directory(directory_path): """Process all JP2 files in a given directory.""" for root, _, files in os.walk(directory_path): for file in files: - if file.lower().endswith('.jp2'): + if file.lower().endswith(".jp2"): file_path = os.path.join(root, file) print(f"Processing file: {file_path}") reader = BoxReader(file_path) reader.read_jp2_file() -def process_s3_bucket(bucket_name, prefix=''): + +def process_s3_bucket(bucket_name, prefix=""): """Process all JP2 files in a given S3 bucket.""" - s3 = boto3.client('s3') + s3 = boto3.client("s3") response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix) - if 'Contents' in response: - for obj in response['Contents']: - if obj['Key'].lower().endswith('.jp2'): - file_path = obj['Key'] - print(f"Processing file: {file_path} from bucket 
{bucket_name}") + if "Contents" in response: + for obj in response["Contents"]: + if obj["Key"].lower().endswith(".jp2"): + file_path = obj["Key"] + print(f"""Processing file: {file_path} from bucket { + bucket_name + }""") download_path = f"/tmp/{os.path.basename(file_path)}" s3.download_file(bucket_name, file_path, download_path) reader = BoxReader(download_path) reader.read_jp2_file() # Optionally, upload modified file back to S3 - timestamp = datetime.datetime.now().strftime("%Y%m%d") # use "%Y%m%d_%H%M%S" for more precision - s3.upload_file(download_path.replace(".jp2", f"_modified_{timestamp}.jp2"), bucket_name, file_path.replace(".jp2", f"_modified_{timestamp}.jp2")) + timestamp = datetime.datetime.now().strftime( + "%Y%m%d" + ) # use "%Y%m%d_%H%M%S" for more precision + s3.upload_file( + download_path.replace( + ".jp2", f"_modified_{timestamp}.jp2" + ), + bucket_name, + file_path.replace(".jp2", f"_modified_{timestamp}.jp2"), + ) + if __name__ == "__main__": parser = argparse.ArgumentParser(description="JP2 file processor") parser.add_argument("--file", help="Path to a single JP2 file to process.") - parser.add_argument("--directory", help="Path to a directory of JP2 files to process.") - parser.add_argument("--bucket", help="Name of the AWS S3 bucket to process JP2 files from.") - parser.add_argument("--prefix", help="Prefix of files in the AWS S3 bucket (optional).") + parser.add_argument( + "--directory", help="Path to a directory of JP2 files to process." + ) + parser.add_argument( + "--bucket", help="Name of the AWS S3 bucket to process JP2 files from." + ) + parser.add_argument( + "--prefix", help="Prefix of files in the AWS S3 bucket (optional)." + ) args = parser.parse_args() @@ -217,4 +336,4 @@ def process_s3_bucket(bucket_name, prefix=''): elif args.bucket: process_s3_bucket(args.bucket, args.prefix) else: - print("Please specify either --file, --directory, or --bucket.") \ No newline at end of file + print("Please specify either --file, --directory, or --bucket.") diff --git a/src/jp2_remediator/main.py b/src/jp2_remediator/main.py index 9db6ec7..426e551 100644 --- a/src/jp2_remediator/main.py +++ b/src/jp2_remediator/main.py @@ -1,6 +1,7 @@ import sys import os + def main(): if len(sys.argv) != 3: print("Usage: python script.py ") @@ -20,6 +21,7 @@ def main(): print(f"Folder 1: {folder_path1}") print(f"Folder 2: {folder_path2}") + if __name__ == "__main__": main() @@ -27,5 +29,6 @@ def main(): def hello_world(): print("Hello, world!") + def add_one(number): - return number + 1 \ No newline at end of file + return number + 1 diff --git a/src/jp2_remediator/tests/integration/test_aws_connection.py b/src/jp2_remediator/tests/integration/test_aws_connection.py index 549e00c..8020db0 100644 --- a/src/jp2_remediator/tests/integration/test_aws_connection.py +++ b/src/jp2_remediator/tests/integration/test_aws_connection.py @@ -9,4 +9,4 @@ # Print bucket names print("Buckets in your account:") for bucket in response['Buckets']: - print(bucket['Name']) \ No newline at end of file + print(bucket['Name']) diff --git a/src/jp2_remediator/tests/unit/test_box_reader.py b/src/jp2_remediator/tests/unit/test_box_reader.py index 220beb7..94bc3a7 100644 --- a/src/jp2_remediator/tests/unit/test_box_reader.py +++ b/src/jp2_remediator/tests/unit/test_box_reader.py @@ -7,7 +7,8 @@ import datetime # Define the path to the test data file -TEST_DATA_PATH = os.path.join(paths.dir_unit_resources, 'sample.jp2') +TEST_DATA_PATH = os.path.join(paths.dir_unit_resources, "sample.jp2") + class 
TestJP2ProcessingWithFile(unittest.TestCase): @@ -41,8 +42,8 @@ def test_find_box_position_in_file(self): # Set the file contents for the reader instance self.reader.file_contents = file_contents - # Find a known box position in the sample file (you should know the expected values) - position = self.reader.find_box_position(b'\x6a\x70\x32\x68') + # Find a known box position in the sample file + position = self.reader.find_box_position(b"\x6a\x70\x32\x68") self.assertNotEqual(position, -1) # Ensure that the box is found def test_check_boxes_in_file(self): @@ -53,7 +54,7 @@ def test_check_boxes_in_file(self): # Set the file contents for the reader instance self.reader.file_contents = file_contents - + # Call check_boxes header_offset_position = self.reader.check_boxes() self.assertIsNotNone(header_offset_position) @@ -68,15 +69,19 @@ def test_process_colr_box_in_file(self): self.reader.file_contents = file_contents # Find the colr box position - colr_position = self.reader.find_box_position(b'\x63\x6f\x6c\x72') + colr_position = self.reader.find_box_position(b"\x63\x6f\x6c\x72") if colr_position == -1: self.fail("'colr' box not found in the test file.") - + # Process the colr box header_offset_position = self.reader.process_colr_box(colr_position) self.assertIsNotNone(header_offset_position) - @patch('builtins.open', new_callable=mock_open, read_data=b'sample content') + @patch( + "builtins.open", + new_callable=mock_open, + read_data=b"sample content" + ) def test_write_modified_file_with_changes(self, mock_file): # Read file content (from mock) file_contents = self.reader.read_file(TEST_DATA_PATH) @@ -84,18 +89,23 @@ def test_write_modified_file_with_changes(self, mock_file): self.fail("Test file could not be read.") # Modify the file contents slightly using bytes - new_file_contents = file_contents + b' modified' # Append in bytes, not string + new_file_contents = file_contents + b" modified" + # Append in bytes, not string # Test writing the modified file self.reader.write_modified_file(new_file_contents) - timestamp = datetime.datetime.now().strftime("%Y%m%d") # use "%Y%m%d_%H%M%S" for more precision - expected_filename = TEST_DATA_PATH.replace('.jp2', f'_modified_{timestamp}.jp2') - + timestamp = datetime.datetime.now().strftime( + "%Y%m%d" + ) # use "%Y%m%d_%H%M%S" for more precision + expected_filename = TEST_DATA_PATH.replace( + ".jp2", f"_modified_{timestamp}.jp2") + # Ensure the file was written to the correct file path - mock_file.assert_any_call(expected_filename, 'wb') - + mock_file.assert_any_call(expected_filename, "wb") + # Ensure the contents were written correctly - mock_file().write.assert_called_once_with(b'sample content modified') + mock_file().write.assert_called_once_with(b"sample content modified") + -if __name__ == '__main__': - unittest.main() \ No newline at end of file +if __name__ == "__main__": + unittest.main()
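
The core of this change is the byte arithmetic in process_trc_tag: read the 12-byte TRC tag-table entry (ICC.1:2022 Table 24), follow its offset to the 'curv' element (Table 35), and rewrite the 4-byte tag size when it disagrees with 2n + 12. The new unit tests do not exercise that path directly, so below is a minimal standalone sketch, not part of the PR and not importing the module, that mirrors the same arithmetic on a hand-built buffer. The name patch_trc_size and the zero header offset are illustrative only; in a real JP2 the offset is derived from the 'colr' box METH value (plus 7 for meth 1, plus 3 for meth 2), as the diff documents.

# Standalone sketch: mirrors the byte-level logic of process_trc_tag on a
# synthetic buffer. All 4-byte integers are big-endian, per ICC.1:2022.

def patch_trc_size(buf: bytearray, trc_signature: bytes, header_offset: int) -> bytearray:
    """Rewrite the 12-byte tag entry so its size field matches 2n + 12."""
    pos = buf.find(trc_signature)
    if pos == -1:
        return buf
    tag_offset = int.from_bytes(buf[pos + 4:pos + 8], "big")   # Table 24 tag offset
    tag_size = int.from_bytes(buf[pos + 8:pos + 12], "big")    # Table 24 tag size
    curv = buf[tag_offset + header_offset:tag_offset + header_offset + 12]
    gamma_n = int.from_bytes(curv[8:12], "big")                # Table 35 n value
    expected = gamma_n * 2 + 12                                # Table 35 field length
    if tag_size != expected:
        buf[pos + 8:pos + 12] = expected.to_bytes(4, "big")
    return buf


if __name__ == "__main__":
    # A 12-byte 'rTRC' entry with a deliberately wrong size (20) pointing at a
    # 'curv' element with n = 1, so the corrected size should be 2*1 + 12 = 14.
    buf = bytearray(
        b"rTRC" + (16).to_bytes(4, "big") + (20).to_bytes(4, "big")  # tag entry
        + b"\x00" * 4                                                # padding
        + b"curv" + b"\x00" * 4 + (1).to_bytes(4, "big")             # curve type
    )
    patch_trc_size(buf, b"rTRC", header_offset=0)
    print("patched size:", int.from_bytes(buf[8:12], "big"))
    assert int.from_bytes(buf[8:12], "big") == 14

A buffer built this way could serve as a fixture for a future unit test of process_trc_tag itself, avoiding both a full ICC profile and the interactive gamma prompt (which only fires when n is not 1).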