BUG: Endless Loop / Lockup with heartbeat checker #750

Closed
sebadob opened this issue Dec 2, 2022 · 5 comments
sebadob commented Dec 2, 2022

Hey,

I found a problem with the heartbeat checker in async_nats when using JetStream with a durable consumer.

I most probably had the same problem in v0.20.0 already, but since updating that application to 0.24.0 I see more logs about the issue, thanks to the additional logging I added for better debugging.

The app runs in a somewhat unstable IoT wireless network, so connection problems come up from time to time. After a short network issue, the resource usage jumps up and the app needs a full additional core, and because of my new debug logs it spams the console like crazy. The biggest problem is that I have not found any way to reproduce this error. I just know for sure that it has happened quite a few times over the last weeks, most often during the night, when the network is under a bit more load because of pretty heavy backups and additional traffic (less so during the day, since it is a pilot project and not in production yet).

In my case, since it is running in a K8s cluster, I could just panic!() for now and let K8s restart the application. Yes, I could just add a short sleep at that point, but I am not sure whether the consumer loop still works in that situation.

Err(Custom { kind: Other, error: "unexpected termination of heartbeat checker" })

My code is split across quite a few places, so I merged the relevant pieces together here. The problem basically comes up in the line near the bottom, where I added the // >>> ... comment.

// Imports the merged snippet relies on (the error!/warn!/info! macros may
// come from either `tracing` or `log`):
use std::time::Duration;

use async_nats::jetstream::{self, consumer, consumer::DeliverPolicy};
use async_nats::Event;
use futures::TryStreamExt;

let nc = async_nats::ConnectOptions::with_credentials(&config.nats_creds)
    .expect("loading the 'nats.creds'")
    .event_callback(|event| async move {
        match event {
            Event::Connected => info!("Connected to Nats"),
            Event::Disconnected => warn!("Disconnected from Nats"),
            Event::LameDuckMode => warn!("Nats Server entered LameDuckMode"),
            Event::SlowConsumer(_) => warn!("SlowConsumer message from Nats Server"),
            Event::ServerError(err) => error!("Nats Server Error: {}", err),
            Event::ClientError(err) => error!("Nats Client Error: {}", err),
        }
    })
    .require_tls(true)
    .ping_interval(Duration::from_secs(30))
    .connect(&config.nats_url)
    .await
    .expect("nats connection");

let js = jetstream::new(nc);

let stream = js
    .get_or_create_stream(&*CFG_DEBUG)
    .await
    .map_err(|err| error!("Error creating the debug stream: {}", err))
    .unwrap();

let cfg = consumer::pull::Config {
    ack_wait: Duration::from_secs(10),
    deliver_policy: DeliverPolicy::New,
    durable_name: Some(NATS_DURABLE_NAME.clone()),
    inactive_threshold: INACTIVE_THRESHOLD,
    ..Default::default()
};
let consumer = stream
    .get_or_create_consumer(STREAM_NAME_DEBUG, cfg)
    .await
    .map_err(|err| error!("Error creating the consumer: {}", err))
    .unwrap();

match consumer.messages().await {
    Ok(mut s) => {
        loop {
            match s.try_next().await {
                Ok(Some(msg)) => {
                    match msg.subject.as_str() {
                        // do something useful here ...
                        _ => {}
                    };
                }

                err => {
                    // >>> This is the point, where this error get spammed all the time after a network issue <<<
                    // Here, the err is `Err(Custom { kind: Other, error: "unexpected termination of heartbeat checker" })`
                    error!("Error with the Stream: {:?}", err);
                }
            };
        }
    }
    Err(err) => {
        error!("Error opening messages stream: {}", err);
    }
}

If something is still unclear, please let me know how I can help out.

Jarema self-assigned this Dec 2, 2022
Jarema commented Dec 2, 2022

Hey, thanks for the report.

Taking a look into it.

sebadob commented Dec 2, 2022

I just found out that this whole loop starts with the following error:

{ kind: TimedOut, error: "did not receive idle heartbeat in time" }

After that comes up once, it gets stuck endlessly logging

{ kind: Other, error: "unexpected termination of heartbeat checker" }

sebadob commented Dec 2, 2022

What I did as a temporary fix for now is wrap the whole consumer creation in a loop, break; when the error comes up, and create a new connection.

I tested with v0.20.0 too, which I used before the update to 0.24.0, and I am seeing the same behavior.
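A minimal sketch of that workaround (not the exact code from the issue): create_consumer() is a hypothetical helper standing in for the connection, stream and consumer setup shown in the issue body, and the tracing error! macro is assumed for logging.

use std::time::Duration;

use futures::TryStreamExt;
use tracing::error;

loop {
    // Re-run the whole setup: new connection, stream and durable consumer.
    // `create_consumer()` is a hypothetical helper wrapping that setup.
    let consumer = match create_consumer().await {
        Ok(c) => c,
        Err(err) => {
            error!("Error recreating the consumer: {}", err);
            tokio::time::sleep(Duration::from_secs(5)).await;
            continue;
        }
    };

    let mut messages = match consumer.messages().await {
        Ok(m) => m,
        Err(err) => {
            error!("Error opening messages stream: {}", err);
            tokio::time::sleep(Duration::from_secs(5)).await;
            continue;
        }
    };

    // Consume until the stream ends or errors out, then fall through to the
    // outer loop and rebuild the connection and consumer.
    loop {
        match messages.try_next().await {
            Ok(Some(msg)) => {
                // do something useful here ...
                let _ = msg;
            }
            Ok(None) => break,
            Err(err) => {
                error!("Error with the stream: {} - recreating the consumer", err);
                break;
            }
        }
    }
}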

Jarema commented Dec 2, 2022

Thanks for the update.

A missing idle heartbeat should ultimately stop the consumer, not cause any growth in resource utilisation.

Will fix this.

Jarema commented Dec 2, 2022

The issue has been resolved.
Terminal errors are now fusing the Consumer Stream.
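For calling code, the practical effect is that a loop over the messages stream can now terminate instead of spinning: the terminal heartbeat error should surface once and the stream then ends, leaving the caller to decide whether to recreate the consumer. A rough consumer-side sketch of that pattern (not the library change itself; it assumes `messages` is the mutable stream returned by consumer.messages().await as in the issue body):

use futures::StreamExt;
use tracing::error;

while let Some(result) = messages.next().await {
    match result {
        Ok(msg) => {
            // do something useful here ...
            let _ = msg;
        }
        // A terminal error (e.g. missed idle heartbeats) shows up here once;
        // the fused stream then stops yielding and the loop exits.
        Err(err) => error!("Error with the stream: {}", err),
    }
}
// Stream ended - recreate the consumer or connection here if needed.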

Jarema closed this as completed Dec 2, 2022