Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PythonFutureTask #46

Merged
merged 27 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
c624e35
Add C++-backed `ucxx::python::PythonFutureTask`
pentschev May 5, 2023
1d10a54
Add `PythonFutureTask` example
pentschev May 5, 2023
dfb1f6e
Add Python examples include directory to CMake
pentschev May 5, 2023
ebf0120
Merge remote-tracking branch 'upstream/branch-0.35' into python-futur…
pentschev Oct 26, 2023
6552c96
Fix linting
pentschev Oct 26, 2023
86609ac
Fix include
pentschev Oct 27, 2023
70f64df
Update `CODEOWNERS` with new CMake file
pentschev Oct 27, 2023
0ce583f
Add Python future with asyncio event loop
pentschev Oct 27, 2023
5272bb9
Add C++ future -> Python future notifier example to CI
pentschev Oct 30, 2023
6c06dc1
Merge remote-tracking branch 'upstream/branch-0.35' into python-futur…
pentschev Oct 31, 2023
fadc96b
Merge remote-tracking branch 'upstream/branch-0.35' into python-futur…
pentschev Nov 2, 2023
32b800c
Update test to match latest cuDF changes
pentschev Nov 9, 2023
e621984
Clarify GIL requirement in `PythonFutureTask` constructor
pentschev Nov 13, 2023
d71faa3
Fix error handlers
pentschev Nov 13, 2023
e4b928d
Move `PythonFutureTask` implementation details
pentschev Nov 13, 2023
24989fe
Use `PythonFutureTask` move constructor instead of pointers
pentschev Nov 13, 2023
9a7491f
Mark Python future C functions as `noexcept`
pentschev Nov 13, 2023
1e68b19
Remove explicit `ucxx::python::PythonFutureTask` destructor
pentschev Nov 13, 2023
f6a8c29
Merge remote-tracking branch 'upstream/branch-0.36' into python-futur…
pentschev Nov 22, 2023
514627d
Remove unnecessary `_readyPool`
pentschev Nov 22, 2023
72eb539
Add a garbage collector for `PythonFutureTask`s
pentschev Nov 27, 2023
6e63140
Collect `PythonFutureTask`s during `~ApplicationThread`
pentschev Nov 27, 2023
7e0ad1b
Hold the mutex during `~PythonFutureTaskCollector()`
pentschev Nov 27, 2023
161e015
Merge remote-tracking branch 'upstream/branch-0.36' into python-futur…
pentschev Nov 29, 2023
e55efdd
Remove unnecessary CMake rpath entry
pentschev Nov 30, 2023
25cb66c
Merge remote-tracking branch 'origin/python-future-task' into python-…
pentschev Nov 30, 2023
8439f95
Removed unnecessary CMake `ASSOCIATED_TARGETS`
pentschev Nov 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
#cpp code owners
cpp/ @rapidsai/ucxx-cpp-codeowners
cpp/ @rapidsai/ucxx-cpp-codeowners

#python code owners
python/ @rapidsai/ucxx-python-codeowners
python/ @rapidsai/ucxx-python-codeowners

#cmake code owners
cpp/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners
cpp/python/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners
cpp/examples/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners
cpp/benchmarks/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners
cpp/tests/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners
python/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners
python/ucxx/_lib/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners
fetch_rapids.cmake @rapidsai/ucxx-cmake-codeowners
**/cmake/ @rapidsai/ucxx-cmake-codeowners
cpp/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners
cpp/python/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners
cpp/examples/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners
cpp/benchmarks/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners
cpp/tests/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners
python/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners
python/ucxx/examples/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners
python/ucxx/_lib/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners
fetch_rapids.cmake @rapidsai/ucxx-cmake-codeowners
**/cmake/ @rapidsai/ucxx-cmake-codeowners

