diff --git a/CHANGELOG.md b/CHANGELOG.md index 92ebde6e01a..d019488589e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - PR #5327 Add `cudf::cross_join` feature - PR #5204 Concatenate strings columns using row separator as strings column - PR #5342 Add support for `StringMethods.__getitem__` +- PR #3504 Add External Kafka Datasource - PR #5356 Use `size_type` instead of `scalar` in `cudf::repeat`. - PR #5397 Add internal implementation of nested loop equijoins. - PR #5303 Add slice_strings functionality using delimiter string @@ -2249,4 +2250,4 @@ # cuDF 0.2.0 and cuDF 0.1.0 -These were initial releases of cuDF based on previously separate pyGDF and libGDF libraries. +These were initial releases of cuDF based on previously separate pyGDF and libGDF libraries. \ No newline at end of file diff --git a/build.sh b/build.sh index 1186c57e395..e6b7d6f8447 100755 --- a/build.sh +++ b/build.sh @@ -7,7 +7,6 @@ # This script is used to build the component(s) in this repo from # source, and can be called with various options to customize the # build as needed (see the help output for details) - # Abort script on first error set -e @@ -18,22 +17,24 @@ ARGS=$* # script, and that this script resides in the repo dir! 
REPODIR=$(cd $(dirname $0); pwd) -VALIDARGS="clean libcudf cudf dask_cudf benchmarks tests -v -g -n --allgpuarch --disable_nvtx --show_depr_warn -h" -HELP="$0 [clean] [libcudf] [cudf] [dask_cudf] [benchmarks] [tests] [-v] [-g] [-n] [-h] - clean - remove all existing build artifacts and configuration (start - over) - libcudf - build the cudf C++ code only - cudf - build the cudf Python package - dask_cudf - build the dask_cudf Python package - benchmarks - build benchmarks - tests - build tests - -v - verbose build mode - -g - build for debug - -n - no install step - --allgpuarch - build for all supported GPU architectures - --disable_nvtx - disable inserting NVTX profiling ranges - --show_depr_warn - show cmake deprecation warnings - -h - print this text +VALIDARGS="clean libcudf cudf dask_cudf benchmarks tests libcudf_kafka -v -g -n -l --allgpuarch --disable_nvtx --show_depr_warn -h" +HELP="$0 [clean] [libcudf] [cudf] [dask_cudf] [benchmarks] [tests] [libcudf_kafka] [-v] [-g] [-n] [-h] [-l] + clean - remove all existing build artifacts and configuration (start + over) + libcudf - build the cudf C++ code only + cudf - build the cudf Python package + dask_cudf - build the dask_cudf Python package + benchmarks - build benchmarks + tests - build tests + libcudf_kafka - build the libcudf_kafka C++ code only + -v - verbose build mode + -g - build for debug + -n - no install step + -l - build legacy tests + --allgpuarch - build for all supported GPU architectures + --disable_nvtx - disable inserting NVTX profiling ranges + --show_depr_warn - show cmake deprecation warnings + -h - print this text default action (no args) is to build and install 'libcudf' then 'cudf' then 'dask_cudf' targets @@ -52,6 +53,7 @@ BUILD_ALL_GPU_ARCH=0 BUILD_NVTX=ON BUILD_TESTS=OFF BUILD_DISABLE_DEPRECATION_WARNING=ON +BUILD_LIBCUDF_KAFKA=OFF # Set defaults for vars that may not have been defined externally # FIXME: if INSTALL_PREFIX is not set, check PREFIX, then check @@ -108,6 +110,9 @@ fi if 
hasArg --show_depr_warn; then BUILD_DISABLE_DEPRECATION_WARNING=OFF fi +if hasArg libcudf_kafka; then + BUILD_LIBCUDF_KAFKA=ON +fi # If clean given, run it prior to any other steps if hasArg clean; then @@ -134,8 +139,7 @@ fi ################################################################################ # Configure, build, and install libcudf -if buildAll || hasArg libcudf; then - +if buildAll || hasArg libcudf || hasArg libcudf_kafka; then mkdir -p ${LIB_BUILD_DIR} cd ${LIB_BUILD_DIR} cmake -DCMAKE_INSTALL_PREFIX=${INSTALL_PREFIX} \ @@ -144,7 +148,8 @@ if buildAll || hasArg libcudf; then -DUSE_NVTX=${BUILD_NVTX} \ -DBUILD_BENCHMARKS=${BUILD_BENCHMARKS} \ -DDISABLE_DEPRECATION_WARNING=${BUILD_DISABLE_DEPRECATION_WARNING} \ - -DCMAKE_BUILD_TYPE=${BUILD_TYPE} $REPODIR/cpp + -DCMAKE_BUILD_TYPE=${BUILD_TYPE} \ + -DBUILD_CUDF_KAFKA=${BUILD_LIBCUDF_KAFKA} $REPODIR/cpp fi if buildAll || hasArg libcudf; then @@ -187,3 +192,18 @@ if buildAll || hasArg dask_cudf; then python setup.py build_ext --inplace fi fi + +# Do not build libcudf_kafka with 'buildAll' +if hasArg libcudf_kafka; then + + cd ${LIB_BUILD_DIR} + if [[ ${INSTALL_TARGET} != "" ]]; then + make -j${PARALLEL_LEVEL} install_libcudf_kafka VERBOSE=${VERBOSE} + else + make -j${PARALLEL_LEVEL} libcudf_kafka VERBOSE=${VERBOSE} + fi + + if [[ ${BUILD_TESTS} == "ON" ]]; then + make -j${PARALLEL_LEVEL} build_tests_libcudf_kafka VERBOSE=${VERBOSE} + fi +fi diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh index 70988a61e9e..461f35cc0bc 100755 --- a/ci/gpu/build.sh +++ b/ci/gpu/build.sh @@ -97,6 +97,17 @@ else $WORKSPACE/build.sh clean libcudf cudf dask_cudf benchmarks tests -l fi +################################################################################ +# BUILD - Build libcudf_kafka from source +################################################################################ + +logger "Build libcudf_kafka..." 
+if [[ ${BUILD_MODE} == "pull-request" ]]; then + $WORKSPACE/build.sh clean libcudf_kafka tests +else + $WORKSPACE/build.sh clean libcudf_kafka tests -l +fi + ################################################################################ # TEST - Run GoogleTest and py.tests for libcudf, and # cuDF diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index b1659b25c41..3f14909ad38 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -138,6 +138,7 @@ set(CMAKE_EXE_LINKER_FLAGS "-Wl,--disable-new-dtags") option(BUILD_SHARED_LIBS "Build shared libraries" ON) option(BUILD_TESTS "Configure CMake to build tests" ON) option(BUILD_BENCHMARKS "Configure CMake to build (google) benchmarks" OFF) +option(BUILD_CUDF_KAFKA "Configure CMake to build cudf_kafka" OFF) ################################################################################################### # - cudart options -------------------------------------------------------------------------------- @@ -773,3 +774,72 @@ add_custom_command(OUTPUT CUDF_DOXYGEN VERBATIM) add_custom_target(docs_cudf DEPENDS CUDF_DOXYGEN) + + +#################################################################################################### +# - cudf_kafka - OFF by default due to increased number of dependencies +if(BUILD_CUDF_KAFKA) + + # cudf_kafka library + add_library(libcudf_kafka + libcudf_kafka/src/kafka_consumer.cpp + ) + + # Include paths + include_directories("${CMAKE_SOURCE_DIR}/libcudf_kafka/include" + "${CMAKE_CURRENT_SOURCE_DIR}/include/cudf") + + # Rename installation to proper name for later finding + set_target_properties(libcudf_kafka PROPERTIES OUTPUT_NAME "cudf_kafka") + set_target_properties(libcudf_kafka PROPERTIES BUILD_RPATH "\$ORIGIN") + + ################################################################################################### + # cudf_kafka - librdkafka ------------------------------------------------------------------------- + + find_path(RDKAFKA_INCLUDE "librdkafka" HINTS 
"$ENV{RDKAFKA_ROOT}/include") + find_library(RDKAFKA++_LIBRARY "rdkafka++" HINTS "$ENV{RDKAFKA_ROOT}/lib" "$ENV{RDKAFKA_ROOT}/build") + + message(STATUS "RDKAFKA: RDKAFKA++_LIBRARY set to ${RDKAFKA++_LIBRARY}") + message(STATUS "RDKAFKA: RDKAFKA_INCLUDE set to ${RDKAFKA_INCLUDE}") + + target_link_libraries(libcudf_kafka ${RDKAFKA++_LIBRARY}) + include_directories("${RDKAFKA_INCLUDE}") + + ################################################################################################### + # - cudf_kafka Install ---------------------------------------------------------------------------- + target_link_libraries(libcudf_kafka cudf) + + install(TARGETS libcudf_kafka + DESTINATION libcudf_kafka/lib + COMPONENT libcudf_kafka) + + install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/libcudf_kafka/include + DESTINATION include + COMPONENT libcudf_kafka) + + add_custom_target(install_libcudf_kafka + COMMAND "${CMAKE_COMMAND}" -DCOMPONENT=libcudf_kafka -P "${CMAKE_BINARY_DIR}/cmake_install.cmake" + DEPENDS libcudf_kafka) + + #################################################################################################### + # - cudf_kafka Tests + if(BUILD_TESTS) + if(GTEST_FOUND) + message(STATUS "Google C++ Testing Framework (Google Test) found in ${GTEST_ROOT}") + include_directories(${GTEST_INCLUDE_DIR}) + add_subdirectory(${CMAKE_SOURCE_DIR}/libcudf_kafka/tests) + else() + message(AUTHOR_WARNING "Google C++ Testing Framework (Google Test) not found: automated tests are disabled.") + endif(GTEST_FOUND) + endif(BUILD_TESTS) + + message(STATUS "CUDF_KAFKA_TEST_LIST set to: ${CUDF_KAFKA_TEST_LIST}") + + add_custom_target(build_tests_libcudf_kafka + DEPENDS ${CUDF_KAFKA_TEST_LIST}) + + add_custom_target(test_libcudf_kafka + COMMAND ctest + DEPENDS build_tests_libcudf_kafka) + +endif(BUILD_CUDF_KAFKA) diff --git a/cpp/doxygen/Doxyfile b/cpp/doxygen/Doxyfile index b25842f09ec..39229862568 100644 --- a/cpp/doxygen/Doxyfile +++ b/cpp/doxygen/Doxyfile @@ -817,7 +817,8 @@ WARN_LOGFILE 
=
 INPUT                  = main_page.md \
                          regex.md \
                          unicode.md \
