Skip to content

Commit

Permalink
deliver error from events
Browse files Browse the repository at this point in the history
  • Loading branch information
blindspotbounty committed Nov 4, 2024
1 parent 497e167 commit 08402aa
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 11 deletions.
6 changes: 6 additions & 0 deletions Sources/Kafka/KafkaConsumer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,12 @@ public final class KafkaConsumer: Sendable, Service {
} else {
try await client.assign(topicPartitionList: nil) // fallback
}
case .error(let error):
if let eventSource {
_ = eventSource.yield(.error(error))
} else {
throw error
}
default:
break // Ignore
}
Expand Down
4 changes: 4 additions & 0 deletions Sources/Kafka/KafkaConsumerEvent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ public enum RebalanceAction : Sendable, Hashable {
public enum KafkaConsumerEvent: Sendable, Hashable {
/// Rebalance from librdkafka
case rebalance(RebalanceAction)
/// Error from librdkafka
case error(KafkaError)
/// - Important: Always provide a `default` case when switiching over this `enum`.
case DO_NOT_SWITCH_OVER_THIS_EXHAUSITVELY

Expand All @@ -109,6 +111,8 @@ public enum KafkaConsumerEvent: Sendable, Hashable {
fatalError("Cannot cast \(event) to KafkaConsumerEvent")
case .rebalance(let action):
self = .rebalance(action)
case .error(let error):
self = .error(error)
case .deliveryReport:
fatalError("Cannot cast \(event) to KafkaConsumerEvent")
}
Expand Down
6 changes: 6 additions & 0 deletions Sources/Kafka/KafkaProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,12 @@ public final class KafkaProducer: Service, Sendable {
_ = source?.yield(.deliveryReports(reports))
case .statistics(let statistics):
self.configuration.metrics.update(with: statistics)
case .error(let error):
if let source {
_ = source.yield(.error(error))
} else {
throw error
}
default:
fatalError("Cannot cast \(event) to KafkaProducerEvent")
}
Expand Down
13 changes: 2 additions & 11 deletions Sources/Kafka/KafkaProducerEvent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,8 @@
public enum KafkaProducerEvent: Sendable, Hashable {
/// A collection of delivery reports received from the Kafka cluster indicating the status of produced messages.
case deliveryReports([KafkaDeliveryReport])
/// Error from librdkafka
case error(KafkaError)
/// - Important: Always provide a `default` case when switching over this `enum`.
case DO_NOT_SWITCH_OVER_THIS_EXHAUSITVELY

internal init(_ event: RDKafkaClient.KafkaEvent) {
switch event {
case .deliveryReport(results: let results):
self = .deliveryReports(results)
case .rebalance(_):
fatalError("Cannot cast \(event) to KafkaProducerEvent")
case .statistics:
fatalError("Cannot cast \(event) to KafkaProducerEvent")
}
}
}
16 changes: 16 additions & 0 deletions Sources/Kafka/RDKafka/RDKafkaClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ public final class RDKafkaClient: Sendable {
case deliveryReport(results: [KafkaDeliveryReport])
case statistics(RDKafkaStatistics)
case rebalance(RebalanceAction)
case error(KafkaError)
}

/// Poll the event `rd_kafka_queue_t` for new events.
Expand Down Expand Up @@ -406,6 +407,8 @@ public final class RDKafkaClient: Sendable {
if let forwardEvent = self.handleStatistics(event) {
events.append(forwardEvent)
}
case .error:
events.append(self.handleError(event))
case .none:
// Finished reading events, return early
return shouldSleep
Expand All @@ -417,6 +420,19 @@ public final class RDKafkaClient: Sendable {
return shouldSleep
}

private func handleError(_ event: OpaquePointer?) -> KafkaEvent {
let err = rd_kafka_event_error(event)
let errorString = if let error = rd_kafka_err2str(rd_kafka_event_error(event)) {
String(cString: error)
} else {
"\(err)"
}
let fatal = rd_kafka_event_error_is_fatal(event) != 0

return .error(KafkaError.rdKafkaError(wrapping: err, errorMessage: errorString, isFatal: fatal))

}

/// Handle event of type `RDKafkaEvent.deliveryReport`.
///
/// - Parameter event: Pointer to underlying `rd_kafka_event_t`.
Expand Down

0 comments on commit 08402aa

Please sign in to comment.