Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/branch-0.32' into python-futur…
Browse files Browse the repository at this point in the history
…e-task
  • Loading branch information
pentschev committed May 5, 2023
2 parents 4e7c06d + fa70a3b commit 946bc61
Show file tree
Hide file tree
Showing 38 changed files with 302 additions and 400 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ concurrency:
jobs:
conda-cpp-build:
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/conda-cpp-build.yaml@branch-23.04
uses: rapidsai/shared-action-workflows/.github/workflows/conda-cpp-build.yaml@branch-23.06
with:
build_type: ${{ inputs.build_type || 'branch' }}
branch: ${{ inputs.branch }}
Expand All @@ -38,7 +38,7 @@ jobs:
upload-conda:
needs: [conda-cpp-build]
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/conda-upload-packages.yaml@branch-23.04
uses: rapidsai/shared-action-workflows/.github/workflows/conda-upload-packages.yaml@branch-23.06
with:
build_type: ${{ inputs.build_type || 'branch' }}
branch: ${{ inputs.branch }}
Expand Down
10 changes: 5 additions & 5 deletions .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,30 @@ jobs:
- conda-cpp-tests
- conda-python-tests
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/pr-builder.yaml@branch-23.04
uses: rapidsai/shared-action-workflows/.github/workflows/pr-builder.yaml@branch-23.06
checks:
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/checks.yaml@branch-23.04
uses: rapidsai/shared-action-workflows/.github/workflows/checks.yaml@branch-23.06
with:
enable_check_generated_files: false
conda-cpp-build:
needs: checks
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/conda-cpp-build.yaml@branch-23.04
uses: rapidsai/shared-action-workflows/.github/workflows/conda-cpp-build.yaml@branch-23.06
with:
build_type: pull-request
matrix_filter: map(select(.ARCH != "arm64"))
conda-cpp-tests:
needs: conda-cpp-build
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/conda-cpp-tests.yaml@branch-23.04
uses: rapidsai/shared-action-workflows/.github/workflows/conda-cpp-tests.yaml@branch-23.06
with:
build_type: pull-request
matrix_filter: map(select(.ARCH != "arm64"))
conda-python-tests:
needs: conda-cpp-build
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.04
uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.06
with:
build_type: pull-request
matrix_filter: map(select(.ARCH != "arm64"))
4 changes: 2 additions & 2 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ on:
jobs:
conda-cpp-tests:
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/conda-cpp-tests.yaml@branch-23.04
uses: rapidsai/shared-action-workflows/.github/workflows/conda-cpp-tests.yaml@branch-23.06
with:
build_type: nightly
branch: ${{ inputs.branch }}
Expand All @@ -25,7 +25,7 @@ jobs:
matrix_filter: map(select(.ARCH != "arm64"))
conda-python-tests:
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.04
uses: rapidsai/shared-action-workflows/.github/workflows/conda-python-tests.yaml@branch-23.06
with:
build_type: nightly
branch: ${{ inputs.branch }}
Expand Down
4 changes: 3 additions & 1 deletion build_and_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@ run_py_benchmark() {
}

if [[ $RUN_CPP_TESTS != 0 ]]; then
${BINARY_PATH}/gtests/libucxx/UCXX_TEST
# UCX_TCP_CM_REUSEADDR=y to be able to bind immediately to the same port before
# `TIME_WAIT` timeout
UCX_TCP_CM_REUSEADDR=y ${BINARY_PATH}/gtests/libucxx/UCXX_TEST
fi
if [[ $RUN_CPP_BENCH != 0 ]]; then
# run_cpp_benchmark PROGRESS_MODE
Expand Down
30 changes: 18 additions & 12 deletions ci/release/update-version.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash
########################
# ucxx Version Updater #
# UCXX Version Updater #
########################

## Usage
Expand All @@ -11,13 +11,6 @@
# Example: 0.30.00
NEXT_FULL_TAG=$1

