Merge branch 'branch-22.02' into clang-tidy
codereport committed Jan 7, 2022
2 parents dff04d2 + 7656277 commit e772fc3
Showing 53 changed files with 690 additions and 790 deletions.
2 changes: 1 addition & 1 deletion ci/gpu/build.sh
@@ -89,7 +89,7 @@ gpuci_mamba_retry install -y \
"ucx-py=${UCX_PY_VERSION}"

# https://docs.rapids.ai/maintainers/depmgmt/
# gpuci_mamba_retry remove --force rapids-build-env rapids-notebook-env
# gpuci_conda_retry remove --force rapids-build-env rapids-notebook-env
# gpuci_mamba_retry install -y "your-pkg=1.0.0"


2 changes: 1 addition & 1 deletion conda/recipes/cudf_kafka/build.sh
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.

# This assumes the script is executed from the root of the repo directory
./build.sh -v cudf_kafka
15 changes: 8 additions & 7 deletions conda/recipes/cudf_kafka/meta.yaml
@@ -1,9 +1,9 @@
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.

{% set version = environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev').lstrip('v') + environ.get('VERSION_SUFFIX', '') %}
{% set minor_version = version.split('.')[0] + '.' + version.split('.')[1] %}
{% set py_version=environ.get('CONDA_PY', 36) %}
{% set cuda_version='.'.join(environ.get('CUDA', '11.5').split('.')[:2]) %}
{% set cuda_version = '.'.join(environ.get('CUDA', '11.5').split('.')[:2]) %}
{% set py_version = environ.get('python', '3.8') %}

package:
name: cudf_kafka
@@ -14,7 +14,7 @@ source:

build:
number: {{ GIT_DESCRIBE_NUMBER }}
string: py{{ py_version }}_{{ GIT_DESCRIBE_HASH }}_{{ GIT_DESCRIBE_NUMBER }}
string: py{{ py_version.replace('.', '') }}_{{ GIT_DESCRIBE_HASH }}_{{ GIT_DESCRIBE_NUMBER }}
script_env:
- CC
- CXX
@@ -26,14 +26,15 @@ requirements:
build:
- cmake >=3.20.1
host:
- python
- python {{ py_version }}
- cython >=0.29,<0.30
- setuptools
- cudf {{ version }}
- libcudf_kafka {{ version }}
- setuptools
run:
- python {{ py_version }}
- libcudf_kafka {{ version }}
- python-confluent-kafka
- python-confluent-kafka >=1.7.0,<1.8.0a0=py{{ py_version.replace('.', '') }}*
- cudf {{ version }}

test: # [linux64]
2 changes: 1 addition & 1 deletion conda/recipes/custreamz/build.sh
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.

# This assumes the script is executed from the root of the repo directory
./build.sh -v custreamz
18 changes: 9 additions & 9 deletions conda/recipes/custreamz/meta.yaml
@@ -1,9 +1,9 @@
# Copyright (c) 2018-2019, NVIDIA CORPORATION.
# Copyright (c) 2018-2022, NVIDIA CORPORATION.

{% set version = environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev').lstrip('v') + environ.get('VERSION_SUFFIX', '') %}
{% set minor_version = version.split('.')[0] + '.' + version.split('.')[1] %}
{% set py_version=environ.get('CONDA_PY', 36) %}
{% set cuda_version='.'.join(environ.get('CUDA', '11.5').split('.')[:2]) %}
{% set cuda_version = '.'.join(environ.get('CUDA', '11.5').split('.')[:2]) %}
{% set py_version = environ.get('python', '3.8') %}

package:
name: custreamz
@@ -14,7 +14,7 @@ source:

build:
number: {{ GIT_DESCRIBE_NUMBER }}
string: py{{ py_version }}_{{ GIT_DESCRIBE_HASH }}_{{ GIT_DESCRIBE_NUMBER }}
string: py{{ py_version.replace('.', '') }}_{{ GIT_DESCRIBE_HASH }}_{{ GIT_DESCRIBE_NUMBER }}
script_env:
- VERSION_SUFFIX
- PARALLEL_LEVEL
@@ -24,16 +24,16 @@ build:

requirements:
host:
- python
- python-confluent-kafka
- python {{ py_version }}
- python-confluent-kafka >=1.7.0,<1.8.0a0=py{{ py_version.replace('.', '') }}*
- cudf_kafka {{ version }}
run:
- python
- streamz
- python {{ py_version }}
- streamz
- cudf {{ version }}
- dask>=2021.11.1,<=2021.11.2
- distributed>=2021.11.1,<=2021.11.2
- python-confluent-kafka
- python-confluent-kafka >=1.7.0,<1.8.0a0=py{{ py_version.replace('.', '') }}*
- cudf_kafka {{ version }}

test: # [linux64]
2 changes: 1 addition & 1 deletion conda/recipes/libcudf_kafka/build.sh
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.

if [[ -z "$PROJECT_FLASH" || "$PROJECT_FLASH" == "0" ]]; then
# This assumes the script is executed from the root of the repo directory
4 changes: 2 additions & 2 deletions conda/recipes/libcudf_kafka/meta.yaml
@@ -1,4 +1,4 @@
# Copyright (c) 2018-2021, NVIDIA CORPORATION.
# Copyright (c) 2018-2022, NVIDIA CORPORATION.

{% set version = environ.get('GIT_DESCRIBE_TAG', '0.0.0.dev').lstrip('v') + environ.get('VERSION_SUFFIX', '') %}
{% set minor_version = version.split('.')[0] + '.' + version.split('.')[1] %}
@@ -26,7 +26,7 @@ requirements:
- cmake >=3.20.1
host:
- libcudf {{version}}
- librdkafka >=1.6.0,<1.7.0a0
- librdkafka >=1.7.0,<1.8.0a0
run:
- {{ pin_compatible('librdkafka', max_pin='x.x') }} #TODO: librdkafka should be automatically included here by run_exports but is not

20 changes: 12 additions & 8 deletions cpp/benchmarks/io/parquet/parquet_reader_benchmark.cpp
@@ -89,14 +89,14 @@ void BM_parq_read_varying_options(benchmark::State& state)
auto const use_pandas_metadata = (flags & 2) != 0;
auto const ts_type = cudf::data_type{static_cast<cudf::type_id>(state.range(state_idx++))};

auto const data_types =
dtypes_for_column_selection(get_type_or_group({int32_t(type_group_id::INTEGRAL),
int32_t(type_group_id::FLOATING_POINT),
int32_t(type_group_id::FIXED_POINT),
int32_t(type_group_id::TIMESTAMP),
int32_t(cudf::type_id::STRING),
int32_t(cudf::type_id::LIST)}),
col_sel);
auto const data_types = dtypes_for_column_selection(
get_type_or_group({static_cast<int32_t>(type_group_id::INTEGRAL),
static_cast<int32_t>(type_group_id::FLOATING_POINT),
static_cast<int32_t>(type_group_id::FIXED_POINT),
static_cast<int32_t>(type_group_id::TIMESTAMP),
static_cast<int32_t>(cudf::type_id::STRING),
static_cast<int32_t>(cudf::type_id::LIST)}),
col_sel);
auto const tbl = create_random_table(data_types, data_types.size(), table_size_bytes{data_size});
auto const view = tbl->view();

@@ -181,6 +181,9 @@ BENCHMARK_REGISTER_F(ParquetRead, column_selection)
->Unit(benchmark::kMillisecond)
->UseManualTime();

// Disabled until we add an API to read metadata from a parquet file and determine num row groups.
// https://github.com/rapidsai/cudf/pull/9963#issuecomment-1004832863
/*
BENCHMARK_DEFINE_F(ParquetRead, row_selection)
(::benchmark::State& state) { BM_parq_read_varying_options(state); }
BENCHMARK_REGISTER_F(ParquetRead, row_selection)
@@ -191,6 +194,7 @@ BENCHMARK_REGISTER_F(ParquetRead, row_selection)
{int32_t(cudf::type_id::EMPTY)}})
->Unit(benchmark::kMillisecond)
->UseManualTime();
*/

BENCHMARK_DEFINE_F(ParquetRead, misc_options)
(::benchmark::State& state) { BM_parq_read_varying_options(state); }
13 changes: 11 additions & 2 deletions cpp/libcudf_kafka/CMakeLists.txt
@@ -1,5 +1,5 @@
# =============================================================================
# Copyright (c) 2018-2021, NVIDIA CORPORATION.
# Copyright (c) 2018-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
@@ -29,6 +29,10 @@ project(
# Set a default build type if none was specified
rapids_cmake_build_type(Release)

# ##################################################################################################
# * conda environment -----------------------------------------------------------------------------
rapids_cmake_support_conda_env(conda_env MODIFY_PREFIX_PATH)

# ##################################################################################################
# * Build options
option(BUILD_TESTS "Build tests for libcudf_kafka" ON)
@@ -55,7 +59,7 @@ endif()

# ##################################################################################################
# * library target --------------------------------------------------------------------------------
add_library(cudf_kafka SHARED src/kafka_consumer.cpp)
add_library(cudf_kafka SHARED src/kafka_consumer.cpp src/kafka_callback.cpp)

# ##################################################################################################
# * include paths ---------------------------------------------------------------------------------
@@ -68,6 +72,11 @@ target_include_directories(
# * library paths ---------------------------------------------------------------------------------
target_link_libraries(cudf_kafka PUBLIC cudf::cudf RDKAFKA::RDKAFKA)

# Add Conda library, and include paths if specified
if(TARGET conda_env)
target_link_libraries(cudf_kafka PRIVATE conda_env)
endif()

set_target_properties(
cudf_kafka
PROPERTIES BUILD_RPATH "\$ORIGIN" INSTALL_RPATH "\$ORIGIN" # set target compile options
2 changes: 1 addition & 1 deletion cpp/libcudf_kafka/cmake/thirdparty/get_cudf.cmake
@@ -1,5 +1,5 @@
# =============================================================================
# Copyright (c) 2021, NVIDIA CORPORATION.
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
2 changes: 1 addition & 1 deletion cpp/libcudf_kafka/cmake/thirdparty/get_rdkafka.cmake
@@ -1,5 +1,5 @@
# =============================================================================
# Copyright (c) 2021, NVIDIA CORPORATION.
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
71 changes: 71 additions & 0 deletions cpp/libcudf_kafka/include/cudf_kafka/kafka_callback.hpp
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <cudf/io/datasource.hpp>

#include <librdkafka/rdkafkacpp.h>

#include <map>
#include <memory>
#include <string>

namespace cudf {
namespace io {
namespace external {
namespace kafka {

/**
* @brief Python Callback function wrapper type used for Kafka OAuth events
*
 * The KafkaConsumer calls the `kafka_oauth_callback_wrapper_type` when it considers
 * the existing OAuth token expired. Typically this means the callback is invoked
 * once when the KafkaConsumer is created, to obtain the initial token, and then
 * intermittently thereafter as the token expires.
*
* The callback function signature is:
* `std::map<std::string, std::string> kafka_oauth_callback_wrapper_type(void*)`
*
 * The callback function returns a std::map<std::string, std::string>,
 * where the std::map contains the OAuth token and its
 * Linux epoch expiration time. Generally the token and expiration
 * time are retrieved from an external service by the callback.
 * Ex: [token, token_expiration_in_epoch]
*/
using kafka_oauth_callback_wrapper_type = std::map<std::string, std::string> (*)(void*);
using python_callable_type = void*;

/**
* @brief Callback to retrieve OAuth token from external source. Invoked when
* token refresh is required.
*/
class python_oauth_refresh_callback : public RdKafka::OAuthBearerTokenRefreshCb {
public:
python_oauth_refresh_callback(kafka_oauth_callback_wrapper_type callback_wrapper,
python_callable_type python_callable);

void oauthbearer_token_refresh_cb(RdKafka::Handle* handle, const std::string& oauthbearer_config);

private:
kafka_oauth_callback_wrapper_type callback_wrapper_;
python_callable_type python_callable_;
};

} // namespace kafka
} // namespace external
} // namespace io
} // namespace cudf
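
For context, a minimal sketch of a callback matching `kafka_oauth_callback_wrapper_type` is shown below. It is not part of this commit: the function name and token values are hypothetical, and in the real code path the wrapper is Cython-generated and forwards to the Python callable passed through the opaque `void*` argument.

```cpp
// Sketch only, not from this commit: a free function with the
// kafka_oauth_callback_wrapper_type signature
//   std::map<std::string, std::string> (*)(void*)
// returning a placeholder token and its Linux-epoch expiration time.
#include <map>
#include <string>

std::map<std::string, std::string> example_oauth_refresh(void* python_callable)
{
  // In libcudf_kafka the void* carries the Python functools.partial object and
  // the Cython-generated wrapper invokes it; here it is simply ignored.
  (void)python_callable;
  return {{"token", "example-oauth-token"},
          {"token_expiration_in_epoch", "1700000000"}};
}
```

An instance of `python_oauth_refresh_callback` would then be built from this wrapper plus the Python callable, e.g. `python_oauth_refresh_callback cb{example_oauth_refresh, nullptr};`.
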
32 changes: 27 additions & 5 deletions cpp/libcudf_kafka/include/cudf_kafka/kafka_consumer.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,10 +15,14 @@
*/
#pragma once

#include <algorithm>
#include <chrono>
#include "kafka_callback.hpp"

#include <cudf/io/datasource.hpp>

#include <librdkafka/rdkafkacpp.h>

#include <algorithm>
#include <chrono>
#include <map>
#include <memory>
#include <string>
@@ -48,15 +52,27 @@ class kafka_consumer : public cudf::io::datasource {
*
* @param configs key/value pairs of librdkafka configurations that will be
* passed to the librdkafka client
* @param python_callable `python_callable_type` pointer to a Python functools.partial object
* @param callable_wrapper `kafka_oauth_callback_wrapper_type` Cython wrapper that will
* be used to invoke the `python_callable`. This wrapper serves the purpose
* of preventing us from having to link against the Python development library
* in libcudf_kafka.
*/
kafka_consumer(std::map<std::string, std::string> const& configs);
kafka_consumer(std::map<std::string, std::string> configs,
python_callable_type python_callable,
kafka_oauth_callback_wrapper_type callable_wrapper);

/**
* @brief Instantiate a Kafka consumer object. Documentation for librdkafka configurations can be
* found at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
*
* @param configs key/value pairs of librdkafka configurations that will be
* passed to the librdkafka client
* @param python_callable `python_callable_type` pointer to a Python functools.partial object
* @param callable_wrapper `kafka_oauth_callback_wrapper_type` Cython wrapper that will
* be used to invoke the `python_callable`. This wrapper serves the purpose
* of preventing us from having to link against the Python development library
* in libcudf_kafka.
* @param topic_name name of the Kafka topic to consume from
* @param partition partition index to consume from between `0` and `TOPIC_NUM_PARTITIONS - 1`
* inclusive
@@ -66,7 +82,9 @@ class kafka_consumer : public cudf::io::datasource {
* before batch_timeout, a smaller subset will be returned
* @param delimiter optional delimiter to insert into the output between kafka messages, Ex: "\n"
*/
kafka_consumer(std::map<std::string, std::string> const& configs,
kafka_consumer(std::map<std::string, std::string> configs,
python_callable_type python_callable,
kafka_oauth_callback_wrapper_type callable_wrapper,
std::string const& topic_name,
int partition,
int64_t start_offset,
@@ -178,6 +196,10 @@ class kafka_consumer : public cudf::io::datasource {
std::unique_ptr<RdKafka::Conf> kafka_conf; // RDKafka configuration object
std::unique_ptr<RdKafka::KafkaConsumer> consumer;

std::map<std::string, std::string> configs;
python_callable_type python_callable_;
kafka_oauth_callback_wrapper_type callable_wrapper_;

std::string topic_name;
int partition;
int64_t start_offset;
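As an illustration of how the updated constructor signature might be used from C++, a hedged sketch follows. It is not code from this commit: the include paths, broker address, topic name, and the trailing batch_size/batch_timeout/delimiter arguments (truncated in the hunk above but described in the @param list) are assumptions.

```cpp
// Illustrative sketch under the assumptions noted above; not part of this commit.
#include <cudf_kafka/kafka_callback.hpp>   // assumed install include path
#include <cudf_kafka/kafka_consumer.hpp>

#include <map>
#include <string>

namespace kafka = cudf::io::external::kafka;

// Hypothetical wrapper matching kafka_oauth_callback_wrapper_type; in the real
// code path a Cython-generated function plays this role.
std::map<std::string, std::string> oauth_wrapper(void* /*python_callable*/)
{
  return {{"token", "example-token"}, {"token_expiration_in_epoch", "1700000000"}};
}

int main()
{
  std::map<std::string, std::string> configs{
    {"metadata.broker.list", "localhost:9092"},  // hypothetical broker
    {"group.id", "libcudf-kafka-example"}};

  // Consume up to 100 messages (or until the 10 s batch timeout) from
  // partition 0 of "example-topic", starting at offset 0, joining messages
  // with '\n'. No Python callable is involved here, so nullptr is passed.
  kafka::kafka_consumer consumer(configs,
                                 nullptr,        // python_callable
                                 oauth_wrapper,  // callable_wrapper
                                 "example-topic",
                                 0,              // partition
                                 0,              // start_offset
                                 100,            // batch_size (assumed parameter)
                                 10000,          // batch_timeout in ms (assumed)
                                 "\n");          // delimiter
  return 0;
}
```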
