Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

custreamz oauth callback for kafka (librdkafka) #9486

Merged
merged 84 commits into from
Jan 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
84 commits
Select commit Hold shift + click to select a range
5595fda
pass pyobject to c++ library rather than map<std::string, std::string>
jdye64 Oct 13, 2021
0cfcfd0
Checkpoint: C++, Cython, and Python compiling working. Callback objec…
jdye64 Oct 14, 2021
1b6b19c
moved configuration building and validating logic to its own method
jdye64 Oct 14, 2021
9c08aa0
Merge branch 'rapidsai:branch-21.12' into custreamz_oauth
jdye64 Oct 21, 2021
25a8b33
Introduce callbacks class where all the possible kafka callbacks can …
jdye64 Oct 25, 2021
4af5b05
Refactored class names
jdye64 Oct 25, 2021
97f8830
Updated tests to match new function parameters
jdye64 Oct 26, 2021
38e52c1
Remove error of leaving previous decleration
jdye64 Oct 26, 2021
2401b8b
upgrade librdkafka version for CI
jdye64 Oct 26, 2021
64cdf1c
Merge remote-tracking branch 'upstream/branch-21.12' into custreamz_o…
jdye64 Oct 26, 2021
69a8639
link python development headers against test
jdye64 Oct 27, 2021
4e4ed72
latest version of librdkafka requires that committed offsets be > 0. …
jdye64 Oct 27, 2021
98ed97d
updates
jdye64 Oct 29, 2021
d1925ab
Make Python3 package REQUIRED in cmake
jdye64 Oct 29, 2021
9916e4d
updates per review
jdye64 Nov 2, 2021
2499a78
modified to use std::function
jdye64 Nov 4, 2021
ec2cab7
added back whitespace that was removed by accident
jdye64 Nov 4, 2021
4672922
temporarily update librdkakfa for CI testing
ajschmidt8 Nov 8, 2021
6a6e629
updates per review
jdye64 Nov 8, 2021
3524f56
merge conflict resolution and upstream branch-21.12 merge
jdye64 Nov 9, 2021
981d44d
test fixes
jdye64 Nov 9, 2021
269d962
removed python dependency from tests as libcudf brings it in
jdye64 Nov 9, 2021
186408d
Merge remote-tracking branch 'upstream/branch-21.12' into custreamz_o…
jdye64 Nov 10, 2021
20ecb3f
update custreamz read_gdf() to check type and handle appropriately
jdye64 Nov 10, 2021
ac77f2e
modify cmake
jdye64 Nov 11, 2021
18aa6f0
merge upstream/branch-21.12
jdye64 Nov 11, 2021
3c525a1
updated conda recipe to include python which is needed for the python…
jdye64 Nov 11, 2021
1d73af2
removing all doubt
jdye64 Nov 12, 2021
3c44432
add numpy to conda recipe build
jdye64 Nov 12, 2021
b5c3ffd
updates
jdye64 Nov 12, 2021
16bef4c
hopefully resolve conda errors
jdye64 Nov 12, 2021
1bbb134
Merge remote-tracking branch 'upstream/branch-21.12' into custreamz_o…
jdye64 Nov 13, 2021
fc4a4aa
Merge remote-tracking branch 'upstream/branch-21.12' into custreamz_o…
jdye64 Nov 19, 2021
fafc831
review updates
jdye64 Nov 19, 2021
43e1aa9
add python to conda recipes since that is needed for python-dev now
jdye64 Nov 20, 2021
ecb3ac7
Merge branch 'branch-22.02' into custreamz_oauth
jdye64 Nov 20, 2021
099ca27
add numpy to conda recipe for cudf_kafka
jdye64 Nov 20, 2021
1d96eef
add numpy to conda recipe for cudf_kafka
jdye64 Nov 20, 2021
5ac4736
add numpy to conda recipe for cudf_kafka
jdye64 Nov 21, 2021
7932468
add numpy to conda recipe for cudf_kafka
jdye64 Nov 21, 2021
9a5b2d9
add numpy to conda recipe for cudf_kafka
jdye64 Nov 21, 2021
ab8f6a0
add numpy to conda recipe for cudf_kafka
jdye64 Nov 22, 2021
9bbedd4
Debugging setup.py dependency issues
jdye64 Nov 22, 2021
b4f9232
Change CUDA version for debugging
jdye64 Nov 23, 2021
d1f4fe5
debug conda build for cudf_kafka
jdye64 Nov 23, 2021
037b386
debug cudf_kafka conda
jdye64 Nov 27, 2021
2c8adce
conda debugging
jdye64 Nov 29, 2021
ef5e072
Add numpy include directory
jdye64 Nov 29, 2021
72a3c49
Merge remote-tracking branch 'upstream/branch-22.02' into custreamz_o…
jdye64 Nov 29, 2021
5d57b1a
Add Python has host and build requirement since libcudf_kafka uses Py…
jdye64 Nov 29, 2021
0a3ee3a
Updated conda meta.yml files
jdye64 Nov 30, 2021
97897ad
Testing out a theory about python versions
jdye64 Dec 1, 2021
0347540
add cmake function for setting up conda environment
jdye64 Dec 2, 2021
a137d23
Merge remote-tracking branch 'upstream/branch-22.02' into custreamz_o…
jdye64 Dec 3, 2021
e10b1e9
Disable PROJECT_FLASH test
jdye64 Dec 6, 2021
0047d64
test manually specifying python version
jdye64 Dec 6, 2021
877030f
Merge remote-tracking branch 'upstream/branch-22.02' into custreamz_o…
jdye64 Dec 6, 2021
6af1cd1
Re-enable PROJECT_FLASH support
jdye64 Dec 6, 2021
25a27ef
use environment version of python
jdye64 Dec 6, 2021
a867d1b
fix typo
jdye64 Dec 6, 2021
e5d4f59
change version of python
jdye64 Dec 7, 2021
2f582bb
libcudf_kafka is not being passed a conda python environment variable
jdye64 Dec 7, 2021
33c9cbb
use python instead of PYTHON environment variable
jdye64 Dec 7, 2021
0cd3fe9
make versions strings instead of floats
jdye64 Dec 7, 2021
86f02c6
removed references in python in cpp
jdye64 Dec 8, 2021
c249b5f
introduce wrapper
jdye64 Dec 10, 2021
bcfae4a
Refactor to use functools.partial
jdye64 Dec 10, 2021
daca522
Remove python find_package command from cmake
jdye64 Dec 10, 2021
ac99019
Update test syntax after refactoring
jdye64 Dec 10, 2021
cc28000
Remove Python versions from anaconda builds to satisfy Java build pro…
jdye64 Dec 11, 2021
7e59211
update conda recipes to get the correct version of python-confluent-k…
jdye64 Dec 11, 2021
6c12fd7
Merge remote-tracking branch 'upstream/branch-22.02' into custreamz_o…
jdye64 Dec 11, 2021
ac46705
include librdkafka 1.7.0 in Java test gpu CI script
jdye64 Dec 12, 2021
47ee207
remove manual librdkafka updates since integration repo is merged and…
jdye64 Dec 13, 2021
1e1dc90
add back conda installs for ops test
jdye64 Dec 13, 2021
53f5465
clang formatting
jdye64 Dec 13, 2021
e0c7048
remove manual librdkafka updates now that gpuci integrations have bee…
jdye64 Dec 14, 2021
740a3bd
Merge upstream
jdye64 Dec 17, 2021
eba67e2
Fix missed merge conflict
jdye64 Dec 18, 2021
f365667
Update cpp/libcudf_kafka/include/cudf_kafka/kafka_callback.hpp
jdye64 Dec 22, 2021
5437b84
Update cpp/libcudf_kafka/src/kafka_callback.cpp
jdye64 Dec 22, 2021
831b84b
Address reviewers suggestions
jdye64 Dec 22, 2021
4087acc
Update source file years and also adjust import for cudf_kafka
jdye64 Jan 5, 2022
44eea02
Adjust source file years
jdye64 Jan 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/gpu/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ gpuci_mamba_retry install -y \
"ucx-py=${UCX_PY_VERSION}"

