
feat(ai-server): improved logging #16435

Merged · 1 commit · Oct 8, 2024
9 changes: 5 additions & 4 deletions opentrons-ai-server/Dockerfile
@@ -1,7 +1,8 @@
FROM --platform=linux/amd64 python:3.12-slim
ARG PLATFORM=linux/amd64
FROM --platform=$PLATFORM python:3.12-slim

ENV PYTHONUNBUFFERED True
ENV DOCKER_RUNNING True
ENV PYTHONUNBUFFERED=True
ENV DOCKER_RUNNING=True

WORKDIR /code

@@ -15,4 +16,4 @@ COPY ./api /code/api

EXPOSE 8000

CMD ["ddtrace-run", "uvicorn", "api.handler.fast:app", "--proxy-headers", "--host", "0.0.0.0", "--port", "8000", "--timeout-keep-alive", "190", "--workers", "3"]
CMD ["ddtrace-run", "uvicorn", "api.handler.fast:app", "--proxy-headers", "--host", "0.0.0.0", "--port", "8000", "--timeout-keep-alive", "190", "--log-config", "/code/api/uvicorn_disable_logging.json", "--workers", "3"]
2 changes: 1 addition & 1 deletion opentrons-ai-server/Makefile
@@ -115,7 +115,7 @@ run:
	docker logs -f $(CONTAINER_NAME)

.PHONY: clean
clean:
clean: gen-requirements
	docker stop $(CONTAINER_NAME) || true
	docker rm $(CONTAINER_NAME) || true

3 changes: 2 additions & 1 deletion opentrons-ai-server/Pipfile
@@ -13,9 +13,10 @@ fastapi = "==0.111.0"
ddtrace = "==2.9.2"
pydantic-settings = "==2.3.4"
pyjwt = {extras = ["crypto"], version = "*"}
python-json-logger = "==2.0.7"
beautifulsoup4 = "==4.12.3"
markdownify = "==0.13.1"
structlog = "==24.4.0"
asgi-correlation-id = "==4.3.3"

[dev-packages]
docker = "==7.1.0"
767 changes: 438 additions & 329 deletions opentrons-ai-server/Pipfile.lock

Large diffs are not rendered by default.

11 changes: 8 additions & 3 deletions opentrons-ai-server/api/domain/openai_predict.py
@@ -1,7 +1,8 @@
import logging
from pathlib import Path
from typing import List, Tuple

import structlog
from ddtrace import tracer
from llama_index.core import Settings as li_settings
from llama_index.core import StorageContext, load_index_from_storage
from llama_index.embeddings.openai import OpenAIEmbedding
@@ -25,8 +26,8 @@
from api.domain.utils import refine_characters
from api.settings import Settings

logger = logging.getLogger(__name__)

settings: Settings = Settings()
logger = structlog.stdlib.get_logger(settings.logger_name)
ROOT_PATH: Path = Path(Path(__file__)).parent.parent.parent


@@ -38,6 +39,7 @@ def __init__(self, settings: Settings) -> None:
            model_name="text-embedding-3-large", api_key=self.settings.openai_api_key.get_secret_value()
        )

    @tracer.wrap()
    def get_docs_all(self, query: str) -> Tuple[str, str, str]:
        commands = self.extract_atomic_description(query)
        logger.info("Commands", extra={"commands": commands})
@@ -85,6 +87,7 @@ def get_docs_all(self, query: str) -> Tuple[str, str, str]:

        return example_commands, docs + docs_ref, standard_api_names

    @tracer.wrap()
    def extract_atomic_description(self, protocol_description: str) -> List[str]:
        class atomic_descr(BaseModel):
            """
@@ -106,6 +109,7 @@ class atomic_descr(BaseModel):
            descriptions.append(x)
        return descriptions

    @tracer.wrap()
    def refine_response(self, assistant_message: str) -> str:
        if assistant_message is None:
            return ""
@@ -129,6 +133,7 @@ def refine_response(self, assistant_message: str) -> str:

        return response.choices[0].message.content if response.choices[0].message.content is not None else ""

    @tracer.wrap()
    def predict(self, prompt: str, chat_completion_message_params: List[ChatCompletionMessageParam] | None = None) -> None | str:

        prompt = refine_characters(prompt)
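
Every public method of OpenAIPredict now carries @tracer.wrap(), which opens a Datadog span for the duration of the call; the tracer_injection processor in custom_logging.py can then stamp matching dd.trace_id/dd.span_id values onto any log emitted inside it. A minimal sketch of the mechanism, with a hypothetical simulate() function standing in for the real methods:

from ddtrace import tracer

@tracer.wrap()
def simulate(protocol_name: str) -> int:
    # While the decorated call runs, a span is active, so log lines emitted
    # here can be correlated with the surrounding trace in Datadog.
    span = tracer.current_span()
    return span.trace_id if span else 0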
8 changes: 5 additions & 3 deletions opentrons-ai-server/api/domain/prompts.py
@@ -1,29 +1,31 @@
import json
import logging
import uuid
from typing import Any, Dict, Iterable

import requests
import structlog
from ddtrace import tracer
from openai.types.chat import ChatCompletionToolParam

from api.settings import Settings

settings: Settings = Settings()
logger = logging.getLogger(__name__)
logger = structlog.stdlib.get_logger(settings.logger_name)


def generate_unique_name() -> str:
    unique_name = str(uuid.uuid4()) + ".py"
    return unique_name


@tracer.wrap()
def send_post_request(payload: str) -> str:
    url = "https://Opentrons-simulator.hf.space/protocol"
    protocol_name: str = generate_unique_name()
    data = {"name": protocol_name, "content": payload}
    hf_token: str = settings.huggingface_api_key.get_secret_value()
    headers = {"Content-Type": "application/json", "Authorization": "Bearer {}".format(hf_token)}

    logger.info("Sending POST request to the simulate API", extra={"url": url, "protocolName": data["name"]})
    response = requests.post(url, json=data, headers=headers)

    if response.status_code != 200:
133 changes: 133 additions & 0 deletions opentrons-ai-server/api/handler/custom_logging.py
@@ -0,0 +1,133 @@
# Taken directly from https://gist.github.com/nymous/f138c7f06062b7c43c060bf03759c29e
import logging
import sys

import ddtrace
import structlog
from ddtrace import tracer
from structlog.types import EventDict, Processor


# https://github.com/hynek/structlog/issues/35#issuecomment-591321744
def rename_event_key(_, __, event_dict: EventDict) -> EventDict:  # type: ignore[no-untyped-def]
    """
    Log entries keep the text message in the `event` field, but Datadog
    uses the `message` field. This processor moves the value from one field to
    the other.
    See https://github.com/hynek/structlog/issues/35#issuecomment-591321744
    """
    event_dict["message"] = event_dict.pop("event")
    return event_dict


def drop_color_message_key(_, __, event_dict: EventDict) -> EventDict:  # type: ignore[no-untyped-def]
    """
    Uvicorn logs the message a second time in the extra `color_message` field,
    but we don't need it. This processor drops the key from the event dict if
    it exists.
    """
    event_dict.pop("color_message", None)
    return event_dict


def tracer_injection(_, __, event_dict: EventDict) -> EventDict:  # type: ignore[no-untyped-def]
    # get correlation IDs from the current tracer context; Datadog correlates
    # on 64-bit IDs, so 128-bit trace IDs are masked to their low 64 bits
    span = tracer.current_span()
    trace_id, span_id = (str((1 << 64) - 1 & span.trace_id), span.span_id) if span else (None, None)

    # add the IDs to the structlog event dictionary
    event_dict["dd.trace_id"] = str(trace_id or 0)
    event_dict["dd.span_id"] = str(span_id or 0)

    # add the env, service, and version configured for the tracer
    event_dict["dd.env"] = ddtrace.config.env or ""
    event_dict["dd.service"] = ddtrace.config.service or ""
    event_dict["dd.version"] = ddtrace.config.version or ""

    return event_dict


def setup_logging(json_logs: bool = False, log_level: str = "INFO") -> None:
    timestamper = structlog.processors.TimeStamper(fmt="iso")

    shared_processors: list[Processor] = [
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.stdlib.ExtraAdder(),
        drop_color_message_key,
        tracer_injection,
        timestamper,
        structlog.processors.StackInfoRenderer(),
    ]

    if json_logs:
        # We rename the `event` key to `message` only in JSON logs, as Datadog
        # looks for the `message` key but the pretty ConsoleRenderer looks for `event`
        shared_processors.append(rename_event_key)
        # Format the exception only for JSON logs, as we want to pretty-print
        # them when using the ConsoleRenderer
        shared_processors.append(structlog.processors.format_exc_info)

    structlog.configure(
        processors=shared_processors
        + [
            # Prepare event dict for `ProcessorFormatter`.
            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
        ],
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )

    log_renderer: structlog.types.Processor
    if json_logs:
        log_renderer = structlog.processors.JSONRenderer()
    else:
        log_renderer = structlog.dev.ConsoleRenderer()

    formatter = structlog.stdlib.ProcessorFormatter(
        # These run ONLY on `logging` entries that do NOT originate within
        # structlog.
        foreign_pre_chain=shared_processors,
        # These run on ALL entries after the pre_chain is done.
        processors=[
            # Remove _record & _from_structlog.
            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
            log_renderer,
        ],
    )

    handler = logging.StreamHandler()
    # Use OUR `ProcessorFormatter` to format all `logging` entries.
    handler.setFormatter(formatter)
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(log_level.upper())

    for _log in ["uvicorn", "uvicorn.error"]:
        # Clear the log handlers for uvicorn loggers, and enable propagation
        # so the messages are caught by our root logger and formatted correctly
        # by structlog
        logging.getLogger(_log).handlers.clear()
        logging.getLogger(_log).propagate = True

    # Since we re-create the access logs ourselves (see `logging_middleware`
    # in fast.py) to add all information to the structured log, we clear the
    # handlers and prevent the logs from propagating to a logger higher up in
    # the hierarchy (effectively rendering them silent).
    logging.getLogger("uvicorn.access").handlers.clear()
    logging.getLogger("uvicorn.access").propagate = False

    def handle_exception(exc_type, exc_value, exc_traceback):  # type: ignore[no-untyped-def]
        """
        Log any uncaught exception instead of letting it be printed by Python
        (but leave KeyboardInterrupt untouched so users can Ctrl+C to stop).
        See https://stackoverflow.com/a/16993115/3641865
        """
        if issubclass(exc_type, KeyboardInterrupt):
            sys.__excepthook__(exc_type, exc_value, exc_traceback)
            return

        root_logger.error("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))

    sys.excepthook = handle_exception
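
A short usage sketch (not part of the PR) of how this module behaves once setup_logging() has run; the logger names and event keys below are illustrative only:

import logging

import structlog

from api.handler.custom_logging import setup_logging

setup_logging(json_logs=True, log_level="INFO")

logger = structlog.stdlib.get_logger("demo")
# Keyword arguments become keys in the JSON event dict, next to the timestamp,
# level, logger name, and the dd.trace_id/dd.span_id injected by the chain above.
logger.info("simulation finished", protocol="abc.py", duration_ms=52)

# Records from stdlib and third-party loggers pass through the same
# ProcessorFormatter via foreign_pre_chain, so they are rendered as JSON too.
logging.getLogger("some.library").warning("falling back to default")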
82 changes: 74 additions & 8 deletions opentrons-ai-server/api/handler/fast.py
@@ -1,30 +1,37 @@
import asyncio
import os
import time
from typing import Any, Awaitable, Callable, List, Literal, Union

import ddtrace
import structlog
from asgi_correlation_id import CorrelationIdMiddleware
from asgi_correlation_id.context import correlation_id
from ddtrace import tracer
from ddtrace.contrib.asgi.middleware import TraceMiddleware
from fastapi import FastAPI, HTTPException, Query, Request, Response, Security, status
from fastapi.exceptions import RequestValidationError
from fastapi.middleware.cors import CORSMiddleware
from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html
from fastapi.responses import HTMLResponse, JSONResponse
from pydantic import BaseModel, Field, conint
from starlette.middleware.base import BaseHTTPMiddleware
from uvicorn.protocols.utils import get_path_with_query_string

from api.domain.openai_predict import OpenAIPredict
from api.handler.logging_config import get_logger, setup_logging
from api.handler.custom_logging import setup_logging
from api.integration.auth import VerifyToken
from api.models.chat_request import ChatRequest
from api.models.chat_response import ChatResponse
from api.models.empty_request_error import EmptyRequestError
from api.models.internal_server_error import InternalServerError
from api.settings import Settings

setup_logging()
logger = get_logger(__name__)
ddtrace.patch(logging=True)
settings: Settings = Settings()
setup_logging(json_logs=settings.json_logging, log_level=settings.log_level.upper())

access_logger = structlog.stdlib.get_logger("api.access")
logger = structlog.stdlib.get_logger(settings.logger_name)

auth: VerifyToken = VerifyToken()
openai: OpenAIPredict = OpenAIPredict(settings)

@@ -75,6 +82,61 @@ async def dispatch(self, request: Request, call_next: Any) -> JSONResponse | Any
app.add_middleware(TimeoutMiddleware, timeout_s=178)


@app.middleware("http")
async def logging_middleware(request: Request, call_next) -> Response: # type: ignore[no-untyped-def]
structlog.contextvars.clear_contextvars()
# These context vars will be added to all log entries emitted during the request
request_id = correlation_id.get()
structlog.contextvars.bind_contextvars(request_id=request_id)

start_time = time.perf_counter_ns()
# If the call_next raises an error, we still want to return our own 500 response,
# so we can add headers to it (process time, request ID...)
response = Response(status_code=500)
try:
response = await call_next(request)
except Exception:
structlog.stdlib.get_logger("api.error").exception("Uncaught exception")
raise
finally:
process_time = time.perf_counter_ns() - start_time
status_code = response.status_code
url = get_path_with_query_string(request.scope) # type: ignore[arg-type]
client_host = request.client.host if request.client and request.client.host else "unknown"
client_port = request.client.port if request.client and request.client.port else "unknown"
http_method = request.method if request.method else "unknown"
http_version = request.scope["http_version"]
# Recreate the Uvicorn access log format, but add all parameters as structured information
access_logger.info(
f"""{client_host}:{client_port} - "{http_method} {url} HTTP/{http_version}" {status_code}""",
http={
"url": str(request.url),
"status_code": status_code,
"method": http_method,
"request_id": request_id,
"version": http_version,
},
network={"client": {"ip": client_host, "port": client_port}},
duration=process_time,
)
response.headers["X-Process-Time"] = str(process_time / 10**9)
return response


# This middleware must be placed after the logging middleware, so it can
# populate the context with the request ID.
# NOTE: Why add it last? Middlewares are applied in the reverse order of when
# they are added (you can verify this by debugging `app.middleware_stack` and
# recursively drilling down the `app` property).
app.add_middleware(CorrelationIdMiddleware)

tracing_middleware = next((m for m in app.user_middleware if m.cls == TraceMiddleware), None)
if tracing_middleware is not None:
    app.user_middleware = [m for m in app.user_middleware if m.cls != TraceMiddleware]
    structlog.stdlib.get_logger("api.datadog_patch").info("Patching Datadog tracing middleware to be the outermost middleware...")
    app.user_middleware.insert(0, tracing_middleware)
    app.middleware_stack = app.build_middleware_stack()


# Models
class Status(BaseModel):
    status: Literal["ok", "error"]
@@ -134,7 +196,7 @@ async def create_chat_completion(
        return ChatResponse(reply=response, fake=body.fake)

    except Exception as e:
        logger.exception(e)
        logger.exception("Error processing chat completion")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=InternalServerError(exception_object=e).model_dump()
        ) from e
@@ -143,7 +205,7 @@
@app.get(
    "/health",
    response_model=Status,
    summary="LB Health Check",
    summary="Load Balancer Health Check",
    description="Check the health and version of the API.",
    include_in_schema=False,
@@ -154,10 +216,14 @@ async def get_health(request: Request) -> Status:

    - **returns**: A Status containing the version of the API.
    """
    logger.debug(f"{request.method} {request.url.path}")
    if request.url.path == "/health":
        pass  # This is a health check from the load balancer
    else:
        logger.info(f"{request.method} {request.url.path}", extra={"requestMethod": request.method, "requestPath": request.url.path})
    return Status(status="ok", version=settings.dd_version)


@tracer.wrap()
@app.get("/api/timeout", response_model=TimeoutResponse)
async def timeout_endpoint(request: Request, seconds: conint(ge=1, le=300) = Query(..., description="Number of seconds to wait")): # type: ignore # noqa: B008
"""
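
fast.py reads settings.json_logging, settings.log_level, and settings.logger_name, but the matching change to api/settings.py is not shown in this diff. Under pydantic-settings (already pinned in the Pipfile), the new fields presumably look something like this sketch; the defaults are assumptions:

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    json_logging: bool = True       # JSONRenderer when True, ConsoleRenderer for local dev
    log_level: str = "info"         # upper-cased and handed to setup_logging()
    logger_name: str = "ai-server"  # shared name for the module-level loggers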