Skip to content

Commit

Permalink
40 add logging aggregation, LOGLEVEL env var, other logging tweaks (#45)
Browse files Browse the repository at this point in the history
* implement aggregation of product update errors by error type and error reason
this prevents the logs from getting blasted when batches full of similar errors present.  does not aggregate when the product id appears in the reason

* implement environment variable LOGLEVEL in sweepers_driver
takes an int or string representation of a python standard log level like INFO

* add lowercase support to utils.parse_log_level()

* update README.md for LOGLEVEL env var

* fix bug in query progress logging

* add explanation for Opensearch returning HTTP200 when failures exist

* remove redundant/misleading log message
  • Loading branch information
alexdunnjpl authored Jul 26, 2023
1 parent 7def3d8 commit 52c1529
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 22 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ The ancestry sweeper generates membership metadata for each product, i.e. which
```
PROV_CREDENTIALS={"admin": "admin"} // OpenSearch username/password
PROV_ENDPOINT=https://localhost:9200 // OpenSearch host url and port
LOGLEVEL - an integer log level or anycase string matching a python log level like `INFO` (optional - defaults to `INFO`))
DEV_MODE=1 // disables host verification
```

Expand Down
2 changes: 2 additions & 0 deletions docker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ Requires a running deployment of registry
#### Env Variables
`PROV_ENDPOINT` - the URL of the registry OpenSearch http endpoint
`PROV_CREDENTIALS` - a JSON string of format `{"$username": "$password"}`
`LOGLEVEL` - (optional - defaults to `INFO`) an integer log level or anycase string matching a python log level like `INFO`
`DEV_MODE=1` - (optional) in dev mode, host cert verification is disabled


### Development
Expand Down
5 changes: 3 additions & 2 deletions docker/sweepers_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
from typing import Callable, Iterable

from pds.registrysweepers import provenance, ancestry
from pds.registrysweepers.utils import configure_logging, get_human_readable_elapsed_since
from pds.registrysweepers.utils import configure_logging, get_human_readable_elapsed_since, parse_log_level

configure_logging(filepath=None, log_level=logging.INFO)
log = logging.getLogger(__name__)
Expand Down Expand Up @@ -92,6 +92,7 @@
logging.error(err)
raise ValueError(f'Failed to parse username/password from PROV_CREDENTIALS value "{provCredentialsStr}": {err}')

log_level = parse_log_level(os.environ.get('LOGLEVEL', 'INFO'))

def run_factory(sweeper_f: Callable) -> Callable:
return functools.partial(
Expand All @@ -100,7 +101,7 @@ def run_factory(sweeper_f: Callable) -> Callable:
username=username,
password=password,
log_filepath='provenance.log',
log_level=logging.INFO, # TODO: pull this from LOGLEVEL env var
log_level=log_level,
verify_host_certs=True if not dev_mode else False
)

Expand Down
70 changes: 50 additions & 20 deletions src/pds/registrysweepers/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def parse_log_level(input: str) -> int:
try:
result = int(input)
except ValueError:
result = getattr(logging, input)
result = getattr(logging, input.upper())
return result


Expand Down Expand Up @@ -123,6 +123,9 @@ def query_registry_db(
path = ",".join([index_name] + cross_cluster_indexes) + f"/_search?scroll={scroll_validity_duration_minutes}m"
served_hits = 0

last_info_log_at_percentage = 0
log.info("Query progress: 0%")

more_data_exists = True
while more_data_exists:
resp = retry_call(
Expand All @@ -143,14 +146,11 @@ def query_registry_db(
total_hits = data["hits"]["total"]["value"]
log.debug(f" paging query ({served_hits} to {min(served_hits + page_size, total_hits)} of {total_hits})")

last_info_log_at_percentage = 0
log.info("Query progress: 0%")

for hit in data["hits"]["hits"]:
served_hits += 1

percentage_of_hits_served = int(served_hits / total_hits * 100)
if last_info_log_at_percentage is None or percentage_of_hits_served > (last_info_log_at_percentage + 5):
if last_info_log_at_percentage is None or percentage_of_hits_served >= (last_info_log_at_percentage + 5):
last_info_log_at_percentage = percentage_of_hits_served
log.info(f"Query progress: {percentage_of_hits_served}%")

Expand Down Expand Up @@ -245,25 +245,55 @@ def _write_bulk_updates_chunk(host: Host, index_name: str, bulk_updates: Iterabl
headers=headers,
verify=host.verify,
)
response.raise_for_status()

# N.B. HTTP status 200 is insufficient as a success check for _bulk API.
# See: https://github.com/elastic/elasticsearch/issues/41434
response.raise_for_status()
response_content = response.json()
if response_content.get("errors"):
warn_types = {"document_missing_exception"} # these types represent bad data, not bad sweepers behaviour
items_with_error = [item for item in response_content["items"] if "error" in item["update"]]
items_with_warnings = [item for item in items_with_error if item["update"]["error"]["type"] in warn_types]
items_with_errors = [item for item in items_with_error if item["update"]["error"]["type"] not in warn_types]

for item in items_with_warnings:
error_type = item["update"]["error"]["type"]
log.warning(f'Attempt to update document {item["update"]["_id"]} failed due to {error_type}')

for item in items_with_errors:
log.error(
f'Attempt to update document {item["update"]["_id"]} unexpectedly failed: {item["update"]["error"]}'
)

log.info("Successfully wrote bulk updates chunk")
items_with_problems = [item for item in response_content["items"] if "error" in item["update"]]

if log.isEnabledFor(logging.WARNING):
items_with_warnings = [
item for item in items_with_problems if item["update"]["error"]["type"] in warn_types
]
warning_aggregates = aggregate_update_error_types(items_with_warnings)
for error_type, reason_aggregate in warning_aggregates.items():
for error_reason, ids in reason_aggregate.items():
log.warning(
f"Attempt to update the following documents failed due to {error_type} ({error_reason}): {ids}"
)

if log.isEnabledFor(logging.ERROR):
items_with_errors = [
item for item in items_with_problems if item["update"]["error"]["type"] not in warn_types
]
error_aggregates = aggregate_update_error_types(items_with_errors)
for error_type, reason_aggregate in error_aggregates.items():
for error_reason, ids in reason_aggregate.items():
log.error(
f"Attempt to update the following documents failed unexpectedly due to {error_type} ({error_reason}): {ids}"
)


def aggregate_update_error_types(items: Iterable[Dict]) -> Mapping[str, Dict[str, List[str]]]:
"""Return a nested aggregation of ids, aggregated first by error type, then by reason"""
agg: Dict[str, Dict[str, List[str]]] = {}
for item in items:
id = item["update"]["_id"]
error = item["update"]["error"]
error_type = error["type"]
error_reason = error["reason"]
if error_type not in agg:
agg[error_type] = {}

if error_reason not in agg[error_type]:
agg[error_type][error_reason] = []

agg[error_type][error_reason].append(id)

return agg


def coerce_list_type(db_value: Any) -> List[Any]:
Expand Down

0 comments on commit 52c1529

Please sign in to comment.