diff --git a/kafka_consumer.c b/kafka_consumer.c index f00a20d5..0adbc44c 100644 --- a/kafka_consumer.c +++ b/kafka_consumer.c @@ -489,7 +489,7 @@ PHP_METHOD(RdKafka_KafkaConsumer, close) } /* }}} */ -/* {{{ proto Metadata RdKafka\KafkaConsumer::getMetadata(bool all_topics, RdKafka\Topic only_topic, int timeout_ms) +/* {{{ proto RdKafka\Metadata RdKafka\KafkaConsumer::getMetadata(bool $all_topics, RdKafka\Topic $only_topic, int $timeout_ms) Request Metadata from broker */ PHP_METHOD(RdKafka_KafkaConsumer, getMetadata) { @@ -528,6 +528,28 @@ PHP_METHOD(RdKafka_KafkaConsumer, getMetadata) } /* }}} */ +#ifdef HAS_CONTROLLERID +/* {{{ proto int RdKafka\KafkaConsumer::getControllerId(int $timeout_ms) + Returns the current ControllerId (controller broker id) as reported in broker metadata */ +PHP_METHOD(RdKafka_KafkaConsumer, getControllerId) +{ + kafka_object *intern; + zend_long timeout; + + if (zend_parse_parameters(ZEND_NUM_ARGS(), "l", &timeout) == FAILURE) { + return; + } + + intern = get_kafka_object(getThis()); + if (!intern) { + return; + } + + RETURN_LONG(rd_kafka_controllerid(intern->rk, timeout)); +} +/* }}} */ +#endif + /* {{{ proto RdKafka\KafkaConsumerTopic RdKafka\KafkaConsumer::newTopic(string $topic) Returns a RdKafka\KafkaConsumerTopic object */ PHP_METHOD(RdKafka_KafkaConsumer, newTopic) diff --git a/kafka_consumer.stub.php b/kafka_consumer.stub.php index 023b0a46..f87799f3 100644 --- a/kafka_consumer.stub.php +++ b/kafka_consumer.stub.php @@ -48,6 +48,11 @@ public function unsubscribe(): void {} /** @tentative-return-type */ public function getMetadata(bool $all_topics, ?Topic $only_topic, int $timeout_ms): Metadata {} +#ifdef HAS_CONTROLLERID + /** @tentative-return-type */ + public function getControllerId(int $timeout_ms): int {} +#endif + /** @tentative-return-type */ public function newTopic(string $topic_name, ?TopicConf $topic_conf = null): KafkaConsumerTopic {} diff --git a/rdkafka.c b/rdkafka.c index dca76ca6..25e7b67f 100644 --- a/rdkafka.c +++ b/rdkafka.c @@ -350,7 +350,7 @@ PHP_METHOD(RdKafka, addBrokers) } /* }}} */ -/* {{{ proto RdKafka\Metadata::getMetadata(bool $all_topics, RdKafka\Topic $only_topic, int $timeout_ms) +/* {{{ proto RdKafka\Metadata RdKafka::getMetadata(bool $all_topics, RdKafka\Topic $only_topic, int $timeout_ms) Request Metadata from broker */ PHP_METHOD(RdKafka, getMetadata) { @@ -388,7 +388,29 @@ PHP_METHOD(RdKafka, getMetadata) kafka_metadata_init(return_value, metadata); } /* }}} */ - + +#ifdef HAS_CONTROLLERID +/* {{{ proto int RdKafka::getControllerId(int $timeout_ms) + Returns the current ControllerId (controller broker id) as reported in broker metadata */ +PHP_METHOD(RdKafka, getControllerId) +{ + kafka_object *intern; + zend_long timeout; + + if (zend_parse_parameters(ZEND_NUM_ARGS(), "l", &timeout) == FAILURE) { + return; + } + + intern = get_kafka_object(getThis()); + if (!intern) { + return; + } + + RETURN_LONG(rd_kafka_controllerid(intern->rk, timeout)); +} +/* }}} */ +#endif + /* {{{ proto void RdKafka::setLogLevel(int $level) Specifies the maximum logging level produced by internal kafka logging and debugging */ PHP_METHOD(RdKafka, setLogLevel) diff --git a/rdkafka.stub.php b/rdkafka.stub.php index be04e20c..c8facba8 100644 --- a/rdkafka.stub.php +++ b/rdkafka.stub.php @@ -20,6 +20,11 @@ public function addBrokers(string $broker_list): int {} /** @tentative-return-type */ public function getMetadata(bool $all_topics, ?RdKafka\Topic $only_topic, int $timeout_ms): RdKafka\Metadata {} +#ifdef HAS_CONTROLLERID + /** @tentative-return-type */ + public function getControllerId(int $timeout_ms): int {} +#endif + /** @tentative-return-type */ public function getOutQLen(): int {} diff --git a/tests/controller_id.phpt b/tests/controller_id.phpt new file mode 100644 index 00000000..19a14714 --- /dev/null +++ b/tests/controller_id.phpt @@ -0,0 +1,31 @@ +--TEST-- +Display controller id +--SKIPIF-- += 0x090000 && false !== getenv('TEST_KAFKA_BROKER_VERSION')) { + $conf->set('broker.version.fallback', getenv('TEST_KAFKA_BROKER_VERSION')); +} +$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS')); + +$conf->setDrMsgCb(function ($rdkafka, $msg) { + var_dump($rdkafka, $msg); +}); + +$producer = new RdKafka\Producer($conf); +$consumer = new RdKafka\Consumer($conf); +$kafkaConsumer = new RdKafka\KafkaConsumer($conf); + +echo $producer->getControllerId() . \PHP_EOL; +echo $consumer->getControllerId() . \PHP_EOL; +echo $kafkaConsumer->getControllerId() . \PHP_EOL; +--EXPECT-- +1 +1 +1