Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Add task_name, task_function_name and actor_name in Structured Logging #48703

Merged
merged 19 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions python/ray/_private/ray_logging/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class LogKey(str, Enum):
NODE_ID = "node_id"
ACTOR_ID = "actor_id"
TASK_ID = "task_id"
ACTOR_NAME = "actor_name"
TASK_NAME = "task_name"
TASK_FUNCTION = "task_function"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need task_function? I thought we only need task_name which should be function name by default?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. By default, we generally use the function name as the task name.

I still see 2 differences between task_name & task_function. That's why I kept them both.

  1. It is possible to set a customized task name by specifying name in options, but task function will always be the function call name
  2. The task_function string also contains the Python module name, whereas the default task_name contains only the function name itself.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alanwguo do we need the function name, or is the task name enough for now? I feel having task_name is enough.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've seen task name can explode in cardinality but function name is a bit more stable. Having both could be useful where we can decide which to use depending on performance characteristics. For example if task_name is too high cardinality, we can choose to use func name exclusively.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: task_function_name. If you think it's too long, we can also do task_func_name

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Synced offline. Changing to function name will implicitly indicate the parameter type and will be helpful to understand the field.


# Logger built-in context
ASCTIME = "asctime"
Expand Down
9 changes: 9 additions & 0 deletions python/ray/_private/ray_logging/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,13 @@ def filter(self, record):
task_id = runtime_context.get_task_id()
if task_id is not None:
setattr(record, LogKey.TASK_ID.value, task_id)
task_name = runtime_context.get_task_name()
if task_name is not None:
setattr(record, LogKey.TASK_NAME.value, task_name)
task_function = runtime_context.get_task_function()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_task_function_name

get_task_function makes me feel it might return the FunctionDescriptor

if task_function is not None:
setattr(record, LogKey.TASK_FUNCTION.value, task_function)
actor_name = runtime_context.get_actor_name()
if actor_name is not None:
setattr(record, LogKey.ACTOR_NAME.value, actor_name)
return True
8 changes: 8 additions & 0 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,14 @@ def actor_name(self):
def current_task_id(self):
return self.core_worker.get_current_task_id()

@property
def current_task_name(self):
# Delegates to the core worker's get_current_task_name() and returns
# whatever it reports for the currently executing task.
return self.core_worker.get_current_task_name()

@property
def current_task_function(self):
# Delegates to the core worker's get_current_task_function() and returns
# whatever it reports for the currently executing task.
return self.core_worker.get_current_task_function()

@property
def current_node_id(self):
return self.core_worker.get_current_node_id()
Expand Down
54 changes: 52 additions & 2 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ cdef optional[ObjectIDIndexType] NULL_PUT_INDEX = nullopt
# https://docs.python.org/3/library/contextvars.html#contextvars.ContextVar
# It is thread-safe.
async_task_id = contextvars.ContextVar('async_task_id', default=None)
async_task_name = contextvars.ContextVar('async_task_name', default=None)
async_task_function = contextvars.ContextVar('async_task_function', default=None)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

task_function_name



class DynamicObjectRefGenerator:
Expand Down Expand Up @@ -1800,7 +1802,8 @@ cdef void execute_task(
return core_worker.run_async_func_or_coro_in_event_loop(
async_function, function_descriptor,
name_of_concurrency_group_to_execute, task_id=task_id,
func_args=(actor, *arguments), func_kwargs=kwarguments)
task_name=task_name, func_args=(actor, *arguments),
func_kwargs=kwarguments)

return function(actor, *arguments, **kwarguments)

Expand Down Expand Up @@ -1912,7 +1915,8 @@ cdef void execute_task(
execute_streaming_generator_async(context),
function_descriptor,
name_of_concurrency_group_to_execute,
task_id=task_id)
task_id=task_id,
task_name=task_name)
else:
execute_streaming_generator_sync(context)

Expand Down Expand Up @@ -3400,6 +3404,48 @@ cdef class CoreWorker:
with nogil:
CCoreWorkerProcess.GetCoreWorker().Exit(c_exit_type, detail, null_ptr)

def get_current_task_name(self) -> str:
"""Return the current task name.

If it is a normal task, it returns the task name from the main thread.
If it is a threaded actor, it returns the task name for the current thread.
If it is an async actor, it returns the task name stored in the
`async_task_name` ContextVar for the current asyncio task.

Returns:
The ContextVar value when one is set; otherwise the UTF-8-decoded
task name reported by the core worker.
"""
# Within an asyncio task, the correct task name can only be obtained
# via the async_task_name contextvar, so we try that first.
# This is needed because the core worker's GetCurrentTask API
# doesn't have asyncio context, and thus cannot return the
# correct task name.
task_name = async_task_name.get()
if task_name is None:
# The contextvar is unset, i.e. we are not within an asyncio context:
# fall back to the task name obtainable from the core worker.
task_name = CCoreWorkerProcess.GetCoreWorker().GetCurrentTaskName() \
.decode("utf-8")
return task_name

