Skip to content

Commit

Permalink
fix: remove metrics manual instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
dalpasso committed Jan 10, 2024
1 parent eefa7fe commit 2b889a9
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 431 deletions.
13 changes: 0 additions & 13 deletions eodag/api/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import re
import shutil
from operator import itemgetter
from time import perf_counter
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Tuple, Union

import geojson
Expand Down Expand Up @@ -81,7 +80,6 @@
UnsupportedProductType,
UnsupportedProvider,
)
from eodag.utils.otel import telemetry
from eodag.utils.stac_reader import fetch_stac_items

if TYPE_CHECKING:
Expand Down Expand Up @@ -1642,22 +1640,11 @@ def _do_search(
total_results = 0

try:
trace_id = telemetry.get_current_trace_id()
timer = telemetry.get_overhead_timer(trace_id)
start_time = perf_counter()

if need_auth and auth_plugin and can_authenticate:
search_plugin.auth = auth_plugin.authenticate()

res, nb_res = search_plugin.query(count=count, auth=auth_plugin, **kwargs)

end_time = perf_counter()
total_time = end_time - start_time
telemetry.record_outbound_request_duration(
search_plugin.provider, total_time
)
timer.record_subtask_time(total_time)

# Only do the pagination computations when it makes sense. For example,
# for a search by id, we can reasonably guess that the provider will return
# At most 1 product, so we don't need such a thing as pagination
Expand Down
32 changes: 6 additions & 26 deletions eodag/plugins/download/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
NotAvailableError,
)
from eodag.utils.notebook import NotebookWidgets
from eodag.utils.otel import telemetry

if TYPE_CHECKING:
from eodag.api.product import EOProduct
Expand Down Expand Up @@ -705,40 +704,21 @@ def progress_callback_decorator(
) -> Callable[[Any, Any], None]:
"""No-op decorator for the progress_callback.
Patching this function ca be usuful, for example, to implement automatic
Patching this function ca be useful, for example, to implement automatic
instrumentation that needs more context information.
:param progress_callback: The progress callback to decorate
:type progress_callback: :class:`~eodag.utils.ProgressCallback`
:param decorator_kwargs: Additional arguments used by the wrapper
:type decorator_kwargs: Any
"""

@functools.wraps(progress_callback)
def progress_callback_wrapper(*args: Any, **kwargs: Any) -> None:
progress_callback(*args, **kwargs)

return progress_callback_wrapper

def _record_downloaded_data(self, progress_callback):
"""
Record downloaded data wrapper.
Record the downloaded data metric by wrapping the callback.
:param progress_callback: A method or a callable object
which takes a current size and a maximum
size as inputs and handle progress bar
creation and update to give the user a
feedback on the download progress
:type progress_callback: :class:`~eodag.utils.ProgressCallback`
:returns: wrapper
:rtype: :class:`typing.Any`
:param decorator_kwargs: Additional arguments used by the wrapper
:type decorator_kwargs: Any
"""

def wrapper(*args, **kwargs):
# metrics
telemetry.record_downloaded_data(self.provider, args[0])
@functools.wraps(progress_callback)
def progress_callback_wrapper(*args: Any, **kwargs: Any) -> None:
progress_callback(*args, **kwargs)

return wrapper
return progress_callback_wrapper
50 changes: 4 additions & 46 deletions eodag/rest/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
get_stac_extension_oseo,
get_stac_item_by_id,
search_stac_items,
telemetry_init_auto,
telemetry_init,
)
from eodag.utils import DEFAULT_ITEMS_PER_PAGE, parse_header, update_nested_dict
from eodag.utils.exceptions import (
Expand All @@ -74,7 +74,6 @@
UnsupportedProvider,
ValidationError,
)
from eodag.utils.otel import telemetry

if TYPE_CHECKING:
from fastapi.types import DecoratedCallable
Expand Down Expand Up @@ -346,25 +345,15 @@ def stac_collections_item_download(
collection_id: str, item_id: str, request: Request
) -> StreamingResponse:
"""STAC collection item download"""
trace_id = telemetry.get_current_trace_id()
timer = telemetry.create_overhead_timer(trace_id)
timer.start_global_timer()
logger.debug(f"URL: {request.url}")

