Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add missing methods to support incremental rebalance #541

Merged
merged 1 commit into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ if test "$PHP_RDKAFKA" != "no"; then
fi
done
fi

if test -z "$RDKAFKA_DIR"; then
AC_MSG_RESULT([not found])
AC_MSG_ERROR([Please reinstall the rdkafka distribution])
Expand Down Expand Up @@ -90,6 +90,12 @@ if test "$PHP_RDKAFKA" != "no"; then
AC_MSG_WARN([oauthbearer token refresh cb is not available])
])

dnl Detect incremental (cooperative) rebalance support.
dnl AC_CHECK_LIB probes a single symbol: rd_kafka_incremental_assign and
dnl rd_kafka_incremental_unassign were introduced together (librdkafka 1.6.0),
dnl so linking against one of them proves both are available.  Passing a
dnl comma-separated list here would generate an invalid conftest declaration
dnl and a mangled cache variable name.
AC_CHECK_LIB($LIBNAME,[rd_kafka_incremental_assign],[
AC_DEFINE(HAS_RD_KAFKA_INCREMENTAL_ASSIGN,1,[ ])
],[
AC_MSG_WARN([no rd_kafka_incremental_(un)assign, incremental rebalance support will not be available])
])

LDFLAGS="$ORIG_LDFLAGS"
CPPFLAGS="$ORIG_CPPFLAGS"

Expand Down
53 changes: 53 additions & 0 deletions kafka_consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,59 @@ PHP_METHOD(RdKafka_KafkaConsumer, assign)
}
/* }}} */

#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
/* Shared implementation for incrementalAssign()/incrementalUnassign().
 *
 * Parses the single array argument, converts it to an
 * rd_kafka_topic_partition_list_t via array_arg_to_kafka_topic_partition_list()
 * and hands it to librdkafka's incremental (un)assign API.  Throws
 * RdKafka\Exception (ce_kafka_exception) on failure.
 *
 * assign: non-zero selects rd_kafka_incremental_assign(),
 *         zero selects rd_kafka_incremental_unassign().
 */
static void consumer_incremental_op(int assign, INTERNAL_FUNCTION_PARAMETERS) /* {{{ */
{
    HashTable *htopars = NULL;

    if (zend_parse_parameters(ZEND_NUM_ARGS(), "h", &htopars) == FAILURE || !htopars) {
        return;
    }

    object_intern *intern = get_object(getThis());
    if (!intern) {
        return;
    }

    rd_kafka_topic_partition_list_t *topics = array_arg_to_kafka_topic_partition_list(1, htopars);
    if (!topics) {
        /* Conversion helper is expected to have raised an error already. */
        return;
    }

    rd_kafka_error_t *err;

    if (assign) {
        err = rd_kafka_incremental_assign(intern->rk, topics);
    } else {
        err = rd_kafka_incremental_unassign(intern->rk, topics);
    }

    /* librdkafka copies the list internally; destroy our copy either way. */
    rd_kafka_topic_partition_list_destroy(topics);

    if (err) {
        /* Propagate librdkafka's error code instead of a hard-coded 0 so
         * callers can inspect Exception::getCode(). */
        zend_throw_exception(ce_kafka_exception, rd_kafka_error_string(err), rd_kafka_error_code(err));
        rd_kafka_error_destroy(err);
    }
}
/* }}} */

/* {{{ proto void RdKafka\KafkaConsumer::incrementalAssign(array $topics)
Incremental assignment of partitions to consume */
/* Thin wrapper: all parsing, librdkafka calls and error handling live in
 * consumer_incremental_op(); assign=1 selects rd_kafka_incremental_assign().
 * Only compiled when HAS_RD_KAFKA_INCREMENTAL_ASSIGN is defined. */
PHP_METHOD(RdKafka_KafkaConsumer, incrementalAssign)
{
consumer_incremental_op(1, INTERNAL_FUNCTION_PARAM_PASSTHRU);
}
/* }}} */

/* {{{ proto void RdKafka\KafkaConsumer::incrementalUnassign(array $topics)
Incremental unassign of partitions to consume */
/* Thin wrapper: all parsing, librdkafka calls and error handling live in
 * consumer_incremental_op(); assign=0 selects rd_kafka_incremental_unassign().
 * Only compiled when HAS_RD_KAFKA_INCREMENTAL_ASSIGN is defined. */
PHP_METHOD(RdKafka_KafkaConsumer, incrementalUnassign)
{
consumer_incremental_op(0, INTERNAL_FUNCTION_PARAM_PASSTHRU);
}
/* }}} */
#endif /* HAS_RD_KAFKA_INCREMENTAL_ASSIGN */

