Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REVIEW] Externalized Kafka Datasource #3504

Merged
merged 191 commits into from
Jun 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
191 commits
Select commit Hold shift + click to select a range
955417f
3405: Base Kafka DataSource for CSV. Still need some integration test…
jdye64 Nov 18, 2019
4f25e24
Rebased on branch-0.12 and changed back build.sh goof
jdye64 Dec 4, 2019
791feee
Merge remote-tracking branch 'upstream/branch-0.12' into 3405
jdye64 Dec 10, 2019
b0c2436
Changelog and cython format fixes
jdye64 Dec 13, 2019
edd7871
Conda recipe updates and formatting
jdye64 Dec 16, 2019
a29a310
Merge branch 'branch-0.12' into 3405
jdye64 Dec 16, 2019
8328649
Disable Kafka Tests by default, add flag to build.sh to run Kafka tests
jdye64 Dec 17, 2019
0e4a546
Merge branch '3405' of https://github.com/jdye64/cudf into 3405
jdye64 Dec 17, 2019
28e68f7
Modify CMake to only include librdkafka headers for cudf
jdye64 Dec 17, 2019
167e9d5
Modify CMake to only include librdkafka headers for cudf tests
jdye64 Dec 17, 2019
484b480
testing theory with cmake build
jdye64 Dec 17, 2019
926772c
updates for start - end offset
jdye64 Dec 18, 2019
57a3a8b
Updates to cmake project
jdye64 Dec 18, 2019
44b866e
Merge remote-tracking branch 'upstream/branch-0.12' into 3405
jdye64 Dec 18, 2019
6e2a702
updates
jdye64 Dec 18, 2019
41acc9b
Added Kafka start_offset and read batch_size
jdye64 Dec 19, 2019
f96912b
flake8-cython checkstyle fix
jdye64 Dec 19, 2019
10fdc84
more flake8-cython checkstyle fixes
jdye64 Dec 19, 2019
4d51db3
more flake8-cython checkstyle fixes
jdye64 Dec 19, 2019
08379ce
more flake8-cython checkstyle fixes
jdye64 Dec 19, 2019
63f2bae
Target librdkafka exclusively for cudf
jdye64 Dec 19, 2019
754a835
Add librdkafka to libnvstrings recipe
jdye64 Dec 19, 2019
a3de60f
Updates to CMake for librdkafka recommended by Keith
jdye64 Dec 19, 2019
cd409bc
Removed librdkafka dependency to make CI fail and have keith investigate
jdye64 Dec 20, 2019
5d587e9
Updates to build.sh and cmake as suggested
jdye64 Dec 20, 2019
19aab7a
Smaller suggestion fixes
jdye64 Jan 1, 2020
fb86217
reverted formatting changes accidentally introduced
jdye64 Jan 3, 2020
44a7878
introduced unique_ptr to RdKafka::Conf instances and RdKafka::KafkaCo…
jdye64 Jan 7, 2020
f7f60ed
Removed std::move definition as no longer needed with passing by cons…
jdye64 Jan 7, 2020
6a8ff21
merged with upstream to get libcxx updates
jdye64 Jan 10, 2020
9034bdd
SQUASH-ME: Testing external
jdye64 Jan 13, 2020
a3b9430
SQUASH-ME: Testing external
jdye64 Jan 13, 2020
83e2bd9
Merge remote-tracking branch 'upstream/branch-0.13' into 3405
jdye64 Jan 23, 2020
54bf602
updates
jdye64 Jan 25, 2020
53e161d
updates
jdye64 Jan 27, 2020
7242eb8
Merge remote-tracking branch 'upstream/branch-0.13' into 3405
jdye64 Jan 27, 2020
8f7e260
updates
jdye64 Jan 27, 2020
4044730
Remove librdkafka references from conda recipes for libcudf
jdye64 Jan 27, 2020
70719c6
Removed all remaining references to librdkafka in libcudf
jdye64 Jan 27, 2020
dbbfad3
include external_datasource header
jdye64 Jan 28, 2020
fbad989
introduce libcudf_datasource_identifier
jdye64 Jan 28, 2020
ff85810
further cleanup to remove previous changes that will now be externalized
jdye64 Jan 28, 2020
dfb9c59
Crude ability to load external datasource identifier implemented
jdye64 Jan 28, 2020
775b285
EOD updates
jdye64 Jan 29, 2020
1be8d69
merged with upstream 0.13 branch
jdye64 Feb 5, 2020
1afd72f
successful read from Kafka external datasource checkpoint. still lots…
jdye64 Feb 6, 2020
4dbeb54
EOD updates
jdye64 Feb 7, 2020
9531921
EOD updates
jdye64 Feb 12, 2020
81c5987
merge upstream branch 0.13 into 3405
jdye64 Feb 12, 2020
712fc86
Created conda package build and modified datasource_factory to use bo…
jdye64 Feb 12, 2020
317bd99
update precedence for identifying external library dir
jdye64 Feb 13, 2020
eff1b68
Merge remote-tracking branch 'upstream/branch-0.13' into 3405
jdye64 Feb 13, 2020
1de0d71
local changes
jdye64 Feb 18, 2020
73f7aa2
Merge branch 'port_json' into kratos
jdye64 Feb 18, 2020
258331c
Merge remote-tracking branch 'upstream/branch-0.13' into kratos
jdye64 Feb 18, 2020
a5f1676
Merge branch 'port_json' into kratos
jdye64 Feb 18, 2020
d5a2e85
merge 3405_external work
jdye64 Feb 18, 2020
dca2a2e
updates for building
jdye64 Feb 18, 2020
beda37c
initial wiring for getting kafka watermark offsets
jdye64 Feb 19, 2020
45dbeda
Get offset for individual topicpartition
jdye64 Feb 19, 2020
3b12e30
updates
jdye64 Feb 20, 2020
3fa09d9
merge with upstream/branch-0.13
jdye64 Feb 20, 2020
9c1755d
first pass at gtest suite for external kafka
jdye64 Feb 21, 2020
bdc7a37
fixed test build
jdye64 Feb 21, 2020
3e4693b
fixed linker issues causing runs to fail
jdye64 Feb 21, 2020
4aec425
removed rebalancecb which for some reason was causing segfaults?
jdye64 Feb 21, 2020
63b7126
updates
jdye64 Feb 21, 2020
2b674bc
Added methods for dump and debugging current kafka configurations
jdye64 Feb 24, 2020
206c5b9
updated to create a cython global kafkadatasource to prevent having t…
jdye64 Feb 24, 2020
e3e5631
Added hooks for print consumer metadata and dumping the consumer conf…
jdye64 Feb 24, 2020
404f6c3
added function to get the latest committed offset
jdye64 Feb 24, 2020
efec281
updates
jdye64 Feb 26, 2020
c875a40
conda updates
jdye64 Feb 26, 2020
2ea31b5
resolved get_watermark_offset and conda packaging issues
jdye64 Feb 27, 2020
7aed403
Modified python code to return low, high tuple of watermark offsets
jdye64 Feb 27, 2020
36f11b3
Added produce functionality
jdye64 Feb 27, 2020
a3b790a
committed now accepts a list of partitions, metadata function fixed, …
jdye64 Feb 27, 2020
8d067e1
merged upstream/branch-0.13
jdye64 Mar 3, 2020
6a97447
updates
jdye64 Mar 3, 2020
772f8f7
Merge remote-tracking branch 'upstream/branch-0.13' into kratos
jdye64 Mar 4, 2020
06eefbf
multiple fixes around locating partitions, supplying topics, and crea…
jdye64 Mar 4, 2020
02e8919
updated
jdye64 Mar 9, 2020
e351052
Merge remote-tracking branch 'upstream/branch-0.13' into kratos
jdye64 Mar 9, 2020
1911236
Merge remote-tracking branch 'upstream/branch-0.13' into kratos
jdye64 Mar 9, 2020
fb8f7e5
update offset of toppar before reading
jdye64 Mar 9, 2020
ee2f091
Updated to use Confluent Kafka TopicPartition
jdye64 Mar 10, 2020
a292d9c
merge upstream branch-0.13
jdye64 Mar 12, 2020
63f99a9
Merge remote-tracking branch 'origin/kratos' into 3405
jdye64 Mar 12, 2020
d504544
refactored for Kafka Consumer instance to be used instead of KafkaHan…
jdye64 Mar 12, 2020
d4ce309
merge upstream branch/0.13
jdye64 Mar 16, 2020
9349026
updated for committed API
jdye64 Mar 17, 2020
b2e264d
updates
jdye64 Mar 18, 2020
cce1c48
updates
jdye64 Mar 18, 2020
5d07e81
updates
jdye64 Mar 18, 2020
df7340e
updates
jdye64 Mar 18, 2020
83f780f
Merge remote-tracking branch 'upstream/branch-0.13' into kratos
jdye64 Mar 19, 2020
8133df4
Merge remote-tracking branch 'upstream/branch-0.13' into kratos
jdye64 Mar 19, 2020
29c1f13
updates
jdye64 Mar 23, 2020
40209be
updates
jdye64 Mar 23, 2020
75bbf8b
merge upstraem/branch-0.14
jdye64 Mar 24, 2020
f0c2ab2
offsets update
jdye64 Mar 27, 2020
4461857
updates
jdye64 Mar 31, 2020
a60d45a
removed printf statement
jdye64 Mar 31, 2020
f1dd61f
updates
jdye64 Mar 31, 2020
b983366
Creating Cython class for librdkafka
jdye64 Apr 1, 2020
880130d
merge with upstream
jdye64 Apr 22, 2020
7307cbe
added conda recipe for CUDA 10.2
jdye64 Apr 22, 2020
a172eef
Change ValueError to RuntimeError
jdye64 Apr 24, 2020
c83c657
split out consumer and producer
jdye64 May 11, 2020
063055a
removed clandg
jdye64 May 11, 2020
7578a99
Cleaned up build.sh and removed unused components
jdye64 May 11, 2020
616d24f
note in custreamz_dev conda environments to note removing jdye64 once…
jdye64 May 11, 2020
e26bee7
added 'python' to the list of host requirements for custreamz conda r…
jdye64 May 11, 2020
1f54271
remove csv reader implementation for now
jdye64 May 11, 2020
6e33601
remove csv reader implementation for now
jdye64 May 11, 2020
e907357
Updated README to add build, install, and usage
jdye64 May 11, 2020
92b5341
Documented headers and kafka_consumer
jdye64 May 11, 2020
6ca2e61
finished breaking out kafka consumer and producer from the same datas…
jdye64 May 11, 2020
10cb349
Updated cython layer to reflect the refactoring done on the C++ backe…
jdye64 May 11, 2020
c9fe509
Documentation and numerous syntax fixes
jdye64 May 11, 2020
4992359
Merged with upstream/branch-0.14
jdye64 May 11, 2020
5d52c62
clang formatting
jdye64 May 12, 2020
41b2973
python/cython formatting
jdye64 May 12, 2020
1fb1b52
Only build external datasource tests if the parent cudf build support…
jdye64 May 12, 2020
b75f759
Merge remote-tracking branch 'upstream/branch-0.14' into 3405
jdye64 May 14, 2020
05d6d16
plumbing for invoking external datasource read operation via dlopen
jdye64 May 15, 2020
4d19fed
Remove external directory, moved kafka code to cpp/datasources/kafka,…
jdye64 May 15, 2020
db52c99
removed custreamz references as those will be moved to another commit
jdye64 May 15, 2020
a5546b0
Removed all custreamz and kafka external datasource conda code
jdye64 May 15, 2020
5884c14
removed external_datasource and datasource_factory from cudf codebase
jdye64 May 15, 2020
d782ab2
removed all custreamz python/cython code
jdye64 May 15, 2020
ebe6b3f
changes
jdye64 May 15, 2020
36895a7
Removed external datasource details from python and cython code
jdye64 May 15, 2020
f2e9f9f
more changes
jdye64 May 15, 2020
8e4c63e
Merge branch 'branch-0.14' of https://github.com/rapidsai/cudf into f…
vuule May 19, 2020
fdcb1ec
Merge branch 'branch-0.14' of https://github.com/rapidsai/cudf into f…
vuule May 19, 2020
1a9940d
Merge branch 'branch-0.14' of https://github.com/rapidsai/cudf into f…
vuule May 19, 2020
a5f5c3c
Merge branch 'branch-0.14' of https://github.com/rapidsai/cudf into f…
vuule May 19, 2020
554e504
add support for user datasource classes to source_info; merge most re…
vuule May 20, 2020
40f7891
Merge branch 'branch-0.14' of https://github.com/rapidsai/cudf into f…
vuule May 20, 2020
508e126
Add CSV test for user datasource type
vuule May 20, 2020
8e4a69e
Merge branch 'branch-0.14' of https://github.com/rapidsai/cudf into f…
vuule May 21, 2020
074fe7e
add missing docs for datasource
vuule May 21, 2020
bf3263a
misc clean up
vuule May 21, 2020
8c62adc
Update CHANGELOG.md
vuule May 21, 2020
5d919aa
meta
vuule May 21, 2020
1c0d29e
Merge branch 'fea-cuio-abstract-datasource' of https://github.com/vuu…
vuule May 21, 2020
b593d50
Merge branch 'branch-0.14' of https://github.com/rapidsai/cudf into f…
vuule May 22, 2020
4cd5422
Merge branch 'branch-0.14' of https://github.com/rapidsai/cudf into f…
vuule May 26, 2020
3b44d15
Fix copy-paste error
vuule May 26, 2020
fd3c6b1
Fix copy-paste error
vuule May 26, 2020
00ca3d6
Merge branch 'fea-cuio-abstract-datasource' of https://github.com/vuu…
vuule May 26, 2020
ed04a75
remove experimental namespace
vuule May 26, 2020
2b54670
Merge branch 'fea-cuio-abstract-datasource' of https://github.com/vuu…
vuule May 26, 2020
356bb26
Merge branch 'branch-0.15' of https://github.com/rapidsai/cudf into f…
vuule May 27, 2020
9e8daf5
rename datasource::empty()
vuule May 27, 2020
90ca594
Merge branch 'fea-cuio-abstract-datasource' of https://github.com/vuu…
vuule May 27, 2020
a2594a2
Merge branch 'branch-0.14' of https://github.com/rapidsai/cudf into f…
vuule May 27, 2020
847ce93
rename datasource empty -> is_empty
vuule May 27, 2020
d4f9e02
merge with upstream for cuIO datasource
jdye64 May 27, 2020
c024fb8
merge with upstream to get cuIO datasource feature
jdye64 May 27, 2020
27b0a38
test
jdye64 May 27, 2020
1bab498
enabled kafka datasource tests
jdye64 Jun 1, 2020
eaeca57
end to end working test, still needs polish
jdye64 Jun 1, 2020
7c5dd23
documentation and host_read to dst buffer logic
jdye64 Jun 1, 2020
f91d183
merge with upstream/branch-0.15
jdye64 Jun 1, 2020
caacedb
Refactored Kafka consumer to work with new cuIO datasource
jdye64 Jun 1, 2020
f8057a7
Merge remote-tracking branch 'upstream/branch-0.15' into 3405_no_dlopen
jdye64 Jun 4, 2020
fde9fa4
Merge branch '3405_no_dlopen' into 3405
jdye64 Jun 4, 2020
6877231
made several syntax and naming changes suggested during the review
jdye64 Jun 5, 2020
8515abd
addressed build name and other issues from review
jdye64 Jun 5, 2020
65342c7
updates based on review comments
jdye64 Jun 6, 2020
b0063bc
removed base_fixutre.hpp class which is not included and therefore ca…
jdye64 Jun 6, 2020
4b112a5
link to gtest_main, move datasource buffer to datasource header
jdye64 Jun 6, 2020
3f8627e
changes for linking and returning empty buffer when offset out of bounds
jdye64 Jun 10, 2020
e8bc3a5
upstream merge branch-0.15
jdye64 Jun 10, 2020
2ae77c0
refactored directory structure and added empty buffer
jdye64 Jun 10, 2020
cb910cd
include and lib outputs are now stored in 'libcudf_kafka' destination…
jdye64 Jun 11, 2020
299cb12
added libcudf includes to the cmake 'include_directores'
jdye64 Jun 11, 2020
ecfb2c6
refactor directory structure for kafka include directory
jdye64 Jun 11, 2020
3e18881
updated include header
jdye64 Jun 12, 2020
eea237a
enabled libcudf_kafka build in ci environment
jdye64 Jun 12, 2020
02d32ac
Fix header location for test
Jun 12, 2020
b70cebe
link rdkafka and libcudf_kafka to tests
jdye64 Jun 15, 2020
965b6c5
doxygen, harrism suggestions, and use chrono for time
jdye64 Jun 16, 2020
89296c3
formatting and cudf_expects fixes
jdye64 Jun 23, 2020
df1bdbd
upstream merge with branch-0.15
jdye64 Jun 24, 2020
a149749
Merge remote-tracking branch 'upstream/branch-0.15' into 3405
jdye64 Jun 24, 2020
5f080e2
do not read if offset requested is larger than buffer size for host_read
jdye64 Jun 24, 2020
ca28039
review changes, remove 'echo' from build.sh, add kafka_consumer to ne…
jdye64 Jun 29, 2020
eb98755
'datasource' namespace already in use so back to 'external'
jdye64 Jun 29, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- PR #5327 Add `cudf::cross_join` feature
- PR #5204 Concatenate strings columns using row separator as strings column
- PR #5342 Add support for `StringMethods.__getitem__`
- PR #3504 Add External Kafka Datasource
- PR #5356 Use `size_type` instead of `scalar` in `cudf::repeat`.
- PR #5397 Add internal implementation of nested loop equijoins.
- PR #5303 Add slice_strings functionality using delimiter string
Expand Down Expand Up @@ -2241,4 +2242,4 @@

