From 24bb839d6ecf05b1082ac832b4162185c2615cd4 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Tue, 25 Apr 2023 14:29:15 -0500 Subject: [PATCH] [dataset]: Added tracing to create_item (#185) * [dataset]: Added tracing to create_item This adds some tracing to monitor item creating time to the create_item task. After an item is created, we'll log a messaage to an application insights instance with some custom dimensions * `type`: `pctasks.create_item` for filtering * `collection_id`: The collection ID, for filtering * `asset_uri`: unique(ish) identifier for what assets were cataloged * `duration_seconds`: The time it took to create the item --- deployment/terraform/resources/keyvault.tf | 6 ++ pctasks/dataset/pctasks/dataset/items/task.py | 84 +++++++++++++++---- pctasks/dataset/setup.py | 3 +- pctasks/dataset/tests/items/test_task.py | 43 +++++++++- requirements-dev.txt | 1 + 5 files changed, 118 insertions(+), 19 deletions(-) diff --git a/deployment/terraform/resources/keyvault.tf b/deployment/terraform/resources/keyvault.tf index c249d001..893dc366 100644 --- a/deployment/terraform/resources/keyvault.tf +++ b/deployment/terraform/resources/keyvault.tf @@ -41,6 +41,12 @@ resource "azurerm_key_vault_secret" "task-client-secret" { key_vault_id = data.azurerm_key_vault.pctasks.id } +resource "azurerm_key_vault_secret" "task-application-insights-connection-string" { + name = "task-application-insights-connection-string" + value = azurerm_application_insights.pctasks.connection_string + key_vault_id = data.azurerm_key_vault.pctasks.id +} + # API Management access key data "azurerm_key_vault" "deploy_secrets" { diff --git a/pctasks/dataset/pctasks/dataset/items/task.py b/pctasks/dataset/pctasks/dataset/items/task.py index d387e124..d3a59323 100644 --- a/pctasks/dataset/pctasks/dataset/items/task.py +++ b/pctasks/dataset/pctasks/dataset/items/task.py @@ -1,10 +1,12 @@ +import contextlib import logging import os import time -from typing import Callable, List, Union +from typing import Callable, Iterator, List, Optional, Union import orjson import pystac +from opencensus.ext.azure.log_exporter import AzureLogHandler from pctasks.core.models.task import FailedTaskResult, WaitTaskResult from pctasks.core.storage import StorageFactory @@ -14,6 +16,9 @@ from pctasks.task.task import Task logger = logging.getLogger(__name__) +azlogger = logging.getLogger("monitor.pctasks.dataset.items.task") +azlogger.setLevel(logging.INFO) +azhandler = None # initialized later in `_init_azlogger` class CreateItemsError(Exception): @@ -34,6 +39,61 @@ def asset_chunk_id_to_ndjson_chunk_id(asset_chunk_id: str) -> str: return os.path.join(folder_name, "items.ndjson") +def _init_azlogger() -> None: + # AzureLogHandler is slow to initialize + # do it once here + global azhandler + + if azhandler is None: + logger.debug("Initializing AzureLogHandler") + try: + azhandler = AzureLogHandler() + except ValueError: + # missing instrumentation key + azhandler = False + logger.warning("Unable to initialize AzureLogHandler") + else: + azhandler.setLevel(logging.INFO) + azlogger.addHandler(azhandler) + + +@contextlib.contextmanager +def traced_create_item( + asset_uri: str, + collection_id: Optional[str], + i: Optional[int] = None, + asset_count: Optional[int] = None, +) -> Iterator[None]: + _init_azlogger() + start_time = time.monotonic() + yield + end_time = time.monotonic() + + if i is not None and asset_count is not None: + # asset_chunk_info case + logger.info( + f"({((i+1)/asset_count)*100:06.2f}%) " + f"[{end_time - start_time:.2f}s] " + f" - {asset_uri} " + f"({i+1} of {asset_count})" + ) + else: + # asset_uri case + logger.info( + f"Created items from {asset_uri} in " f"{end_time - start_time:.2f}s" + ) + + properties = { + "custom_dimensions": { + "type": "pctasks.create_item", + "collection_id": collection_id, + "asset_uri": asset_uri, + "duration_seconds": end_time - start_time, + } + } + azlogger.info("Created item", extra=properties) + + class CreateItemsTask(Task[CreateItemsInput, CreateItemsOutput]): _input_model = CreateItemsInput _output_model = CreateItemsOutput @@ -83,13 +143,8 @@ def _ensure_collection(items: List[pystac.Item]) -> None: if args.asset_uri: try: - start_time = time.monotonic() - result = self._create_item(args.asset_uri, storage_factory) - end_time = time.monotonic() - logger.info( - f"Created items from {args.asset_uri} in " - f"{end_time - start_time:.2f}s" - ) + with traced_create_item(args.asset_uri, args.collection_id): + result = self._create_item(args.asset_uri, storage_factory) except Exception as e: raise CreateItemsError( f"Failed to create item from {args.asset_uri}" @@ -113,15 +168,10 @@ def _ensure_collection(items: List[pystac.Item]) -> None: chunk_lines = chunk_lines[: args.options.limit] for i, asset_uri in enumerate(chunk_lines): try: - start_time = time.monotonic() - result = self._create_item(asset_uri, storage_factory) - end_time = time.monotonic() - logger.info( - f"({((i+1)/asset_count)*100:06.2f}%) " - f"[{end_time - start_time:.2f}s] " - f" - {asset_uri} " - f"({i+1} of {asset_count})" - ) + with traced_create_item( + asset_uri, args.collection_id, i=i, asset_count=asset_count + ): + result = self._create_item(asset_uri, storage_factory) except Exception as e: raise CreateItemsError( f"Failed to create item from {asset_uri}" diff --git a/pctasks/dataset/setup.py b/pctasks/dataset/setup.py index c5507293..5de24c56 100644 --- a/pctasks/dataset/setup.py +++ b/pctasks/dataset/setup.py @@ -15,7 +15,8 @@ "dev": [ "pytest", "pytest-cov", - "pre-commit" + "pre-commit", + "responses", ], "docs": ["mkdocs", "mkdocs-material", "pdocs"], } diff --git a/pctasks/dataset/tests/items/test_task.py b/pctasks/dataset/tests/items/test_task.py index 0aab4686..9fe0f3d9 100644 --- a/pctasks/dataset/tests/items/test_task.py +++ b/pctasks/dataset/tests/items/test_task.py @@ -1,18 +1,25 @@ import json +import logging from pathlib import Path from tempfile import TemporaryDirectory from typing import List, Union import pystac +import responses from pystac.utils import str_to_datetime +import pctasks.dataset.items.task from pctasks.core.models.task import CompletedTaskResult, WaitTaskResult from pctasks.core.storage import StorageFactory from pctasks.core.storage.local import LocalStorage from pctasks.core.utils.stac import validate_stac from pctasks.dataset.chunks.models import ChunkInfo from pctasks.dataset.items.models import CreateItemsOutput -from pctasks.dataset.items.task import CreateItemsInput, CreateItemsTask +from pctasks.dataset.items.task import ( + CreateItemsInput, + CreateItemsTask, + traced_create_item, +) from pctasks.dev.test_utils import run_test_task from pctasks.task.utils import get_task_path @@ -89,3 +96,37 @@ def test_wait_for_assets(): task_result = run_test_task(args.dict(), TASK_PATH) assert isinstance(task_result, WaitTaskResult) + + +@responses.activate +def test_log_to_monitor(monkeypatch, caplog): + monkeypatch.setenv( + "APPLICATIONINSIGHTS_CONNECTION_STRING", + "InstrumentationKey=00000000-0000-0000-0000-000000000000;IngestionEndpoint=https://westeurope-5.in.applicationinsights.azure.com/;LiveEndpoint=https://westeurope.livediagnostics.monitor.azure.com/", # noqa: E501 + ) + # opencensus will log an error about the instrumentation key being invalid + opencensus_logger = logging.getLogger("opencensus.ext.azure") + opencensus_logger.setLevel(logging.CRITICAL) + + responses.post( + "https://westus-0.in.applicationinsights.azure.com//v2.1/track", + ) + + # Ensure that any previous tests initializing logging + # (without an instrumentation key) didn't mess up our handler + monkeypatch.setattr(pctasks.dataset.items.task, "azhandler", None) + + with caplog.at_level(logging.INFO): + with traced_create_item("blob://test/test/asset.tif", "test-collection"): + pass + + record = caplog.records[1] + assert record.custom_dimensions.pop("duration_seconds") + assert record.custom_dimensions == { + "asset_uri": "blob://test/test/asset.tif", + "collection_id": "test-collection", + "type": "pctasks.create_item", + } + + azlogger = logging.getLogger("monitor.pctasks.dataset.items.task") + assert len(azlogger.handlers) == 1 diff --git a/requirements-dev.txt b/requirements-dev.txt index 79621dd2..da1f479e 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -10,6 +10,7 @@ pystac[validation]==1.* azure-functions azure-functions-durable +responses # Mypy stubs