Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More cleaning #528

Merged
merged 2 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 22 additions & 32 deletions superduperdb/datalayer/base/datalayer.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,35 @@
from collections import defaultdict
from dask.distributed import Future
import click
import dataclasses as dc
import math
import networkx
import superduperdb as s
import typing as t
import warnings
from collections import defaultdict

import click
import networkx

from superduperdb import CFG
from superduperdb.core.task_workflow import TaskWorkflow
from superduperdb.core.component import Component
from superduperdb.core.document import Document
from superduperdb.core.exceptions import ComponentInUseError, ComponentInUseWarning
from superduperdb.datalayer.base.artifacts import ArtifactStore
from superduperdb.datalayer.base.data_backend import BaseDataBackend
from superduperdb.datalayer.base.metadata import MetaDataStore
from superduperdb.datalayer.base.query import (
Insert,
Select,
Delete,
Update,
SelectOne,
Like,
)
from superduperdb.core.job import FunctionJob, ComponentJob, Job
from superduperdb.core.serializable import Serializable
from superduperdb.core.task_workflow import TaskWorkflow
from superduperdb.misc.downloads import Downloader, gather_uris
from superduperdb.misc.special_dicts import MongoStyleDict
from .download_content import download_content
from superduperdb.misc.downloads import Downloader
from superduperdb.misc.downloads import gather_uris
from superduperdb.misc.logger import logging
from superduperdb.vector_search.base import VectorDatabase

from .artifacts import ArtifactStore
from .data_backend import BaseDataBackend
from .download_content import download_content
from .exceptions import ComponentInUseError, ComponentInUseWarning
from .metadata import MetaDataStore
from .query import Delete, Insert, Like, Select, SelectOne, Update

from superduperdb.core.artifact_tree import (
get_artifacts,
infer_artifacts,
load_artifacts_from_store,
replace_artifacts,
)
from ...core.job import FunctionJob, ComponentJob, Job
from ...core.serializable import Serializable


