Skip to content

Commit

Permalink
Add missing methods to support incremental rebalance
Browse files Browse the repository at this point in the history
  • Loading branch information
ikeberlein authored and Ivan Keberlein committed Jun 6, 2023
1 parent 0aee7cf commit 4139963
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 0 deletions.
61 changes: 61 additions & 0 deletions kafka_consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,67 @@ PHP_METHOD(RdKafka_KafkaConsumer, assign)
}
/* }}} */

#if RD_KAFKA_VERSION >= 0x010600ff
static void consumer_incremental_op(int assign, INTERNAL_FUNCTION_PARAMETERS) /* {{{ */
{
HashTable *htopars = NULL;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "h", &htopars) == FAILURE || !htopars) {
return;
}

object_intern *intern = get_object(getThis());
if (!intern) {
return;
}

rd_kafka_topic_partition_list_t *topics = array_arg_to_kafka_topic_partition_list(1, htopars);
if (!topics) {
return;
}

rd_kafka_error_t *err;

if (assign) {
err = rd_kafka_incremental_assign(intern->rk, topics);
} else {
err = rd_kafka_incremental_unassign(intern->rk, topics);
}

rd_kafka_topic_partition_list_destroy(topics);

if (err) {
zend_throw_exception(ce_kafka_exception, rd_kafka_error_string(err), 0);
rd_kafka_error_destroy(err);
}
}
/* }}} */
#endif

/* {{{ proto void RdKafka\KafkaConsumer::incrementalAssign(array $topics)
Incremental assignment of partitions to consume */
PHP_METHOD(RdKafka_KafkaConsumer, incrementalAssign)
{
#if RD_KAFKA_VERSION >= 0x010600ff
consumer_incremental_op(1, INTERNAL_FUNCTION_PARAM_PASSTHRU);
#else
zend_throw_exception(ce_kafka_exception, "Method incrementalAssign is not implemented", 0);
#endif
}
/* }}} */

/* {{{ proto void RdKafka\KafkaConsumer::incrementalUnassign(array $topics)
Incremental unassign of partitions to consume */
PHP_METHOD(RdKafka_KafkaConsumer, incrementalUnassign)
{
#if RD_KAFKA_VERSION >= 0x010600ff
consumer_incremental_op(0, INTERNAL_FUNCTION_PARAM_PASSTHRU);
#else
zend_throw_exception(ce_kafka_exception, "Method incrementalUnassign is not implemented", 0);
#endif
}
/* }}} */

/* {{{ proto array RdKafka\KafkaConsumer::getAssignment()
Returns the current partition getAssignment */
PHP_METHOD(RdKafka_KafkaConsumer, getAssignment)
Expand Down
6 changes: 6 additions & 0 deletions kafka_consumer.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ public function __construct(Conf $conf) {}
/** @tentative-return-type */
public function assign(?array $topic_partitions = null): void {}

/** @tentative-return-type */
public function incrementalAssign(array $topic_partitions): void {}

/** @tentative-return-type */
public function incrementalUnassign(array $topic_partitions): void {}

/** @tentative-return-type */
public function getAssignment(): array {}

Expand Down
12 changes: 12 additions & 0 deletions kafka_consumer_arginfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaCon
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, topic_partitions, IS_ARRAY, 1, "null")
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, 0, 0, IS_VOID, 0)
ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, 0, 0, IS_VOID, 0)
ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getAssignment, 0, 0, IS_ARRAY, 0)
ZEND_END_ARG_INFO()

Expand Down Expand Up @@ -70,6 +78,8 @@ ZEND_END_ARG_INFO()

ZEND_METHOD(RdKafka_KafkaConsumer, __construct);
ZEND_METHOD(RdKafka_KafkaConsumer, assign);
ZEND_METHOD(RdKafka_KafkaConsumer, incrementalAssign);
ZEND_METHOD(RdKafka_KafkaConsumer, incrementalUnassign);
ZEND_METHOD(RdKafka_KafkaConsumer, getAssignment);
ZEND_METHOD(RdKafka_KafkaConsumer, commit);
ZEND_METHOD(RdKafka_KafkaConsumer, close);
Expand All @@ -91,6 +101,8 @@ ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions);
static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, assign, arginfo_class_RdKafka_KafkaConsumer_assign, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, incrementalAssign, arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, incrementalUnassign, arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, getAssignment, arginfo_class_RdKafka_KafkaConsumer_getAssignment, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, commit, arginfo_class_RdKafka_KafkaConsumer_commit, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, close, arginfo_class_RdKafka_KafkaConsumer_close, ZEND_ACC_PUBLIC)
Expand Down
12 changes: 12 additions & 0 deletions kafka_consumer_legacy_arginfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_assign, 0, 0, 0)
ZEND_ARG_INFO(0, topic_partitions)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, 0, 0, 0)
ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, 0, 0, 0)
ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getAssignment, 0, 0, 0)
ZEND_END_ARG_INFO()

Expand Down Expand Up @@ -69,6 +77,8 @@ ZEND_END_ARG_INFO()

ZEND_METHOD(RdKafka_KafkaConsumer, __construct);
ZEND_METHOD(RdKafka_KafkaConsumer, assign);
ZEND_METHOD(RdKafka_KafkaConsumer, incrementalAssign);
ZEND_METHOD(RdKafka_KafkaConsumer, incrementalUnassign);
ZEND_METHOD(RdKafka_KafkaConsumer, getAssignment);
ZEND_METHOD(RdKafka_KafkaConsumer, commit);
ZEND_METHOD(RdKafka_KafkaConsumer, close);
Expand All @@ -90,6 +100,8 @@ ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions);
static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, assign, arginfo_class_RdKafka_KafkaConsumer_assign, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, incrementalAssign, arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, incrementalUnassign, arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, getAssignment, arginfo_class_RdKafka_KafkaConsumer_getAssignment, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, commit, arginfo_class_RdKafka_KafkaConsumer_commit, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, close, arginfo_class_RdKafka_KafkaConsumer_close, ZEND_ACC_PUBLIC)
Expand Down

0 comments on commit 4139963

Please sign in to comment.