# Get current version
CURRENT_TAG=$(git tag | grep -xE 'v[0-9\.]+' | sort --version-sort | tail -n 1 | tr -d 'v')
CURRENT_MAJOR=$(echo $CURRENT_TAG | awk '{split($0, a, "."); print a[1]}')
CURRENT_MINOR=$(echo $CURRENT_TAG | awk '{split($0, a, "."); print a[2]}')
CURRENT_PATCH=$(echo $CURRENT_TAG | awk '{split($0, a, "."); print a[3]}')
CURRENT_SHORT_TAG=${CURRENT_MAJOR}.${CURRENT_MINOR}

#Get <major>.<minor> for next version
NEXT_MAJOR=$(echo $NEXT_FULL_TAG | awk '{split($0, a, "."); print a[1]}')
NEXT_MINOR=$(echo $NEXT_FULL_TAG | awk '{split($0, a, "."); print a[2]}')
Expand All @@ -26,17 +19,30 @@ NEXT_SHORT_TAG=${NEXT_MAJOR}.${NEXT_MINOR}
# Get RAPIDS version associated w/ ucx-py version
NEXT_RAPIDS_VERSION="$(curl -sL https://version.gpuci.io/ucx-py/${NEXT_SHORT_TAG})"

# Need to distutils-normalize the versions for some use cases
NEXT_SHORT_TAG_PEP440=$(python -c "from setuptools.extern import packaging; print(packaging.version.Version('${NEXT_RAPIDS_VERSION}'))")
echo "Next tag is ${NEXT_SHORT_TAG_PEP440}"

echo "Preparing release $CURRENT_TAG => $NEXT_FULL_TAG"
echo "Preparing release: $NEXT_FULL_TAG"

# Inplace sed replace; workaround for Linux and Mac
function sed_runner() {
sed -i.bak ''"$1"'' $2 && rm -f ${2}.bak
}

sed_runner "s/cudf=.*/cudf=${NEXT_RAPIDS_VERSION}/g" dependencies.yaml
sed_runner "s/rmm=.*/rmm=${NEXT_RAPIDS_VERSION}/g" dependencies.yaml
sed_runner "s/rmm =.*/rmm =${NEXT_RAPIDS_VERSION}/g" conda/recipes/ucxx/conda_build_config.yaml
# bump RAPIDS libs
sed_runner "/- rmm =/ s/=.*/=${NEXT_RAPIDS_VERSION}/g" conda/recipes/ucxx/meta.yaml
for FILE in conda/environments/*.yaml dependencies.yaml; do
sed_runner "/- cuda==/ s/==.*/==${NEXT_SHORT_TAG_PEP440}\.*/g" ${FILE};
sed_runner "/- cudf==/ s/==.*/==${NEXT_SHORT_TAG_PEP440}\.*/g" ${FILE};
sed_runner "/- dask-cuda==/ s/==.*/==${NEXT_SHORT_TAG_PEP440}\.*/g" ${FILE};
sed_runner "/- dask-cudf==/ s/==.*/==${NEXT_SHORT_TAG_PEP440}\.*/g" ${FILE};
sed_runner "/- librmm==/ s/==.*/==${NEXT_SHORT_TAG_PEP440}\.*/g" ${FILE};
sed_runner "/- rmm==/ s/==.*/==${NEXT_SHORT_TAG_PEP440}\.*/g" ${FILE};
done

# rapids-cmake version
sed_runner 's/'"branch-.*\/RAPIDS.cmake"'/'"branch-${NEXT_RAPIDS_VERSION}\/RAPIDS.cmake"'/g' fetch_rapids.cmake

for FILE in .github/workflows/*.yaml; do
sed_runner "/shared-action-workflows/ s/@.*/@branch-${NEXT_RAPIDS_VERSION}/g" "${FILE}"
Expand Down
4 changes: 2 additions & 2 deletions ci/test_cpp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ print_system_stats
BINARY_PATH=${CONDA_PREFIX}/bin