def get_current_task_function(self) -> str:
"""Return the current task function.

If it is a normal task, it returns the task function from the main thread.
If it is a threaded actor, it returns the task function for the current thread.
If it is an async actor, it returns the task function stored in the
`async_task_function` ContextVar for the current asyncio task.

Returns:
The ContextVar value when one is set; otherwise the UTF-8-decoded
task function string reported by the core worker.
"""
# Within an asyncio task, the correct task function can only be obtained
# via the async_task_function contextvar, so we try that first.
# This is needed because the core worker's GetCurrentTask API
# doesn't have asyncio context, and thus cannot return the
# correct task function.
task_function = async_task_function.get()
if task_function is None:
# The contextvar is unset, i.e. we are not within an asyncio context:
# fall back to the task function obtainable from the core worker.
task_function = CCoreWorkerProcess.GetCoreWorker() \
.GetCurrentTaskFunction().decode("utf-8")
return task_function

def get_current_task_id(self) -> TaskID:
"""Return the current task ID.

Expand Down Expand Up @@ -4796,6 +4842,7 @@ cdef class CoreWorker:
specified_cgname: str,
*,
task_id: Optional[TaskID] = None,
task_name: Optional[str] = None,
func_args: Optional[Tuple] = None,
func_kwargs: Optional[Dict] = None,
):
Expand Down Expand Up @@ -4842,6 +4889,9 @@ cdef class CoreWorker:
try:
if task_id:
async_task_id.set(task_id)
if task_name:
async_task_name.set(task_name)
async_task_function.set(function_descriptor.repr)

if inspect.isawaitable(func_or_coro):
coroutine = func_or_coro
Expand Down
2 changes: 2 additions & 0 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:

CJobID GetCurrentJobId()
CTaskID GetCurrentTaskId()
const c_string GetCurrentTaskName()
const c_string GetCurrentTaskFunction()
void UpdateTaskIsDebuggerPaused(
const CTaskID &task_id,
const c_bool is_debugger_paused)
Expand Down
106 changes: 96 additions & 10 deletions python/ray/runtime_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def get_worker_id(self) -> str:
@property
@Deprecated(message="Use get_task_id() instead", warning=True)
def task_id(self):
"""Get current task ID for this worker or driver.
"""Get current task ID for this worker.

Task ID is the id of a Ray task.
This shouldn't be used in a driver process.
Expand Down Expand Up @@ -155,7 +155,7 @@ def f():
Returns:
The current worker's task id. None if there's no task id.
"""
# only worker mode has actor_id
# only worker mode has task_id
assert (
self.worker.mode == ray._private.worker.WORKER_MODE
), f"This method is only available when the process is a\
Expand All @@ -165,7 +165,7 @@ def f():
return task_id if not task_id.is_nil() else None

def get_task_id(self) -> Optional[str]:
"""Get current task ID for this worker or driver.
"""Get current task ID for this worker.

Task ID is the id of a Ray task. The ID will be in hex format.
This shouldn't be used in a driver process.
Expand Down Expand Up @@ -201,7 +201,7 @@ def get_task_id():
Returns:
The current worker's task id in hex. None if there's no task id.
"""
# only worker mode has actor_id
# only worker mode has task_id
if self.worker.mode != ray._private.worker.WORKER_MODE:
logger.warning(
"This method is only available when the process is a "
Expand All @@ -212,12 +212,98 @@ def get_task_id():
return task_id.hex() if not task_id.is_nil() else None

def _get_current_task_id(self) -> TaskID:
async_task_id = ray._raylet.async_task_id.get()
if async_task_id is None:
task_id = self.worker.current_task_id
else:
task_id = async_task_id
return task_id
return self.worker.current_task_id

def get_task_name(self) -> Optional[str]:
"""Get current task name for this worker.

Task name by default is the task's function call string. It can also be
specified in options when triggering a task.

Example:

.. testcode::
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add an example for the async case?


import ray

@ray.remote
class Actor:
def get_task_name(self):
return ray.get_runtime_context().get_task_name()

@ray.remote
def get_task_name():
return ray.get_runtime_context().get_task_name()

a = Actor.remote()
# Task names are available for actor tasks.
print(ray.get(a.get_task_name.remote()))
# Task names are available for normal tasks.
# Get default task name
print(ray.get(get_task_name.remote()))
# Get specified task name
print(ray.get(get_task_name.options(name="task_name").remote()))