# cuDF 0.2.0 and cuDF 0.1.0

These were initial releases of cuDF based on previously separate pyGDF and libGDF libraries.
These were initial releases of cuDF based on previously separate pyGDF and libGDF libraries.
jdye64 marked this conversation as resolved.
Show resolved Hide resolved
60 changes: 40 additions & 20 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
# This script is used to build the component(s) in this repo from
# source, and can be called with various options to customize the
# build as needed (see the help output for details)

# Abort script on first error
set -e

Expand All @@ -18,22 +17,24 @@ ARGS=$*
# script, and that this script resides in the repo dir!
REPODIR=$(cd $(dirname $0); pwd)

VALIDARGS="clean libcudf cudf dask_cudf benchmarks tests -v -g -n --allgpuarch --disable_nvtx --show_depr_warn -h"
HELP="$0 [clean] [libcudf] [cudf] [dask_cudf] [benchmarks] [tests] [-v] [-g] [-n] [-h]
clean - remove all existing build artifacts and configuration (start
over)
libcudf - build the cudf C++ code only
cudf - build the cudf Python package
dask_cudf - build the dask_cudf Python package
benchmarks - build benchmarks
tests - build tests
-v - verbose build mode
-g - build for debug
-n - no install step
--allgpuarch - build for all supported GPU architectures
--disable_nvtx - disable inserting NVTX profiling ranges
--show_depr_warn - show cmake deprecation warnings
-h - print this text
VALIDARGS="clean libcudf cudf dask_cudf benchmarks tests libcudf_kafka -v -g -n -l --allgpuarch --disable_nvtx --show_depr_warn -h"
HELP="$0 [clean] [libcudf] [cudf] [dask_cudf] [benchmarks] [tests] [libcudf_kafka] [-v] [-g] [-n] [-h] [-l]
clean - remove all existing build artifacts and configuration (start
over)
libcudf - build the cudf C++ code only
cudf - build the cudf Python package
dask_cudf - build the dask_cudf Python package
benchmarks - build benchmarks
tests - build tests
libcudf_kafka - build the libcudf_kafka C++ code only
-v - verbose build mode
-g - build for debug
-n - no install step
-l - build legacy tests
--allgpuarch - build for all supported GPU architectures
--disable_nvtx - disable inserting NVTX profiling ranges
--show_depr_warn - show cmake deprecation warnings
-h - print this text
default action (no args) is to build and install 'libcudf' then 'cudf'
then 'dask_cudf' targets
Expand All @@ -52,6 +53,7 @@ BUILD_ALL_GPU_ARCH=0
BUILD_NVTX=ON
BUILD_TESTS=OFF
BUILD_DISABLE_DEPRECATION_WARNING=ON
BUILD_LIBCUDF_KAFKA=OFF

