producer.connect() - KafkaJSError: The producer is disconnected #907
Comments
I ran into this as well after upgrading to 1.14.0. It seems like there was an unannounced breaking change that requires producer.connect to be called before sending? |
This has always been the case. We have never supported sending messages without first connecting the producer. It's possible that a change in the network layer caused a change in behavior, but even if it somehow worked before, it was never supported. Otherwise there would be no point in having a connect method.
Could you share a more complete example of what you are doing? I tried to reproduce it myself using the simple producer example, running node examples/producer.js:
info: ┏ Sending 870 messages #0...
info: ┃ [0] {
info: ┃ [1] "timestamp": "2020-10-06T06:52:26.007Z",
info: ┃ [2] "logger": "kafkajs"
info: ┗ [3] }
info: ┏ Sending 602 messages #1...
info: ┃ [0] {
info: ┃ [1] "timestamp": "2020-10-06T06:52:26.013Z",
info: ┃ [2] "logger": "kafkajs"
info: ┗ [3] }
info: ┏ Messages sent #0
info: ┃ [ 0] {
info: ┃ [ 1] "timestamp": "2020-10-06T06:52:27.093Z",
info: ┃ [ 2] "logger": "kafkajs",
info: ┃ [ 3] "response": [
info: ┃ [ 4] {
info: ┃ [ 5] "topicName": "topic-test",
info: ┃ [ 6] "partition": 0,
info: ┃ [ 7] "errorCode": 0,
info: ┃ [ 8] "baseOffset": "0",
info: ┃ [ 9] "logAppendTime": "-1",
info: ┃ [10] "logStartOffset": "0"
info: ┃ [11] }
info: ┃ [12] ],
info: ┃ [13] "msgNumber": 1472
info: ┗ [14] }
info: ┏ Messages sent #1
info: ┃ [ 0] {
info: ┃ [ 1] "timestamp": "2020-10-06T06:52:27.105Z",
info: ┃ [ 2] "logger": "kafkajs",
info: ┃ [ 3] "response": [
info: ┃ [ 4] {
info: ┃ [ 5] "topicName": "topic-test",
info: ┃ [ 6] "partition": 0,
info: ┃ [ 7] "errorCode": 0,
info: ┃ [ 8] "baseOffset": "870",
info: ┃ [ 9] "logAppendTime": "-1",
info: ┃ [10] "logStartOffset": "0"
info: ┃ [11] }
info: ┃ [12] ],
info: ┃ [13] "msgNumber": 1472
info: ┗ [14] }
Specifically, there's clearly something fishy going on in |
That line 111 is the last line of the code sample: producer.connect() doesn't throw an error, producer.event.CONNECTED is not emitted, the message is not sent, and it ends up with an error.
It is easier to reproduce the error when the producer is stopped and restarted, because reconnection takes longer. |
Is it best to connect->send->disconnect every time or just connect when the app starts and only disconnect on app shutdown? It seems like I am seeing the same error sometimes even after adding an explicit connect on app start. |
You should just connect at startup and disconnect at shutdown. There seems to be a bug introduced with the latest version related to reconnects, but regardless, nothing is changing about the intended usage. |
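For illustration, a minimal sketch of that lifecycle, assuming a local broker at localhost:9092 and a topic named topic-test (both placeholders, not taken from this issue):

```js
const { Kafka } = require('kafkajs')

const kafka = new Kafka({ clientId: 'api-server', brokers: ['localhost:9092'] })
const producer = kafka.producer()

// Connect once when the application starts.
const start = async () => {
  await producer.connect()
}

// Reuse the already-connected producer for every request.
const publish = async (value) => {
  await producer.send({ topic: 'topic-test', messages: [{ value }] })
}

// Disconnect only when the application shuts down.
const shutdown = async () => {
  await producer.disconnect()
  process.exit(0)
}

process.on('SIGTERM', shutdown)
process.on('SIGINT', shutdown)

start()
  .then(() => publish('hello'))
  .catch(console.error)
```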
I was looking into this issue and we definitely changed the behavior in 1.14, but it's for the better. Without the changes, you would never be able to stop the producer without side-effects, since it would always reconnect even if your server was going down. When you call disconnect, it no longer tries to reconnect. If you have an API that is producing messages and you are shutting it down, it usually means that the server will block new requests and start draining the remaining connections, which is also the case for the producer now. |
@crobinson42 try |
What happens if no broker is available at startup? We occasionally produce messages in our API server, but I don't want my whole API server to fail to start because Kafka is not available, since a large part of the API is usable without Kafka. |
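One possible way to avoid blocking API startup on Kafka availability (a sketch only; the helper name and the 30-second retry interval are made up for illustration):

```js
// Attempt the Kafka connection in the background so that an unavailable
// broker does not prevent the rest of the API server from starting.
const connectInBackground = (producer, logger = console) => {
  producer
    .connect()
    .then(() => logger.log('Kafka producer connected'))
    .catch((err) => {
      logger.warn(`Kafka unavailable, retrying in 30s: ${err.message}`)
      setTimeout(() => connectInBackground(producer, logger), 30 * 1000)
    })
}

// Usage: call this during startup instead of awaiting producer.connect(),
// and guard each send so a still-unconnected producer fails gracefully.
```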
Hmm, I have checked with
Looks good! But, after about 10 minutes I don't see any connections to Kafka. I run I am talking about producer. |
I have used |
What happens when you call
|
I get this error then: |
Ran into the same issue after updating to 1.14. From my point of view calling As a workaround we created our own abstraction and call our own every time. |
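The commenter's own code was stripped from this copy of the thread; a sketch of one such abstraction (the wrapper name and the event-based connection tracking are assumptions, not the commenter's actual code) could look like this:

```js
// Hypothetical wrapper that tracks the connection state via producer events
// and (re)connects before sending whenever the producer is not connected.
const createSafeProducer = (kafka) => {
  const producer = kafka.producer()
  let connected = false

  producer.on(producer.events.CONNECT, () => { connected = true })
  producer.on(producer.events.DISCONNECT, () => { connected = false })

  return {
    send: async (record) => {
      if (!connected) {
        await producer.connect()
      }
      return producer.send(record)
    },
    disconnect: () => producer.disconnect(),
  }
}
```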
We have the same issue on the producer side with "kafkajs": "^1.15.0", where we connect the producer when getting a producer instance from the Kafka client; there is data loss each time the producer restarts. |
I'm having the same issue when using transactions. |
Hi all, I have the same issue. Is there any resolution for this? |
It is probably because you submit a message before the connection has been established; this happens with parallel code but not with serial code. So just make sure a connection has been successfully initialized before sending messages.
Plus, you can use debug logging to check the disconnected-connection information when sending messages in parallel, which is what causes this issue.
Good luck
|
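In other words, await the connection once before firing off parallel sends; a minimal sketch (broker and topic are placeholders):

```js
const { Kafka } = require('kafkajs')

const kafka = new Kafka({ clientId: 'parallel-sender', brokers: ['localhost:9092'] })
const producer = kafka.producer()

const run = async () => {
  // Make sure the connection is established before any parallel sends.
  await producer.connect()

  // Sending in parallel is fine once connect() has resolved.
  await Promise.all(
    ['one', 'two', 'three'].map((value) =>
      producer.send({ topic: 'topic-test', messages: [{ value }] })
    )
  )

  await producer.disconnect()
}

run().catch(console.error)
```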
Thanks @superLincoln for the answer. Sorry, I gave a bit too little context in the comment. The scenario I have is this: my app is deployed in Kubernetes, and with a single-pod deployment the Kafka service runs well; the publisher doesn't get disconnected even when performing stress testing. But once I spawn multiple replicas, it starts giving me this error. Both pods point to the same broker and publish to the same topic (they are just replicas anyhow). So is this a known issue, or is there some workaround for it? Do let me know of the possibilities; I will try to find some workaround myself as well. |
We are also facing the same issue with |
We are also facing the same issue. Is there a fix for this? |
I guess in this case all consumers have the same consumer group, as it is a scaled service. You can only have as many active consumers in the same group as there are partitions on the topic. |
IIRC, you can, they just stay idle. But no error should be thrown. |
@Nevon As you said, when you try to send it should reconnect, and if reconnecting doesn't work then there's a problem. But KafkaJS is not trying to reconnect; it is just checking the connection status and throwing the two errors below:
Can you revisit the implementation?
|
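Until that changes, one possible workaround is to catch the error and reconnect manually; a sketch only (the error-message check and the single retry are assumptions, not documented kafkajs behaviour):

```js
// Hypothetical helper: retry a send once after reconnecting if the
// producer reports that it is disconnected.
const sendWithReconnect = async (producer, record) => {
  try {
    return await producer.send(record)
  } catch (err) {
    if (err.message && err.message.includes('The producer is disconnected')) {
      await producer.connect()
      return producer.send(record)
    }
    throw err
  }
}
```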
I'm encountering the same issue. |
I'm encountering the same issue if I loop the sending; if I send only once, there is no error. |
Iterating a loop and individually sending messages using the Instead, I send using the
Sorry for Then call your function like this:
|
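The code from the comment above was lost in this copy of the thread; assuming the commenter means kafkajs's sendBatch (an assumption on my part), batching instead of looping might look like this:

```js
// Instead of calling producer.send() once per message inside a loop,
// group all messages for a topic into a single batched request.
const sendAllAtOnce = async (producer, items) => {
  const messages = items.map((item) => ({ value: JSON.stringify(item) }))

  return producer.sendBatch({
    topicMessages: [{ topic: 'topic-test', messages }],
  })
}
```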
My answer could be irrelevant to the application code above, but if someone is facing this issue when setting up Kafka on a K8s cluster, the cause is related to the following configs (mostly these):
config:
default.replication.factor: 5
inter.broker.protocol.version: "3.5"
min.insync.replicas: 4
offsets.topic.replication.factor: 5
transaction.state.log.min.isr: 4
transaction.state.log.replication.factor: 5
$ sh kafka-topics.sh --create --command-config admin.config --bootstrap-server kafka-bootstrap.example.com:443 --topic test-topic --partitions 5 --replication-factor 5 --config min.insync.replicas=4 --config retention.ms=259200000
|
Describe the bug
Somewhere there is a problem in version 1.14.0. I tried to connect and then produce a message, but sometimes I got an error:
Also producer.event.CONNECTED is not emitted.
I tried to produce 2 messages at once and log the KafkaJS connectionStatus to the console, and the result is:
Code sample:
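The original code sample is not preserved in this copy of the issue; a minimal sketch of the scenario described above (connect, then produce two messages at once while listening for the CONNECT event), with the broker address and topic name as placeholders, might be:

```js
const { Kafka, logLevel } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'repro',
  brokers: ['localhost:9092'],
  logLevel: logLevel.DEBUG,
})

const producer = kafka.producer()

producer.on(producer.events.CONNECT, () => console.log('producer connected'))

const run = async () => {
  await producer.connect()

  // Produce two messages at once; the report is that the first send
  // sometimes fails with "KafkaJSError: The producer is disconnected".
  await Promise.all([
    producer.send({ topic: 'topic-test', messages: [{ value: 'message-1' }] }),
    producer.send({ topic: 'topic-test', messages: [{ value: 'message-2' }] }),
  ])
}

run().catch(console.error)
```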
To Reproduce
Expected behavior
Resolve connect, then produce messages.
Observed behavior
Even when the connection is resolved, producing the first message fails and connectionStatus is not updated; the other messages are produced.
Environment:
Additional context