Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KafkaSinkCluster return error on ControlledShutdown request #1791

Merged
merged 2 commits into from
Nov 1, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,22 @@ impl KafkaSinkCluster {
})) => {
self.route_find_coordinator(request);
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ControlledShutdown(_),
..
})) => {
// This request type specifies a broker that should shutdown.
// Since the broker ID's known to the client represent shotover nodes,
// the only reasonable interpretation of this request would be for the shotover with that broker id to shutdown.
// But its not appropriate for shotover to shutdown:
// * It cant hand off topics to other nodes like kafka does as part of its controlled shutdown process.
// * It doesnt have any ACL's, any user could send this request regardless of authorization.
//
// So we abort the connection and log an error to signal to the client that the request failed.
// It might be better to instead construct a response containing an error code,
// but its not worth the complexity for such a rare request type.
return Err(anyhow!("Client sent ControlledShutdown request. Shotover cannot handle this request as it is not appropriate for shotover to shutdown. The connection to the client has been closed."));
ronycsdu marked this conversation as resolved.
Show resolved Hide resolved
}

// route to controller broker
Some(Frame::Kafka(KafkaFrame::Request {
Expand Down
Loading