# Set defaults for vars that may not have been defined externally
# FIXME: if INSTALL_PREFIX is not set, check PREFIX, then check
Expand Down Expand Up @@ -108,6 +110,9 @@ fi
if hasArg --show_depr_warn; then
BUILD_DISABLE_DEPRECATION_WARNING=OFF
fi
if hasArg libcudf_kafka; then
BUILD_LIBCUDF_KAFKA=ON
fi

# If clean given, run it prior to any other steps
if hasArg clean; then
Expand All @@ -134,8 +139,7 @@ fi
################################################################################
# Configure, build, and install libcudf

if buildAll || hasArg libcudf; then

if buildAll || hasArg libcudf || hasArg libcudf_kafka; then
mkdir -p ${LIB_BUILD_DIR}
cd ${LIB_BUILD_DIR}
cmake -DCMAKE_INSTALL_PREFIX=${INSTALL_PREFIX} \
Expand All @@ -144,7 +148,8 @@ if buildAll || hasArg libcudf; then
-DUSE_NVTX=${BUILD_NVTX} \
-DBUILD_BENCHMARKS=${BUILD_BENCHMARKS} \
-DDISABLE_DEPRECATION_WARNING=${BUILD_DISABLE_DEPRECATION_WARNING} \
-DCMAKE_BUILD_TYPE=${BUILD_TYPE} $REPODIR/cpp
-DCMAKE_BUILD_TYPE=${BUILD_TYPE} \
-DBUILD_CUDF_KAFKA=${BUILD_LIBCUDF_KAFKA} $REPODIR/cpp
fi

