Consuming from Invalid Topic and End of Valid Topic is Indistinguishable #1540

Closed
4 of 9 tasks
David-V-Spencer opened this issue Nov 21, 2017 · 8 comments

@David-V-Spencer

Description

Using the C# client (confluent-kafka-dotnet), subscribing to either a valid or an invalid topic and calling
var messageConsumed = Consumer.Consume(out kafkaMessage, pollInterval)
just returns a null message. This makes it unclear whether the consumer is reading from the end of a valid topic that has no more messages, or is subscribed to an invalid topic.

Observed behavior: consuming from an invalid topic silently returns a null message.

Expected behavior for an invalid topic was something similar to what the producer does: trying to consume from an invalid topic would return a Message with an invalid offset and an error code, such as code 3 for Unknown Topic or Partition, or at the very least throw an exception.

Shouldn't there be a clear difference between consuming from the end of a valid topic and a topic that doesn't exist so the implementation knows that it is in an error state?

As a side note, while looking for similar issues it occurred to me to check the OnError and OnConsumeError events, and neither of these events is triggered in the invalid topic scenario.

Initially reported on Confluent-kafka-dotnet at confluentinc/confluent-kafka-dotnet#373

How to reproduce

Just consume from a nonexistent topic as follows:

try
{
    var config = new Dictionary<string, object>
    {
        {"group.id", "ConsumerId"},
        {"enable.auto.commit", false},
        {"log.connection.close", false},
        {"socket.keepalive.enable", true},
        {"statistics.interval.ms", 60000},
        {"bootstrap.servers", "localhost:9092"},
    };
    var consumer = new Consumer(config);
    consumer.Subscribe("Some-Invalid-Topic-Name");
    Message kafkaMessage = null;
    // Poll timeout; the value is assumed here for illustration.
    var pollInterval = TimeSpan.FromSeconds(1);
    var messageConsumed = false;
    while (!messageConsumed && kafkaMessage == null)
    {
        // Every poll returns false and leaves kafkaMessage null.
        messageConsumed = consumer.Consume(out kafkaMessage, pollInterval);
    }
    Console.WriteLine("Never Escapes the While Loop.");
}
catch (Exception e)
{
    Console.WriteLine("Never Catches an Exception.");
}

Currently using Kafka Server version 2.11-0.10.2.1
Server Configured to Auto Create Topics = false

Confluent Kafka Dotnet version 11.2
LibRDKafka referenced version 11.1

Passed Consumer Configuration:

  • group.id was set
  • enable.auto.commit = false
  • log.connection.close = false
  • socket.keepalive.enable = true
  • statistics.interval.ms = 60000
  • bootstrap.servers pointed to network cluster

Currently developing on Windows 7 / 10 / Server 2012. We are moving towards .NET Core 2.0 and supporting Linux CentOS/Ubuntu distros as well.

Checklist

Please provide the following information:

  • librdkafka version (release number or git tag):
  • Apache Kafka version:
  • librdkafka client configuration:
  • Operating system:
  • Using the legacy Consumer
  • Using the high-level KafkaConsumer
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
@edenhill
Contributor

edenhill commented Dec 6, 2017

What we could do is generate an __UNKNOWN_TOPIC consumer error if a subscribed topic does not match an existing topic.
I wonder if we should do the same for topic regexps.

@mhowlett
Contributor

mhowlett commented Aug 14, 2018

I also think we should get a consumer error if a subscribed topic does not match an existing topic (even one topic in the subscription set that doesn't exist should produce an error). Ideally, this should occur if a topic gets deleted as well. Additionally, I think the consumer should join the group even if there are no matching existing topics (note: there may be some technical detail I'm missing here). Doing that would make operation consistent in the case of all topics in a subscription set getting deleted.

@mhowlett
Contributor

I think there's a choice of how the error could be propagated - either via the error callback or via a consumer event (I think the former is best).

An alternative API to the above: subscribe could be made asynchronous, returning an error if a topic in the subscription set was not available (note: regexes would always be available). Question: other than regexes, is there a real use case for allowing subscriptions to a set of topics, some of which don't exist yet? Bindings would probably throw a corresponding exception on error. In this API, notification that a topic no longer exists would probably be better delivered as a consumer event, and in the bindings perhaps propagated as an exception when consume or poll is called.

This second API would be more foolproof and simpler to program against from a user's perspective, though @cmccabe points out that it's not ideal because topic creation is not synchronous: if you call create topic and then subscribe, subscribe may not succeed. So that's one thing that would complicate its use.

Overall, I prefer the API described in my previous comment. Although not ideal to program against from a user's perspective, I think it's pretty watertight and it also requires the least work.

@edenhill
Contributor

edenhill commented Oct 5, 2018

Question: other than regexes, is there a real use case for allowing subscriptions to a set of topics, some of which don't exist yet?

Loose startup ordering; consumers may be started before producers (which might create the topics).

I think we should propagate an error each time the effective subscription list turns out empty (edge triggered).
The most natural place would be through CONSUMER_ERR (e.g., message = poll(); message->err == ERR__UNMATCHED_SUBSCRIPTION) since it needs to be in consumer sequence order:
that is, an UNMATCHED_SUBSCRIPTION error is tied to a previous assign().

@fmoraes74

Is there any update on this? Using rd_kafka_consumer_poll, I get an immediate NULL return and I am not aware of a mechanism to detect that there was an invalid topic name. Any suggestions?

@fmoraes74

@edenhill Any chance of getting this fixed?

@edenhill
Contributor

The proposal is here: https://github.com/edenhill/librdkafka/wiki/Proposal:-Consumer-topic-error-handling

If that seems okay we'll implement it for v1.5.0

@fmoraes74

@edenhill That certainly looks reasonable to me.

edenhill added a commit that referenced this issue Apr 21, 2020
Nonexistent topics will propagate as RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART
while unauthorized topics will propagate as
RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED.

The propagation takes place using the standard consumer error propagation,
namely the .err field in message_t objects returned from consumer_poll() et al.
edenhill added a commit that referenced this issue Apr 23, 2020
edenhill added a commit that referenced this issue Apr 24, 2020
edenhill added a commit that referenced this issue Apr 24, 2020
edenhill added a commit that referenced this issue Apr 27, 2020
edenhill modified the milestones: v1.0.0, v1.5.0 Apr 29, 2020