Skip to content

Commit

Permalink
added flag --skip-existed & overwrite check for variables and connect…
Browse files Browse the repository at this point in the history
…ions
  • Loading branch information
xnuinside committed Dec 9, 2023
1 parent 1099993 commit 38ea972
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 10 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
*0.2.0*

1. Added check for variables - now if variable already exists on server Airflow Helper will raise error if you tries to overwrite it from the config.
To overwrite existed Variables, Connections, Pools - use flag '--overwrite' or argument with same name, if you use Airflow Helper from Python.

2. Added flag --skip-existed to avoid raise error if variables/connections/pools exists already on Airflow Server - it will just add new one from config file.


*0.1.2*
1. Do not fail if some sections from config are not exists

Expand Down
38 changes: 36 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,36 @@ I can only guarantee that 100% library works with Apache Airflow versions that a
All arguments that required in cli or Python code have 'default' setting, you can check all of them in file 'airflow_helper/settings.py'


#### Variables overwriting
#### Airflow Helper settings & flags

Note, that by default Airflow always overwrite variables, then you want to set up same that exists already in Airflow DB. 'overwrite' param that exists in Airflow Helper in config upload process relative only to Pools.
You can configure how you want to use config - overwrite existed variables/connections/pools with values from config or just skip them, or raise error if already exist.

In cli (or as arguments in Python main class, if you use helper directly from python) exist several useful flags, that you can use:

```console

airflow-helper load [OPTIONS] [FILE_PATH]
# options:
--url TEXT Apache Airflow full url to connect. You can provide it or host & port separately. [default: None]--host TEXT Apache Airflow server host form that obtain existed settings [default: http://localhost]
--port TEXT Apache Airflow server port form that obtain existed settings [default: 8080]
--user -u TEXT Apache Airflow user with read rights [default: airflow]
--password -p TEXT Apache Airflow user password [default: airflow]
--overwrite -o Overwrite Connections & Pools if they already exists
--skip-existed -se Skip `already exists` errors
--help -h Show this message and exit.
```

```console

airflow-helper create [OPTIONS] COMMAND [ARGS]
# commands:
from-server Create config with values from existed Airflow Server
new Create new empty config
# options
--help -h Show this message and exit.
```

### What if I already have Airflow server with dozens of variables??

Expand Down Expand Up @@ -265,6 +291,14 @@ Example, to overwrite default airflow host you should provide environment variab
5. Create overwrite mode for settings upload

## Changelog
*0.2.0*

1. Added check for variables - now if variable already exists on server Airflow Helper will raise error if you tries to overwrite it from the config.
To overwrite existed Variables, Connections, Pools - use flag '--overwrite' or argument with same name, if you use Airflow Helper from Python.

2. Added flag --skip-existed to avoid raise error if variables/connections/pools exists already on Airflow Server - it will just add new one from config file.


*0.1.2*
1. Do not fail if some sections from config are not exists