if buildAll || hasArg libcudf; then
Expand Down Expand Up @@ -187,3 +192,18 @@ if buildAll || hasArg dask_cudf; then
python setup.py build_ext --inplace
fi
fi

# Do not build libcudf_kafka with 'buildAll'
if hasArg libcudf_kafka; then

cd ${LIB_BUILD_DIR}
if [[ ${INSTALL_TARGET} != "" ]]; then
make -j${PARALLEL_LEVEL} install_libcudf_kafka VERBOSE=${VERBOSE}
else
make -j${PARALLEL_LEVEL} libcudf_kafka VERBOSE=${VERBOSE}
fi

if [[ ${BUILD_TESTS} == "ON" ]]; then
make -j${PARALLEL_LEVEL} build_tests_libcudf_kafka VERBOSE=${VERBOSE}
fi
fi
11 changes: 11 additions & 0 deletions ci/gpu/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,17 @@ else
$WORKSPACE/build.sh clean libcudf cudf dask_cudf benchmarks tests -l
fi

################################################################################
# BUILD - Build libcudf_kafka from source
################################################################################

logger "Build libcudf_kafka..."
if [[ ${BUILD_MODE} == "pull-request" ]]; then
$WORKSPACE/build.sh clean libcudf_kafka tests
else
$WORKSPACE/build.sh clean libcudf_kafka tests -l
fi

