merge v0.12.1
edgardevo committed Nov 23, 2023
2 parents a380d54 + e055213 commit c30847f
Showing 16 changed files with 481 additions and 54 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.12.0
current_version = 0.12.1
commit = True
tag = True

1 change: 1 addition & 0 deletions .github/workflows/test.yml
@@ -21,6 +21,7 @@ jobs:
run: |
python -m pip install --upgrade pip
python -m pip install poetry
poetry lock
poetry install
- name: Check Format
run: poetry run invoke format --check
39 changes: 39 additions & 0 deletions docs/custom_operators.rst
@@ -10,3 +10,42 @@ Then you link this function to a CustomCode object.

.. literalinclude:: examples/custom_operator.py
:language: python

How to use custom arguments inside the builder function?
#########################################################

It is possible to use kwargs dict arguments inside the builder function of the Custom Operator.

This feature allows you to create specific Airflow operators based on argument values or on the environment the DAG is running in.
To be able to access those arguments, you must define them using the func_kwargs attribute when creating the CustomCode object.

See the following example:

.. code-block:: python

    def build(dag, env=None, task_type="msg", **kwargs):
        from airflow.operators.dummy import DummyOperator

        if kwargs.get("key"):
            return DummyOperator(task_id=f"dummy_{env}_{task_type}_{kwargs['key']}", dag=dag)
        else:
            return DummyOperator(task_id=f"dummy_{env}_{task_type}", dag=dag)

    mycode = CustomCode(
        name="dummy_env_custom",
        version="1.0.0",
        operator_builder=build,
        requirements=[],
        func_kwargs={"task_type": "msg"},
    )

    mycode_kwargs = CustomCode(
        name="dummy_env_custom_kwargs",
        version="1.0.0",
        operator_builder=build,
        requirements=[],
        func_kwargs={"task_type": "msg", "key": "random_string"},
    )

Two CustomCode objects are created in this example. The first one defines only one argument (task_type); the builder function renders a DummyOperator whose name is based on the environment the DAG is running in and on the task_type argument value.

The second CustomCode object adds the argument key with the value random_string. Because this second argument is not already defined in the builder function signature, you must include the kwargs dictionary argument in the builder function and access the key argument via ``kwargs.get("key")``.
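
To make the effect of the two configurations concrete, here is a minimal sketch that calls the ``build`` function directly. The ``dag`` object and the ``env`` value are stand-ins for what the framework would normally pass to the builder; the exact invocation used by Flycs is an assumption here, shown for illustration only.

.. code-block:: python

    from datetime import datetime

    from airflow import DAG

    # Hypothetical DAG, only used to exercise the builder locally.
    dag = DAG("demo", start_date=datetime(2021, 1, 1))

    # First CustomCode: only task_type is passed, so the else branch is taken.
    op1 = build(dag, env="tst", task_type="msg")
    print(op1.task_id)  # dummy_tst_msg

    # Second CustomCode: "key" ends up in **kwargs, so the first branch is taken.
    op2 = build(dag, env="tst", task_type="msg", key="random_string")
    print(op2.task_id)  # dummy_tst_msg_random_string
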
1 change: 1 addition & 0 deletions docs/index.rst
@@ -8,6 +8,7 @@ Welcome to Flycs SDK's documentation!
readme
installation
usage
transformations
views
functions
stored_procedures
81 changes: 81 additions & 0 deletions docs/transformations.rst
@@ -0,0 +1,81 @@
===============
Transformations
===============

Flycs lets you define BigQuery queries that will be run as BigQuery operators during the DAG run in which the query is used.
To define a query using the Python SDK, you must use a Transformation object.

.. code-block:: python

    from flycs_sdk.transformations import Transformation

    query = Transformation(
        name="simple_copy",
        query="SELECT * FROM raw.alpha.employees.employees AS raw",
        version="1.0.0",
        static=True,
    )

How to use the Flycs SDK to create parameterized queries?
##########################################################

It can happen that you need to generate queries with dynamic information, or that you want to create a pipeline that uses the same query on different tables.
To do all of these things, the easiest way is to leverage the power of the Flycs SDK.

In the following example we will show how you can define a pipeline that contains parameterized queries.

.. code-block:: python

    from datetime import datetime, timezone

    from flycs_sdk.pipelines import Pipeline, PipelineKind
    from flycs_sdk.entities import Entity
    from flycs_sdk.transformations import Transformation

    # define your list of parameters
    parameters = [
        ("table1", "tables2"),
        ("table3", "tables4"),
        ("table5", "tables6"),
    ]

    # generate a transformation for each pair of parameters
    transformations = []
    for table1, table2 in parameters:
        query = Transformation(
            name="transformation_" + "_".join([table1, table2]),
            # Notice how we generate the content of the query using
            # the parameters defined at the top of the file
            query=f"SELECT * FROM {table1} LEFT JOIN {table2}",
            version="1.0.0",
        )
        transformations.append(query)

    # define the entity
    entity = Entity(
        name="my_entity",
        version="1.0.0",
        stage_config={},
        transformations={},
    )

    # insert the transformations into the entity
    # stage_config is a dict that maps the stage name to a dictionary of
    # transformation name and version
    entity.stage_config["staging"] = {t.name: t.version for t in transformations}
    # transformations is a dict that maps the stage name to a dictionary of
    # transformation name and Transformation object
    entity.transformations["staging"] = {t.name: t for t in transformations}

    # define the pipeline
    my_pipeline = Pipeline(
        name="my_pipeline",
        version="1.0.0",
        schedule="10 10 * * *",
        entities=[entity],
        kind=PipelineKind.VANILLA,
        start_time=datetime.now(tz=timezone.utc),
    )

    # to be picked up by the framework, all the pipelines need to be
    # added to a variable called `pipelines`
    pipelines = [my_pipeline]

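
To make the resulting structure explicit, here is a small sketch of what ``entity.stage_config`` is expected to contain after the assignments above, based on the three parameter pairs defined at the start of the example:

.. code-block:: python

    # Expected shape of entity.stage_config after the assignments above.
    expected_stage_config = {
        "staging": {
            "transformation_table1_tables2": "1.0.0",
            "transformation_table3_tables4": "1.0.0",
            "transformation_table5_tables6": "1.0.0",
        }
    }
    assert entity.stage_config == expected_stage_config

    # entity.transformations["staging"] maps the same names to the
    # Transformation objects themselves instead of their versions.
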
80 changes: 78 additions & 2 deletions docs/triggers.rst
@@ -8,13 +8,14 @@ The supported type of triggers are:
- `PubSub topic`
- `Google Cloud Storage`
- `Other pipeline`
- `Multiple Other pipelines`

PubSub topic
`PubSub topic`
############

This trigger creates a subscription to the topic and then waits for a message to arrive. Once a message is received, the rest of the pipeline is executed.

The PubSub trigger has 2 property you can configure:
The PubSub trigger has 2 properties you can configure:

- **topic**: The full path to a pubsub topic. The topic MUST exist before the pipeline with the trigger is executed.
- **subscription_project**: Optional property that lets you choose in which project the subscription to the topic will be created.
@@ -187,3 +188,78 @@ Same example with the python SDK:
        start_time=datetime.now(tz=timezone.utc),
        schedule=master,  # Here we pass the master Pipeline object directly into the `schedule` field.
    )
`Multiple Other pipelines`
##########################

This trigger is used when you want a child pipeline to be triggered by multiple parent pipelines. Using this trigger will place sensors on the child pipeline that wait for all the parent pipelines to finish.

This feature uses an ExternalSensor on the child pipeline. Its schedule is computed automatically so that it runs at the widest cron schedule among the parent pipelines.

E.g.: pipeline_A runs on ``30 9 * * *`` (every day at 9:30 UTC) and pipeline_B runs on ``*/30 * * * *`` (every 30 minutes), with a child pipeline_C depending on both A and B. Then pipeline_C will be scheduled with the cron configuration ``30 9 * * *``.

Using a YAML definition:

.. code-block:: yaml

    # parent pipeline_A
    name: pipeline_A
    kind: vanilla
    version: 1.0.0
    entities:
      ... # removed for brevity
    schedule: "30 9 * * *"
    start_time: "2021-01-01T00:00:00"

    # parent pipeline_B
    name: pipeline_B
    kind: vanilla
    version: 1.0.0
    entities:
      ... # removed for brevity
    schedule: "*/30 * * * *"
    start_time: "2021-01-01T00:00:00"

    # child pipeline_C
    name: pipeline_C
    kind: vanilla
    version: 1.0.0
    entities:
      ... # removed for brevity
    schedule: ["pipeline_A", "pipeline_B"]
    start_time: "2021-01-01T00:00:00" # /!\ the start_time of all the parent pipelines (A & B) must be the same as pipeline_C's

Using the Python SDK:


.. code-block:: python

    now_datetime = datetime.now(tz=timezone.utc)

    pipeline_A = Pipeline(
        name="pipeline_A",
        version="1.0.0",
        entities=[entity],
        kind=PipelineKind.VANILLA,
        start_time=now_datetime,
        schedule="30 9 * * *",
    )

    pipeline_B = Pipeline(
        name="pipeline_B",
        version="1.0.0",
        entities=[entity],
        kind=PipelineKind.VANILLA,
        start_time=now_datetime,
        schedule="*/30 * * * *",
    )

    pipeline_C = Pipeline(
        name="pipeline_C",
        version="1.0.0",
        entities=[entity],
        kind=PipelineKind.VANILLA,
        start_time=now_datetime,
        schedule=[pipeline_A, pipeline_B],  # Here we pass the list of parent Pipeline objects.
    )

44 changes: 44 additions & 0 deletions docs/usage.rst
@@ -93,3 +93,47 @@ Parameterized pipeline and entity also allow to introduce custom logic. Here is

.. literalinclude:: examples/parametrize_pipeline.py
:language: python

How to set up a different schedule based on the environment the DAG is running in?
###################################################################################
You can schedule your pipeline with a different cron expression based on the environment it will be running in. The available environments are the following: **sbx** (sandbox), **tst** (test), **acc** (acceptance), **prd** (production).
To define the schedule for each environment, you must use a dictionary object:

.. code-block:: python

    import pendulum

    from flycs_sdk.pipelines import Pipeline, PipelineKind

    from .entities import entity_demo

    demo_python = Pipeline(
        name="demo_python_env_scheduled",
        version="1.0.0",
        schedule={"sbx": "30 10 * * *", "tst": "@daily", "acc": "@daily", "prd": "@weekly"},
        entities=[entity_demo],
        kind=PipelineKind.VANILLA,
        start_time=pendulum.now(tz="Europe/Brussels"),
    )

How to make a timezone-aware DAG?
#################################
To make your DAG pipeline run in a specific timezone, you first need to make sure you pick a timezone name defined in `this timezone list <https://en.wikipedia.org/wiki/List_of_tz_database_time_zones>`_.
Using the Python SDK, you have to use the pendulum library:

.. code-block:: python

    import pendulum

    from flycs_sdk.pipelines import Pipeline, PipelineKind

    from .entities import entity_demo

    demo_python = Pipeline(
        name="demo_python",
        version="1.0.0",
        schedule="10 10 * * *",
        entities=[entity_demo],
        kind=PipelineKind.VANILLA,
        start_time=pendulum.now(tz="Europe/Brussels"),
    )

NB: By default, using a datetime object for a Python Pipeline instead of a pendulum object will make the DAG run in the UTC timezone.
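
For comparison, here is a minimal sketch of the same pipeline defined with a plain ``datetime`` start time, which is therefore interpreted as UTC (the pipeline name ``demo_python_utc`` is just a placeholder for this example):

.. code-block:: python

    from datetime import datetime, timezone

    from flycs_sdk.pipelines import Pipeline, PipelineKind

    from .entities import entity_demo

    demo_python_utc = Pipeline(
        name="demo_python_utc",
        version="1.0.0",
        schedule="10 10 * * *",
        entities=[entity_demo],
        kind=PipelineKind.VANILLA,
        # A plain datetime instead of a pendulum object: the schedule runs in UTC.
        start_time=datetime.now(tz=timezone.utc),
    )
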
2 changes: 1 addition & 1 deletion flycs_sdk/__init__.py
@@ -2,4 +2,4 @@

__author__ = """Tristan Van Thielen"""
__email__ = "[email protected]"
__version__ = "0.12.0"
__version__ = "0.12.1"