run_tests() {
CMD_LINE="UCX_TCP_CM_REUSEADDR=y timeout 10m ${BINARY_PATH}/gtests/libucxx/UCXX_TEST --gtest_filter=-*DelayedSubmission*ProgressTagMulti*:ListenerTest.CloseCallback:ListenerTest.IsAlive:ListenerTest.RaiseOnError"
CMD_LINE="UCX_TCP_CM_REUSEADDR=y timeout 10m ${BINARY_PATH}/gtests/libucxx/UCXX_TEST --gtest_filter=-ListenerTest.CloseCallback:ListenerTest.IsAlive:ListenerTest.RaiseOnError"

rapids-logger "Running: \n - ${CMD_LINE}"

UCX_TCP_CM_REUSEADDR=y timeout 10m ${BINARY_PATH}/gtests/libucxx/UCXX_TEST --gtest_filter=-*DelayedSubmission*ProgressTagMulti*:ListenerTest.CloseCallback:ListenerTest.IsAlive:ListenerTest.RaiseOnError
UCX_TCP_CM_REUSEADDR=y timeout 10m ${BINARY_PATH}/gtests/libucxx/UCXX_TEST --gtest_filter=-ListenerTest.CloseCallback:ListenerTest.IsAlive:ListenerTest.RaiseOnError
}

