Skip to content

hazbottles/flonb

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

49 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

flonb

Build Status

flonb lets you develop data pipelines (a.k.a task graphs) with very rapid "modify -> run -> inspect" iteration loops.

It lets you:

  • Label each run of the pipeline with a human-readable list of options. Easy to see what you did to the data, easy to reproduce.
  • Cache results automagically based on the labels. Change an option, add a new option - flonb will manage the cache without extra boilerplate!
  • Parallel execution of your pipeline. flonb pipelines are interoperable with dask schedulers.

So you can spend more time focussing on your logic, and less time on error-prone boilerplate code.

Installation

$ pip install flonb

Note this also installs dask.core. If you require dask extensions (e.g. dask.bag, dask.dataframe, dask.delayed, etc...) then make sure to install them yourself. It can be a bit unclear with dask what extensions you have installed! https://docs.dask.org/en/latest/install.html

Example basic usage

Here is a simple pipeline that counts the number of times a word appears in a block of text.

Define a Task Graph using:

  • flonb.task_func decorator for each processing step.
  • flonb.Dep as the default parameter value to tell flonb which tasks to use as inputs.
import flonb


@flonb.task_func
def parse_text(normalise):
    text = (
        "Badger badger badger Mushroom mushroom "
        "Badger badger badger Mushroom mushroom "
        "Badger badger badger SNAKE its a SNAKE "
    )
    words = text.split(" ")
    if normalise:
        words = [w.lower() for w in words]
    return words


@flonb.task_func
def word_count(word, text=flonb.Dep(parse_text)):
    return sum([w == word for w in text])

Run the Task Graph using the .compute method that @flonb.task_func adds to your function.

Note that we supply all the options that specify the entire Task Graph. Even though the function word_count does not explicitly require the option normalise, its dependency parse_text does.

word_count.compute(normalise=False, word="badger")
6

You can still run your pipeline by manually calling the functions the usual way. This can be handy for debugging:

text = parse_text(normalise=False)
word_count(word="badger", text=text)
6

Change the options, and everything works like you'd expect.

word_count.compute(normalise=True, word="badger")
9

Importantly, you can use a single option in multiple places in the same pipeline:

@flonb.task_func
def summary_str(word, normalise, count=flonb.Dep(word_count)):
    return f"word={word}, normalise={normalise} -> count={count}"

print(summary_str.compute(word="mushroom", normalise=True))
word=mushroom, normalise=True -> count=4

A typical workflow using flonb is:

  1. Run pipeline with different options to see the impacts on your data.
  2. Add new options and/or steps.
  3. Repeat.

flonb is especially useful because you can add new options early in the Task Graph without having to manually propagate the option through every function in the pipeline!

Task Graphs

We can visualize the graph using dask[complete] + graphviz. You can install these with conda:

$ conda install dask python-graphviz

See https://docs.dask.org/en/latest/graphviz.html for other installation options.

import dask
graph, key = word_count.graph_and_key(normalise=False, word="badger")
dask.visualize(graph)

png

list dependencies

Oftentimes you will want to combine multiple computations with different versions of an option.

You can do this by supplying a list to flonb.Dep and using the .partial(**options) method.

@flonb.task_func
def count_many_words(
    counts=flonb.Dep(
        [word_count.partial(word="badger"), word_count.partial(word="mushroom")]
    )
):
    return counts  # so we can see how flonb substitutes the counts flonb.Dep parameter

count_many_words.compute(normalise=False)
[6, 2]

Lists can be nested, and non-flonb.Task objects are ignored:

@flonb.task_func
def count_many_words_heterogenous_nested_list(
    counts=flonb.Dep(
        [
            [word, word_count.partial(word=word)]
            for word in ["badger", "mushroom", "snake", "cow"]
        ]
    )
):
    return counts  # so we can see how flonb substitutes the counts flonb.Dep parameter

count_many_words_heterogenous_nested_list.compute(normalise=True)
[['badger', 9], ['mushroom', 4], ['snake', 2], ['cow', 0]]

Caching

flonb uses the options supplied to a task to uniquely identify it, and can automagically cache on disk the results using this identifier.

If a task has been cached, flonb will fetch the cached result instead of computing not only the task but also any of its dependencies.

Warning: If you invalidate your cache by changing what an option means (e.g. fixing a bug in your code), make sure to delete your cache!

flonb.set_cache_dir("/tmp")

@flonb.task_func(cache_disk=True)
def power(x, y):
    print(f"computing {x}**{y}")  # side effect - doesn't happen when cached result called
    return x**y


# only prints once
power.compute(x=3, y=3)
power.compute(x=3, y=3)
computing 3**3
# each unique set of options labels the cache
power.compute(x=3, y=2)
power.compute(x=2, y=3)
computing 3**2
computing 2**3

Dynamic dependencies

Sometimes you want to know the value of an option before you resolve the dependencies. flonb.DynamicDep has your back here.

@flonb.task_func
def add(x, y):
    return x + y

@flonb.task_func
def multiply(a, b):
    return a * b

@flonb.task_func
def multiply_or_add(
    result=flonb.DynamicDep(lambda mode: {"add": add, "multiply": multiply}[mode])
):
    return result

Here the option 'mode' determines what the task graph looks like, and even what options are available:

multiply_or_add.compute(mode="multiply", a=4, b=3)
12
multiply_or_add.compute(mode="add", x=4, y=3)
7

Multiproccesing

flonb implements the dask Task Graph specification.

  1. call .graph_and_key() on your task.
  2. feed into dask as required.
import dask.multiprocessing
import time

@flonb.task_func
def add_one_in_one_second(x):
    time.sleep(1)  # artifically slow down the task
    result = x + 1
    return result

@flonb.task_func
def do_sums(data=flonb.Dep([add_one_in_one_second.partial(x=x) for x in range(10)])):
    return data


graph, key = do_sums.graph_and_key()

# single process
tic = time.time()
dask.get(graph, key)
print(f"Single-process execution took {time.time() - tic:.2f} seconds.")

print("------------")

# multiple process
tic = time.time()
dask.multiprocessing.get(graph, key, num_workers=4)
print(f"Multiproccessing execution took {time.time() - tic:.2f} seconds.")
Single-process execution took 10.01 seconds.
------------
Multiproccessing execution took 3.30 seconds.

Alternatives

There are many high quality frameworks that let you build and run task graphs. flonb is lightweight and easy to experiment with, but make sure to check out others if you want to delve further into your options. Here are some suggestions:

  • dask
  • luigi
  • airflow

Acknowledgements

A big thanks to Solcast (https://solcast.com) for supporting the development of flonb.

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Packages

No packages published