Skip to content

Commit

Permalink
Merge pull request #1 from hazbottles/caching-and-logging
Browse files Browse the repository at this point in the history
Caching and logging
  • Loading branch information
hazbottles authored Feb 22, 2021
2 parents daf446d + d507439 commit 8cb1fd4
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 6 deletions.
2 changes: 1 addition & 1 deletion flonb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .task import task_func, set_cache_dir, Dep, DynamicDep # noqa: F401

__version__ = "0.1.1" # make sure to also update in ../setup.py
__version__ = "0.1.2" # make sure to also update in ../setup.py

__all__ = ["task_func", "set_cache_dir", "Dep", "DynamicDep", "__version__"]
24 changes: 20 additions & 4 deletions flonb/task.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import inspect
import functools
import hashlib
import logging
import os
import pickle
from typing import Callable, Dict, Tuple, Optional
Expand All @@ -8,6 +10,8 @@
import dask
import dask.optimization

# Module-level logger registered under the fixed name "flonb"; the cache and
# task wrappers in this file emit their INFO messages through it.
_logger = logging.getLogger("flonb")


# Module-level convenience function: forwards `dirpath` to Cache.set_dir.
def set_cache_dir(dirpath: str):
Cache.set_dir(dirpath)
Expand All @@ -31,19 +35,23 @@ class Cache:
# Build a cache handle for one (category, key) pair and compute the path of
# its pickle file.
#   category: path component joined directly after self._get_base_dir().
#   key:      identifier of the cached value; stored unchanged on the instance
#             and used in the log messages of read() / write().
def __init__(self, category: str, key: str):
self.category = category
self.key = key
# NOTE(review): this page shows no +/- diff markers. This assignment appears
# to be the pre-change line that the two lines below replace (the file header
# reports 4 deletions) -- confirm against the raw diff.
self.fpath = os.path.join(self._get_base_dir(), category, f"{key}.pickle")
# The file name is the MD5 hex digest of str(key): a fixed-length,
# hex-only name regardless of what characters the key contains.
fname = hashlib.md5(str(key).encode()).hexdigest()
self.fpath = os.path.join(self._get_base_dir(), category, f"{fname}.pickle")

# Return True when the path stored in self.fpath exists on disk.
def exists(self) -> bool:
return os.path.exists(self.fpath)

# Load and return the object pickled at self.fpath, logging the access at
# INFO level first.
# No exception is caught here: errors from open() or pickle.load() (for
# example FileNotFoundError when nothing was written yet) reach the caller.
# NOTE: pickle.load can execute arbitrary code contained in the file, so the
# cache directory must only ever hold trusted data.
def read(self) -> object:
_logger.info(f"READING CACHE for {self.key} at {self.fpath}")
# Binary mode, because pickle data is bytes.
with open(self.fpath, "rb") as fd:
return pickle.load(fd)

# Pickle `data` to self.fpath, logging at INFO level before and after.
#   data: any object accepted by pickle.dump.
def write(self, data: object):
_logger.info(f"WRITING CACHE for {self.key} to {self.fpath}")
# Create the directory that will contain the cache file; exist_ok=True makes
# this a no-op when the directory is already present.
os.makedirs(os.path.dirname(self.fpath), exist_ok=True)
with open(self.fpath, "wb") as fd:
pickle.dump(data, fd)
# Second message marks that the dump call has returned.
_logger.info(f"WROTE CACHE for {self.key} to {self.fpath}")

@classmethod
def set_dir(cls, dirpath: str):
Expand Down Expand Up @@ -124,13 +132,21 @@ def _get_cache_obj(self, key: str) -> Cache:
return Cache(self.__name__, key)

def _get_graph_func(self, key: str) -> Callable:
"""Returns a wrapper around self.func that handles"""
"""Returns a wrapper around self.func that handles writing to cache"""

# Logging wrapper around self.func: emits "RUNNING <key>" before the call and
# "DONE <key>" after it returns, then hands the result back unchanged.
# `key` comes from the enclosing _get_graph_func call. There is no try/except,
# so if self.func raises, the "DONE" message is not logged and the exception
# propagates. functools.wraps copies self.func's metadata onto the wrapper.
@functools.wraps(self.func)
def _graph_func_wrapper(*args, **kwargs):
_logger.info(f"RUNNING {key}")
result = self.func(*args, **kwargs)
_logger.info(f"DONE {key}")
return result

if not self.cache_disk:
return self.func
return _graph_func_wrapper

# Caching wrapper: runs the task, writes its result through the Cache object
# obtained for `key`, and returns the value read back from that cache rather
# than the in-memory result.
@functools.wraps(self.func)
def write_cache_wrapper(*args, **kwargs):
# NOTE(review): this page shows no +/- diff markers. This direct call appears
# to be the pre-change line that the next line replaces, so the task would
# not really be invoked twice -- confirm against the raw diff.
result = self.func(*args, **kwargs)
# Going through _graph_func_wrapper adds the RUNNING / DONE log messages.
result = _graph_func_wrapper(*args, **kwargs)
cache = self._get_cache_obj(key)
cache.write(result)
# The caller receives whatever cache.read() yields after the write above.
return cache.read()
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="flonb",
version="0.1.1", # make sure to also update in ./flonb/__init__.py
version="0.1.2", # make sure to also update in ./flonb/__init__.py
author="hazbottles",
description="Rapid-iteration data pipelines with automagic caching + parameter passing.",
long_description=long_description,
Expand Down

0 comments on commit 8cb1fd4

Please sign in to comment.