Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dataset]: Added tracing to create_item #185

Merged
merged 7 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions deployment/terraform/resources/keyvault.tf
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ resource "azurerm_key_vault_secret" "task-client-secret" {
key_vault_id = data.azurerm_key_vault.pctasks.id
}

# Expose the Application Insights connection string to tasks via Key Vault,
# so task workers can initialize Azure Monitor telemetry (e.g. AzureLogHandler).
resource "azurerm_key_vault_secret" "task-application-insights-connection-string" {
name = "task-application-insights-connection-string"
value = azurerm_application_insights.pctasks.connection_string
key_vault_id = data.azurerm_key_vault.pctasks.id
}

# API Management access key

data "azurerm_key_vault" "deploy_secrets" {
Expand Down
84 changes: 67 additions & 17 deletions pctasks/dataset/pctasks/dataset/items/task.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import contextlib
import logging
import os
import time
from typing import Callable, List, Union
from typing import Callable, Iterator, List, Optional, Union

import orjson
import pystac
from opencensus.ext.azure.log_exporter import AzureLogHandler

from pctasks.core.models.task import FailedTaskResult, WaitTaskResult
from pctasks.core.storage import StorageFactory
Expand All @@ -14,6 +16,9 @@
from pctasks.task.task import Task

logger = logging.getLogger(__name__)
azlogger = logging.getLogger("monitor.pctasks.dataset.items.task")
azlogger.setLevel(logging.INFO)
azhandler = None # initialized later in `_init_azlogger`


class CreateItemsError(Exception):
Expand All @@ -34,6 +39,61 @@ def asset_chunk_id_to_ndjson_chunk_id(asset_chunk_id: str) -> str:
return os.path.join(folder_name, "items.ndjson")


def _init_azlogger() -> None:
    """Lazily attach an AzureLogHandler to the ``azlogger`` monitoring logger.

    Constructing an AzureLogHandler is slow (it may do network work in the
    background), so it is created once on first use rather than at import
    time. The module-level ``azhandler`` records the outcome: a handler on
    success, ``False`` when no instrumentation key is configured (so we
    don't retry on every call), or ``None`` if not yet initialized.
    """
    global azhandler

    if azhandler is not None:
        # Already initialized (or already known to be unavailable).
        return

    logger.debug("Initializing AzureLogHandler")
    try:
        handler = AzureLogHandler()
    except ValueError:
        # Raised when the instrumentation key / connection string is missing.
        azhandler = False
        logger.warning("Unable to initialize AzureLogHandler")
    else:
        handler.setLevel(logging.INFO)
        azlogger.addHandler(handler)
        azhandler = handler


@contextlib.contextmanager
def traced_create_item(
    asset_uri: str,
    collection_id: Optional[str],
    i: Optional[int] = None,
    asset_count: Optional[int] = None,
) -> Iterator[None]:
    """Time a create-item call, logging to stderr and to Azure Monitor.

    When both ``i`` and ``asset_count`` are provided (the asset-chunk case),
    the stderr message includes percentage progress and position in the
    chunk; otherwise a simple per-asset message is emitted. A structured
    record with custom dimensions is always sent to ``azlogger``.
    """
    _init_azlogger()
    start = time.monotonic()
    yield
    elapsed = time.monotonic() - start

    if i is None or asset_count is None:
        # Single asset_uri case.
        logger.info(f"Created items from {asset_uri} in {elapsed:.2f}s")
    else:
        # Chunk case: show progress through the chunk.
        percent = ((i + 1) / asset_count) * 100
        logger.info(
            f"({percent:06.2f}%) "
            f"[{elapsed:.2f}s] "
            f" - {asset_uri} "
            f"({i + 1} of {asset_count})"
        )

    # Structured telemetry for Azure Monitor; opencensus surfaces
    # ``custom_dimensions`` as queryable fields.
    properties = {
        "custom_dimensions": {
            "type": "pctasks.create_item",
            "collection_id": collection_id,
            "asset_uri": asset_uri,
            "duration_seconds": elapsed,
        }
    }
    azlogger.info("Created item", extra=properties)
Comment on lines +72 to +84
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is copy-pasted from below. It preserves the logs to stderr that we're used to.


properties = {
"custom_dimensions": {
"type": "pctasks.create_item",
"collection_id": collection_id,
"asset_uri": asset_uri,
"duration_seconds": end_time - start_time,
}
}
azlogger.info("Created item", extra=properties)


class CreateItemsTask(Task[CreateItemsInput, CreateItemsOutput]):
_input_model = CreateItemsInput
_output_model = CreateItemsOutput
Expand Down Expand Up @@ -83,13 +143,8 @@ def _ensure_collection(items: List[pystac.Item]) -> None:

if args.asset_uri:
try:
start_time = time.monotonic()
result = self._create_item(args.asset_uri, storage_factory)
end_time = time.monotonic()
logger.info(
f"Created items from {args.asset_uri} in "
f"{end_time - start_time:.2f}s"
)
with traced_create_item(args.asset_uri, args.collection_id):
result = self._create_item(args.asset_uri, storage_factory)
except Exception as e:
raise CreateItemsError(
f"Failed to create item from {args.asset_uri}"
Expand All @@ -113,15 +168,10 @@ def _ensure_collection(items: List[pystac.Item]) -> None:
chunk_lines = chunk_lines[: args.options.limit]
for i, asset_uri in enumerate(chunk_lines):
try:
start_time = time.monotonic()
result = self._create_item(asset_uri, storage_factory)
end_time = time.monotonic()
logger.info(
f"({((i+1)/asset_count)*100:06.2f}%) "
f"[{end_time - start_time:.2f}s] "
f" - {asset_uri} "
f"({i+1} of {asset_count})"
)
with traced_create_item(
asset_uri, args.collection_id, i=i, asset_count=asset_count
):
result = self._create_item(asset_uri, storage_factory)
except Exception as e:
raise CreateItemsError(
f"Failed to create item from {asset_uri}"
Expand Down
3 changes: 2 additions & 1 deletion pctasks/dataset/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
"dev": [
"pytest",
"pytest-cov",
"pre-commit"
"pre-commit",
"responses",
],
"docs": ["mkdocs", "mkdocs-material", "pdocs"],
}
Expand Down
39 changes: 38 additions & 1 deletion pctasks/dataset/tests/items/test_task.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import json
import logging
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import List, Union

import pystac
import responses
from pystac.utils import str_to_datetime

from pctasks.core.models.task import CompletedTaskResult, WaitTaskResult
Expand All @@ -12,7 +14,11 @@
from pctasks.core.utils.stac import validate_stac
from pctasks.dataset.chunks.models import ChunkInfo
from pctasks.dataset.items.models import CreateItemsOutput
from pctasks.dataset.items.task import CreateItemsInput, CreateItemsTask
from pctasks.dataset.items.task import (
CreateItemsInput,
CreateItemsTask,
traced_create_item,
)
from pctasks.dev.test_utils import run_test_task
from pctasks.task.utils import get_task_path

Expand Down Expand Up @@ -89,3 +95,34 @@ def test_wait_for_assets():

task_result = run_test_task(args.dict(), TASK_PATH)
assert isinstance(task_result, WaitTaskResult)


@responses.activate
def test_log_to_monitor(monkeypatch, caplog):
    # Verifies that traced_create_item emits a structured record to the
    # "monitor.*" logger with the expected custom_dimensions, and that the
    # AzureLogHandler is attached exactly once (the lazy-init path).
    #
    # A syntactically valid but dummy connection string triggers handler
    # creation without real credentials.
    monkeypatch.setenv(
        "APPLICATIONINSIGHTS_CONNECTION_STRING",
        "InstrumentationKey=00000000-0000-0000-0000-000000000000;IngestionEndpoint=https://westeurope-5.in.applicationinsights.azure.com/;LiveEndpoint=https://westeurope.livediagnostics.monitor.azure.com/", # noqa: E501
    )
    logger = logging.getLogger("monitor.pctasks.dataset.items.task")

    # opencensus will log an error about the instrumentation key being invalid
    opencensus_logger = logging.getLogger("opencensus.ext.azure")
    opencensus_logger.setLevel(logging.CRITICAL)

    # NOTE(review): this mock URL (westus-0) doesn't match the
    # IngestionEndpoint above (westeurope-5), and has a doubled slash —
    # presumably it is never hit because the handler exports on a background
    # timer; confirm against opencensus exporter behavior.
    responses.post(
        "https://westus-0.in.applicationinsights.azure.com//v2.1/track",
    )

    with caplog.at_level(logging.INFO):
        with traced_create_item("blob://test/test/asset.tif", "test-collection"):
            pass

    # records[0] is the stderr progress message; records[1] is the
    # structured monitor record carrying custom_dimensions.
    record = caplog.records[1]
    # duration_seconds varies per run, so pop it and assert it's truthy.
    assert record.custom_dimensions.pop("duration_seconds")
    assert record.custom_dimensions == {
        "asset_uri": "blob://test/test/asset.tif",
        "collection_id": "test-collection",
        "type": "pctasks.create_item",
    }

    # Lazy init must have attached exactly one AzureLogHandler.
    assert len(logger.handlers) == 1
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pystac[validation]==1.*

azure-functions
azure-functions-durable
responses

# Mypy stubs

Expand Down