From 7f9d31eac7bdbcfa027586f2af22963fe80be528 Mon Sep 17 00:00:00 2001 From: Ivan Keberlein Date: Sat, 3 Jun 2023 18:59:05 +0300 Subject: [PATCH] Add missing methods to support incremental rebalance --- config.m4 | 8 ++++- kafka_consumer.c | 53 +++++++++++++++++++++++++++++++++ kafka_consumer.stub.php | 8 +++++ kafka_consumer_arginfo.h | 18 +++++++++++ kafka_consumer_legacy_arginfo.h | 18 +++++++++++ 5 files changed, 104 insertions(+), 1 deletion(-) diff --git a/config.m4 b/config.m4 index 9a10e91f..b1f75236 100644 --- a/config.m4 +++ b/config.m4 @@ -19,7 +19,7 @@ if test "$PHP_RDKAFKA" != "no"; then fi done fi - + if test -z "$RDKAFKA_DIR"; then AC_MSG_RESULT([not found]) AC_MSG_ERROR([Please reinstall the rdkafka distribution]) @@ -90,6 +90,12 @@ if test "$PHP_RDKAFKA" != "no"; then AC_MSG_WARN([oauthbearer token refresh cb is not available]) ]) + AC_CHECK_LIB($LIBNAME,[rd_kafka_incremental_assign],[ + AC_DEFINE(HAS_RD_KAFKA_INCREMENTAL_ASSIGN,1,[ ]) + ],[ + AC_MSG_WARN([no rd_kafka_incremental_(un)assign, incremental rebalance support will not be available]) + ]) + LDFLAGS="$ORIG_LDFLAGS" CPPFLAGS="$ORIG_CPPFLAGS" diff --git a/kafka_consumer.c b/kafka_consumer.c index f00a20d5..8678343e 100644 --- a/kafka_consumer.c +++ b/kafka_consumer.c @@ -212,6 +212,59 @@ PHP_METHOD(RdKafka_KafkaConsumer, assign) } /* }}} */ +#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN +static void consumer_incremental_op(int assign, INTERNAL_FUNCTION_PARAMETERS) /* {{{ */ +{ + HashTable *htopars = NULL; + + if (zend_parse_parameters(ZEND_NUM_ARGS(), "h", &htopars) == FAILURE || !htopars) { + return; + } + + object_intern *intern = get_object(getThis()); + if (!intern) { + return; + } + + rd_kafka_topic_partition_list_t *topics = array_arg_to_kafka_topic_partition_list(1, htopars); + if (!topics) { + return; + } + + rd_kafka_error_t *err; + + if (assign) { + err = rd_kafka_incremental_assign(intern->rk, topics); + } else { + err = 
rd_kafka_incremental_unassign(intern->rk, topics); + } + + rd_kafka_topic_partition_list_destroy(topics); + + if (err) { + zend_throw_exception(ce_kafka_exception, rd_kafka_error_string(err), 0); + rd_kafka_error_destroy(err); + } +} +/* }}} */ + +/* {{{ proto void RdKafka\KafkaConsumer::incrementalAssign(array $topics) + Incremental assignment of partitions to consume */ +PHP_METHOD(RdKafka_KafkaConsumer, incrementalAssign) +{ + consumer_incremental_op(1, INTERNAL_FUNCTION_PARAM_PASSTHRU); +} +/* }}} */ + +/* {{{ proto void RdKafka\KafkaConsumer::incrementalUnassign(array $topics) + Incremental unassign of partitions to consume */ +PHP_METHOD(RdKafka_KafkaConsumer, incrementalUnassign) +{ + consumer_incremental_op(0, INTERNAL_FUNCTION_PARAM_PASSTHRU); +} +/* }}} */ +#endif // HAS_RD_KAFKA_INCREMENTAL_ASSIGN + /* {{{ proto array RdKafka\KafkaConsumer::getAssignment() Returns the current partition getAssignment */ PHP_METHOD(RdKafka_KafkaConsumer, getAssignment) diff --git a/kafka_consumer.stub.php b/kafka_consumer.stub.php index 023b0a46..d12ef2d4 100644 --- a/kafka_consumer.stub.php +++ b/kafka_consumer.stub.php @@ -21,6 +21,14 @@ public function __construct(Conf $conf) {} /** @tentative-return-type */ public function assign(?array $topic_partitions = null): void {} +#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN + /** @tentative-return-type */ + public function incrementalAssign(array $topic_partitions): void {} + + /** @tentative-return-type */ + public function incrementalUnassign(array $topic_partitions): void {} +#endif + /** @tentative-return-type */ public function getAssignment(): array {} diff --git a/kafka_consumer_arginfo.h b/kafka_consumer_arginfo.h index 4ba236ba..8ff01db2 100644 --- a/kafka_consumer_arginfo.h +++ b/kafka_consumer_arginfo.h @@ -9,6 +9,16 @@ ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaCon ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, topic_partitions, IS_ARRAY, 1, "null") ZEND_END_ARG_INFO() +#ifdef 
HAS_RD_KAFKA_INCREMENTAL_ASSIGN +ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, 0, 0, IS_VOID, 0) + ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, 0, 0, IS_VOID, 0) + ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0) +ZEND_END_ARG_INFO() +#endif + ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getAssignment, 0, 0, IS_ARRAY, 0) ZEND_END_ARG_INFO() @@ -70,6 +80,10 @@ ZEND_END_ARG_INFO() ZEND_METHOD(RdKafka_KafkaConsumer, __construct); ZEND_METHOD(RdKafka_KafkaConsumer, assign); +#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN +ZEND_METHOD(RdKafka_KafkaConsumer, incrementalAssign); +ZEND_METHOD(RdKafka_KafkaConsumer, incrementalUnassign); +#endif ZEND_METHOD(RdKafka_KafkaConsumer, getAssignment); ZEND_METHOD(RdKafka_KafkaConsumer, commit); ZEND_METHOD(RdKafka_KafkaConsumer, close); @@ -91,6 +105,10 @@ ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions); static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = { ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, assign, arginfo_class_RdKafka_KafkaConsumer_assign, ZEND_ACC_PUBLIC) +#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN + ZEND_ME(RdKafka_KafkaConsumer, incrementalAssign, arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, ZEND_ACC_PUBLIC) + ZEND_ME(RdKafka_KafkaConsumer, incrementalUnassign, arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, ZEND_ACC_PUBLIC) +#endif ZEND_ME(RdKafka_KafkaConsumer, getAssignment, arginfo_class_RdKafka_KafkaConsumer_getAssignment, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, commit, arginfo_class_RdKafka_KafkaConsumer_commit, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, close, arginfo_class_RdKafka_KafkaConsumer_close, ZEND_ACC_PUBLIC) diff --git 
a/kafka_consumer_legacy_arginfo.h b/kafka_consumer_legacy_arginfo.h index 6069e1ba..f2e83eee 100644 --- a/kafka_consumer_legacy_arginfo.h +++ b/kafka_consumer_legacy_arginfo.h @@ -9,6 +9,16 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_assign, 0, 0, 0) ZEND_ARG_INFO(0, topic_partitions) ZEND_END_ARG_INFO() +#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, 0, 0, 0) + ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, 0, 0, 0) + ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0) +ZEND_END_ARG_INFO() +#endif + ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getAssignment, 0, 0, 0) ZEND_END_ARG_INFO() @@ -69,6 +79,10 @@ ZEND_END_ARG_INFO() ZEND_METHOD(RdKafka_KafkaConsumer, __construct); ZEND_METHOD(RdKafka_KafkaConsumer, assign); +#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN +ZEND_METHOD(RdKafka_KafkaConsumer, incrementalAssign); +ZEND_METHOD(RdKafka_KafkaConsumer, incrementalUnassign); +#endif ZEND_METHOD(RdKafka_KafkaConsumer, getAssignment); ZEND_METHOD(RdKafka_KafkaConsumer, commit); ZEND_METHOD(RdKafka_KafkaConsumer, close); @@ -90,6 +104,10 @@ ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions); static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = { ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, assign, arginfo_class_RdKafka_KafkaConsumer_assign, ZEND_ACC_PUBLIC) +#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN + ZEND_ME(RdKafka_KafkaConsumer, incrementalAssign, arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, ZEND_ACC_PUBLIC) + ZEND_ME(RdKafka_KafkaConsumer, incrementalUnassign, arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, ZEND_ACC_PUBLIC) +#endif ZEND_ME(RdKafka_KafkaConsumer, getAssignment, arginfo_class_RdKafka_KafkaConsumer_getAssignment, ZEND_ACC_PUBLIC) 
ZEND_ME(RdKafka_KafkaConsumer, commit, arginfo_class_RdKafka_KafkaConsumer_commit, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, close, arginfo_class_RdKafka_KafkaConsumer_close, ZEND_ACC_PUBLIC)