Skip to content

Commit

Permalink
Add query-template
Browse files Browse the repository at this point in the history
  • Loading branch information
blythed committed Jun 26, 2024
1 parent c69760b commit 36883ba
Show file tree
Hide file tree
Showing 13 changed files with 214 additions and 63 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

#### New Features & Functionality

-
- QueryTemplate component

#### Bug Fixes

Expand All @@ -42,7 +42,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Mask special character keys in mongodb queries
- Fix listener cleanup after removal
- Don't require `@dc.dataclass` or `@merge_docstrings` decorator
- Make output of `Document.encode()` a bit more minimalistic
- Make output of `Document.encode()` more minimalistic
- Increment minimum supported ibis version to 9.0.0
- Make database connections reconnect on token expiry
- Prototype the cron job services
Expand Down
27 changes: 8 additions & 19 deletions superduperdb/base/build.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import re
import sys
import typing as t

from prettytable import PrettyTable
Expand Down Expand Up @@ -44,29 +43,19 @@ def _build_metadata(cfg, databackend: t.Optional['BaseDataBackend'] = None):
metadata = None

if metadata is None:
try:
# try to connect to the data backend uri.
logging.info("Connecting to Metadata Client with URI: ", cfg.data_backend)
return _build_databackend_impl(
cfg.data_backend, metadata_stores, type='metadata'
)
except Exception as e:
# Exit quickly if a connection fails.
logging.error("Error initializing to Metadata Client:", str(e))
sys.exit(1)
# try to connect to the data backend uri.
logging.info("Connecting to Metadata Client with URI: ", cfg.data_backend)
return _build_databackend_impl(
cfg.data_backend, metadata_stores, type='metadata'
)


def _build_databackend(cfg, databackend=None):
# Connect to data backend.
# ------------------------------
try:
if not databackend:
databackend = _build_databackend_impl(cfg.data_backend, data_backends)
logging.info("Data Client is ready.", databackend.conn)
except Exception as e:
# Exit quickly if a connection fails.
logging.error("Error initializing to DataBackend Client:", str(e))
sys.exit(1)
if not databackend:
databackend = _build_databackend_impl(cfg.data_backend, data_backends)
logging.info("Data Client is ready.", databackend.conn)
return databackend


Expand Down
2 changes: 1 addition & 1 deletion superduperdb/base/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def __next__(self):
try:
r['score'] = self.scores[str(r[self.id_field])]
except KeyError:
logging.warn(f"No document id found for {r}")
logging.debug(f"No document id found for {r}")

return Document.decode(r, db=self.db, schema=self.schema)

Expand Down
2 changes: 1 addition & 1 deletion superduperdb/base/datalayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,10 @@ def drop(self, force: bool = False, data: bool = False):

if data:
self.databackend.drop(force=True)
self.artifact_store.drop(force=True)
else:
self.databackend.drop_outputs()
self.metadata.drop(force=True)
self.artifact_store.drop(force=True)

