Skip to content

Commit

Permalink
[Core] Add task_name, task_function_name and actor_name in Structured…
Browse files Browse the repository at this point in the history
… Logging (ray-project#48703)

Signed-off-by: Mengjin Yan <[email protected]>
Signed-off-by: hjiang <[email protected]>
  • Loading branch information
MengjinYan authored and dentiny committed Dec 7, 2024
1 parent d185452 commit 31402c6
Show file tree
Hide file tree
Showing 10 changed files with 350 additions and 12 deletions.
3 changes: 3 additions & 0 deletions python/ray/_private/ray_logging/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class LogKey(str, Enum):
NODE_ID = "node_id"
ACTOR_ID = "actor_id"
TASK_ID = "task_id"
ACTOR_NAME = "actor_name"
TASK_NAME = "task_name"
TASK_FUNCTION_NAME = "task_func_name"

# Logger built-in context
ASCTIME = "asctime"
Expand Down
9 changes: 9 additions & 0 deletions python/ray/_private/ray_logging/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,13 @@ def filter(self, record):
task_id = runtime_context.get_task_id()
if task_id is not None:
setattr(record, LogKey.TASK_ID.value, task_id)
task_name = runtime_context.get_task_name()
if task_name is not None:
setattr(record, LogKey.TASK_NAME.value, task_name)
task_function_name = runtime_context.get_task_function_name()
if task_function_name is not None:
setattr(record, LogKey.TASK_FUNCTION_NAME.value, task_function_name)
actor_name = runtime_context.get_actor_name()
if actor_name is not None:
setattr(record, LogKey.ACTOR_NAME.value, actor_name)
return True
8 changes: 8 additions & 0 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,14 @@ def actor_name(self):
def current_task_id(self):
return self.core_worker.get_current_task_id()

@property
def current_task_name(self):
return self.core_worker.get_current_task_name()

@property
def current_task_function_name(self):
return self.core_worker.get_current_task_function_name()

@property
def current_node_id(self):
return self.core_worker.get_current_node_id()
Expand Down
55 changes: 53 additions & 2 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ cdef optional[ObjectIDIndexType] NULL_PUT_INDEX = nullopt
# https://docs.python.org/3/library/contextvars.html#contextvars.ContextVar
# It is thread-safe.
async_task_id = contextvars.ContextVar('async_task_id', default=None)
async_task_name = contextvars.ContextVar('async_task_name', default=None)
async_task_function_name = contextvars.ContextVar('async_task_function_name',
default=None)


class DynamicObjectRefGenerator:
Expand Down Expand Up @@ -1815,7 +1818,8 @@ cdef void execute_task(
return core_worker.run_async_func_or_coro_in_event_loop(
async_function, function_descriptor,
name_of_concurrency_group_to_execute, task_id=task_id,
func_args=(actor, *arguments), func_kwargs=kwarguments)
task_name=task_name, func_args=(actor, *arguments),
func_kwargs=kwarguments)

return function(actor, *arguments, **kwarguments)

Expand Down Expand Up @@ -1927,7 +1931,8 @@ cdef void execute_task(
execute_streaming_generator_async(context),
function_descriptor,
name_of_concurrency_group_to_execute,
task_id=task_id)
task_id=task_id,
task_name=task_name)
else:
execute_streaming_generator_sync(context)

Expand Down Expand Up @@ -3415,6 +3420,48 @@ cdef class CoreWorker:
with nogil:
CCoreWorkerProcess.GetCoreWorker().Exit(c_exit_type, detail, null_ptr)

def get_current_task_name(self) -> str:
"""Return the current task name.

If it is a normal task, it returns the task name from the main thread.
If it is a threaded actor, it returns the task name for the current thread.
If it is async actor, it returns the task name stored in contextVar for
the current asyncio task.
"""
# We can only obtain the correct task name within asyncio task
# via async_task_name contextvar. We try this first.
# It is needed because the core worker's GetCurrentTask API
# doesn't have asyncio context, thus it cannot return the
# correct task name.
task_name = async_task_name.get()
if task_name is None:
# if it is not within asyncio context, fallback to TaskName
# obtainable from core worker.
task_name = CCoreWorkerProcess.GetCoreWorker().GetCurrentTaskName() \
.decode("utf-8")
return task_name