-                         ../include
+                         ../include \
+                         ../libcudf_kafka/include
 
 # This tag can be used to specify the character encoding of the source files
 # that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses
diff --git a/cpp/include/cudf/io/datasource.hpp b/cpp/include/cudf/io/datasource.hpp
index c85e81c9e45..9b9587b81a5 100644
--- a/cpp/include/cudf/io/datasource.hpp
+++ b/cpp/include/cudf/io/datasource.hpp
@@ -177,6 +177,33 @@ class datasource {
    * @return bool True if there is data, False otherwise
    */
   virtual bool is_empty() const { return size() == 0; }
+
+  /**
+   * @brief Implementation for non owning buffer where datasource holds buffer until destruction.
+   **/
+  class non_owning_buffer : public buffer {
+   public:
+    /**
+     * @brief Construct an empty buffer: null data pointer and zero size.
+     **/
+    non_owning_buffer() : _data(nullptr), _size(0) {}
+
+    /**
+     * @brief Construct a view over existing memory; the pointer is stored, nothing is copied.
+     *
+     * @param[in] data Address of the buffer source data
+     * @param[in] size Bytes of the buffer size
+     **/
+    non_owning_buffer(uint8_t* data, size_t size) : _data(data), _size(size) {}
+
+    size_t size() const override { return _size; }
+
+    const uint8_t* data() const override { return _data; }
+
+   private:
+    uint8_t* const _data;  // start of the viewed memory; never freed by this class
+    size_t const _size;    // number of bytes reported by size()
+  };
 };
 
 }  // namespace io
