-
Notifications
You must be signed in to change notification settings - Fork 56
Ruby DSL Documentation
Your project can be created in a single file containing all spouts, bolts and topology classes or each classes can be in its own file, your choice. There are many examples for the simple DSL.
The DSL uses a callback metaphor to attach code to the topology/spout/bolt execution contexts using on_*
DSL constructs (ex.: on_submit, on_send, ...). When using on_*
you can attach you code in 3 different ways:
- using a code block
on_receive (:ack => true, :anchor => true) {|tuple| do_something_with(tuple)}
on_receive :ack => true, :anchor => true do |tuple|
do_something_with(tuple)
end
- defining the corresponding method
on_receive :ack => true, :anchor => true
def on_receive(tuple)
do_something_with(tuple)
end
- defining an arbitrary method
on_receive :my_method, :ack => true, :anchor => true
def my_method(tuple)
do_something_with(tuple)
end
The example SplitSentenceBolt shows the 3 different coding style.
Normally Storm topology components are assigned and referenced using numeric ids. In the SimpleTopology DSL ids are optional. By default the DSL will use the component class name as an implicit symbolic id and bolt source ids can use these implicit ids. The DSL will automatically resolve and assign numeric ids upon topology submission. If two components are of the same class, creating a conflict, then the id can be explicitly defined using either a numeric value, a symbol or a string. Numeric values will be used as-is at topology submission while symbols and strings will be resolved and assigned a numeric id.
require 'red_storm'
class MyTopology < RedStorm::SimpleTopology
spout spout_class, options do
set "attribute_name" value
config_method value
...
end
bolt bolt_class, options do
source source_id, grouping
set "attribute_name" value
config_method value
...
end
configure topology_name do |env|
set "attribute_name" value
config_method value
...
end
on_submit do |env|
...
end
end
spout spout_class, options do
set "attribute_name" value
config_method value
...
end
-
spout_class
— spout Ruby class -
options
-
:id
— spout explicit id (default is spout class name) -
:parallelism
— spout parallelism (default is 1)
-
- the spout definition body is essentially for configuration options. See Storm Configuration documentation.
-
set
— set the valid spout config attribute_name to the given value. The spout code block and config statements are optional.topology.debug
topology.max.spout.pending
topology.max.task.parallelism
topology.kryo.register
-
config_method
— the valid spout config method setter name. Some attributes have a dedicated setter method. Here we use the setter name in uderscore format and without the "set" prefix.debug
max_spout_pending
max_task_parallelism
-
bolt bolt_class, options do
source source_id, grouping
set "attribute_name" value
config_method value
...
end
-
bolt_class
— bolt Ruby class -
options
-
:id
— bolt explicit id (default is bolt class name) -
:parallelism
— bolt parallelism (default is 1)
-
-
source
- define a new source-
source_id
— source id reference. can be the source class name if unique or the explicit id if defined -
grouping
-
:fields => ["field", ...]
— fieldsGrouping using fields on the source_id -
:shuffle
— shuffleGrouping on the source_id -
:global
— globalGrouping on the source_id -
:none
— noneGrouping on the source_id -
:all
— allGrouping on the source_id -
:direct
— directGrouping on the source_id
-
-
-
set
andconfig_method
are for configuration options. See Storm Configuration documentation.-
set
— set the valid bolt config attribute_name to the given value. optional.topology.debug
topology.max.task.parallelism
topology.kryo.register
-
config_method
— the valid bolt config method setter name. Some attributes have a dedicated setter method. Here we use the setter name in uderscore format and without the "set" prefix. optional.debug
max_task_parallelism
-
configure topology_name do |env|
set "attribute_name" value
config_method value
...
end
The configure
statement is required.
-
topology_name
— alternate topology name (default is topology class name) -
env
— is set to:local
or:cluster
for you to set enviroment specific configurations -
set
andconfig_method
are for Storm configuration options. See Storm Configuration documentation.-
set
— set the valid topology config attribute_name to the given value. optional.topology.debug
topology.max.task.parallelism
topology.workers
- ...
-
config_method
— the valid topology config method setter name. Some attributes have a dedicated setter method. Here we use the setter name in uderscore format and without the "set" prefix. optional.debug
max_task_parallelism
num_workers
- ...
-
on_submit do |env|
...
end
The on_submit
statement is optional. Use it to execute code after the topology submission.
-
env
— is set to:local
or:cluster
For example, you can use on_submit
to shutdown the LocalCluster after some time. The LocalCluster instance is available usign the cluster
method.
on_submit do |env|
if env == :local
sleep(5)
cluster.shutdown
end
end
require 'red_storm'
class MySpout < RedStorm::SimpleSpout
output_fields :field, ...
configure do
set "attribute_name" value
config_method value
...
end
on_send options do
...
end
on_init do
...
end
on_close do
...
end
on_ack do |msg_id|
...
end
on_fail do |msg_id|
...
end
on_activate do
...
end
on_deactivate do
...
end
end
output_fields :field, ...
Define the output fields for this spout.
-
:field
— the field name, can be symbol or string.
set "attribute_name" value
config_method value
Set configuration attribute specific for this spout. See Storm Configuration documentation.
-
set
— set the valid spout config attribute_name to the given value.topology.debug
topology.max.spout.pending
topology.max.task.parallelism
topology.kryo.register
-
config_method
— the valid spout config method setter name. Some attributes have a dedicated setter method. Here we use the setter name in uderscore format and without the "set" prefix.debug
max_spout_pending
max_task_parallelism
on_send options do
...
end
on_send
relates to the Java spout nextTuple
method and is called periodically by storm to allow the spout to output a tuple. When using auto-emit (default) and unreliable mode (default), the block return value will be auto-emited, with nil as message_id and tuple-tracking will be disabled and on_ack
and on_fail
will never be called. A single value return will be emited as a single-field tuple. An array of values [a, b]
will be emited as a multiple-fields tuple.
If you enable reliable mode and auto-emit, the block return value must be an array and you must supply a message_id as the first element of the array.
When not using auto-emit the reliable_emit(message_id, value, ...)
or unreliable_emit(value, ...)
methods can be used to emit a single tuple.
Normally a spout should only output a single tuple per on_send invocation.
-
:options
-
:emit
— set tofalse
to disable auto-emit (default istrue
) -
:reliable
— set totrue
to enable reliable mode when using auto-emit (default isfalse
)
-
on_init do
...
end
on_init
relates to the Java spout open
method. When on_init
is called, the config
, context
and collector
are set to return the Java spout config Map
, TopologyContext
and SpoutOutputCollector
.
on_close do
...
end
on_close
relates to the Java spout close
method.
on_ack do |msg_id|
...
end
on_ack
relates to the Java spout ack
method.
on_fail do |msg_id|
...
end
on_fail
relates to the Java spout fail
method.
on_activate do
...
end
on_activate
relates to the Java spout activate
method.
on_deactivate do
...
end
on_deactivate
relates to the Java spout deactivate
method.
require 'red_storm'
class MyBolt < RedStorm::SimpleBolt
output_fields :field, ...
configure do
set "attribute_name" value
config_method value
...
end
on_receive options do
...
end
on_init do
...
end
on_close do
...
end
end
output_fields :field, ...
Define the output fields for this bolt.
-
:field
— the field name, can be symbol or string.
set "attribute_name" value
config_method value
Set configuration attribute specific for this bolt. See Storm Configuration documentation.
-
set
— set the valid bolt config attribute_name to the given value.topology.debug
topology.max.task.parallelism
topology.kryo.register
-
config_method
— the valid bolt config method setter name. Some attributes have a dedicated setter method. Here we use the setter name in uderscore format and without the "set" prefix.debug
max_task_parallelism
on_receive options do
...
end
on_receive
relates to the Java bolt execute
method and is called upon tuple reception by Storm. When using auto-emit, the block return value will be auto emited. A single value return will be emited as a single-field tuple. An array of values [a, b]
will be emited as a multiple-fields tuple. An array of arrays [[a, b], [c, d]]
will be emited as multiple-fields multiple tuples.
When not using auto-emit, the unanchored_emit(value, ...)
and anchored_emit(tuple, value, ...)
methods can be used to emit a single tuple.
When using auto-anchor (disabled by default) the sent tuples will be anchored to the received tuple. When using auto-ack (disabled by default) the received tuple will be ack'ed after emitting the return value. When not using auto-ack, the ack(tuple)
method can be used to ack the tuple.
Note that setting auto-ack and auto-anchor is valid only when auto-emit is enabled.
-
:options
-
:emit
— set tofalse
to disable auto-emit (default istrue
) -
:ack
— set totrue
to enable auto-ack (default isfalse
) -
:anchor
— set totrue
to enable auto-anchor (default isfalse
)
-
on_init do
...
end
on_init
relates to the Java bolt prepare
method. When on_init
is called, the config
, context
and collector
are set to return the Java spout config Map
, TopologyContext
and SpoutOutputCollector
.
on_close do
...
end
on_close
relates to the Java bolt cleanup
method.