Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Non redundant provenance #101

Merged
merged 19 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
0f7afac
rename default log filepath
alexdunnjpl Jan 19, 2024
3544ee6
add detailed runtime duration breakdown to logs
alexdunnjpl Jan 19, 2024
aa85bfb
add value-safety to ancestry non-aggregate record generation
alexdunnjpl Jan 19, 2024
729db22
enforce safe order of updates, descendants-first
alexdunnjpl Jan 22, 2024
eaac389
update repairkit versioning key to use constant, standardized value
alexdunnjpl Jan 22, 2024
3c910aa
implement skipping bundle/collection updates based on software version
alexdunnjpl Jan 22, 2024
07380b3
implement ancestry software version metadata in updates
alexdunnjpl Jan 22, 2024
bbf937e
add primary_term and seq_no to Update
alexdunnjpl Jan 25, 2024
cb5e1c6
implement ancestry version metadata writes to registry-refs collection
alexdunnjpl Jan 25, 2024
9c1fb33
implement non-redundant non-aggregate collection-page query
alexdunnjpl Jan 25, 2024
8959907
add SWEEPERS_ANCESTRY_VERSION_METADATA_KEY to index mappings
alexdunnjpl Jan 25, 2024
2cef0d8
implement orphaned docs checking
alexdunnjpl Jan 25, 2024
00b749b
remove default index value for db.write_updated_docs()
alexdunnjpl Jan 26, 2024
60d3819
remove default index value for db.query_registry_db() for consistency
alexdunnjpl Jan 26, 2024
82962a6
improve log message
alexdunnjpl Jan 26, 2024
3f94d33
reimplement provenance sweeper with non-redundant processing
alexdunnjpl Jan 26, 2024
5b35354
fix handling of None/null successor values
alexdunnjpl Jan 26, 2024
96126a1
add comment
alexdunnjpl Jan 26, 2024
71abc7b
bugfix - add version metadata attribute to query
alexdunnjpl Jan 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions docker/sweepers_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def run_factory(sweeper_f: Callable) -> Callable:
return functools.partial(
sweeper_f,
client=get_opensearch_client_from_environment(verify_certs=True if not dev_mode else False),
log_filepath='provenance.log',
log_filepath='registry-sweepers.log',
log_level=log_level
)

Expand All @@ -121,7 +121,6 @@ def run_factory(sweeper_f: Callable) -> Callable:

args = parser.parse_args()