diff --git a/cpp/include/doxygen_groups.h b/cpp/include/doxygen_groups.h
index 03f6e8fe13f..fd446b032cc 100644
--- a/cpp/include/doxygen_groups.h
+++ b/cpp/include/doxygen_groups.h
@@ -114,6 +114,7 @@
  * @}
  * @defgroup io_apis IO
  * @{
+ *   @defgroup io_datasources Datasources
  *   @defgroup io_readers Readers
  *   @defgroup io_writers Writers
  * @}
diff --git a/cpp/libcudf_kafka/include/cudf_kafka/kafka_consumer.hpp b/cpp/libcudf_kafka/include/cudf_kafka/kafka_consumer.hpp
new file mode 100644
index 00000000000..b9b4b59347e
--- /dev/null
+++ b/cpp/libcudf_kafka/include/cudf_kafka/kafka_consumer.hpp
@@ -0,0 +1,120 @@
+/*
+ * Copyright (c) 2020, NVIDIA CORPORATION.
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace cudf { +namespace io { +namespace external { +namespace kafka { + +/** + * @brief libcudf datasource for Apache Kafka + * + * @ingroup io_datasources + **/ +class kafka_consumer : public cudf::io::datasource { + public: + /** + * @brief Instantiate a Kafka consumer object. Documentation for librdkafka configurations can be + * found at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md + * + * @param configs key/value pairs of librdkafka configurations that will be + * passed to the librdkafka client + * @param topic_name name of the Kafka topic to consume from + * @param partition partition index to consume from between `0` and `TOPIC_NUM_PARTITIONS - 1` + * inclusive + * @param start_offset seek position for the specified TOPPAR (Topic/Partition combo) + * @param end_offset position in the specified TOPPAR to read to + * @param batch_timeout maximum (millisecond) read time allowed. 
If end_offset is not reached + * before batch_timeout, a smaller subset will be returned + * @param delimiter optional delimiter to insert into the output between kafka messages, Ex: "\n" + **/ + kafka_consumer(std::map configs, + std::string topic_name, + int partition, + int64_t start_offset, + int64_t end_offset, + int batch_timeout, + std::string delimiter); + + /** + * @brief Returns a buffer with a subset of data from Kafka Topic + * + * @param[in] offset Bytes from the start + * @param[in] size Bytes to read + * + * @return The data buffer + */ + std::unique_ptr host_read(size_t offset, size_t size) override; + + /** + * @brief Returns the size of the data in Kafka buffer + * + * @return size_t The size of the source data in bytes + */ + size_t size() const override; + + /** + * @brief Reads a selected range into a preallocated buffer. + * + * @param[in] offset Bytes from the start + * @param[in] size Bytes to read + * @param[in] dst Address of the existing host memory + * + * @return The number of bytes read (can be smaller than size) + */ + size_t host_read(size_t offset, size_t size, uint8_t *dst) override; + + virtual ~kafka_consumer(){}; + + private: + std::unique_ptr kafka_conf; // RDKafka configuration object + std::unique_ptr consumer; + + std::string topic_name; + int partition; + int64_t start_offset; + int64_t end_offset; + int batch_timeout; + std::string delimiter; + + std::string buffer; + + private: + RdKafka::ErrorCode update_consumer_topic_partition_assignment(std::string const &topic, + int partition, + int64_t offset); + + /** + * Convenience method for getting "now()" in Kafka's standard format + **/ + int64_t now(); + + void consume_to_buffer(); +}; + +} // namespace kafka +} // namespace external +} // namespace io +} // namespace cudf diff --git a/cpp/libcudf_kafka/src/kafka_consumer.cpp b/cpp/libcudf_kafka/src/kafka_consumer.cpp new file mode 100644 index 00000000000..39985c73a6e --- /dev/null +++ 
b/cpp/libcudf_kafka/src/kafka_consumer.cpp
@@ -0,0 +1,154 @@
+/*
+ * Copyright (c) 2019-2020, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "cudf_kafka/kafka_consumer.hpp"
+#include <cudf/utilities/error.hpp>
+#include <librdkafka/rdkafkacpp.h>
+#include <algorithm>
+#include <chrono>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <vector>
+
+namespace cudf {
+namespace io {
+namespace external {
+namespace kafka {
+
+/**
+ * Configures and creates the librdkafka consumer, then eagerly consumes the requested offset
+ * range into the local buffer.
+ *
+ * Throws cudf::logic_error (via CUDF_EXPECTS) when a configuration entry is rejected, when
+ * `group.id` is not configured, or when the consumer cannot be created or assigned.
+ **/
+kafka_consumer::kafka_consumer(std::map<std::string, std::string> configs,
+                               std::string topic_name,
+                               int partition,
+                               int64_t start_offset,
+                               int64_t end_offset,
+                               int batch_timeout,
+                               std::string delimiter)
+  : topic_name(topic_name),
+    partition(partition),
+    start_offset(start_offset),
+    end_offset(end_offset),
+    batch_timeout(batch_timeout),
+    delimiter(delimiter)
+{
+  kafka_conf.reset(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
+
+  for (auto const &key_value : configs) {
+    std::string error_string;
+    CUDF_EXPECTS(RdKafka::Conf::ConfResult::CONF_OK ==
+                   kafka_conf->set(key_value.first, key_value.second, error_string),
+                 "Invalid Kafka configuration");
+  }
+
+  // Kafka 0.9 > requires group.id in the configuration. The property must be looked up
+  // under its librdkafka name "group.id", the same name callers use to set it.
+  std::string conf_val;
+  CUDF_EXPECTS(RdKafka::Conf::ConfResult::CONF_OK == kafka_conf->get("group.id", conf_val),
+               "Kafka group.id must be configured");
+
+  std::string errstr;
+  consumer.reset(RdKafka::KafkaConsumer::create(kafka_conf.get(), errstr));
+  // create() signals failure with a null pointer; fail here instead of on first use
+  CUDF_EXPECTS(consumer != nullptr, "Failed to create Kafka consumer");
+
+  // Pre fill the local buffer with messages so the datasource->size() invocation
+  // will return a valid size.
+  consume_to_buffer();
+}
+
+/**
+ * Returns a non-owning view of at most `size` bytes of the local buffer starting at `offset`.
+ * An `offset` past the end of the buffer yields an empty buffer rather than a null pointer.
+ **/
+std::unique_ptr<datasource::buffer> kafka_consumer::host_read(size_t offset, size_t size)
+{
+  if (offset > buffer.size()) { return std::make_unique<non_owning_buffer>(); }
+  size = std::min(size, buffer.size() - offset);
+  return std::make_unique<non_owning_buffer>(
+    reinterpret_cast<uint8_t *>(&buffer[0]) + offset, size);
+}
+
+/**
+ * Copies at most `size` bytes of the local buffer starting at `offset` into `dst` and returns
+ * the number of bytes copied (0 when `offset` is past the end of the buffer).
+ **/
+size_t kafka_consumer::host_read(size_t offset, size_t size, uint8_t *dst)
+{
+  if (offset > buffer.size()) { return 0; }
+  auto const read_size = std::min(size, buffer.size() - offset);
+  // Copy the clamped count, not the requested one, so the read never runs past the buffer
+  std::memcpy(dst, buffer.data() + offset, read_size);
+  return read_size;
+}
+
+size_t kafka_consumer::size() const { return buffer.size(); }
+
+/**
+ * Change the TOPPAR assignment for this consumer instance
+ **/
+RdKafka::ErrorCode kafka_consumer::update_consumer_topic_partition_assignment(
+  std::string const &topic, int partition, int64_t offset)
+{
+  // TopicPartition::create() hands back an object the caller must delete; keep it owned
+  // here so it is released once assign() has read the list.
+  std::unique_ptr<RdKafka::TopicPartition> toppar{
+    RdKafka::TopicPartition::create(topic, partition, offset)};
+  std::vector<RdKafka::TopicPartition *> topic_partitions{toppar.get()};
+  return consumer->assign(topic_partitions);
+}
+
+/**
+ * Appends messages (each followed by `delimiter`) to the local buffer until
+ * `end_offset - start_offset` messages were read or `batch_timeout` milliseconds elapsed.
+ **/
+void kafka_consumer::consume_to_buffer()
+{
+  CUDF_EXPECTS(RdKafka::ErrorCode::ERR_NO_ERROR ==
+                 update_consumer_topic_partition_assignment(topic_name, partition, start_offset),
+               "Failed to assign Kafka topic partition");
+
+  int64_t messages_read = 0;
+  auto const deadline =
+    std::chrono::steady_clock::now() + std::chrono::milliseconds(batch_timeout);
+
+  while (messages_read < end_offset - start_offset) {
+    // consume() takes its timeout in milliseconds, so convert the remaining time explicitly
+    auto const remaining_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
+                                deadline - std::chrono::steady_clock::now())
+                                .count();
+    if (remaining_ms <= 0) { break; }
+
+    std::unique_ptr<RdKafka::Message> msg{consumer->consume(static_cast<int>(remaining_ms))};
+
+    if (msg && msg->err() == RdKafka::ErrorCode::ERR_NO_ERROR) {
+      // The payload is raw bytes with an explicit length, not a NUL terminated string
+      buffer.append(static_cast<char const *>(msg->payload()), msg->len());
+      buffer.append(delimiter);
+      messages_read++;
+    }
+  }
+}
+
+}  // namespace kafka
+}  // namespace external
+}  // namespace io
+}  // namespace cudf
diff --git a/cpp/libcudf_kafka/tests/CMakeLists.txt b/cpp/libcudf_kafka/tests/CMakeLists.txt
new file mode 100644
index 00000000000..b19babfe96e
--- /dev/null
+++ b/cpp/libcudf_kafka/tests/CMakeLists.txt
@@ -0,0 +1,83 @@
+cmake_minimum_required(VERSION 3.12 FATAL_ERROR)
+
+project(KAFKA_TESTS LANGUAGES C CXX CUDA)
+
+if(NOT
CMAKE_CUDA_COMPILER) + message(SEND_ERROR "CMake cannot locate a CUDA compiler") +endif() + +################################################################################################### +# - conda environment ----------------------------------------------------------------------------- + +if("$ENV{CONDA_BUILD}" STREQUAL "1") + set(CMAKE_SYSTEM_PREFIX_PATH "$ENV{BUILD_PREFIX};$ENV{PREFIX};${CMAKE_SYSTEM_PREFIX_PATH}") + set(CONDA_INCLUDE_DIRS "$ENV{BUILD_PREFIX}/include" "$ENV{PREFIX}/include") + set(CONDA_LINK_DIRS "$ENV{BUILD_PREFIX}/lib" "$ENV{PREFIX}/lib") + message(STATUS "Conda build detected, CMAKE_SYSTEM_PREFIX_PATH set to: ${CMAKE_SYSTEM_PREFIX_PATH}") +elseif(DEFINED ENV{CONDA_PREFIX}) + set(CMAKE_SYSTEM_PREFIX_PATH "$ENV{CONDA_PREFIX};${CMAKE_SYSTEM_PREFIX_PATH}") + set(CONDA_INCLUDE_DIRS "$ENV{CONDA_PREFIX}/include") + set(CONDA_LINK_DIRS "$ENV{CONDA_PREFIX}/lib") + message(STATUS "Conda environment detected, CMAKE_SYSTEM_PREFIX_PATH set to: ${CMAKE_SYSTEM_PREFIX_PATH}") +endif("$ENV{CONDA_BUILD}" STREQUAL "1") + +################################################################################################### +# - compiler function ----------------------------------------------------------------------------- + +set(CUDF_KAFKA_TEST_LIST CACHE INTERNAL "CUDF_KAFKA_TEST_LIST") + +function(ConfigureTest CMAKE_TEST_NAME CMAKE_TEST_SRC) + add_executable(${CMAKE_TEST_NAME} + ${CMAKE_TEST_SRC}) + set_target_properties(${CMAKE_TEST_NAME} PROPERTIES POSITION_INDEPENDENT_CODE ON) + target_link_libraries(${CMAKE_TEST_NAME} gmock gtest gtest_main cudf pthread libcudf_kafka) + set_target_properties(${CMAKE_TEST_NAME} PROPERTIES + RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/gtests") + add_test(NAME ${CMAKE_TEST_NAME} COMMAND ${CMAKE_TEST_NAME}) + set(CUDF_KAFKA_TEST_LIST ${CUDF_KAFKA_TEST_LIST} ${CMAKE_TEST_NAME} CACHE INTERNAL "CUDF_KAFKA_TEST_LIST") +endfunction(ConfigureTest) + 
+###################################################################################################
+# - include paths ---------------------------------------------------------------------------------
+
+if(CMAKE_CUDA_TOOLKIT_INCLUDE_DIRECTORIES)
+    include_directories("${CMAKE_CUDA_TOOLKIT_INCLUDE_DIRECTORIES}")
+endif()
+
+include_directories("${CMAKE_BINARY_DIR}/include"
+                    "${CMAKE_SOURCE_DIR}/include"
+                    "${CMAKE_SOURCE_DIR}/tests"
+                    "${CMAKE_SOURCE_DIR}"
+                    "${CMAKE_SOURCE_DIR}/src"
+                    "${GTEST_INCLUDE_DIR}"
+                    "${RDKAFKA_INCLUDE}")
+
+if(CONDA_INCLUDE_DIRS)
+    include_directories("${CONDA_INCLUDE_DIRS}")
+endif(CONDA_INCLUDE_DIRS)
+
+###################################################################################################
+# - library paths ---------------------------------------------------------------------------------
+
+link_directories("${CMAKE_CUDA_IMPLICIT_LINK_DIRECTORIES}" # CMAKE_CUDA_IMPLICIT_LINK_DIRECTORIES is an undocumented/unsupported variable containing the link directories for nvcc
+                 "${CMAKE_BINARY_DIR}/lib"
+                 "${CMAKE_BINARY_DIR}"
+                 "${FLATBUFFERS_LIBRARY_DIR}"
+                 "${GTEST_LIBRARY_DIR}"
+                 "${RMM_LIBRARY}"
+                 "${RDKAFKA++_LIBRARY}")
+
+if(CONDA_LINK_DIRS)
+    link_directories("${CONDA_LINK_DIRS}")
+endif(CONDA_LINK_DIRS)
+
+###################################################################################################
+### test sources ##################################################################################
+###################################################################################################
+
+###################################################################################################
+# - create tests ----------------------------------------------------------------------------------
+
+ConfigureTest(CUDF_KAFKA_HOST_READ "${CMAKE_CURRENT_SOURCE_DIR}/kafka_consumer_tests.cpp")
+
+enable_testing()
diff --git a/cpp/libcudf_kafka/tests/kafka_consumer_tests.cpp
b/cpp/libcudf_kafka/tests/kafka_consumer_tests.cpp new file mode 100644 index 00000000000..901d4f8b34b --- /dev/null +++ b/cpp/libcudf_kafka/tests/kafka_consumer_tests.cpp @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include "cudf_kafka/kafka_consumer.hpp" + +#include +#include + +namespace kafka = cudf::io::external::kafka; + +struct KafkaDatasourceTest : public ::testing::Test { +}; + +TEST_F(KafkaDatasourceTest, MissingGroupID) +{ + // group.id is a required configuration. 
+ std::map kafka_configs; + kafka_configs.insert({"bootstrap.servers", "localhost:9092"}); + + EXPECT_THROW(kafka::kafka_consumer kc(kafka_configs, "csv-topic", 0, 0, 3, 5000, "\n"), + cudf::logic_error); +} + +TEST_F(KafkaDatasourceTest, InvalidConfigValues) +{ + // Give a made up configuration value + std::map kafka_configs; + kafka_configs.insert({"completely_made_up_config", "wrong"}); + + EXPECT_THROW(kafka::kafka_consumer kc(kafka_configs, "csv-topic", 0, 0, 3, 5000, "\n"), + cudf::logic_error); + + kafka_configs.clear(); + + // Give a good config property with a bad value + kafka_configs.insert({"message.max.bytes", "this should be a number not text"}); + EXPECT_THROW(kafka::kafka_consumer kc(kafka_configs, "csv-topic", 0, 0, 3, 5000, "\n"), + cudf::logic_error); +} diff --git a/cpp/tests/io/csv_test.cpp b/cpp/tests/io/csv_test.cpp index 1bd577f0f18..e5c4ab809d4 100644 --- a/cpp/tests/io/csv_test.cpp +++ b/cpp/tests/io/csv_test.cpp @@ -1117,17 +1117,6 @@ TEST_F(CsvReaderTest, EmptyFileWithWriter) } class TestSource : public cudf::io::datasource { - class TestBuffer : public buffer { - uint8_t* const _data; - size_t const _size; - - public: - TestBuffer(uint8_t* data, size_t size) : _data(data), _size(size) {} - - virtual size_t size() const override { return _size; } - virtual const uint8_t* data() const override { return _data; } - }; - public: std::string const str; @@ -1135,7 +1124,7 @@ class TestSource : public cudf::io::datasource { std::unique_ptr host_read(size_t offset, size_t size) override { size = std::min(size, str.size() - offset); - return std::make_unique((uint8_t*)str.data() + offset, size); + return std::make_unique((uint8_t*)str.data() + offset, size); } size_t host_read(size_t offset, size_t size, uint8_t* dst) override