################################################################################
# TEST - Run GoogleTest and py.tests for libcudf, and
# cuDF
Expand Down
70 changes: 70 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ set(CMAKE_EXE_LINKER_FLAGS "-Wl,--disable-new-dtags")
option(BUILD_SHARED_LIBS "Build shared libraries" ON)
option(BUILD_TESTS "Configure CMake to build tests" ON)
option(BUILD_BENCHMARKS "Configure CMake to build (google) benchmarks" OFF)
option(BUILD_CUDF_KAFKA "Configure CMake to build cudf_kafka" OFF)

###################################################################################################
# - cudart options --------------------------------------------------------------------------------
Expand Down Expand Up @@ -773,3 +774,72 @@ add_custom_command(OUTPUT CUDF_DOXYGEN
VERBATIM)

add_custom_target(docs_cudf DEPENDS CUDF_DOXYGEN)


####################################################################################################
# - cudf_kafka - OFF by default due to increased number of dependencies
if(BUILD_CUDF_KAFKA)

# cudf_kafka library
add_library(libcudf_kafka
libcudf_kafka/src/kafka_consumer.cpp
)

# Include paths
include_directories("${CMAKE_SOURCE_DIR}/libcudf_kafka/include"
"${CMAKE_CURRENT_SOURCE_DIR}/include/cudf")

