Contextual logging
mjh1 committed Feb 7, 2025
1 parent 6a9a95e commit 2280845
Showing 7 changed files with 68 additions and 22 deletions.
29 changes: 25 additions & 4 deletions runner/app/live/infer.py
@@ -20,7 +20,7 @@
from streamer.protocol.zeromq import ZeroMQProtocol


-async def main(*, http_port: int, stream_protocol: str, subscribe_url: str, publish_url: str, control_url: str, events_url: str, pipeline: str, params: dict, input_timeout: int):
+async def main(*, http_port: int, stream_protocol: str, subscribe_url: str, publish_url: str, control_url: str, events_url: str, pipeline: str, params: dict, input_timeout: int, request_id: str):
if stream_protocol == "trickle":
protocol = TrickleProtocol(subscribe_url, publish_url, control_url, events_url)
elif stream_protocol == "zeromq":
@@ -32,7 +32,7 @@ async def main(*, http_port: int, stream_protocol: str, subscribe_url: str, publ
else:
raise ValueError(f"Unsupported protocol: {stream_protocol}")

-streamer = PipelineStreamer(protocol, pipeline, input_timeout, params or {})
+streamer = PipelineStreamer(protocol, pipeline, input_timeout, params or {}, request_id)

api = None
try:
@@ -108,6 +108,12 @@ def signal_handler(sig, _):
action="store_true",
help="Enable verbose (debug) logging"
)
+parser.add_argument(
+    "--request-id",
+    type=str,
+    default="",
+    help="The Livepeer request ID associated with this video stream"
+)
args = parser.parse_args()
try:
params = json.loads(args.initial_params)
@@ -117,12 +123,26 @@ def signal_handler(sig, _):

log_level = logging.DEBUG if args.verbose else logging.INFO
logging.basicConfig(
-format='%(asctime)s %(levelname)-8s %(message)s',
+format='%(asctime)s %(levelname)-8s request_id=' + args.request_id + ' %(message)s',
level=log_level,
datefmt='%Y-%m-%d %H:%M:%S')
if args.verbose:
os.environ['VERBOSE_LOGGING'] = '1' # enable verbose logging in subprocesses

+# formatter = logging.Formatter(
+#     '%(asctime)s %(levelname)-8s request_id=%(request_id)s %(message)s',
+#     defaults={"request_id": ""},
+#     datefmt='%Y-%m-%d %H:%M:%S'
+# )
+# handler = logging.StreamHandler()
+# handler.setFormatter(formatter)
+# handler.setLevel(log_level)
+# logger = logging.getLogger() # Root logger
+# logger.setLevel(log_level)
+# logger.addHandler(handler)
+# logger.addFilter(ContextFilter()) # Attach the filter to all loggers
+# logging_config.request_id = args.request_id

try:
asyncio.run(
main(
@@ -134,7 +154,8 @@ def signal_handler(sig, _):
events_url=args.events_url,
pipeline=args.pipeline,
params=params,
-input_timeout=args.input_timeout
+input_timeout=args.input_timeout,
+request_id=args.request_id
)
)
# We force an exit here to ensure that the process terminates. If any asyncio tasks or
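The infer.py change takes the simplest route to contextual logging: the request ID is interpolated straight into the basicConfig format string, so every line the process logs is tagged without touching individual call sites. A minimal sketch of that approach, using a hypothetical request_id value in place of the --request-id argument:

    import logging

    request_id = "abc123"  # hypothetical value; infer.py takes this from --request-id
    logging.basicConfig(
        format='%(asctime)s %(levelname)-8s request_id=' + request_id + ' %(message)s',
        level=logging.INFO,
        datefmt='%Y-%m-%d %H:%M:%S')

    logging.info("stream started")
    # 2025-02-07 10:00:00 INFO     request_id=abc123 stream started

The trade-off is that the ID is fixed at configuration time, which works here because each infer.py process serves a single stream; the commented-out ContextFilter block above sketches the per-request alternative that runner/app/log.py implements.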
11 changes: 6 additions & 5 deletions runner/app/live/streamer/process.py
@@ -15,15 +15,15 @@

class PipelineProcess:
@staticmethod
-def start(pipeline_name: str, params: dict):
-    instance = PipelineProcess(pipeline_name)
+def start(pipeline_name: str, params: dict, request_id: str):
+    instance = PipelineProcess(pipeline_name, request_id)
if params:
instance.update_params(params)
instance.process.start()
instance.start_time = time.time()
return instance

-def __init__(self, pipeline_name: str):
+def __init__(self, pipeline_name: str, request_id: str):
self.pipeline_name = pipeline_name
self.ctx = mp.get_context("spawn")

@@ -36,6 +36,7 @@ def __init__(self, pipeline_name: str):
self.done = self.ctx.Event()
self.process = self.ctx.Process(target=self.process_loop, args=())
self.start_time = 0
+self.request_id = request_id

async def stop(self):
await asyncio.to_thread(self._stop_sync)
@@ -160,12 +161,12 @@ def _setup_logging(self):

level = logging.DEBUG if os.environ.get('VERBOSE_LOGGING') == '1' else logging.INFO
logging.basicConfig(
-format='%(asctime)s %(levelname)-8s %(message)s',
+format='%(asctime)s %(levelname)-8s request_id=' + self.request_id + ' %(message)s',
level=level,
datefmt='%Y-%m-%d %H:%M:%S')

queue_handler = LogQueueHandler(self)
-queue_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)-8s %(message)s'))
+queue_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)-8s request_id=' + self.request_id + ' %(message)s'))
logging.getLogger().addHandler(queue_handler)

# Tee stdout and stderr to our log queue while preserving original output
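PipelineProcess re-applies the formatter inside _setup_logging because the pipeline runs in a child created with the "spawn" context: the child starts from a fresh interpreter, so neither the parent's logging configuration nor any context variables carry over, and the ID has to travel as a plain constructor argument. A stripped-down sketch of that pattern (hypothetical worker function, not the actual class):

    import logging
    import multiprocessing as mp

    def worker(request_id: str):
        # "spawn" gives the child a clean interpreter, so logging must be
        # configured again here, with the request_id re-applied.
        logging.basicConfig(
            format='%(asctime)s %(levelname)-8s request_id=' + request_id + ' %(message)s',
            level=logging.INFO)
        logging.info("hello from the pipeline process")

    if __name__ == "__main__":
        ctx = mp.get_context("spawn")
        p = ctx.Process(target=worker, args=("abc123",))  # hypothetical ID
        p.start()
        p.join()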
7 changes: 4 additions & 3 deletions runner/app/live/streamer/streamer.py
@@ -19,7 +19,7 @@
status_report_interval = 10

class PipelineStreamer:
-def __init__(self, protocol: StreamProtocol, pipeline: str, input_timeout: int, params: dict):
+def __init__(self, protocol: StreamProtocol, pipeline: str, input_timeout: int, params: dict, request_id: str):
self.protocol = protocol
self.pipeline = pipeline
self.params = params
@@ -32,13 +32,14 @@ def __init__(self, protocol: StreamProtocol, pipeline: str, input_timeout: int,

self.main_tasks: list[asyncio.Task] = []
self.tasks_supervisor_task: asyncio.Task | None = None
+self.request_id = request_id

async def start(self):
if self.process:
raise RuntimeError("Streamer already started")

self.stop_event.clear()
-self.process = PipelineProcess.start(self.pipeline, self.params)
+self.process = PipelineProcess.start(self.pipeline, self.params, self.request_id)
await self.protocol.start()

# We need a bunch of concurrent tasks to run the streamer. So we start them all in background and then also start
@@ -99,7 +100,7 @@ async def _restart_process(self):
# don't call the full start/stop methods since we only want to restart the process
await self.process.stop()

-self.process = PipelineProcess.start(self.pipeline, self.params)
+self.process = PipelineProcess.start(self.pipeline, self.params, self.request_id)
self.status.inference_status.restart_count += 1
self.status.inference_status.last_restart_time = time.time()
self.status.inference_status.last_restart_logs = restart_logs
25 changes: 25 additions & 0 deletions runner/app/log.py
@@ -0,0 +1,25 @@
+import contextvars
+import logging
+
+# Create a context variable to store request_id per request
+request_id_var = contextvars.ContextVar("request_id", default="")
+
+class ContextFilter(logging.Filter):
+    """Logging filter to add request_id from contextvars."""
+    def filter(self, record):
+        record.__dict__.setdefault("request_id", request_id_var.get())
+        return True
+
+def config_logging():
+    formatter = logging.Formatter(
+        "%(asctime)s - %(name)s - %(levelname)s - request_id=%(request_id)s %(message)s",
+        defaults={"request_id": ""},
+        datefmt='%Y-%m-%d %H:%M:%S'
+    )
+    handler = logging.StreamHandler()
+    handler.setFormatter(formatter)
+    handler.setLevel(logging.INFO)
+    logger = logging.getLogger()  # Root logger
+    logger.setLevel(logging.INFO)
+    logger.addHandler(handler)
+    logger.addFilter(ContextFilter())  # Attach the filter to all loggers
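The new runner/app/log.py is the per-request variant: a ContextVar holds the current request ID and a logging.Filter copies it onto each record, so the ID can differ between requests handled by the same process. A usage sketch, assuming the module is importable as app.log and using a hypothetical ID:

    import logging
    from app.log import config_logging, request_id_var

    config_logging()
    request_id_var.set("abc123")  # hypothetical; the route handler sets this per request
    logging.info("processing stream")
    # 2025-02-07 10:00:00 - root - INFO - request_id=abc123 processing stream

One caveat: logger-level filters only run for records created on the logger they are attached to, so records that merely propagate up from child loggers bypass the root logger's ContextFilter; the defaults={"request_id": ""} on the formatter is what keeps those records from raising a KeyError.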
9 changes: 1 addition & 8 deletions runner/app/main.py
@@ -6,6 +6,7 @@
from fastapi import FastAPI
from fastapi.routing import APIRoute
from app.utils.hardware import HardwareInfo
+from app.log import config_logging

logger = logging.getLogger(__name__)

@@ -132,14 +133,6 @@ def load_route(pipeline: str) -> any:
raise EnvironmentError(f"{pipeline} is not a valid pipeline")


-def config_logging():
-    logging.basicConfig(
-        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
-        level=logging.INFO,
-        force=True,
-    )


def use_route_names_as_operation_ids(app: FastAPI) -> None:
for route in app.routes:
if isinstance(route, APIRoute):
4 changes: 3 additions & 1 deletion runner/app/pipelines/live_video_to_video.py
@@ -13,6 +13,7 @@
from app.pipelines.base import Pipeline, HealthCheck
from app.pipelines.utils import get_model_dir, get_torch_device
from app.utils.errors import InferenceError
+from app.routes.live_video_to_video import request_id_var

logger = logging.getLogger(__name__)

@@ -37,7 +38,7 @@ def __call__( # type: ignore
raise RuntimeError("Pipeline already running")

try:
logger.info(f"Starting stream, subscribe={subscribe_url} publish={publish_url}, control={control_url}, events={events_url}")
logging.info(f"Starting stream, subscribe={subscribe_url} publish={publish_url}, control={control_url}, events={events_url}")
self.start_process(
pipeline=self.model_id, # we use the model_id as the pipeline name for now
http_port=8888,
Expand All @@ -46,6 +47,7 @@ def __call__( # type: ignore
control_url=control_url,
events_url=events_url,
initial_params=json.dumps(params),
+request_id=request_id_var.get(),
# TODO: set torch device from self.torch_device
)
return
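The pipeline reads the context variable here, on the request path, because start_process launches infer.py as a separate process: the request-scoped value has to be serialized into the child's command line, which is what the --request-id flag added above is for. A hypothetical sketch of that hand-off (the real command construction lives in start_process):

    import subprocess
    from app.log import request_id_var

    # Hypothetical: a request-scoped value must be turned into argv
    # to cross the process boundary, since ContextVars do not.
    cmd = ["python", "infer.py", "--request-id", request_id_var.get()]
    subprocess.Popen(cmd)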
5 changes: 4 additions & 1 deletion runner/app/routes/live_video_to_video.py
@@ -12,10 +12,11 @@
http_error,
handle_pipeline_exception,
)
-from fastapi import APIRouter, Depends, status
+from fastapi import APIRouter, Depends, status, Header
from fastapi.responses import JSONResponse
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from pydantic import BaseModel, Field
+from app.log import request_id_var

router = APIRouter()

@@ -108,7 +109,7 @@ async def live_video_to_video(
params: LiveVideoToVideoParams,
pipeline: Pipeline = Depends(get_pipeline),
token: HTTPAuthorizationCredentials = Depends(HTTPBearer(auto_error=False)),
+requestID: str = Header(None),
):
+request_id_var.set(requestID)
auth_token = os.environ.get("AUTH_TOKEN")
if auth_token:
if not token or token.credentials != auth_token:
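The route change is the entry point of the whole flow: FastAPI binds the requestID header (header names are matched case-insensitively), the handler stores it in the context variable, and everything logged while handling that request inherits it. A condensed sketch of the wiring, with the pipeline and auth parameters omitted:

    from fastapi import FastAPI, Header
    from app.log import request_id_var

    app = FastAPI()

    @app.post("/live-video-to-video")
    async def live_video_to_video(requestID: str = Header(None)):
        # Stash the header value in the ContextVar so ContextFilter tags
        # every log record emitted while this request is being handled.
        request_id_var.set(requestID)
        return {"status": "ok"}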