def get_current_task_function_name(self) -> str:
"""Return the current task function.

If it is a normal task, it returns the task function from the main thread.
If it is a threaded actor, it returns the task function for the current thread.
If it is async actor, it returns the task function stored in contextVar for
the current asyncio task.
"""
# We can only obtain the correct task function within asyncio task
# via async_task_function_name contextvar. We try this first.
# It is needed because the core Worker's GetCurrentTask API
# doesn't have asyncio context, thus it cannot return the
# correct task function.
task_function_name = async_task_function_name.get()
if task_function_name is None:
# if it is not within asyncio context, fallback to TaskName
# obtainable from core worker.
task_function_name = CCoreWorkerProcess.GetCoreWorker() \
.GetCurrentTaskFunctionName().decode("utf-8")
return task_function_name

def get_current_task_id(self) -> TaskID:
"""Return the current task ID.

Expand Down Expand Up @@ -4822,6 +4869,7 @@ cdef class CoreWorker:
specified_cgname: str,
*,
task_id: Optional[TaskID] = None,
task_name: Optional[str] = None,
func_args: Optional[Tuple] = None,
func_kwargs: Optional[Dict] = None,
):
Expand Down Expand Up @@ -4868,6 +4916,9 @@ cdef class CoreWorker:
try:
if task_id:
async_task_id.set(task_id)
if task_name is not None:
async_task_name.set(task_name)
async_task_function_name.set(function_descriptor.repr)

if inspect.isawaitable(func_or_coro):
coroutine = func_or_coro
Expand Down
2 changes: 2 additions & 0 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:

CJobID GetCurrentJobId()
CTaskID GetCurrentTaskId()
const c_string GetCurrentTaskName()
const c_string GetCurrentTaskFunctionName()
void UpdateTaskIsDebuggerPaused(
const CTaskID &task_id,
const c_bool is_debugger_paused)
Expand Down
124 changes: 114 additions & 10 deletions python/ray/runtime_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def get_worker_id(self) -> str:
@property
@Deprecated(message="Use get_task_id() instead", warning=True)
def task_id(self):
"""Get current task ID for this worker or driver.
"""Get current task ID for this worker.
Task ID is the id of a Ray task.
This shouldn't be used in a driver process.
Expand Down Expand Up @@ -155,7 +155,7 @@ def f():
Returns:
The current worker's task id. None if there's no task id.
"""
# only worker mode has actor_id
# only worker mode has task_id
assert (
self.worker.mode == ray._private.worker.WORKER_MODE
), f"This method is only available when the process is a\
Expand All @@ -165,7 +165,7 @@ def f():
return task_id if not task_id.is_nil() else None

