128 warning for duplicate named tasks #135
Conversation
src/python/parla/common/spawn.py
Outdated
@@ -82,6 +80,13 @@ def spawn(task=None,
        idx = task

        task = taskspace[idx]
        lock = threading.Lock()
This won't protect against the race condition, as each invocation of spawn will have a separate lock instance.
The lock would have to be a field of the shared Task instance. (That said, I think a lock solution may have too big a performance hit; a C++ function with a CAS state check/set is likely the fastest, but worth measuring.)
Testing the race will be hard though, since reproducing it relies on the GIL owner switching between the bytecode instructions for the state check and the set. This will be very unlikely with the standard switch interval (5ms) until the optional-GIL PEP goes through 🤞
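To make the point concrete, here is a minimal sketch (hypothetical names, not the actual Parla code) of the fix being suggested: the lock is stored on the shared Task instance, so every spawn call for the same task contends on the same lock, and at most one caller can transition the task to "SPAWNED".

```python
import threading

class Task:
    # Sketch: the lock lives on the shared Task instance, so all
    # spawn calls for this task synchronize on the same lock object.
    def __init__(self):
        self._lock = threading.Lock()
        self.py_state = "CREATED"

    def try_mark_spawned(self):
        # Check-and-set under the task's own lock: exactly one caller wins.
        with self._lock:
            if self.py_state == "SPAWNED":
                return False
            self.py_state = "SPAWNED"
            return True

task = Task()
results = []
threads = [
    threading.Thread(target=lambda: results.append(task.try_mark_spawned()))
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results.count(True))  # exactly one thread wins the check-and-set
```

A per-call `threading.Lock()` created inside spawn, as in the diff above, would give each thread its own lock and provide no mutual exclusion at all.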
Maybe something like what the state function was for a bit (with >=):

parla-experimental/src/c/backend/task.cpp, line 327 in bfdb9a6:

    return status;
Edit: err, whoops, that's not linking correctly to the right commit hash (c70f9fc):
Task::State InnerTask::set_state(int state) {
  Task::State new_state = static_cast<Task::State>(state);
  Task::State old_state;
  bool success = true;
  do {
    old_state = this->state.load();
    if (old_state > new_state) {
      success = false;
    }
  } while (!this->state.compare_exchange_weak(old_state, new_state));
  if (!success) {
    throw std::runtime_error("Task States must always be increasing.");
  }
  return old_state;
}
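As a rough Python analogue of the monotone invariant above (a sketch, not the project's code): the C++ version enforces "states may only increase" with an atomic CAS loop; here a lock stands in for the CAS, and, unlike the C++ version, the regression check happens before the store rather than after.

```python
import threading

class InnerTask:
    # Sketch: a lock plays the role of the C++ atomic CAS loop.
    def __init__(self):
        self._lock = threading.Lock()
        self._state = 0

    def set_state(self, new_state: int) -> int:
        with self._lock:
            old_state = self._state
            if old_state > new_state:
                # Equal states are allowed, matching the C++ `>` check.
                raise RuntimeError("Task States must always be increasing.")
            self._state = new_state
            return old_state

t = InnerTask()
t.set_state(1)      # old state 0 -> 1
t.set_state(2)      # old state 1 -> 2
try:
    t.set_state(1)  # regression: rejected
except RuntimeError as e:
    print(e)
```

Note one behavioral difference worth being aware of in the C++ snippet: the CAS store still completes even when `success` is false, so the state is overwritten before the exception is thrown.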
This is not thread-safe, right?
Edit: ohh, it's using compare_exchange_weak, got it!
I was reading through this - https://www.codeproject.com/Articles/808305/Understand-std-atomic-compare-exchange-weak-in-Cpl - and it states the following about using a while loop with compare_exchange_weak:

"Note that we generally cannot use this pattern to implement a mutex. Otherwise, multiple threads may be inside the critical section at the same time."

Would compare_exchange_strong suit better?
I think either is okay, because the loop rejects failures when it is not the only thread inside the critical region, i.e. when this->state.load() changes between when it is read and when it would be modified, and it retries until it is the only one inside the region.
Oh! I'm actually surprised that terminates without tagging the Cython wrapper of task::set_state as except +!
Does this interrupt and shut down the runtime, or does the program hang?
It shuts down the runtime
Although, I would guess it doesn't hit the "Python layer" terminate path so the logs are not dumped?
    self.release()
Also, maybe more importantly, it is possible the new set_state function leads to errors on await and the spawned continuation tasks.
> Although, I would guess it doesn't hit the "Python layer" terminate path so the logs are not dumped?

Yes.
src/python/parla/common/spawn.py
Outdated
if(task.py_state != "SPAWNED"):
    task.py_state = "SPAWNED"
else:
    raise Exception("Duplicate task ID spawned. This will cause runtime to hang. Aborting...")
Maybe change the exception type to RuntimeError? It might also be better to print the duplicate ID.
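A minimal sketch of the revision being suggested (the `_Task` class and its `idx` attribute are hypothetical stand-ins, not the real Parla Task): raise RuntimeError instead of a bare Exception, and include the offending ID in the message.

```python
class _Task:
    # Hypothetical minimal stand-in for the real Task object.
    def __init__(self, idx):
        self.idx = idx
        self.py_state = "CREATED"

def mark_spawned(task):
    # Per the review: RuntimeError, and name the duplicate ID.
    if task.py_state == "SPAWNED":
        raise RuntimeError(
            f"Duplicate task ID spawned: {task.idx!r}. "
            "This would cause the runtime to hang."
        )
    task.py_state = "SPAWNED"

t = _Task("T[0]")
mark_spawned(t)  # first spawn succeeds
```

Including the ID turns a generic crash into an error the user can act on directly.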
src/python/parla/common/spawn.py
Outdated
try:
    scheduler.spawn_task(task)
except RuntimeError:
    raise RuntimeError("Task IDs can only be increasing. Possibly duplicate task ID present: " + str(task))
Recommend changing this to say "Conflicting task state while spawning task. Possible duplicate TaskID...", but otherwise LGTM!
updated!
PR adding an exception shown to users for duplicate task names.