# Rename installation to proper name for later finding
set_target_properties(libcudf_kafka PROPERTIES OUTPUT_NAME "cudf_kafka")
set_target_properties(libcudf_kafka PROPERTIES BUILD_RPATH "\$ORIGIN")

###################################################################################################
# cudf_kafka - librdkafka -------------------------------------------------------------------------

find_path(RDKAFKA_INCLUDE "librdkafka" HINTS "$ENV{RDKAFKA_ROOT}/include")
find_library(RDKAFKA++_LIBRARY "rdkafka++" HINTS "$ENV{RDKAFKA_ROOT}/lib" "$ENV{RDKAFKA_ROOT}/build")

message(STATUS "RDKAFKA: RDKAFKA++_LIBRARY set to ${RDKAFKA++_LIBRARY}")
message(STATUS "RDKAFKA: RDKAFKA_INCLUDE set to ${RDKAFKA_INCLUDE}")

target_link_libraries(libcudf_kafka ${RDKAFKA++_LIBRARY})
include_directories("${RDKAFKA_INCLUDE}")

###################################################################################################
# - cudf_kafka Install ----------------------------------------------------------------------------
target_link_libraries(libcudf_kafka cudf)

install(TARGETS libcudf_kafka
DESTINATION libcudf_kafka/lib
COMPONENT libcudf_kafka)