Expand Down
9 changes: 9 additions & 0 deletions airflow_helper/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,18 @@ def load(
help="Overwrite Connections & Pools if they already exists",
),
] = False,
skip_existed: Annotated[
bool,
typer.Option(
"--skip-existed",
"-se",
help="Skip `already exists` errors",
),
] = False,
):
ConfigUploader(
overwrite=overwrite,
skip_existed=skip_existed,
file_path=file_path,
url=url,
host=host,
Expand Down
70 changes: 66 additions & 4 deletions airflow_helper/core.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import sys
from typing import Callable, List, Union

import airflow_client
Expand All @@ -9,6 +10,7 @@
from airflow_helper.api import AirflowAPIBase
from airflow_helper.models import Connection, Pool, Variable
from airflow_helper.reader import ConfigReader
from airflow_helper.remote import AirflowAPIGrabber
from airflow_helper.settings import logger
from airflow_helper.settings import settings as s

Expand All @@ -18,10 +20,12 @@ def __init__(
self,
file_path: str = s.config_file_name,
overwrite: bool = s.overwrite,
skip_existed: bool = s.skip_existed,
**kwargs,
) -> None:
self.api = PostAirflowAPI(overwrite=overwrite, **kwargs)
self.overwrite = overwrite
self.api = PostAirflowAPI(
overwrite=overwrite, skip_existed=skip_existed, **kwargs
)
self.config = ConfigReader(file_path=file_path).load_config()

def upload_config_to_server(self):
Expand All @@ -37,8 +41,10 @@ def upload_config_to_server(self):


class PostAirflowAPI(AirflowAPIBase):
def __init__(self, overwrite: bool = False, **kwargs):
def __init__(self, overwrite: bool = False, skip_existed: bool = False, **kwargs):
self.overwrite = overwrite
self.skip_existed = skip_existed
self.kwargs = kwargs
super().__init__(**kwargs)

def post_items(
Expand All @@ -56,8 +62,12 @@ def post_items(
# mean we have already duplicate
if self.overwrite:
items_to_patch.append(item)
else:
elif self.skip_existed is False:
raise e
else:
logger.error(
f"Item {item} already exists in Airflow Server. Skipped. \n"
)
return items_to_patch

def patch_connections(
Expand All @@ -69,7 +79,36 @@ def patch_connections(
APIConnection(**connection.model_dump(exclude_none=True)),
)

def filter_existed_connections(self, connections: List[Connection]) -> None:
if not self.overwrite:
# get list of variables on server first and filter variables that already exists,
# because Airflow anyway overwrite variables
api_get = AirflowAPIGrabber(**self.kwargs)
connections_on_server = api_get.get_connections()
connections_on_server = {
connection.connection_id: connection
for connection in connections_on_server
}
to_post_connections = []
for connection in connections:
if (
connection.connection_id in connections_on_server
and connection.conn_type
== connections_on_server[connection.connection_id].conn_type
):
message = f"""ERROR: Connection with connection_id `{connection.connection_id}`
and type `{connection.conn_type}` already exists \n"""
logger.error(message)
if not self.skip_existed:
sys.exit(1)
else:
to_post_connections.append(connection)
return to_post_connections
return connections

def create_connections(self, connections: List[Connection]) -> None:
connections = self.filter_existed_connections(connections)

with airflow_client.client.ApiClient(self.conn_config) as api_client:
items_to_patch = self.post_items(
connections,
Expand All @@ -95,7 +134,30 @@ def create_pools(self, pools: List[Connection]) -> None:
if self.overwrite and items_to_patch:
self.patch_pools(items_to_patch, api_client)

def filter_existed_variables(self, variables: List[Variable]) -> None:
if not self.overwrite:
# get list of variables on server first and filter variables that already exists,
# because Airflow anyway overwrite variables
api_get = AirflowAPIGrabber(**self.kwargs)
variables_on_server = api_get.get_variables()
variables_on_server = {variable.key for variable in variables_on_server}
to_post_variables = []
for variable in variables:
if variable.key in variables_on_server:
message = (
f"ERROR: Variable with name `{variable.key}` already exists \n"
)
logger.error(message)
if not self.skip_existed:
sys.exit(1)
else:
to_post_variables.append(variable)
return to_post_variables
return variables

def create_variables(self, variables: List[Variable]) -> None:
variables = self.filter_existed_variables(variables)

with airflow_client.client.ApiClient(self.conn_config) as api_client:
self.post_items(
variables,
Expand Down
1 change: 1 addition & 0 deletions airflow_helper/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class Settings(BaseSettings):
config_file_name: str = "airflow_settings.yaml"
# upload settings
overwrite: bool = False
skip_existed: bool = False

model_config = SettingsConfigDict(env_prefix="airflow_helper_")

Expand Down
43 changes: 40 additions & 3 deletions docs/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,37 @@ Default settings

All arguments that required in cli or Python code have 'default' setting, you can check all of them in file 'airflow_helper/settings.py'

Variables overwriting
~~~~~~~~~~~~~~~~~~~~~
Airflow Helper settings & flags
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Note, that by default Airflow always overwrite variables, then you want to set up same that exists already in Airflow DB. 'overwrite' param that exists in Airflow Helper in config upload process relative only to Pools.
You can configure how you want to use config - overwrite existed variables/connections/pools with values from config or just skip them, or raise error if already exist.

In cli (or as arguments in Python main class, if you use helper directly from python) exist several useful flags, that you can use:

.. code-block:: console
airflow-helper load [OPTIONS] [FILE_PATH]
# options:
--url TEXT Apache Airflow full url to connect. You can provide it or host & port separately. [default: None]--host TEXT Apache Airflow server host form that obtain existed settings [default: http://localhost]
--port TEXT Apache Airflow server port form that obtain existed settings [default: 8080]
--user -u TEXT Apache Airflow user with read rights [default: airflow]
--password -p TEXT Apache Airflow user password [default: airflow]
--overwrite -o Overwrite Connections & Pools if they already exists
--skip-existed -se Skip `already exists` errors
--help -h Show this message and exit.
.. code-block:: console
airflow-helper create [OPTIONS] COMMAND [ARGS]
# commands:
from-server Create config with values from existed Airflow Server
new Create new empty config
# options
--help -h Show this message and exit.
What if I already have Airflow server with dozens of variables??
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Expand Down Expand Up @@ -298,6 +325,16 @@ TODO
Changelog
---------

*0.2.0*


#.
Added check for variables - now if variable already exists on server Airflow Helper will raise error if you tries to overwrite it from the config.
To overwrite existed Variables, Connections, Pools - use flag '--overwrite' or argument with same name, if you use Airflow Helper from Python.

#.
Added flag --skip-existed to avoid raise error if variables/connections/pools exists already on Airflow Server - it will just add new one from config file.

*0.1.2*


Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "airflow-helper"
version = "0.1.2"
version = "0.2.0"
description = ""
authors = ["Iuliia Volkova <[email protected]>"]
license = "MIT"
Expand Down

0 comments on commit 38ea972

Please sign in to comment.