Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Provider Deprecations in Elasticsearch #44629

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions providers/src/airflow/providers/elasticsearch/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@
Changelog
---------

main
.....

.. warning::
All deprecated classes, parameters and features have been removed from the ElasticSearch provider package.
The following breaking changes were introduced:

* Hooks
* Remove ``airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchHook``. Use ``airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchSQLHook`` instead.
* Log
* Removed ``log_id_template`` parameter from ``ElasticsearchTaskHandler``.
* Removed ``retry_timeout`` parameter from ``ElasticsearchTaskHandler``. Use ``retry_on_timeout`` instead

5.5.3
.....

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@
from typing import TYPE_CHECKING, Any
from urllib import parse

from deprecated import deprecated
from elasticsearch import Elasticsearch

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.hooks.base import BaseHook
from airflow.providers.common.sql.hooks.sql import DbApiHook

Expand Down Expand Up @@ -142,21 +140,6 @@ def get_uri(self) -> str:
return uri


@deprecated(
reason="Please use `airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchSQLHook`.",
category=AirflowProviderDeprecationWarning,
)
class ElasticsearchHook(ElasticsearchSQLHook):
"""
This class is deprecated and was renamed to ElasticsearchSQLHook.

Please use :class:`airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchSQLHook`.
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)


class ElasticsearchPythonHook(BaseHook):
"""
Interacts with Elasticsearch. This hook uses the official Elasticsearch Python Client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import logging
import sys
import time
import warnings
from collections import defaultdict
from operator import attrgetter
from typing import TYPE_CHECKING, Any, Callable, Literal
Expand All @@ -36,7 +35,7 @@

from airflow import __version__ as airflow_version
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.exceptions import AirflowException
from airflow.models.dagrun import DagRun
from airflow.providers.elasticsearch.log.es_json_formatter import ElasticsearchJSONFormatter
from airflow.providers.elasticsearch.log.es_response import ElasticSearchResponse, Hit
Expand Down Expand Up @@ -77,20 +76,6 @@ def get_es_kwargs_from_config() -> dict[str, Any]:
if elastic_search_config
else {}
)
# TODO: Remove in next major release (drop support for elasticsearch<8 parameters)
if (
elastic_search_config
and "retry_timeout" in elastic_search_config
and not kwargs_dict.get("retry_on_timeout")
):
warnings.warn(
"retry_timeout is not supported with elasticsearch>=8. Please use `retry_on_timeout`.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
retry_timeout = elastic_search_config.get("retry_timeout")
if retry_timeout is not None:
kwargs_dict["retry_on_timeout"] = retry_timeout
return kwargs_dict


Expand Down Expand Up @@ -162,8 +147,6 @@ def __init__(
index_patterns: str = conf.get("elasticsearch", "index_patterns"),
index_patterns_callable: str = conf.get("elasticsearch", "index_patterns_callable", fallback=""),
es_kwargs: dict | None | Literal["default_es_kwargs"] = "default_es_kwargs",
*,
log_id_template: str | None = None,
**kwargs,
):
es_kwargs = es_kwargs or {}
Expand All @@ -175,14 +158,7 @@ def __init__(

self.client = elasticsearch.Elasticsearch(host, **es_kwargs)
# in airflow.cfg, host of elasticsearch has to be http://dockerhostXxxx:9200
if USE_PER_RUN_LOG_ID and log_id_template is not None:
warnings.warn(
"Passing log_id_template to ElasticsearchTaskHandler is deprecated and has no effect",
AirflowProviderDeprecationWarning,
stacklevel=2,
)

self.log_id_template = log_id_template # Only used on Airflow < 2.3.2.
self.frontend = frontend
self.mark_end_on_close = True
self.end_of_log_mark = end_of_log_mark.strip()
Expand Down Expand Up @@ -244,8 +220,6 @@ def _render_log_id(self, ti: TaskInstance | TaskInstanceKey, try_number: int) ->
dag_run = ti.get_dagrun(session=session)
if USE_PER_RUN_LOG_ID:
log_id_template = dag_run.get_log_template(session=session).elasticsearch_id
else:
log_id_template = self.log_id_template

if TYPE_CHECKING:
assert ti.task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ hooks:
- airflow.providers.elasticsearch.hooks.elasticsearch

connection-types:
- hook-class-name: airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchHook
- hook-class-name: airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchSQLHook
connection-type: elasticsearch

logging:
Expand Down
30 changes: 4 additions & 26 deletions providers/tests/elasticsearch/hooks/test_elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,46 +20,24 @@
from unittest import mock
from unittest.mock import MagicMock

import pytest
from elasticsearch import Elasticsearch

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models import Connection
from airflow.providers.elasticsearch.hooks.elasticsearch import (
ElasticsearchHook,
ElasticsearchPythonHook,
ElasticsearchSQLHook,
ESConnection,
)


class TestElasticsearchHook:
def test_throws_warning(self):
self.cur = mock.MagicMock(rowcount=0)
self.conn = mock.MagicMock()
self.conn.cursor.return_value = self.cur
conn = self.conn
self.connection = Connection(host="localhost", port=9200, schema="http")

with pytest.warns(AirflowProviderDeprecationWarning):

class UnitTestElasticsearchHook(ElasticsearchHook):
conn_name_attr = "test_conn_id"

def get_conn(self):
return conn

self.db_hook = UnitTestElasticsearchHook()


class TestElasticsearchSQLHookConn:
def setup_method(self):
self.connection = Connection(host="localhost", port=9200, schema="http")

class UnitTestElasticsearchHook(ElasticsearchSQLHook):
class UnitTestElasticsearchSQLHook(ElasticsearchSQLHook):
conn_name_attr = "elasticsearch_conn_id"

self.db_hook = UnitTestElasticsearchHook()
self.db_hook = UnitTestElasticsearchSQLHook()
self.db_hook.get_connection = mock.Mock()
self.db_hook.get_connection.return_value = self.connection

Expand All @@ -77,13 +55,13 @@ def setup_method(self):
self.conn.cursor.return_value = self.cur
conn = self.conn

class UnitTestElasticsearchHook(ElasticsearchSQLHook):
class UnitTestElasticsearchSQLHook(ElasticsearchSQLHook):
conn_name_attr = "test_conn_id"

def get_conn(self):
return conn

self.db_hook = UnitTestElasticsearchHook()
self.db_hook = UnitTestElasticsearchSQLHook()

def test_get_first_record(self):
statement = "SQL"
Expand Down
3 changes: 0 additions & 3 deletions tests/always/test_project_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,9 +526,6 @@ class TestElasticsearchProviderProjectStructure(ExampleCoverageTest):
PROVIDER = "elasticsearch"
CLASS_DIRS = {"hooks"}
CLASS_SUFFIXES = ["Hook"]
DEPRECATED_CLASSES = {
"airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchHook",
}


class TestCncfProviderProjectStructure(ExampleCoverageTest):
Expand Down