# https://docs.rapids.ai/maintainers/depmgmt/
# gpuci_mamba_retry remove --force rapids-build-env rapids-notebook-env
# gpuci_conda_retry remove --force rapids-build-env rapids-notebook-env
jdye64 marked this conversation as resolved.
Show resolved Hide resolved
# gpuci_mamba_retry install -y "your-pkg=1.0.0"


Expand Down
2 changes: 1 addition & 1 deletion conda/recipes/cudf_kafka/build.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.

# This assumes the script is executed from the root of the repo directory
./build.sh -v cudf_kafka
15 changes: 8 additions & 7 deletions conda/recipes/cudf_kafka/meta.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.

{% set version = environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev').lstrip('v') + environ.get('VERSION_SUFFIX', '') %}
{% set minor_version = version.split('.')[0] + '.' + version.split('.')[1] %}
{% set py_version=environ.get('CONDA_PY', 36) %}
{% set cuda_version='.'.join(environ.get('CUDA', '11.5').split('.')[:2]) %}
{% set cuda_version = '.'.join(environ.get('CUDA', '11.5').split('.')[:2]) %}
{% set py_version = environ.get('python', '3.8') %}

package:
name: cudf_kafka
Expand All @@ -14,7 +14,7 @@ source:

build:
number: {{ GIT_DESCRIBE_NUMBER }}
string: py{{ py_version }}_{{ GIT_DESCRIBE_HASH }}_{{ GIT_DESCRIBE_NUMBER }}
string: py{{ py_version.replace('.', '') }}_{{ GIT_DESCRIBE_HASH }}_{{ GIT_DESCRIBE_NUMBER }}
script_env:
- CC
- CXX
Expand All @@ -26,14 +26,15 @@ requirements:
build:
- cmake >=3.20.1
host:
- python
- python {{ py_version }}
- cython >=0.29,<0.30
- setuptools
jdye64 marked this conversation as resolved.
Show resolved Hide resolved
- cudf {{ version }}
- libcudf_kafka {{ version }}
- setuptools
run:
- python {{ py_version }}
- libcudf_kafka {{ version }}
jdye64 marked this conversation as resolved.
Show resolved Hide resolved
- python-confluent-kafka
- python-confluent-kafka >=1.7.0,<1.8.0a0=py{{ py_version.replace('.', '') }}*
- cudf {{ version }}

test: # [linux64]
Expand Down
2 changes: 1 addition & 1 deletion conda/recipes/custreamz/build.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.

# This assumes the script is executed from the root of the repo directory
./build.sh -v custreamz
18 changes: 9 additions & 9 deletions conda/recipes/custreamz/meta.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# Copyright (c) 2018-2019, NVIDIA CORPORATION.
# Copyright (c) 2018-2022, NVIDIA CORPORATION.

{% set version = environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev').lstrip('v') + environ.get('VERSION_SUFFIX', '') %}
{% set minor_version = version.split('.')[0] + '.' + version.split('.')[1] %}
{% set py_version=environ.get('CONDA_PY', 36) %}
{% set cuda_version='.'.join(environ.get('CUDA', '11.5').split('.')[:2]) %}
{% set cuda_version = '.'.join(environ.get('CUDA', '11.5').split('.')[:2]) %}
{% set py_version = environ.get('python', '3.8') %}

package:
name: custreamz
Expand All @@ -14,7 +14,7 @@ source:

build:
number: {{ GIT_DESCRIBE_NUMBER }}
string: py{{ py_version }}_{{ GIT_DESCRIBE_HASH }}_{{ GIT_DESCRIBE_NUMBER }}
string: py{{ py_version.replace('.', '') }}_{{ GIT_DESCRIBE_HASH }}_{{ GIT_DESCRIBE_NUMBER }}
script_env:
- VERSION_SUFFIX
- PARALLEL_LEVEL
Expand All @@ -24,16 +24,16 @@ build:

requirements:
host:
- python
- python-confluent-kafka
- python {{ py_version }}
- python-confluent-kafka >=1.7.0,<1.8.0a0=py{{ py_version.replace('.', '') }}*
- cudf_kafka {{ version }}
run:
- python
- streamz
- python {{ py_version }}
- streamz
- cudf {{ version }}
- dask>=2021.11.1,<=2021.11.2
- distributed>=2021.11.1,<=2021.11.2
- python-confluent-kafka
- python-confluent-kafka >=1.7.0,<1.8.0a0=py{{ py_version.replace('.', '') }}*
- cudf_kafka {{ version }}

test: # [linux64]
Expand Down
2 changes: 1 addition & 1 deletion conda/recipes/libcudf_kafka/build.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.

if [[ -z "$PROJECT_FLASH" || "$PROJECT_FLASH" == "0" ]]; then
# This assumes the script is executed from the root of the repo directory
Expand Down
4 changes: 2 additions & 2 deletions conda/recipes/libcudf_kafka/meta.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2018-2021, NVIDIA CORPORATION.
# Copyright (c) 2018-2022, NVIDIA CORPORATION.

{% set version = environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev').lstrip('v') + environ.get('VERSION_SUFFIX', '') %}
{% set minor_version = version.split('.')[0] + '.' + version.split('.')[1] %}
Expand Down Expand Up @@ -26,7 +26,7 @@ requirements:
- cmake >=3.20.1
host:
- libcudf {{version}}
- librdkafka >=1.6.0,<1.7.0a0
- librdkafka >=1.7.0,<1.8.0a0
run:
- {{ pin_compatible('librdkafka', max_pin='x.x') }} #TODO: librdkafka should be automatically included here by run_exports but is not
jdye64 marked this conversation as resolved.
Show resolved Hide resolved

Expand Down
13 changes: 11 additions & 2 deletions cpp/libcudf_kafka/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# =============================================================================
# Copyright (c) 2018-2021, NVIDIA CORPORATION.
# Copyright (c) 2018-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -29,6 +29,10 @@ project(
# Set a default build type if none was specified
rapids_cmake_build_type(Release)

# ##################################################################################################
# * conda environment -----------------------------------------------------------------------------
rapids_cmake_support_conda_env(conda_env MODIFY_PREFIX_PATH)

# ##################################################################################################
# * Build options
option(BUILD_TESTS "Build tests for libcudf_kafka" ON)
Expand All @@ -55,7 +59,7 @@ endif()

# ##################################################################################################
# * library target --------------------------------------------------------------------------------
add_library(cudf_kafka SHARED src/kafka_consumer.cpp)
add_library(cudf_kafka SHARED src/kafka_consumer.cpp src/kafka_callback.cpp)

# ##################################################################################################
# * include paths ---------------------------------------------------------------------------------
Expand All @@ -68,6 +72,11 @@ target_include_directories(
# * library paths ---------------------------------------------------------------------------------
target_link_libraries(cudf_kafka PUBLIC cudf::cudf RDKAFKA::RDKAFKA)

# Add Conda library, and include paths if specified
if(TARGET conda_env)
target_link_libraries(cudf_kafka PRIVATE conda_env)
endif()

set_target_properties(
cudf_kafka
PROPERTIES BUILD_RPATH "\$ORIGIN" INSTALL_RPATH "\$ORIGIN" # set target compile options
Expand Down
2 changes: 1 addition & 1 deletion cpp/libcudf_kafka/cmake/thirdparty/get_cudf.cmake
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# =============================================================================
# Copyright (c) 2021, NVIDIA CORPORATION.
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
Expand Down
2 changes: 1 addition & 1 deletion cpp/libcudf_kafka/cmake/thirdparty/get_rdkafka.cmake
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# =============================================================================
# Copyright (c) 2021, NVIDIA CORPORATION.
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
Expand Down
71 changes: 71 additions & 0 deletions cpp/libcudf_kafka/include/cudf_kafka/kafka_callback.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <cudf/io/datasource.hpp>

#include <librdkafka/rdkafkacpp.h>

#include <map>
#include <memory>
#include <string>

namespace cudf {
namespace io {
namespace external {
namespace kafka {

/**
* @brief Python Callback function wrapper type used for Kafka OAuth events
*
* The KafkaConsumer calls the `kafka_oauth_callback_wrapper_type` when the existing
* oauth token is considered expired by the KafkaConsumer. Typically that
* means this will be invoked a single time when the KafkaConsumer is created
* to get the initial token and then intermediately as the token becomes
* expired.
*
* The callback function signature is:
* `std::map<std::string, std::string> kafka_oauth_callback_wrapper_type(void*)`
*
* The callback function returns a std::map<std::string, std::string>,
* where the std::map consists of the Oauth token and its
* linux epoch expiration time. Generally the token and expiration
* time is retrieved from an external service by the callback.
* Ex: [token, token_expiration_in_epoch]
*/
using kafka_oauth_callback_wrapper_type = std::map<std::string, std::string> (*)(void*);
using python_callable_type = void*;

/**
* @brief Callback to retrieve OAuth token from external source. Invoked when
* token refresh is required.
*/
class python_oauth_refresh_callback : public RdKafka::OAuthBearerTokenRefreshCb {
public:
python_oauth_refresh_callback(kafka_oauth_callback_wrapper_type callback_wrapper,
python_callable_type python_callable);

void oauthbearer_token_refresh_cb(RdKafka::Handle* handle, const std::string& oauthbearer_config);
jdye64 marked this conversation as resolved.
Show resolved Hide resolved

private:
kafka_oauth_callback_wrapper_type callback_wrapper_;
python_callable_type python_callable_;
};

} // namespace kafka
} // namespace external
} // namespace io
} // namespace cudf
32 changes: 27 additions & 5 deletions cpp/libcudf_kafka/include/cudf_kafka/kafka_consumer.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,10 +15,14 @@
*/
#pragma once

#include <algorithm>
#include <chrono>
#include "kafka_callback.hpp"

#include <cudf/io/datasource.hpp>

#include <librdkafka/rdkafkacpp.h>

#include <algorithm>
#include <chrono>
#include <map>
#include <memory>
#include <string>
Expand Down Expand Up @@ -48,15 +52,27 @@ class kafka_consumer : public cudf::io::datasource {
*
* @param configs key/value pairs of librdkafka configurations that will be
* passed to the librdkafka client
* @param python_callable `python_callable_type` pointer to a Python functools.partial object
jdye64 marked this conversation as resolved.
Show resolved Hide resolved
* @param callable_wrapper `kafka_oauth_callback_wrapper_type` Cython wrapper that will
* be used to invoke the `python_callable`. This wrapper serves the purpose
* of preventing us from having to link against the Python development library
* in libcudf_kafka.
*/
kafka_consumer(std::map<std::string, std::string> const& configs);
kafka_consumer(std::map<std::string, std::string> configs,
python_callable_type python_callable,
kafka_oauth_callback_wrapper_type callable_wrapper);

/**
* @brief Instantiate a Kafka consumer object. Documentation for librdkafka configurations can be
* found at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
*
* @param configs key/value pairs of librdkafka configurations that will be
* passed to the librdkafka client
* @param python_callable `python_callable_type` pointer to a Python functools.partial object
jdye64 marked this conversation as resolved.
Show resolved Hide resolved
* @param callable_wrapper `kafka_oauth_callback_wrapper_type` Cython wrapper that will
* be used to invoke the `python_callable`. This wrapper serves the purpose
* of preventing us from having to link against the Python development library
* in libcudf_kafka.
* @param topic_name name of the Kafka topic to consume from
* @param partition partition index to consume from between `0` and `TOPIC_NUM_PARTITIONS - 1`
* inclusive
Expand All @@ -66,7 +82,9 @@ class kafka_consumer : public cudf::io::datasource {
* before batch_timeout, a smaller subset will be returned
* @param delimiter optional delimiter to insert into the output between kafka messages, Ex: "\n"
*/
kafka_consumer(std::map<std::string, std::string> const& configs,
kafka_consumer(std::map<std::string, std::string> configs,
python_callable_type python_callable,
kafka_oauth_callback_wrapper_type callable_wrapper,
std::string const& topic_name,
int partition,
int64_t start_offset,
Expand Down Expand Up @@ -178,6 +196,10 @@ class kafka_consumer : public cudf::io::datasource {
std::unique_ptr<RdKafka::Conf> kafka_conf; // RDKafka configuration object
std::unique_ptr<RdKafka::KafkaConsumer> consumer;

std::map<std::string, std::string> configs;
python_callable_type python_callable_;
kafka_oauth_callback_wrapper_type callable_wrapper_;

std::string topic_name;
int partition;
int64_t start_offset;
Expand Down
48 changes: 48 additions & 0 deletions cpp/libcudf_kafka/src/kafka_callback.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cudf_kafka/kafka_callback.hpp"
jdye64 marked this conversation as resolved.
Show resolved Hide resolved

#include <librdkafka/rdkafkacpp.h>
jdye64 marked this conversation as resolved.
Show resolved Hide resolved

namespace cudf {
namespace io {
namespace external {
namespace kafka {

python_oauth_refresh_callback::python_oauth_refresh_callback(
kafka_oauth_callback_wrapper_type callback_wrapper, python_callable_type python_callable)
: callback_wrapper_(callback_wrapper), python_callable_(python_callable){};

void python_oauth_refresh_callback::oauthbearer_token_refresh_cb(
RdKafka::Handle* handle, std::string const& oauthbearer_config)
{
std::map<std::string, std::string> resp = callback_wrapper_(python_callable_);

// Build parameters to pass to librdkafka
std::string token = resp["token"];
int64_t token_lifetime_ms = std::stoll(resp["token_expiration_in_epoch"]);
std::list<std::string> extensions; // currently not supported
std::string errstr;
CUDF_EXPECTS(
RdKafka::ErrorCode::ERR_NO_ERROR ==
handle->oauthbearer_set_token(token, token_lifetime_ms, "kafka", extensions, errstr),
"Error occurred while setting the oauthbearer token");
}

} // namespace kafka
} // namespace external
} // namespace io
} // namespace cudf
Loading