Parallafka is a parallelization layer over your out-of-the-box, one-message-at-a-time Kafka consumer, with built-in support for the Confluent Kafka client.

    static async Task Main(string[] args)
    {
        string topicName = "ultimate-stonks-watchlist";
        IConsumer<string, StockPrice> confluentConsumer = ConsumerForTopic<StockPrice>(topicName);
        IKafkaConsumer<string, StockPrice> consumer = new ConfluentConsumerAdapter<string, StockPrice>(
            confluentConsumer, topicName);

        IParallafkaConfig<string, StockPrice> config = new ParallafkaConfig<string, StockPrice>()
        {
            MaxDegreeOfParallelism = 7,
        };

        IParallafka<string, StockPrice> parallafka = new Parallafka<string, StockPrice>(consumer, config);

        await parallafka.ConsumeAsync(async (IKafkaMessage<string, StockPrice> message) =>
        {
            // This handler code will now run on up to 7 threads at once
            Console.WriteLine($"{message.Value.TickerSymbol} is ${message.Value.Price}");
        });
    }

    static IConsumer<string, T> ConsumerForTopic<T>(string topicName)
        where T : class
    {
        var consumerConfig = new ConsumerConfig()
        {
            BootstrapServers = "localhost:9092",
            GroupId = "stonks-autotrader",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = false,
        };

        var consumerBuilder = new ConsumerBuilder<string, T>(consumerConfig)
            .SetValueDeserializer(new JsonDeserializer<T>().AsSyncOverAsync());

        IConsumer<string, T> rawConsumer = consumerBuilder.Build();
        rawConsumer.Subscribe(topicName);
        return rawConsumer;
    }

In many cases, your Kafka consumer does not need to process a partition in strict FIFO order. It is not a problem if an email is sent to User2 before User1, even though User1 registered 30 milliseconds earlier and appears first in the topic. When that is the case, it is safe to parallelize handler code far beyond Kafka's limit of one consumer per partition within a consumer group: poll the topic in the usual order on one thread, then distribute the messages across handler threads so that the actual processing runs in parallel. Parallafka does this for you and can multiply handler throughput, while still preserving order for messages that share a key. Suppose a message activating a feature for User1 is followed in quick succession by a message deactivating that feature, or the entire user. Parallafka ensures that, in the midst of all the parallelism, any set of messages published with a shared key is processed serially in FIFO order.
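
To make the idea concrete, here is a minimal sketch of key-ordered parallel dispatch. It is not Parallafka's implementation or API: the `KeyOrderedDispatcher` class and its `Dispatch` method are hypothetical names invented for this illustration. It assumes that one thread hands messages over in topic order, and it ignores concerns a real consumer must handle, such as offset commits, handler failures, and evicting idle keys.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration only; not part of Parallafka.
public sealed class KeyOrderedDispatcher<TKey, TValue>
{
    // Caps how many handlers may run at the same time.
    private readonly SemaphoreSlim _slots;

    // The most recently scheduled task for each key. Only the dispatching
    // thread touches this dictionary, so it needs no locking.
    // In this sketch it grows with the number of distinct keys.
    private readonly Dictionary<TKey, Task> _tailByKey = new Dictionary<TKey, Task>();

    public KeyOrderedDispatcher(int maxDegreeOfParallelism)
    {
        _slots = new SemaphoreSlim(maxDegreeOfParallelism);
    }

    // Call this from a single thread, in the order messages were polled.
    // Dispatch itself is not thread-safe.
    public Task Dispatch(TKey key, TValue value, Func<TKey, TValue, Task> handler)
    {
        Task previous;
        if (!_tailByKey.TryGetValue(key, out previous))
        {
            previous = Task.CompletedTask;
        }

        Task current = Task.Run(async () =>
        {
            // Wait for the earlier message with the same key: per-key FIFO.
            // If that handler threw, the exception propagates and this
            // message's handler is skipped.
            await previous.ConfigureAwait(false);

            // Then wait for a free handler slot: bounded parallelism.
            await _slots.WaitAsync().ConfigureAwait(false);
            try
            {
                await handler(key, value).ConfigureAwait(false);
            }
            finally
            {
                _slots.Release();
            }
        });

        _tailByKey[key] = current;
        return current;
    }
}
```

Messages with different keys run concurrently, up to the configured limit, while each message waits for the previous message with the same key to finish. Chaining tasks per key keeps the sketch short; a production design would also need to bound memory and track which offsets are safe to commit.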
- Configurable handler parallelism.
- Preserves FIFO-order serial processing for messages with the same key, no matter how many concurrent handlers are running.
- Adapter supporting Confluent's Kafka consumer.
- Configurable shutdown behavior: graceful (wait to finish handling and commit, with timeout) and hard-stop.
- Injectable ILogger.
- Optional automatic performance tuning with adaptive handler thread count.
- Configuration options for commit behavior and frequency.
dotnet add package Parallafka
dotnet add package Parallafka.AdapterForConfluentConsumer