[dataset]: Added tracing to create_item (#185)
* [dataset]: Added tracing to create_item

This adds tracing to the create_item task to monitor item creation time. After
an item is created, we log a message to an Application Insights instance with
some custom dimensions (a minimal sketch of the logging pattern follows this list):

* `type`: `pctasks.create_item`, for filtering
* `collection_id`: The collection ID, for filtering
* `asset_uri`: A unique(ish) identifier for the assets that were cataloged
* `duration_seconds`: The time it took to create the item
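
For illustration, here is a minimal sketch of that opencensus logging pattern,
assuming a configured Application Insights connection string; the logger name,
handler, and dimension keys mirror the diff below, while the dimension values
are made-up examples:

    import logging

    from opencensus.ext.azure.log_exporter import AzureLogHandler

    azlogger = logging.getLogger("monitor.pctasks.dataset.items.task")
    azlogger.setLevel(logging.INFO)
    # With no arguments, AzureLogHandler reads the
    # APPLICATIONINSIGHTS_CONNECTION_STRING environment variable.
    azlogger.addHandler(AzureLogHandler())

    azlogger.info(
        "Created item",
        extra={
            "custom_dimensions": {
                "type": "pctasks.create_item",
                "collection_id": "naip",  # example value
                "asset_uri": "blob://example/container/image.tif",  # example value
                "duration_seconds": 1.23,  # example value
            }
        },
    )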
Tom Augspurger authored Apr 25, 2023
1 parent 834f4a3 commit 24bb839
Showing 5 changed files with 118 additions and 19 deletions.
6 changes: 6 additions & 0 deletions deployment/terraform/resources/keyvault.tf
@@ -41,6 +41,12 @@ resource "azurerm_key_vault_secret" "task-client-secret" {
   key_vault_id = data.azurerm_key_vault.pctasks.id
 }

+resource "azurerm_key_vault_secret" "task-application-insights-connection-string" {
+  name         = "task-application-insights-connection-string"
+  value        = azurerm_application_insights.pctasks.connection_string
+  key_vault_id = data.azurerm_key_vault.pctasks.id
+}
+
 # API Management access key

 data "azurerm_key_vault" "deploy_secrets" {
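Note that nothing in this commit reads the secret directly: `AzureLogHandler()`
constructed with no arguments falls back to the `APPLICATIONINSIGHTS_CONNECTION_STRING`
environment variable (the new test below relies on exactly this), so presumably
the deployment surfaces this Key Vault secret to task containers under that
name. A minimal sketch of that assumption:

    import os

    from opencensus.ext.azure.log_exporter import AzureLogHandler

    # Assumption: the deployment injects the Key Vault secret value as this
    # environment variable; that wiring is not shown in this commit.
    connection_string = os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"]
    handler = AzureLogHandler(connection_string=connection_string)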
84 changes: 67 additions & 17 deletions pctasks/dataset/pctasks/dataset/items/task.py
@@ -1,10 +1,12 @@
+import contextlib
 import logging
 import os
 import time
-from typing import Callable, List, Union
+from typing import Callable, Iterator, List, Optional, Union

 import orjson
 import pystac
+from opencensus.ext.azure.log_exporter import AzureLogHandler

 from pctasks.core.models.task import FailedTaskResult, WaitTaskResult
 from pctasks.core.storage import StorageFactory
@@ -14,6 +16,9 @@
 from pctasks.task.task import Task

 logger = logging.getLogger(__name__)
+azlogger = logging.getLogger("monitor.pctasks.dataset.items.task")
+azlogger.setLevel(logging.INFO)
+azhandler = None  # initialized later in `_init_azlogger`


 class CreateItemsError(Exception):
@@ -34,6 +39,61 @@ def asset_chunk_id_to_ndjson_chunk_id(asset_chunk_id: str) -> str:
     return os.path.join(folder_name, "items.ndjson")


+def _init_azlogger() -> None:
+    # AzureLogHandler is slow to initialize
+    # do it once here
+    global azhandler
+
+    if azhandler is None:
+        logger.debug("Initializing AzureLogHandler")
+        try:
+            azhandler = AzureLogHandler()
+        except ValueError:
+            # missing instrumentation key
+            azhandler = False
+            logger.warning("Unable to initialize AzureLogHandler")
+        else:
+            azhandler.setLevel(logging.INFO)
+            azlogger.addHandler(azhandler)
+
+
+@contextlib.contextmanager
+def traced_create_item(
+    asset_uri: str,
+    collection_id: Optional[str],
+    i: Optional[int] = None,
+    asset_count: Optional[int] = None,
+) -> Iterator[None]:
+    _init_azlogger()
+    start_time = time.monotonic()
+    yield
+    end_time = time.monotonic()
+
+    if i is not None and asset_count is not None:
+        # asset_chunk_info case
+        logger.info(
+            f"({((i+1)/asset_count)*100:06.2f}%) "
+            f"[{end_time - start_time:.2f}s] "
+            f" - {asset_uri} "
+            f"({i+1} of {asset_count})"
+        )
+    else:
+        # asset_uri case
+        logger.info(
+            f"Created items from {asset_uri} in {end_time - start_time:.2f}s"
+        )
+
+    properties = {
+        "custom_dimensions": {
+            "type": "pctasks.create_item",
+            "collection_id": collection_id,
+            "asset_uri": asset_uri,
+            "duration_seconds": end_time - start_time,
+        }
+    }
+    azlogger.info("Created item", extra=properties)
+
+
 class CreateItemsTask(Task[CreateItemsInput, CreateItemsOutput]):
     _input_model = CreateItemsInput
     _output_model = CreateItemsOutput
@@ -83,13 +143,8 @@ def _ensure_collection(items: List[pystac.Item]) -> None:

         if args.asset_uri:
             try:
-                start_time = time.monotonic()
-                result = self._create_item(args.asset_uri, storage_factory)
-                end_time = time.monotonic()
-                logger.info(
-                    f"Created items from {args.asset_uri} in "
-                    f"{end_time - start_time:.2f}s"
-                )
+                with traced_create_item(args.asset_uri, args.collection_id):
+                    result = self._create_item(args.asset_uri, storage_factory)
             except Exception as e:
                 raise CreateItemsError(
                     f"Failed to create item from {args.asset_uri}"
@@ -113,15 +168,10 @@ def _ensure_collection(items: List[pystac.Item]) -> None:
                 chunk_lines = chunk_lines[: args.options.limit]
             for i, asset_uri in enumerate(chunk_lines):
                 try:
-                    start_time = time.monotonic()
-                    result = self._create_item(asset_uri, storage_factory)
-                    end_time = time.monotonic()
-                    logger.info(
-                        f"({((i+1)/asset_count)*100:06.2f}%) "
-                        f"[{end_time - start_time:.2f}s] "
-                        f" - {asset_uri} "
-                        f"({i+1} of {asset_count})"
-                    )
+                    with traced_create_item(
+                        asset_uri, args.collection_id, i=i, asset_count=asset_count
+                    ):
+                        result = self._create_item(asset_uri, storage_factory)
                 except Exception as e:
                     raise CreateItemsError(
                         f"Failed to create item from {asset_uri}"
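As a sanity check on the progress format above: `:06.2f` zero-pads the
percentage to six characters, including the decimal point, so progress lines
align cleanly in the logs. For example:

    # Third asset of eight: (i + 1) / asset_count = 3 / 8 = 37.5%
    i, asset_count = 2, 8
    print(f"({((i + 1) / asset_count) * 100:06.2f}%)")  # prints "(037.50%)"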
3 changes: 2 additions & 1 deletion pctasks/dataset/setup.py
@@ -15,7 +15,8 @@
"dev": [
"pytest",
"pytest-cov",
"pre-commit"
"pre-commit",
"responses",
],
"docs": ["mkdocs", "mkdocs-material", "pdocs"],
}
43 changes: 42 additions & 1 deletion pctasks/dataset/tests/items/test_task.py
@@ -1,18 +1,25 @@
 import json
+import logging
 from pathlib import Path
 from tempfile import TemporaryDirectory
 from typing import List, Union

 import pystac
+import responses
 from pystac.utils import str_to_datetime

+import pctasks.dataset.items.task
 from pctasks.core.models.task import CompletedTaskResult, WaitTaskResult
 from pctasks.core.storage import StorageFactory
 from pctasks.core.storage.local import LocalStorage
 from pctasks.core.utils.stac import validate_stac
 from pctasks.dataset.chunks.models import ChunkInfo
 from pctasks.dataset.items.models import CreateItemsOutput
-from pctasks.dataset.items.task import CreateItemsInput, CreateItemsTask
+from pctasks.dataset.items.task import (
+    CreateItemsInput,
+    CreateItemsTask,
+    traced_create_item,
+)
 from pctasks.dev.test_utils import run_test_task
 from pctasks.task.utils import get_task_path

@@ -89,3 +96,37 @@ def test_wait_for_assets():

     task_result = run_test_task(args.dict(), TASK_PATH)
     assert isinstance(task_result, WaitTaskResult)
+
+
+@responses.activate
+def test_log_to_monitor(monkeypatch, caplog):
+    monkeypatch.setenv(
+        "APPLICATIONINSIGHTS_CONNECTION_STRING",
+        "InstrumentationKey=00000000-0000-0000-0000-000000000000;IngestionEndpoint=https://westeurope-5.in.applicationinsights.azure.com/;LiveEndpoint=https://westeurope.livediagnostics.monitor.azure.com/",  # noqa: E501
+    )
+    # opencensus will log an error about the instrumentation key being invalid
+    opencensus_logger = logging.getLogger("opencensus.ext.azure")
+    opencensus_logger.setLevel(logging.CRITICAL)
+
+    responses.post(
+        "https://westus-0.in.applicationinsights.azure.com//v2.1/track",
+    )
+
+    # Ensure that any previous tests initializing logging
+    # (without an instrumentation key) didn't mess up our handler
+    monkeypatch.setattr(pctasks.dataset.items.task, "azhandler", None)
+
+    with caplog.at_level(logging.INFO):
+        with traced_create_item("blob://test/test/asset.tif", "test-collection"):
+            pass
+
+    record = caplog.records[1]
+    assert record.custom_dimensions.pop("duration_seconds")
+    assert record.custom_dimensions == {
+        "asset_uri": "blob://test/test/asset.tif",
+        "collection_id": "test-collection",
+        "type": "pctasks.create_item",
+    }
+
+    azlogger = logging.getLogger("monitor.pctasks.dataset.items.task")
+    assert len(azlogger.handlers) == 1
1 change: 1 addition & 0 deletions requirements-dev.txt
@@ -10,6 +10,7 @@ pystac[validation]==1.*

 azure-functions
 azure-functions-durable
+responses

 # Mypy stubs

