Split block and evict task #8
Conversation
Can you separate the threshold changes into a separate PR, so this PR is just the change to add an option for blocking/evicting tasks? We can sync in person on how to implement the threshold.
src/ray/common/ray_config_def.h
Outdated
RAY_CONFIG(bool, enable_EvictTasks, false)

// Whether to use EvictTasks when spill required
RAY_CONFIG(bool, enable_BlockandEvictTasks, false)
Can you remove this? I think it's confusing and unnecessary with the other two flags.
  return Status::TransientObjectStoreFull("Waiting for higher priority tasks to finish");
}
RAY_LOG(INFO) << "[JAE_DEBUG] should_spill set";
SetShouldSpill(false);
I don't think we should set should_spill_ here, right? We should allow the callbacks to set this later on. What was the reason for setting it?
@@ -214,7 +256,9 @@ Status CreateRequestQueue::ProcessRequests() {

  // If we make it here, then there is nothing left in the queue. It's safe to
  // run new tasks again.
  RAY_UNUSED(on_object_creation_blocked_callback_(ray::Priority()));
  if (!RayConfig::instance().enable_BlockandEvictTasks() || RayConfig::instance().enable_BlockTasks()) {
Not sure if this is right. Shouldn't we call the callback when we are back under the threshold, inside the main loop?
- ProcessRequest() will set whether we are under or over the threshold.
- If we are over the threshold, we should call BlockTasks() on the lowest-priority object currently in the object store.
- If we are under the threshold, then we should call BlockTasks(Priority()).
This means we also need to track what the lowest-priority object currently is; see the sketch below.
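A minimal sketch of that flow, assuming hypothetical helpers (`ComputeAllocatedPercentage()`, `GetLowestPriObject()`) alongside the `block_tasks_threshold_` member quoted elsewhere in this PR:

```cpp
// Sketch only, not the actual implementation: threshold handling inside the
// request-processing loop. Helper names are assumptions for illustration.
bool over_threshold = ComputeAllocatedPercentage() >= block_tasks_threshold_;
if (over_threshold) {
  // Over the threshold: block at the lowest-priority object currently in the
  // object store, so higher-priority work can still make progress.
  on_object_creation_blocked_callback_(GetLowestPriObject());
} else {
  // Back under the threshold: unblock by passing the default priority.
  on_object_creation_blocked_callback_(ray::Priority());
}
```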
  return Status::TransientObjectStoreFull("Waiting for higher priority tasks to finish");
}
if (RayConfig::instance().enable_BlockandEvictTasks()) {
  on_object_creation_blocked_callback_(queue_it->first.first);
Instead of having two separate callbacks, can we do one callback that sets the blocked priority and a flag for whether to also evict tasks?
I think this will be cleaner and also easier to debug. Since the callbacks here are concurrent, it's going to be hard to figure out what's going on if there are too many callbacks.
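As a rough sketch of what a single combined callback could look like (the type alias and flag names here are assumptions for illustration, not the existing API):

```cpp
// Sketch: one callback that carries both the blocked priority and an explicit
// evict flag, instead of two separate concurrent callbacks.
using ObjectCreationBlockedCallback =
    std::function<void(const ray::Priority &blocked_priority,
                       bool block_tasks,    // block lower-priority task submission
                       bool evict_tasks)>;  // also evict already-running tasks

// Call site: a single invocation with the right flags, rather than two calls.
on_object_creation_blocked_callback_(
    queue_it->first.first,
    /*block_tasks=*/RayConfig::instance().enable_BlockTasks(),
    /*evict_tasks=*/RayConfig::instance().enable_EvictTasks());
```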
if (RayConfig::instance().enable_BlockTasks()) {
  RAY_LOG(DEBUG) << "[JAE_DEBUG] calling object_creation_blocked_callback priority "
                 << queue_it->first.first.score;
  on_object_creation_blocked_callback_(queue_it->first.first, true, false);
We should call the callback a single time with the right flags, instead of calling it twice (this is to avoid having concurrent callbacks).
… stuck with producers and 2 is when consumers are dependent on multiple objects
if (!should_spill_) {
  RAY_LOG(INFO) << "Object creation of priority " << queue_it->first.first << " blocked";
  return Status::TransientObjectStoreFull("Waiting for higher priority tasks to finish");
if (RayConfig::instance().enable_BlockTasks()) {
Shouldn't this also check if block_tasks_required is true?
Also, where is the enable_EvictTasks() flag used? I only see it at the end of the loop, but I thought it should also be used here?
                 << lowest_pri;
  on_object_creation_blocked_callback_(lowest_pri, true, false);
  if (!RayConfig::instance().enable_BlockTasksSpill()) {
    spill_objects_callback_();
Could we do something simpler like just return early here?
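One possible reading of this suggestion, sketched without the full surrounding control flow and assuming that returning TransientObjectStoreFull keeps the request queued for a later retry:

```cpp
// Sketch: rather than invoking spill_objects_callback_() inline here, block
// lower-priority submission and return early; the normal retry path can then
// decide whether spilling is needed.
if (!RayConfig::instance().enable_BlockTasksSpill()) {
  on_object_creation_blocked_callback_(lowest_pri, true, false);
  return Status::TransientObjectStoreFull(
      "Waiting for higher priority tasks to finish");
}
```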
@@ -214,15 +252,19 @@ Status CreateRequestQueue::ProcessRequests() {

  // If we make it here, then there is nothing left in the queue. It's safe to
  // run new tasks again.
  RAY_UNUSED(on_object_creation_blocked_callback_(ray::Priority()));
  if (RayConfig::instance().enable_BlockTasks() &&
      !RayConfig::instance().enable_EvictTasks()) {
Why do we check enable_EvictTasks() here?
ray::Priority ObjectStore::GetLowestPriObject() {
  // Return the lowest priority object in object_table
  auto it = object_table_.begin();
  ray::Priority lowest_priority = it->second->GetPriority();
Won't this segfault if the object table is empty?
  it++;
  for (; it != object_table_.end(); it++) {
    ray::Priority p = it->second->GetPriority();
    if (lowest_priority < p) {
Isn't this actually choosing the highest priority object instead of the lowest?
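A guarded version addressing both comments might look roughly like this (a sketch, assuming ray::Priority() is an acceptable sentinel for an empty table and that `a < b` means a is lower priority than b; if the comparison runs the other way, the condition flips):

```cpp
ray::Priority ObjectStore::GetLowestPriObject() {
  // Guard against an empty table instead of dereferencing begin() unconditionally.
  if (object_table_.empty()) {
    return ray::Priority();  // Sentinel: nothing in the store to block on.
  }
  auto it = object_table_.begin();
  ray::Priority lowest_priority = it->second->GetPriority();
  for (++it; it != object_table_.end(); ++it) {
    ray::Priority p = it->second->GetPriority();
    // Keep the lowest priority seen so far (note the comparison direction).
    if (p < lowest_priority) {
      lowest_priority = p;
    }
  }
  return lowest_priority;
}
```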
    allocated_percentage = 0;
  }
  if (block_tasks_required != nullptr) {
    if (allocated_percentage >= block_tasks_threshold_) {
How is the block tasks threshold set?
It is set from ray_config. By default they are 1.0.
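For reference, a flag like that would presumably live in ray_config_def.h next to the other options; the exact names, types, and defaults below are assumptions, not the actual definitions:

```cpp
// Hypothetical sketch: fraction of the object store that may be allocated
// before new task submission is blocked. 1.0 effectively disables blocking.
RAY_CONFIG(float, block_tasks_threshold, 1.0)
// Analogous threshold for evicting running tasks.
RAY_CONFIG(float, evict_tasks_threshold, 1.0)
```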
@@ -168,6 +168,14 @@ Status CreateRequestQueue::ProcessRequests() {
  if (spilling_required) {
    spill_objects_callback_();
  }
  // Block and evict tasks are called if the object store reaches over a threshold
  if (RayConfig::instance().enable_BlockTasks() && block_tasks_required) {
    on_object_creation_blocked_callback_(lowest_pri, true, false);
I think we may have talked about this earlier, but can we change this so that we only call the callback once? This is to reduce the chance of a race condition since these callbacks are async.
… calling taskblock callback twice
We encountered a SIGSEGV when running the Python test `python/ray/tests/test_failure_2.py::test_list_named_actors_timeout`. The stack is:

```
#0 0x00007fffed30f393 in std::basic_string<char, std::char_traits<char>, std::allocator<char> >::basic_string(std::string const&) () from /lib64/libstdc++.so.6
#1 0x00007fffee707649 in ray::RayLog::GetLoggerName() () from /home/admin/dev/Arc/merge/ray/python/ray/_raylet.so
#2 0x00007fffee70aa90 in ray::SpdLogMessage::Flush() () from /home/admin/dev/Arc/merge/ray/python/ray/_raylet.so
#3 0x00007fffee70af28 in ray::RayLog::~RayLog() () from /home/admin/dev/Arc/merge/ray/python/ray/_raylet.so
#4 0x00007fffee2b570d in ray::asio::testing::(anonymous namespace)::DelayManager::Init() [clone .constprop.0] () from /home/admin/dev/Arc/merge/ray/python/ray/_raylet.so
#5 0x00007fffedd0d95a in _GLOBAL__sub_I_asio_chaos.cc () from /home/admin/dev/Arc/merge/ray/python/ray/_raylet.so
#6 0x00007ffff7fe282a in call_init.part () from /lib64/ld-linux-x86-64.so.2
#7 0x00007ffff7fe2931 in _dl_init () from /lib64/ld-linux-x86-64.so.2
#8 0x00007ffff7fe674c in dl_open_worker () from /lib64/ld-linux-x86-64.so.2
#9 0x00007ffff7b82e79 in _dl_catch_exception () from /lib64/libc.so.6
#10 0x00007ffff7fe5ffe in _dl_open () from /lib64/ld-linux-x86-64.so.2
#11 0x00007ffff7d5f39c in dlopen_doit () from /lib64/libdl.so.2
#12 0x00007ffff7b82e79 in _dl_catch_exception () from /lib64/libc.so.6
#13 0x00007ffff7b82f13 in _dl_catch_error () from /lib64/libc.so.6
#14 0x00007ffff7d5fb09 in _dlerror_run () from /lib64/libdl.so.2
#15 0x00007ffff7d5f42a in dlopen@@GLIBC_2.2.5 () from /lib64/libdl.so.2
#16 0x00007fffef04d330 in py_dl_open (self=<optimized out>, args=<optimized out>) at /tmp/python-build.20220507135524.257789/Python-3.7.11/Modules/_ctypes/callproc.c:1369
```

The root cause is that when `_raylet.so` is loaded, `static DelayManager _delay_manager` is initialized and `RAY_LOG(ERROR) << "RAY_testing_asio_delay_us is set to " << delay_env;` is executed. However, the static variables declared in `logging.cc` are not initialized yet (in this case, `std::string RayLog::logger_name_ = "ray_log_sink"`). It's better not to rely on the initialization order of static variables in different compilation units because it's not guaranteed.

I propose changing all `RAY_LOG`s to `std::cerr` in `DelayManager::Init()`.

The crash happens in Ant's internal codebase. Not sure why this test case passes in the community version, though.

BTW, I've tried different approaches:
1. Using a static local variable in `get_delay_us` and removing the global variable. This doesn't work because `init()` needs to access the variable as well.
2. Defining the global variable as type `std::unique_ptr<DelayManager>` and initializing it in `get_delay_us`. This works but requires a lock to be thread-safe.
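A self-contained sketch of the proposed fix, using a simplified stand-in for DelayManager (the real class has more state and members than shown here):

```cpp
#include <cstdlib>
#include <iostream>

// Simplified stand-in for the real DelayManager, for illustration only.
class DelayManager {
 public:
  DelayManager() { Init(); }

  void Init() {
    const char *delay_env = std::getenv("RAY_testing_asio_delay_us");
    if (delay_env != nullptr && *delay_env != '\0') {
      // Write to std::cerr instead of RAY_LOG(ERROR): this code can run while
      // _raylet.so is still being loaded, before logging.cc's statics (such as
      // RayLog::logger_name_) have been constructed.
      std::cerr << "RAY_testing_asio_delay_us is set to " << delay_env << std::endl;
    }
  }
};

// The crash above happens because a static instance like this is initialized
// during dynamic library load, where cross-TU static init order is unspecified.
static DelayManager _delay_manager;
```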
Why are these changes needed?
The previous version implemented BlockTasks on the currently submitted task, which caused a deadlock (NSDI23/single~/debug/deadlock.py).
This PR instead finds the lowest priority in object_table and sets BlockTasks according to it.
With BlockTasks and no spilling, there are two deadlock cases, found in NSDI23/single~/debug/deadlock1.py and 2.py.
All files in the NSDI23 directory are microbenchmarks; you can ignore them.
Many of the changes here are from ClangFormat. I listed the files you can ignore below.
Files changed for BlockTasks
L34 ~ L167 are just ClangFormat changes plus an added argument for the block_task threshold (not used for now), so you can ignore them.
Files changed for searching for the lowest-priority object in object_table
Files you can skip
Many of the argument changes are from ClangFormat. You can ignore these changes in the following files:
You want to check out L185 ~ L218.