Skip to content

Commit

Permalink
custreamz oauth callback for kafka (librdkafka) (#9486)
Browse files Browse the repository at this point in the history
Previously it was impossible to use custreamz with OAuth-enabled Kafka brokers. This PR adds a feature that lets the user supply a Python function which is invoked to obtain the OAuth token (from an HTTP endpoint, for example). That token is then supplied to librdkafka, both for the initial connection to Kafka and subsequently whenever the token becomes stale.

This closes #9410

Authors:
  - Jeremy Dyer (https://github.com/jdye64)
  - AJ Schmidt (https://github.com/ajschmidt8)

Approvers:
  - Robert Maynard (https://github.com/robertmaynard)
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Ram (Ramakrishna Prabhu) (https://github.com/rgsl888prabhu)
  - AJ Schmidt (https://github.com/ajschmidt8)

URL: #9486
  • Loading branch information
jdye64 authored Jan 6, 2022
1 parent a61fc55 commit 23603d1
Show file tree
Hide file tree
Showing 21 changed files with 295 additions and 77 deletions.
2 changes: 1 addition & 1 deletion ci/gpu/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ gpuci_mamba_retry install -y \
"ucx-py=${UCX_PY_VERSION}"

# https://docs.rapids.ai/maintainers/depmgmt/
# gpuci_mamba_retry remove --force rapids-build-env rapids-notebook-env
# gpuci_conda_retry remove --force rapids-build-env rapids-notebook-env
# gpuci_mamba_retry install -y "your-pkg=1.0.0"


Expand Down
2 changes: 1 addition & 1 deletion conda/recipes/cudf_kafka/build.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.

# This assumes the script is executed from the root of the repo directory
./build.sh -v cudf_kafka
15 changes: 8 additions & 7 deletions conda/recipes/cudf_kafka/meta.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.

{% set version = environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev').lstrip('v') + environ.get('VERSION_SUFFIX', '') %}
{% set minor_version = version.split('.')[0] + '.' + version.split('.')[1] %}
{% set py_version=environ.get('CONDA_PY', 36) %}
{% set cuda_version='.'.join(environ.get('CUDA', '11.5').split('.')[:2]) %}
{% set cuda_version = '.'.join(environ.get('CUDA', '11.5').split('.')[:2]) %}
{% set py_version = environ.get('python', '3.8') %}

package:
name: cudf_kafka
Expand All @@ -14,7 +14,7 @@ source:

build:
number: {{ GIT_DESCRIBE_NUMBER }}
string: py{{ py_version }}_{{ GIT_DESCRIBE_HASH }}_{{ GIT_DESCRIBE_NUMBER }}
string: py{{ py_version.replace('.', '') }}_{{ GIT_DESCRIBE_HASH }}_{{ GIT_DESCRIBE_NUMBER }}
script_env:
- CC
- CXX
Expand All @@ -26,14 +26,15 @@ requirements:
build:
- cmake >=3.20.1
host:
- python
- python {{ py_version }}
- cython >=0.29,<0.30
- setuptools
- cudf {{ version }}
- libcudf_kafka {{ version }}
- setuptools
run:
- python {{ py_version }}
- libcudf_kafka {{ version }}
- python-confluent-kafka
- python-confluent-kafka >=1.7.0,<1.8.0a0=py{{ py_version.replace('.', '') }}*
- cudf {{ version }}

test: # [linux64]
Expand Down
2 changes: 1 addition & 1 deletion conda/recipes/custreamz/build.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.

# This assumes the script is executed from the root of the repo directory
./build.sh -v custreamz
18 changes: 9 additions & 9 deletions conda/recipes/custreamz/meta.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# Copyright (c) 2018-2019, NVIDIA CORPORATION.
# Copyright (c) 2018-2022, NVIDIA CORPORATION.

{% set version = environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev').lstrip('v') + environ.get('VERSION_SUFFIX', '') %}
{% set minor_version = version.split('.')[0] + '.' + version.split('.')[1] %}
{% set py_version=environ.get('CONDA_PY', 36) %}
{% set cuda_version='.'.join(environ.get('CUDA', '11.5').split('.')[:2]) %}
{% set cuda_version = '.'.join(environ.get('CUDA', '11.5').split('.')[:2]) %}
{% set py_version = environ.get('python', '3.8') %}

package:
name: custreamz
Expand All @@ -14,7 +14,7 @@ source:

build:
number: {{ GIT_DESCRIBE_NUMBER }}
string: py{{ py_version }}_{{ GIT_DESCRIBE_HASH }}_{{ GIT_DESCRIBE_NUMBER }}
string: py{{ py_version.replace('.', '') }}_{{ GIT_DESCRIBE_HASH }}_{{ GIT_DESCRIBE_NUMBER }}
script_env:
- VERSION_SUFFIX
- PARALLEL_LEVEL
Expand All @@ -24,16 +24,16 @@ build:

requirements:
host:
- python
- python-confluent-kafka
- python {{ py_version }}
- python-confluent-kafka >=1.7.0,<1.8.0a0=py{{ py_version.replace('.', '') }}*
- cudf_kafka {{ version }}
run:
- python
- streamz
- python {{ py_version }}
- streamz
- cudf {{ version }}
- dask>=2021.11.1,<=2021.11.2
- distributed>=2021.11.1,<=2021.11.2
- python-confluent-kafka
- python-confluent-kafka >=1.7.0,<1.8.0a0=py{{ py_version.replace('.', '') }}*
- cudf_kafka {{ version }}

test: # [linux64]
Expand Down
2 changes: 1 addition & 1 deletion conda/recipes/libcudf_kafka/build.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.

if [[ -z "$PROJECT_FLASH" || "$PROJECT_FLASH" == "0" ]]; then
# This assumes the script is executed from the root of the repo directory
Expand Down
4 changes: 2 additions & 2 deletions conda/recipes/libcudf_kafka/meta.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2018-2021, NVIDIA CORPORATION.
# Copyright (c) 2018-2022, NVIDIA CORPORATION.

{% set version = environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev').lstrip('v') + environ.get('VERSION_SUFFIX', '') %}
{% set minor_version = version.split('.')[0] + '.' + version.split('.')[1] %}
Expand Down Expand Up @@ -26,7 +26,7 @@ requirements:
- cmake >=3.20.1
host:
- libcudf {{version}}
- librdkafka >=1.6.0,<1.7.0a0
- librdkafka >=1.7.0,<1.8.0a0
run:
- {{ pin_compatible('librdkafka', max_pin='x.x') }} #TODO: librdkafka should be automatically included here by run_exports but is not

Expand Down
13 changes: 11 additions & 2 deletions cpp/libcudf_kafka/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# =============================================================================
# Copyright (c) 2018-2021, NVIDIA CORPORATION.
# Copyright (c) 2018-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -29,6 +29,10 @@ project(
# Set a default build type if none was specified
rapids_cmake_build_type(Release)

# ##################################################################################################
# * conda environment -----------------------------------------------------------------------------
rapids_cmake_support_conda_env(conda_env MODIFY_PREFIX_PATH)

# ##################################################################################################
# * Build options
option(BUILD_TESTS "Build tests for libcudf_kafka" ON)
Expand All @@ -55,7 +59,7 @@ endif()

# ##################################################################################################
# * library target --------------------------------------------------------------------------------
add_library(cudf_kafka SHARED src/kafka_consumer.cpp)
add_library(cudf_kafka SHARED src/kafka_consumer.cpp src/kafka_callback.cpp)

# ##################################################################################################
# * include paths ---------------------------------------------------------------------------------
Expand All @@ -68,6 +72,11 @@ target_include_directories(
# * library paths ---------------------------------------------------------------------------------
target_link_libraries(cudf_kafka PUBLIC cudf::cudf RDKAFKA::RDKAFKA)

# Add Conda library, and include paths if specified
if(TARGET conda_env)
target_link_libraries(cudf_kafka PRIVATE conda_env)
endif()

set_target_properties(
cudf_kafka
PROPERTIES BUILD_RPATH "\$ORIGIN" INSTALL_RPATH "\$ORIGIN" # set target compile options
Expand Down
2 changes: 1 addition & 1 deletion cpp/libcudf_kafka/cmake/thirdparty/get_cudf.cmake
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# =============================================================================
# Copyright (c) 2021, NVIDIA CORPORATION.
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
Expand Down
2 changes: 1 addition & 1 deletion cpp/libcudf_kafka/cmake/thirdparty/get_rdkafka.cmake
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# =============================================================================
# Copyright (c) 2021, NVIDIA CORPORATION.
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
Expand Down
71 changes: 71 additions & 0 deletions cpp/libcudf_kafka/include/cudf_kafka/kafka_callback.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <cudf/io/datasource.hpp>

#include <librdkafka/rdkafkacpp.h>

#include <map>
#include <memory>
#include <string>

namespace cudf {
namespace io {
namespace external {
namespace kafka {

/**
 * @brief Python Callback function wrapper type used for Kafka OAuth events
 *
 * The KafkaConsumer calls the `kafka_oauth_callback_wrapper_type` when the existing
 * oauth token is considered expired by the KafkaConsumer. Typically that
 * means this will be invoked a single time when the KafkaConsumer is created
 * to get the initial token and then periodically afterwards, each time the
 * token expires.
 *
 * The callback function signature is:
 * `std::map<std::string, std::string> kafka_oauth_callback_wrapper_type(void*)`
 *
 * The `void*` argument is the `python_callable_type` handle of the Python
 * callable that the wrapper should invoke.
 *
 * The callback function returns a std::map<std::string, std::string>,
 * where the std::map consists of the OAuth token and its
 * linux epoch expiration time. Generally the token and expiration
 * time is retrieved from an external service by the callback.
 *
 * The map is read using the following keys:
 *  - `"token"`: the OAuth bearer token value
 *  - `"token_expiration_in_epoch"`: the expiration time as a decimal integer
 *    string; it is parsed with `std::stoll` and handed to librdkafka as the
 *    token lifetime in milliseconds (NOTE(review): presumably milliseconds
 *    since the epoch -- confirm against the librdkafka documentation).
 */
using kafka_oauth_callback_wrapper_type = std::map<std::string, std::string> (*)(void*);

/**
 * @brief Opaque handle to the Python callable that produces the OAuth token.
 *
 * Declared as `void*` so that this library does not need to link against the
 * Python development library; the handle is only ever passed back to a
 * `kafka_oauth_callback_wrapper_type` function.
 */
using python_callable_type = void*;

/**
 * @brief Callback to retrieve OAuth token from external source. Invoked when
 * token refresh is required.
 *
 * Implements librdkafka's `RdKafka::OAuthBearerTokenRefreshCb` by delegating
 * the token retrieval to a Python callable, which is reached through a
 * function-pointer wrapper. Only the two raw pointers are stored; this class
 * declares no destructor, so keeping the Python callable alive for as long as
 * the callback is registered is presumably the caller's responsibility.
 */
class python_oauth_refresh_callback : public RdKafka::OAuthBearerTokenRefreshCb {
 public:
  /**
   * @brief Construct a refresh callback bound to a Python callable.
   *
   * @param callback_wrapper Function used to invoke `python_callable` and
   *                         return its result as a string/string map
   * @param python_callable  Opaque handle to the Python callable; passed
   *                         unchanged to `callback_wrapper` on every refresh
   */
  python_oauth_refresh_callback(kafka_oauth_callback_wrapper_type callback_wrapper,
                                python_callable_type python_callable);

  /**
   * @brief Obtain a new token from the Python callable and hand it to librdkafka.
   *
   * Marked `override` so that a mismatch with the librdkafka base-class
   * signature becomes a compile error instead of silently declaring an
   * unrelated function.
   *
   * @param handle             librdkafka handle on which the new token is set
   * @param oauthbearer_config Value supplied by librdkafka alongside the
   *                           refresh request
   */
  void oauthbearer_token_refresh_cb(RdKafka::Handle* handle,
                                    const std::string& oauthbearer_config) override;

 private:
  kafka_oauth_callback_wrapper_type callback_wrapper_;  ///< Invokes the Python callable
  python_callable_type python_callable_;                ///< Opaque Python callable handle
};

} // namespace kafka
} // namespace external
} // namespace io
} // namespace cudf
32 changes: 27 additions & 5 deletions cpp/libcudf_kafka/include/cudf_kafka/kafka_consumer.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,10 +15,14 @@
*/
#pragma once

#include <algorithm>
#include <chrono>
#include "kafka_callback.hpp"

#include <cudf/io/datasource.hpp>

#include <librdkafka/rdkafkacpp.h>

#include <algorithm>
#include <chrono>
#include <map>
#include <memory>
#include <string>
Expand Down Expand Up @@ -48,15 +52,27 @@ class kafka_consumer : public cudf::io::datasource {
*
* @param configs key/value pairs of librdkafka configurations that will be
* passed to the librdkafka client
* @param python_callable `python_callable_type` pointer to a Python functools.partial object
* @param callable_wrapper `kafka_oauth_callback_wrapper_type` Cython wrapper that will
* be used to invoke the `python_callable`. This wrapper serves the purpose
* of preventing us from having to link against the Python development library
* in libcudf_kafka.
*/
kafka_consumer(std::map<std::string, std::string> const& configs);
kafka_consumer(std::map<std::string, std::string> configs,
python_callable_type python_callable,
kafka_oauth_callback_wrapper_type callable_wrapper);

/**
* @brief Instantiate a Kafka consumer object. Documentation for librdkafka configurations can be
* found at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
*
* @param configs key/value pairs of librdkafka configurations that will be
* passed to the librdkafka client
* @param python_callable `python_callable_type` pointer to a Python functools.partial object
* @param callable_wrapper `kafka_oauth_callback_wrapper_type` Cython wrapper that will
* be used to invoke the `python_callable`. This wrapper serves the purpose
* of preventing us from having to link against the Python development library
* in libcudf_kafka.
* @param topic_name name of the Kafka topic to consume from
* @param partition partition index to consume from between `0` and `TOPIC_NUM_PARTITIONS - 1`
* inclusive
Expand All @@ -66,7 +82,9 @@ class kafka_consumer : public cudf::io::datasource {
* before batch_timeout, a smaller subset will be returned
* @param delimiter optional delimiter to insert into the output between kafka messages, Ex: "\n"
*/
kafka_consumer(std::map<std::string, std::string> const& configs,
kafka_consumer(std::map<std::string, std::string> configs,
python_callable_type python_callable,
kafka_oauth_callback_wrapper_type callable_wrapper,
std::string const& topic_name,
int partition,
int64_t start_offset,
Expand Down Expand Up @@ -178,6 +196,10 @@ class kafka_consumer : public cudf::io::datasource {
std::unique_ptr<RdKafka::Conf> kafka_conf; // RDKafka configuration object
std::unique_ptr<RdKafka::KafkaConsumer> consumer;

std::map<std::string, std::string> configs;
python_callable_type python_callable_;
kafka_oauth_callback_wrapper_type callable_wrapper_;

std::string topic_name;
int partition;
int64_t start_offset;
Expand Down
48 changes: 48 additions & 0 deletions cpp/libcudf_kafka/src/kafka_callback.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cudf_kafka/kafka_callback.hpp"

#include <librdkafka/rdkafkacpp.h>

namespace cudf {
namespace io {
namespace external {
namespace kafka {

/**
 * @brief Store the wrapper function and the Python callable handle for later use.
 *
 * Nothing is invoked here; both values are only used once librdkafka requests
 * a token refresh.
 *
 * @param callback_wrapper Function used to invoke `python_callable`
 * @param python_callable  Opaque handle passed to `callback_wrapper` on refresh
 */
python_oauth_refresh_callback::python_oauth_refresh_callback(
  kafka_oauth_callback_wrapper_type callback_wrapper, python_callable_type python_callable)
  : callback_wrapper_(callback_wrapper), python_callable_(python_callable)
{
}

/**
 * @brief Fetch a fresh OAuth token from the Python callable and set it on the handle.
 *
 * The wrapper is expected to return a map containing the keys `token` and
 * `token_expiration_in_epoch`. A missing key is reported as a cuDF error
 * rather than being silently replaced by an empty string.
 *
 * @param handle             librdkafka handle that receives the new token
 * @param oauthbearer_config Supplied by librdkafka; not used by this implementation
 *
 * @throws cudf::logic_error if a required key is missing from the callback
 *         response or librdkafka rejects the token
 * @throws std::invalid_argument / std::out_of_range (from `std::stoll`) if the
 *         expiration value is not a valid integer
 */
void python_oauth_refresh_callback::oauthbearer_token_refresh_cb(
  RdKafka::Handle* handle, std::string const& oauthbearer_config)
{
  // Invoke the Python callable through the wrapper to obtain the new token data.
  auto const resp = callback_wrapper_(python_callable_);

  // Use find() instead of operator[]: operator[] would default-insert an empty
  // string for a missing key and defer the failure to a less descriptive error.
  auto const token_it      = resp.find("token");
  auto const expiration_it = resp.find("token_expiration_in_epoch");
  CUDF_EXPECTS(token_it != resp.end(), "OAuth callback response is missing the `token` entry");
  CUDF_EXPECTS(expiration_it != resp.end(),
               "OAuth callback response is missing the `token_expiration_in_epoch` entry");

  // Build parameters to pass to librdkafka
  std::string const& token        = token_it->second;
  int64_t const token_lifetime_ms = std::stoll(expiration_it->second);
  std::list<std::string> extensions;  // currently not supported
  std::string errstr;                 // filled in by librdkafka on failure
  CUDF_EXPECTS(
    RdKafka::ErrorCode::ERR_NO_ERROR ==
      handle->oauthbearer_set_token(token, token_lifetime_ms, "kafka", extensions, errstr),
    "Error occurred while setting the oauthbearer token");
}

} // namespace kafka
} // namespace external
} // namespace io
} // namespace cudf
Loading

0 comments on commit 23603d1

Please sign in to comment.