diff --git a/kafka_consumer.c b/kafka_consumer.c index f00a20d5..6b994a02 100644 --- a/kafka_consumer.c +++ b/kafka_consumer.c @@ -212,6 +212,67 @@ PHP_METHOD(RdKafka_KafkaConsumer, assign) } /* }}} */ +#if RD_KAFKA_VERSION >= 0x010600ff +static void consumer_incremental_op(int assign, INTERNAL_FUNCTION_PARAMETERS) /* {{{ */ +{ + HashTable *htopars = NULL; + + if (zend_parse_parameters(ZEND_NUM_ARGS(), "h", &htopars) == FAILURE || !htopars) { + return; + } + + object_intern *intern = get_object(getThis()); + if (!intern) { + return; + } + + rd_kafka_topic_partition_list_t *topics = array_arg_to_kafka_topic_partition_list(1, htopars); + if (!topics) { + return; + } + + rd_kafka_error_t *err; + + if (assign) { + err = rd_kafka_incremental_assign(intern->rk, topics); + } else { + err = rd_kafka_incremental_unassign(intern->rk, topics); + } + + rd_kafka_topic_partition_list_destroy(topics); + + if (err) { + zend_throw_exception(ce_kafka_exception, rd_kafka_error_string(err), 0); + rd_kafka_error_destroy(err); + } +} +/* }}} */ +#endif + +/* {{{ proto void RdKafka\KafkaConsumer::incrementalAssign(array $topics) + Incremental assignment of partitions to consume */ +PHP_METHOD(RdKafka_KafkaConsumer, incrementalAssign) +{ +#if RD_KAFKA_VERSION >= 0x010600ff + consumer_incremental_op(1, INTERNAL_FUNCTION_PARAM_PASSTHRU); +#else + zend_throw_exception(ce_kafka_exception, "Method incrementalAssign is not implemented", 0); +#endif +} +/* }}} */ + +/* {{{ proto void RdKafka\KafkaConsumer::incrementalUnassign(array $topics) + Incremental unassign of partitions to consume */ +PHP_METHOD(RdKafka_KafkaConsumer, incrementalUnassign) +{ +#if RD_KAFKA_VERSION >= 0x010600ff + consumer_incremental_op(0, INTERNAL_FUNCTION_PARAM_PASSTHRU); +#else + zend_throw_exception(ce_kafka_exception, "Method incrementalUnassign is not implemented", 0); +#endif +} +/* }}} */ + /* {{{ proto array RdKafka\KafkaConsumer::getAssignment() Returns the current partition getAssignment */ PHP_METHOD(RdKafka_KafkaConsumer, getAssignment) diff --git a/kafka_consumer.stub.php b/kafka_consumer.stub.php index 023b0a46..315ee960 100644 --- a/kafka_consumer.stub.php +++ b/kafka_consumer.stub.php @@ -21,6 +21,12 @@ public function __construct(Conf $conf) {} /** @tentative-return-type */ public function assign(?array $topic_partitions = null): void {} + /** @tentative-return-type */ + public function incrementalAssign(array $topic_partitions): void {} + + /** @tentative-return-type */ + public function incrementalUnassign(array $topic_partitions): void {} + /** @tentative-return-type */ public function getAssignment(): array {} diff --git a/kafka_consumer_arginfo.h b/kafka_consumer_arginfo.h index 4ba236ba..2f687ca8 100644 --- a/kafka_consumer_arginfo.h +++ b/kafka_consumer_arginfo.h @@ -9,6 +9,14 @@ ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaCon ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, topic_partitions, IS_ARRAY, 1, "null") ZEND_END_ARG_INFO() +ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, 0, 0, IS_VOID, 0) + ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, 0, 0, IS_VOID, 0) + ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0) +ZEND_END_ARG_INFO() + ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getAssignment, 0, 0, IS_ARRAY, 0) ZEND_END_ARG_INFO() @@ -70,6 +78,8 @@ ZEND_END_ARG_INFO() ZEND_METHOD(RdKafka_KafkaConsumer, __construct); ZEND_METHOD(RdKafka_KafkaConsumer, assign); +ZEND_METHOD(RdKafka_KafkaConsumer, incrementalAssign); +ZEND_METHOD(RdKafka_KafkaConsumer, incrementalUnassign); ZEND_METHOD(RdKafka_KafkaConsumer, getAssignment); ZEND_METHOD(RdKafka_KafkaConsumer, commit); ZEND_METHOD(RdKafka_KafkaConsumer, close); @@ -91,6 +101,8 @@ ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions); static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = { ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, assign, arginfo_class_RdKafka_KafkaConsumer_assign, ZEND_ACC_PUBLIC) + ZEND_ME(RdKafka_KafkaConsumer, incrementalAssign, arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, ZEND_ACC_PUBLIC) + ZEND_ME(RdKafka_KafkaConsumer, incrementalUnassign, arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, getAssignment, arginfo_class_RdKafka_KafkaConsumer_getAssignment, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, commit, arginfo_class_RdKafka_KafkaConsumer_commit, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, close, arginfo_class_RdKafka_KafkaConsumer_close, ZEND_ACC_PUBLIC) diff --git a/kafka_consumer_legacy_arginfo.h b/kafka_consumer_legacy_arginfo.h index 6069e1ba..60e4d64d 100644 --- a/kafka_consumer_legacy_arginfo.h +++ b/kafka_consumer_legacy_arginfo.h @@ -9,6 +9,14 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_assign, 0, 0, 0) ZEND_ARG_INFO(0, topic_partitions) ZEND_END_ARG_INFO() +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, 0, 0, 0) + ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, 0, 0, 0) + ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0) +ZEND_END_ARG_INFO() + ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getAssignment, 0, 0, 0) ZEND_END_ARG_INFO() @@ -69,6 +77,8 @@ ZEND_END_ARG_INFO() ZEND_METHOD(RdKafka_KafkaConsumer, __construct); ZEND_METHOD(RdKafka_KafkaConsumer, assign); +ZEND_METHOD(RdKafka_KafkaConsumer, incrementalAssign); +ZEND_METHOD(RdKafka_KafkaConsumer, incrementalUnassign); ZEND_METHOD(RdKafka_KafkaConsumer, getAssignment); ZEND_METHOD(RdKafka_KafkaConsumer, commit); ZEND_METHOD(RdKafka_KafkaConsumer, close); @@ -90,6 +100,8 @@ ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions); static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = { ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, assign, arginfo_class_RdKafka_KafkaConsumer_assign, ZEND_ACC_PUBLIC) + ZEND_ME(RdKafka_KafkaConsumer, incrementalAssign, arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, ZEND_ACC_PUBLIC) + ZEND_ME(RdKafka_KafkaConsumer, incrementalUnassign, arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, getAssignment, arginfo_class_RdKafka_KafkaConsumer_getAssignment, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, commit, arginfo_class_RdKafka_KafkaConsumer_commit, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, close, arginfo_class_RdKafka_KafkaConsumer_close, ZEND_ACC_PUBLIC)