def show(
self,
Expand Down
30 changes: 28 additions & 2 deletions superduperdb/base/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from superduperdb.base.code import Code
from superduperdb.base.constant import KEY_BLOBS, KEY_BUILDS, KEY_FILES
from superduperdb.base.leaf import Leaf, _import_item
from superduperdb.base.variables import _replace_variables
from superduperdb.components.component import Component
from superduperdb.components.datatype import (
_ENCODABLES,
Expand Down Expand Up @@ -118,11 +119,11 @@ def encode(
schema = self.schema or schema
schema = get_schema(self.db, schema) if schema else None
out = dict(self)

if schema is not None:
out = schema.encode_data(
out, builds, blobs, files, leaves_to_keep=leaves_to_keep
)

out = _deep_flat_encode(
out,
builds=builds,
Expand Down Expand Up @@ -152,6 +153,10 @@ def decode(
:param schema: The schema to use.
:param db: The datalayer to use.
"""
if '_variables' in r:
r = _replace_variables(
{k: v for k, v in r.items() if k != '_variables'}, **r['_variables']
)
schema = schema or r.get(SCHEMA_KEY)
schema = get_schema(db, schema)

Expand Down Expand Up @@ -267,6 +272,27 @@ def _create_metadata_update(update, original=None):
update = {'$set': update}
return update

def to_template(self, **substitutions):
    """
    Convert the document to a template with variables.

    Every occurrence of a ``str-to-replace`` found in a string key or value
    is rewritten as ``<var:variable-name>``. Dicts, lists and tuples are
    traversed recursively (lists and tuples both come back as lists); any
    other value is left untouched.

    :param substitutions: The substitutions to make.
                          `str-to-replace -> variable-name`
    """

    def substitute(x):
        if isinstance(x, str):
            # ``substitutions`` is a dict: ``.items()`` is required to get the
            # (str-to-replace, variable-name) pairs. Iterating the dict
            # directly yields only the keys and cannot be unpacked.
            for k, v in substitutions.items():
                x = x.replace(k, f'<var:{v}>')
            return x
        if isinstance(x, dict):
            return {substitute(k): substitute(v) for k, v in x.items()}
        if isinstance(x, (list, tuple)):
            return [substitute(v) for v in x]
        return x

    return SuperDuperFlatEncode(substitute(dict(self)))

def encode(
self,
original: t.Any = None,
Expand Down Expand Up @@ -363,7 +389,7 @@ def _deep_flat_encode(
if r.identifier in builds:
logging.warn(f'Leaf {r.identifier} already exists')

logging.info(f'Building leaf {type(r)} with identifier: {r.identifier}')
logging.debug(f'Decoding leaf {type(r)} with identifier: {r.identifier}')

if isinstance(r, leaves_to_keep):
builds[r.identifier] = r
Expand Down
10 changes: 10 additions & 0 deletions superduperdb/cli/serve.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import json
import typing as t

from superduperdb import superduper

from . import command


Expand Down Expand Up @@ -95,3 +97,11 @@ def rest():
from superduperdb.rest.app import app

app.start()


@command(help='Start a stream')
def subscribe(identifier: str, version: t.Optional[int] = None):
    """Load a stored subscriber component and run it.

    :param identifier: Identifier of the subscriber component to load.
    :param version: Version to load; passed to ``db.load`` unchanged.
    """
    datalayer = superduper()
    datalayer.load('subscriber', identifier, version=version).run()
2 changes: 1 addition & 1 deletion superduperdb/components/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def _find_blobs(r, out):
template = Template(
'_tmp',
template=template_body,
variables=list(kwargs.keys()),
template_variables=list(kwargs.keys()),
blobs=list(set(blobs)),
)

Expand Down
32 changes: 32 additions & 0 deletions superduperdb/components/stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import datetime
import typing as t
from abc import abstractmethod

from superduperdb import Component, logging


class Subscriber(Component):
    """A stream of data which can interact with the `Datalayer`.

    ***Note that this feature deploys on SuperDuperDB Enterprise.***

    :param table: Table to stream into
    :param key: Key to write data into
    """

    type_id: t.ClassVar[str] = "subscriber"
    table: str
    key: str

    @abstractmethod
    def next(self) -> t.Generator:
        """Next event."""
        pass

    def run(self):
        """Stream data to the database.

        Loops forever: each iteration calls ``self.next()``, wraps every
        returned item as ``{self.key: item}`` and inserts the batch into
        ``self.db[self.table]``.
        """
        while True:
            # ``datetime`` is imported as a module in this file, so the class
            # has to be qualified; ``datetime.now()`` raises AttributeError.
            timestamp = datetime.datetime.now().isoformat()
            logging.info(f'Got next item at {timestamp}')
            next_items = self.next()
            next_items = [{self.key: item} for item in next_items]
            self.db[self.table].insert(next_items).execute()
101 changes: 78 additions & 23 deletions superduperdb/components/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,51 +5,77 @@
from superduperdb.base.constant import KEY_BLOBS
from superduperdb.base.datalayer import Datalayer
from superduperdb.base.document import Document
from superduperdb.base.leaf import Leaf
from superduperdb.base.variables import _replace_variables
from superduperdb.components.component import Component
from superduperdb.misc.special_dicts import SuperDuperFlatEncode

from .component import Component, ensure_initialized
from .component import ensure_initialized


class Template(Component):
class _BaseTemplate(Component):
"""
Application template component.
Base template component.
:param component: Template component with variables.
:param variables: Variables to be set.
:param blobs: Blob identifiers in `Template.component`
:param template: Template component with variables.
:param template_variables: Variables to be set.
:param info: Additional information.
:param blobs: Blob identifiers in `Template.component`.
:param substitutions: Substitutions to be made to create variables.
"""

literals: t.ClassVar[t.Tuple[str]] = ('template',)
type_id: t.ClassVar[str] = "template"
template: t.Dict
variables: t.Optional[t.List[str]] = None
blobs: t.Optional[t.List[str]] = None

template: t.Union[t.Dict, Component]
template_variables: t.Optional[t.List[str]] = None
info: t.Optional[t.Dict] = dc.field(default_factory=dict)
blobs: t.Optional[t.List[str]] = None
substitutions: dc.InitVar[t.Optional[t.Dict]] = None

def __post_init__(self, db, artifacts, substitutions):
    """Normalise ``template`` and fill in ``template_variables``.

    A ``Leaf`` template is encoded first; the result is wrapped in
    ``SuperDuperFlatEncode``. When ``substitutions`` is given it is passed to
    the template's ``to_template``. If ``template_variables`` was not
    supplied it is taken from the template's ``variables``.

    :param db: Forwarded to the parent ``__post_init__``.
    :param artifacts: Forwarded to the parent ``__post_init__``.
    :param substitutions: Optional substitutions used to create variables.
    """
    template = self.template
    if isinstance(template, Leaf):
        template = template.encode(defaults=False, metadata=False)
    template = SuperDuperFlatEncode(template)
    if substitutions is not None:
        template = template.to_template(**substitutions)
    self.template = template
    if self.template_variables is None:
        self.template_variables = template.variables
    super().__post_init__(db, artifacts)

@ensure_initialized
def __call__(self, **kwargs):
    """Build a component by filling the template's variables from `kwargs`.

    The keyword names must match ``template_variables`` exactly.
    """
    supplied = set(kwargs.keys())
    expected = set(self.template_variables)
    assert supplied == expected
    filled = _replace_variables(self.template, **kwargs)
    document = Document.decode(filled, db=self.db)
    return document.unpack()

@property
def form_template(self):
    """Form to be displayed to user.

    Contains a placeholder ``identifier``, a ``_variables`` mapping from each
    template variable to a ``<value-i>`` placeholder, and every entry of the
    template other than ``identifier``.
    """
    placeholders = {}
    for index, name in enumerate(self.template_variables):
        placeholders[name] = f'<value-{index}>'
    form = {
        'identifier': '<enter-a-unique-identifier>',
        '_variables': placeholders,
    }
    # Template entries are applied last, so they win over the defaults above.
    for key, value in self.template.items():
        if key != 'identifier':
            form[key] = value
    return form


class Template(_BaseTemplate):
"""Application template component."""

type_id: t.ClassVar[str] = "template"

def pre_create(self, db: Datalayer) -> None:
    """Run before the object is created.

    Blobs embedded in the template under ``KEY_BLOBS`` are written to
    ``db.artifact_store``; their identifiers are recorded in ``self.blobs``
    and the ``KEY_BLOBS`` entry is removed from the template.

    :param db: Datalayer whose artifact store receives the blobs.
    """
    super().pre_create(db)
    assert isinstance(self.template, dict)
    if KEY_BLOBS not in self.template:
        return
    embedded = self.template[KEY_BLOBS]
    for identifier, blob in embedded.items():
        db.artifact_store.put_bytes(blob, identifier)
    self.blobs = list(embedded.keys())
    self.template.pop(KEY_BLOBS)

def __post_init__(self, db, artifacts):
self.template = SuperDuperFlatEncode(self.template)
if self.variables is None:
self.variables = self.template.variables
return super().__post_init__(db, artifacts)

@ensure_initialized
def __call__(self, **kwargs):
"""Method to create component from the given template and `kwargs`."""
assert set(kwargs.keys()) == set(self.variables)
component = _replace_variables(self.template, **kwargs)
return Document.decode(component, db=self.db).unpack()

def export(
self,
path: t.Optional[str] = None,
Expand Down Expand Up @@ -88,3 +114,32 @@ def export(
f.write(blob)
if zip:
self._zip_export(path)


class QueryTemplate(_BaseTemplate):
    """
    Query template component.

    Example:
    -------
    >>> q = db['docs'].select().limit('<var:limit>')
    >>> t = QueryTemplate('select_lim', template=q)
    >>> t.template_variables
    ['limit']

    """

    type_id: t.ClassVar[str] = 'query_template'

    def __post_init__(self, db, artifacts, substitutions):
        # A ``Leaf`` template is converted via ``dict(...).encode()`` here,
        # before the base class wraps it and resolves the variables.
        if isinstance(self.template, Leaf):
            self.template = self.template.dict(metadata=False, defaults=False).encode()
        return super().__post_init__(db, artifacts, substitutions)

    def execute(self, **kwargs):
        """Execute the query with the given variables.

        :param kwargs: Variables to be set in the query.
        """
        # Neither this class nor ``_BaseTemplate`` declares a ``query``
        # attribute, so the query is built by filling the template through
        # ``__call__`` (variable substitution, then decode and unpack).
        # NOTE(review): confirm the unpacked object is what ``db.execute`` expects.
        query = self(**kwargs)
        return self.db.execute(query)
29 changes: 29 additions & 0 deletions superduperdb/misc/special_dicts.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
from superduperdb.base.constant import KEY_BLOBS, KEY_BUILDS, KEY_FILES
from superduperdb.base.variables import _find_variables

if t.TYPE_CHECKING:
pass


class IndexableDict(OrderedDict):
"""IndexableDict.
Expand Down Expand Up @@ -146,11 +149,37 @@ class SuperDuperFlatEncode(t.Dict[str, t.Any]):
:param kwargs: **kwargs for `dict`
"""

def __init__(self, *args, **kwargs):
    """Initialise the mapping, forwarding all arguments to the parent class."""
    super().__init__(*args, **kwargs)

@property
def builds(self):
    """Return the ``KEY_BUILDS`` entry (empty if absent) as an `IndexableDict`."""
    raw_builds = self.get(KEY_BUILDS, {})
    return IndexableDict(raw_builds)

@staticmethod
def _str2var(x, item, variable):
    """Recursively replace ``item`` with ``<var:variable>`` inside ``x``.

    Strings get the replacement applied; dict keys and values and list
    elements are processed recursively; anything else is returned unchanged.

    :param x: Value to process.
    :param item: Substring to replace.
    :param variable: Variable name to insert.
    """
    if isinstance(x, str):
        placeholder = f'<var:{variable}>'
        return x.replace(item, placeholder)
    if not isinstance(x, (dict, list)):
        return x
    recurse = SuperDuperFlatEncode._str2var
    if isinstance(x, list):
        return [recurse(element, item, variable) for element in x]
    converted = {}
    for key, value in x.items():
        new_key = recurse(key, item, variable)
        converted[new_key] = recurse(value, item, variable)
    return converted

def create_template(self, **kwargs):
    """Convert all instances of string to variable.

    Each keyword is ``variable_name=string_to_replace``: every occurrence of
    the string is rewritten as ``<var:variable_name>``. The result also gets
    a ``_variables`` entry mapping each variable name to a ``<value-i>``
    placeholder.

    :param kwargs: ``variable-name -> str-to-replace`` pairs.
    """
    # Work on a shallow copy so ``self`` is never mutated; without it a call
    # with no kwargs would write ``_variables`` into ``self``.
    r = dict(self)
    for k, v in kwargs.items():
        r = SuperDuperFlatEncode._str2var(r, v, k)
    # Key by the variable *name* so the entries line up with the
    # ``<var:name>`` markers inserted above, not with the replaced strings.
    r['_variables'] = {k: f'<value-{i}>' for i, k in enumerate(kwargs)}
    return SuperDuperFlatEncode(r)

@property
def files(self):
"""Return the files of the dictionary."""
Expand Down
Loading

0 comments on commit 36883ba

Please sign in to comment.