Skip to content

Commit

Permalink
Add missing methods to support incremental rebalance (#541)
Browse files Browse the repository at this point in the history
  • Loading branch information
ikeberlein authored Aug 30, 2024
1 parent 9cafbba commit 4b085d2
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 1 deletion.
8 changes: 7 additions & 1 deletion config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ if test "$PHP_RDKAFKA" != "no"; then
fi
done
fi

if test -z "$RDKAFKA_DIR"; then
AC_MSG_RESULT([not found])
AC_MSG_ERROR([Please reinstall the rdkafka distribution])
Expand Down Expand Up @@ -90,6 +90,12 @@ if test "$PHP_RDKAFKA" != "no"; then
AC_MSG_WARN([oauthbearer token refresh cb is not available])
])

AC_CHECK_LIB($LIBNAME,[rd_kafka_incremental_assign, rd_kafka_incremental_unassign],[
AC_DEFINE(HAS_RD_KAFKA_INCREMENTAL_ASSIGN,1,[ ])
],[
AC_MSG_WARN([no rd_kafka_incremental_(un)assign, incremental rebalance support will not be available])
])

LDFLAGS="$ORIG_LDFLAGS"
CPPFLAGS="$ORIG_CPPFLAGS"

Expand Down
53 changes: 53 additions & 0 deletions kafka_consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,59 @@ PHP_METHOD(RdKafka_KafkaConsumer, assign)
}
/* }}} */

#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
static void consumer_incremental_op(int assign, INTERNAL_FUNCTION_PARAMETERS) /* {{{ */
{
HashTable *htopars = NULL;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "h", &htopars) == FAILURE || !htopars) {
return;
}

object_intern *intern = get_object(getThis());
if (!intern) {
return;
}

rd_kafka_topic_partition_list_t *topics = array_arg_to_kafka_topic_partition_list(1, htopars);
if (!topics) {
return;
}

rd_kafka_error_t *err;

if (assign) {
err = rd_kafka_incremental_assign(intern->rk, topics);
} else {
err = rd_kafka_incremental_unassign(intern->rk, topics);
}

rd_kafka_topic_partition_list_destroy(topics);

if (err) {
zend_throw_exception(ce_kafka_exception, rd_kafka_error_string(err), 0);
rd_kafka_error_destroy(err);
}
}
/* }}} */

/* {{{ proto void RdKafka\KafkaConsumer::incrementalAssign(array $topics)
Incremental assignment of partitions to consume */
PHP_METHOD(RdKafka_KafkaConsumer, incrementalAssign)
{
consumer_incremental_op(1, INTERNAL_FUNCTION_PARAM_PASSTHRU);
}
/* }}} */

/* {{{ proto void RdKafka\KafkaConsumer::incrementalUnassign(array $topics)
Incremental unassign of partitions to consume */
PHP_METHOD(RdKafka_KafkaConsumer, incrementalUnassign)
{
consumer_incremental_op(0, INTERNAL_FUNCTION_PARAM_PASSTHRU);
}
/* }}} */
#endif // !HAS_RD_KAFKA_INCREMENTAL_ASSIGN

/* {{{ proto array RdKafka\KafkaConsumer::getAssignment()
Returns the current partition getAssignment */
PHP_METHOD(RdKafka_KafkaConsumer, getAssignment)
Expand Down
8 changes: 8 additions & 0 deletions kafka_consumer.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ public function __construct(Conf $conf) {}
/** @tentative-return-type */
public function assign(?array $topic_partitions = null): void {}

#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
/** @tentative-return-type */
public function incrementalAssign(array $topic_partitions): void {}

/** @tentative-return-type */
public function incrementalUnassign(array $topic_partitions): void {}
#endif

/** @tentative-return-type */
public function getAssignment(): array {}

Expand Down
18 changes: 18 additions & 0 deletions kafka_consumer_arginfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaCon
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, topic_partitions, IS_ARRAY, 1, "null")
ZEND_END_ARG_INFO()

#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, 0, 0, IS_VOID, 0)
ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, 0, 0, IS_VOID, 0)
ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0)
ZEND_END_ARG_INFO()
#endif

ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getAssignment, 0, 0, IS_ARRAY, 0)
ZEND_END_ARG_INFO()

Expand Down Expand Up @@ -70,6 +80,10 @@ ZEND_END_ARG_INFO()

ZEND_METHOD(RdKafka_KafkaConsumer, __construct);
ZEND_METHOD(RdKafka_KafkaConsumer, assign);
#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
ZEND_METHOD(RdKafka_KafkaConsumer, incrementalAssign);
ZEND_METHOD(RdKafka_KafkaConsumer, incrementalUnassign);
#endif
ZEND_METHOD(RdKafka_KafkaConsumer, getAssignment);
ZEND_METHOD(RdKafka_KafkaConsumer, commit);
ZEND_METHOD(RdKafka_KafkaConsumer, close);
Expand All @@ -91,6 +105,10 @@ ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions);
static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, assign, arginfo_class_RdKafka_KafkaConsumer_assign, ZEND_ACC_PUBLIC)
#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
ZEND_ME(RdKafka_KafkaConsumer, incrementalAssign, arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, incrementalUnassign, arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, ZEND_ACC_PUBLIC)
#endif
ZEND_ME(RdKafka_KafkaConsumer, getAssignment, arginfo_class_RdKafka_KafkaConsumer_getAssignment, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, commit, arginfo_class_RdKafka_KafkaConsumer_commit, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, close, arginfo_class_RdKafka_KafkaConsumer_close, ZEND_ACC_PUBLIC)
Expand Down
18 changes: 18 additions & 0 deletions kafka_consumer_legacy_arginfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_assign, 0, 0, 0)
ZEND_ARG_INFO(0, topic_partitions)
ZEND_END_ARG_INFO()

#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, 0, 0, 0)
ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, 0, 0, 0)
ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0)
ZEND_END_ARG_INFO()
#endif

ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getAssignment, 0, 0, 0)
ZEND_END_ARG_INFO()

Expand Down Expand Up @@ -69,6 +79,10 @@ ZEND_END_ARG_INFO()

ZEND_METHOD(RdKafka_KafkaConsumer, __construct);
ZEND_METHOD(RdKafka_KafkaConsumer, assign);
#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
ZEND_METHOD(RdKafka_KafkaConsumer, incrementalAssign);
ZEND_METHOD(RdKafka_KafkaConsumer, incrementalUnassign);
#endif
ZEND_METHOD(RdKafka_KafkaConsumer, getAssignment);
ZEND_METHOD(RdKafka_KafkaConsumer, commit);
ZEND_METHOD(RdKafka_KafkaConsumer, close);
Expand All @@ -90,6 +104,10 @@ ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions);
static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, assign, arginfo_class_RdKafka_KafkaConsumer_assign, ZEND_ACC_PUBLIC)
#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
ZEND_ME(RdKafka_KafkaConsumer, incrementalAssign, arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, incrementalUnassign, arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, ZEND_ACC_PUBLIC)
#endif
ZEND_ME(RdKafka_KafkaConsumer, getAssignment, arginfo_class_RdKafka_KafkaConsumer_getAssignment, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, commit, arginfo_class_RdKafka_KafkaConsumer_commit, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, close, arginfo_class_RdKafka_KafkaConsumer_close, ZEND_ACC_PUBLIC)
Expand Down

0 comments on commit 4b085d2

Please sign in to comment.