
S3 arguments #8

Open · wants to merge 11 commits into main
12 changes: 12 additions & 0 deletions bin/docker-run.sh
@@ -5,3 +5,15 @@

 ## Executable Docker image
 docker run --rm --mount type=bind,source=${PWD},target=/data -it artifactory.huit.harvard.edu/lts/jp2_remediator $@

+## Passing AWS credentials: if you want to use the S3 bucket option, create a .env file in the same directory with your AWS credentials
+# set -a
+# source $(dirname "$0")/.env
+# set +a
+
+# docker run --rm \
+# --mount type=bind,source=${PWD},target=/data \
+# -e AWS_ACCESS_KEY_ID \
+# -e AWS_SECRET_ACCESS_KEY \
+# -e AWS_SESSION_TOKEN \
+# -it artifactory.huit.harvard.edu/lts/jp2_remediator "$@"
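An example .env file for this setup might look like the following (placeholder values for illustration only; never commit real credentials):

    AWS_ACCESS_KEY_ID=your-access-key-id
    AWS_SECRET_ACCESS_KEY=your-secret-access-key
    AWS_SESSION_TOKEN=your-session-token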
50 changes: 41 additions & 9 deletions src/jp2_remediator/main.py
@@ -37,18 +37,50 @@ def main():
     )

     # Subparser for processing all JP2 files in an S3 bucket
-    bucket_parser = subparsers.add_parser(
-        "bucket", help="Process all JP2 files in an S3 bucket"
+    s3_bucket_parser = subparsers.add_parser(
+        "s3", help="Process JP2 files in an S3 bucket"
     )
-    bucket_parser.add_argument(
-        "bucket", help="Name of the AWS S3 bucket to process JP2 files from"
+    s3_bucket_parser.add_argument(
+        "input_bucket", help="Name of the AWS S3 bucket to process JP2 files from"
     )
-    bucket_parser.add_argument(
-        "--prefix", help="Prefix of files in the AWS S3 bucket (optional)",
-        default=""
+    s3_bucket_parser.add_argument(
+        "--input-prefix", help="Prefix of files in the AWS S3 bucket (optional)", default=""
     )
-    bucket_parser.set_defaults(
-        func=lambda args: processor.process_s3_bucket(args.bucket, args.prefix)
+    s3_bucket_parser.add_argument(
+        "--output-bucket", help="Name of the AWS S3 bucket to upload modified files (optional)"
+    )
+    s3_bucket_parser.add_argument(
+        "--output-prefix", help="Prefix for uploaded files in the output bucket (optional)", default=""
+    )
+    s3_bucket_parser.set_defaults(
+        func=lambda args: processor.process_s3_bucket(
+            args.input_bucket, args.input_prefix, args.output_bucket, args.output_prefix
+        )
     )

+    # Subparser for processing a single JP2 file in S3
+    s3_file_parser = subparsers.add_parser(
+        "s3-file", help="Process a single JP2 file in S3"
+    )
+    s3_file_parser.add_argument(
+        "input_bucket", help="Name of the AWS S3 bucket containing the JP2 file"
Review comment (Collaborator): Change from:

    "input_bucket", help="Name of the AWS S3 bucket containing the JP2 file"

to:

    "--input_bucket", help="Name of the AWS S3 bucket containing the JP2 file", required=True

Reply (Author): Should this still be required after adding the -- flag? And it will be --input-bucket instead of --input_bucket.
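For reference, a minimal standalone sketch of the flag form under discussion (a hypothetical demo parser, not this PR's code). argparse treats "--" options as optional by default, so required=True is what would keep the bucket mandatory, and the hyphenated flag is read back as args.input_bucket:

    import argparse

    parser = argparse.ArgumentParser(prog="jp2-remediator-demo")
    parser.add_argument(
        "--input-bucket", required=True,
        help="Name of the AWS S3 bucket containing the JP2 file"
    )

    # argparse converts the hyphen to an underscore for attribute access.
    args = parser.parse_args(["--input-bucket", "my-bucket"])
    print(args.input_bucket)  # -> my-bucket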

+    )
+    s3_file_parser.add_argument(
+        "--input-key", help="Key (path) of the JP2 file in the S3 bucket", required=True
+    )
+    s3_file_parser.add_argument(
Review comment (Collaborator): Make required

"--output-bucket", help="Name of the AWS S3 bucket to upload the modified file (optional)"
)
s3_file_parser.add_argument(
Review comment (Collaborator): We can remove this argument.

Reply (Author, @kimpham54, Dec 20, 2024): Do you want to leave it as an option or remove it completely? If it is optional but not used, it defaults to the input bucket.

Reply (Author): Keep this one, after discussion with Andrew.
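A one-line sketch of the fallback the author describes, in case it helps the discussion (illustrative only; where the PR implements this, if at all, is not shown in this diff):

    output_bucket = output_bucket or input_bucket  # default to the input bucket when not supplied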

"--output-prefix", help="Prefix for the uploaded file in the output bucket (optional)", default=""
)
s3_file_parser.add_argument(
Review comment (Collaborator): Make required

Reply (Author): Right now it is optional. Is there a situation where you wouldn't want to use the --output-prefix option, such as if you wanted to put the file in the same input directory, or the same bucket?

"--output-key", help="Full key (path) for the uploaded file (overrides output-prefix)"
)
s3_file_parser.set_defaults(
func=lambda args: processor.process_s3_file(
args.input_bucket, args.input_key, args.output_bucket, args.output_prefix, args.output_key
)
)

args = parser.parse_args()
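Taken together, the new subcommands would be invoked roughly as follows (bucket names, prefixes, and the module-style entry point are illustrative assumptions, not taken from this PR):

    # Process every .jp2 under a prefix, uploading results to a second bucket
    python -m jp2_remediator.main s3 my-input-bucket \
        --input-prefix images/ \
        --output-bucket my-output-bucket \
        --output-prefix processed/

    # Process a single object, overriding the generated output key
    python -m jp2_remediator.main s3-file my-input-bucket \
        --input-key images/page1.jp2 \
        --output-bucket my-output-bucket \
        --output-key processed/page1_fixed.jp2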
60 changes: 40 additions & 20 deletions src/jp2_remediator/processor.py
@@ -26,30 +26,50 @@ def process_directory(self, directory_path):
                 reader = self.box_reader_factory.get_reader(file_path)
                 reader.read_jp2_file()

-    def process_s3_bucket(self, bucket_name, prefix=""):
+    def process_s3_file(self, input_bucket, input_key, output_bucket, output_prefix=None, output_key=None):
+        """Process a specific JP2 file from S3 and upload to a specified S3 location."""
+        s3 = boto3.client("s3")
+
+        # Generate the output key dynamically if not explicitly provided
+        if not output_key:
+            timestamp = datetime.datetime.now().strftime("%Y%m%d")
+            output_key = os.path.join(
+                output_prefix,
+                os.path.basename(input_key).replace(".jp2", f"_modified_file_{timestamp}.jp2")
+            )
+
+        # Download the file from S3
+        download_path = f"/tmp/{os.path.basename(input_key)}"
+        print(f"Downloading file: {input_key} from bucket: {input_bucket}")
Review comment (Collaborator): Please replace all print statements with self.logger.info(...)

+        s3.download_file(input_bucket, input_key, download_path)
+
+        # Process the file
+        reader = self.box_reader_factory.get_reader(download_path)
+        reader.read_jp2_file()
+
+        # Generate the modified file path
+        timestamp = datetime.datetime.now().strftime("%Y%m%d")
+        modified_file_path = download_path.replace(".jp2", f"_modified_{timestamp}.jp2")
+
+        if os.path.exists(modified_file_path):
+            print(f"Uploading modified file to bucket: {output_bucket}, key: {output_key}")
+            s3.upload_file(modified_file_path, output_bucket, output_key)
+        else:
+            print(f"File {modified_file_path} does not exist, skipping upload.")

+    def process_s3_bucket(self, input_bucket, input_prefix, output_bucket, output_prefix):
         """Process all JP2 files in a given S3 bucket."""
         s3 = boto3.client("s3")
-        response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
+        response = s3.list_objects_v2(Bucket=input_bucket, Prefix=input_prefix)

         if "Contents" in response:
             for obj in response["Contents"]:
                 if obj["Key"].lower().endswith(".jp2"):
-                    file_path = obj["Key"]
-                    print(f"""Processing file: {file_path} from bucket {
-                        bucket_name
-                    }""")
-                    download_path = f"/tmp/{os.path.basename(file_path)}"
-                    s3.download_file(bucket_name, file_path, download_path)
-                    reader = self.box_reader_factory.get_reader(download_path)
-                    reader.read_jp2_file()
-                    # Optionally, upload modified file back to S3
-                    timestamp = datetime.datetime.now().strftime(
-                        "%Y%m%d"
-                    )  # use "%Y%m%d_%H%M%S" for more precision
-                    s3.upload_file(
-                        download_path.replace(
-                            ".jp2", f"_modified_{timestamp}.jp2"
-                        ),
-                        bucket_name,
-                        file_path.replace(".jp2", f"_modified_{timestamp}.jp2")
-                    )
+                    input_key = obj["Key"]
+                    timestamp = datetime.datetime.now().strftime("%Y%m%d")
+                    output_key = os.path.join(
+                        output_prefix,
+                        os.path.basename(input_key).replace(".jp2", f"_modified_{timestamp}.jp2")
+                    )
+                    print(f"Processing file: {input_key} from bucket: {input_bucket}")
+                    self.process_s3_file(input_bucket, input_key, output_bucket, output_key=output_key)
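One caveat worth noting in review: list_objects_v2 returns at most 1,000 keys per response, so a larger bucket would be only partially processed. A sketch of a paginated listing using boto3's built-in paginator (an illustrative helper, not part of this PR):

    import boto3

    def iter_jp2_keys(bucket, prefix):
        """Yield every .jp2 key under the prefix, following all result pages."""
        s3 = boto3.client("s3")
        paginator = s3.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get("Contents", []):
                if obj["Key"].lower().endswith(".jp2"):
                    yield obj["Key"]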
174 changes: 94 additions & 80 deletions src/jp2_remediator/tests/unit/test_processor.py
@@ -1,6 +1,5 @@
 import unittest
 import pytest
-from unittest.mock import call, patch, MagicMock
+from unittest.mock import patch, MagicMock
 from jp2_remediator.processor import Processor


@@ -14,112 +13,127 @@ def mock_box_reader_factory(self):
     def processor(self, mock_box_reader_factory):
         return Processor(mock_box_reader_factory)

     # Test for process_file function
     @patch("builtins.print")
     def test_process_file(self, mock_print, processor, mock_box_reader_factory):
         # Define the file path
         file_path = "test_file.jp2"

         # Call process_file with the test file path
         processor.process_file(file_path)

         # Check that the file was processed
         mock_print.assert_called_once_with(f"Processing file: {file_path}")

         # Ensure the BoxReader instance had its read_jp2_file method called
         mock_box_reader_factory.get_reader.assert_called_once_with(file_path)
         mock_box_reader_factory.get_reader.return_value.read_jp2_file.assert_called_once()

     # Test for process_directory function
     @patch("os.walk", return_value=[("root", [], ["file1.jp2", "file2.jp2"])])
     @patch("builtins.print")
-    def test_process_directory_with_multiple_files(
-        self, mock_print, mock_os_walk, processor, mock_box_reader_factory
-    ):
-        # Call process_directory with a dummy path
+    def test_process_directory_with_multiple_files(self, mock_print, mock_os_walk, processor, mock_box_reader_factory):
         processor.process_directory("dummy_path")

         # Check that each JP2 file in the directory was processed
         mock_print.assert_any_call("Processing file: root/file1.jp2")
         mock_print.assert_any_call("Processing file: root/file2.jp2")

         # Ensure each BoxReader instance had its read_jp2_file method called
         assert mock_box_reader_factory.get_reader.call_count == 2

-        # Ensure each BoxReader instance was created with the correct file path
-        assert mock_box_reader_factory.get_reader.call_args_list == [
-            call("root/file1.jp2"),
-            call("root/file2.jp2"),
-        ]
+        assert mock_box_reader_factory.get_reader.return_value.read_jp2_file.call_count == 2

-    # Test for process_s3_bucket function
-    @patch("jp2_remediator.processor.boto3.client")
-    @patch("builtins.print")
-    def test_process_s3_bucket(self, mock_print, mock_boto3_client, processor, mock_box_reader_factory):
-        # Set up the mock S3 client
+    @patch("jp2_remediator.processor.os.path.exists", return_value=True)
+    @patch("jp2_remediator.processor.boto3.client", autospec=True)
+    @patch("builtins.print", autospec=True)
+    def test_process_s3_file_without_output_key_or_prefix(
+        self, mock_print, mock_boto3_client, mock_os_path_exists, processor, mock_box_reader_factory
+    ):
+        # No explicit output_key or prefix given, output_prefix is empty string
         mock_s3_client = MagicMock()
         mock_boto3_client.return_value = mock_s3_client
+        input_bucket = "test-bucket"
+        input_key = "some_folder/my_image.jp2"
+        mock_s3_client.download_file.return_value = None
+        mock_s3_client.upload_file.return_value = None

-        # Define the bucket name and prefix
-        bucket_name = "test-bucket"
-        prefix = "test-prefix"
+        processor.process_s3_file(
+            input_bucket=input_bucket,
+            input_key=input_key,
+            output_bucket=input_bucket,
+            output_prefix=""  # Ensures dynamic output_key creation without None issues
+        )

-        # Prepare a fake response for list_objects_v2
+        mock_s3_client.download_file.assert_called_once()
+        mock_box_reader_factory.get_reader.assert_called_once()
+        mock_box_reader_factory.get_reader.return_value.read_jp2_file.assert_called_once()
+        assert any("Uploading modified file to bucket:" in c.args[0] for c in mock_print.call_args_list)

@patch("jp2_remediator.processor.os.path.exists", return_value=True)
@patch("jp2_remediator.processor.boto3.client", autospec=True)
@patch("builtins.print", autospec=True)
def test_process_s3_bucket_with_jp2_files(
self, mock_print, mock_boto3_client, mock_os_path_exists, processor, mock_box_reader_factory
):
mock_s3_client = MagicMock()
mock_boto3_client.return_value = mock_s3_client
input_bucket = "test-input-bucket"
input_prefix = "some-prefix/"
output_bucket = "test-output-bucket"
output_prefix = "processed/"
mock_s3_client.list_objects_v2.return_value = {
"Contents": [
{"Key": "file1.jp2"},
{"Key": "file2.jp2"},
{"Key": "file3.txt"}, # Non-JP2 file to test filtering
{"Key": "some-prefix/image1.jp2"},
{"Key": "some-prefix/image2.jp2"}
]
}

# Mock download_file and upload_file methods
mock_s3_client.download_file.return_value = None
mock_s3_client.upload_file.return_value = None

# Call the method under test
processor.process_s3_bucket(bucket_name, prefix)

# Verify that list_objects_v2 was called with the correct parameters
mock_s3_client.list_objects_v2.assert_called_once_with(Bucket=bucket_name, Prefix=prefix)
processor.process_s3_bucket(input_bucket, input_prefix, output_bucket, output_prefix)

# Verify that download_file was called for each .jp2 file
expected_download_calls = [
unittest.mock.call(bucket_name, "file1.jp2", "/tmp/file1.jp2"),
unittest.mock.call(bucket_name, "file2.jp2", "/tmp/file2.jp2"),
]
assert mock_s3_client.download_file.call_args_list == expected_download_calls
mock_s3_client.list_objects_v2.assert_called_once()
mock_print.assert_any_call(f"Processing file: some-prefix/image1.jp2 from bucket: {input_bucket}")
mock_print.assert_any_call(f"Processing file: some-prefix/image2.jp2 from bucket: {input_bucket}")
assert mock_box_reader_factory.get_reader.call_count == 2
assert mock_box_reader_factory.get_reader.return_value.read_jp2_file.call_count == 2

-        # Verify that BoxReader was instantiated with the correct download paths
-        expected_boxreader_calls = [
-            unittest.mock.call("/tmp/file1.jp2"),
-            unittest.mock.call("/tmp/file2.jp2"),
-        ]
-        assert mock_box_reader_factory.get_reader.call_args_list == expected_boxreader_calls
+    @patch("jp2_remediator.processor.boto3.client", autospec=True)
+    @patch("builtins.print", autospec=True)
+    def test_process_s3_bucket_empty_response(self, mock_print, mock_boto3_client, processor):
+        mock_s3_client = MagicMock()
+        mock_boto3_client.return_value = mock_s3_client
+        mock_s3_client.list_objects_v2.return_value = {}
+        processor.process_s3_bucket("test-bucket", "test-prefix/", "output-bucket", "output-prefix/")
+        mock_print.assert_not_called()
+        mock_s3_client.upload_file.assert_not_called()

+    @patch("jp2_remediator.processor.os.path.exists", return_value=True)
+    @patch("jp2_remediator.processor.boto3.client", autospec=True)
+    @patch("builtins.print", autospec=True)
+    def test_process_s3_bucket_skip_non_jp2_files(
+        self, mock_print, mock_boto3_client, mock_os_path_exists, processor
+    ):
+        mock_s3_client = MagicMock()
+        mock_boto3_client.return_value = mock_s3_client
+        mock_s3_client.list_objects_v2.return_value = {
+            "Contents": [
+                {"Key": "test-prefix/file1.jp2"},
+                {"Key": "test-prefix/file2.txt"},
+                {"Key": "test-prefix/file3.jpg"},
+            ]
+        }
+        processor.process_s3_bucket("test-bucket", "test-prefix/", "output-bucket", "output-prefix/")
+        mock_print.assert_any_call("Processing file: test-prefix/file1.jp2 from bucket: test-bucket")
+        mock_s3_client.upload_file.assert_called_once()

+    @patch("jp2_remediator.processor.os.path.exists", side_effect=[True, False])
+    @patch("jp2_remediator.processor.boto3.client", autospec=True)
+    @patch("builtins.print", autospec=True)
+    def test_process_s3_file_upload_logic(
+        self, mock_print, mock_boto3_client, mock_os_path_exists, processor, mock_box_reader_factory
+    ):
+        mock_s3_client = MagicMock()
+        mock_boto3_client.return_value = mock_s3_client
+        input_bucket = "test-bucket"
+        input_key = "test-folder/file.jp2"
+        output_bucket = "output-bucket"
+        output_key = "output-folder/file_modified.jp2"
+        mock_s3_client.download_file.return_value = None
+        mock_s3_client.upload_file.return_value = None

-        # Verify that read_jp2_file was called for each .jp2 file
-        assert mock_box_reader_factory.get_reader.return_value.read_jp2_file.call_count == 2
+        # First call (file exists)
+        processor.process_s3_file(input_bucket, input_key, output_bucket, output_key=output_key)
+        # Second call (file does not exist after modification)
+        processor.process_s3_file(input_bucket, input_key, output_bucket, output_key=output_key)

-        # Verify that upload_file was called for each .jp2 file
-        upload_calls = mock_s3_client.upload_file.call_args_list
-        assert len(upload_calls) == 2
-        for c in upload_calls:
-            args, _ = c
-            local_file_path = args[0]
-            upload_bucket = args[1]
-            upload_key = args[2]
-            # Check that the local file path includes '_modified_' and ends with '.jp2'
-            assert "_modified_" in local_file_path, "'_modified_' should be in local_file_path"
-            assert local_file_path.endswith(".jp2")
-            # Check that the upload is to the correct bucket and key
-            assert upload_bucket == bucket_name
-            assert "_modified_" in upload_key
-            assert upload_key.endswith(".jp2")
-
-        # Verify that print was called correctly
-        expected_print_calls = [
-            unittest.mock.call(f"Processing file: file1.jp2 from bucket {bucket_name}"),
-            unittest.mock.call(f"Processing file: file2.jp2 from bucket {bucket_name}"),
-        ]
-        mock_print.assert_has_calls(expected_print_calls, any_order=True)
+        mock_print.assert_any_call(f"Uploading modified file to bucket: {output_bucket}, key: {output_key}")
+        all_prints = [c.args[0] for c in mock_print.call_args_list]
+        assert any("does not exist, skipping upload." in p for p in all_prints)