def get_task_id(self) -> Optional[str]:
"""Get current task ID for this worker or driver.
"""Get current task ID for this worker.
Task ID is the id of a Ray task. The ID will be in hex format.
This shouldn't be used in a driver process.
Expand Down Expand Up @@ -201,7 +201,7 @@ def get_task_id():
Returns:
The current worker's task id in hex. None if there's no task id.
"""
# only worker mode has actor_id
# only worker mode has task_id
if self.worker.mode != ray._private.worker.WORKER_MODE:
logger.warning(
"This method is only available when the process is a "
Expand All @@ -212,12 +212,116 @@ def get_task_id():
return task_id.hex() if not task_id.is_nil() else None

def _get_current_task_id(self) -> TaskID:
async_task_id = ray._raylet.async_task_id.get()
if async_task_id is None:
task_id = self.worker.current_task_id
else:
task_id = async_task_id
return task_id
return self.worker.current_task_id

def get_task_name(self) -> Optional[str]:
"""Get current task name for this worker.
Task name by default is the task's funciton call string. It can also be
specified in options when triggering a task.
Example:
.. testcode::
import ray
@ray.remote
class Actor:
def get_task_name(self):
return ray.get_runtime_context().get_task_name()
@ray.remote
class AsyncActor:
async def get_task_name(self):
return ray.get_runtime_context().get_task_name()
@ray.remote
def get_task_name():
return ray.get_runtime_context().get_task_name()
a = Actor.remote()
b = AsyncActor.remote()
# Task names are available for actor tasks.
print(ray.get(a.get_task_name.remote()))
# Task names are avaiable for async actor tasks.
print(ray.get(b.get_task_name.remote()))
# Task names are available for normal tasks.
# Get default task name
print(ray.get(get_task_name.remote()))
# Get specified task name
print(ray.get(get_task_name.options(name="task_name").remote()))
.. testoutput::
:options: +MOCK
Actor.get_task_name
AsyncActor.get_task_name
get_task_name
task_nams
Returns:
The current worker's task name
"""
# only worker mode has task_name
if self.worker.mode != ray._private.worker.WORKER_MODE:
logger.warning(
"This method is only available when the process is a "
f"worker. Current mode: {self.worker.mode}"
)
return None
return self.worker.current_task_name

def get_task_function_name(self) -> Optional[str]:
"""Get current task function name string for this worker.
Example:
.. testcode::
import ray
@ray.remote
class Actor:
def get_task_function_name(self):
return ray.get_runtime_context().get_task_function_name()
@ray.remote
class AsyncActor:
async def get_task_function_name(self):
return ray.get_runtime_context().get_task_function_name()
@ray.remote
def get_task_function_name():
return ray.get_runtime_context().get_task_function_name()
a = Actor.remote()
b = AsyncActor.remote()
# Task functions are available for actor tasks.
print(ray.get(a.get_task_function_name.remote()))
# Task functions are available for async actor tasks.
print(ray.get(b.get_task_function_name.remote()))
# Task functions are available for normal tasks.
print(ray.get(get_task_function_name.remote()))
.. testoutput::
:options: +MOCK
[python modual name].Actor.get_task_function_name
[python modual name].AsyncActor.get_task_function_name
[python modual name].get_task_function_name
Returns:
The current worker's task function call string
"""
# only worker mode has task_function_name
if self.worker.mode != ray._private.worker.WORKER_MODE:
logger.warning(
"This method is only available when the process is a "
f"worker. Current mode: {self.worker.mode}"
)
return None
return self.worker.current_task_function_name

@property
@Deprecated(message="Use get_actor_id() instead", warning=True)
Expand Down
12 changes: 12 additions & 0 deletions python/ray/serve/tests/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,9 @@ def fn(*args):
"actor_id": ray.get_runtime_context().get_actor_id(),
"worker_id": ray.get_runtime_context().get_worker_id(),
"node_id": ray.get_runtime_context().get_node_id(),
"task_name": ray.get_runtime_context().get_task_name(),
"task_func_name": ray.get_runtime_context().get_task_function_name(),
"actor_name": ray.get_runtime_context().get_actor_name(),
}

@serve.deployment(
Expand All @@ -369,6 +372,9 @@ def __call__(self, req: starlette.requests.Request):
"actor_id": ray.get_runtime_context().get_actor_id(),
"worker_id": ray.get_runtime_context().get_worker_id(),
"node_id": ray.get_runtime_context().get_node_id(),
"task_name": ray.get_runtime_context().get_task_name(),
"task_func_name": ray.get_runtime_context().get_task_function_name(),
"actor_name": ray.get_runtime_context().get_actor_name(),
}

serve.run(fn.bind(), name="app1", route_prefix="/fn")
Expand Down Expand Up @@ -418,6 +424,9 @@ def check_log():
f'"worker_id": "{resp["worker_id"]}", '
f'"node_id": "{resp["node_id"]}", '
f'"actor_id": "{resp["actor_id"]}", '
f'"task_name": "{resp["task_name"]}", '
f'"task_func_name": "{resp["task_func_name"]}", '
f'"actor_name": "{resp["actor_name"]}", '
f'"deployment": "{resp["app_name"]}_fn", '
f'"replica": "{method_replica_id}", '
f'"component_name": "replica".*'
Expand All @@ -430,6 +439,9 @@ def check_log():
f'"worker_id": "{resp2["worker_id"]}", '
f'"node_id": "{resp2["node_id"]}", '
f'"actor_id": "{resp2["actor_id"]}", '
f'"task_name": "{resp2["task_name"]}", '
f'"task_func_name": "{resp2["task_func_name"]}", '
f'"actor_name": "{resp2["actor_name"]}", '
f'"deployment": "{resp2["app_name"]}_Model", '
f'"replica": "{class_method_replica_id}", '
f'"component_name": "replica".*'
Expand Down
6 changes: 6 additions & 0 deletions python/ray/tests/test_logging_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,14 @@ def f():
"worker_id": runtime_context.get_worker_id(),
"node_id": runtime_context.get_node_id(),
"task_id": runtime_context.get_task_id(),
"task_name": runtime_context.get_task_name(),
"task_func_name": runtime_context.get_task_function_name(),
}
for attr in should_exist:
assert hasattr(record, attr)
assert getattr(record, attr) == expected_values[attr]
assert not hasattr(record, "actor_id")
assert not hasattr(record, "actor_name")

obj_ref = f.remote()
ray.get(obj_ref)
Expand All @@ -77,7 +80,10 @@ def f(self):
"worker_id": runtime_context.get_worker_id(),
"node_id": runtime_context.get_node_id(),
"actor_id": runtime_context.get_actor_id(),
"actor_name": runtime_context.get_actor_name(),
"task_id": runtime_context.get_task_id(),
"task_name": runtime_context.get_task_name(),
"task_func_name": runtime_context.get_task_function_name(),
}
for attr in should_exist:
assert hasattr(record, attr)
Expand Down
Loading

0 comments on commit 31402c6

Please sign in to comment.