install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/libcudf_kafka/include
DESTINATION include
COMPONENT libcudf_kafka)
jdye64 marked this conversation as resolved.
Show resolved Hide resolved

add_custom_target(install_libcudf_kafka
COMMAND "${CMAKE_COMMAND}" -DCOMPONENT=libcudf_kafka -P "${CMAKE_BINARY_DIR}/cmake_install.cmake"
DEPENDS libcudf_kafka)

####################################################################################################
# - cudf_kafka Tests
if(BUILD_TESTS)
if(GTEST_FOUND)
message(STATUS "Google C++ Testing Framework (Google Test) found in ${GTEST_ROOT}")
include_directories(${GTEST_INCLUDE_DIR})
add_subdirectory(${CMAKE_SOURCE_DIR}/libcudf_kafka/tests)
else()
message(AUTHOR_WARNING "Google C++ Testing Framework (Google Test) not found: automated tests are disabled.")
endif(GTEST_FOUND)
endif(BUILD_TESTS)

message(STATUS "CUDF_KAFKA_TEST_LIST set to: ${CUDF_KAFKA_TEST_LIST}")

add_custom_target(build_tests_libcudf_kafka
DEPENDS ${CUDF_KAFKA_TEST_LIST})

add_custom_target(test_libcudf_kafka
COMMAND ctest
DEPENDS build_tests_libcudf_kafka)

endif(BUILD_CUDF_KAFKA)
3 changes: 2 additions & 1 deletion cpp/doxygen/Doxyfile
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,8 @@ WARN_LOGFILE =
INPUT = main_page.md \
regex.md \
unicode.md \
../include
../include \
../libcudf_kafka/include
jdye64 marked this conversation as resolved.
Show resolved Hide resolved