arguments = dict(request.query_params)
provider = arguments.pop("provider", None)

response = download_stac_item_by_id_stream(
return download_stac_item_by_id_stream(
catalogs=[collection_id], item_id=item_id, provider=provider, **arguments
)

timer.stop_global_timer()
telemetry.record_request_duration(provider, timer.get_global_time())
telemetry.record_request_overhead_duration(provider, timer.get_overhead_time())
telemetry.delete_overhead_timer(trace_id)

return response


@router.get(
"/collections/{collection_id}/items/{item_id}/download/{asset_filter}",
Expand Down Expand Up @@ -544,27 +533,17 @@ def stac_catalogs_item_download(
catalogs: str, item_id: str, request: Request
) -> StreamingResponse:
"""STAC Catalog item download"""
trace_id = telemetry.get_current_trace_id()
timer = telemetry.create_overhead_timer(trace_id)
timer.start_global_timer()
logger.debug(f"URL: {request.url}")

arguments = dict(request.query_params)
provider = arguments.pop("provider", None)

list_catalog = catalogs.strip("/").split("/")

response = download_stac_item_by_id_stream(
return download_stac_item_by_id_stream(
catalogs=list_catalog, item_id=item_id, provider=provider, **arguments
)

timer.stop_global_timer()
telemetry.record_request_duration(provider, timer.get_global_time())
telemetry.record_request_overhead_duration(provider, timer.get_overhead_time())
telemetry.delete_overhead_timer(trace_id)

return response


@router.get(
"/catalogs/{catalogs:path}/items/{item_id}/download/{asset_filter}",
Expand Down Expand Up @@ -719,9 +698,6 @@ def stac_search(
request: Request, search_body: Optional[SearchBody] = None
) -> ORJSONResponse:
"""STAC collections items"""
trace_id = telemetry.get_current_trace_id()
timer = telemetry.create_overhead_timer(trace_id)
timer.start_global_timer()
logger.debug(f"URL: {request.url}")
logger.debug(f"Body: {search_body}")

Expand All @@ -736,20 +712,6 @@ def stac_search(
arguments = dict(request.query_params, **body)
provider = arguments.pop("provider", None)

# metrics
args_collections = arguments.get("collections", None)
if isinstance(args_collections, str):
product_type = args_collections.split(",")[0] if args_collections else None
elif isinstance(args_collections, list) and len(args_collections) > 0:
product_type = args_collections[0]
else:
product_type = None
if not product_type:
raise ValidationError(
"Cannot get product_type from collections %s" % args_collections
)
telemetry.record_searched_product_type(product_type)

response = search_stac_items(
url=url,
arguments=arguments,
Expand All @@ -760,12 +722,8 @@ def stac_search(
resp = ORJSONResponse(
content=response, status_code=200, media_type="application/json"
)
timer.stop_global_timer()
telemetry.record_request_duration(provider, timer.get_global_time())
telemetry.record_request_overhead_duration(provider, timer.get_overhead_time())
telemetry.delete_overhead_timer(trace_id)
return resp


app.include_router(router)
telemetry_init_auto(app)
telemetry_init(app)
11 changes: 1 addition & 10 deletions eodag/rest/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
ValidationError,
)
from eodag.utils.instrumentation.eodag import EODAGInstrumentor
from eodag.utils.otel import telemetry

if TYPE_CHECKING:
from io import BufferedReader
Expand Down Expand Up @@ -1103,15 +1102,7 @@ def eodag_api_init() -> None:
next(eodag_api._plugins_manager.get_search_plugins(provider=provider))


def telemetry_init(app: FastAPI):
"""Init telemetry
:param app: FastAPI to automatically instrument.
:type app: FastAPI"""
telemetry.configure_instruments(eodag_api, app)


def telemetry_init_auto(fastapi_app: FastAPI = None):
def telemetry_init(fastapi_app: FastAPI = None):
"""Init telemetry
:param fastapi_app: FastAPI to automatically instrument.
Expand Down
Loading

0 comments on commit 2b889a9

Please sign in to comment.