
This is a list of notes on how to configure and use Goka. You should check these notes before putting Goka into production.

Only the first of these configurations affects correctness and must be applied.

Configuring log compaction for table topics

By default, Kafka topics are not log compacted. Goka table topics should be configured with log compaction; otherwise, Kafka gradually deletes their content based on the default retention time.

Here are some ways to configure a table topic with log compaction.

Using TopicManager to create table topics

Goka provides a topic manager to help create tables with log compaction enabled. At the time of writing, Sarama does not support creating topics via the Kafka protocol, so we need to connect to ZooKeeper to create them.

When creating a processor, pass a ZKTopicManagerBuilder as an option:

servers := []string{"localhost:2181/chroot"}
p, err := goka.NewProcessor(brokers, graph,
	goka.WithTopicManagerBuilder(
		kafka.ZKTopicManagerBuilder(servers)))

The TopicManager creates the table topic if it does not exist. The number of partitions is inferred from the number of partitions of the input streams.
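
For illustration, the graph in the snippet above could be defined as follows; the stream name, codec, and callback are placeholders, and the group name mygroup yields the table topic mygroup-table.

// hypothetical group graph: the processor consumes "my-stream" and
// persists a table, which Goka stores in the topic "mygroup-table"
graph := goka.DefineGroup("mygroup",
	goka.Input("my-stream", new(codec.String), func(ctx goka.Context, msg interface{}) {
		ctx.SetValue(msg) // keep the latest value per key in the group table
	}),
	goka.Persist(new(codec.String)),
)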

In production, this is our preferred method to create properly configured tables. We typically disable Kafka's automatic topic creation to avoid having to reconfigure topics later on.

Note that only Goka processors create tables; views don't.

Manually creating a table topic

Instead of letting a TopicManager create tables automatically, one can create them manually (or with scripts). For a group called mygroup, one should create a table topic called mygroup-table, for example:

./kafka-topics.sh                       \
	--zookeeper localhost:2181      \
	--create                        \
	--replication-factor 2          \
	--partitions 10                 \
	--config cleanup.policy=compact \
	--topic mygroup-table

Be sure to create the table with the same number of partitions as the input streams of the processor group.

Configuring an existing table topic

If the table topic already exists in Kafka, one may configure it for log compaction as follows.

./kafka-topics.sh                       \
	--zookeeper localhost:2181      \
	--alter                         \
	--config cleanup.policy=compact \
	--topic mygroup-table

We typically use Docker to execute Kafka and ZooKeeper commands. Executing them from a Docker container requires verbose commands starting with docker run --rm -it .... In our examples directory, we provide a script that makes the use of Docker transparent: for each Kafka command, it creates a local script that internally calls Docker and runs the command of the same name.

Configuring the Kafka client library

Goka abstracts the Kafka client library and allows one to plug in different implementations via options to processors, views, and emitters. The builders in the kafka subpackage also allow one to configure the default client library (Sarama).

Passing options to Sarama

A commonly changed setting is the Kafka version used by the Sarama library. One can change the version, as well as any other Sarama configuration, as follows.

config := kafka.NewConfig()
config.Version = sarama.V1_0_0_0
emitter, _ := goka.NewEmitter(brokers, stream, codec,
	goka.WithEmitterProducerBuilder(
		kafka.ProducerBuilderWithConfig(config)))
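
The same config can also be reused for a processor's producer. A minimal sketch, assuming your Goka version provides the goka.WithProducerBuilder option:

p, err := goka.NewProcessor(brokers, graph,
	goka.WithProducerBuilder(
		kafka.ProducerBuilderWithConfig(config)))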

Using Confluent Kafka client

One may also use confluent-kafka-go with Goka. For details check the github.com/lovoo/goka/kafka/confluent subpackage.

Configuring the local storage

Like the Kafka client, the local storage can be replaced, configured, or extended via options. Examples of extensions are:

  • adding metrics, for example, as a Prometheus collector (see the sketch after this list)
  • adding an LRU layer to speed up read-mostly workloads in views
  • using a single database for all partitions instead of one per partition
  • replacing the local storage with a remote storage, for example, Redis
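
As a starting point for such extensions, a wrapper can embed an existing storage.Storage and intercept only the methods of interest; all other methods are forwarded automatically. The sketch below counts Get calls, for example, to feed a Prometheus collector; it assumes the Get signature and the Builder type of the storage package at the time of writing (and uses sync/atomic).

// countingStorage wraps any storage.Storage and counts reads.
type countingStorage struct {
	storage.Storage // all other methods are promoted from the embedded storage
	gets int64
}

func (s *countingStorage) Get(key string) ([]byte, error) {
	atomic.AddInt64(&s.gets, 1) // or increment a Prometheus counter here
	return s.Storage.Get(key)
}

// countingBuilder decorates an existing storage builder.
func countingBuilder(b storage.Builder) storage.Builder {
	return func(topic string, partition int32) (storage.Storage, error) {
		st, err := b(topic, partition)
		if err != nil {
			return nil, err
		}
		return &countingStorage{Storage: st}, nil
	}
}

A view would then be created with goka.WithViewStorageBuilder(countingBuilder(storage.DefaultBuilder("/tmp/goka"))).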

Configuring LevelDB

Simply create an Options value from LevelDB's opt package and pass it to storage.BuilderWithOptions:

opts := &opt.Options{
	BlockCacheCapacity:     opt.MiB * 30,
	WriteBuffer:            opt.MiB * 10,
}
path := "/tmp/goka"
builder := storage.BuilderWithOptions(path, opts)

p, _ := goka.NewView(brokers, table, codec,
	goka.WithViewStorageBuilder(builder))

Note that if you run multiple instances of the same processor group on the same machine, you have to configure a different storage path for each of them.
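
For example, a per-instance identifier (a hypothetical instanceID here) can be made part of the path:

// instanceID is a hypothetical per-instance identifier, e.g., from a flag
path := fmt.Sprintf("/tmp/goka/mygroup-%s", instanceID)
builder := storage.BuilderWithOptions(path, opts)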

Using in-memory local storage

Goka provides an in-memory local storage, which is basically a map.

builder := storage.MemoryBuilder()
p, _ := goka.NewView(brokers, table, codec,
	goka.WithViewStorageBuilder(builder))