From 38ea972ca6116e31936b2b508a7844108ff9b4cc Mon Sep 17 00:00:00 2001 From: xnuinside Date: Sat, 9 Dec 2023 23:57:23 +0300 Subject: [PATCH] added flag --skip-existed & overwrite check for variables and connections --- CHANGELOG.txt | 8 +++++ README.md | 38 +++++++++++++++++++-- airflow_helper/cli.py | 9 +++++ airflow_helper/core.py | 70 +++++++++++++++++++++++++++++++++++--- airflow_helper/settings.py | 1 + docs/README.rst | 43 +++++++++++++++++++++-- pyproject.toml | 2 +- 7 files changed, 161 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.txt b/CHANGELOG.txt index 5b14c9a..5cf49c9 100644 --- a/CHANGELOG.txt +++ b/CHANGELOG.txt @@ -1,3 +1,11 @@ +*0.2.0* + +1. Added check for variables - now if variable already exists on server Airflow Helper will raise error if you tries to overwrite it from the config. +To overwrite existed Variables, Connections, Pools - use flag '--overwrite' or argument with same name, if you use Airflow Helper from Python. + +2. Added flag --skip-existed to avoid raise error if variables/connections/pools exists already on Airflow Server - it will just add new one from config file. + + *0.1.2* 1. Do not fail if some sections from config are not exists diff --git a/README.md b/README.md index 923fb72..455f006 100644 --- a/README.md +++ b/README.md @@ -71,10 +71,36 @@ I can only guarantee that 100% library works with Apache Airflow versions that a All arguments that required in cli or Python code have 'default' setting, you can check all of them in file 'airflow_helper/settings.py' -#### Variables overwriting +#### Airflow Helper settings & flags -Note, that by default Airflow always overwrite variables, then you want to set up same that exists already in Airflow DB. 'overwrite' param that exists in Airflow Helper in config upload process relative only to Pools. +You can configure how you want to use config - overwrite existed variables/connections/pools with values from config or just skip them, or raise error if already exist. +In cli (or as arguments in Python main class, if you use helper directly from python) exist several useful flags, that you can use: + +```console + + airflow-helper load [OPTIONS] [FILE_PATH] + + # options: + --url TEXT Apache Airflow full url to connect. You can provide it or host & port separately. [default: None]--host TEXT Apache Airflow server host form that obtain existed settings [default: http://localhost] + --port TEXT Apache Airflow server port form that obtain existed settings [default: 8080] + --user -u TEXT Apache Airflow user with read rights [default: airflow] + --password -p TEXT Apache Airflow user password [default: airflow] + --overwrite -o Overwrite Connections & Pools if they already exists + --skip-existed -se Skip `already exists` errors + --help -h Show this message and exit. +``` + +```console + + airflow-helper create [OPTIONS] COMMAND [ARGS] + + # commands: + from-server Create config with values from existed Airflow Server + new Create new empty config + # options + --help -h Show this message and exit. +``` ### What if I already have Airflow server with dozens of variables?? @@ -265,6 +291,14 @@ Example, to overwrite default airflow host you should provide environment variab 5. Create overwrite mode for settings upload ## Changelog +*0.2.0* + +1. Added check for variables - now if variable already exists on server Airflow Helper will raise error if you tries to overwrite it from the config. +To overwrite existed Variables, Connections, Pools - use flag '--overwrite' or argument with same name, if you use Airflow Helper from Python. + +2. Added flag --skip-existed to avoid raise error if variables/connections/pools exists already on Airflow Server - it will just add new one from config file. + + *0.1.2* 1. Do not fail if some sections from config are not exists diff --git a/airflow_helper/cli.py b/airflow_helper/cli.py index 1456b40..1b42d96 100644 --- a/airflow_helper/cli.py +++ b/airflow_helper/cli.py @@ -96,9 +96,18 @@ def load( help="Overwrite Connections & Pools if they already exists", ), ] = False, + skip_existed: Annotated[ + bool, + typer.Option( + "--skip-existed", + "-se", + help="Skip `already exists` errors", + ), + ] = False, ): ConfigUploader( overwrite=overwrite, + skip_existed=skip_existed, file_path=file_path, url=url, host=host, diff --git a/airflow_helper/core.py b/airflow_helper/core.py index e08ba1d..6eb9dc9 100644 --- a/airflow_helper/core.py +++ b/airflow_helper/core.py @@ -1,3 +1,4 @@ +import sys from typing import Callable, List, Union import airflow_client @@ -9,6 +10,7 @@ from airflow_helper.api import AirflowAPIBase from airflow_helper.models import Connection, Pool, Variable from airflow_helper.reader import ConfigReader +from airflow_helper.remote import AirflowAPIGrabber from airflow_helper.settings import logger from airflow_helper.settings import settings as s @@ -18,10 +20,12 @@ def __init__( self, file_path: str = s.config_file_name, overwrite: bool = s.overwrite, + skip_existed: bool = s.skip_existed, **kwargs, ) -> None: - self.api = PostAirflowAPI(overwrite=overwrite, **kwargs) - self.overwrite = overwrite + self.api = PostAirflowAPI( + overwrite=overwrite, skip_existed=skip_existed, **kwargs + ) self.config = ConfigReader(file_path=file_path).load_config() def upload_config_to_server(self): @@ -37,8 +41,10 @@ def upload_config_to_server(self): class PostAirflowAPI(AirflowAPIBase): - def __init__(self, overwrite: bool = False, **kwargs): + def __init__(self, overwrite: bool = False, skip_existed: bool = False, **kwargs): self.overwrite = overwrite + self.skip_existed = skip_existed + self.kwargs = kwargs super().__init__(**kwargs) def post_items( @@ -56,8 +62,12 @@ def post_items( # mean we have already duplicate if self.overwrite: items_to_patch.append(item) - else: + elif self.skip_existed is False: raise e + else: + logger.error( + f"Item {item} already exists in Airflow Server. Skipped. \n" + ) return items_to_patch def patch_connections( @@ -69,7 +79,36 @@ def patch_connections( APIConnection(**connection.model_dump(exclude_none=True)), ) + def filter_existed_connections(self, connections: List[Connection]) -> None: + if not self.overwrite: + # get list of variables on server first and filter variables that already exists, + # because Airflow anyway overwrite variables + api_get = AirflowAPIGrabber(**self.kwargs) + connections_on_server = api_get.get_connections() + connections_on_server = { + connection.connection_id: connection + for connection in connections_on_server + } + to_post_connections = [] + for connection in connections: + if ( + connection.connection_id in connections_on_server + and connection.conn_type + == connections_on_server[connection.connection_id].conn_type + ): + message = f"""ERROR: Connection with connection_id `{connection.connection_id}` + and type `{connection.conn_type}` already exists \n""" + logger.error(message) + if not self.skip_existed: + sys.exit(1) + else: + to_post_connections.append(connection) + return to_post_connections + return connections + def create_connections(self, connections: List[Connection]) -> None: + connections = self.filter_existed_connections(connections) + with airflow_client.client.ApiClient(self.conn_config) as api_client: items_to_patch = self.post_items( connections, @@ -95,7 +134,30 @@ def create_pools(self, pools: List[Connection]) -> None: if self.overwrite and items_to_patch: self.patch_pools(items_to_patch, api_client) + def filter_existed_variables(self, variables: List[Variable]) -> None: + if not self.overwrite: + # get list of variables on server first and filter variables that already exists, + # because Airflow anyway overwrite variables + api_get = AirflowAPIGrabber(**self.kwargs) + variables_on_server = api_get.get_variables() + variables_on_server = {variable.key for variable in variables_on_server} + to_post_variables = [] + for variable in variables: + if variable.key in variables_on_server: + message = ( + f"ERROR: Variable with name `{variable.key}` already exists \n" + ) + logger.error(message) + if not self.skip_existed: + sys.exit(1) + else: + to_post_variables.append(variable) + return to_post_variables + return variables + def create_variables(self, variables: List[Variable]) -> None: + variables = self.filter_existed_variables(variables) + with airflow_client.client.ApiClient(self.conn_config) as api_client: self.post_items( variables, diff --git a/airflow_helper/settings.py b/airflow_helper/settings.py index 4008e29..1743811 100644 --- a/airflow_helper/settings.py +++ b/airflow_helper/settings.py @@ -18,6 +18,7 @@ class Settings(BaseSettings): config_file_name: str = "airflow_settings.yaml" # upload settings overwrite: bool = False + skip_existed: bool = False model_config = SettingsConfigDict(env_prefix="airflow_helper_") diff --git a/docs/README.rst b/docs/README.rst index e31c080..9ad3951 100644 --- a/docs/README.rst +++ b/docs/README.rst @@ -94,10 +94,37 @@ Default settings All arguments that required in cli or Python code have 'default' setting, you can check all of them in file 'airflow_helper/settings.py' -Variables overwriting -~~~~~~~~~~~~~~~~~~~~~ +Airflow Helper settings & flags +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Note, that by default Airflow always overwrite variables, then you want to set up same that exists already in Airflow DB. 'overwrite' param that exists in Airflow Helper in config upload process relative only to Pools. +You can configure how you want to use config - overwrite existed variables/connections/pools with values from config or just skip them, or raise error if already exist. + +In cli (or as arguments in Python main class, if you use helper directly from python) exist several useful flags, that you can use: + +.. code-block:: console + + + airflow-helper load [OPTIONS] [FILE_PATH] + + # options: + --url TEXT Apache Airflow full url to connect. You can provide it or host & port separately. [default: None]--host TEXT Apache Airflow server host form that obtain existed settings [default: http://localhost] + --port TEXT Apache Airflow server port form that obtain existed settings [default: 8080] + --user -u TEXT Apache Airflow user with read rights [default: airflow] + --password -p TEXT Apache Airflow user password [default: airflow] + --overwrite -o Overwrite Connections & Pools if they already exists + --skip-existed -se Skip `already exists` errors + --help -h Show this message and exit. + +.. code-block:: console + + + airflow-helper create [OPTIONS] COMMAND [ARGS] + + # commands: + from-server Create config with values from existed Airflow Server + new Create new empty config + # options + --help -h Show this message and exit. What if I already have Airflow server with dozens of variables?? ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -298,6 +325,16 @@ TODO Changelog --------- +*0.2.0* + + +#. + Added check for variables - now if variable already exists on server Airflow Helper will raise error if you tries to overwrite it from the config. + To overwrite existed Variables, Connections, Pools - use flag '--overwrite' or argument with same name, if you use Airflow Helper from Python. + +#. + Added flag --skip-existed to avoid raise error if variables/connections/pools exists already on Airflow Server - it will just add new one from config file. + *0.1.2* diff --git a/pyproject.toml b/pyproject.toml index c76dafc..ed4c015 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "airflow-helper" -version = "0.1.2" +version = "0.2.0" description = "" authors = ["Iuliia Volkova "] license = "MIT"