run_benchmark() {
Expand Down
12 changes: 7 additions & 5 deletions ci/test_python.sh
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,11 @@ run_py_benchmark ucxx-core thread 0 0
run_py_benchmark ucxx-core thread 1 0 0 1 0

for nbuf in 1 8; do
# run_py_benchmark BACKEND PROGRESS_MODE ASYNCIO_WAIT ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE NBUFFERS SLOW
run_py_benchmark ucxx-async thread 0 0 0 ${nbuf} 0
run_py_benchmark ucxx-async thread 0 0 1 ${nbuf} 0
run_py_benchmark ucxx-async thread 0 1 0 ${nbuf} 0
run_py_benchmark ucxx-async thread 0 1 1 ${nbuf} 0
if [[ ! $RAPIDS_CUDA_VERSION =~ 11.2.* ]]; then
# run_py_benchmark BACKEND PROGRESS_MODE ASYNCIO_WAIT ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE NBUFFERS SLOW
run_py_benchmark ucxx-async thread 0 0 0 ${nbuf} 0
run_py_benchmark ucxx-async thread 0 0 1 ${nbuf} 0
run_py_benchmark ucxx-async thread 0 1 0 ${nbuf} 0
run_py_benchmark ucxx-async thread 0 1 1 ${nbuf} 0
fi
done
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ channels:
- nvidia
dependencies:
# Base
- python=3.8
- python=3.9
- cudatoolkit=11.8
- pip
# RAPIDS
- cudf=23.04
- dask-cudf=23.04
- dask-cuda=23.04
- cudf==23.6.*
- dask-cuda==23.6.*
- dask-cudf==23.6.*
- librmm==23.6.*
- rmm==23.6.*
- ucx
# UCX Build
- libtool
Expand Down
5 changes: 4 additions & 1 deletion conda/recipes/ucxx/conda_build_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,8 @@ cmake:
- ">=3.23.1,!=3.25.0"

python:
- 3.8
- 3.9
- 3.10

ucx:
- ">=1.13.0,<1.15.0"
3 changes: 2 additions & 1 deletion conda/recipes/ucxx/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ build:
- SCCACHE_S3_KEY_PREFIX=libucxx-aarch64 # [aarch64]
- SCCACHE_S3_KEY_PREFIX=libucxx-linux64 # [linux64]
- SCCACHE_S3_USE_SSL
- SCCACHE_S3_NO_CREDENTIALS

requirements:
build:
Expand Down Expand Up @@ -159,7 +160,7 @@ outputs:
- numpy 1.21
- {{ pin_subpackage('libucxx', exact=True) }}
- ucx
- rmm =23.04.*
- rmm =23.06
run:
- python * *_cpython
- packaging
Expand Down
2 changes: 1 addition & 1 deletion cpp/benchmarks/perftest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ std::function<void()> getProgressFunction(std::shared_ptr<ucxx::Worker> worker,
switch (progressMode) {
case ProgressMode::Polling: return std::bind(std::mem_fn(&ucxx::Worker::progress), worker);
case ProgressMode::Blocking:
return std::bind(std::mem_fn(&ucxx::Worker::progressWorkerEvent), worker);
return std::bind(std::mem_fn(&ucxx::Worker::progressWorkerEvent), worker, -1);
case ProgressMode::Wait: return std::bind(std::mem_fn(&ucxx::Worker::waitProgress), worker);
default: return []() {};
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/examples/basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ std::function<void()> getProgressFunction(std::shared_ptr<ucxx::Worker> worker,
switch (progressMode) {
case ProgressMode::Polling: return std::bind(std::mem_fn(&ucxx::Worker::progress), worker);
case ProgressMode::Blocking:
return std::bind(std::mem_fn(&ucxx::Worker::progressWorkerEvent), worker);
return std::bind(std::mem_fn(&ucxx::Worker::progressWorkerEvent), worker, -1);
case ProgressMode::Wait: return std::bind(std::mem_fn(&ucxx::Worker::waitProgress), worker);
default: return []() {};
}
Expand Down
17 changes: 8 additions & 9 deletions cpp/include/ucxx/constructors.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,14 @@ std::shared_ptr<RequestStream> createRequestStream(std::shared_ptr<Endpoint> end
size_t length,
const bool enablePythonFuture);

std::shared_ptr<RequestTag> createRequestTag(
std::shared_ptr<Component> endpointOrWorker,
bool send,
void* buffer,
size_t length,
ucp_tag_t tag,
const bool enablePythonFuture,
std::function<void(std::shared_ptr<void>)> callbackFunction,
std::shared_ptr<void> callbackData);
std::shared_ptr<RequestTag> createRequestTag(std::shared_ptr<Component> endpointOrWorker,
bool send,
void* buffer,
size_t length,
ucp_tag_t tag,
const bool enablePythonFuture,
RequestCallbackUserFunction callbackFunction,
RequestCallbackUserData callbackData);

std::shared_ptr<RequestTagMulti> createRequestTagMultiSend(std::shared_ptr<Endpoint> endpoint,
const std::vector<void*>& buffer,
Expand Down
28 changes: 13 additions & 15 deletions cpp/include/ucxx/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class Endpoint : public Component {
* @param[in] endpointErrorHandling whether to enable endpoint error handling.
*/
Endpoint(std::shared_ptr<Component> workerOrListener,
std::unique_ptr<ucp_ep_params_t, EpParamsDeleter> params,
ucp_ep_params_t* params,
bool endpointErrorHandling);

/**
Expand Down Expand Up @@ -319,13 +319,12 @@ class Endpoint : public Component {
*
* @returns Request to be subsequently checked for the completion and its state.
*/
std::shared_ptr<Request> tagSend(
void* buffer,
size_t length,
ucp_tag_t tag,
const bool enablePythonFuture = false,
std::function<void(std::shared_ptr<void>)> callbackFunction = nullptr,
std::shared_ptr<void> callbackData = nullptr);
std::shared_ptr<Request> tagSend(void* buffer,
size_t length,
ucp_tag_t tag,
const bool enablePythonFuture = false,
RequestCallbackUserFunction callbackFunction = nullptr,
RequestCallbackUserData callbackData = nullptr);

/**
* @brief Enqueue a tag receive operation.
Expand All @@ -350,13 +349,12 @@ class Endpoint : public Component {
*
* @returns Request to be subsequently checked for the completion and its state.
*/
std::shared_ptr<Request> tagRecv(
void* buffer,
size_t length,
ucp_tag_t tag,
const bool enablePythonFuture = false,
std::function<void(std::shared_ptr<void>)> callbackFunction = nullptr,
std::shared_ptr<void> callbackData = nullptr);
std::shared_ptr<Request> tagRecv(void* buffer,
size_t length,
ucp_tag_t tag,
const bool enablePythonFuture = false,
RequestCallbackUserFunction callbackFunction = nullptr,
RequestCallbackUserData callbackData = nullptr);

/**
* @brief Enqueue a multi-buffer tag send operation.
Expand Down
4 changes: 2 additions & 2 deletions cpp/include/ucxx/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ class Request : public Component {
std::string _status_msg{}; ///< Human-readable status message
void* _request{nullptr}; ///< Pointer to UCP request
std::shared_ptr<Future> _future{nullptr}; ///< Future to notify upon completion
std::function<void(std::shared_ptr<void>)> _callback{nullptr}; ///< Completion callback
std::shared_ptr<void> _callbackData{nullptr}; ///< Completion callback data
RequestCallbackUserFunction _callback{nullptr}; ///< Completion callback
RequestCallbackUserData _callbackData{nullptr}; ///< Completion callback data
std::shared_ptr<Worker> _worker{
nullptr}; ///< Worker that generated request (if not from endpoint)
std::shared_ptr<Endpoint> _endpoint{
Expand Down
23 changes: 11 additions & 12 deletions cpp/include/ucxx/request_tag.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ class RequestTag : public Request {
void* buffer,
size_t length,
ucp_tag_t tag,
const bool enablePythonFuture = false,
std::function<void(std::shared_ptr<void>)> callbackFunction = nullptr,
std::shared_ptr<void> callbackData = nullptr);
const bool enablePythonFuture = false,
RequestCallbackUserFunction callbackFunction = nullptr,
RequestCallbackUserData callbackData = nullptr);

public:
/**
Expand Down Expand Up @@ -85,15 +85,14 @@ class RequestTag : public Request {
*
* @returns The `shared_ptr<ucxx::RequestTag>` object
*/
friend std::shared_ptr<RequestTag> createRequestTag(
std::shared_ptr<Component> endpointOrWorker,
bool send,
void* buffer,
size_t length,
ucp_tag_t tag,
const bool enablePythonFuture,
std::function<void(std::shared_ptr<void>)> callbackFunction,
std::shared_ptr<void> callbackData);
friend std::shared_ptr<RequestTag> createRequestTag(std::shared_ptr<Component> endpointOrWorker,
bool send,
void* buffer,
size_t length,
ucp_tag_t tag,
const bool enablePythonFuture,
RequestCallbackUserFunction callbackFunction,
RequestCallbackUserData callbackData);

virtual void populateDelayedSubmission();

Expand Down
8 changes: 6 additions & 2 deletions cpp/include/ucxx/request_tag_multi.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,10 @@ class RequestTagMulti : public std::enable_shared_from_this<RequestTagMulti> {
* which will be later used to evaluate if all frames completed and set the final status
* of the multi-transfer request and the Python future, if enabled.
*
* @param[in] status the status of the request being completed.
* @param[in] request the `ucxx::BufferRequest` object containing a single tag .
*/
void markCompleted(std::shared_ptr<void> request);
void markCompleted(ucs_status_t status, RequestCallbackUserData request);

/**
* @brief Callback to submit request to receive new header or frames.
Expand All @@ -231,9 +232,12 @@ class RequestTagMulti : public std::enable_shared_from_this<RequestTagMulti> {
* containing the `next` flag set, then the next request is another header. Otherwise, the
* next incoming message(s) is(are) frame(s).
*
* When called, the callback receives a single argument, the status of the current request.
*
* @param[in] status the status of the request being completed.
* @throws std::runtime_error if called by a send request.
*/
void callback();
void callback(ucs_status_t status);

/**
* @brief Return the status of the request.
Expand Down
4 changes: 4 additions & 0 deletions cpp/include/ucxx/typedefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#pragma once

#include <functional>
#include <memory>
#include <string>
#include <unordered_map>

Expand Down Expand Up @@ -32,4 +33,7 @@ typedef enum {

typedef std::unordered_map<std::string, std::string> ConfigMap;

typedef std::function<void(ucs_status_t, std::shared_ptr<void>)> RequestCallbackUserFunction;
typedef std::shared_ptr<void> RequestCallbackUserData;

} // namespace ucxx
Loading

0 comments on commit 946bc61

Please sign in to comment.