Skip to content

Commit

Permalink
Provide the logger_name param in providers hooks in order to override…
Browse files Browse the repository at this point in the history
… the logger name (#36675)

* Provide the logger_name param in providers hooks in order to override the logger name
  • Loading branch information
hussein-awala authored Jan 10, 2024
1 parent 8fb55f2 commit 6bd450d
Show file tree
Hide file tree
Showing 62 changed files with 139 additions and 97 deletions.
3 changes: 2 additions & 1 deletion airflow/providers/apache/beam/hooks/beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,10 @@ class BeamHook(BaseHook):
def __init__(
self,
runner: str,
**kwargs,
) -> None:
self.runner = runner
super().__init__()
super().__init__(**kwargs)

def _start_pipeline(
self,
Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/apache/druid/hooks/druid.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ def __init__(
druid_ingest_conn_id: str = "druid_ingest_default",
timeout: int = 1,
max_ingestion_time: int | None = None,
**kwargs,
) -> None:
super().__init__()
super().__init__(**kwargs)
self.druid_ingest_conn_id = druid_ingest_conn_id
self.timeout = timeout
self.max_ingestion_time = max_ingestion_time
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/apache/hdfs/hooks/webhdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ class WebHDFSHook(BaseHook):
default_conn_name = "webhdfs_default"
hook_name = "Apache WebHDFS"

def __init__(self, webhdfs_conn_id: str = default_conn_name, proxy_user: str | None = None):
super().__init__()
def __init__(self, webhdfs_conn_id: str = default_conn_name, proxy_user: str | None = None, **kwargs):
super().__init__(**kwargs)
self.webhdfs_conn_id = webhdfs_conn_id
self.proxy_user = proxy_user

Expand Down
7 changes: 4 additions & 3 deletions airflow/providers/apache/hive/hooks/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ def __init__(
hive_cli_params: str = "",
auth: str | None = None,
proxy_user: str | None = None,
**kwargs,
) -> None:
super().__init__()
super().__init__(**kwargs)
conn = self.get_connection(hive_cli_conn_id)
self.hive_cli_params: str = hive_cli_params
self.use_beeline: bool = conn.extra_dejson.get("use_beeline", False)
Expand Down Expand Up @@ -495,8 +496,8 @@ class HiveMetastoreHook(BaseHook):
conn_type = "hive_metastore"
hook_name = "Hive Metastore Thrift"

def __init__(self, metastore_conn_id: str = default_conn_name) -> None:
super().__init__()
def __init__(self, metastore_conn_id: str = default_conn_name, **kwargs) -> None:
super().__init__(**kwargs)
self.conn = self.get_connection(metastore_conn_id)
self.metastore = self.get_metastore_client()

Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/apache/kafka/hooks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class KafkaBaseHook(BaseHook):

def __init__(self, kafka_config_id=default_conn_name, *args, **kwargs):
"""Initialize our Base."""
super().__init__()
super().__init__(**kwargs)
self.kafka_config_id = kafka_config_id
self.get_conn

Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/apache/kylin/hooks/kylin.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ def __init__(
kylin_conn_id: str = default_conn_name,
project: str | None = None,
dsn: str | None = None,
**kwargs,
):
super().__init__()
super().__init__(**kwargs)
self.kylin_conn_id = kylin_conn_id
self.project = project
self.dsn = dsn
Expand Down
7 changes: 5 additions & 2 deletions airflow/providers/apache/pig/hooks/pig.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@ class PigCliHook(BaseHook):
hook_name = "Pig Client Wrapper"

def __init__(
self, pig_cli_conn_id: str = default_conn_name, pig_properties: list[str] | None = None
self,
pig_cli_conn_id: str = default_conn_name,
pig_properties: list[str] | None = None,
**kwargs,
) -> None:
super().__init__()
super().__init__(**kwargs)
conn = self.get_connection(pig_cli_conn_id)
conn_pig_properties = conn.extra_dejson.get("pig_properties")
if conn_pig_properties:
Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/apache/pinot/hooks/pinot.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ def __init__(
conn_id: str = "pinot_admin_default",
cmd_path: str = "pinot-admin.sh",
pinot_admin_system_exit: bool = False,
**kwargs,
) -> None:
super().__init__()
super().__init__(**kwargs)
conn = self.get_connection(conn_id)
self.host = conn.host
self.port = str(conn.port)
Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/apache/spark/hooks/spark_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,9 @@ def __init__(
num_executors: int | None = None,
verbose: bool = True,
yarn_queue: str | None = None,
**kwargs,
) -> None:
super().__init__()
super().__init__(**kwargs)
options: dict = {}
conn: Connection | None = None

Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/apprise/hooks/apprise.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ class AppriseHook(BaseHook):
conn_type = "apprise"
hook_name = "Apprise"

def __init__(self, apprise_conn_id: str = default_conn_name) -> None:
super().__init__()
def __init__(self, apprise_conn_id: str = default_conn_name, **kwargs) -> None:
super().__init__(**kwargs)
self.apprise_conn_id = apprise_conn_id

def get_config_from_conn(self):
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/atlassian/jira/hooks/jira.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ class JiraHook(BaseHook):
conn_name_attr = "jira_conn_id"
hook_name = "JIRA"

def __init__(self, jira_conn_id: str = default_conn_name, proxies: Any | None = None) -> None:
super().__init__()
def __init__(self, jira_conn_id: str = default_conn_name, proxies: Any | None = None, **kwargs) -> None:
super().__init__(**kwargs)
self.jira_conn_id = jira_conn_id
self.proxies = proxies
self.client: Jira | None = None
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/cloudant/hooks/cloudant.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]:
"relabeling": {"host": "Account", "login": "Username (or API Key)", "schema": "Database"},
}

