Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filename template arg in providers file task handlers backward compitability support #41633

Merged
merged 1 commit into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):

trigger_should_wrap = True

def __init__(self, base_log_folder: str, log_group_arn: str):
def __init__(self, base_log_folder: str, log_group_arn: str, **kwargs):
super().__init__(base_log_folder)
split_arn = log_group_arn.split(":")

Expand Down
1 change: 1 addition & 0 deletions airflow/providers/elasticsearch/log/es_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ def __init__(
es_kwargs: dict | None | Literal["default_es_kwargs"] = "default_es_kwargs",
*,
log_id_template: str | None = None,
**kwargs,
):
es_kwargs = es_kwargs or {}
if es_kwargs == "default_es_kwargs":
Expand Down
4 changes: 4 additions & 0 deletions tests/providers/alibaba/cloud/log/test_oss_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,7 @@ def test_close_with_delete_local_copy_conf(

handler.close()
assert os.path.exists(handler.handler.baseFilename) == expected_existence_of_local_copy

def test_filename_template_for_backward_compatibility(self):
# filename_template arg support for running the latest provider on airflow 2
OSSTaskHandler(self.base_log_folder, self.oss_log_folder, filename_template=None)
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,14 @@ def test_close_prevents_duplicate_calls(self):

mock_log_handler_close.assert_called_once()

def test_filename_template_for_backward_compatibility(self):
# filename_template arg support for running the latest provider on airflow 2
CloudwatchTaskHandler(
self.local_log_location,
f"arn:aws:logs:{self.region_name}:11111111:log-group:{self.remote_log_group}",
filename_template=None,
)


def generate_log_events(conn, log_group_name, log_stream_name, log_events):
conn.create_log_group(logGroupName=log_group_name)
Expand Down
4 changes: 4 additions & 0 deletions tests/providers/amazon/aws/log/test_s3_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,3 +221,7 @@ def test_close_with_delete_local_logs_conf(self, delete_local_copy, expected_exi

handler.close()
assert os.path.exists(handler.handler.baseFilename) == expected_existence_of_local_copy

def test_filename_template_for_backward_compatibility(self):
# filename_template arg support for running the latest provider on airflow 2
S3TaskHandler(self.local_log_location, self.remote_log_base, filename_template=None)
11 changes: 11 additions & 0 deletions tests/providers/elasticsearch/log/test_es_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,17 @@ def test_get_index_patterns_with_callable(self):
mock_callable.assert_called_once_with({})
assert result == "callable_index_pattern"

def test_filename_template_for_backward_compatibility(self):
# filename_template arg support for running the latest provider on airflow 2
ElasticsearchTaskHandler(
base_log_folder="local/log/location",
end_of_log_mark="end_of_log\n",
write_stdout=False,
json_format=False,
json_fields="asctime,filename,lineno,levelname,message,exc_text",
filename_template=None,
)


def test_safe_attrgetter():
class A: ...
Expand Down
9 changes: 9 additions & 0 deletions tests/providers/google/cloud/log/test_gcs_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,3 +283,12 @@ def test_close_with_delete_local_copy_conf(

handler.close()
assert os.path.exists(handler.handler.baseFilename) == expected_existence_of_local_copy

@pytest.fixture(autouse=True)
def test_filename_template_for_backward_compatibility(self, local_log_location):
# filename_template arg support for running the latest provider on airflow 2
GCSTaskHandler(
base_log_folder=local_log_location,
gcs_log_folder="gs://bucket/remote/log/location",
filename_template=None,
)
10 changes: 10 additions & 0 deletions tests/providers/microsoft/azure/log/test_wasb_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,13 @@ def test_close_with_delete_local_logs_conf(

handler.close()
assert os.path.exists(handler.handler.baseFilename) == expected_existence_of_local_copy

def test_filename_template_for_backward_compatibility(self):
# filename_template arg support for running the latest provider on airflow 2
WasbTaskHandler(
base_log_folder=self.local_log_location,
wasb_log_folder=self.wasb_log_folder,
wasb_container=self.container_name,
delete_local_copy=True,
filename_template=None,
)