Skip to content

Commit

Permalink
add unit test for oss task handler
Browse files Browse the repository at this point in the history
  • Loading branch information
hussein-awala committed Feb 26, 2023
1 parent c77ca64 commit d8086b5
Showing 1 changed file with 37 additions and 0 deletions.
37 changes: 37 additions & 0 deletions tests/providers/alibaba/cloud/log/test_oss_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@
# under the License.
from __future__ import annotations

import os
from unittest import mock
from unittest.mock import PropertyMock

import pytest

from airflow.providers.alibaba.cloud.log.oss_task_handler import OSSTaskHandler
from airflow.utils.state import TaskInstanceState
from airflow.utils.timezone import datetime
from tests.test_utils.db import clear_db_dags, clear_db_runs

OSS_TASK_HANDLER_STRING = "airflow.providers.alibaba.cloud.log.oss_task_handler.{}"
MOCK_OSS_CONN_ID = "mock_id"
Expand All @@ -37,6 +43,20 @@ def setup_method(self):
self.oss_log_folder = f"oss://{MOCK_BUCKET_NAME}/airflow/logs"
self.oss_task_handler = OSSTaskHandler(self.base_log_folder, self.oss_log_folder)

@pytest.fixture(autouse=True)
def task_instance(self, create_task_instance):
self.ti = ti = create_task_instance(
dag_id="dag_for_testing_oss_task_handler",
task_id="task_for_testing_oss_task_handler",
execution_date=datetime(2020, 1, 1),
state=TaskInstanceState.RUNNING,
)
ti.try_number = 1
ti.raw = False
yield
clear_db_runs()
clear_db_dags()

@mock.patch(OSS_TASK_HANDLER_STRING.format("conf.get"))
@mock.patch(OSS_TASK_HANDLER_STRING.format("OSSHook"))
def test_hook(self, mock_service, mock_conf_get):
Expand Down Expand Up @@ -130,3 +150,20 @@ def test_oss_write_into_remote_non_existing_file_not_via_append(self, mock_servi
mock_service.return_value.append_string.assert_called_once_with(
MOCK_BUCKET_NAME, MOCK_CONTENT, "airflow/logs/1.log", 0
)

@pytest.mark.parametrize(
"delete_local_copy, expected_existence_of_local_copy", [(True, False), (False, True)]
)
@mock.patch(OSS_TASK_HANDLER_STRING.format("OSSTaskHandler.hook"), new_callable=PropertyMock)
def test_close_with_delete_local_copy_conf(
self, mock_service, tmp_path_factory, delete_local_copy, expected_existence_of_local_copy
):
local_log_path = str(tmp_path_factory.mktemp("local-oss-log-location"))
handler = OSSTaskHandler(local_log_path, self.oss_log_folder, delete_local_copy=delete_local_copy)

handler.log.info("test")
handler.set_context(self.ti)
assert handler.upload_on_close

handler.close()
assert os.path.exists(handler.handler.baseFilename) == expected_existence_of_local_copy

0 comments on commit d8086b5

Please sign in to comment.