Merge branch 'main' into vincbeck/execute_callbacks
vincbeck committed Aug 31, 2023
2 parents adc10fd + 097e3e6 commit 7921dc5
Showing 244 changed files with 2,040 additions and 1,463 deletions.
6 changes: 5 additions & 1 deletion .pre-commit-config.yaml
@@ -412,7 +412,11 @@ repos:
language: python
entry: ./scripts/ci/pre_commit/pre_commit_version_heads_map.py
pass_filenames: false
files: ^airflow/migrations/versions|^airflow/__init__.py$
files: >
(?x)
^scripts/ci/pre_commit/pre_commit_version_heads_map\.py$|
^airflow/migrations/versions/.*$|
^airflow/utils/db.py$
additional_dependencies: ['packaging','google-re2']
- id: update-version
name: Update version to the latest version in the documentation
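The reworked `files:` key above switches to a verbose-mode regular expression (the `(?x)` flag), so the alternatives can be split across lines. A minimal sketch of how pre-commit would match the new pattern against candidate paths, assuming the usual Python `re.search` semantics (the file names below are made up for illustration):

```python
import re

# Verbose-mode pattern from the hook's `files:` key above;
# with (?x), whitespace and newlines inside the pattern are ignored.
FILES_PATTERN = re.compile(
    r"""(?x)
    ^scripts/ci/pre_commit/pre_commit_version_heads_map\.py$|
    ^airflow/migrations/versions/.*$|
    ^airflow/utils/db.py$
    """
)

for path in (
    "scripts/ci/pre_commit/pre_commit_version_heads_map.py",
    "airflow/migrations/versions/0001_example_migration.py",  # hypothetical file name
    "airflow/utils/db.py",
    "airflow/__init__.py",  # matched by the old pattern, no longer triggers the hook
):
    print(path, bool(FILES_PATTERN.search(path)))
```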
14 changes: 7 additions & 7 deletions BREEZE.rst
@@ -490,7 +490,7 @@ For example, this following command:

.. code-block:: bash
breeze static-checks -t mypy-core
breeze static-checks --type mypy-core
will run mypy check for currently staged files inside ``airflow/`` excluding providers.

@@ -507,7 +507,7 @@ re-run latest pre-commits on your changes, but it can take a long time (few minu

.. code-block:: bash
breeze static-checks -t mypy-core --all-files
breeze static-checks --type mypy-core --all-files
The above will run mypy check for all files.

@@ -516,7 +516,7 @@ specifying (can be multiple times) ``--file`` flag.

.. code-block:: bash
breeze static-checks -t mypy-core --file airflow/utils/code_utils.py --file airflow/utils/timeout.py
breeze static-checks --type mypy-core --file airflow/utils/code_utils.py --file airflow/utils/timeout.py
The above will run mypy check for those two files (note: autocomplete should work for the file selection).

@@ -528,26 +528,26 @@ of commits you choose.

.. code-block:: bash
breeze static-checks -t mypy-core --last-commit
breeze static-checks --type mypy-core --last-commit
The above will run mypy check for all files in the last commit in your branch.

.. code-block:: bash
breeze static-checks -t mypy-core --only-my-changes
breeze static-checks --type mypy-core --only-my-changes
The above will run mypy check for all commits in your branch which were added since you branched off from main.

.. code-block:: bash
breeze static-checks -t mypy-core --commit-ref 639483d998ecac64d0fef7c5aa4634414065f690
breeze static-checks --type mypy-core --commit-ref 639483d998ecac64d0fef7c5aa4634414065f690
The above will run mypy check for all files in the 639483d998ecac64d0fef7c5aa4634414065f690 commit.
Any ``commit-ish`` reference from Git will work here (branch, tag, short/long hash etc.)

.. code-block:: bash
breeze static-checks -t identity --verbose --from-ref HEAD^^^^ --to-ref HEAD
breeze static-checks --type identity --verbose --from-ref HEAD^^^^ --to-ref HEAD
The above will run the check for the last 4 commits in your branch. You can use any ``commit-ish`` references
in ``--from-ref`` and ``--to-ref`` flags.
6 changes: 3 additions & 3 deletions CONTRIBUTING.rst
@@ -677,9 +677,9 @@ druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github, github_enterp
google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes,
ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql,
mysql, neo4j, odbc, openfaas, openlineage, opsgenie, oracle, otel, pagerduty, pandas, papermill,
password, pinot, plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba,
segment, sendgrid, sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd,
tableau, tabular, telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk
password, pinot, plexus, postgres, presto, rabbitmq, redis, s3, salesforce, samba, segment,
sendgrid, sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd, tableau,
tabular, telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk
.. END EXTRAS HERE
Provider packages
6 changes: 3 additions & 3 deletions INSTALL
@@ -104,9 +104,9 @@ druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github, github_enterp
google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes,
ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql,
mysql, neo4j, odbc, openfaas, openlineage, opsgenie, oracle, otel, pagerduty, pandas, papermill,
password, pinot, plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba,
segment, sendgrid, sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd,
tableau, tabular, telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk
password, pinot, plexus, postgres, presto, rabbitmq, redis, s3, salesforce, samba, segment,
sendgrid, sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd, tableau,
tabular, telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk
# END EXTRAS HERE

# For installing Airflow in development environments - see CONTRIBUTING.rst
1 change: 1 addition & 0 deletions INTHEWILD.md
@@ -168,6 +168,7 @@ Currently, **officially** using Airflow:
1. [Dentsu Inc.](http://www.dentsu.com/) [[@bryan831](https://github.com/bryan831) & [@loozhengyuan](https://github.com/loozhengyuan)]
1. [Deseret Digital Media](http://deseretdigital.com/) [[@formigone](https://github.com/formigone)]
1. [Digital First Media](http://www.digitalfirstmedia.com/) [[@duffn](https://github.com/duffn) & [@mschmo](https://github.com/mschmo) & [@seanmuth](https://github.com/seanmuth)]
1. [Disney](https://www.disney.com/) [[@coolbeans201](https://github.com/coolbeans201)]
1. [Docsity](https://www.docsity.com/)
1. [DoorDash](https://www.doordash.com/)
1. [Dotmodus](http://dotmodus.com) [[@dannylee12](https://github.com/dannylee12)]
6 changes: 3 additions & 3 deletions README.md
@@ -99,7 +99,7 @@ Apache Airflow is tested with:
\* Experimental

**Note**: MySQL 5.x versions are unable to or have limitations with
running multiple schedulers -- please see the [Scheduler docs](https://airflow.apache.org/docs/apache-airflow/stable/scheduler.html).
running multiple schedulers -- please see the [Scheduler docs](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/scheduler.html).
MariaDB is not tested/recommended.

**Note**: SQLite is used in Airflow tests. Do not use it in production. We recommend
@@ -117,9 +117,9 @@ is used in the [Community managed DockerHub image](https://hub.docker.com/p/apac
## Getting started

Visit the official Airflow website documentation (latest **stable** release) for help with
[installing Airflow](https://airflow.apache.org/docs/apache-airflow/stable/installation.html),
[installing Airflow](https://airflow.apache.org/docs/apache-airflow/stable/installation/),
[getting started](https://airflow.apache.org/docs/apache-airflow/stable/start.html), or walking
through a more complete [tutorial](https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html).
through a more complete [tutorial](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/).

> Note: If you're looking for documentation for the main branch (latest development branch): you can find it on [s.apache.org/airflow-docs](https://s.apache.org/airflow-docs/).
2 changes: 1 addition & 1 deletion STATIC_CODE_CHECKS.rst
@@ -484,7 +484,7 @@ Run all checks for all changes in my branch since branched from main:

.. code-block:: bash
breeze static-checks -t mypy-core --only-my-changes
breeze static-checks --type mypy-core --only-my-changes
More examples can be found in `Breeze documentation <BREEZE.rst#running-static-checks>`_

4 changes: 3 additions & 1 deletion airflow/api_connexion/endpoints/variable_endpoint.py
@@ -63,7 +63,7 @@ def get_variable(*, variable_key: str, session: Session = NEW_SESSION) -> Respon
"""Get a variable by key."""
var = session.scalar(select(Variable).where(Variable.key == variable_key).limit(1))
if not var:
raise NotFound("Variable not found")
raise NotFound("Variable not found", detail="Variable does not exist")
return variable_schema.dump(var)


@@ -116,6 +116,8 @@ def patch_variable(
raise BadRequest("Invalid post body", detail="key from request body doesn't match uri parameter")
non_update_fields = ["key"]
variable = session.scalar(select(Variable).filter_by(key=variable_key).limit(1))
if not variable:
raise NotFound("Variable not found", detail="Variable does not exist")
if update_mask:
data = extract_update_mask_data(update_mask, non_update_fields, data)
for key, val in data.items():
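With the added `if not variable` guard, patching a variable that does not exist now returns a clean 404 instead of failing deeper in the handler. A rough client-side sketch against the stable REST API (the base URL and basic-auth credentials are assumptions for illustration only):

```python
import requests

# Assumed local webserver and basic-auth credentials, purely for illustration.
BASE_URL = "http://localhost:8080/api/v1"
AUTH = ("admin", "admin")

resp = requests.patch(
    f"{BASE_URL}/variables/nonexistent_key",
    json={"key": "nonexistent_key", "value": "anything"},
    auth=AUTH,
)
print(resp.status_code)  # expected: 404 ("Variable not found") after this change
```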
15 changes: 5 additions & 10 deletions airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -104,20 +104,15 @@ def internal_airflow_api(body: dict[str, Any]) -> APIResponse:
params_json = json.loads(str(body.get("params")))
params = BaseSerialization.deserialize(params_json, use_pydantic_models=True)
except Exception as err:
log.error("Error deserializing parameters.")
log.error(err)
log.error("Error (%s) when deserializing parameters: %s", err, params_json)
return Response(response="Error deserializing parameters.", status=400)

log.debug("Calling method %.", {method_name})
log.debug("Calling method %s.", method_name)
try:
output = handler(**params)
output_json = BaseSerialization.serialize(output, use_pydantic_models=True)
log.debug("Returning response")
return Response(
response=json.dumps(output_json or "{}", default=BaseSerialization.serialize),
headers={"Content-Type": "application/json"},
)
response = json.dumps(output_json) if output_json is not None else None
return Response(response=response, headers={"Content-Type": "application/json"})
except Exception as e:
log.error("Error when calling method %s.", method_name)
log.error(e)
log.error("Error (%s) when calling method %s.", e, method_name)
return Response(response=f"Error executing method: {method_name}.", status=500)
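The response handling is the substantive part of this hunk: the old `json.dumps(output_json or "{}", ...)` collapsed every falsy result (including legitimate ones such as `0` or `[]`) into the JSON string `"{}"`, while the new code only special-cases `None` and returns an empty body for it. A standalone sketch of the difference, with the `BaseSerialization` details elided:

```python
import json

def old_body(output_json):
    # Previous behaviour: falsy outputs (None, 0, [], "") all collapse to '"{}"'.
    return json.dumps(output_json or "{}")

def new_body(output_json):
    # New behaviour: only None is special-cased, and it yields an empty body.
    return json.dumps(output_json) if output_json is not None else None

for value in (None, 0, [], {"rows": 3}):
    print(value, "->", old_body(value), "|", new_body(value))
```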
4 changes: 3 additions & 1 deletion airflow/api_internal/internal_api_call.py
@@ -108,7 +108,7 @@ def make_jsonrpc_request(method_name: str, params_json: str) -> bytes:
return response.content

@wraps(func)
def wrapper(*args, **kwargs) -> RT:
def wrapper(*args, **kwargs):
use_internal_api = InternalApiConfig.get_use_internal_api()
if not use_internal_api:
return func(*args, **kwargs)
@@ -128,6 +128,8 @@ def wrapper(*args, **kwargs) -> RT:
)
method_name = f"{func.__module__}.{func.__qualname__}"
result = make_jsonrpc_request(method_name, args_json)
if result is None or result == b"":
return None
return BaseSerialization.deserialize(json.loads(result), use_pydantic_models=True)

return wrapper
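This guard is the client-side counterpart of the RPC endpoint change above: when the internal API returns an empty body (which the server now sends for a `None` result), the wrapper short-circuits to `None` instead of handing an empty string to `json.loads`, which would raise a `JSONDecodeError`. A reduced sketch, with the actual JSON-RPC call and Airflow's serialization replaced by stubs:

```python
from __future__ import annotations

import json

def deserialize_rpc_result(result: bytes | None):
    """Mimics the wrapper's tail: an empty body means the remote call returned None."""
    if result is None or result == b"":
        return None
    # In Airflow this goes through BaseSerialization.deserialize(..., use_pydantic_models=True).
    return json.loads(result)

print(deserialize_rpc_result(b""))                       # None, no JSONDecodeError
print(deserialize_rpc_result(b'{"state": "success"}'))   # {'state': 'success'}
```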
3 changes: 1 addition & 2 deletions airflow/jobs/triggerer_job_runner.py
@@ -687,8 +687,7 @@ def update_triggers(self, requested_trigger_ids: set[int]):
self.set_trigger_logging_metadata(new_trigger_orm.task_instance, new_id, new_trigger_instance)
self.to_create.append((new_id, new_trigger_instance))
# Enqueue orphaned triggers for cancellation
for old_id in cancel_trigger_ids:
self.to_cancel.append(old_id)
self.to_cancel.extend(cancel_trigger_ids)

def set_trigger_logging_metadata(self, ti: TaskInstance, trigger_id, trigger):
"""
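The loop-to-`extend` change is purely stylistic; appending the elements of an iterable one by one is equivalent to a single `list.extend` call. A tiny illustration with made-up trigger ids:

```python
cancel_trigger_ids = {3, 7, 11}  # hypothetical orphaned trigger ids

# Before: one append per id.
to_cancel = []
for old_id in cancel_trigger_ids:
    to_cancel.append(old_id)

# After: a single extend call with the same result.
to_cancel_new = []
to_cancel_new.extend(cancel_trigger_ids)

assert sorted(to_cancel) == sorted(to_cancel_new)
```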
2 changes: 1 addition & 1 deletion airflow/models/dag.py
@@ -3333,7 +3333,7 @@ def validate_schedule_and_params(self):
if not self.timetable.can_be_scheduled:
return

for k, v in self.params.items():
for v in self.params.values():
# As type can be an array, we would check if `null` is an allowed type or not
if not v.has_value and ("type" not in v.schema or "null" not in v.schema["type"]):
raise AirflowException(
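Dropping the unused key from the iteration is the whole change; the validation rule itself is unchanged: a schedulable DAG must not declare a param without a value unless the param's JSON schema explicitly allows `null`. A rough sketch of that rule outside Airflow, using a stand-in for `Param`:

```python
# Stand-in for airflow.models.param.Param: just enough surface for the check.
class FakeParam:
    def __init__(self, has_value: bool, schema: dict):
        self.has_value = has_value
        self.schema = schema

params = {
    "with_default": FakeParam(True, {"type": "integer"}),
    "nullable": FakeParam(False, {"type": ["null", "string"]}),
    "missing": FakeParam(False, {"type": "string"}),  # would fail validation
}

# Mirrors the loop in DAG.validate_schedule_and_params: the keys are not needed.
for v in params.values():
    if not v.has_value and ("type" not in v.schema or "null" not in v.schema["type"]):
        print("DAG is scheduled, but a param has no default and does not allow None")
```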
2 changes: 1 addition & 1 deletion airflow/providers/SUSPENDING_AND_RESUMING_PROVIDERS.rst
@@ -20,7 +20,7 @@ Suspending providers

As of April 2023, we have the possibility to suspend individual providers, so that they are not holding
back dependencies for Airflow and other providers. The process of suspending providers is described
in `description of the process <https://github.com/apache/airflow/blob/main/README.md#suspending-releases-for-providers>`_
in `description of the process <https://github.com/apache/airflow/blob/main/PROVIDERS.rst#suspending-releases-for-providers>`_

Technically, suspending a provider is done by setting ``suspended : true``, in the provider.yaml of the
provider. This should be followed by committing the change and either automatically or manually running
1 change: 1 addition & 0 deletions airflow/providers/amazon/CHANGELOG.rst
@@ -25,6 +25,7 @@

Changelog
---------
* ``A bug introduced in provider-amazon version 8.0.0 caused all 'EcsRunTaskOperator' tasks to detach from the ECS task after 10 minutes and fail - even if the ECS task was still running. In this version we are fixing it by restoring the default 'waiter_max_attempts' value to 'sys.maxsize'``

8.6.0
.....
1 change: 0 additions & 1 deletion airflow/providers/amazon/aws/hooks/sns.py
@@ -68,7 +68,6 @@ def publish_to_target(
:param target_arn: either a TopicArn or an EndpointArn
:param message: the default message you want to send
:param message: str
:param subject: subject of message
:param message_attributes: additional attributes to publish for message filtering. This should be
a flat dict; the DataType to be sent depends on the type of the value:
94 changes: 94 additions & 0 deletions airflow/providers/amazon/aws/notifications/sns.py
@@ -0,0 +1,94 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from functools import cached_property
from typing import Sequence

from airflow.exceptions import AirflowOptionalProviderFeatureException
from airflow.providers.amazon.aws.hooks.sns import SnsHook

try:
from airflow.notifications.basenotifier import BaseNotifier
except ImportError:
raise AirflowOptionalProviderFeatureException(
"Failed to import BaseNotifier. This feature is only available in Airflow versions >= 2.6.0"
)


class SnsNotifier(BaseNotifier):
"""
Amazon SNS (Simple Notification Service) Notifier.
.. seealso::
For more information on how to use this notifier, take a look at the guide:
:ref:`howto/notifier:SnsNotifier`
:param aws_conn_id: The :ref:`Amazon Web Services Connection id <howto/connection:aws>`
used for AWS credentials. If this is None or empty then the default boto3 behaviour is used.
:param target_arn: Either a TopicArn or an EndpointArn.
:param message: The message you want to send.
:param subject: The message subject you want to send.
:param message_attributes: The message attributes you want to send as a flat dict (data type will be
determined automatically).
:param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
"""

template_fields: Sequence[str] = (
"target_arn",
"message",
"subject",
"message_attributes",
"aws_conn_id",
"region_name",
)

def __init__(
self,
*,
aws_conn_id: str | None = SnsHook.default_conn_name,
target_arn: str,
message: str,
subject: str | None = None,
message_attributes: dict | None = None,
region_name: str | None = None,
):
super().__init__()
self.aws_conn_id = aws_conn_id
self.region_name = region_name
self.target_arn = target_arn
self.message = message
self.subject = subject
self.message_attributes = message_attributes

@cached_property
def hook(self) -> SnsHook:
"""Amazon SNS Hook (cached)."""
return SnsHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)

def notify(self, context):
"""Publish the notification message to Amazon SNS."""
self.hook.publish_to_target(
target_arn=self.target_arn,
message=self.message,
subject=self.subject,
message_attributes=self.message_attributes,
)


send_sns_notification = SnsNotifier
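The new notifier follows the usual Airflow notifier pattern: instantiate it with (optionally templated) arguments and attach it to a DAG- or task-level callback. A hedged usage sketch, where the DAG id, connection id, topic ARN, and message are placeholders:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.notifications.sns import send_sns_notification

with DAG(
    dag_id="sns_notifier_example",  # hypothetical DAG
    start_date=datetime(2023, 1, 1),
    schedule=None,
    on_failure_callback=send_sns_notification(
        aws_conn_id="aws_default",
        target_arn="arn:aws:sns:us-east-1:123456789012:my-topic",  # placeholder ARN
        subject="Airflow alert",
        message="DAG {{ dag.dag_id }} failed",  # template fields are rendered at notify time
    ),
) as dag:
    BashOperator(task_id="hello", bash_command="echo hello")
```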
6 changes: 3 additions & 3 deletions airflow/providers/amazon/aws/operators/ecs.py
@@ -18,7 +18,6 @@
from __future__ import annotations

import re
import sys
import warnings
from datetime import timedelta
from functools import cached_property
@@ -476,7 +475,9 @@ def __init__(
number_logs_exception: int = 10,
wait_for_completion: bool = True,
waiter_delay: int = 6,
waiter_max_attempts: int = 100,
waiter_max_attempts: int = 1000000 * 365 * 24 * 60 * 10,
# Set the default waiter duration to 1M years (attempts*delay)
# Airflow execution_timeout handles task timeout
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
):
@@ -665,7 +666,6 @@ def _wait_for_task_ended(self) -> None:
return

waiter = self.client.get_waiter("tasks_stopped")
waiter.config.max_attempts = sys.maxsize # timeout is managed by airflow
waiter.wait(
cluster=self.cluster,
tasks=[self.arn],
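The new default replaces the removed `sys.maxsize` override in `_wait_for_task_ended`: rather than patching the waiter config after the fact, the operator now ships a `waiter_max_attempts` so large that, at the default 6-second `waiter_delay`, the waiter effectively never gives up on its own and Airflow's `execution_timeout` remains the real limit. The arithmetic behind the in-code comment:

```python
# Default waiter settings from the hunk above.
waiter_delay = 6  # seconds between polls
waiter_max_attempts = 1000000 * 365 * 24 * 60 * 10  # attempts

total_seconds = waiter_delay * waiter_max_attempts
total_years = total_seconds / (365 * 24 * 60 * 60)
print(f"{total_years:,.0f} years")  # roughly 1,000,000 years of polling
```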
9 changes: 7 additions & 2 deletions airflow/providers/amazon/aws/sensors/batch.py
@@ -23,7 +23,7 @@
from deprecated import deprecated

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHook
from airflow.providers.amazon.aws.triggers.batch import BatchJobTrigger
from airflow.sensors.base import BaseSensorOperator
@@ -115,7 +115,12 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
Relies on trigger to throw an exception, otherwise it assumes execution was successful.
"""
if event["status"] != "success":
raise AirflowException(f"Error while running job: {event}")
message = f"Error while running job: {event}"
# TODO: remove this if-else block when min_airflow_version is set to higher than the version that
# changed in https://github.com/apache/airflow/pull/33424 is released
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)
job_id = event["job_id"]
self.log.info("Batch Job %s complete", job_id)

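The new branch makes the deferrable path honour `soft_fail`: when the trigger reports a failure, a sensor configured with `soft_fail=True` ends up skipped instead of failed. A hedged construction sketch (the job id is a placeholder):

```python
from airflow.providers.amazon.aws.sensors.batch import BatchSensor

wait_for_batch_job = BatchSensor(
    task_id="wait_for_batch_job",
    job_id="00000000-0000-0000-0000-000000000000",  # placeholder AWS Batch job id
    deferrable=True,  # failures come back through execute_complete via the trigger
    soft_fail=True,   # with this change the task is marked skipped rather than failed
)
```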
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/sensors/dynamodb.py
@@ -84,7 +84,7 @@ def poke(self, context: Context) -> bool:
key = {self.partition_key_name: self.partition_key_value}
msg = (
f"Checking table {self.table_name} for "
+ f"item Partition Key: {self.partition_key_name}={self.partition_key_value}"
f"item Partition Key: {self.partition_key_name}={self.partition_key_value}"
)

if self.sort_key_name and self.sort_key_value:
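The only change here is dropping a redundant `+`: adjacent string literals inside parentheses are concatenated by the Python parser, so the explicit operator adds nothing. For example, with made-up values:

```python
table_name = "example_table"
partition_key_name, partition_key_value = "pk", "42"

# Two adjacent f-string literals are joined at parse time; no "+" needed.
msg = (
    f"Checking table {table_name} for "
    f"item Partition Key: {partition_key_name}={partition_key_value}"
)
assert msg == "Checking table example_table for item Partition Key: pk=42"
```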