Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

custreamz oauth callback for kafka (librdkafka) #9486

Merged
merged 84 commits into from
Jan 6, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
84 commits
Select commit Hold shift + click to select a range
5595fda
pass pyobject to c++ library rather than map<std::string, std::string>
jdye64 Oct 13, 2021
0cfcfd0
Checkpoint: C++, Cython, and Python compiling working. Callback objec…
jdye64 Oct 14, 2021
1b6b19c
moved configuration building and validating logic to its own method
jdye64 Oct 14, 2021
9c08aa0
Merge branch 'rapidsai:branch-21.12' into custreamz_oauth
jdye64 Oct 21, 2021
25a8b33
Introduce callbacks class where all the possible kafka callbacks can …
jdye64 Oct 25, 2021
4af5b05
Refactored class names
jdye64 Oct 25, 2021
97f8830
Updated tests to match new function parameters
jdye64 Oct 26, 2021
38e52c1
Remove error of leaving previous decleration
jdye64 Oct 26, 2021
2401b8b
upgrade librdkafka version for CI
jdye64 Oct 26, 2021
64cdf1c
Merge remote-tracking branch 'upstream/branch-21.12' into custreamz_o…
jdye64 Oct 26, 2021
69a8639
link python development headers against test
jdye64 Oct 27, 2021
4e4ed72
latest version of librdkafka requires that committed offsets be > 0. …
jdye64 Oct 27, 2021
98ed97d
updates
jdye64 Oct 29, 2021
d1925ab
Make Python3 package REQUIRED in cmake
jdye64 Oct 29, 2021
9916e4d
updates per review
jdye64 Nov 2, 2021
2499a78
modified to use std::function
jdye64 Nov 4, 2021
ec2cab7
added back whitespace that was removed by accident
jdye64 Nov 4, 2021
4672922
temporarily update librdkakfa for CI testing
ajschmidt8 Nov 8, 2021
6a6e629
updates per review
jdye64 Nov 8, 2021
3524f56
merge conflict resolution and upstream branch-21.12 merge
jdye64 Nov 9, 2021
981d44d
test fixes
jdye64 Nov 9, 2021
269d962
removed python dependency from tests as libcudf brings it in
jdye64 Nov 9, 2021
186408d
Merge remote-tracking branch 'upstream/branch-21.12' into custreamz_o…
jdye64 Nov 10, 2021
20ecb3f
update custreamz read_gdf() to check type and handle appropriately
jdye64 Nov 10, 2021
ac77f2e
modify cmake
jdye64 Nov 11, 2021
18aa6f0
merge upstream/branch-21.12
jdye64 Nov 11, 2021
3c525a1
updated conda recipe to include python which is needed for the python…
jdye64 Nov 11, 2021
1d73af2
removing all doubt
jdye64 Nov 12, 2021
3c44432
add numpy to conda recipe build
jdye64 Nov 12, 2021
b5c3ffd
updates
jdye64 Nov 12, 2021
16bef4c
hopefully resolve conda errors
jdye64 Nov 12, 2021
1bbb134
Merge remote-tracking branch 'upstream/branch-21.12' into custreamz_o…
jdye64 Nov 13, 2021
fc4a4aa
Merge remote-tracking branch 'upstream/branch-21.12' into custreamz_o…
jdye64 Nov 19, 2021
fafc831
review updates
jdye64 Nov 19, 2021
43e1aa9
add python to conda recipes since that is needed for python-dev now
jdye64 Nov 20, 2021
ecb3ac7
Merge branch 'branch-22.02' into custreamz_oauth
jdye64 Nov 20, 2021
099ca27
add numpy to conda recipe for cudf_kafka
jdye64 Nov 20, 2021
1d96eef
add numpy to conda recipe for cudf_kafka
jdye64 Nov 20, 2021
5ac4736
add numpy to conda recipe for cudf_kafka
jdye64 Nov 21, 2021
7932468
add numpy to conda recipe for cudf_kafka
jdye64 Nov 21, 2021
9a5b2d9
add numpy to conda recipe for cudf_kafka
jdye64 Nov 21, 2021
ab8f6a0
add numpy to conda recipe for cudf_kafka
jdye64 Nov 22, 2021
9bbedd4
Debugging setup.py dependency issues
jdye64 Nov 22, 2021
b4f9232
Change CUDA version for debugging
jdye64 Nov 23, 2021
d1f4fe5
debug conda build for cudf_kafka
jdye64 Nov 23, 2021
037b386
debug cudf_kafka conda
jdye64 Nov 27, 2021
2c8adce
conda debugging
jdye64 Nov 29, 2021
ef5e072
Add numpy include directory
jdye64 Nov 29, 2021
72a3c49
Merge remote-tracking branch 'upstream/branch-22.02' into custreamz_o…
jdye64 Nov 29, 2021
5d57b1a
Add Python has host and build requirement since libcudf_kafka uses Py…
jdye64 Nov 29, 2021
0a3ee3a
Updated conda meta.yml files
jdye64 Nov 30, 2021
97897ad
Testing out a theory about python versions
jdye64 Dec 1, 2021
0347540
add cmake function for setting up conda environment
jdye64 Dec 2, 2021
a137d23
Merge remote-tracking branch 'upstream/branch-22.02' into custreamz_o…
jdye64 Dec 3, 2021
e10b1e9
Disable PROJECT_FLASH test
jdye64 Dec 6, 2021
0047d64
test manually specifying python version
jdye64 Dec 6, 2021
877030f
Merge remote-tracking branch 'upstream/branch-22.02' into custreamz_o…
jdye64 Dec 6, 2021
6af1cd1
Re-enable PROJECT_FLASH support
jdye64 Dec 6, 2021
25a27ef
use environment version of python
jdye64 Dec 6, 2021
a867d1b
fix typo
jdye64 Dec 6, 2021
e5d4f59
change version of python
jdye64 Dec 7, 2021
2f582bb
libcudf_kafka is not being passed a conda python environment variable
jdye64 Dec 7, 2021
33c9cbb
use python instead of PYTHON environment variable
jdye64 Dec 7, 2021
0cd3fe9
make versions strings instead of floats
jdye64 Dec 7, 2021
86f02c6
removed references in python in cpp
jdye64 Dec 8, 2021
c249b5f
introduce wrapper
jdye64 Dec 10, 2021
bcfae4a
Refactor to use functools.partial
jdye64 Dec 10, 2021
daca522
Remove python find_package command from cmake
jdye64 Dec 10, 2021
ac99019
Update test syntax after refactoring
jdye64 Dec 10, 2021
cc28000
Remove Python versions from anaconda builds to satisfy Java build pro…
jdye64 Dec 11, 2021
7e59211
update conda recipes to get the correct version of python-confluent-k…
jdye64 Dec 11, 2021
6c12fd7
Merge remote-tracking branch 'upstream/branch-22.02' into custreamz_o…
jdye64 Dec 11, 2021
ac46705
include librdkafka 1.7.0 in Java test gpu CI script
jdye64 Dec 12, 2021
47ee207
remove manual librdkafka updates since integration repo is merged and…
jdye64 Dec 13, 2021
1e1dc90
add back conda installs for ops test
jdye64 Dec 13, 2021
53f5465
clang formatting
jdye64 Dec 13, 2021
e0c7048
remove manual librdkafka updates now that gpuci integrations have bee…
jdye64 Dec 14, 2021
740a3bd
Merge upstream
jdye64 Dec 17, 2021
eba67e2
Fix missed merge conflict
jdye64 Dec 18, 2021
f365667
Update cpp/libcudf_kafka/include/cudf_kafka/kafka_callback.hpp
jdye64 Dec 22, 2021
5437b84
Update cpp/libcudf_kafka/src/kafka_callback.cpp
jdye64 Dec 22, 2021
831b84b
Address reviewers suggestions
jdye64 Dec 22, 2021
4087acc
Update source file years and also adjust import for cudf_kafka
jdye64 Jan 5, 2022
44eea02
Adjust source file years
jdye64 Jan 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conda/recipes/libcudf_kafka/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ requirements:
- cmake >=3.20.1
host:
- libcudf {{version}}
- librdkafka >=1.6.0,<1.7.0a0
- librdkafka >=1.7.0
run:
- {{ pin_compatible('librdkafka', max_pin='x.x') }} #TODO: librdkafka should be automatically included here by run_exports but is not
jdye64 marked this conversation as resolved.
Show resolved Hide resolved

