Commit
Merge pull request #3504 from jdye64/3405
[REVIEW] Externalized Kafka Datasource
Keith Kraus authored Jun 29, 2020
2 parents 1e21c83 + eb98755 commit 0732f53
Showing 12 changed files with 521 additions and 34 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -10,6 +10,7 @@
- PR #5327 Add `cudf::cross_join` feature
- PR #5204 Concatenate strings columns using row separator as strings column
- PR #5342 Add support for `StringMethods.__getitem__`
- PR #3504 Add External Kafka Datasource
- PR #5356 Use `size_type` instead of `scalar` in `cudf::repeat`.
- PR #5397 Add internal implementation of nested loop equijoins.
- PR #5303 Add slice_strings functionality using delimiter string
@@ -2249,4 +2250,4 @@

# cuDF 0.2.0 and cuDF 0.1.0

These were initial releases of cuDF based on previously separate pyGDF and libGDF libraries.
These were initial releases of cuDF based on previously separate pyGDF and libGDF libraries.
60 changes: 40 additions & 20 deletions build.sh
@@ -7,7 +7,6 @@
# This script is used to build the component(s) in this repo from
# source, and can be called with various options to customize the
# build as needed (see the help output for details)

# Abort script on first error
set -e

@@ -18,22 +17,24 @@ ARGS=$*
# script, and that this script resides in the repo dir!
REPODIR=$(cd $(dirname $0); pwd)

VALIDARGS="clean libcudf cudf dask_cudf benchmarks tests -v -g -n --allgpuarch --disable_nvtx --show_depr_warn -h"
HELP="$0 [clean] [libcudf] [cudf] [dask_cudf] [benchmarks] [tests] [-v] [-g] [-n] [-h]
clean - remove all existing build artifacts and configuration (start
over)
libcudf - build the cudf C++ code only
cudf - build the cudf Python package
dask_cudf - build the dask_cudf Python package
benchmarks - build benchmarks
tests - build tests
-v - verbose build mode
-g - build for debug
-n - no install step
--allgpuarch - build for all supported GPU architectures
--disable_nvtx - disable inserting NVTX profiling ranges
--show_depr_warn - show cmake deprecation warnings
-h - print this text
VALIDARGS="clean libcudf cudf dask_cudf benchmarks tests libcudf_kafka -v -g -n -l --allgpuarch --disable_nvtx --show_depr_warn -h"
HELP="$0 [clean] [libcudf] [cudf] [dask_cudf] [benchmarks] [tests] [libcudf_kafka] [-v] [-g] [-n] [-h] [-l]
clean - remove all existing build artifacts and configuration (start
over)
libcudf - build the cudf C++ code only
cudf - build the cudf Python package
dask_cudf - build the dask_cudf Python package
benchmarks - build benchmarks
tests - build tests
libcudf_kafka - build the libcudf_kafka C++ code only
-v - verbose build mode
-g - build for debug
-n - no install step
-l - build legacy tests
--allgpuarch - build for all supported GPU architectures
--disable_nvtx - disable inserting NVTX profiling ranges
--show_depr_warn - show cmake deprecation warnings
-h - print this text
default action (no args) is to build and install 'libcudf' then 'cudf'
then 'dask_cudf' targets
@@ -52,6 +53,7 @@ BUILD_ALL_GPU_ARCH=0
BUILD_NVTX=ON
BUILD_TESTS=OFF
BUILD_DISABLE_DEPRECATION_WARNING=ON
BUILD_LIBCUDF_KAFKA=OFF

# Set defaults for vars that may not have been defined externally
# FIXME: if INSTALL_PREFIX is not set, check PREFIX, then check
@@ -108,6 +110,9 @@ fi
if hasArg --show_depr_warn; then
BUILD_DISABLE_DEPRECATION_WARNING=OFF
fi
if hasArg libcudf_kafka; then
BUILD_LIBCUDF_KAFKA=ON
fi

# If clean given, run it prior to any other steps
if hasArg clean; then
@@ -134,8 +139,7 @@ fi
################################################################################
# Configure, build, and install libcudf

if buildAll || hasArg libcudf; then

if buildAll || hasArg libcudf || hasArg libcudf_kafka; then
mkdir -p ${LIB_BUILD_DIR}
cd ${LIB_BUILD_DIR}
cmake -DCMAKE_INSTALL_PREFIX=${INSTALL_PREFIX} \
@@ -144,7 +148,8 @@ if buildAll || hasArg libcudf; then
-DUSE_NVTX=${BUILD_NVTX} \
-DBUILD_BENCHMARKS=${BUILD_BENCHMARKS} \
-DDISABLE_DEPRECATION_WARNING=${BUILD_DISABLE_DEPRECATION_WARNING} \
-DCMAKE_BUILD_TYPE=${BUILD_TYPE} $REPODIR/cpp
-DCMAKE_BUILD_TYPE=${BUILD_TYPE} \
-DBUILD_CUDF_KAFKA=${BUILD_LIBCUDF_KAFKA} $REPODIR/cpp
fi

if buildAll || hasArg libcudf; then
@@ -187,3 +192,18 @@ if buildAll || hasArg dask_cudf; then
python setup.py build_ext --inplace
fi
fi

# Do not build libcudf_kafka with 'buildAll'
if hasArg libcudf_kafka; then

cd ${LIB_BUILD_DIR}
if [[ ${INSTALL_TARGET} != "" ]]; then
make -j${PARALLEL_LEVEL} install_libcudf_kafka VERBOSE=${VERBOSE}
else
make -j${PARALLEL_LEVEL} libcudf_kafka VERBOSE=${VERBOSE}
fi

if [[ ${BUILD_TESTS} == "ON" ]]; then
make -j${PARALLEL_LEVEL} build_tests_libcudf_kafka VERBOSE=${VERBOSE}
fi
fi
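
These build.sh changes keep the Kafka datasource opt-in: the cudf_kafka CMake option is only switched on when libcudf_kafka is passed explicitly, and the target is deliberately skipped by the default all-targets build. A sketch of the new invocations, assuming the usual cuDF build environment with librdkafka discoverable via RDKAFKA_ROOT:

```bash
# Configure the C++ build with BUILD_CUDF_KAFKA=ON, then build and install
# the libcudf_kafka target along with its gtests.
./build.sh libcudf_kafka tests

# Same configure step, but -n skips installation and builds the target in-tree.
./build.sh libcudf_kafka -n
```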
11 changes: 11 additions & 0 deletions ci/gpu/build.sh
@@ -97,6 +97,17 @@ else
$WORKSPACE/build.sh clean libcudf cudf dask_cudf benchmarks tests -l
fi

################################################################################
# BUILD - Build libcudf_kafka from source
################################################################################

logger "Build libcudf_kafka..."
if [[ ${BUILD_MODE} == "pull-request" ]]; then
$WORKSPACE/build.sh clean libcudf_kafka tests
else
$WORKSPACE/build.sh clean libcudf_kafka tests -l
fi

################################################################################
# TEST - Run GoogleTest and py.tests for libcudf, and
# cuDF
70 changes: 70 additions & 0 deletions cpp/CMakeLists.txt
@@ -138,6 +138,7 @@ set(CMAKE_EXE_LINKER_FLAGS "-Wl,--disable-new-dtags")
option(BUILD_SHARED_LIBS "Build shared libraries" ON)
option(BUILD_TESTS "Configure CMake to build tests" ON)
option(BUILD_BENCHMARKS "Configure CMake to build (google) benchmarks" OFF)
option(BUILD_CUDF_KAFKA "Configure CMake to build cudf_kafka" OFF)

###################################################################################################
# - cudart options --------------------------------------------------------------------------------
@@ -773,3 +774,72 @@ add_custom_command(OUTPUT CUDF_DOXYGEN
VERBATIM)

add_custom_target(docs_cudf DEPENDS CUDF_DOXYGEN)


####################################################################################################
# - cudf_kafka - OFF by default due to increased number of dependencies
if(BUILD_CUDF_KAFKA)

# cudf_kafka library
add_library(libcudf_kafka
libcudf_kafka/src/kafka_consumer.cpp
)

# Include paths
include_directories("${CMAKE_SOURCE_DIR}/libcudf_kafka/include"
"${CMAKE_CURRENT_SOURCE_DIR}/include/cudf")

# Rename installation to proper name for later finding
set_target_properties(libcudf_kafka PROPERTIES OUTPUT_NAME "cudf_kafka")
set_target_properties(libcudf_kafka PROPERTIES BUILD_RPATH "\$ORIGIN")

###################################################################################################
# cudf_kafka - librdkafka -------------------------------------------------------------------------

find_path(RDKAFKA_INCLUDE "librdkafka" HINTS "$ENV{RDKAFKA_ROOT}/include")
find_library(RDKAFKA++_LIBRARY "rdkafka++" HINTS "$ENV{RDKAFKA_ROOT}/lib" "$ENV{RDKAFKA_ROOT}/build")

message(STATUS "RDKAFKA: RDKAFKA++_LIBRARY set to ${RDKAFKA++_LIBRARY}")
message(STATUS "RDKAFKA: RDKAFKA_INCLUDE set to ${RDKAFKA_INCLUDE}")

target_link_libraries(libcudf_kafka ${RDKAFKA++_LIBRARY})
include_directories("${RDKAFKA_INCLUDE}")

###################################################################################################
# - cudf_kafka Install ----------------------------------------------------------------------------
target_link_libraries(libcudf_kafka cudf)

install(TARGETS libcudf_kafka
DESTINATION libcudf_kafka/lib
COMPONENT libcudf_kafka)

install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/libcudf_kafka/include
DESTINATION include
COMPONENT libcudf_kafka)

add_custom_target(install_libcudf_kafka
COMMAND "${CMAKE_COMMAND}" -DCOMPONENT=libcudf_kafka -P "${CMAKE_BINARY_DIR}/cmake_install.cmake"
DEPENDS libcudf_kafka)

####################################################################################################
# - cudf_kafka Tests
if(BUILD_TESTS)
if(GTEST_FOUND)
message(STATUS "Google C++ Testing Framework (Google Test) found in ${GTEST_ROOT}")
include_directories(${GTEST_INCLUDE_DIR})
add_subdirectory(${CMAKE_SOURCE_DIR}/libcudf_kafka/tests)
else()
message(AUTHOR_WARNING "Google C++ Testing Framework (Google Test) not found: automated tests are disabled.")
endif(GTEST_FOUND)
endif(BUILD_TESTS)

message(STATUS "CUDF_KAFKA_TEST_LIST set to: ${CUDF_KAFKA_TEST_LIST}")

add_custom_target(build_tests_libcudf_kafka
DEPENDS ${CUDF_KAFKA_TEST_LIST})

add_custom_target(test_libcudf_kafka
COMMAND ctest
DEPENDS build_tests_libcudf_kafka)

endif(BUILD_CUDF_KAFKA)
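
For completeness, the new option and custom targets can also be exercised straight from a CMake build tree; the paths and install prefix below are placeholders:

```bash
# Illustrative only: configure with the new option, then drive the custom targets.
cmake -DBUILD_CUDF_KAFKA=ON -DCMAKE_INSTALL_PREFIX=$CONDA_PREFIX /path/to/cudf/cpp
make install_libcudf_kafka       # builds cudf_kafka and installs the libcudf_kafka component
make build_tests_libcudf_kafka   # builds the gtests under libcudf_kafka/tests (requires GTest)
make test_libcudf_kafka          # builds the Kafka tests, then runs ctest
```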
3 changes: 2 additions & 1 deletion cpp/doxygen/Doxyfile
@@ -817,7 +817,8 @@ WARN_LOGFILE =
INPUT = main_page.md \
regex.md \
unicode.md \
../include
../include \
../libcudf_kafka/include

# This tag can be used to specify the character encoding of the source files
# that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses
21 changes: 21 additions & 0 deletions cpp/include/cudf/io/datasource.hpp
@@ -177,6 +177,27 @@ class datasource {
* @return bool True if there is data, False otherwise
*/
virtual bool is_empty() const { return size() == 0; }

/**
* @brief Implementation for a non-owning buffer, where the datasource holds the buffer until
* destruction.
*
* @param[in] data Address of the buffer's source data
* @param[in] size Size of the buffer, in bytes
**/
class non_owning_buffer : public buffer {
public:
non_owning_buffer() : _data(0), _size(0) {}

non_owning_buffer(uint8_t* data, size_t size) : _data(data), _size(size) {}

size_t size() const override { return _size; }

const uint8_t* data() const override { return _data; }

private:
uint8_t* const _data;
size_t const _size;
};
};

} // namespace io
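
The nested non_owning_buffer class gives datasource implementations a zero-copy way to hand out views of memory they already own, presumably why it lands in the same change as the Kafka consumer, which accumulates messages into a std::string buffer. A minimal sketch of that pattern; the helper function and its storage argument are hypothetical, not part of this change:

```cpp
#include <cudf/io/datasource.hpp>

#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>

// Hypothetical helper: expose a slice of memory the caller already owns.
// The returned buffer neither copies nor frees anything, so `storage` must
// outlive every buffer produced from it.
std::unique_ptr<cudf::io::datasource::buffer> view_of(std::string& storage,
                                                      std::size_t offset,
                                                      std::size_t size)
{
  auto* data = reinterpret_cast<uint8_t*>(&storage[offset]);
  return std::make_unique<cudf::io::datasource::non_owning_buffer>(data, size);
}
```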
1 change: 1 addition & 0 deletions cpp/include/doxygen_groups.h
@@ -114,6 +114,7 @@
* @}
* @defgroup io_apis IO
* @{
* @defgroup io_datasources Datasources
* @defgroup io_readers Readers
* @defgroup io_writers Writers
* @}
120 changes: 120 additions & 0 deletions cpp/libcudf_kafka/include/cudf_kafka/kafka_consumer.hpp
@@ -0,0 +1,120 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <librdkafka/rdkafkacpp.h>
#include <algorithm>
#include <chrono>
#include <cudf/io/datasource.hpp>
#include <map>
#include <memory>
#include <string>

namespace cudf {
namespace io {
namespace external {
namespace kafka {

/**
* @brief libcudf datasource for Apache Kafka
*
* @ingroup io_datasources
**/
class kafka_consumer : public cudf::io::datasource {
public:
/**
* @brief Instantiate a Kafka consumer object. Documentation for librdkafka configurations can be
* found at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
*
* @param configs key/value pairs of librdkafka configurations that will be
* passed to the librdkafka client
* @param topic_name name of the Kafka topic to consume from
* @param partition partition index to consume from between `0` and `TOPIC_NUM_PARTITIONS - 1`
* inclusive
* @param start_offset seek position for the specified TOPPAR (Topic/Partition combo)
* @param end_offset position in the specified TOPPAR to read to
* @param batch_timeout maximum (millisecond) read time allowed. If end_offset is not reached
* before batch_timeout, a smaller subset will be returned
* @param delimiter optional delimiter to insert into the output between kafka messages, Ex: "\n"
**/
kafka_consumer(std::map<std::string, std::string> configs,
std::string topic_name,
int partition,
int64_t start_offset,
int64_t end_offset,
int batch_timeout,
std::string delimiter);

/**
* @brief Returns a buffer with a subset of data from Kafka Topic
*
* @param[in] offset Bytes from the start
* @param[in] size Bytes to read
*
* @return The data buffer
*/
std::unique_ptr<cudf::io::datasource::buffer> host_read(size_t offset, size_t size) override;

/**
* @brief Returns the size of the data in Kafka buffer
*
* @return size_t The size of the source data in bytes
*/
size_t size() const override;

/**
* @brief Reads a selected range into a preallocated buffer.
*
* @param[in] offset Bytes from the start
* @param[in] size Bytes to read
* @param[in] dst Address of the existing host memory
*
* @return The number of bytes read (can be smaller than size)
*/
size_t host_read(size_t offset, size_t size, uint8_t *dst) override;

virtual ~kafka_consumer(){};

private:
std::unique_ptr<RdKafka::Conf> kafka_conf; // RDKafka configuration object
std::unique_ptr<RdKafka::KafkaConsumer> consumer;

std::string topic_name;
int partition;
int64_t start_offset;
int64_t end_offset;
int batch_timeout;
std::string delimiter;

std::string buffer;

private:
RdKafka::ErrorCode update_consumer_topic_partition_assignment(std::string const &topic,
int partition,
int64_t offset);

/**
* Convenience method for getting "now()" in Kafka's standard format
**/
int64_t now();

void consume_to_buffer();
};

} // namespace kafka
} // namespace external
} // namespace io
} // namespace cudf
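
To make the constructor parameters concrete, here is a usage sketch; the broker address, group id, topic name, and offset/timeout values are illustrative placeholders:

```cpp
#include <cudf_kafka/kafka_consumer.hpp>

#include <iostream>
#include <map>
#include <string>

int main()
{
  // librdkafka client configuration (values are placeholders).
  std::map<std::string, std::string> configs{{"bootstrap.servers", "localhost:9092"},
                                             {"group.id", "libcudf_example"}};

  // Read partition 0 of "example-topic" from offset 0 up to offset 100,
  // waiting at most 10 s, and join messages with '\n'.
  cudf::io::external::kafka::kafka_consumer consumer(
    configs, "example-topic", 0, 0, 100, 10000, "\n");

  // From here the consumer behaves like any other cudf::io::datasource.
  auto buf = consumer.host_read(0, consumer.size());
  std::cout << "Read " << buf->size() << " bytes from Kafka\n";
  return 0;
}
```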