Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
qkdreyer authored Jul 15, 2024
1 parent 9cafbba commit c2b4433
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 3 deletions.
24 changes: 23 additions & 1 deletion kafka_consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ PHP_METHOD(RdKafka_KafkaConsumer, close)
}
/* }}} */

/* {{{ proto Metadata RdKafka\KafkaConsumer::getMetadata(bool all_topics, RdKafka\Topic only_topic, int timeout_ms)
/* {{{ proto RdKafka\Metadata RdKafka\KafkaConsumer::getMetadata(bool $all_topics, RdKafka\Topic $only_topic, int $timeout_ms)
Request Metadata from broker */
PHP_METHOD(RdKafka_KafkaConsumer, getMetadata)
{
Expand Down Expand Up @@ -528,6 +528,28 @@ PHP_METHOD(RdKafka_KafkaConsumer, getMetadata)
}
/* }}} */

#ifdef HAS_CONTROLLERID
/* {{{ proto int RdKafka\KafkaConsumer::getControllerId(int $timeout_ms)
Returns the current ControllerId (controller broker id) as reported in broker metadata */
PHP_METHOD(RdKafka_KafkaConsumer, getControllerId)
{
kafka_object *intern;
zend_long timeout;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "l", &timeout) == FAILURE) {
return;
}

intern = get_kafka_object(getThis());
if (!intern) {
return;
}

RETURN_LONG(rd_kafka_controllerid(intern->rk, timeout));
}
/* }}} */
#endif

/* {{{ proto RdKafka\KafkaConsumerTopic RdKafka\KafkaConsumer::newTopic(string $topic)
Returns a RdKafka\KafkaConsumerTopic object */
PHP_METHOD(RdKafka_KafkaConsumer, newTopic)
Expand Down
5 changes: 5 additions & 0 deletions kafka_consumer.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public function unsubscribe(): void {}
/** @tentative-return-type */
public function getMetadata(bool $all_topics, ?Topic $only_topic, int $timeout_ms): Metadata {}

#ifdef HAS_CONTROLLERID
/** @tentative-return-type */
public function getControllerId(int $timeout_ms): int {}
#endif

/** @tentative-return-type */
public function newTopic(string $topic_name, ?TopicConf $topic_conf = null): KafkaConsumerTopic {}

Expand Down
26 changes: 24 additions & 2 deletions rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ PHP_METHOD(RdKafka, addBrokers)
}
/* }}} */

/* {{{ proto RdKafka\Metadata::getMetadata(bool $all_topics, RdKafka\Topic $only_topic, int $timeout_ms)
/* {{{ proto RdKafka\Metadata RdKafka::getMetadata(bool $all_topics, RdKafka\Topic $only_topic, int $timeout_ms)
Request Metadata from broker */
PHP_METHOD(RdKafka, getMetadata)
{
Expand Down Expand Up @@ -388,7 +388,29 @@ PHP_METHOD(RdKafka, getMetadata)
kafka_metadata_init(return_value, metadata);
}
/* }}} */


#ifdef HAS_CONTROLLERID
/* {{{ proto int RdKafka::getControllerId(int $timeout_ms)
Returns the current ControllerId (controller broker id) as reported in broker metadata */
PHP_METHOD(RdKafka, getControllerId)
{
kafka_object *intern;
zend_long timeout;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "l", &timeout) == FAILURE) {
return;
}

intern = get_kafka_object(getThis());
if (!intern) {
return;
}

RETURN_LONG(rd_kafka_controllerid(intern->rk, timeout));
}
/* }}} */
#endif

/* {{{ proto void RdKafka::setLogLevel(int $level)
Specifies the maximum logging level produced by internal kafka logging and debugging */
PHP_METHOD(RdKafka, setLogLevel)
Expand Down
5 changes: 5 additions & 0 deletions rdkafka.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ public function addBrokers(string $broker_list): int {}
/** @tentative-return-type */
public function getMetadata(bool $all_topics, ?RdKafka\Topic $only_topic, int $timeout_ms): RdKafka\Metadata {}

#ifdef HAS_CONTROLLERID
/** @tentative-return-type */
public function getControllerId(int $timeout_ms): int {}
#endif

/** @tentative-return-type */
public function getOutQLen(): int {}

Expand Down
31 changes: 31 additions & 0 deletions tests/controller_id.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
--TEST--
Display controller id
--SKIPIF--
<?php
require __DIR__ . '/integration-tests-check.php';
RD_KAFKA_BUILD_VERSION < 0x000b0500 && die("skip librdkafka < 0.11.5");
--FILE--
<?php
require __DIR__ . '/integration-tests-check.php';

$conf = new RdKafka\Conf();
if (RD_KAFKA_VERSION >= 0x090000 && false !== getenv('TEST_KAFKA_BROKER_VERSION')) {
$conf->set('broker.version.fallback', getenv('TEST_KAFKA_BROKER_VERSION'));
}
$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));

$conf->setDrMsgCb(function ($rdkafka, $msg) {
var_dump($rdkafka, $msg);
});

$producer = new RdKafka\Producer($conf);
$consumer = new RdKafka\Consumer($conf);
$kafkaConsumer = new RdKafka\KafkaConsumer($conf);

echo $producer->getControllerId() . \PHP_EOL;
echo $consumer->getControllerId() . \PHP_EOL;
echo $kafkaConsumer->getControllerId() . \PHP_EOL;
--EXPECT--
1
1
1

0 comments on commit c2b4433

Please sign in to comment.