def __init__(self, cloudant_conn_id: str = default_conn_name) -> None:
super().__init__()
def __init__(self, cloudant_conn_id: str = default_conn_name, **kwargs) -> None:
super().__init__(**kwargs)
self.cloudant_conn_id = cloudant_conn_id

def get_conn(self) -> cloudant:
Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/cohere/hooks/cohere.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ def __init__(
conn_id: str = default_conn_name,
timeout: int | None = None,
max_retries: int | None = None,
**kwargs,
) -> None:
super().__init__()
super().__init__(**kwargs)
self.conn_id = conn_id
self.timeout = timeout
self.max_retries = max_retries
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/common/sql/hooks/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class DbApiHook(BaseHook):
_test_connection_sql = "select 1"

def __init__(self, *args, schema: str | None = None, log_sql: bool = True, **kwargs):
super().__init__()
super().__init__(logger_name=kwargs.pop("logger_name", None))
if not self.conn_name_attr:
raise AirflowException("conn_name_attr is not defined")
elif len(args) == 1:
Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/databricks/hooks/databricks_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ def __init__(
retry_delay: float = 1.0,
retry_args: dict[Any, Any] | None = None,
caller: str = "Unknown",
**kwargs,
) -> None:
super().__init__()
super().__init__(**kwargs)
self.databricks_conn_id = databricks_conn_id
self.timeout_seconds = timeout_seconds
if retry_limit < 1:
Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/docker/hooks/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ def __init__(
version: str | None = None,
tls: TLSConfig | bool | None = None,
timeout: int = DEFAULT_TIMEOUT_SECONDS,
**kwargs,
) -> None:
super().__init__()
super().__init__(**kwargs)
if not base_url:
raise AirflowException("URL to the Docker server not provided.")
elif tls:
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/elasticsearch/hooks/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ class ElasticsearchPythonHook(BaseHook):
Example: {"ca_cert":"/path/to/cert", "basic_auth": "(user, pass)"}
"""

def __init__(self, hosts: list[Any], es_conn_args: dict | None = None):
super().__init__()
def __init__(self, hosts: list[Any], es_conn_args: dict | None = None, **kwargs):
super().__init__(**kwargs)
self.hosts = hosts
self.es_conn_args = es_conn_args or {}

Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/facebook/ads/hooks/ads.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ def __init__(
self,
facebook_conn_id: str = default_conn_name,
api_version: str | None = None,
**kwargs,
) -> None:
super().__init__()
super().__init__(**kwargs)
self.facebook_conn_id = facebook_conn_id
self.api_version = api_version
self.client_required_fields = ["app_id", "app_secret", "access_token", "account_id"]
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/ftp/hooks/ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ class FTPHook(BaseHook):
conn_type = "ftp"
hook_name = "FTP"

def __init__(self, ftp_conn_id: str = default_conn_name) -> None:
super().__init__()
def __init__(self, ftp_conn_id: str = default_conn_name, **kwargs) -> None:
super().__init__(**kwargs)
self.ftp_conn_id = ftp_conn_id
self.conn: ftplib.FTP | None = None

Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/google/ads/hooks/ads.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ def __init__(
api_version: str | None,
gcp_conn_id: str = "google_cloud_default",
google_ads_conn_id: str = "google_ads_default",
**kwargs,
) -> None:
super().__init__()
super().__init__(**kwargs)
self.api_version = api_version or self.default_api_version
self.gcp_conn_id = gcp_conn_id
self.google_ads_conn_id = google_ads_conn_id
Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/google/cloud/hooks/cloud_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -771,8 +771,9 @@ def __init__(
gcp_conn_id: str = "google_cloud_default",
default_gcp_project_id: str | None = None,
sql_proxy_binary_path: str | None = None,
**kwargs,
) -> None:
super().__init__()
super().__init__(**kwargs)
self.gcp_conn_id = gcp_conn_id
self.gcp_cloudsql_conn_id = gcp_cloudsql_conn_id
self.cloudsql_connection = self.get_connection(self.gcp_cloudsql_conn_id)
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/google/cloud/hooks/dataprep.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ class GoogleDataprepHook(BaseHook):
conn_type = "dataprep"
hook_name = "Google Dataprep"

def __init__(self, dataprep_conn_id: str = default_conn_name, api_version: str = "v4") -> None:
super().__init__()
def __init__(self, dataprep_conn_id: str = default_conn_name, api_version: str = "v4", **kwargs) -> None:
super().__init__(**kwargs)
self.dataprep_conn_id = dataprep_conn_id
self.api_version = api_version
conn = self.get_connection(self.dataprep_conn_id)
Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/google/cloud/hooks/looker.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ class LookerHook(BaseHook):
def __init__(
self,
looker_conn_id: str,
**kwargs,
) -> None:
super().__init__()
super().__init__(**kwargs)
self.looker_conn_id = looker_conn_id
# source is used to track origin of the requests
self.source = f"airflow:{version}"
Expand Down
4 changes: 3 additions & 1 deletion airflow/providers/google/common/hooks/base_google.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,9 @@ def __init__(
gcp_conn_id: str = "google_cloud_default",
delegate_to: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
super().__init__()
super().__init__(**kwargs)
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
Expand Down Expand Up @@ -628,6 +629,7 @@ class GoogleBaseAsyncHook(BaseHook):
sync_hook_class: Any = None

def __init__(self, **kwargs: Any):
super().__init__(logger_name=kwargs.pop("logger_name", None))
self._hook_kwargs = kwargs
self._sync_hook = None

Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/google/leveldb/hooks/leveldb.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ class LevelDBHook(BaseHook):
conn_type = "leveldb"
hook_name = "LevelDB"

def __init__(self, leveldb_conn_id: str = default_conn_name):
super().__init__()
def __init__(self, leveldb_conn_id: str = default_conn_name, **kwargs):
super().__init__(**kwargs)
self.leveldb_conn_id = leveldb_conn_id
self.connection = self.get_connection(leveldb_conn_id)
self.db: plyvel.DB | None = None
Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/grpc/hooks/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ def __init__(
grpc_conn_id: str = default_conn_name,
interceptors: list[Callable] | None = None,
custom_connection_func: Callable | None = None,
**kwargs,
) -> None:
super().__init__()
super().__init__(**kwargs)
self.grpc_conn_id = grpc_conn_id
self.conn = self.get_connection(self.grpc_conn_id)
self.extras = self.conn.extra_dejson
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/hashicorp/hooks/vault.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def __init__(
radius_port: int | None = None,
**kwargs,
):
super().__init__()
super().__init__(logger_name=kwargs.pop("logger_name", None))
self.connection = self.get_connection(vault_conn_id)

if not auth_type:
Expand Down
5 changes: 4 additions & 1 deletion airflow/providers/http/hooks/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ def __init__(
tcp_keep_alive_idle: int = 120,
tcp_keep_alive_count: int = 20,
tcp_keep_alive_interval: int = 30,
**kwargs,
) -> None:
super().__init__()
super().__init__(**kwargs)
self.http_conn_id = http_conn_id
self.method = method.upper()
self.base_url: str = ""
Expand Down Expand Up @@ -283,7 +284,9 @@ def __init__(
auth_type: Any = aiohttp.BasicAuth,
retry_limit: int = 3,
retry_delay: float = 1.0,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.http_conn_id = http_conn_id
self.method = method.upper()
self.base_url: str = ""
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/imap/hooks/imap.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ class ImapHook(BaseHook):
conn_type = "imap"
hook_name = "IMAP"

def __init__(self, imap_conn_id: str = default_conn_name) -> None:
super().__init__()
def __init__(self, imap_conn_id: str = default_conn_name, **kwargs) -> None:
super().__init__(**kwargs)
self.imap_conn_id = imap_conn_id
self.mail_client: imaplib.IMAP4_SSL | imaplib.IMAP4 | None = None

Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/jenkins/hooks/jenkins.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]:
},
}

def __init__(self, conn_id: str = default_conn_name) -> None:
super().__init__()
def __init__(self, conn_id: str = default_conn_name, **kwargs) -> None:
super().__init__(**kwargs)
connection = self.get_connection(conn_id)
self.connection = connection
connection_prefix = "http"
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/microsoft/azure/hooks/adx.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]:
},
}

def __init__(self, azure_data_explorer_conn_id: str = default_conn_name) -> None:
super().__init__()
def __init__(self, azure_data_explorer_conn_id: str = default_conn_name, **kwargs) -> None:
super().__init__(**kwargs)
self.conn_id = azure_data_explorer_conn_id

@cached_property
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/microsoft/azure/hooks/asb.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]:
},
}

def __init__(self, azure_service_bus_conn_id: str = default_conn_name) -> None:
super().__init__()
def __init__(self, azure_service_bus_conn_id: str = default_conn_name, **kwargs) -> None:
super().__init__(**kwargs)
self.conn_id = azure_service_bus_conn_id

def get_conn(self):
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/microsoft/azure/hooks/base_azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]:
},
}

def __init__(self, sdk_client: Any, conn_id: str = "azure_default"):
def __init__(self, sdk_client: Any, conn_id: str = "azure_default", **kwargs):
self.sdk_client = sdk_client
self.conn_id = conn_id
super().__init__()
super().__init__(**kwargs)

def get_conn(self) -> Any:
"""
Expand Down
Loading

0 comments on commit 6bd450d

Please sign in to comment.