/* {{{ proto array RdKafka\KafkaConsumer::getAssignment()
Returns the current partition getAssignment */
PHP_METHOD(RdKafka_KafkaConsumer, getAssignment)
Expand Down
8 changes: 8 additions & 0 deletions kafka_consumer.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ public function __construct(Conf $conf) {}
/** @tentative-return-type */
public function assign(?array $topic_partitions = null): void {}

#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
// Incremental rebalance support; these declarations are compiled in only
// when the build detected rd_kafka_incremental_(un)assign in librdkafka.
/** @tentative-return-type */
public function incrementalAssign(array $topic_partitions): void {}

/** @tentative-return-type */
public function incrementalUnassign(array $topic_partitions): void {}
#endif

/** @tentative-return-type */
public function getAssignment(): array {}

Expand Down
18 changes: 18 additions & 0 deletions kafka_consumer_arginfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaCon
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, topic_partitions, IS_ARRAY, 1, "null")
ZEND_END_ARG_INFO()

#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
/* arginfo for incrementalAssign(array $topic_partitions): void —
 * required (non-nullable) array parameter, tentative void return. */
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, 0, 0, IS_VOID, 0)
ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0)
ZEND_END_ARG_INFO()

/* arginfo for incrementalUnassign(array $topic_partitions): void */
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, 0, 0, IS_VOID, 0)
ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0)
ZEND_END_ARG_INFO()
#endif

ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getAssignment, 0, 0, IS_ARRAY, 0)
ZEND_END_ARG_INFO()

Expand Down Expand Up @@ -70,6 +80,10 @@ ZEND_END_ARG_INFO()

ZEND_METHOD(RdKafka_KafkaConsumer, __construct);
ZEND_METHOD(RdKafka_KafkaConsumer, assign);
#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
ZEND_METHOD(RdKafka_KafkaConsumer, incrementalAssign);
ZEND_METHOD(RdKafka_KafkaConsumer, incrementalUnassign);
#endif
ZEND_METHOD(RdKafka_KafkaConsumer, getAssignment);
ZEND_METHOD(RdKafka_KafkaConsumer, commit);
ZEND_METHOD(RdKafka_KafkaConsumer, close);
Expand All @@ -91,6 +105,10 @@ ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions);
static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, assign, arginfo_class_RdKafka_KafkaConsumer_assign, ZEND_ACC_PUBLIC)
#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
ZEND_ME(RdKafka_KafkaConsumer, incrementalAssign, arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, incrementalUnassign, arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, ZEND_ACC_PUBLIC)
#endif
ZEND_ME(RdKafka_KafkaConsumer, getAssignment, arginfo_class_RdKafka_KafkaConsumer_getAssignment, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, commit, arginfo_class_RdKafka_KafkaConsumer_commit, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, close, arginfo_class_RdKafka_KafkaConsumer_close, ZEND_ACC_PUBLIC)
Expand Down
18 changes: 18 additions & 0 deletions kafka_consumer_legacy_arginfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_assign, 0, 0, 0)
ZEND_ARG_INFO(0, topic_partitions)
ZEND_END_ARG_INFO()

#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
/* Legacy (pre-type-info) arginfo variants of the incremental rebalance
 * methods: same required array parameter, no declared return type. */
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, 0, 0, 0)
ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, 0, 0, 0)
ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0)
ZEND_END_ARG_INFO()
#endif

ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getAssignment, 0, 0, 0)
ZEND_END_ARG_INFO()

Expand Down Expand Up @@ -69,6 +79,10 @@ ZEND_END_ARG_INFO()

ZEND_METHOD(RdKafka_KafkaConsumer, __construct);
ZEND_METHOD(RdKafka_KafkaConsumer, assign);
#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
ZEND_METHOD(RdKafka_KafkaConsumer, incrementalAssign);
ZEND_METHOD(RdKafka_KafkaConsumer, incrementalUnassign);
#endif
ZEND_METHOD(RdKafka_KafkaConsumer, getAssignment);
ZEND_METHOD(RdKafka_KafkaConsumer, commit);
ZEND_METHOD(RdKafka_KafkaConsumer, close);
Expand All @@ -90,6 +104,10 @@ ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions);
static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, assign, arginfo_class_RdKafka_KafkaConsumer_assign, ZEND_ACC_PUBLIC)
#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
ZEND_ME(RdKafka_KafkaConsumer, incrementalAssign, arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, incrementalUnassign, arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, ZEND_ACC_PUBLIC)
#endif
ZEND_ME(RdKafka_KafkaConsumer, getAssignment, arginfo_class_RdKafka_KafkaConsumer_getAssignment, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, commit, arginfo_class_RdKafka_KafkaConsumer_commit, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, close, arginfo_class_RdKafka_KafkaConsumer_close, ZEND_ACC_PUBLIC)
Expand Down
Loading