Merge branch 'main' of github.com:apache/airflow into 23497-tasks-stuck
* 'main' of github.com:apache/airflow:
  Revert "Fix k8s pod.execute randomly stuck indefinitely by logs consumption (apache#23497) (apache#23618)" (apache#23656)
  Rename cluster_policy to task_policy (apache#23468)
  [FEATURE] google provider - BigQueryInsertJobOperator log query (apache#23648)
  Fix k8s pod.execute randomly stuck indefinitely by logs consumption (apache#23497) (apache#23618)
  Fixed test and remove pytest.mark.xfail for test_exc_tb (apache#23650)
  Added kubernetes version (1.24) in README.md(for Main version(dev)), … (apache#23649)
  Add `RedshiftDeleteClusterOperator` support (apache#23563)
  Added postgres 14 to support versions(including breeze) (apache#23506)
  Don't run pre-migration checks for downgrade (apache#23634)
  Add index for event column in log table (apache#23625)
  Simplify flash message for _airflow_moved tables (apache#23635)
  Fix assuming "Feature" answer on CI when generating docs (apache#23640)
  Fix typo issue (apache#23633)
  [FEATURE] add K8S 1.24 support (apache#23637)
  [FEATURE] update K8S-KIND to 0.13.0 (apache#23636)
  Prevent KubernetesJobWatcher getting stuck on resource too old (apache#23521)
  Make provider doc preparation a bit more fun :) (apache#23629)
schattian committed May 11, 2022
2 parents 67474cc + 2eeb120 commit 79d38f3
Showing 29 changed files with 598 additions and 343 deletions.
18 changes: 9 additions & 9 deletions README.md
@@ -85,15 +85,15 @@ Airflow is not a streaming solution, but it is often used to process real-time d

Apache Airflow is tested with:

| | Main version (dev) | Stable version (2.3.0) |
|---------------------|-------------------------|------------------------|
| Python | 3.7, 3.8, 3.9, 3.10 | 3.7, 3.8, 3.9, 3.10 |
| Platform | AMD64/ARM64(\*) | AMD64/ARM64(\*) |
| Kubernetes | 1.20, 1.21, 1.22, 1.23 | 1.20, 1.21, 1.22, 1.23 |
| PostgreSQL | 10, 11, 12, 13 | 10, 11, 12, 13 |
| MySQL | 5.7, 8 | 5.7, 8 |
| SQLite | 3.15.0+ | 3.15.0+ |
| MSSQL | 2017(\*), 2019 (\*) | 2017(\*), 2019 (\*) |
| | Main version (dev) | Stable version (2.3.0) |
|---------------------|------------------------------|------------------------|
| Python | 3.7, 3.8, 3.9, 3.10 | 3.7, 3.8, 3.9, 3.10 |
| Platform | AMD64/ARM64(\*) | AMD64/ARM64(\*) |
| Kubernetes | 1.20, 1.21, 1.22, 1.23, 1.24 | 1.20, 1.21, 1.22, 1.23 |
| PostgreSQL | 10, 11, 12, 13, 14 | 10, 11, 12, 13, 14 |
| MySQL | 5.7, 8 | 5.7, 8 |
| SQLite | 3.15.0+ | 3.15.0+ |
| MSSQL | 2017(\*), 2019 (\*) | 2017(\*), 2019 (\*) |

\* Experimental

3 changes: 3 additions & 0 deletions airflow/executors/kubernetes_executor.py
@@ -109,6 +109,8 @@ def run(self) -> None:
time.sleep(1)
except Exception:
self.log.exception('Unknown error in KubernetesJobWatcher. Failing')
self.resource_version = "0"
ResourceVersion().resource_version = "0"
raise
else:
self.log.warning(
@@ -288,6 +290,7 @@ def _health_check_kube_watcher(self):
self.log.error(
'Error while health checking kube watcher process. Process died for unknown reasons'
)
ResourceVersion().resource_version = "0"
self.kube_watcher = self._make_kube_watcher()

def run_next(self, next_job: KubernetesJobType) -> None:
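
Both additions reset the shared watch checkpoint to "0" when the watcher process dies, which forces the replacement watcher to relist from the API server's current state rather than resume from a resourceVersion that may have already expired (the HTTP 410 "resource too old" condition). A minimal sketch of the singleton-reset pattern involved, using illustrative names rather than the executor's actual classes:

class SharedResourceVersion:
    """Hypothetical stand-in for the executor's ResourceVersion singleton."""

    _instance = None
    resource_version = "0"

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance


def run_watch(watch_fn):
    """Run one watch pass; on failure, invalidate the shared checkpoint."""
    try:
        watch_fn(resource_version=SharedResourceVersion().resource_version)
    except Exception:
        # "0" asks the Kubernetes API for current state, so the restarted
        # watcher performs a full relist instead of resuming from an
        # expired resourceVersion.
        SharedResourceVersion().resource_version = "0"
        raise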
@@ -0,0 +1,44 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Add index for ``event`` column in ``log`` table.
Revision ID: 1de7bc13c950
Revises: b1b348e02d07
Create Date: 2022-05-10 18:18:43.484829
"""

from alembic import op

# revision identifiers, used by Alembic.
revision = '1de7bc13c950'
down_revision = 'b1b348e02d07'
branch_labels = None
depends_on = None
airflow_version = '2.3.1'


def upgrade():
"""Apply Add index for ``event`` column in ``log`` table."""
op.create_index('idx_log_event', 'log', ['event'], unique=False)


def downgrade():
"""Unapply Add index for ``event`` column in ``log`` table."""
op.drop_index('idx_log_event', table_name='log')
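
This migration adds the index on existing databases; the matching ``__table_args__`` change to the Log model in the next file keeps freshly created databases consistent. The index targets lookups that filter the log table by event name. A short sketch of the kind of query it speeds up, assuming a SQLAlchemy session and Airflow's Log model:

from airflow.models.log import Log


def count_log_events(session, event_name: str) -> int:
    # With idx_log_event in place, this filter can be served by an
    # index scan instead of a full scan of a potentially huge table.
    return session.query(Log).filter(Log.event == event_name).count()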
5 changes: 4 additions & 1 deletion airflow/models/log.py
@@ -38,7 +38,10 @@ class Log(Base):
owner = Column(String(500))
extra = Column(Text)

__table_args__ = (Index('idx_log_dag', dag_id),)
__table_args__ = (
Index('idx_log_dag', dag_id),
Index('idx_log_event', event),
)

def __init__(self, event, task_instance=None, owner=None, extra=None, **kwargs):
self.dttm = timezone.utcnow()
@@ -23,6 +23,7 @@
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.operators.redshift_cluster import (
RedshiftCreateClusterOperator,
RedshiftDeleteClusterOperator,
RedshiftPauseClusterOperator,
RedshiftResumeClusterOperator,
)
@@ -80,10 +81,18 @@
)
# [END howto_operator_redshift_resume_cluster]

# [START howto_operator_redshift_delete_cluster]
task_delete_cluster = RedshiftDeleteClusterOperator(
task_id="delete_cluster",
cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
)
# [END howto_operator_redshift_delete_cluster]

chain(
task_create_cluster,
task_wait_cluster_available,
task_pause_cluster,
task_wait_cluster_paused,
task_resume_cluster,
task_delete_cluster,
)
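
chain() wires single tasks into a strictly linear sequence, so the call above is equivalent to the bitshift form:

# Equivalent dependency declaration using Airflow's >> operator:
(
    task_create_cluster
    >> task_wait_cluster_available
    >> task_pause_cluster
    >> task_wait_cluster_paused
    >> task_resume_cluster
    >> task_delete_cluster
)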
64 changes: 64 additions & 0 deletions airflow/providers/amazon/aws/operators/redshift_cluster.py
@@ -14,6 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import time
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence

from airflow.models import BaseOperator
@@ -317,3 +318,66 @@ def execute(self, context: 'Context'):
self.log.warning(
"Unable to pause cluster since cluster is currently in status: %s", cluster_state
)


class RedshiftDeleteClusterOperator(BaseOperator):
"""
Delete an AWS Redshift cluster.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:RedshiftDeleteClusterOperator`
:param cluster_identifier: unique identifier of a cluster
:param skip_final_cluster_snapshot: determines cluster snapshot creation
:param final_cluster_snapshot_identifier: name of final cluster snapshot
:param wait_for_completion: Whether wait for cluster deletion or not
The default value is ``True``
:param aws_conn_id: aws connection to use
:param poll_interval: Time (in seconds) to wait between two consecutive calls to check cluster state
"""

template_fields: Sequence[str] = ("cluster_identifier",)
ui_color = "#eeaa11"
ui_fgcolor = "#ffffff"

def __init__(
self,
*,
cluster_identifier: str,
skip_final_cluster_snapshot: bool = True,
final_cluster_snapshot_identifier: Optional[str] = None,
wait_for_completion: bool = True,
aws_conn_id: str = "aws_default",
poll_interval: float = 30.0,
**kwargs,
):
super().__init__(**kwargs)
self.cluster_identifier = cluster_identifier
self.skip_final_cluster_snapshot = skip_final_cluster_snapshot
self.final_cluster_snapshot_identifier = final_cluster_snapshot_identifier
self.wait_for_completion = wait_for_completion
self.redshift_hook = RedshiftHook(aws_conn_id=aws_conn_id)
self.poll_interval = poll_interval

def execute(self, context: 'Context'):
self.delete_cluster()

if self.wait_for_completion:
cluster_status: str = self.check_status()
while cluster_status != "cluster_not_found":
self.log.info(
"cluster status is %s. Sleeping for %s seconds.", cluster_status, self.poll_interval
)
time.sleep(self.poll_interval)
cluster_status = self.check_status()

def delete_cluster(self) -> None:
self.redshift_hook.delete_cluster(
cluster_identifier=self.cluster_identifier,
skip_final_cluster_snapshot=self.skip_final_cluster_snapshot,
final_cluster_snapshot_identifier=self.final_cluster_snapshot_identifier,
)

def check_status(self) -> str:
return self.redshift_hook.cluster_status(self.cluster_identifier)
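
The hook methods used here wrap the Redshift API's delete_cluster and describe_clusters calls. A rough boto3 sketch of the same delete-and-wait flow, as a simplification rather than the hook's actual implementation:

import time

import boto3


def delete_cluster_and_wait(cluster_identifier: str, poll_interval: float = 30.0) -> None:
    client = boto3.client("redshift")
    client.delete_cluster(
        ClusterIdentifier=cluster_identifier,
        SkipFinalClusterSnapshot=True,  # mirrors skip_final_cluster_snapshot=True
    )
    # Poll until the cluster no longer exists, mirroring the operator's
    # wait_for_completion loop above.
    while True:
        try:
            client.describe_clusters(ClusterIdentifier=cluster_identifier)
        except client.exceptions.ClusterNotFoundFault:
            return
        time.sleep(poll_interval)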
1 change: 1 addition & 0 deletions airflow/providers/google/cloud/operators/bigquery.py
@@ -2145,6 +2145,7 @@ def execute(self, context: Any):
job_id = self._job_id(context)

try:
self.log.info(f"Executing: {self.configuration}")
job = self._submit_job(hook, job_id)
self._handle_job_error(job)
except Conflict:
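
The added log line formats the configuration eagerly with an f-string. The logging framework's lazy %s style defers formatting until a record is actually emitted, which can matter for large job configurations; a small illustration:

import logging

log = logging.getLogger(__name__)
configuration = {"query": {"query": "SELECT 1", "useLegacySql": False}}  # illustrative value

# The argument is only interpolated if INFO is enabled for this logger:
log.info("Executing: %s", configuration)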
10 changes: 0 additions & 10 deletions airflow/utils/db.py
@@ -1504,16 +1504,6 @@ def downgrade(*, to_revision, from_revision=None, show_sql_only=False, session:
log.info("Attempting downgrade to revision %s", to_revision)
config = _get_alembic_config()

errors_seen = False
for err in _check_migration_errors(session=session):
if not errors_seen:
log.error("Automatic migration failed. You may need to apply downgrades manually. ")
errors_seen = True
log.error("%s", err)

if errors_seen:
exit(1)

with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
if show_sql_only:
log.warning("Generating sql scripts for manual migration.")
26 changes: 19 additions & 7 deletions airflow/www/templates/airflow/dags.html
@@ -80,15 +80,27 @@
{% for m in dashboard_alerts %}
{{ show_message(m.message, m.category) }}
{% endfor %}
{% for original_table_name, moved_table_name in migration_moved_data_alerts %}
{% call show_message(category='error', dismissible=false) %}
Airflow found incompatible data in the <code>{{ original_table_name }}</code> table in the
metadatabase, and has moved them to <code>{{ moved_table_name }}</code> during the database migration
to upgrade. Please inspect the moved data to decide whether you need to keep them, and manually drop
the <code>{{ moved_table_name }}</code> table to dismiss this warning. Read more about it
{% if migration_moved_data_alerts %}
{% call show_message(category='warning', dismissible=false) %}
While upgrading the metadatabase, Airflow had to move some bad data in order to apply new constraints.
The moved data can be found in the following tables:<br>
<table>
<tr>
<th style="padding-right:10px">Source table</th>
<th>Table with moved rows</th>
</tr>
{% for original_table_name, moved_table_name in migration_moved_data_alerts %}
<tr>
<td style="padding-right:10px"><code>{{ original_table_name }}</code></td>
<td><code>{{ moved_table_name }}</code></td>
</tr>
{% endfor %}
</table>
Please inspect the moved data to decide whether you need to keep them, and manually drop
the moved tables to dismiss this warning. Read more about it
in <a href={{ get_docs_url("installation/upgrading.html") }}><b>Upgrading</b></a>.
{% endcall %}
{% endfor %}
{% endif %}
{{ super() }}
{% if sqlite_warning | default(true) %}
{% call show_message(category='warning', dismissible=false) %}
4 changes: 2 additions & 2 deletions airflow/www/views.py
@@ -831,13 +831,13 @@ def index(self):

def _iter_parsed_moved_data_table_names():
for table_name in inspect(session.get_bind()).get_table_names():
segments = table_name.split("__", 2)
segments = table_name.split("__", 3)
if len(segments) < 3:
continue
if segments[0] != settings.AIRFLOW_MOVED_TABLE_PREFIX:
continue
# Second segment is a version marker that we don't need to show.
yield segments[2], table_name
yield segments[3], table_name

if (
permissions.ACTION_CAN_ACCESS_MENU,
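
Splitting on at most three double-underscore separators means the view now drops one more marker segment before displaying the original table name. A quick sketch of how the parse behaves, with a hypothetical table name (the 'dangling' marker segment here is illustrative):

AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"  # assumed value of settings.AIRFLOW_MOVED_TABLE_PREFIX

table_name = "_airflow_moved__2_3__dangling__task_instance"  # hypothetical moved table
segments = table_name.split("__", 3)
# -> ['_airflow_moved', '2_3', 'dangling', 'task_instance']
assert segments[0] == AIRFLOW_MOVED_TABLE_PREFIX
assert segments[3] == "task_instance"  # the name shown in the warning table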
6 changes: 3 additions & 3 deletions breeze-complete
@@ -26,12 +26,12 @@ _breeze_allowed_python_major_minor_versions="3.7 3.8 3.9 3.10"
_breeze_allowed_backends="sqlite mysql postgres mssql"
_breeze_allowed_integrations="cassandra kerberos mongo openldap pinot rabbitmq redis statsd trino all"
_breeze_allowed_kubernetes_modes="image"
_breeze_allowed_kubernetes_versions="v1.23.4 v1.22.7 v1.21.10 v1.20.15"
_breeze_allowed_kubernetes_versions="v1.24.0 v1.23.6 v1.22.9 v1.21.12 v1.20.15"
_breeze_allowed_helm_versions="v3.6.3"
_breeze_allowed_kind_versions="v0.12.0"
_breeze_allowed_kind_versions="v0.13.0"
_breeze_allowed_mysql_versions="5.7 8"
_breeze_allowed_mssql_versions="2017-latest 2019-latest"
_breeze_allowed_postgres_versions="10 11 12 13"
_breeze_allowed_postgres_versions="10 11 12 13 14"
_breeze_allowed_kind_operations="start stop restart status deploy test shell k9s"
_breeze_allowed_executors="KubernetesExecutor CeleryExecutor LocalExecutor CeleryKubernetesExecutor LocalKubernetesExecutor"
_breeze_allowed_test_types="All Always Core Providers API CLI Integration Other WWW Postgres MySQL Helm Quarantined"
12 changes: 6 additions & 6 deletions dev/breeze/src/airflow_breeze/global_constants.py
@@ -50,8 +50,8 @@
'all',
]
ALLOWED_KUBERNETES_MODES = ['image']
ALLOWED_KUBERNETES_VERSIONS = ['v1.23.4', 'v1.22.7', 'v1.21.10', 'v1.20.15']
ALLOWED_KIND_VERSIONS = ['v0.12.0']
ALLOWED_KUBERNETES_VERSIONS = ['v1.24.0', 'v1.23.6', 'v1.22.9', 'v1.21.12', 'v1.20.15']
ALLOWED_KIND_VERSIONS = ['v0.13.0']
ALLOWED_HELM_VERSIONS = ['v3.6.3']
ALLOWED_EXECUTORS = ['KubernetesExecutor', 'CeleryExecutor', 'LocalExecutor', 'CeleryKubernetesExecutor']
ALLOWED_KIND_OPERATIONS = ['start', 'stop', 'restart', 'status', 'deploy', 'test', 'shell', 'k9s']
@@ -63,7 +63,7 @@
MOUNT_NONE = "none"

ALLOWED_MOUNT_OPTIONS = [MOUNT_SELECTED, MOUNT_ALL, MOUNT_NONE]
ALLOWED_POSTGRES_VERSIONS = ['10', '11', '12', '13']
ALLOWED_POSTGRES_VERSIONS = ['10', '11', '12', '13', '14']
ALLOWED_MYSQL_VERSIONS = ['5.7', '8']
ALLOWED_MSSQL_VERSIONS = ['2017-latest', '2019-latest']
ALLOWED_TEST_TYPES = [
@@ -155,7 +155,7 @@ def get_available_packages(short_version=False) -> List[str]:
PRODUCTION_IMAGE = False
ALL_PYTHON_MAJOR_MINOR_VERSIONS = ['3.7', '3.8', '3.9', '3.10']
CURRENT_PYTHON_MAJOR_MINOR_VERSIONS = ['3.7', '3.8', '3.9', '3.10']
CURRENT_POSTGRES_VERSIONS = ['10', '11', '12', '13']
CURRENT_POSTGRES_VERSIONS = ['10', '11', '12', '13', '14']
CURRENT_MYSQL_VERSIONS = ['5.7', '8']
CURRENT_MSSQL_VERSIONS = ['2017-latest', '2019-latest']
POSTGRES_VERSION = CURRENT_POSTGRES_VERSIONS[0]
@@ -226,8 +226,8 @@ def get_airflow_extras():
ENABLED_SYSTEMS = ""

CURRENT_KUBERNETES_MODES = ['image']
CURRENT_KUBERNETES_VERSIONS = ['v1.23.4', 'v1.22.7', 'v1.21.10', 'v1.20.15']
CURRENT_KIND_VERSIONS = ['v0.12.0']
CURRENT_KUBERNETES_VERSIONS = ['v1.24.0', 'v1.23.6', 'v1.22.9', 'v1.21.12', 'v1.20.15']
CURRENT_KIND_VERSIONS = ['v0.13.0']
CURRENT_HELM_VERSIONS = ['v3.6.3']
CURRENT_EXECUTORS = ['KubernetesExecutor']
