diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 3beb9876..40df2d79 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,21 +1,22 @@ #cpp code owners -cpp/ @rapidsai/ucxx-cpp-codeowners +cpp/ @rapidsai/ucxx-cpp-codeowners #python code owners -python/ @rapidsai/ucxx-python-codeowners +python/ @rapidsai/ucxx-python-codeowners #cmake code owners -cpp/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners -cpp/python/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners -cpp/examples/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners -cpp/benchmarks/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners -cpp/tests/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners -python/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners -python/ucxx/_lib/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners -fetch_rapids.cmake @rapidsai/ucxx-cmake-codeowners -**/cmake/ @rapidsai/ucxx-cmake-codeowners +cpp/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners +cpp/python/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners +cpp/examples/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners +cpp/benchmarks/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners +cpp/tests/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners +python/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners +python/ucxx/examples/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners +python/ucxx/_lib/CMakeLists.txt @rapidsai/ucxx-cmake-codeowners +fetch_rapids.cmake @rapidsai/ucxx-cmake-codeowners +**/cmake/ @rapidsai/ucxx-cmake-codeowners #build/ops code owners -.github/ @rapidsai/ops-codeowners -ci/ @rapidsai/ops-codeowners -conda/ @rapidsai/ops-codeowners +.github/ @rapidsai/ops-codeowners +ci/ @rapidsai/ops-codeowners +conda/ @rapidsai/ops-codeowners diff --git a/ci/test_python.sh b/ci/test_python.sh index 169674e2..79b932db 100755 --- a/ci/test_python.sh +++ b/ci/test_python.sh @@ -132,3 +132,6 @@ run_distributed_ucxx_tests thread 0 0 run_distributed_ucxx_tests thread 0 1 run_distributed_ucxx_tests thread 1 0 run_distributed_ucxx_tests thread 1 1 + +rapids-logger "C++ future -> Python future notifier example" +python -m ucxx.examples.python_future_task_example diff --git a/cpp/python/CMakeLists.txt b/cpp/python/CMakeLists.txt index b03161f8..45819c77 100644 --- a/cpp/python/CMakeLists.txt +++ b/cpp/python/CMakeLists.txt @@ -18,6 +18,7 @@ add_library( src/future.cpp src/notifier.cpp src/python_future.cpp + src/python_future_task_collector.cpp src/worker.cpp ) diff --git a/cpp/python/include/ucxx/python/api.h b/cpp/python/include/ucxx/python/api.h index 477f1f4c..95c234f9 100644 --- a/cpp/python/include/ucxx/python/api.h +++ b/cpp/python/include/ucxx/python/api.h @@ -8,4 +8,5 @@ #include #include #include +#include #include diff --git a/cpp/python/include/ucxx/python/constructors.h b/cpp/python/include/ucxx/python/constructors.h index 0838b72b..53ea6b38 100644 --- a/cpp/python/include/ucxx/python/constructors.h +++ b/cpp/python/include/ucxx/python/constructors.h @@ -19,6 +19,9 @@ namespace python { std::shared_ptr<::ucxx::Future> createFuture(std::shared_ptr<::ucxx::Notifier> notifier); +std::shared_ptr<::ucxx::Future> createFutureWithEventLoop( + PyObject* asyncioEventLoop, std::shared_ptr<::ucxx::Notifier> notifier); + std::shared_ptr<::ucxx::Notifier> createNotifier(); std::shared_ptr<::ucxx::Worker> createWorker(std::shared_ptr context, diff --git a/cpp/python/include/ucxx/python/future.h b/cpp/python/include/ucxx/python/future.h index 0583ad87..217bcbcd 100644 --- a/cpp/python/include/ucxx/python/future.h +++ b/cpp/python/include/ucxx/python/future.h @@ -21,7 +21,7 @@ namespace python { * * @returns The Python asyncio future object. */ -PyObject* create_python_future(); +PyObject* create_python_future() noexcept; /** * @brief Set the result of a Python future. @@ -37,7 +37,7 @@ PyObject* create_python_future(); * * @returns The result of the call to `_asyncio.Future.set_result()`. */ -PyObject* future_set_result(PyObject* future, PyObject* value); +PyObject* future_set_result(PyObject* future, PyObject* value) noexcept; /** * @brief Set the exception of a Python future. @@ -53,7 +53,61 @@ PyObject* future_set_result(PyObject* future, PyObject* value); * * @returns The result of the call to `_asyncio.Future.set_result()`. */ -PyObject* future_set_exception(PyObject* future, PyObject* exception, const char* message); +PyObject* future_set_exception(PyObject* future, PyObject* exception, const char* message) noexcept; + +/** + * @brief Create a Python asyncio future with associated event loop. + * + * Create Python asyncio future associated with the event loop passed via the `event_loop` + * argument, effectively equal to calling `loop.create_future()` directly in Python. + * + * Note that this call will take the Python GIL and requires that the current thread have + * an asynchronous event loop set. + * + * @param[in] event_loop the Python asyncio event loop to which the future will belong to. + * + * @returns The Python asyncio future object. + */ +PyObject* create_python_future_with_event_loop(PyObject* event_loop) noexcept; + +/** + * @brief Set the result of a Python future with associated event loop. + * + * Schedule setting the result of a Python future in the given event loop using the + * threadsafe method `event_loop.call_soon_threadsafe`. The event loop given must be the + * same specified when creating the future object with `create_python_future`. + * + * Note that this may be called from any thread and will take the Python GIL to run. + * + * @param[in] future Python object containing the `_asyncio.Future` object. + * @param[in] value Python object containing an arbitrary value to set the future result + * to. + * + * @returns The result of the call to `_asyncio.Future.set_result()`. + */ +PyObject* future_set_result_with_event_loop(PyObject* event_loop, + PyObject* future, + PyObject* value) noexcept; + +/** + * @brief Set the exception of a Python future with associated event loop. + * + * Schedule setting an exception of a Python future in the given event loop using the + * threadsafe method `event_loop.call_soon_threadsafe`. The event loop given must be the + * same specified when creating the future object with `create_python_future`. + * + * Note that this may be called from any thread and will take the Python GIL to run. + * + * @param[in] future Python object containing the `_asyncio.Future` object. + * @param[in] exception a Python exception derived of the `Exception` class. + * @param[in] message human-readable error message for the exception. + * + * @returns The result of the call to `_asyncio.Future.set_result()`. + */ +PyObject* future_set_exception_with_event_loop(PyObject* event_loop, + PyObject* future, + PyObject* exception, + const char* message) noexcept; } // namespace python diff --git a/cpp/python/include/ucxx/python/python_future.h b/cpp/python/include/ucxx/python/python_future.h index ec323293..4354ab99 100644 --- a/cpp/python/include/ucxx/python/python_future.h +++ b/cpp/python/include/ucxx/python/python_future.h @@ -21,7 +21,8 @@ namespace python { class Future : public ::ucxx::Future { private: - PyObject* _handle{create_python_future()}; ///< The handle to the Python future + PyObject* _asyncioEventLoop{nullptr}; ///< The asyncio event loop the Python future belongs to. + PyObject* _handle{nullptr}; ///< The handle to the Python future /** * @brief Construct a future that may be notified from a notifier thread. @@ -32,9 +33,12 @@ class Future : public ::ucxx::Future { * This class may also be used to set the result or exception from any thread, but that * currently requires explicitly taking the GIL before calling `set()`. * + * @param[in] asyncioEventLoop pointer to a valid Python object containing the event loop + * that the application is using, to which the future will + * belong to. * @param[in] notifier notifier object running on a separate thread. */ - explicit Future(std::shared_ptr<::ucxx::Notifier> notifier); + explicit Future(PyObject* asyncioEventLoop, std::shared_ptr<::ucxx::Notifier> notifier); public: Future() = delete; @@ -56,6 +60,23 @@ class Future : public ::ucxx::Future { */ friend std::shared_ptr<::ucxx::Future> createFuture(std::shared_ptr<::ucxx::Notifier> notifier); + /** + * @brief Constructor of `shared_ptr`. + * + * The constructor for a `shared_ptr` object. The default + * constructor is made private to ensure all UCXX objects are shared pointers and correct + * lifetime management. + * + * @param[in] asyncioEventLoop pointer to a valid Python object containing the event loop + * that the application is using, to which the future will + * belong to. + * @param[in] notifier notifier object running on a separate thread. + * + * @returns The `shared_ptr` object + */ + friend std::shared_ptr<::ucxx::Future> createFutureWithEventLoop( + PyObject* asyncioEventLoop, std::shared_ptr<::ucxx::Notifier> notifier); + /** * @brief Virtual destructor. * diff --git a/cpp/python/include/ucxx/python/python_future_task.h b/cpp/python/include/ucxx/python/python_future_task.h new file mode 100644 index 00000000..0141f1aa --- /dev/null +++ b/cpp/python/include/ucxx/python/python_future_task.h @@ -0,0 +1,297 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. + * SPDX-License-Identifier: BSD-3-Clause + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include + +#include +#include +#include + +namespace ucxx { + +namespace python { + +namespace detail { + +template +struct PythonFutureTask { + public: + std::packaged_task _task{}; ///< The user-defined C++ task to run + std::function + _pythonConvert{}; ///< Function to convert the C++ result into Python value + PyObject* _asyncioEventLoop{}; ///< The handle to the Python asyncio event loop + PyObject* _handle{}; ///< The handle to the Python future + std::future _future{}; ///< The C++ future containing the task result + + private: + /** + * @brief Set the result value of the Python future. + * + * Set the result value of the underlying Python future using the `pythonConvert` function + * specified in the constructor to convert the C++ result into the `PyObject*`. + * + * This function will take the GIL to convert the C++ result into the `PyObject*`. + * + * @param[in] result the C++ value that will be converted and set as the result of the + * Python future. + */ + void setResult(const ReturnType result) + { + // PyLong_FromSize_t requires the GIL + if (_handle == nullptr) throw std::runtime_error("Invalid object or already released"); + + PyGILState_STATE state = PyGILState_Ensure(); + ucxx::python::future_set_result_with_event_loop( + _asyncioEventLoop, _handle, _pythonConvert(result)); + PyGILState_Release(state); + } + + /** + * @brief Set the exception of the Python future. + * + * Set the exception of the underlying Python future. Currently any exceptions that the + * task may raise must be derived from `std::exception`. + * + * @param[in] pythonException the Python exception type to raise. + * @param[in] message the message of the exception. + */ + void setPythonException(PyObject* pythonException, const std::string& message) + { + if (_handle == nullptr) throw std::runtime_error("Invalid object or already released"); + + PyGILState_STATE state = PyGILState_Ensure(); + ucxx::python::future_set_exception_with_event_loop( + _asyncioEventLoop, _handle, pythonException, message.c_str()); + PyGILState_Release(state); + } + + /** + * @brief Parse C++ exception as a Python exception and set the Python future exception. + * + * Parse a C++ exception as a Python exception and set the Python future exception. + * Currently any exceptions that the task may raise must be derived from `std::exception`. + * + * @param[in] exception the C++ exception that was raised by the user-defined task. + */ + void setException(const std::exception& exception) + { + try { + throw exception; + } catch (const std::bad_alloc& e) { + setPythonException(PyExc_MemoryError, e.what()); + } catch (const std::bad_cast& e) { + setPythonException(PyExc_TypeError, e.what()); + } catch (const std::bad_typeid& e) { + setPythonException(PyExc_TypeError, e.what()); + } catch (const std::domain_error& e) { + setPythonException(PyExc_ValueError, e.what()); + } catch (const std::invalid_argument& e) { + setPythonException(PyExc_ValueError, e.what()); + } catch (const std::ios_base::failure& e) { + setPythonException(PyExc_IOError, e.what()); + } catch (const std::out_of_range& e) { + setPythonException(PyExc_IndexError, e.what()); + } catch (const std::overflow_error& e) { + setPythonException(PyExc_OverflowError, e.what()); + } catch (const std::range_error& e) { + setPythonException(PyExc_ArithmeticError, e.what()); + } catch (const std::underflow_error& e) { + setPythonException(PyExc_ArithmeticError, e.what()); + } catch (const std::exception& e) { + setPythonException(PyExc_RuntimeError, e.what()); + } catch (...) { + setPythonException(PyExc_RuntimeError, "Unknown exception"); + } + } + + public: + explicit PythonFutureTask(std::packaged_task task, + std::function pythonConvert, + PyObject* asyncioEventLoop, + std::launch launchPolicy = std::launch::async) + : _task{std::move(task)}, + _pythonConvert(std::move(pythonConvert)), + _asyncioEventLoop(asyncioEventLoop), + _handle{ucxx::python::create_python_future_with_event_loop(asyncioEventLoop)}, + _future{std::async(launchPolicy, [this]() { + std::future result = this->_task.get_future(); + this->_task(); + try { + const ReturnType r = result.get(); + this->setResult(r); + return r; + } catch (std::exception& e) { + this->setException(e); + } + })} + { + } + + PythonFutureTask(const PythonFutureTask&) = delete; + PythonFutureTask& operator=(PythonFutureTask const&) = delete; + PythonFutureTask(PythonFutureTask&& o) = default; + PythonFutureTask& operator=(PythonFutureTask&& o) = default; + + /** + * @brief Python future destructor. + * + * Register the handle for future garbage collection. It is unsafe to require the GIL here + * since the exact time the destructor is called may be unpredictable w.r.t. the Python + * application, and thus requiring the GIL here may result in deadlocks. The application + * is thus responsible to ensure `PythonFutureTaskCollector::push()` is regularly called + * and ultimately responsible for cleaning up before terminating, otherwise a resource + * leakage may occur. + */ + ~PythonFutureTask() { ucxx::python::PythonFutureTaskCollector::get().push(_handle); } + + /** + * @brief Get the C++ future. + * + * Get the underlying C++ future that can be awaited and have its result read. + * + * @returns The underlying C++ future. + */ + std::future& getFuture() + { + if (_handle == nullptr) throw std::runtime_error("Invalid object or already released"); + + return _future; + } + + /** + * @brief Get the underlying future `PyObject*` handle but does not release ownership. + * + * Get the underlying `PyObject*` handle without releasing ownership. This can be useful + * for example for logging, where we want to see the address of the pointer but do not + * want to transfer ownership. + * + * @warning The destructor will also destroy the Python future, a pointer taken via this + * method will cause the object to become invalid. + * + * @throws std::runtime_error if the object is invalid or has been already released. + * + * @returns The underlying `PyObject*` handle. + */ + PyObject* getHandle() + { + if (_handle == nullptr) throw std::runtime_error("Invalid object or already released"); + + return _handle; + } + + /** + * @brief Get the underlying future `PyObject*` handle and release ownership. + * + * Get the underlying `PyObject*` handle releasing ownership. This should be used when + * the future needs to be permanently transferred to Python code. After calling this + * method the object becomes invalid for any other uses. + * + * @throws std::runtime_error if the object is invalid or has been already released. + * + * @returns The underlying `PyObject*` handle. + */ + PyObject* release() + { + if (_handle == nullptr) throw std::runtime_error("Invalid object or already released"); + + return std::exchange(_handle, nullptr); + } +}; + +} // namespace detail + +template +class PythonFutureTask : public std::enable_shared_from_this> { + private: + std::unique_ptr> _detail{ + nullptr}; ///< internal C++ task/future manager + + public: + /** + * @brief Construct a Python future backed by C++ `std::packaged_task`. + * + * Construct a future object that receives a user-defined C++ `std::packaged_task` which + * runs asynchronously using an internal `std::async` that ultimately notifies a Python + * future that can be awaited in Python code. + * + * Note that this call will take the Python GIL and requires that the current thread have + * an asynchronous event loop set. + * + * @param[in] task the user-defined C++ task. + * @param[in] pythonConvert C-Python function to convert a C object into a `PyObject*` + * representing the result of the task. + * @param[in] asyncioEventLoop pointer to a valid Python object containing the event loop + * that the application is using, to which the Python future + * will belong to. + * @param[in] launchPolicy launch policy for the async C++ task. + */ + explicit PythonFutureTask(std::packaged_task task, + std::function pythonConvert, + PyObject* asyncioEventLoop, + std::launch launchPolicy = std::launch::async) + : _detail(std::make_unique>( + std::move(task), std::move(pythonConvert), asyncioEventLoop, launchPolicy)) + { + } + PythonFutureTask(const PythonFutureTask&) = delete; + PythonFutureTask& operator=(PythonFutureTask const&) = delete; + PythonFutureTask(PythonFutureTask&& o) = default; + PythonFutureTask& operator=(PythonFutureTask&& o) = default; + + /** + * @brief Get the C++ future. + * + * Get the underlying C++ future that can be awaited and have its result read. + * + * @returns The underlying C++ future. + */ + std::future& getFuture() { return _detail->getFuture(); } + + /** + * @brief Get the underlying future `PyObject*` handle but does not release ownership. + * + * Get the underlying `PyObject*` handle without releasing ownership. This can be useful + * for example for logging, where we want to see the address of the pointer but do not + * want to transfer ownership. + * + * @warning The destructor will also destroy the Python future, a pointer taken via this + * method will cause the object to become invalid. + * + * @throws std::runtime_error if the object is invalid or has been already released. + * + * @returns The underlying `PyObject*` handle. + */ + PyObject* getHandle() { return _detail->getHandle(); } + + /** + * @brief Get the underlying future `PyObject*` handle and release ownership. + * + * Get the underlying `PyObject*` handle releasing ownership. This should be used when + * the future needs to be permanently transferred to Python code. After calling this + * method the object becomes invalid for any other uses. + * + * @throws std::runtime_error if the object is invalid or has been already released. + * + * @returns The underlying `PyObject*` handle. + */ + PyObject* release() { return _detail->release(); } +}; + +} // namespace python + +} // namespace ucxx diff --git a/cpp/python/include/ucxx/python/python_future_task_collector.h b/cpp/python/include/ucxx/python/python_future_task_collector.h new file mode 100644 index 00000000..57a1b72c --- /dev/null +++ b/cpp/python/include/ucxx/python/python_future_task_collector.h @@ -0,0 +1,72 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. + * SPDX-License-Identifier: BSD-3-Clause + */ +#pragma once + +#include +#include + +#include + +namespace ucxx { + +namespace python { + +class PythonFutureTaskCollector { + public: + std::vector _toCollect{}; ///< Tasks to be collected + std::mutex _mutex{}; ///< Mutex to provide safe access to `_toCollect`. + + /** + * Get reference to `PythonFutureTaskCollector` instance. + * + * `PythonFutureTaskCollector` is a singleton and thus must not be directly instantiated. + * Instead, users should call this static method to get a reference to its instance. + */ + static PythonFutureTaskCollector& get(); + + /** + * Push a Python handle to be later collected. + * + * Push a Python handle to be later collected. This method does not require the GIL. + */ + void push(PyObject* handle); + + /** + * Decrement each reference previously pushed exactly once. + * + * Decrement each reference (i.e., garbage collect) that was previously pushed via the + * `push()` method exactly once, cleaning internal references at the end. + * + * WARNING: Calling this method will attempt to take the GIL, so make sure no other thread + * currently owns it while this thread also competes for other resources that the Python + * thread holding the GIL may require as it may cause a deadlock. + */ + void collect(); + + private: + /** + * Private constructor. + * + * Private constructor to prevent accidental instantiation of multiple collectors. + */ + PythonFutureTaskCollector(); + + public: + PythonFutureTaskCollector(const PythonFutureTaskCollector&) = delete; + PythonFutureTaskCollector& operator=(PythonFutureTaskCollector const&) = delete; + PythonFutureTaskCollector(PythonFutureTaskCollector&& o) = delete; + PythonFutureTaskCollector& operator=(PythonFutureTaskCollector&& o) = delete; + + /** + * Destructor. + * + * Destructor of the collector. Warns if any tasks were pushed but not collected. + */ + ~PythonFutureTaskCollector(); +}; + +} // namespace python + +} // namespace ucxx diff --git a/cpp/python/src/future.cpp b/cpp/python/src/future.cpp index 23325cab..722b8188 100644 --- a/cpp/python/src/future.cpp +++ b/cpp/python/src/future.cpp @@ -10,16 +10,28 @@ namespace ucxx { namespace python { -PyObject* asyncio_str = NULL; -PyObject* future_str = NULL; -PyObject* asyncio_future_object = NULL; +PyObject* asyncio_str = NULL; +PyObject* asyncio_future_object = NULL; +PyObject* call_soon_threadsafe_str = NULL; +PyObject* create_future_str = NULL; +PyObject* future_str = NULL; +PyObject* set_exception_str = NULL; +PyObject* set_result_str = NULL; static int intern_strings(void) { asyncio_str = PyUnicode_InternFromString("asyncio"); if (asyncio_str == NULL) { return -1; } + call_soon_threadsafe_str = PyUnicode_InternFromString("call_soon_threadsafe"); + if (call_soon_threadsafe_str == NULL) { return -1; } + create_future_str = PyUnicode_InternFromString("create_future"); + if (create_future_str == NULL) { return -1; } future_str = PyUnicode_InternFromString("Future"); if (future_str == NULL) { return -1; } + set_exception_str = PyUnicode_InternFromString("set_exception"); + if (set_exception_str == NULL) { return -1; } + set_result_str = PyUnicode_InternFromString("set_result"); + if (set_result_str == NULL) { return -1; } return 0; } @@ -167,6 +179,126 @@ PyObject* future_set_exception(PyObject* future, PyObject* exception, const char return result; } +PyObject* create_python_future_with_event_loop(PyObject* event_loop) +{ + PyObject* result = NULL; + + PyGILState_STATE state = PyGILState_Ensure(); + + if (init_ucxx_python() < 0) { + if (!PyErr_Occurred()) PyErr_SetString(PyExc_RuntimeError, "could not allocate internals."); + goto finish; + } + + result = PyObject_CallMethodObjArgs(event_loop, create_future_str, NULL); + if (PyErr_Occurred()) { + ucxx_trace_req("Error calling event loop `create_future`."); + PyErr_Print(); + } + +finish: + PyGILState_Release(state); + return result; +} + +PyObject* future_set_result_with_event_loop(PyObject* event_loop, PyObject* future, PyObject* value) +{ + PyObject* result = NULL; + PyObject* set_result_callable = NULL; + + PyGILState_STATE state = PyGILState_Ensure(); + + if (init_ucxx_python() < 0) { + if (!PyErr_Occurred()) PyErr_SetString(PyExc_RuntimeError, "could not allocate internals."); + goto finish; + } + + set_result_callable = PyObject_GetAttr(future, set_result_str); + if (PyErr_Occurred()) { + ucxx_trace_req("Error getting future `set_result` method."); + PyErr_Print(); + goto finish; + } + if (!PyCallable_Check(set_result_callable)) { + PyErr_Format(PyExc_RuntimeError, + "%s.%s is not callable.", + PyUnicode_1BYTE_DATA(future), + PyUnicode_1BYTE_DATA(set_result_str)); + goto finish; + } + + result = PyObject_CallMethodObjArgs( + event_loop, call_soon_threadsafe_str, set_result_callable, value, NULL); + if (PyErr_Occurred()) { + ucxx_trace_req("Error calling `call_soon_threadsafe` to set future result."); + PyErr_Print(); + } + +finish: + Py_XDECREF(set_result_callable); + PyGILState_Release(state); + return result; +} + +PyObject* future_set_exception_with_event_loop(PyObject* event_loop, + PyObject* future, + PyObject* exception, + const char* message) +{ + PyObject* result = NULL; + PyObject* set_exception_callable = NULL; + PyObject* message_object = NULL; + PyObject* message_tuple = NULL; + PyObject* formed_exception = NULL; + + PyGILState_STATE state = PyGILState_Ensure(); + + if (init_ucxx_python() < 0) { + if (!PyErr_Occurred()) PyErr_SetString(PyExc_RuntimeError, "could not allocate internals."); + goto finish; + } + + set_exception_callable = PyObject_GetAttr(future, set_exception_str); + if (PyErr_Occurred()) { + ucxx_trace_req("Error getting future `set_exception` method."); + PyErr_Print(); + goto finish; + } + if (!PyCallable_Check(set_exception_callable)) { + PyErr_Format(PyExc_RuntimeError, + "%s.%s is not callable.", + PyUnicode_1BYTE_DATA(future), + PyUnicode_1BYTE_DATA(set_exception_str)); + goto finish; + } + + message_object = PyUnicode_FromString(message); + if (message_object == NULL) goto err; + message_tuple = PyTuple_Pack(1, message_object); + if (message_tuple == NULL) goto err; + formed_exception = PyObject_Call(exception, message_tuple, NULL); + if (formed_exception == NULL) goto err; + + result = PyObject_CallMethodObjArgs( + event_loop, call_soon_threadsafe_str, set_exception_callable, formed_exception, NULL); + if (PyErr_Occurred()) { + ucxx_trace_req("Error calling `call_soon_threadsafe` to set future exception."); + PyErr_Print(); + } + goto finish; + +err: + PyErr_Format(PyExc_RuntimeError, + "Error while forming exception for `asyncio.Future.set_exception`."); +finish: + Py_XDECREF(message_object); + Py_XDECREF(message_tuple); + Py_XDECREF(formed_exception); + Py_XDECREF(set_exception_callable); + PyGILState_Release(state); + return result; +} + } // namespace python } // namespace ucxx diff --git a/cpp/python/src/python_future.cpp b/cpp/python/src/python_future.cpp index 1a12fc65..cb50d052 100644 --- a/cpp/python/src/python_future.cpp +++ b/cpp/python/src/python_future.cpp @@ -17,11 +17,25 @@ namespace ucxx { namespace python { -Future::Future(std::shared_ptr<::ucxx::Notifier> notifier) : ::ucxx::Future(notifier) {} +Future::Future(PyObject* asyncioEventLoop, std::shared_ptr<::ucxx::Notifier> notifier) + : ::ucxx::Future(notifier), + _asyncioEventLoop(asyncioEventLoop), + _handle{asyncioEventLoop == nullptr ? create_python_future() + : create_python_future_with_event_loop(asyncioEventLoop)} +{ +} std::shared_ptr<::ucxx::Future> createFuture(std::shared_ptr<::ucxx::Notifier> notifier) { - return std::shared_ptr<::ucxx::Future>(new ::ucxx::python::Future(notifier)); + return std::shared_ptr<::ucxx::Future>(new ::ucxx::python::Future(nullptr, notifier)); +} + +std::shared_ptr<::ucxx::Future> createFutureWithEventLoop( + PyObject* asyncioEventLoop, std::shared_ptr<::ucxx::Notifier> notifier) +{ + if (asyncioEventLoop == nullptr) + throw std::runtime_error("The asyncio event loop cannot be a nullptr"); + return std::shared_ptr<::ucxx::Future>(new ::ucxx::python::Future(asyncioEventLoop, notifier)); } Future::~Future() @@ -39,11 +53,21 @@ void Future::set(ucs_status_t status) ucxx_trace_req( "Future::set() this: %p, _handle: %p, status: %s", this, _handle, ucs_status_string(status)); - if (status == UCS_OK) - future_set_result(_handle, Py_True); - else - future_set_exception( - _handle, get_python_exception_from_ucs_status(status), ucs_status_string(status)); + if (status == UCS_OK) { + if (_asyncioEventLoop == nullptr) + future_set_result(_handle, Py_True); + else + future_set_result_with_event_loop(_asyncioEventLoop, _handle, Py_True); + } else { + if (_asyncioEventLoop == nullptr) + future_set_exception( + _handle, get_python_exception_from_ucs_status(status), ucs_status_string(status)); + else + future_set_exception_with_event_loop(_asyncioEventLoop, + _handle, + get_python_exception_from_ucs_status(status), + ucs_status_string(status)); + } } void Future::notify(ucs_status_t status) diff --git a/cpp/python/src/python_future_task_collector.cpp b/cpp/python/src/python_future_task_collector.cpp new file mode 100644 index 00000000..d9d15305 --- /dev/null +++ b/cpp/python/src/python_future_task_collector.cpp @@ -0,0 +1,59 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include + +#include + +#include +#include + +namespace ucxx { + +namespace python { + +PythonFutureTaskCollector& PythonFutureTaskCollector::get() +{ + static PythonFutureTaskCollector collector; + return collector; +} + +void PythonFutureTaskCollector::push(PyObject* handle) +{ + std::lock_guard lock(_mutex); + _toCollect.push_back(handle); +} + +void PythonFutureTaskCollector::collect() +{ + PyGILState_STATE state = PyGILState_Ensure(); + + { + std::lock_guard lock(_mutex); + for (auto& handle : _toCollect) + Py_XDECREF(handle); + ucxx_trace("Collected %lu PythonFutureTasks", _toCollect.size()); + _toCollect.clear(); + } + + PyGILState_Release(state); +} + +PythonFutureTaskCollector::PythonFutureTaskCollector() {} + +PythonFutureTaskCollector::~PythonFutureTaskCollector() +{ + { + std::lock_guard lock(_mutex); + + if (_toCollect.size() > 0) + ucxx_warn("Destroying PythonFutureTaskCollector with %lu uncollected tasks", + _toCollect.size()); + } +} + +} // namespace python + +} // namespace ucxx diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 9d5a6e96..80e7bbc2 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -48,6 +48,7 @@ endif() rapids_cython_init() +add_subdirectory(ucxx/examples) add_subdirectory(ucxx/_lib) if(DEFINED cython_lib_dir) diff --git a/python/ucxx/examples/.clang-format b/python/ucxx/examples/.clang-format new file mode 100644 index 00000000..26b9a5bf --- /dev/null +++ b/python/ucxx/examples/.clang-format @@ -0,0 +1,155 @@ +--- +# Refer to the following link for the explanation of each params: +# http://releases.llvm.org/8.0.0/tools/clang/docs/ClangFormatStyleOptions.html +Language: Cpp +# BasedOnStyle: Google +AccessModifierOffset: -1 +AlignAfterOpenBracket: Align +AlignConsecutiveAssignments: true +AlignConsecutiveBitFields: true +AlignConsecutiveDeclarations: false +AlignConsecutiveMacros: true +AlignEscapedNewlines: Left +AlignOperands: true +AlignTrailingComments: true +AllowAllArgumentsOnNextLine: true +AllowAllConstructorInitializersOnNextLine: true +AllowAllParametersOfDeclarationOnNextLine: true +AllowShortBlocksOnASingleLine: true +AllowShortCaseLabelsOnASingleLine: true +AllowShortEnumsOnASingleLine: true +AllowShortFunctionsOnASingleLine: All +AllowShortIfStatementsOnASingleLine: true +AllowShortLambdasOnASingleLine: true +AllowShortLoopsOnASingleLine: false +# This is deprecated +AlwaysBreakAfterDefinitionReturnType: None +AlwaysBreakAfterReturnType: None +AlwaysBreakBeforeMultilineStrings: true +AlwaysBreakTemplateDeclarations: Yes +BinPackArguments: false +BinPackParameters: false +BraceWrapping: + AfterClass: false + AfterControlStatement: false + AfterEnum: false + AfterFunction: false + AfterNamespace: false + AfterObjCDeclaration: false + AfterStruct: false + AfterUnion: false + AfterExternBlock: false + BeforeCatch: false + BeforeElse: false + IndentBraces: false + # disabling the below splits, else, they'll just add to the vertical length of source files! + SplitEmptyFunction: false + SplitEmptyRecord: false + SplitEmptyNamespace: false +BreakAfterJavaFieldAnnotations: false +BreakBeforeBinaryOperators: None +BreakBeforeBraces: WebKit +BreakBeforeInheritanceComma: false +BreakBeforeTernaryOperators: true +BreakConstructorInitializersBeforeComma: false +BreakConstructorInitializers: BeforeColon +BreakInheritanceList: BeforeColon +BreakStringLiterals: true +ColumnLimit: 100 +CommentPragmas: '^ IWYU pragma:' +CompactNamespaces: false +ConstructorInitializerAllOnOneLineOrOnePerLine: true +# Kept the below 2 to be the same as `IndentWidth` to keep everything uniform +ConstructorInitializerIndentWidth: 2 +ContinuationIndentWidth: 2 +Cpp11BracedListStyle: true +DerivePointerAlignment: false +DisableFormat: false +ExperimentalAutoDetectBinPacking: false +FixNamespaceComments: true +ForEachMacros: + - foreach + - Q_FOREACH + - BOOST_FOREACH +IncludeBlocks: Preserve +IncludeIsMainRegex: '([-_](test|unittest))?$' +IndentCaseLabels: true +IndentPPDirectives: None +IndentWidth: 2 +IndentWrappedFunctionNames: false +JavaScriptQuotes: Leave +JavaScriptWrapImports: true +KeepEmptyLinesAtTheStartOfBlocks: false +MacroBlockBegin: '' +MacroBlockEnd: '' +MaxEmptyLinesToKeep: 1 +NamespaceIndentation: None +ObjCBinPackProtocolList: Never +ObjCBlockIndentWidth: 2 +ObjCSpaceAfterProperty: false +ObjCSpaceBeforeProtocolList: true +PenaltyBreakAssignment: 2 +PenaltyBreakBeforeFirstCallParameter: 1 +PenaltyBreakComment: 300 +PenaltyBreakFirstLessLess: 120 +PenaltyBreakString: 1000 +PenaltyBreakTemplateDeclaration: 10 +PenaltyExcessCharacter: 1000000 +PenaltyReturnTypeOnItsOwnLine: 200 +PointerAlignment: Left +RawStringFormats: + - Language: Cpp + Delimiters: + - cc + - CC + - cpp + - Cpp + - CPP + - 'c++' + - 'C++' + CanonicalDelimiter: '' + - Language: TextProto + Delimiters: + - pb + - PB + - proto + - PROTO + EnclosingFunctions: + - EqualsProto + - EquivToProto + - PARSE_PARTIAL_TEXT_PROTO + - PARSE_TEST_PROTO + - PARSE_TEXT_PROTO + - ParseTextOrDie + - ParseTextProtoOrDie + CanonicalDelimiter: '' + BasedOnStyle: google +# Enabling comment reflow causes doxygen comments to be messed up in their formats! +ReflowComments: true +SortIncludes: true +SortUsingDeclarations: true +SpaceAfterCStyleCast: false +SpaceAfterTemplateKeyword: true +SpaceBeforeAssignmentOperators: true +SpaceBeforeCpp11BracedList: false +SpaceBeforeCtorInitializerColon: true +SpaceBeforeInheritanceColon: true +SpaceBeforeParens: ControlStatements +SpaceBeforeRangeBasedForLoopColon: true +SpaceBeforeSquareBrackets: false +SpaceInEmptyBlock: false +SpaceInEmptyParentheses: false +SpacesBeforeTrailingComments: 2 +SpacesInAngles: false +SpacesInConditionalStatement: false +SpacesInContainerLiterals: true +SpacesInCStyleCastParentheses: false +SpacesInParentheses: false +SpacesInSquareBrackets: false +Standard: c++17 +StatementMacros: + - Q_UNUSED + - QT_REQUIRE_VERSION +# Be consistent with indent-width, even for people who use tab for indentation! +TabWidth: 2 +UseTab: Never diff --git a/python/ucxx/examples/CMakeLists.txt b/python/ucxx/examples/CMakeLists.txt new file mode 100644 index 00000000..b7c3868c --- /dev/null +++ b/python/ucxx/examples/CMakeLists.txt @@ -0,0 +1,15 @@ +# ================================================================================= +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: BSD 3-Clause License +# ================================================================================= + +set(cython_sources python_future_task_app.pyx) +set(linked_libraries ucxx::ucxx ucxx::python Python3::Python) + +rapids_cython_create_modules( + CXX + SOURCE_FILES "${cython_sources}" + LINKED_LIBRARIES "${linked_libraries}" +) + +target_include_directories(python_future_task_app PRIVATE ".") diff --git a/python/ucxx/examples/CPPLINT.cfg b/python/ucxx/examples/CPPLINT.cfg new file mode 100644 index 00000000..711c7e41 --- /dev/null +++ b/python/ucxx/examples/CPPLINT.cfg @@ -0,0 +1,2 @@ +filter=+build/include_what_you_use,-build/c++11,-build/header_guard,-build/include_order,-readability/todo,-whitespace +linelength=100 diff --git a/python/ucxx/examples/__init__.pxd b/python/ucxx/examples/__init__.pxd new file mode 100644 index 00000000..43a3c4f2 --- /dev/null +++ b/python/ucxx/examples/__init__.pxd @@ -0,0 +1,4 @@ +# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: BSD-3-Clause + +# cython: language_level=3 diff --git a/python/ucxx/examples/__init__.py b/python/ucxx/examples/__init__.py new file mode 100644 index 00000000..90c81587 --- /dev/null +++ b/python/ucxx/examples/__init__.py @@ -0,0 +1,2 @@ +# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: BSD-3-Clause diff --git a/python/ucxx/examples/python_future_task.h b/python/ucxx/examples/python_future_task.h new file mode 100644 index 00000000..6e8974c0 --- /dev/null +++ b/python/ucxx/examples/python_future_task.h @@ -0,0 +1,156 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. + * SPDX-License-Identifier: BSD-3-Clause + */ +#pragma once + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace ucxx { + +namespace python_future_task { + +typedef ucxx::python::PythonFutureTask PythonFutureTask; +typedef std::vector FuturePool; +typedef std::shared_ptr FuturePoolPtr; + +class ApplicationThread { + private: + std::thread _thread{}; ///< Thread object + bool _stop{false}; ///< Signal to stop on next iteration + + public: + ApplicationThread(PyObject* asyncioEventLoop, + std::shared_ptr incomingPoolMutex, + FuturePoolPtr incomingPool) + { + ucxx_warn("Starting application thread"); + _thread = std::thread(ApplicationThread::progressUntilSync, + asyncioEventLoop, + incomingPoolMutex, + incomingPool, + std::ref(_stop)); + } + + ~ApplicationThread() + { + ucxx_warn("~ApplicationThread"); + if (!_thread.joinable()) { + ucxx_warn("Application thread not running or already stopped"); + return; + } + + _stop = true; + _thread.join(); + } + + static void submit(std::shared_ptr incomingPoolMutex, + FuturePoolPtr incomingPool, + FuturePoolPtr processingPool) + { + // ucxx_warn("Application submitting %lu tasks", incomingPool->size()); + std::lock_guard lock(*incomingPoolMutex); + for (auto it = incomingPool->begin(); it != incomingPool->end();) { + auto& task = *it; + processingPool->push_back(std::move(task)); + it = incomingPool->erase(it); + } + } + + static void processLoop(FuturePoolPtr processingPool) + { + // ucxx_warn("Processing %lu tasks", processingPool->size()); + while (!processingPool->empty()) { + for (auto it = processingPool->begin(); it != processingPool->end();) { + auto& task = *it; + auto& future = task.getFuture(); + + // 10 ms + std::future_status status = future.wait_for(std::chrono::duration(0.01)); + if (status == std::future_status::ready) { + ucxx_warn("Task %llu ready", future.get()); + it = processingPool->erase(it); + continue; + } + + ++it; + } + } + } + + static void progressUntilSync(PyObject* asyncioEventLoop, + std::shared_ptr incomingPoolMutex, + FuturePoolPtr incomingPool, + const bool& stop) + { + ucxx_warn("Application thread started"); + auto processingPool = std::make_shared(); + while (!stop) { + // ucxx_warn("Application thread loop"); + ApplicationThread::submit(incomingPoolMutex, incomingPool, processingPool); + ApplicationThread::processLoop(processingPool); + } + } +}; + +class Application { + private: + std::unique_ptr _thread{nullptr}; ///< The progress thread object + std::shared_ptr _incomingPoolMutex{ + std::make_shared()}; ///< Mutex to access the Python futures pool + FuturePoolPtr _incomingPool{std::make_shared()}; ///< Incoming task pool + PyObject* _asyncioEventLoop{nullptr}; + + public: + explicit Application(PyObject* asyncioEventLoop) : _asyncioEventLoop(asyncioEventLoop) + { + ucxx::parseLogLevel(); + + ucxx_warn("Launching application"); + + _thread = + std::make_unique(_asyncioEventLoop, _incomingPoolMutex, _incomingPool); + } + + ~Application() { ucxx::python::PythonFutureTaskCollector::get().collect(); } + + PyObject* submit(double duration = 1.0, size_t id = 0) + { + ucxx_warn("Submitting task with id: %llu, duration: %f", id, duration); + auto task = ucxx::python::PythonFutureTask( + std::packaged_task([duration, id]() { + ucxx_warn("Task with id %llu sleeping for %f", id, duration); + // Seems like _GLIBCXX_NO_SLEEP or _GLIBCXX_USE_NANOSLEEP is defined + // std::this_thread::sleep_for(std::chrono::duration(duration)); + ::usleep(size_t(duration * 1e6)); + return id; + }), + PyLong_FromSize_t, + _asyncioEventLoop); + + { + std::lock_guard lock(*_incomingPoolMutex); + auto handle = task.getHandle(); + _incomingPool->push_back(std::move(task)); + return handle; + } + } +}; + +} // namespace python_future_task + +} // namespace ucxx diff --git a/python/ucxx/examples/python_future_task_api.pxd b/python/ucxx/examples/python_future_task_api.pxd new file mode 100644 index 00000000..8e23c4e7 --- /dev/null +++ b/python/ucxx/examples/python_future_task_api.pxd @@ -0,0 +1,20 @@ +# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: BSD-3-Clause + +# distutils: language = c++ +# cython: language_level=3 + + +cdef extern from "Python.h" nogil: + ctypedef struct PyObject + + +cdef extern from "" namespace "ucxx::python" nogil: + cdef void raise_py_error() + + +cdef extern from "python_future_task.h" namespace "ucxx::python_future_task" nogil: + cdef cppclass Application: + Application(PyObject* asyncio_event_loop) + PyObject* submit(double duration, long long id) + void* getFuture() except +raise_py_error diff --git a/python/ucxx/examples/python_future_task_app.pyx b/python/ucxx/examples/python_future_task_app.pyx new file mode 100644 index 00000000..2e856ea0 --- /dev/null +++ b/python/ucxx/examples/python_future_task_app.pyx @@ -0,0 +1,32 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: BSD-3-Clause + +# distutils: language = c++ +# cython: language_level=3 + +from cpython.ref cimport PyObject +from libcpp.memory cimport make_unique, unique_ptr +from libcpp.utility cimport move + +from . cimport python_future_task_api +from .python_future_task_api cimport * + + +cdef class PythonFutureTaskApplication(): + cdef unique_ptr[Application] _application + + def __init__(self, asyncio_event_loop): + cdef PyObject* asyncio_event_loop_ptr = asyncio_event_loop + + with nogil: + self._application = move(make_unique[Application](asyncio_event_loop_ptr)) + + def submit(self, duration=1.0, id=0): + cdef double cpp_duration = duration + cdef long long cpp_id = id + cdef PyObject* future_ptr + + with nogil: + future_ptr = self._application.get().submit(cpp_duration, cpp_id) + + return future_ptr diff --git a/python/ucxx/examples/python_future_task_example.py b/python/ucxx/examples/python_future_task_example.py new file mode 100644 index 00000000..8e16af85 --- /dev/null +++ b/python/ucxx/examples/python_future_task_example.py @@ -0,0 +1,35 @@ +import asyncio +from random import random +from typing import List + +from ucxx.examples.python_future_task_app import PythonFutureTaskApplication + + +def submit_task( + cpp_app: PythonFutureTaskApplication, + num_tasks: int = 10, + max_task_duration: float = 1.0, +) -> List[asyncio.Future]: + return [ + cpp_app.submit(duration=random() * max_task_duration, id=t) + for t in range(num_tasks) + ] + + +async def main(): + cpp_app = PythonFutureTaskApplication(asyncio.get_running_loop()) + num_tasks = 10 + max_task_duration = 3.0 + + tasks = submit_task( + cpp_app=cpp_app, num_tasks=num_tasks, max_task_duration=max_task_duration + ) + print("Tasks submitted") + results = await asyncio.gather(*tasks) + print(f"Future {results=}", flush=True) + assert all(got == expected for got, expected in zip(results, range(num_tasks))) + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + loop.run_until_complete(main())