Like any spout in a storm topology, a kafka spout is created to emit tuples into the topology to be processed further by bolts.
As kafka cares little about the content of messages, a kafka spout emits messages read from a topic as tuples with a single field named `bytes`, containing the byte array as it was read from the topic.
In its current implementation, it operates by reading batches of messages from kafka and emitting these into the storm topology.
Only once a batch has been processed completely are the consumed offsets committed to zookeeper (this is a limitation of kafka's high level consumer API and is expected to change with kafka 0.9).
The project uses maven and is published to maven central. To use it in your local project, include the following dependency:
```xml
<dependency>
    <groupId>nl.minvenj.nfi.storm</groupId>
    <artifactId>kafka-spout</artifactId>
    <version>0.2</version>
</dependency>
```
To use the latest development state, clone the repository and run `mvn install`.
Note the version in the checkout and depend on the version you just built.
In its simplest form, the kafka spout can be used as follows:
```java
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka", new KafkaSpout());
builder.setBolt("bolt", new MyBolt()).shuffleGrouping("kafka");

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("my-topology", new Config(), builder.createTopology());
```
Configuration of the spout and the underlying kafka consumer is done through storm itself. The spout uses the following configuration parameters:
- `kafka.spout.topic`: the kafka topic to read messages from (default: `storm`);
- `kafka.spout.consumer.group`: the kafka consumer group to register with zookeeper (default: `kafka_spout`);
- `kafka.spout.buffer.size.max`: the maximum number of messages to buffer within a `KafkaSpout` instance (default: `1024`);
- `kafka.spout.fail.handler`: the 'failure policy' to use when tuples are failed by a bolt, see Failure Policy (default: `reliable`).
These configurations can be supplied to storm through topology configuration as follows:
```java
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka", new KafkaSpout());
// (...) define a topology

// create a storm Config object
Config config = new Config();
// configure kafka spout (values are available as constants on ConfigUtils)
config.put("kafka.spout.topic", "a-topic");
// kafka consumer configuration, see below
config.put("kafka.zookeeper.connect", "zookeeper.example.net:2181");
config.put("kafka.consumer.timeout.ms", 100);

LocalCluster cluster = new LocalCluster();
// submit topology with configuration
cluster.submitTopology("my-topology", config, builder.createTopology());
```
Besides the configuration options specific to the spout, the configuration for the underlying kafka consumer is read from the configuration provided by storm as well. These options can be specified in one of two ways:
- Configuration file as classpath resource: the value of configuration key `kafka.config` is read as a classpath resource, loaded into a java `Properties` object and passed to a kafka consumer (note that the resource is loaded when the spout is opened, so make sure the resource is available on the remote host);
- Configuration parameters prefixed in storm config: keys prefixed with `kafka.` are read from the storm configuration and passed to a kafka consumer (this includes `kafka.spout` keys, which kafka ignores).
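The second method can be sketched as follows. This is a minimal illustration, not the project's actual code; the helper name `extractConsumerConfig` and the exact prefix-stripping behaviour are assumptions based on the description above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ConsumerConfigSketch {
    // Hypothetical helper illustrating the prefixed-keys method: collect keys
    // starting with "kafka." from the storm configuration, strip the prefix
    // and return them as Properties for a kafka consumer. As noted above,
    // kafka.spout.* keys are passed along too; kafka simply ignores them.
    static Properties extractConsumerConfig(Map<String, Object> stormConfig) {
        Properties consumerConfig = new Properties();
        for (Map.Entry<String, Object> entry : stormConfig.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith("kafka.")) {
                consumerConfig.setProperty(key.substring("kafka.".length()),
                                           String.valueOf(entry.getValue()));
            }
        }
        return consumerConfig;
    }

    public static void main(String[] args) {
        Map<String, Object> stormConfig = new HashMap<>();
        stormConfig.put("kafka.zookeeper.connect", "zookeeper.example.net:2181");
        stormConfig.put("kafka.consumer.timeout.ms", 100);

        Properties consumerConfig = extractConsumerConfig(stormConfig);
        System.out.println(consumerConfig.getProperty("zookeeper.connect"));
        // → zookeeper.example.net:2181
        System.out.println(consumerConfig.getProperty("consumer.timeout.ms"));
        // → 100
    }
}
```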
Special actions are taken for some kafka consumer configuration parameters: `zookeeper.connect`, `consumer.timeout.ms` and `auto.commit.enable`.
If `zookeeper.connect` is not provided by one of the above methods, storm's zookeeper configuration (`Config.STORM_ZOOKEEPER_SERVERS` and `Config.STORM_ZOOKEEPER_PORT`) is converted to a `zookeeper.connect`-compatible string.
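That fallback conversion could look like the sketch below. The helper name `toConnectString` and the example host names are assumptions; only the "join servers with the port into a comma-separated string" behaviour follows from the description above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

public class ZookeeperConnectSketch {
    // Hypothetical helper mirroring the described fallback: join storm's
    // zookeeper servers and port into a "host:port,host:port" connect string.
    static String toConnectString(List<String> servers, int port) {
        StringJoiner joiner = new StringJoiner(",");
        for (String server : servers) {
            joiner.add(server + ":" + port);
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // Values as they might appear under Config.STORM_ZOOKEEPER_SERVERS
        // and Config.STORM_ZOOKEEPER_PORT (example values, not defaults).
        List<String> servers = Arrays.asList("zk1.example.net", "zk2.example.net");
        System.out.println(toConnectString(servers, 2181));
        // → zk1.example.net:2181,zk2.example.net:2181
    }
}
```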
`consumer.timeout.ms` is required to be present and greater than zero, as a kafka spout would otherwise block indefinitely while waiting for new messages, which interferes with the way storm calls `nextTuple` on spouts.
Setting `auto.commit.enable` to `true` causes reliability problems when a spout is stopped mid-batch or crashes.
The spout will disable this setting if it is not specified and refuse the configuration if it is enabled.
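The `auto.commit.enable` handling can be illustrated with a small validation sketch. The helper name `validateAutoCommit` and the exception type are assumptions; the disable-when-unset and refuse-when-enabled behaviour is taken from the description above.

```java
import java.util.Properties;

public class AutoCommitCheckSketch {
    // Hypothetical validation mirroring the behaviour described above:
    // disable auto-commit when unset, reject a configuration that enables it.
    static Properties validateAutoCommit(Properties consumerConfig) {
        String value = consumerConfig.getProperty("auto.commit.enable");
        if (value == null) {
            // not specified: disable, so offsets are only committed per batch
            consumerConfig.setProperty("auto.commit.enable", "false");
        } else if (Boolean.parseBoolean(value)) {
            throw new IllegalArgumentException(
                "auto.commit.enable=true breaks reliable batch processing");
        }
        return consumerConfig;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        System.out.println(validateAutoCommit(config).getProperty("auto.commit.enable"));
        // → false
    }
}
```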
See kafka's consumer documentation for all available configuration parameters to kafka consumers.
The project allows you to specify a failure policy, which controls whether a failed tuple should be replayed.
Two implementations are provided by default: `ReliableFailHandler` and `UnreliableFailHandler`.
The reliable failure policy will replay a tuple when it fails in the topology; the unreliable failure policy will ignore failed tuples.
The policy to be used by a spout is configured with the `kafka.spout.fail.handler` configuration option.
The two default implementations are available by supplying either `reliable` or `unreliable` as the value, but the key also allows you to specify a fully qualified class name, which is loaded and instantiated when the spout is opened.
A failure policy configured like this should implement `nl.minvenj.nfi.storm.kafka.fail.FailHandler`.
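As a configuration fragment, selecting a policy might look like this (the class `com.example.MyFailHandler` is a hypothetical custom implementation, named here only for illustration):

```java
Config config = new Config();
// use one of the provided policies by name ...
config.put("kafka.spout.fail.handler", "unreliable");
// ... or a custom policy by fully qualified class name; the class must
// implement nl.minvenj.nfi.storm.kafka.fail.FailHandler and be on the
// classpath of the workers running the spout
config.put("kafka.spout.fail.handler", "com.example.MyFailHandler");
```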
The current implementation of `KafkaSpout` still requires all tuples to be either acknowledged or failed before reading the next batch of messages from kafka.
This limitation hurts performance for applications that don't care about failed tuples, so expect this to change in the future.