.. testoutput::
:options: +MOCK

Actor.get_task_name
get_task_name
task_name

Returns:
The current worker's task name
"""
# only worker mode has task_name
if self.worker.mode != ray._private.worker.WORKER_MODE:
logger.warning(
"This method is only available when the process is a "
f"worker. Current mode: {self.worker.mode}"
)
return None
return self.worker.current_task_name

def get_task_function(self) -> Optional[str]:
"""Get current task function string for this worker.

Example:

.. testcode::
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add an example for the async case?


import ray

@ray.remote
class Actor:
def get_task_function(self):
return ray.get_runtime_context().get_task_function()

@ray.remote
def get_task_function():
return ray.get_runtime_context().get_task_function()

a = Actor.remote()
# Task functions are available for actor tasks.
print(ray.get(a.get_task_function.remote()))
# Task functions are available for normal tasks.
print(ray.get(get_task_function.remote()))

.. testoutput::
:options: +MOCK

[python module name].Actor.get_task_function
[python module name].get_task_function

Returns:
The current worker's task function call string
"""
# only worker mode has task_function
if self.worker.mode != ray._private.worker.WORKER_MODE:
logger.warning(
"This method is only available when the process is a "
f"worker. Current mode: {self.worker.mode}"
)
return None
return self.worker.current_task_function

@property
@Deprecated(message="Use get_actor_id() instead", warning=True)
Expand Down
12 changes: 12 additions & 0 deletions python/ray/serve/tests/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ def fn(*args):
"actor_id": ray.get_runtime_context().get_actor_id(),
"worker_id": ray.get_runtime_context().get_worker_id(),
"node_id": ray.get_runtime_context().get_node_id(),
"task_name": ray.get_runtime_context().get_task_name(),
"task_function": ray.get_runtime_context().get_task_function(),
"actor_name": ray.get_runtime_context().get_actor_name(),
}

@serve.deployment(
Expand All @@ -276,6 +279,9 @@ def __call__(self, req: starlette.requests.Request):
"actor_id": ray.get_runtime_context().get_actor_id(),
"worker_id": ray.get_runtime_context().get_worker_id(),
"node_id": ray.get_runtime_context().get_node_id(),
"task_name": ray.get_runtime_context().get_task_name(),
"task_function": ray.get_runtime_context().get_task_function(),
"actor_name": ray.get_runtime_context().get_actor_name(),
}

serve.run(fn.bind(), name="app1", route_prefix="/fn")
Expand Down Expand Up @@ -326,6 +332,9 @@ def check_log():
f'"worker_id": "{resp["worker_id"]}", '
f'"node_id": "{resp["node_id"]}", '
f'"actor_id": "{resp["actor_id"]}", '
f'"task_name": "{resp["task_name"]}", '
f'"task_function": "{resp["task_function"]}", '
f'"actor_name": "{resp["actor_name"]}", '
f'"deployment": "{resp["app_name"]}_fn", '
f'"replica": "{method_replica_id}", '
f'"component_name": "replica".*'
Expand All @@ -338,6 +347,9 @@ def check_log():
f'"worker_id": "{resp2["worker_id"]}", '
f'"node_id": "{resp2["node_id"]}", '
f'"actor_id": "{resp2["actor_id"]}", '
f'"task_name": "{resp2["task_name"]}", '
f'"task_function": "{resp2["task_function"]}", '
f'"actor_name": "{resp2["actor_name"]}", '
f'"deployment": "{resp2["app_name"]}_Model", '
f'"replica": "{class_method_replica_id}", '
f'"component_name": "replica".*'
Expand Down
6 changes: 6 additions & 0 deletions python/ray/tests/test_logging_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,14 @@ def f():
"worker_id": runtime_context.get_worker_id(),
"node_id": runtime_context.get_node_id(),
"task_id": runtime_context.get_task_id(),
"task_name": runtime_context.get_task_name(),
"task_function": runtime_context.get_task_function(),
}
for attr in should_exist:
assert hasattr(record, attr)
assert getattr(record, attr) == expected_values[attr]
assert not hasattr(record, "actor_id")
assert not hasattr(record, "actor_name")

obj_ref = f.remote()
ray.get(obj_ref)
Expand All @@ -77,7 +80,10 @@ def f(self):
"worker_id": runtime_context.get_worker_id(),
"node_id": runtime_context.get_node_id(),
"actor_id": runtime_context.get_actor_id(),
"actor_name": runtime_context.get_actor_name(),
"task_id": runtime_context.get_task_id(),
"task_name": runtime_context.get_task_name(),
"task_function": runtime_context.get_task_function(),
}
for attr in should_exist:
assert hasattr(record, attr)
Expand Down
Loading