Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix awslogs_stream_prefix pattern #43138

Merged
merged 1 commit into from
Nov 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions providers/src/airflow/providers/amazon/aws/operators/ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ class EcsRunTaskOperator(EcsBaseOperator):
If None, this is the same as the `region` parameter. If that is also None,
this is the default AWS region based on your connection settings.
:param awslogs_stream_prefix: the stream prefix that is used for the CloudWatch logs.
This is usually based on some custom name combined with the name of the container.
This should match the prefix specified in the log configuration of the task definition.
Only required if you want logs to be shown in the Airflow UI after your job has
finished.
:param awslogs_fetch_interval: the interval that the ECS task log fetcher should wait
Expand Down Expand Up @@ -481,6 +481,7 @@ def __init__(
self.awslogs_region = self.region_name

self.arn: str | None = None
self.container_name: str | None = None
self._started_by: str | None = None

self.retry_args = quota_retry
Expand Down Expand Up @@ -624,6 +625,7 @@ def _start_task(self):
self.log.info("ECS Task started: %s", response)

self.arn = response["tasks"][0]["taskArn"]
self.container_name = response["tasks"][0]["containers"][0]["name"]
self.log.info("ECS task ID is: %s", self._get_ecs_task_id(self.arn))

def _try_reattach_task(self, started_by: str):
Expand Down Expand Up @@ -659,7 +661,7 @@ def _aws_logs_enabled(self):
return self.awslogs_group and self.awslogs_stream_prefix

def _get_logs_stream_name(self) -> str:
return f"{self.awslogs_stream_prefix}/{self._get_ecs_task_id(self.arn)}"
return f"{self.awslogs_stream_prefix}/{self.container_name}/{self._get_ecs_task_id(self.arn)}"

def _get_task_log_fetcher(self) -> AwsTaskLogFetcher:
if not self.awslogs_group:
Expand Down