Expand Down
8 changes: 6 additions & 2 deletions cpp/libcudf_kafka/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ rapids_cpm_init()
include(cmake/thirdparty/get_cudf.cmake)
include(cmake/thirdparty/get_rdkafka.cmake)

# Locate Python Development headers
find_package(Python3 COMPONENTS Interpreter Development)
jdye64 marked this conversation as resolved.
Show resolved Hide resolved

# # GTests if enabled
if (BUILD_TESTS)
# GoogleTest
Expand All @@ -57,7 +60,8 @@ endif()
###################################################################################################
# - library target --------------------------------------------------------------------------------
add_library(cudf_kafka SHARED
src/kafka_consumer.cpp)
src/kafka_consumer.cpp
src/kafka_callback.cpp)

###################################################################################################
# - include paths ---------------------------------------------------------------------------------
Expand All @@ -68,7 +72,7 @@ target_include_directories(cudf_kafka

###################################################################################################
# - library paths ---------------------------------------------------------------------------------
target_link_libraries(cudf_kafka PUBLIC cudf::cudf RDKAFKA::RDKAFKA)
target_link_libraries(cudf_kafka PUBLIC cudf::cudf RDKAFKA::RDKAFKA Python3::Python)

set_target_properties(cudf_kafka
PROPERTIES BUILD_RPATH "\$ORIGIN"
Expand Down
50 changes: 50 additions & 0 deletions cpp/libcudf_kafka/include/cudf_kafka/kafka_callback.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#define PY_SSIZE_T_CLEAN
#include <Python.h>

#include <librdkafka/rdkafkacpp.h>
#include <cudf/io/datasource.hpp>
#include <map>
#include <memory>
#include <string>

namespace cudf {
namespace io {
namespace external {
namespace kafka {

/**
* @brief Callback to retrieve OAuth token from external source. Invoked when
* token refresh is required.
*/
class PythonOAuthRefreshCb : public RdKafka::OAuthBearerTokenRefreshCb {
jdye64 marked this conversation as resolved.
Show resolved Hide resolved
public:
PythonOAuthRefreshCb(PyObject* callback, PyObject* args);

void oauthbearer_token_refresh_cb(RdKafka::Handle* handle, const std::string& oauthbearer_config);
jdye64 marked this conversation as resolved.
Show resolved Hide resolved

private:
PyObject* callback;
PyObject* args;
};

} // namespace kafka
} // namespace external
} // namespace io
} // namespace cudf
16 changes: 14 additions & 2 deletions cpp/libcudf_kafka/include/cudf_kafka/kafka_consumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
*/
#pragma once

#define PY_SSIZE_T_CLEAN
#include <Python.h>

#include <librdkafka/rdkafkacpp.h>
#include <algorithm>
#include <chrono>
#include <cudf/io/datasource.hpp>
#include <map>
#include <memory>
#include <string>
#include "kafka_callback.hpp"
jdye64 marked this conversation as resolved.
Show resolved Hide resolved

namespace cudf {
namespace io {
Expand Down Expand Up @@ -49,7 +53,7 @@ class kafka_consumer : public cudf::io::datasource {
* @param configs key/value pairs of librdkafka configurations that will be
* passed to the librdkafka client
*/
kafka_consumer(std::map<std::string, std::string> const& configs);
kafka_consumer(PyObject* configs);

/**
* @brief Instantiate a Kafka consumer object. Documentation for librdkafka configurations can be
Expand All @@ -66,7 +70,7 @@ class kafka_consumer : public cudf::io::datasource {
* before batch_timeout, a smaller subset will be returned
* @param delimiter optional delimiter to insert into the output between kafka messages, Ex: "\n"
*/
kafka_consumer(std::map<std::string, std::string> const& configs,
kafka_consumer(PyObject* configs,
std::string const& topic_name,
int partition,
int64_t start_offset,
Expand Down Expand Up @@ -178,6 +182,12 @@ class kafka_consumer : public cudf::io::datasource {
std::unique_ptr<RdKafka::Conf> kafka_conf; // RDKafka configuration object
std::unique_ptr<RdKafka::KafkaConsumer> consumer;

// Configurations that can be Python callables. Anything else is expected to be a str
const std::vector<std::string> callableConfigs{"oauth_cb"};

// The Python configuration dict that was used to create this instance
PyObject* conf_dict;

std::string topic_name;
int partition;
int64_t start_offset;
Expand All @@ -193,6 +203,8 @@ class kafka_consumer : public cudf::io::datasource {
int partition,
int64_t offset);

void build_validate_configs(PyObject* python_config_dict);

/**
* Convenience method for getting "now()" in Kafka's standard format
*/
Expand Down
62 changes: 62 additions & 0 deletions cpp/libcudf_kafka/src/kafka_callback.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cudf_kafka/kafka_callback.hpp"
jdye64 marked this conversation as resolved.
Show resolved Hide resolved

namespace cudf {
namespace io {
namespace external {
namespace kafka {

PythonOAuthRefreshCb::PythonOAuthRefreshCb(PyObject* callback, PyObject* args)
: callback(callback), args(args){};

void PythonOAuthRefreshCb::oauthbearer_token_refresh_cb(RdKafka::Handle* handle,
const std::string& oauthbearer_config)
{
CUDF_EXPECTS(PyCallable_Check(callback), "A Python callable is required");

// Make sure that we own the GIL
PyGILState_STATE state = PyGILState_Ensure();
PyObject* result = PyObject_CallObject(callback, args);
Py_XINCREF(result);

// Set the token in the Kafka context
if (result) {
CUDF_EXPECTS(PyDict_Check(result),
"cudf_kafka requires a Dictionary response from the Python OAuthRefreshCb with "
"dictionary keys (token, token_lifetime_ms, principal, extensions)");

// Ensure that expected keys are present from the Python callback response.
std::string token = PyUnicode_AsUTF8(PyDict_GetItemString(result, "token"));
int64_t token_lifetime_ms =
PyLong_AsLongLong(PyDict_GetItemString(result, "token_lifetime_ms"));
std::string principal = PyUnicode_AsUTF8(PyDict_GetItemString(result, "principal"));
std::list<std::string> extensions;
std::string errstr;

handle->oauthbearer_set_token(token, token_lifetime_ms, principal, extensions, errstr);
} else {
handle->oauthbearer_set_token_failure("");
}

Py_XDECREF(result);
PyGILState_Release(state);
}

} // namespace kafka
} // namespace external
} // namespace io
} // namespace cudf
73 changes: 44 additions & 29 deletions cpp/libcudf_kafka/src/kafka_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "cudf_kafka/kafka_consumer.hpp"
#include <librdkafka/rdkafkacpp.h>
#include <chrono>
Expand All @@ -24,27 +23,17 @@ namespace io {
namespace external {
namespace kafka {

kafka_consumer::kafka_consumer(std::map<std::string, std::string> const& configs)
: kafka_conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL))
kafka_consumer::kafka_consumer(PyObject* confdict)
: conf_dict(confdict), kafka_conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL))
{
for (auto const& key_value : configs) {
std::string error_string;
CUDF_EXPECTS(RdKafka::Conf::ConfResult::CONF_OK ==
kafka_conf->set(key_value.first, key_value.second, error_string),
"Invalid Kafka configuration");
}

// Kafka 0.9 > requires group.id in the configuration
std::string conf_val;
CUDF_EXPECTS(RdKafka::Conf::ConfResult::CONF_OK == kafka_conf->get("group.id", conf_val),
"Kafka group.id must be configured");
build_validate_configs(confdict);

std::string errstr;
consumer = std::unique_ptr<RdKafka::KafkaConsumer>(
RdKafka::KafkaConsumer::create(kafka_conf.get(), errstr));
}

kafka_consumer::kafka_consumer(std::map<std::string, std::string> const& configs,
kafka_consumer::kafka_consumer(PyObject* confdict,
std::string const& topic_name,
int partition,
int64_t start_offset,
Expand All @@ -56,21 +45,11 @@ kafka_consumer::kafka_consumer(std::map<std::string, std::string> const& configs
start_offset(start_offset),
end_offset(end_offset),
batch_timeout(batch_timeout),
delimiter(delimiter)
delimiter(delimiter),
conf_dict(confdict),
kafka_conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL))
{
kafka_conf = std::unique_ptr<RdKafka::Conf>(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));

for (auto const& key_value : configs) {
std::string error_string;
CUDF_EXPECTS(RdKafka::Conf::ConfResult::CONF_OK ==
kafka_conf->set(key_value.first, key_value.second, error_string),
"Invalid Kafka configuration");
}

// Kafka 0.9 > requires group.id in the configuration
std::string conf_val;
CUDF_EXPECTS(RdKafka::Conf::ConfResult::CONF_OK == kafka_conf->get("group.id", conf_val),
"Kafka group.id must be configured");
build_validate_configs(confdict);

std::string errstr;
consumer = std::unique_ptr<RdKafka::KafkaConsumer>(
Expand All @@ -81,6 +60,42 @@ kafka_consumer::kafka_consumer(std::map<std::string, std::string> const& configs
consume_to_buffer();
}

/**
* @brief Builds and validates Kafka C++ configuration object from Python values
*
* @param kafka_configs
* Python Dict of configuration values and possibly callables for callbacks
*/
void kafka_consumer::build_validate_configs(PyObject* python_config_dict)
{
Py_ssize_t pos = 0;
PyObject *ko, *vo;

while (PyDict_Next(python_config_dict, &pos, &ko, &vo)) {
CUDF_EXPECTS(PyUnicode_Check(ko), "expected kafka configuration property name as type string");
std::string key(PyUnicode_AsUTF8(ko));
std::string valueType(Py_TYPE(vo)->tp_name);

std::string error_string;
if (std::find(callableConfigs.begin(), callableConfigs.end(), key) != callableConfigs.end()) {
// Properly configure the callable. This is a Python callback for oauth processing
PythonOAuthRefreshCb cb(vo, NULL);
kafka_conf->set("oauthbearer_token_refresh_cb", &cb, error_string);
} else {
CUDF_EXPECTS(valueType.compare("str") == 0,
"Only string values are supported for this configuration");
CUDF_EXPECTS(RdKafka::Conf::ConfResult::CONF_OK ==
kafka_conf->set(key, PyUnicode_AsUTF8(vo), error_string),
"Invalid Kafka configuration provided");
}
}

// Kafka 0.9 > requires group.id in the configuration
std::string conf_val;
CUDF_EXPECTS(RdKafka::Conf::ConfResult::CONF_OK == kafka_conf->get("group.id", conf_val),
"Kafka group.id must be configured");
}

std::unique_ptr<cudf::io::datasource::buffer> kafka_consumer::host_read(size_t offset, size_t size)
{
if (offset > buffer.size()) { return 0; }
Expand Down
5 changes: 4 additions & 1 deletion cpp/libcudf_kafka/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@
# limitations under the License.
#=============================================================================

# Locate Python Development headers
find_package(Python3 COMPONENTS Interpreter Development)

###################################################################################################
# - compiler function -----------------------------------------------------------------------------

function(ConfigureTest test_name )
add_executable(${test_name} ${ARGN})
set_target_properties(${test_name}
PROPERTIES RUNTIME_OUTPUT_DIRECTORY "$<BUILD_INTERFACE:${CUDA_KAFKA_BINARY_DIR}/gtests>")
target_link_libraries(${test_name} PRIVATE GTest::gmock_main GTest::gtest_main cudf_kafka)
target_link_libraries(${test_name} PRIVATE GTest::gmock_main GTest::gtest_main cudf_kafka Python3::Python)

add_test(NAME ${test_name} COMMAND ${test_name})
endfunction()
Expand Down
13 changes: 6 additions & 7 deletions cpp/libcudf_kafka/tests/kafka_consumer_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
* limitations under the License.
*/

#define PY_SSIZE_T_CLEAN
#include <Python.h>

#include <gtest/gtest.h>
#include <map>
#include <memory>
Expand All @@ -31,8 +34,7 @@ struct KafkaDatasourceTest : public ::testing::Test {
TEST_F(KafkaDatasourceTest, MissingGroupID)
{
// group.id is a required configuration.
std::map<std::string, std::string> kafka_configs;
kafka_configs.insert({"bootstrap.servers", "localhost:9092"});
PyObject* kafka_configs = Py_BuildValue("{s:s}", "bootstrap.servers", "localhost:9092");

EXPECT_THROW(kafka::kafka_consumer kc(kafka_configs, "csv-topic", 0, 0, 3, 5000, "\n"),
cudf::logic_error);
Expand All @@ -41,16 +43,13 @@ TEST_F(KafkaDatasourceTest, MissingGroupID)
TEST_F(KafkaDatasourceTest, InvalidConfigValues)
{
// Give a made up configuration value
std::map<std::string, std::string> kafka_configs;
kafka_configs.insert({"completely_made_up_config", "wrong"});
PyObject* kafka_configs = Py_BuildValue("{s:s}", "completely_made_up_config", "wrong");

EXPECT_THROW(kafka::kafka_consumer kc(kafka_configs, "csv-topic", 0, 0, 3, 5000, "\n"),
cudf::logic_error);

kafka_configs.clear();

// Give a good config property with a bad value
kafka_configs.insert({"message.max.bytes", "this should be a number not text"});
kafka_configs = Py_BuildValue("{s:s}", "message.max.bytes", "his should be a number not text");
EXPECT_THROW(kafka::kafka_consumer kc(kafka_configs, "csv-topic", 0, 0, 3, 5000, "\n"),
cudf::logic_error);
}
4 changes: 2 additions & 2 deletions python/cudf_kafka/cudf_kafka/_lib/kafka.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ cdef extern from "kafka_consumer.hpp" \

cpdef cppclass kafka_consumer:

kafka_consumer(map[string, string] configs) except +
kafka_consumer(object configs) except +

kafka_consumer(map[string, string] configs,
kafka_consumer(object configs,
string topic_name,
int32_t partition,
int64_t start_offset,
Expand Down
Loading