Skip to content

Commit

Permalink
[CHORE] added login in run method
Browse files Browse the repository at this point in the history
  • Loading branch information
gaurav7261 committed Apr 22, 2024
1 parent ab91c88 commit 1b08b98
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 20 deletions.
23 changes: 4 additions & 19 deletions airflow/providers/databricks/operators/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,32 +156,17 @@ def _handle_deferrable_databricks_operator_execution(operator, hook, log, contex
log.info("%s completed successfully.", operator.task_id)


def _handle_deferrable_databricks_operator_completion(event: dict, log: Logger, hook: DatabricksHook) -> None:
def _handle_deferrable_databricks_operator_completion(event: dict, log: Logger) -> None:
validate_trigger_event(event)
run_state = RunState.from_json(event["run_state"])
run_page_url = event["run_page_url"]
run_id = event["run_id"]
notebook_error = event["notebook_error"]
log.info("View run status, Spark UI, and logs at %s", run_page_url)

if run_state.is_successful:
log.info("Job run completed successfully.")
return

run_info = await hook.a_get_run(run_id)
task_run_id = None
if "tasks" in run_info:
for task in run_info["tasks"]:
if task.get("state", {}).get("result_state", "") == "FAILED":
task_run_id = task["run_id"]
if task_run_id is not None:
run_output = await hook.a_get_run_output(task_run_id)
if "error" in run_output:
notebook_error = run_output["error"]
else:
notebook_error = run_state.state_message
else:
notebook_error = run_state.state_message

error_message = f"Job run failed with terminal state: {run_state} and with the error {notebook_error}"

if event["repair_run"]:
Expand Down Expand Up @@ -590,7 +575,7 @@ def on_kill(self):
self.log.error("Error: Task: %s with invalid run_id was requested to be cancelled.", self.task_id)

def execute_complete(self, context: dict | None, event: dict):
_handle_deferrable_databricks_operator_completion(event, self.log, self._hook)
_handle_deferrable_databricks_operator_completion(event, self.log)


@deprecated(
Expand Down Expand Up @@ -867,7 +852,7 @@ def execute(self, context: Context):

def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
if event:
_handle_deferrable_databricks_operator_completion(event, self.log, self._hook)
_handle_deferrable_databricks_operator_completion(event, self.log)
if event["repair_run"]:
self.repair_run = False
self.run_id = event["run_id"]
Expand Down
17 changes: 17 additions & 0 deletions airflow/providers/databricks/triggers/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,30 @@ async def run(self):
async with self.hook:
while True:
run_state = await self.hook.a_get_run_state(self.run_id)
notebook_error = None
if run_state.is_terminal:
if run_state.result_state == "FAILED":
run_info = await self.hook.a_get_run(self.run_id)
task_run_id = None
if "tasks" in run_info:
for task in run_info["tasks"]:
if task.get("state", {}).get("result_state", "") == "FAILED":
task_run_id = task["run_id"]
if task_run_id is not None:
run_output = await self.hook.a_get_run_output(task_run_id)
if "error" in run_output:
notebook_error = run_output["error"]
else:
notebook_error = run_state.state_message
else:
notebook_error = run_state.state_message
yield TriggerEvent(
{
"run_id": self.run_id,
"run_page_url": self.run_page_url,
"run_state": run_state.to_json(),
"repair_run": self.repair_run,
"notebook_error": notebook_error
}
)
return
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/databricks/utils/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def validate_trigger_event(event: dict):
See: :class:`~airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger`.
"""
keys_to_check = ["run_id", "run_page_url", "run_state"]
keys_to_check = ["run_id", "run_page_url", "run_state", "notebook_error"]
for key in keys_to_check:
if key not in event:
raise AirflowException(f"Could not find `{key}` in the event: {event}")
Expand Down
1 change: 1 addition & 0 deletions tests/providers/databricks/utils/test_databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def test_validate_trigger_event_success(self):
"run_id": RUN_ID,
"run_page_url": RUN_PAGE_URL,
"run_state": RunState("TERMINATED", "SUCCESS", "").to_json(),
"notebook_error": None
}
assert validate_trigger_event(event) is None

Expand Down

0 comments on commit 1b08b98

Please sign in to comment.