DBResult = t.Any
TaskGraph = t.Any
Expand Down Expand Up @@ -92,7 +82,7 @@ def __init__(
self.encoders = LoadDict(self, 'encoder')
self.vector_indices = LoadDict(self, 'vector_index')

self.distributed = CFG.distributed
self.distributed = s.CFG.distributed
self.metadata = metadata
self.artifact_store = artifact_store
self.databackend = databackend
Expand Down Expand Up @@ -254,7 +244,7 @@ def run(
:param depends_on: t.Sequence of dependencies
"""
if distributed is None:
distributed = CFG.distributed
distributed = s.CFG.distributed
return job(db=self, dependencies=depends_on, distributed=distributed)

def select(self, select: Select) -> SelectResult:
Expand Down Expand Up @@ -518,13 +508,13 @@ def _compute_model_outputs(
model=None,
predict_kwargs=None,
):
logging.info('finding documents under filter')
s.log.info('finding documents under filter')
features = features or {}
model_identifier = model_info['identifier']
if features is None:
features = {} # pragma: no cover
documents = list(self.execute(select.select_using_ids(_ids)))
logging.info('done.')
s.log.info('done.')
documents = [x.unpack() for x in documents]
if key != '_base' or '_base' in features:
passed_docs = [r[key] for r in documents]
Expand All @@ -548,7 +538,7 @@ def _add(

existing_versions = self.show(object.variety, object.identifier)
if isinstance(object.version, int) and object.version in existing_versions:
logging.warn(f'{object.unique_id} already exists - doing nothing')
s.log.warn(f'{object.unique_id} already exists - doing nothing')
return

if existing_versions:
Expand Down Expand Up @@ -702,7 +692,7 @@ def _download_content(

documents = [x.content for x in documents]
uris, keys, place_ids = gather_uris(documents)
logging.info(f'found {len(uris)} uris')
s.log.info(f'found {len(uris)} uris')
if not uris:
return

Expand Down Expand Up @@ -805,7 +795,7 @@ def _apply_watcher( # noqa: F811

if max_chunk_size is not None:
for it, i in enumerate(range(0, len(ids), max_chunk_size)):
logging.info(
s.log.info(
'computing chunk '
f'({it + 1}/{math.ceil(len(ids) / max_chunk_size)})'
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
"""
Operations on dictionaries used to fill and combine config files
and environment variables
"""

from pathlib import Path
import typing as t
import fil
Expand Down
8 changes: 4 additions & 4 deletions superduperdb/misc/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from dataclasses import dataclass
from functools import cached_property
from pathlib import Path
from superduperdb.misc import dicts
from superduperdb.misc import config_dicts
import typing as t
import os

Expand Down Expand Up @@ -46,10 +46,10 @@ def config(self) -> t.Any:
if isinstance(files, str):
files = files.split(FILE_SEP)

data = dicts.read_all(files)
data = config_dicts.read_all(files)
parent = self.cls().dict()
environ_dict = dicts.environ_to_config_dict(self.prefix, parent, environ)
return self.cls(**dicts.combine((*data, environ_dict)))
environ_dict = config_dicts.environ_to_config_dict(self.prefix, parent, environ)
return self.cls(**config_dicts.combine((*data, environ_dict)))


def build_config():
Expand Down
5 changes: 4 additions & 1 deletion tests/unittests/datalayer/mongodb/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
from superduperdb.core.dataset import Dataset
from superduperdb.core.document import Document
from superduperdb.core.encoder import Encoder
from superduperdb.core.exceptions import ComponentInUseError, ComponentInUseWarning
from superduperdb.datalayer.base.exceptions import (
ComponentInUseError,
ComponentInUseWarning,
)
from superduperdb.core.watcher import Watcher
from superduperdb.datalayer.mongodb.query import Collection
from superduperdb.encoders.torch.tensor import tensor
Expand Down
2 changes: 1 addition & 1 deletion tests/unittests/misc/test_config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .test_dicts import PARENT
from .test_config_dicts import PARENT
from collections import Counter
from pydantic import ValidationError
from superduperdb.misc.config import Config, Factory, JSONable, Notebook
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from superduperdb.misc import dicts
from superduperdb.misc import config_dicts
import io
import pytest


def test_combine_dicts():
actual = dicts.combine(
def test_combine_config_dicts():
actual = config_dicts.combine(
(
{'one': {'two': ['three', 'four', 'five']}},
{'one': {'three': 3}},
Expand All @@ -27,7 +27,7 @@ def test_combine_dicts():


def test_environ_dict_():
actual = dicts.environ_dict(
actual = config_dicts.environ_dict(
'TEST_',
{
'TOAST_ONE': 'one',
Expand Down Expand Up @@ -63,7 +63,7 @@ def test_environ_dict_():
),
)
def test_split_address(key, expected):
actual = [list(i) for i in dicts.split_address(key, PARENT)]
actual = [list(i) for i in config_dicts.split_address(key, PARENT)]
assert actual == expected


Expand All @@ -76,7 +76,7 @@ def test_environ_to_config_dict_many():
'TEST_PURPLE': 'purple',
}
err = io.StringIO()
actual = dicts.environ_to_config_dict('TEST_', PARENT, environ, err)
actual = config_dicts.environ_to_config_dict('TEST_', PARENT, environ, err)
expected = {'blue': {'green': {'orange': 'bge'}}, 'red': 'red'}

assert actual == expected
Expand All @@ -94,7 +94,7 @@ def test_environ_to_config_dict_single():
'TEST_BLUE_GREEN_ORANGE': 'bge',
}
err = io.StringIO()
actual = dicts.environ_to_config_dict('TEST_', PARENT, environ, err)
actual = config_dicts.environ_to_config_dict('TEST_', PARENT, environ, err)

expected = {'blue': {'green': {'orange': 'bge'}}}
assert actual == expected
Expand Down