
[Bug] pulsar keep creating dead letter queue producer and exceed the maximum limit #20635

Closed
mingmcb opened this issue Jun 23, 2023 · 6 comments · Fixed by #23824 · May be fixed by #21294 or tonisojandu/pulsar#3
Labels: Stale, triage/lhotari/important, type/bug

Comments

@mingmcb

mingmcb commented Jun 23, 2023

Search before asking

  • I searched in the issues and found nothing similar.

Version

2.11

Minimal reproduce step

Use the Pulsar Java client library to create a consumer with a dead letter queue (DLQ) producer; a minimal setup sketch is shown after the steps below.

  1. After the application starts, stop a bookie.
  2. Produce some messages to the topic to trigger the consumer.
  3. Check the logs.
    The client keeps creating dead letter queue producers and eventually hits the maximum limit.
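
For context, a minimal sketch of the kind of consumer setup described above (the service URL, topic names, and subscription name here are placeholders, not the actual values from this cluster):

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

import java.util.concurrent.TimeUnit;

public class DlqConsumerExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // placeholder service URL
                .build();

        // Shared subscription with a dead letter policy: after maxRedeliverCount
        // failed deliveries, the client routes the message to the DLQ topic and
        // internally creates a producer for that topic.
        Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
                .topic("persistent://public/default/my-topic")
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                        .maxRedeliverCount(3)
                        .deadLetterTopic("persistent://public/default/dlq")
                        .build())
                .subscribe();

        // Negatively acknowledging a message forces redelivery and, eventually,
        // routing to the DLQ (illustrative only).
        consumer.negativeAcknowledge(consumer.receive());

        consumer.close();
        client.close();
    }
}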

What did you expect to see?

Extra producers should not be created when there is an issue on the Pulsar side.

What did you see instead?

The client created over 10,000 producers and eventually exceeded the limit.
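
(The limit hit here is presumably the broker's per-topic producer cap; in broker.conf that corresponds to the maxProducersPerTopic setting, shown below with an illustrative value rather than the actual value from this cluster.)

# broker.conf (illustrative value)
# Max number of producers allowed to connect to a topic; 0 disables the check
maxProducersPerTopic=10000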

Anything else?

See the logs below:

Starting Pulsar producer perf with config: {"topicName":"persistent://public/default/my-topic","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
Pulsar client config: {"serviceUrl":"pulsar://pulsar:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":true,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
[persistent://public/default/dlq] [null] Creating producer on cnx [id: 0x910a07bb, L:/10.204.68.122:45104 - R:pulsar/10.204.67.17:6650]
[persistent://public/default/dlq] [pulsar-dev-5-10032] Created producer on cnx [id: 0x910a07bb, L:/10.204.68.122:45104 - R:pulsar/10.204.67.17:6650]
[persistent://public/default/my-topic] failed to get schema : org.apache.pulsar.client.api.PulsarClientException: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.SchemaException: Bookie handle is not available - ledger=35399 - operation=Failed to read entry - entry=0","reqId":4059106653410978868, "remote":"pulsar/10.204.70.7:6650", "local":"/10.204.68.122:44372"}
Dead letter producer exception with topic: persistent://public/default/dlq
Starting Pulsar producer perf with config: {"topicName":"persistent://public/default/dlq","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
Pulsar client config: {"serviceUrl":"pulsar://pulsar:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":true,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
[persistent://public/default/dlq] [null] Creating producer on cnx [id: 0x910a07bb, L:/10.204.68.122:45104 - R:pulsar/10.204.67.17:6650]
[persistent://public/default/dlq] [pulsar-dev-5-10033] Created producer on cnx [id: 0x910a07bb, L:/10.204.68.122:45104 - R:pulsar/10.204.67.17:6650]
[persistent://public/default/my-topic] failed to get schema : org.apache.pulsar.client.api.PulsarClientException: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.SchemaException: Bookie handle is not available - ledger=35399 - operation=Failed to read entry - entry=0","reqId":4059106653410978872, "remote":"pulsar/10.204.70.7:6650", "local":"/10.204.68.122:44372"}
Dead letter producer exception with topic: persistent://public/default/dlq
Starting Pulsar producer perf with config: {"topicName":"persistent://public/default/dlq","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
Pulsar client config: {"serviceUrl":"pulsar://pulsar:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":true,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
[persistent://public/default/dlq] [null] Creating producer on cnx [id: 0x910a07bb, L:/10.204.68.122:45104 - R:pulsar/10.204.67.17:6650]
[persistent://public/default/dlq] [pulsar-dev-5-10034] Created producer on cnx [id: 0x910a07bb, L:/10.204.68.122:45104 - R:pulsar/10.204.67.17:6650]
[persistent://public/default/my-topic] failed to get schema : org.apache.pulsar.client.api.PulsarClientException: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.SchemaException: Bookie handle is not available - ledger=35399 - operation=Failed to read entry - entry=0","reqId":4059106653410978876, "remote":"pulsar/10.204.70.7:6650", "local":"/10.204.68.122:44372"}
Dead letter producer exception with topic: persistent://public/default/dlq
Starting Pulsar producer perf with config: {"topicName":"persistent://public/default/dlq","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
Pulsar client config: {"serviceUrl":"pulsar://pulsar:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":true,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
[persistent://public/default/dlq] [null] Creating producer on cnx [id: 0x910a07bb, L:/10.204.68.122:45104 - R:pulsar/10.204.67.17:6650]
[persistent://public/default/dlq] [pulsar-dev-5-10035] Created producer on cnx [id: 0x910a07bb, L:/10.204.68.122:45104 - R:pulsar/10.204.67.17:6650]
[persistent://public/default/my-topic] failed to get schema : org.apache.pulsar.client.api.PulsarClientException: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.SchemaException: Bookie handle is not available - ledger=35399 - operation=Failed to read entry - entry=0","reqId":4059106653410978880, "remote":"pulsar/10.204.70.7:6650", "local":"/10.204.68.122:44372"}
Dead letter producer exception with topic: persistent://public/default/dlq

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@mingmcb added the type/bug label on Jun 23, 2023
@david-streamlio
Contributor

david-streamlio commented Jul 12, 2023

A single bookie failure shouldn’t have that type of impact unless it drops the number of active/writable bookies below the write/ack quorum. Can you share the number of bookies in your cluster and the write and ack quorum settings?

@mingmcb
Author

mingmcb commented Jul 27, 2023

There are 4 bookies configured for each environment. We also have the following configuration, which requires 3 bookies to be up and running. In other words, only 1 bookie can be down without service interruption.
# Number of bookies to use when creating a ledger
managedLedgerDefaultEnsembleSize: "3"
# Number of copies to store for each message
managedLedgerDefaultWriteQuorum: "3"
# Number of guaranteed copies (acks to wait before write is complete)
managedLedgerDefaultAckQuorum: "3"

@mingmcb changed the title from "[Bug] pulsar keep creating deadqueue producer and exceed the maximum limit" to "[Bug] pulsar keep creating dead letter queue producer and exceed the maximum limit" on Jul 28, 2023
@github-actions

The issue had no activity for 30 days, mark with Stale label.

@github-actions bot added the Stale label on Aug 28, 2023
@tonisojandu

tonisojandu commented Sep 7, 2023

I think we are facing the same problem. This issue might also be related.

We face this issue from time to time when, due to engineering error, we encounter a schema incompatibility between the received messages and the one the consumer expects. We have disabled schema validation on the broker side by choice. However, rather than just sending those messages to the DLQ, the Pulsar client side fails validation and goes into a producer-creation loop.

Here is the example code to replicate the issue.

We had a look at the client code and the problem seems to be in the AutoProduceBytesSchema.encode(byte[] message) method, since requireSchemaValidation is flipped to true in the constructor even though broker-side validation is disabled.

We were able to "fix" this problem when, in ConsumerImpl.initDeadLetterProducerIfNeeded(), instead of:

((ProducerBuilderImpl<byte[]>) client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)))

we created the producer as:

((ProducerBuilderImpl<byte[]>) client.newProducer(Schema.BYTES))

and in ConsumerImpl.processPossibleToDLQ(MessageIdAdv messageId), instead of

producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))

we sent the message with

producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.BYTES))

I am not sure if this is the proper way to fix it, as it leaves the DLQ with a binary schema in the registry. This would be fine for us, but I'm not sure if someone else relies on it having a more descriptive schema. However, I like to think of the DLQ as a dumping ground that should be able to accept all types of messages.

If this is an OK fix, I can create a pull request for it.
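
For reference, here is a self-contained sketch (not the actual ConsumerImpl code) of what producing raw bytes to a DLQ topic with Schema.BYTES looks like through the public API; the service URL and topic name are placeholders:

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class DlqBytesProducerExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // placeholder
                .build();

        // Creating the DLQ producer with Schema.BYTES skips client-side schema
        // validation, so any payload can be written to the DLQ topic.
        Producer<byte[]> dlqProducer = client.newProducer(Schema.BYTES)
                .topic("persistent://public/default/dlq") // placeholder
                .create();

        // The failed message's payload is forwarded as raw bytes.
        byte[] failedPayload = "example failed message".getBytes();
        dlqProducer.newMessage().value(failedPayload).send();

        dlqProducer.close();
        client.close();
    }
}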

edit:

  • first link was wrong

@github-actions

github-actions bot commented Oct 8, 2023

The issue had no activity for 30 days, mark with Stale label.

@lhotari
Member

lhotari commented Jan 8, 2025

(quoting @tonisojandu's comment from Sep 7, 2023, above)

@tonisojandu I have submitted a PR #23824 to address the producer leak. There's also a retry backoff so that such failures don't cause excessive load on the broker. This PR doesn't make changes to the schema validation. I'm planning to address that in a separate PR. Most likely there would need to be a separate mode for AUTO_PRODUCE_BYTES where it can be chosen on the client side to skip schema validation. That feature is also useful in other use cases since it can reduce CPU usage on the client side when schema validation is skipped at producing time.
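
As a rough illustration of the general reuse-plus-backoff pattern (this is not the code in #23824; DlqProducerHolder and its structure are hypothetical), a consumer could cache a single producer future and only allow a new creation attempt after a growing delay:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper: caches one producer future and, on failure, waits for a
// growing backoff before permitting the next creation attempt, so repeated
// delivery failures do not each spawn a brand-new producer.
class DlqProducerHolder<T> {
    private final Supplier<CompletableFuture<T>> producerFactory;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private CompletableFuture<T> cached;
    private long backoffMs = 100;

    DlqProducerHolder(Supplier<CompletableFuture<T>> producerFactory) {
        this.producerFactory = producerFactory;
    }

    synchronized CompletableFuture<T> get() {
        if (cached == null) {
            cached = createWithBackoff();
        }
        return cached;
    }

    private CompletableFuture<T> createWithBackoff() {
        CompletableFuture<T> result = new CompletableFuture<>();
        producerFactory.get().whenComplete((producer, error) -> {
            if (error == null) {
                synchronized (this) {
                    backoffMs = 100; // reset backoff after a successful creation
                }
                result.complete(producer);
            } else {
                result.completeExceptionally(error);
                long delay;
                synchronized (this) {
                    delay = backoffMs;
                    backoffMs = Math.min(backoffMs * 2, 60_000); // capped exponential growth
                }
                // Keep the failed future cached until the backoff elapses, so that
                // concurrent callers share the failure instead of each triggering
                // another creation attempt.
                scheduler.schedule(() -> {
                    synchronized (this) {
                        cached = null;
                    }
                }, delay, TimeUnit.MILLISECONDS);
            }
        });
        return result;
    }
}

A caller would wrap the producer creation in the supplier, for example () -> client.newProducer(Schema.BYTES).topic(dlqTopic).createAsync(), and call get() whenever the DLQ producer is needed.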
