From 23603d16804f13f16ed4cb2c45831836e080017e Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Thu, 6 Jan 2022 12:19:44 -0500 Subject: [PATCH] custreamz oauth callback for kafka (librdkafka) (#9486) Previously it was impossible to use custreamz with oauth enabled Kafka brokers. This PR adds a feature so that the user can supply a Python function which is invoked to get the oauth token, from a http endpoint for example, and then supply that token to librdkafka to be used in both the initial connection to kafka and also subsequently as the token becomes stale. This closes #9410 Authors: - Jeremy Dyer (https://github.com/jdye64) - AJ Schmidt (https://github.com/ajschmidt8) Approvers: - Robert Maynard (https://github.com/robertmaynard) - Vyas Ramasubramani (https://github.com/vyasr) - Ram (Ramakrishna Prabhu) (https://github.com/rgsl888prabhu) - AJ Schmidt (https://github.com/ajschmidt8) URL: https://github.com/rapidsai/cudf/pull/9486 --- ci/gpu/build.sh | 2 +- conda/recipes/cudf_kafka/build.sh | 2 +- conda/recipes/cudf_kafka/meta.yaml | 15 ++-- conda/recipes/custreamz/build.sh | 2 +- conda/recipes/custreamz/meta.yaml | 18 ++--- conda/recipes/libcudf_kafka/build.sh | 2 +- conda/recipes/libcudf_kafka/meta.yaml | 4 +- cpp/libcudf_kafka/CMakeLists.txt | 13 +++- .../cmake/thirdparty/get_cudf.cmake | 2 +- .../cmake/thirdparty/get_rdkafka.cmake | 2 +- .../include/cudf_kafka/kafka_callback.hpp | 71 ++++++++++++++++++ .../include/cudf_kafka/kafka_consumer.hpp | 32 ++++++-- cpp/libcudf_kafka/src/kafka_callback.cpp | 48 ++++++++++++ cpp/libcudf_kafka/src/kafka_consumer.cpp | 46 +++++++++--- cpp/libcudf_kafka/tests/CMakeLists.txt | 7 +- .../tests/kafka_consumer_tests.cpp | 36 ++++++--- .../cudf_kafka/cudf_kafka/_lib/.kafka.pxd.swo | Bin 0 -> 12288 bytes python/cudf_kafka/cudf_kafka/_lib/kafka.pxd | 12 ++- python/cudf_kafka/cudf_kafka/_lib/kafka.pyx | 36 +++++++-- python/custreamz/custreamz/kafka.py | 17 ++--- .../custreamz/custreamz/tests/test_kafka.py | 5 +- 21 files changed, 295 insertions(+), 77 deletions(-) create mode 100644 cpp/libcudf_kafka/include/cudf_kafka/kafka_callback.hpp create mode 100644 cpp/libcudf_kafka/src/kafka_callback.cpp create mode 100644 python/cudf_kafka/cudf_kafka/_lib/.kafka.pxd.swo diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh index a557a2ef066..4ac2fe79bf6 100755 --- a/ci/gpu/build.sh +++ b/ci/gpu/build.sh @@ -89,7 +89,7 @@ gpuci_mamba_retry install -y \ "ucx-py=${UCX_PY_VERSION}" # https://docs.rapids.ai/maintainers/depmgmt/ -# gpuci_mamba_retry remove --force rapids-build-env rapids-notebook-env +# gpuci_conda_retry remove --force rapids-build-env rapids-notebook-env # gpuci_mamba_retry install -y "your-pkg=1.0.0" diff --git a/conda/recipes/cudf_kafka/build.sh b/conda/recipes/cudf_kafka/build.sh index 3db559c144d..5d8720f1c98 100644 --- a/conda/recipes/cudf_kafka/build.sh +++ b/conda/recipes/cudf_kafka/build.sh @@ -1,4 +1,4 @@ -# Copyright (c) 2020, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # This assumes the script is executed from the root of the repo directory ./build.sh -v cudf_kafka diff --git a/conda/recipes/cudf_kafka/meta.yaml b/conda/recipes/cudf_kafka/meta.yaml index e450d306cbe..d434e53c9b1 100644 --- a/conda/recipes/cudf_kafka/meta.yaml +++ b/conda/recipes/cudf_kafka/meta.yaml @@ -1,9 +1,9 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. {% set version = environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev').lstrip('v') + environ.get('VERSION_SUFFIX', '') %} {% set minor_version = version.split('.')[0] + '.' + version.split('.')[1] %} -{% set py_version=environ.get('CONDA_PY', 36) %} -{% set cuda_version='.'.join(environ.get('CUDA', '11.5').split('.')[:2]) %} +{% set cuda_version = '.'.join(environ.get('CUDA', '11.5').split('.')[:2]) %} +{% set py_version = environ.get('python', '3.8') %} package: name: cudf_kafka @@ -14,7 +14,7 @@ source: build: number: {{ GIT_DESCRIBE_NUMBER }} - string: py{{ py_version }}_{{ GIT_DESCRIBE_HASH }}_{{ GIT_DESCRIBE_NUMBER }} + string: py{{ py_version.replace('.', '') }}_{{ GIT_DESCRIBE_HASH }}_{{ GIT_DESCRIBE_NUMBER }} script_env: - CC - CXX @@ -26,14 +26,15 @@ requirements: build: - cmake >=3.20.1 host: - - python + - python {{ py_version }} - cython >=0.29,<0.30 - - setuptools - cudf {{ version }} - libcudf_kafka {{ version }} + - setuptools run: + - python {{ py_version }} - libcudf_kafka {{ version }} - - python-confluent-kafka + - python-confluent-kafka >=1.7.0,<1.8.0a0=py{{ py_version.replace('.', '') }}* - cudf {{ version }} test: # [linux64] diff --git a/conda/recipes/custreamz/build.sh b/conda/recipes/custreamz/build.sh index 6ce9e4f21a9..88fccf90c69 100644 --- a/conda/recipes/custreamz/build.sh +++ b/conda/recipes/custreamz/build.sh @@ -1,4 +1,4 @@ -# Copyright (c) 2020, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # This assumes the script is executed from the root of the repo directory ./build.sh -v custreamz diff --git a/conda/recipes/custreamz/meta.yaml b/conda/recipes/custreamz/meta.yaml index a8b096d4892..73f4727b70b 100644 --- a/conda/recipes/custreamz/meta.yaml +++ b/conda/recipes/custreamz/meta.yaml @@ -1,9 +1,9 @@ -# Copyright (c) 2018-2019, NVIDIA CORPORATION. +# Copyright (c) 2018-2022, NVIDIA CORPORATION. {% set version = environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev').lstrip('v') + environ.get('VERSION_SUFFIX', '') %} {% set minor_version = version.split('.')[0] + '.' + version.split('.')[1] %} -{% set py_version=environ.get('CONDA_PY', 36) %} -{% set cuda_version='.'.join(environ.get('CUDA', '11.5').split('.')[:2]) %} +{% set cuda_version = '.'.join(environ.get('CUDA', '11.5').split('.')[:2]) %} +{% set py_version = environ.get('python', '3.8') %} package: name: custreamz @@ -14,7 +14,7 @@ source: build: number: {{ GIT_DESCRIBE_NUMBER }} - string: py{{ py_version }}_{{ GIT_DESCRIBE_HASH }}_{{ GIT_DESCRIBE_NUMBER }} + string: py{{ py_version.replace('.', '') }}_{{ GIT_DESCRIBE_HASH }}_{{ GIT_DESCRIBE_NUMBER }} script_env: - VERSION_SUFFIX - PARALLEL_LEVEL @@ -24,16 +24,16 @@ build: requirements: host: - - python - - python-confluent-kafka + - python {{ py_version }} + - python-confluent-kafka >=1.7.0,<1.8.0a0=py{{ py_version.replace('.', '') }}* - cudf_kafka {{ version }} run: - - python - - streamz + - python {{ py_version }} + - streamz - cudf {{ version }} - dask>=2021.11.1,<=2021.11.2 - distributed>=2021.11.1,<=2021.11.2 - - python-confluent-kafka + - python-confluent-kafka >=1.7.0,<1.8.0a0=py{{ py_version.replace('.', '') }}* - cudf_kafka {{ version }} test: # [linux64] diff --git a/conda/recipes/libcudf_kafka/build.sh b/conda/recipes/libcudf_kafka/build.sh index cbe4584cb63..b656f55a64e 100644 --- a/conda/recipes/libcudf_kafka/build.sh +++ b/conda/recipes/libcudf_kafka/build.sh @@ -1,4 +1,4 @@ -# Copyright (c) 2020, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. if [[ -z "$PROJECT_FLASH" || "$PROJECT_FLASH" == "0" ]]; then # This assumes the script is executed from the root of the repo directory diff --git a/conda/recipes/libcudf_kafka/meta.yaml b/conda/recipes/libcudf_kafka/meta.yaml index 6b15890e7c7..0b274f3a41d 100644 --- a/conda/recipes/libcudf_kafka/meta.yaml +++ b/conda/recipes/libcudf_kafka/meta.yaml @@ -1,4 +1,4 @@ -# Copyright (c) 2018-2021, NVIDIA CORPORATION. +# Copyright (c) 2018-2022, NVIDIA CORPORATION. {% set version = environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev').lstrip('v') + environ.get('VERSION_SUFFIX', '') %} {% set minor_version = version.split('.')[0] + '.' + version.split('.')[1] %} @@ -26,7 +26,7 @@ requirements: - cmake >=3.20.1 host: - libcudf {{version}} - - librdkafka >=1.6.0,<1.7.0a0 + - librdkafka >=1.7.0,<1.8.0a0 run: - {{ pin_compatible('librdkafka', max_pin='x.x') }} #TODO: librdkafka should be automatically included here by run_exports but is not diff --git a/cpp/libcudf_kafka/CMakeLists.txt b/cpp/libcudf_kafka/CMakeLists.txt index d0874b57c2d..e6abba207d9 100644 --- a/cpp/libcudf_kafka/CMakeLists.txt +++ b/cpp/libcudf_kafka/CMakeLists.txt @@ -1,5 +1,5 @@ # ============================================================================= -# Copyright (c) 2018-2021, NVIDIA CORPORATION. +# Copyright (c) 2018-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except # in compliance with the License. You may obtain a copy of the License at @@ -29,6 +29,10 @@ project( # Set a default build type if none was specified rapids_cmake_build_type(Release) +# ################################################################################################## +# * conda environment ----------------------------------------------------------------------------- +rapids_cmake_support_conda_env(conda_env MODIFY_PREFIX_PATH) + # ################################################################################################## # * Build options option(BUILD_TESTS "Build tests for libcudf_kafka" ON) @@ -55,7 +59,7 @@ endif() # ################################################################################################## # * library target -------------------------------------------------------------------------------- -add_library(cudf_kafka SHARED src/kafka_consumer.cpp) +add_library(cudf_kafka SHARED src/kafka_consumer.cpp src/kafka_callback.cpp) # ################################################################################################## # * include paths --------------------------------------------------------------------------------- @@ -68,6 +72,11 @@ target_include_directories( # * library paths --------------------------------------------------------------------------------- target_link_libraries(cudf_kafka PUBLIC cudf::cudf RDKAFKA::RDKAFKA) +# Add Conda library, and include paths if specified +if(TARGET conda_env) + target_link_libraries(cudf_kafka PRIVATE conda_env) +endif() + set_target_properties( cudf_kafka PROPERTIES BUILD_RPATH "\$ORIGIN" INSTALL_RPATH "\$ORIGIN" # set target compile options diff --git a/cpp/libcudf_kafka/cmake/thirdparty/get_cudf.cmake b/cpp/libcudf_kafka/cmake/thirdparty/get_cudf.cmake index 1e04d40a7d5..aa4c5b60e7a 100644 --- a/cpp/libcudf_kafka/cmake/thirdparty/get_cudf.cmake +++ b/cpp/libcudf_kafka/cmake/thirdparty/get_cudf.cmake @@ -1,5 +1,5 @@ # ============================================================================= -# Copyright (c) 2021, NVIDIA CORPORATION. +# Copyright (c) 2021-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except # in compliance with the License. You may obtain a copy of the License at diff --git a/cpp/libcudf_kafka/cmake/thirdparty/get_rdkafka.cmake b/cpp/libcudf_kafka/cmake/thirdparty/get_rdkafka.cmake index 3b3342cb297..5c3c9f01f17 100644 --- a/cpp/libcudf_kafka/cmake/thirdparty/get_rdkafka.cmake +++ b/cpp/libcudf_kafka/cmake/thirdparty/get_rdkafka.cmake @@ -1,5 +1,5 @@ # ============================================================================= -# Copyright (c) 2021, NVIDIA CORPORATION. +# Copyright (c) 2021-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except # in compliance with the License. You may obtain a copy of the License at diff --git a/cpp/libcudf_kafka/include/cudf_kafka/kafka_callback.hpp b/cpp/libcudf_kafka/include/cudf_kafka/kafka_callback.hpp new file mode 100644 index 00000000000..a4ff18054b1 --- /dev/null +++ b/cpp/libcudf_kafka/include/cudf_kafka/kafka_callback.hpp @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2021-2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include + +#include +#include +#include + +namespace cudf { +namespace io { +namespace external { +namespace kafka { + +/** + * @brief Python Callback function wrapper type used for Kafka OAuth events + * + * The KafkaConsumer calls the `kafka_oauth_callback_wrapper_type` when the existing + * oauth token is considered expired by the KafkaConsumer. Typically that + * means this will be invoked a single time when the KafkaConsumer is created + * to get the initial token and then intermediately as the token becomes + * expired. + * + * The callback function signature is: + * `std::map kafka_oauth_callback_wrapper_type(void*)` + * + * The callback function returns a std::map, + * where the std::map consists of the Oauth token and its + * linux epoch expiration time. Generally the token and expiration + * time is retrieved from an external service by the callback. + * Ex: [token, token_expiration_in_epoch] + */ +using kafka_oauth_callback_wrapper_type = std::map (*)(void*); +using python_callable_type = void*; + +/** + * @brief Callback to retrieve OAuth token from external source. Invoked when + * token refresh is required. + */ +class python_oauth_refresh_callback : public RdKafka::OAuthBearerTokenRefreshCb { + public: + python_oauth_refresh_callback(kafka_oauth_callback_wrapper_type callback_wrapper, + python_callable_type python_callable); + + void oauthbearer_token_refresh_cb(RdKafka::Handle* handle, const std::string& oauthbearer_config); + + private: + kafka_oauth_callback_wrapper_type callback_wrapper_; + python_callable_type python_callable_; +}; + +} // namespace kafka +} // namespace external +} // namespace io +} // namespace cudf diff --git a/cpp/libcudf_kafka/include/cudf_kafka/kafka_consumer.hpp b/cpp/libcudf_kafka/include/cudf_kafka/kafka_consumer.hpp index 464d1cd71b1..c65774d2e1a 100644 --- a/cpp/libcudf_kafka/include/cudf_kafka/kafka_consumer.hpp +++ b/cpp/libcudf_kafka/include/cudf_kafka/kafka_consumer.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,10 +15,14 @@ */ #pragma once -#include -#include +#include "kafka_callback.hpp" + #include + #include + +#include +#include #include #include #include @@ -48,8 +52,15 @@ class kafka_consumer : public cudf::io::datasource { * * @param configs key/value pairs of librdkafka configurations that will be * passed to the librdkafka client + * @param python_callable `python_callable_type` pointer to a Python functools.partial object + * @param callable_wrapper `kafka_oauth_callback_wrapper_type` Cython wrapper that will + * be used to invoke the `python_callable`. This wrapper serves the purpose + * of preventing us from having to link against the Python development library + * in libcudf_kafka. */ - kafka_consumer(std::map const& configs); + kafka_consumer(std::map configs, + python_callable_type python_callable, + kafka_oauth_callback_wrapper_type callable_wrapper); /** * @brief Instantiate a Kafka consumer object. Documentation for librdkafka configurations can be @@ -57,6 +68,11 @@ class kafka_consumer : public cudf::io::datasource { * * @param configs key/value pairs of librdkafka configurations that will be * passed to the librdkafka client + * @param python_callable `python_callable_type` pointer to a Python functools.partial object + * @param callable_wrapper `kafka_oauth_callback_wrapper_type` Cython wrapper that will + * be used to invoke the `python_callable`. This wrapper serves the purpose + * of preventing us from having to link against the Python development library + * in libcudf_kafka. * @param topic_name name of the Kafka topic to consume from * @param partition partition index to consume from between `0` and `TOPIC_NUM_PARTITIONS - 1` * inclusive @@ -66,7 +82,9 @@ class kafka_consumer : public cudf::io::datasource { * before batch_timeout, a smaller subset will be returned * @param delimiter optional delimiter to insert into the output between kafka messages, Ex: "\n" */ - kafka_consumer(std::map const& configs, + kafka_consumer(std::map configs, + python_callable_type python_callable, + kafka_oauth_callback_wrapper_type callable_wrapper, std::string const& topic_name, int partition, int64_t start_offset, @@ -178,6 +196,10 @@ class kafka_consumer : public cudf::io::datasource { std::unique_ptr kafka_conf; // RDKafka configuration object std::unique_ptr consumer; + std::map configs; + python_callable_type python_callable_; + kafka_oauth_callback_wrapper_type callable_wrapper_; + std::string topic_name; int partition; int64_t start_offset; diff --git a/cpp/libcudf_kafka/src/kafka_callback.cpp b/cpp/libcudf_kafka/src/kafka_callback.cpp new file mode 100644 index 00000000000..6b98747c145 --- /dev/null +++ b/cpp/libcudf_kafka/src/kafka_callback.cpp @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021-2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "cudf_kafka/kafka_callback.hpp" + +#include + +namespace cudf { +namespace io { +namespace external { +namespace kafka { + +python_oauth_refresh_callback::python_oauth_refresh_callback( + kafka_oauth_callback_wrapper_type callback_wrapper, python_callable_type python_callable) + : callback_wrapper_(callback_wrapper), python_callable_(python_callable){}; + +void python_oauth_refresh_callback::oauthbearer_token_refresh_cb( + RdKafka::Handle* handle, std::string const& oauthbearer_config) +{ + std::map resp = callback_wrapper_(python_callable_); + + // Build parameters to pass to librdkafka + std::string token = resp["token"]; + int64_t token_lifetime_ms = std::stoll(resp["token_expiration_in_epoch"]); + std::list extensions; // currently not supported + std::string errstr; + CUDF_EXPECTS( + RdKafka::ErrorCode::ERR_NO_ERROR == + handle->oauthbearer_set_token(token, token_lifetime_ms, "kafka", extensions, errstr), + "Error occurred while setting the oauthbearer token"); +} + +} // namespace kafka +} // namespace external +} // namespace io +} // namespace cudf diff --git a/cpp/libcudf_kafka/src/kafka_consumer.cpp b/cpp/libcudf_kafka/src/kafka_consumer.cpp index 4f7cdba632e..49e89a56e60 100644 --- a/cpp/libcudf_kafka/src/kafka_consumer.cpp +++ b/cpp/libcudf_kafka/src/kafka_consumer.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,10 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #include "cudf_kafka/kafka_consumer.hpp" -#include + #include + +#include #include namespace cudf { @@ -24,8 +25,13 @@ namespace io { namespace external { namespace kafka { -kafka_consumer::kafka_consumer(std::map const& configs) - : kafka_conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)) +kafka_consumer::kafka_consumer(std::map configs, + python_callable_type python_callable, + kafka_oauth_callback_wrapper_type callable_wrapper) + : configs(configs), + python_callable_(python_callable), + callable_wrapper_(callable_wrapper), + kafka_conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)) { for (auto const& key_value : configs) { std::string error_string; @@ -34,6 +40,14 @@ kafka_consumer::kafka_consumer(std::map const& configs "Invalid Kafka configuration"); } + if (python_callable_ != nullptr) { + std::string error_string; + python_oauth_refresh_callback cb(callable_wrapper_, python_callable_); + CUDF_EXPECTS(RdKafka::Conf::ConfResult::CONF_OK == + kafka_conf->set("oauthbearer_token_refresh_cb", &cb, error_string), + "Failed to set Kafka oauth callback"); + } + // Kafka 0.9 > requires group.id in the configuration std::string conf_val; CUDF_EXPECTS(RdKafka::Conf::ConfResult::CONF_OK == kafka_conf->get("group.id", conf_val), @@ -44,22 +58,26 @@ kafka_consumer::kafka_consumer(std::map const& configs RdKafka::KafkaConsumer::create(kafka_conf.get(), errstr)); } -kafka_consumer::kafka_consumer(std::map const& configs, +kafka_consumer::kafka_consumer(std::map configs, + python_callable_type python_callable, + kafka_oauth_callback_wrapper_type callback_wrapper, std::string const& topic_name, int partition, int64_t start_offset, int64_t end_offset, int batch_timeout, std::string const& delimiter) - : topic_name(topic_name), + : configs(configs), + python_callable_(python_callable), + callable_wrapper_(callback_wrapper), + topic_name(topic_name), partition(partition), start_offset(start_offset), end_offset(end_offset), batch_timeout(batch_timeout), - delimiter(delimiter) + delimiter(delimiter), + kafka_conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)) { - kafka_conf = std::unique_ptr(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)); - for (auto const& key_value : configs) { std::string error_string; CUDF_EXPECTS(RdKafka::Conf::ConfResult::CONF_OK == @@ -67,6 +85,14 @@ kafka_consumer::kafka_consumer(std::map const& configs "Invalid Kafka configuration"); } + if (python_callable_ != nullptr) { + std::string error_string; + python_oauth_refresh_callback cb(callable_wrapper_, python_callable_); + CUDF_EXPECTS(RdKafka::Conf::ConfResult::CONF_OK == + kafka_conf->set("oauthbearer_token_refresh_cb", &cb, error_string), + "Failed to set Kafka oauth callback"); + } + // Kafka 0.9 > requires group.id in the configuration std::string conf_val; CUDF_EXPECTS(RdKafka::Conf::ConfResult::CONF_OK == kafka_conf->get("group.id", conf_val), diff --git a/cpp/libcudf_kafka/tests/CMakeLists.txt b/cpp/libcudf_kafka/tests/CMakeLists.txt index 3920758f3f2..db2131ba00c 100644 --- a/cpp/libcudf_kafka/tests/CMakeLists.txt +++ b/cpp/libcudf_kafka/tests/CMakeLists.txt @@ -1,5 +1,5 @@ # ============================================================================= -# Copyright (c) 2018-2021, NVIDIA CORPORATION. +# Copyright (c) 2018-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except # in compliance with the License. You may obtain a copy of the License at @@ -23,8 +23,9 @@ function(ConfigureTest test_name) ${test_name} PROPERTIES RUNTIME_OUTPUT_DIRECTORY "$" ) - target_link_libraries(${test_name} PRIVATE GTest::gmock_main GTest::gtest_main cudf_kafka) - + target_link_libraries( + ${test_name} PRIVATE GTest::gmock GTest::gmock_main GTest::gtest_main cudf_kafka + ) add_test(NAME ${test_name} COMMAND ${test_name}) endfunction() diff --git a/cpp/libcudf_kafka/tests/kafka_consumer_tests.cpp b/cpp/libcudf_kafka/tests/kafka_consumer_tests.cpp index ca4b70531db..613c2435f4d 100644 --- a/cpp/libcudf_kafka/tests/kafka_consumer_tests.cpp +++ b/cpp/libcudf_kafka/tests/kafka_consumer_tests.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "cudf_kafka/kafka_consumer.hpp" +#include #include #include #include @@ -32,25 +32,37 @@ TEST_F(KafkaDatasourceTest, MissingGroupID) { // group.id is a required configuration. std::map kafka_configs; - kafka_configs.insert({"bootstrap.servers", "localhost:9092"}); + kafka_configs["bootstrap.servers"] = "localhost:9092"; - EXPECT_THROW(kafka::kafka_consumer kc(kafka_configs, "csv-topic", 0, 0, 3, 5000, "\n"), - cudf::logic_error); + kafka::python_callable_type python_callable; + kafka::kafka_oauth_callback_wrapper_type callback_wrapper; + + EXPECT_THROW( + kafka::kafka_consumer kc( + kafka_configs, python_callable, callback_wrapper, "csv-topic", 0, 0, 3, 5000, "\n"), + cudf::logic_error); } TEST_F(KafkaDatasourceTest, InvalidConfigValues) { // Give a made up configuration value std::map kafka_configs; - kafka_configs.insert({"completely_made_up_config", "wrong"}); + kafka_configs["completely_made_up_config"] = "wrong"; - EXPECT_THROW(kafka::kafka_consumer kc(kafka_configs, "csv-topic", 0, 0, 3, 5000, "\n"), - cudf::logic_error); + kafka::python_callable_type python_callable; + kafka::kafka_oauth_callback_wrapper_type callback_wrapper; - kafka_configs.clear(); + EXPECT_THROW( + kafka::kafka_consumer kc( + kafka_configs, python_callable, callback_wrapper, "csv-topic", 0, 0, 3, 5000, "\n"), + cudf::logic_error); // Give a good config property with a bad value - kafka_configs.insert({"message.max.bytes", "this should be a number not text"}); - EXPECT_THROW(kafka::kafka_consumer kc(kafka_configs, "csv-topic", 0, 0, 3, 5000, "\n"), - cudf::logic_error); + kafka_configs.clear(); + kafka_configs["message.max.bytes"] = "this should be a number not text"; + + EXPECT_THROW( + kafka::kafka_consumer kc( + kafka_configs, python_callable, callback_wrapper, "csv-topic", 0, 0, 3, 5000, "\n"), + cudf::logic_error); } diff --git a/python/cudf_kafka/cudf_kafka/_lib/.kafka.pxd.swo b/python/cudf_kafka/cudf_kafka/_lib/.kafka.pxd.swo new file mode 100644 index 0000000000000000000000000000000000000000..624b60798ae94ece673e4028670bd4d559d7c663 GIT binary patch literal 12288 zcmeI2%ZnUE9LHzI-CdpT zn%N!j5fsFW2mKEc<0D2q=v7Y&9z2?eiYJd=a`6#JB!2r{-TRnbL=dTh5B;cL{i^C) zk6-spH>Xa|&e0?FDS~uAAwS%F!+dbh!z0&^6LPjS;C^_2NOJpD%Q7ccg})L=Q(D5^ zxWKEC!^s)m=eF=1?#fBlYqcl6f$R!5IyY9#_KK;V8n)G(jPBOGwbsUt^oPi5E1(t73TOqi0$KsBfL1^&pcVLEDImQO@)~+{ zN7Ac>1fPS?fDb0XuX_l& z2rhtEz{_9(%!9|kA#md!LVf|?fXm=*;DS@&MKB5~;P%~wTm_fFD(HhMcpUt+8*K*f zfakz2@HKS%3S0yiK&az~-~%uK3>*hz-~hM|t*?Qr;1YNboCk~G5V&zCA%B9);3KdE z&VYm9Tl`4#CU_RifKgBZf8fWO-_sv#wl&8n67yQTP5Z)XQD%#PR|9Uh$EoGY!_$VO z(sH=y$r>Rgl&B?4IF2PHZ*9?%Y)G?C4jZzxhPZb_h9@0| z%=?g$D5?~WraBX&{uDZsbUmD`EmN9-==qF~MZM3jBjsw;6&=e~3M6&4xNSiue9RP8 z7rIJY-83cZ8p&iYWJelvH&u3h9=>(*Wzy6_ZnSLNqYIE z3j90E58E>drDowq&~vSGJ#Ki?U&^*gC2z<#(*?x*jMV(rzZr-y|X{wI@#nX}Qudb(4hMZ4KSJ=H&v% zbFdgtH*QNbVKBL^j$2KXjAPYuEP60bO9InuKm$b&QtD88Sq{TWk<_GgOuJPG&w1C@ zTbH8sP*XK{Z)IGJ<(pb&p&fgR;hGNL)KWCh;t7xnvnRU-Gi|$RvX$IA`qJ580uR{^ z_p9n0Plp7VZL1S(K9@GEVgt>0Tvz!qS;k_ox4IS3QkHYJgsONh$WtQU!U($R( zF59EsVHr8_c#xUr9uoiX92&;+Ju{0hV~>eNGURE!@nyln;g0YJnU6fu6s=4zGr@4H zdD;DNf#w in libcudf_kafka +# we introduce this wrapper in Cython +cdef map[string, string] oauth_callback_wrapper(void *ctx): + return ((ctx))() + + cdef class KafkaDatasource(Datasource): def __cinit__(self, - map[string, string] kafka_configs, + object kafka_configs, string topic=b"", int32_t partition=-1, int64_t start_offset=0, int64_t end_offset=0, int32_t batch_timeout=10000, string delimiter=b"",): + + cdef map[string, string] configs + cdef void* python_callable = nullptr + cdef map[string, string] (*python_callable_wrapper)(void *) + + for key in kafka_configs: + if key == 'oauth_cb': + if callable(kafka_configs[key]): + python_callable = kafka_configs[key] + python_callable_wrapper = &oauth_callback_wrapper + else: + raise TypeError("'oauth_cb' configuration must \ + be a Python callable object") + else: + configs[key.encode()] = kafka_configs[key].encode() + if topic != b"" and partition != -1: self.c_datasource = \ - make_unique[kafka_consumer](kafka_configs, + make_unique[kafka_consumer](configs, + python_callable, + python_callable_wrapper, topic, partition, start_offset, @@ -32,7 +56,9 @@ cdef class KafkaDatasource(Datasource): delimiter) else: self.c_datasource = \ - make_unique[kafka_consumer](kafka_configs) + make_unique[kafka_consumer](configs, + python_callable, + python_callable_wrapper) cdef datasource* get_datasource(self) nogil: return self.c_datasource.get() diff --git a/python/custreamz/custreamz/kafka.py b/python/custreamz/custreamz/kafka.py index a301660a2e4..f5d5031602f 100644 --- a/python/custreamz/custreamz/kafka.py +++ b/python/custreamz/custreamz/kafka.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. import confluent_kafka as ck from cudf_kafka._lib.kafka import KafkaDatasource @@ -25,13 +25,7 @@ def __init__(self, kafka_configs): """ self.kafka_configs = kafka_configs - - self.kafka_confs = { - str.encode(key): str.encode(value) - for key, value in self.kafka_configs.items() - } - - self.kafka_meta_client = KafkaDatasource(self.kafka_confs) + self.kafka_meta_client = KafkaDatasource(kafka_configs) def list_topics(self, specific_topic=None): @@ -145,7 +139,7 @@ def read_gdf( ) kafka_datasource = KafkaDatasource( - self.kafka_confs, + self.kafka_configs, topic.encode(), partition, start, @@ -173,7 +167,10 @@ def read_gdf( kafka_datasource.close(batch_timeout) if result is not None: - return cudf.DataFrame._from_table(result) + if isinstance(result, cudf.DataFrame): + return result + else: + return cudf.DataFrame._from_data(result) else: # empty Dataframe return cudf.DataFrame() diff --git a/python/custreamz/custreamz/tests/test_kafka.py b/python/custreamz/custreamz/tests/test_kafka.py index d29ebf8db8b..ad3b829544b 100644 --- a/python/custreamz/custreamz/tests/test_kafka.py +++ b/python/custreamz/custreamz/tests/test_kafka.py @@ -5,11 +5,10 @@ from cudf.testing._utils import assert_eq -@pytest.mark.parametrize("commit_offset", [-1, 0, 1, 1000]) +@pytest.mark.parametrize("commit_offset", [1, 45, 100, 22, 1000, 10]) @pytest.mark.parametrize("topic", ["cudf-kafka-test-topic"]) def test_kafka_offset(kafka_client, topic, commit_offset): - ck_top = ck.TopicPartition(topic, 0, commit_offset) - offsets = [ck_top] + offsets = [ck.TopicPartition(topic, 0, commit_offset)] kafka_client.commit(offsets=offsets) # Get the offsets that were just committed to Kafka