Skip to content

Commit

Permalink
Merge pull request #202 from brightway-lca/snowflake-ids
Browse files Browse the repository at this point in the history
Use Snowflake IDs for nodes and edges
  • Loading branch information
jsvgoncalves authored Nov 8, 2024
2 parents 610f8af + 209475a commit 9ddc291
Show file tree
Hide file tree
Showing 17 changed files with 205 additions and 129 deletions.
9 changes: 7 additions & 2 deletions bw2data/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ def _efficient_write_dataset(
check_activity_type(ds.get("type"))
check_activity_keys(ds)

activities.append(dict_as_activitydataset(ds))
activities.append(dict_as_activitydataset(ds, add_snowflake_id=True))

if len(activities) > 125:
ActivityDataset.insert_many(activities).execute()
Expand Down Expand Up @@ -687,6 +687,9 @@ def new_node(self, code: str = None, **kwargs):
kwargs.pop("database")
obj["database"] = self.name

if "id" in kwargs:
raise ValueError(f"`id` must be created automatically, but `id={kwargs['id']}` given.")

if code is None:
obj["code"] = uuid.uuid4().hex
else:
Expand Down Expand Up @@ -853,7 +856,9 @@ def _add_inventory_geomapping_to_datapackage(self, dp: Datapackage) -> None:
dict_iterator=(
{
"row": row[0],
"col": geomapping[location_mapper(retupleize_geo_strings(row[1]) or config.global_location)],
"col": geomapping[
location_mapper(retupleize_geo_strings(row[1]) or config.global_location)
],
"amount": 1,
}
for row in inv_mapping_qs.tuples()
Expand Down
18 changes: 14 additions & 4 deletions bw2data/backends/proxies.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ def save(self, signal: bool = True, data_already_set: bool = False, force_insert
check_activity_keys(self)

for key, value in dict_as_activitydataset(self._data).items():
# ID value is either already in `._document` (update) or will be created by
# `SnowflakeIDBaseClass.save()`.
if key != "id":
setattr(self._document, key, value)

Expand Down Expand Up @@ -495,8 +497,10 @@ def new_edge(self, **kwargs):
"""Create a new exchange linked to this activity"""
exc = Exchange()
exc.output = self.key
for key in kwargs:
exc[key] = kwargs[key]
for key, value in kwargs.items():
if key == "id":
raise ValueError(f"`id` must be created automatically, but `id={value}` given.")
exc[key] = value
return exc

def copy(self, code: Optional[str] = None, signal: bool = True, **kwargs):
Expand All @@ -511,13 +515,19 @@ def copy(self, code: Optional[str] = None, signal: bool = True, **kwargs):
for key, value in self.items():
if key != "id":
activity[key] = value
for k, v in kwargs.items():
activity._data[k] = v
for key, value in kwargs.items():
if key == "id":
raise ValueError(f"`id` must be created automatically, but `id={value}` given.")
activity._data[key] = value
activity._data["code"] = str(code or uuid.uuid4().hex)
activity.save(signal=signal)

for exc in self.exchanges():
data = copy.deepcopy(exc._data)
if "id" in data:
# New snowflake ID will be inserted by `.save()`; shouldn't be copied over
# or specified manually
del data["id"]
data["output"] = activity.key
# Change `input` for production exchanges
if exc["input"] == exc["output"]:
Expand Down
6 changes: 3 additions & 3 deletions bw2data/backends/schema.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from peewee import DoesNotExist, TextField

from bw2data.errors import UnknownObject
from bw2data.signals import SignaledDataset
from bw2data.snowflake_ids import SnowflakeIDBaseClass
from bw2data.sqlite import PickleField


class ActivityDataset(SignaledDataset):
class ActivityDataset(SnowflakeIDBaseClass):
data = PickleField() # Canonical, except for other C fields
code = TextField() # Canonical
database = TextField() # Canonical
Expand All @@ -19,7 +19,7 @@ def key(self):
return (self.database, self.code)


class ExchangeDataset(SignaledDataset):
class ExchangeDataset(SnowflakeIDBaseClass):
data = PickleField() # Canonical, except for other C fields
input_code = TextField() # Canonical
input_database = TextField() # Canonical
Expand Down
17 changes: 12 additions & 5 deletions bw2data/backends/utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import copy
import warnings
from typing import Optional
from typing import Any, Optional

import numpy as np

from bw2data import config
from bw2data.backends.schema import SignaledDataset, get_id
from bw2data.backends.schema import get_id
from bw2data.configuration import labels
from bw2data.errors import InvalidExchange, UntypedExchange
from bw2data.meta import databases, methods
from bw2data.signals import SignaledDataset
from bw2data.snowflake_ids import snowflake_id_generator


def get_csv_data_dict(ds):
Expand Down Expand Up @@ -66,8 +68,8 @@ def check_exchange(exc):
raise ValueError("Invalid amount in exchange {}".format(exc))


def dict_as_activitydataset(ds):
return {
def dict_as_activitydataset(ds: Any, add_snowflake_id: bool = False) -> dict:
val = {
"data": ds,
"database": ds["database"],
"code": ds["code"],
Expand All @@ -76,9 +78,14 @@ def dict_as_activitydataset(ds):
"product": ds.get("reference product"),
"type": ds.get("type", labels.process_node_default),
}
# Use during `insert_many` calls as these skip auto id generation because they don't call
# `.save()`
if add_snowflake_id:
val["id"] = next(snowflake_id_generator)
return val


def dict_as_exchangedataset(ds):
def dict_as_exchangedataset(ds: Any) -> dict:
return {
"data": ds,
"input_database": ds["input"][0],
Expand Down
4 changes: 1 addition & 3 deletions bw2data/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from copy import copy
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional, Sequence, TypeVar
from typing import TYPE_CHECKING, Any, Optional, Sequence

import wrapt
from bw_processing import safe_filename
Expand All @@ -25,8 +25,6 @@

if TYPE_CHECKING:
from bw2data import revisions
from bw2data.backends import schema
SD = TypeVar("SD", bound=schema.SignaledDataset)


READ_ONLY_PROJECT = """
Expand Down
10 changes: 4 additions & 6 deletions bw2data/revisions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
from typing import Any, Optional, Sequence, TypeVar

import deepdiff
from snowflake import SnowflakeGenerator as sfg

from bw2data.backends.proxies import Activity, Exchange
from bw2data.backends.schema import ActivityDataset, ExchangeDataset, SignaledDataset
from bw2data.backends.schema import ActivityDataset, ExchangeDataset
from bw2data.backends.utils import dict_as_activitydataset, dict_as_exchangedataset
from bw2data.errors import DifferentObjects, IncompatibleClasses, InconsistentData
from bw2data.signals import SignaledDataset
from bw2data.snowflake_ids import snowflake_id_generator
from bw2data.utils import get_node

try:
Expand All @@ -16,9 +17,6 @@
from typing_extensions import Self


SD = TypeVar("SD", bound=SignaledDataset)


class RevisionGraph:
"""Graph of revisions, edges are based on `metadata.parent_revision`."""

Expand Down Expand Up @@ -186,7 +184,7 @@ def generate_metadata(
) -> dict[str, Any]:
metadata = metadata or {}
metadata["parent_revision"] = parent_revision
metadata["revision"] = revision or next(sfg(0))
metadata["revision"] = revision or next(snowflake_id_generator)
metadata["authors"] = metadata.get("authors", "Anonymous")
metadata["title"] = metadata.get("title", "Untitled revision")
metadata["description"] = metadata.get("description", "No description")
Expand Down
39 changes: 39 additions & 0 deletions bw2data/snowflake_ids.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import uuid

from peewee import IntegerField
from snowflake import SnowflakeGenerator

from bw2data.signals import SignaledDataset

# Custom epoch for snowflake IDs: Jan 1, 2024 (UTC), expressed in milliseconds since the Unix
# epoch. Derivation:
# from datetime import datetime
# (datetime(2024, 1, 1) - datetime.utcfromtimestamp(0)).total_seconds() * 1000.0
EPOCH_START_MS = 1704067200000

# From https://softwaremind.com/blog/the-unique-features-of-snowflake-id-and-its-comparison-to-uuid/
# Snowflake bits:
# Sign bit: 1 bit. It will always be 0. This is reserved for future uses. It can potentially be used
# to distinguish between signed and unsigned numbers.
# Timestamp: 41 bits. Milliseconds since the epoch or custom epoch.
# Datacenter ID: 5 bits, which gives us 2 ^ 5 = 32 datacenters.
# Machine ID: 5 bits, which gives us 2 ^ 5 = 32 machines per datacenter.
# However, `snowflake-id` lumps the two datacenter and machine id values together into an
# `instance` parameter with 2 ^ 10 = 1024 possible values.
# Sequence number: 12 bits. For every ID generated on that machine/process, the sequence number is
# incremented by 1. The number is reset to 0 every millisecond.
#
# The `instance` value is taken from `uuid.getnode()` and reduced modulo 1024 so that it fits
# the 10-bit instance field described above.
# NOTE(review): separate processes on one machine presumably get the same `instance` value;
# confirm that concurrent writers cannot produce colliding IDs.
snowflake_id_generator = SnowflakeGenerator(instance=uuid.getnode() % 1024, epoch=EPOCH_START_MS)


class SnowflakeIDBaseClass(SignaledDataset):
    """Model base class whose integer primary key is filled in from `snowflake_id_generator`."""

    id = IntegerField(primary_key=True)

    def save(self, **kwargs):
        """Save the row, generating a snowflake ID first when `id` is still unset.

        All keyword arguments are forwarded to the parent class `save`. When a new ID has to
        be generated, `force_insert=True` is set on the forwarded arguments.
        """
        if self.id is not None:
            # An ID is already set: hand over to the parent implementation unchanged.
            super().save(**kwargs)
            return

        # We generate the primary key ourselves. If the primary key column data is already
        # present (even if the object doesn't exist in the database), peewee will make an
        # `UPDATE` query, which has no effect when there isn't a matching row. We therefore
        # need to force an `INSERT` query instead.
        # https://docs.peewee-orm.com/en/latest/peewee/models.html#id4
        self.id = next(snowflake_id_generator)
        super().save(**{**kwargs, "force_insert": True})
7 changes: 7 additions & 0 deletions tests/activity_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ def test_copy(activity):
assert cp["name"] == "baz"
assert cp["location"] == "bar"
assert ExchangeDataset.select().count() == 2

cp.save()

assert ActivityDataset.select().count() == 2
assert (
ActivityDataset.select()
Expand Down Expand Up @@ -241,6 +244,8 @@ def test_delete_activity_parameters():
b.save()
a.new_exchange(amount=0, input=b, type="technosphere", formula="foo * bar + 4").save()

assert ExchangeDataset.select().count() == 1

activity_data = [
{
"name": "reference_me",
Expand All @@ -258,6 +263,8 @@ def test_delete_activity_parameters():
parameters.new_activity_parameters(activity_data, "my group")
parameters.add_exchanges_to_group("my group", a)

assert ExchangeDataset.select().count() == 1

assert ActivityParameter.select().count() == 2
assert ParameterizedExchange.select().count() == 1

Expand Down
38 changes: 22 additions & 16 deletions tests/compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,27 +65,32 @@ def setup():
def test_prepare_lca_inputs_basic(setup):
d, objs, r = prepare_lca_inputs(demand={("food", "1"): 1}, method=("foo",))
# Node IDs are generated snowflake values rather than sequential integers, so only the demand amount is checked
assert d == {3: 1}
assert list(d.values()) == [1]
assert {o.metadata["id"] for o in objs} == {o.datapackage().metadata["id"] for o in setup}

b1 = get_node(database="biosphere", code="1").id
b2 = get_node(database="biosphere", code="2").id
f1 = get_node(database="food", code="1").id
f2 = get_node(database="food", code="2").id

remapping_expected = {
"activity": {
1: ("biosphere", "1"),
2: ("biosphere", "2"),
3: ("food", "1"),
4: ("food", "2"),
b1: ("biosphere", "1"),
b2: ("biosphere", "2"),
f1: ("food", "1"),
f2: ("food", "2"),
},
"product": {
1: ("biosphere", "1"),
2: ("biosphere", "2"),
3: ("food", "1"),
4: ("food", "2"),
b1: ("biosphere", "1"),
b2: ("biosphere", "2"),
f1: ("food", "1"),
f2: ("food", "2"),
},
"biosphere": {
1: ("biosphere", "1"),
2: ("biosphere", "2"),
3: ("food", "1"),
4: ("food", "2"),
b1: ("biosphere", "1"),
b2: ("biosphere", "2"),
f1: ("food", "1"),
f2: ("food", "2"),
},
}
assert r == remapping_expected
Expand All @@ -104,16 +109,17 @@ def test_prepare_lca_inputs_multiple_demands_data_types(setup):
first = get_node(database="food", code="1")
second = get_node(database="food", code="2")
d, objs, r = prepare_lca_inputs(demands=[{first: 1}, {second.id: 10}], method=("foo",))
assert d == [{3: 1}, {4: 10}]
assert d == [{first.id: 1}, {second.id: 10}]
assert {o.metadata["id"] for o in objs} == {o.datapackage().metadata["id"] for o in setup}


def test_prepare_lca_inputs_multiple_demands(setup):
d, objs, r = prepare_lca_inputs(
demands=[{("food", "1"): 1}, {("food", "2"): 10}], method=("foo",)
)
# ID is 3; two biosphere flows, then '1' is next written
assert d == [{3: 1}, {4: 10}]
f1 = get_node(database="food", code="1").id
f2 = get_node(database="food", code="2").id
assert d == [{f1: 1}, {f2: 10}]
assert {o.metadata["id"] for o in objs} == {o.datapackage().metadata["id"] for o in setup}


Expand Down
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Fixtures for bw2data"""

import sqlite3

# import pytest
Expand Down
Loading

0 comments on commit 9ddc291

Please sign in to comment.