# This tag can be used to specify the character encoding of the source files
# that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses
Expand Down
21 changes: 21 additions & 0 deletions cpp/include/cudf/io/datasource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,27 @@ class datasource {
* @return bool True if there is data, False otherwise
*/
virtual bool is_empty() const { return size() == 0; }

/**
* @brief Implementation for non owning buffer where datasource holds buffer until destruction.
*
* @param[in] data Address of the buffer source data
* @param[in] size Bytes of the buffer size
**/
class non_owning_buffer : public buffer {
public:
non_owning_buffer() : _data(0), _size(0) {}

non_owning_buffer(uint8_t* data, size_t size) : _data(data), _size(size) {}

size_t size() const override { return _size; }

const uint8_t* data() const override { return _data; }

private:
uint8_t* const _data;
size_t const _size;
};
};

} // namespace io
Expand Down
1 change: 1 addition & 0 deletions cpp/include/doxygen_groups.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
* @}
* @defgroup io_apis IO
* @{
* @defgroup io_datasources Datasources
* @defgroup io_readers Readers
* @defgroup io_writers Writers
* @}
Expand Down
120 changes: 120 additions & 0 deletions cpp/libcudf_kafka/include/cudf_kafka/kafka_consumer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <librdkafka/rdkafkacpp.h>
#include <algorithm>
#include <chrono>
#include <cudf/io/datasource.hpp>
#include <map>
#include <memory>
#include <string>

namespace cudf {
namespace io {
namespace external {
namespace kafka {

/**
* @brief libcudf datasource for Apache Kafka
*
* @ingroup io_datasources
**/
class kafka_consumer : public cudf::io::datasource {
public:
/**
* @brief Instantiate a Kafka consumer object. Documentation for librdkafka configurations can be
* found at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
*
jdye64 marked this conversation as resolved.
Show resolved Hide resolved
* @param configs key/value pairs of librdkafka configurations that will be
* passed to the librdkafka client
* @param topic_name name of the Kafka topic to consume from
* @param partition partition index to consume from between `0` and `TOPIC_NUM_PARTITIONS - 1`
* inclusive
* @param start_offset seek position for the specified TOPPAR (Topic/Partition combo)
jdye64 marked this conversation as resolved.
Show resolved Hide resolved
* @param end_offset position in the specified TOPPAR to read to
* @param batch_timeout maximum (millisecond) read time allowed. If end_offset is not reached
* before batch_timeout, a smaller subset will be returned
* @param delimiter optional delimiter to insert into the output between kafka messages, Ex: "\n"
**/
kafka_consumer(std::map<std::string, std::string> configs,
std::string topic_name,
int partition,
int64_t start_offset,
int64_t end_offset,
int batch_timeout,
std::string delimiter);

/**
* @brief Returns a buffer with a subset of data from Kafka Topic
*
* @param[in] offset Bytes from the start
* @param[in] size Bytes to read
*
* @return The data buffer
*/
std::unique_ptr<cudf::io::datasource::buffer> host_read(size_t offset, size_t size) override;
jdye64 marked this conversation as resolved.
Show resolved Hide resolved

/**
* @brief Returns the size of the data in Kafka buffer
*
* @return size_t The size of the source data in bytes
*/
size_t size() const override;
jdye64 marked this conversation as resolved.
Show resolved Hide resolved

/**
* @brief Reads a selected range into a preallocated buffer.
*
* @param[in] offset Bytes from the start
* @param[in] size Bytes to read
* @param[in] dst Address of the existing host memory
*
* @return The number of bytes read (can be smaller than size)
*/
size_t host_read(size_t offset, size_t size, uint8_t *dst) override;
jdye64 marked this conversation as resolved.
Show resolved Hide resolved

virtual ~kafka_consumer(){};

private:
std::unique_ptr<RdKafka::Conf> kafka_conf; // RDKafka configuration object
std::unique_ptr<RdKafka::KafkaConsumer> consumer;

std::string topic_name;
int partition;
int64_t start_offset;
int64_t end_offset;
int batch_timeout;
std::string delimiter;

std::string buffer;

private:
RdKafka::ErrorCode update_consumer_topic_partition_assignment(std::string const &topic,
jdye64 marked this conversation as resolved.
Show resolved Hide resolved
int partition,
int64_t offset);

/**
* Convenience method for getting "now()" in Kafka's standard format
**/
int64_t now();

void consume_to_buffer();
};

} // namespace kafka
} // namespace external
} // namespace io
} // namespace cudf
Loading