# Define default sweepers to be run here, in order of execution
sweepers = [
repairkit.run,
Expand All @@ -136,10 +135,18 @@ def run_factory(sweeper_f: Callable) -> Callable:
sweeper_descriptions = [inspect.getmodule(f).__name__ for f in sweepers]
log.info(f'Running sweepers: {sweeper_descriptions}')

execution_begin = datetime.now()
total_execution_begin = datetime.now()

sweeper_execution_duration_strs = []

for sweeper in sweepers:
sweeper_execution_begin = datetime.now()
run_sweeper_f = run_factory(sweeper)

run_sweeper_f()

log.info(f'Sweepers successfully executed in {get_human_readable_elapsed_since(execution_begin)}')
sweeper_name = inspect.getmodule(sweeper).__name__
sweeper_execution_duration_strs.append(f'{sweeper_name}: {get_human_readable_elapsed_since(sweeper_execution_begin)}')

log.info(f'Sweepers successfully executed in {get_human_readable_elapsed_since(total_execution_begin)}\n '
+ '\n '.join(sweeper_execution_duration_strs))
40 changes: 36 additions & 4 deletions src/pds/registrysweepers/ancestry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
from pds.registrysweepers.ancestry.generation import get_bundle_ancestry_records
from pds.registrysweepers.ancestry.generation import get_collection_ancestry_records
from pds.registrysweepers.ancestry.generation import get_nonaggregate_ancestry_records
from pds.registrysweepers.ancestry.queries import get_orphaned_documents
from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION
from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION_METADATA_KEY
from pds.registrysweepers.utils import configure_logging
from pds.registrysweepers.utils import parse_args
from pds.registrysweepers.utils.db import write_updated_docs
Expand Down Expand Up @@ -43,21 +46,49 @@ def run(
collection_records = list(get_collection_ancestry_records(client, registry_mock_query_f))
nonaggregate_records = get_nonaggregate_ancestry_records(client, collection_records, registry_mock_query_f)

ancestry_records = chain(bundle_records, collection_records, nonaggregate_records)
updates = generate_updates(ancestry_records, ancestry_records_accumulator, bulk_updates_sink)
# the order of this chain is now important - writing descendants first ensures that if an ancestor is given a
# "processed by sweeper version" flag, it may be assumed that all its descendants have also been processed
# this avoids the potential for a bundle/collection to be metadata-marked as up-to-date when execution failed before
# its descendants were updated (due to execution interruption, e.g. database overload)
ancestry_records = chain(nonaggregate_records, collection_records, bundle_records)
ancestry_records_to_write = filter(lambda r: not r.skip_write, ancestry_records)
updates = generate_updates(ancestry_records_to_write, ancestry_records_accumulator, bulk_updates_sink)

if bulk_updates_sink is None:
log.info("Ensuring metadata keys are present in database index...")
for metadata_key in [METADATA_PARENT_BUNDLE_KEY, METADATA_PARENT_COLLECTION_KEY]:
for metadata_key in [
METADATA_PARENT_BUNDLE_KEY,
METADATA_PARENT_COLLECTION_KEY,
SWEEPERS_ANCESTRY_VERSION_METADATA_KEY,
]:
ensure_index_mapping(client, "registry", metadata_key, "keyword")

for metadata_key in [
SWEEPERS_ANCESTRY_VERSION_METADATA_KEY,
]:
ensure_index_mapping(client, "registry-refs", metadata_key, "keyword")

log.info("Writing bulk updates to database...")
write_updated_docs(client, updates)
write_updated_docs(
client,
updates,
index_name="registry",
)
else:
# consume generator to dump bulk updates to sink
for _ in updates:
pass

log.info("Checking indexes for orphaned documents")
for index_name in ["registry", "registry-refs"]:
orphaned_docs = get_orphaned_documents(client, registry_mock_query_f, index_name)
orphaned_doc_ids = [doc.get("_id") for doc in orphaned_docs]
orphaned_doc_count = len(orphaned_doc_ids)
if orphaned_doc_count > 0:
log.error(
f'Detected {orphaned_doc_count} orphaned documents in index "{index_name} - please inform developers": {orphaned_doc_ids}'
)

log.info("Ancestry sweeper processing complete!")


Expand All @@ -80,6 +111,7 @@ def generate_updates(
update_content = {
METADATA_PARENT_BUNDLE_KEY: [str(id) for id in record.parent_bundle_lidvids],
METADATA_PARENT_COLLECTION_KEY: [str(id) for id in record.parent_collection_lidvids],
SWEEPERS_ANCESTRY_VERSION_METADATA_KEY: int(SWEEPERS_ANCESTRY_VERSION),
}

# Tee the stream of bulk update KVs into the accumulator, if one was provided (functional testing).
Expand Down
7 changes: 6 additions & 1 deletion src/pds/registrysweepers/ancestry/ancestryrecord.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ class AncestryRecord:
parent_collection_lidvids: Set[PdsLidVid] = field(default_factory=set)
parent_bundle_lidvids: Set[PdsLidVid] = field(default_factory=set)

# flag to track records which are used during processing, but should not be written to db, for example if an
# equivalent record is known to already exist due to up-to-date ancestry version flag in the source document
skip_write: bool = False

def __repr__(self):
return f"AncestryRecord(lidvid={self.lidvid}, parent_collection_lidvids={sorted([str(x) for x in self.parent_collection_lidvids])}, parent_bundle_lidvids={sorted([str(x) for x in self.parent_bundle_lidvids])})"

Expand All @@ -32,14 +36,15 @@ def to_dict(self, sort_lists: bool = True) -> SerializableAncestryRecordTypeDef:
}

@staticmethod
def from_dict(d: SerializableAncestryRecordTypeDef) -> AncestryRecord:
def from_dict(d: SerializableAncestryRecordTypeDef, skip_write: bool = False) -> AncestryRecord:
try:
return AncestryRecord(
lidvid=PdsLidVid.from_string(d["lidvid"]), # type: ignore
parent_collection_lidvids=set(
PdsLidVid.from_string(lidvid) for lidvid in d["parent_collection_lidvids"]
),
parent_bundle_lidvids=set(PdsLidVid.from_string(lidvid) for lidvid in d["parent_bundle_lidvids"]),
skip_write=skip_write,
)
except (KeyError, ValueError) as err:
raise ValueError(
Expand Down
87 changes: 75 additions & 12 deletions src/pds/registrysweepers/ancestry/generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import shutil
import sys
import tempfile
from collections import namedtuple
from typing import Dict
from typing import Iterable
from typing import List
from typing import Mapping
from typing import Set
from typing import Union

import psutil # type: ignore
from opensearchpy import OpenSearch
Expand All @@ -23,20 +25,32 @@
from pds.registrysweepers.ancestry.utils import load_partial_history_to_records
from pds.registrysweepers.ancestry.utils import make_history_serializable
from pds.registrysweepers.ancestry.utils import merge_matching_history_chunks
from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION
from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION_METADATA_KEY
from pds.registrysweepers.utils.db import Update
from pds.registrysweepers.utils.db import write_updated_docs
from pds.registrysweepers.utils.misc import coerce_list_type
from pds.registrysweepers.utils.productidentifiers.factory import PdsProductIdentifierFactory
from pds.registrysweepers.utils.productidentifiers.pdslid import PdsLid
from pds.registrysweepers.utils.productidentifiers.pdslidvid import PdsLidVid

log = logging.getLogger(__name__)

# It's necessary to track which registry-refs documents have been processed during this run. This cannot be derived
# by repeating the query, as the sweeper may be running concurrently with harvest, and document content may change.
# RefDocBookkeepingEntry is used to ensure that only those documents which have been processed and have not been
# externally modified during sweeper execution will be marked as processed with the current sweeper version.
RefDocBookkeepingEntry = namedtuple("RefDocBookkeepingEntry", "id primary_term seq_no")


def get_bundle_ancestry_records(client: OpenSearch, db_mock: DbMockTypeDef = None) -> Iterable[AncestryRecord]:
log.info("Generating AncestryRecords for bundles...")
docs = get_bundle_ancestry_records_query(client, db_mock)
for doc in docs:
try:
yield AncestryRecord(lidvid=PdsLidVid.from_string(doc["_source"]["lidvid"]))
sweeper_version_in_doc = doc["_source"].get(SWEEPERS_ANCESTRY_VERSION_METADATA_KEY, 0)
skip_write = sweeper_version_in_doc >= SWEEPERS_ANCESTRY_VERSION
yield AncestryRecord(lidvid=PdsLidVid.from_string(doc["_source"]["lidvid"]), skip_write=skip_write)
except (ValueError, KeyError) as err:
log.warning(
'Failed to instantiate AncestryRecord from document in index "%s" with id "%s" due to %s: %s',
Expand All @@ -54,8 +68,10 @@ def get_ancestry_by_collection_lidvid(collections_docs: Iterable[Dict]) -> Mappi
ancestry_by_collection_lidvid = {}
for doc in collections_docs:
try:
sweeper_version_in_doc = doc["_source"].get(SWEEPERS_ANCESTRY_VERSION_METADATA_KEY, 0)
skip_write = sweeper_version_in_doc >= SWEEPERS_ANCESTRY_VERSION
lidvid = PdsLidVid.from_string(doc["_source"]["lidvid"])
ancestry_by_collection_lidvid[lidvid] = AncestryRecord(lidvid=lidvid)
ancestry_by_collection_lidvid[lidvid] = AncestryRecord(lidvid=lidvid, skip_write=skip_write)
except (ValueError, KeyError) as err:
log.warning(
'Failed to instantiate AncestryRecord from document in index "%s" with id "%s" due to %s: %s',
Expand Down Expand Up @@ -107,8 +123,12 @@ def get_collection_ancestry_records(
collection_aliases_by_lid: Dict[PdsLid, Set[PdsLid]] = get_collection_aliases_by_lid(collections_docs)

# Prepare empty ancestry records for collections, with fast access by LID or LIDVID
ancestry_by_collection_lidvid = get_ancestry_by_collection_lidvid(collections_docs)
ancestry_by_collection_lid = get_ancestry_by_collection_lid(ancestry_by_collection_lidvid)
ancestry_by_collection_lidvid: Mapping[PdsLidVid, AncestryRecord] = get_ancestry_by_collection_lidvid(
collections_docs
)
ancestry_by_collection_lid: Mapping[PdsLid, Set[AncestryRecord]] = get_ancestry_by_collection_lid(
ancestry_by_collection_lidvid
)

# For each bundle, add it to the bundle-ancestry of every collection it references
for doc in bundles_docs:
Expand Down Expand Up @@ -238,6 +258,7 @@ def _get_nonaggregate_ancestry_records_with_chunking(
log.debug(f"dumping partial non-aggregate ancestry result-sets to {on_disk_cache_dir}")

collection_refs_query_docs = get_nonaggregate_ancestry_records_query(client, registry_db_mock)
touched_ref_documents: List[RefDocBookkeepingEntry] = []

baseline_memory_usage = psutil.virtual_memory().percent
user_configured_max_memory_usage = AncestryRuntimeConstants.max_acceptable_memory_usage
Expand All @@ -252,10 +273,12 @@ def _get_nonaggregate_ancestry_records_with_chunking(
0 # populated based on the largest encountered chunk. see split_chunk_if_oversized() for explanation
)

most_recent_attempted_collection_lidvid: Union[PdsLidVid, None] = None
nonaggregate_ancestry_records_by_lidvid = {}
for doc in collection_refs_query_docs:
try:
collection_lidvid = PdsLidVid.from_string(doc["_source"]["collection_lidvid"])
most_recent_attempted_collection_lidvid = collection_lidvid
for nonaggregate_lidvid_str in doc["_source"]["product_lidvid"]:
bundle_ancestry = bundle_ancestry_by_collection_lidvid[collection_lidvid]

Expand All @@ -281,21 +304,33 @@ def _get_nonaggregate_ancestry_records_with_chunking(
) # slightly problematic due to reuse of pointers vs actual values, but let's try it
nonaggregate_ancestry_records_by_lidvid = {}

except (ValueError, KeyError) as err:
log.warning(
'Failed to parse collection and/or product LIDVIDs from document in index "%s" with id "%s" due to %s: %s',
doc.get("_index"),
doc.get("_id"),
type(err).__name__,
err,
# mark collection for metadata update
touched_ref_documents.append(
RefDocBookkeepingEntry(id=doc["_id"], primary_term=doc["_primary_term"], seq_no=doc["_seq_no"])
)

except (ValueError, KeyError) as err:
if (
isinstance(err, KeyError)
and most_recent_attempted_collection_lidvid not in bundle_ancestry_by_collection_lidvid
):
probable_cause = f'[Probable Cause]: Collection primary document with id "{doc["_source"].get("collection_lidvid")}" not found in index "registry" for registry-refs doc with id "{doc.get("_id")}"'
elif isinstance(err, ValueError):
probable_cause = f'[Probable Cause]: Failed to parse collection and/or product LIDVIDs from document with id "{doc.get("_id")}" in index "{doc.get("_index")}" due to {type(err).__name__}: {err}'
else:
probable_cause = f"Unknown error due to {type(err).__name__}: {err}"

log.warning(probable_cause)
continue

# don't forget to yield non-disk-dumped records
make_history_serializable(nonaggregate_ancestry_records_by_lidvid)
chunk_size_max = max(chunk_size_max, sys.getsizeof(nonaggregate_ancestry_records_by_lidvid))
for history_dict in nonaggregate_ancestry_records_by_lidvid.values():
yield AncestryRecord.from_dict(history_dict)
try:
yield AncestryRecord.from_dict(history_dict)
except ValueError as err:
log.error(err)
del nonaggregate_ancestry_records_by_lidvid
gc.collect()

Expand All @@ -317,3 +352,31 @@ def _get_nonaggregate_ancestry_records_with_chunking(

if not using_cache_override:
shutil.rmtree(on_disk_cache_dir)

# See race condition comment in function def
update_refs_document_metadata(client, touched_ref_documents)


def update_refs_document_metadata(client: OpenSearch, docs: List[RefDocBookkeepingEntry]):
    """
    Write ancestry version metadata for all collection-page documents for which AncestryRecords were successfully
    produced.

    Subject to a race condition where this will be called when the final AncestryRecord is yielded, and therefore may
    write metadata before the final page of AncestryRecords is written to the db (which may fail). This is an
    acceptably-small risk for now given that we can detect orphaned documents, but may need to be refactored later.
    """

    def generate_update(doc: RefDocBookkeepingEntry) -> Update:
        # primary_term/seq_no are carried through so the write layer can detect documents which were externally
        # modified since they were read (see RefDocBookkeepingEntry comment above its definition)
        return Update(
            id=doc.id,
            primary_term=doc.primary_term,
            seq_no=doc.seq_no,
            content={SWEEPERS_ANCESTRY_VERSION_METADATA_KEY: SWEEPERS_ANCESTRY_VERSION},
        )

    updates = map(generate_update, docs)
    # use the module-level logger rather than the root logger, so these messages respect the sweeper's log config
    log.info(
        f"Updating {len(docs)} registry-refs docs with {SWEEPERS_ANCESTRY_VERSION_METADATA_KEY}={SWEEPERS_ANCESTRY_VERSION}"
    )
    write_updated_docs(client, updates, index_name="registry-refs", bulk_chunk_max_update_count=20000)
    log.info("registry-refs metadata update complete")
45 changes: 38 additions & 7 deletions src/pds/registrysweepers/ancestry/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

from opensearchpy import OpenSearch
from pds.registrysweepers.ancestry.runtimeconstants import AncestryRuntimeConstants
from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION
from pds.registrysweepers.ancestry.versioning import SWEEPERS_ANCESTRY_VERSION_METADATA_KEY
from pds.registrysweepers.utils.db import query_registry_db_or_mock

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -35,9 +37,9 @@ def product_class_query_factory(cls: ProductClass) -> Dict:

def get_bundle_ancestry_records_query(client: OpenSearch, db_mock: DbMockTypeDef = None) -> Iterable[Dict]:
    """Yield registry docs for all bundle products, including any existing ancestry-version metadata.

    The sweeper-version attribute is fetched alongside the lidvid so callers can skip already-processed bundles.
    """
    query = product_class_query_factory(ProductClass.BUNDLE)
    _source = {"includes": ["lidvid", SWEEPERS_ANCESTRY_VERSION_METADATA_KEY]}
    query_f = query_registry_db_or_mock(db_mock, "get_bundle_ancestry_records", use_search_after=True)
    docs = query_f(client, "registry", query, _source)

    return docs

Expand All @@ -46,7 +48,7 @@ def get_collection_ancestry_records_bundles_query(client: OpenSearch, db_mock: D
query = product_class_query_factory(ProductClass.BUNDLE)
_source = {"includes": ["lidvid", "ref_lid_collection"]}
query_f = query_registry_db_or_mock(db_mock, "get_collection_ancestry_records_bundles", use_search_after=True)
docs = query_f(client, query, _source)
docs = query_f(client, "registry", query, _source)

return docs

Expand All @@ -56,28 +58,57 @@ def get_collection_ancestry_records_collections_query(
) -> Iterable[Dict]:
# Query the registry for all collection identifiers
query = product_class_query_factory(ProductClass.COLLECTION)
_source = {"includes": ["lidvid", "alternate_ids"]}
_source = {"includes": ["lidvid", "alternate_ids", SWEEPERS_ANCESTRY_VERSION_METADATA_KEY]}
query_f = query_registry_db_or_mock(db_mock, "get_collection_ancestry_records_collections", use_search_after=True)
docs = query_f(client, query, _source)
docs = query_f(client, "registry", query, _source)

return docs


def get_nonaggregate_ancestry_records_query(client: OpenSearch, registry_db_mock: DbMockTypeDef) -> Iterable[Dict]:
    """Yield registry-refs docs (collection pages) not yet processed by the current sweeper version.

    Pages already stamped with an up-to-date ancestry version are excluded to avoid redundant reprocessing.
    seq_no/primary_term are requested so later conditional writes can detect external modification.
    """
    # Query the registry-refs index for the contents of all collections which lack an up-to-date version stamp
    query: Dict = {
        "query": {
            "bool": {
                "must_not": [{"range": {SWEEPERS_ANCESTRY_VERSION_METADATA_KEY: {"gte": SWEEPERS_ANCESTRY_VERSION}}}]
            }
        },
        "seq_no_primary_term": True,
    }
    _source = {"includes": ["collection_lidvid", "batch_id", "product_lidvid"]}
    query_f = query_registry_db_or_mock(registry_db_mock, "get_nonaggregate_ancestry_records", use_search_after=True)

    # each document will have many product lidvids, so a smaller page size is warranted here
    docs = query_f(
        client,
        "registry-refs",
        query,
        _source,
        page_size=AncestryRuntimeConstants.nonaggregate_ancestry_records_query_page_size,
        request_timeout_seconds=30,
        sort_fields=["collection_lidvid", "batch_id"],
    )

    return docs


def get_orphaned_documents(client: OpenSearch, registry_db_mock: DbMockTypeDef, index_name: str) -> Iterable[Dict]:
    """Yield docs in the given index lacking an up-to-date ancestry version reference.

    Such documents indicate products which are orphaned and getting missed in processing.
    """
    stale_version_query: Dict = {
        "query": {
            "bool": {
                "must_not": [{"range": {SWEEPERS_ANCESTRY_VERSION_METADATA_KEY: {"gte": SWEEPERS_ANCESTRY_VERSION}}}]
            }
        }
    }
    source_filter: Dict = {"includes": []}

    # registry-refs gets explicit sort fields; any other index (i.e. registry) uses the query helper's default
    sort_fields_override = ["collection_lidvid", "batch_id"] if index_name == "registry-refs" else None

    query_f = query_registry_db_or_mock(registry_db_mock, "get_orphaned_ancestry_docs", use_search_after=True)
    return query_f(client, index_name, stale_version_query, source_filter, sort_fields=sort_fields_override)
Loading
Loading