#build/ops code owners
.github/ @rapidsai/ops-codeowners
ci/ @rapidsai/ops-codeowners
conda/ @rapidsai/ops-codeowners
.github/ @rapidsai/ops-codeowners
ci/ @rapidsai/ops-codeowners
conda/ @rapidsai/ops-codeowners
3 changes: 3 additions & 0 deletions ci/test_python.sh
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,6 @@ run_distributed_ucxx_tests thread 0 0
run_distributed_ucxx_tests thread 0 1
run_distributed_ucxx_tests thread 1 0
run_distributed_ucxx_tests thread 1 1

rapids-logger "C++ future -> Python future notifier example"
python -m ucxx.examples.python_future_task_example
3 changes: 3 additions & 0 deletions cpp/python/include/ucxx/python/constructors.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ namespace python {

std::shared_ptr<::ucxx::Future> createFuture(std::shared_ptr<::ucxx::Notifier> notifier);

std::shared_ptr<::ucxx::Future> createFutureWithEventLoop(
PyObject* asyncioEventLoop, std::shared_ptr<::ucxx::Notifier> notifier);

std::shared_ptr<::ucxx::Notifier> createNotifier();

std::shared_ptr<::ucxx::Worker> createWorker(std::shared_ptr<ucxx::Context> context,
Expand Down
54 changes: 54 additions & 0 deletions cpp/python/include/ucxx/python/future.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,60 @@ PyObject* future_set_result(PyObject* future, PyObject* value);
*/
PyObject* future_set_exception(PyObject* future, PyObject* exception, const char* message);

/**
* @brief Create a Python asyncio future with associated event loop.
*
* Create Python asyncio future associated with the event loop passed via the `event_loop`
* argument, effectively equal to calling `loop.create_future()` directly in Python.
*
* Note that this call will take the Python GIL and requires that the current thread have
* an asynchronous event loop set.
*
* @param[in] event_loop the Python asyncio event loop to which the future will belong to.
*
* @returns The Python asyncio future object.
*/
PyObject* create_python_future_with_event_loop(PyObject* event_loop);

/**
* @brief Set the result of a Python future with associated event loop.
*
* Schedule setting the result of a Python future in the given event loop using the
* threadsafe method `event_loop.call_soon_threadsafe`. The event loop given must be the
* same specified when creating the future object with `create_python_future`.
*
* Note that this may be called from any thread and will take the Python GIL to run.
*
* @param[in] future Python object containing the `_asyncio.Future` object.
* @param[in] value Python object containing an arbitrary value to set the future result
* to.
*
* @returns The result of the call to `_asyncio.Future.set_result()`.
*/
PyObject* future_set_result_with_event_loop(PyObject* event_loop,
PyObject* future,
PyObject* value);

/**
* @brief Set the exception of a Python future with associated event loop.
*
* Schedule setting an exception of a Python future in the given event loop using the
* threadsafe method `event_loop.call_soon_threadsafe`. The event loop given must be the
* same specified when creating the future object with `create_python_future`.
*
* Note that this may be called from any thread and will take the Python GIL to run.
*
* @param[in] future Python object containing the `_asyncio.Future` object.
* @param[in] exception a Python exception derived of the `Exception` class.
* @param[in] message human-readable error message for the exception.
*
* @returns The result of the call to `_asyncio.Future.set_result()`.
*/
PyObject* future_set_exception_with_event_loop(PyObject* event_loop,
PyObject* future,
PyObject* exception,
const char* message);

} // namespace python

} // namespace ucxx
25 changes: 23 additions & 2 deletions cpp/python/include/ucxx/python/python_future.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ namespace python {

class Future : public ::ucxx::Future {
private:
PyObject* _handle{create_python_future()}; ///< The handle to the Python future
PyObject* _asyncioEventLoop{nullptr}; ///< The asyncio event loop the Python future belongs to.
PyObject* _handle{nullptr}; ///< The handle to the Python future

/**
* @brief Construct a future that may be notified from a notifier thread.
Expand All @@ -32,9 +33,12 @@ class Future : public ::ucxx::Future {
* This class may also be used to set the result or exception from any thread, but that
* currently requires explicitly taking the GIL before calling `set()`.
*
* @param[in] asyncioEventLoop pointer to a valid Python object containing the event loop
* that the application is using, to which the future will
* belong to.
* @param[in] notifier notifier object running on a separate thread.
*/
explicit Future(std::shared_ptr<::ucxx::Notifier> notifier);
explicit Future(PyObject* asyncioEventLoop, std::shared_ptr<::ucxx::Notifier> notifier);

public:
Future() = delete;
Expand All @@ -56,6 +60,23 @@ class Future : public ::ucxx::Future {
*/
friend std::shared_ptr<::ucxx::Future> createFuture(std::shared_ptr<::ucxx::Notifier> notifier);

/**
* @brief Constructor of `shared_ptr<ucxx::python::Future>`.
*
* The constructor for a `shared_ptr<ucxx::python::Future>` object. The default
* constructor is made private to ensure all UCXX objects are shared pointers and correct
* lifetime management.
*
* @param[in] asyncioEventLoop pointer to a valid Python object containing the event loop
* that the application is using, to which the future will
* belong to.
* @param[in] notifier notifier object running on a separate thread.
*
* @returns The `shared_ptr<ucxx::python::Worker>` object
*/
friend std::shared_ptr<::ucxx::Future> createFutureWithEventLoop(
PyObject* asyncioEventLoop, std::shared_ptr<::ucxx::Notifier> notifier);

/**
* @brief Virtual destructor.
*
Expand Down
213 changes: 213 additions & 0 deletions cpp/python/include/ucxx/python/python_future_task.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES.
* SPDX-License-Identifier: BSD-3-Clause
*/
#pragma once

#include <chrono>
#include <future>
#include <memory>
#include <queue>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

#include <iostream>

#include <Python.h>

#include <ucxx/log.h>
#include <ucxx/python/future.h>

namespace ucxx {

namespace python {

template <typename ReturnType, typename... TaskArgs>
class PythonFutureTask : public std::enable_shared_from_this<PythonFutureTask<ReturnType>> {
private:
std::packaged_task<ReturnType(TaskArgs...)> _task{}; ///< The user-defined C++ task to run
std::function<PyObject*(ReturnType)>
_pythonConvert{}; ///< Function to convert the C++ result into Python value
PyObject* _asyncioEventLoop{}; ///< The handle to the Python asyncio event loop
PyObject* _handle{}; ///< The handle to the Python future
std::future<ReturnType> _future{}; ///< The C++ future containing the task result

/**
* @brief Set the result value of the Python future.
*
* Set the result value of the underlying Python future using the `pythonConvert` function
* specified in the constructor to convert the C++ result into the `PyObject*`.
*
* This function will take the GIL to convert the C++ result into the `PyObject*`.
*
* @param[in] result the C++ value that will be converted and set as the result of the
* Python future.
*/
void setResult(const ReturnType result)
{
// PyLong_FromSize_t requires the GIL
if (_handle == nullptr) throw std::runtime_error("Invalid object or already released");
PyGILState_STATE state = PyGILState_Ensure();
ucxx::python::future_set_result_with_event_loop(
_asyncioEventLoop, _handle, _pythonConvert(result));
madsbk marked this conversation as resolved.
Show resolved Hide resolved
PyGILState_Release(state);
}

/**
* @brief Set the exception of the Python future.
*
* Set the exception of the underlying Python future. Currently any exceptions that the
* task may raise must be derived from `std::exception`.
*
* @param[in] pythonException the Python exception type to raise.
* @param[in] message the message of the exception.
*/
void setPythonException(PyObject* pythonException, const std::string& message)
{
if (_handle == nullptr) throw std::runtime_error("Invalid object or already released");
ucxx::python::future_set_exception_with_event_loop(
_asyncioEventLoop, _handle, pythonException, message.c_str());
}

/**
* @brief Parse C++ exception as a Python exception and set the Python future exception.
*
* Parse a C++ exception as a Python exception and set the Python future exception.
* Currently any exceptions that the task may raise must be derived from `std::exception`.
*
* @param[in] exception the C++ exception that was raised by the user-defined task.
*/
void setException(const std::exception& exception)
{
try {
throw exception;
} catch (const std::bad_alloc& e) {
setPythonException(PyExc_MemoryError, e.what());
} catch (const std::bad_cast& e) {
setPythonException(PyExc_TypeError, e.what());
} catch (const std::bad_typeid& e) {
setPythonException(PyExc_TypeError, e.what());
} catch (const std::domain_error& e) {
setPythonException(PyExc_ValueError, e.what());
} catch (const std::invalid_argument& e) {
setPythonException(PyExc_ValueError, e.what());
} catch (const std::ios_base::failure& e) {
setPythonException(PyExc_IOError, e.what());
} catch (const std::out_of_range& e) {
setPythonException(PyExc_IndexError, e.what());
} catch (const std::overflow_error& e) {
setPythonException(PyExc_OverflowError, e.what());
} catch (const std::range_error& e) {
setPythonException(PyExc_ArithmeticError, e.what());
} catch (const std::underflow_error& e) {
setPythonException(PyExc_ArithmeticError, e.what());
} catch (const std::exception& e) {
setPythonException(PyExc_RuntimeError, e.what());
} catch (...) {
setPythonException(PyExc_RuntimeError, "Unknown exception");
wence- marked this conversation as resolved.
Show resolved Hide resolved
}
}

public:
/**
* @brief Construct a Python future backed by C++ `std::packaged_task`.
*
* Construct a future object that receives a user-defined C++ `std::packaged_task` which
* runs asynchronously using an internal `std::async` that ultimately notifies a Python
* future that can be awaited in Python code.
*
pentschev marked this conversation as resolved.
Show resolved Hide resolved
* @param[in] task the user-defined C++ task.
* @param[in] pythonConvert C-Python function to convert a C object into a `PyObject*`
* representing the result of the task.
* @param[in] asyncioEventLoop pointer to a valid Python object containing the event loop
* that the application is using, to which the Python future
* will belong to.
* @param[in] launchPolicy launch policy for the async C++ task.
*/
explicit PythonFutureTask(std::packaged_task<ReturnType(TaskArgs...)> task,
std::function<PyObject*(ReturnType)> pythonConvert,
PyObject* asyncioEventLoop,
std::launch launchPolicy = std::launch::async)
: _task{std::move(task)},
_pythonConvert(pythonConvert),
_asyncioEventLoop(asyncioEventLoop),
_handle{ucxx::python::create_python_future_with_event_loop(asyncioEventLoop)}
{
_future = std::async(launchPolicy, [this]() {
std::future<ReturnType> result = this->_task.get_future();
this->_task();
try {
const ReturnType r = result.get();
this->setResult(r);
return r;
} catch (std::exception& e) {
this->setException(e);
}
});
}
PythonFutureTask(const PythonFutureTask&) = delete;
PythonFutureTask& operator=(PythonFutureTask const&) = delete;
PythonFutureTask(PythonFutureTask&& o) = delete;
PythonFutureTask& operator=(PythonFutureTask&& o) = delete;
madsbk marked this conversation as resolved.
Show resolved Hide resolved

/**
* @brief Python future destructor.
*
* Decrement the reference count of the underlying Python future.
*/
~PythonFutureTask() { Py_XDECREF(_handle); }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think calling Py_XDECREF(_handle); is thread-safe (e.g. it might trigger the execution of a __del__).
Is it safe to grab the GIL here? Or should we delay the decref to a Python cleanup routine?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it is safe as long as the current thread is able to take the GIL, i.e. the "main" thread releases the GIL at some point. However, that doesn't mean it couldn't cause deadlocks depending on how the application makes use of that. Definitely a Python cleanup routine could make things safer but I can't think of a cleanup routine that would not require some complex Python mechanism, meaning it can't be implemented here alone, perhaps you already have something in mind? The only alternative I can think of, which is definitely not a good one, would be to implement a public release() method that the user is supposed to call where it is known that taking the GIL is safe and then change the destructor to print a warning/error if release() wasn't called.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, we already have a release() which has a different meaning (release the handle to the user). Replace release() in my comment above with free().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need _handle after setResult() or setPythonException() has been called? If not, maybe let them release the handle?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _handle is the asyncio.Future object, so if the user never got a reference to it via getHandle()/release() before the C++ future completed and notified the Python future then the user will never be able to get it later if we release it in setResult()/setPythonException(). Therefore, I don't think this would work.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, that's probably our best option. It will take me until next week or the week after to get back to this, but I'll try to make it work.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this one is still to be resolved. In any case, Py_XDECREF is not safe to call without the gil (so one should PyGILState_Ensure before calling it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have now introduced a new singleton class PythonFutureTaskCollector which allows pushing PyObject* when ~PythonFutureTask() runs instead of it requiring the GIL to decref those. The GIL and decref now happens when PythonFutureTaskCollector::collect() gets called, but that needs to be done by the application itself, which currently runs it during ~ApplicationThread.

This is not an optimal solution, but I think it's probably the only one that guarantees objects are actually decrefed at some point, the alternative may be to register another atexit handler. A periodic call of that method could also improve things a bit during runtime but is not enough to ensure everything gets cleaned up before exiting, as we may always miss the last few destroyed PythonFutureTask if the periodic call happens to be called one last time before some ~PythonFutureTask. Perhaps I'm overlooking some obvious better and guaranteed way to ensure we don't leave any PyObject* behind, if you can think of any please let me know @madsbk @wence- .

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This kind of deferred collection with stop-the-world cleanup points is the way I (with others) solve morally the same problem in a distributed setting (where two processes have to agree to destruct objects collectively). This is kind of similar because we're needing to do collection at a point when we can guarantee synchronisation of some resource (in this case the gil + any other locks that might be required).

So I think this is fine. You might imagine that you'd want to also call this "collect" periodically during submission of tasks if that happens on the main thread that will hold the gil.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I think this is fine. You might imagine that you'd want to also call this "collect" periodically during submission of tasks if that happens on the main thread that will hold the gil.

I agree with that idea, what I'm not sure is who should be responsible for that. I can think of a few different approaches:

  1. Within the PythonFutureTask, for example every time a new one is constructed;
  2. As part of the ApplicationThread, perhaps during progressUntilSync();
  3. Directly from Python with something similar to tornado.ioloop.PeriodicCallback;
  4. Directly from Python with looping forever submitting a new async task for every iteration, similar to PollingMode.

I think we should be doing one or more of those, but it's difficult to decide the best approach. Furthermore, ApplicationThread is meant to be an example only, how to actually do it probably depends on the actual application needs so we don't need to cover every possible alternative. We will need to find an appropriate solution to this for UCXX though, but that will happen in a follow-up PR.


/**
* @brief Get the C++ future.
*
* Get the underlying C++ future that can be awaited and have its result read.
*
* @returns The underlying C++ future.
*/
std::future<ReturnType>& getFuture() { return _future; }
pentschev marked this conversation as resolved.
Show resolved Hide resolved

/**
* @brief Get the underlying future `PyObject*` handle but does not release ownership.
*
* Get the underlying `PyObject*` handle without releasing ownership. This can be useful
* for example for logging, where we want to see the address of the pointer but do not
* want to transfer ownership.
*
* @warning The destructor will also destroy the Python future, a pointer taken via this
* method will cause the object to become invalid.
*
* @throws std::runtime_error if the object is invalid or has been already released.
*
* @returns The underlying `PyObject*` handle.
*/
PyObject* getHandle()
{
if (_handle == nullptr) throw std::runtime_error("Invalid object or already released");

return _handle;
}

/**
* @brief Get the underlying future `PyObject*` handle and release ownership.
*
* Get the underlying `PyObject*` handle releasing ownership. This should be used when
* the future needs to be permanently transferred to Python code. After calling this
* method the object becomes invalid for any other uses.
*
* @throws std::runtime_error if the object is invalid or has been already released.
*
* @returns The underlying `PyObject*` handle.
*/
PyObject* release()
{
if (_handle == nullptr) throw std::runtime_error("Invalid object or already released");

return std::exchange(_handle, nullptr);
}
};

} // namespace python

} // namespace ucxx
Loading