##What is a Flume Interceptor Interceptors are components of the Flume agent architecture. Typically, such an agent is based on a source dealing with the input, a sink dealing with the output and a channel communicating them. The source processes the input, producing Flume events (an object based on a set of headers and a byte-based body) that are put in the channel; then the sink consumes the events by getting them from the channel. This basic architecture may be enriched by the addition of Interceptors, a chained sequence of Flume events preprocessors that intercept the events before they are put into the channel and performing one of these operations:
- Drop the event.
- Modify an existent header of the Flume event.
- Add a new header to the Flume event.
Interceptors should never modify the body part. Once an event is preprocessed, it is put in the channel as usual.
As can be seen, this mechanism allows for very useful ways of enriching the basic Flume events a certain Flume source may generate. Let's see how Cygnus makes use of this concept in order to add certain information to the Flume events created from the Orion notifications.
##Timestamp
Interceptor
This is an Interceptor that can be natively found in any Flume distribution. It adds a timestamp
header to the Flume event, whose value expresses the number of miliseconds from January the 1st, 1970.
The way Cygnus makes use of this Interceptor is the standard one:
cygnusagent.sources.http-source.interceptors = ts <other-interceptors>
cygnusagent.sources.http-source.interceptors.ts.type = timestamp
##GroupingInterceptor
Interceptor
This is a custom Interceptor specifically designed for Cygnus. Its goal is to infer the destination entity where the data regarding a notified entity is going to be persisted. This destination entity, depending on the used sinks, may be a HDFS file name, a MySQL table name or a CKAN resource name. In addition, a new fiware-servicePath
containing the destination entity may be configured (in case of HDFS, this is a folder; in case of CKAN this is a package; in case of MySQL this is simply a prefix for the table name; please, have a look to doc/design/naming_conventions.md for more details).
Such an inference is made by inspecting (but not modifying) certain configured fields of the body part of the event; if the concatenation of such fields matches a configured regular expresion, then the configured destination entity is added as the value of a destination
header. The already existing fiware-servicePath
header may be substituted as well by the configured new service path.
If a notified entity contains more than one context response, then both the destination
and the fiware-servicePath
headers contains a comma-separated list of values.
###Grouping rules There exists a grouping rules file containing Json-like rules definition, following this format:
{
"grouping_rules": [
{
"id": 1,
"fields": [
...
],
"regex": "...",
"destination": "...",
"fiware_service_path": "..."
},
...
]
}
Being:
- id: A unique unsigned integer-based identifier. Not really used in the current implementation, but could be useful in the future.
- fields: These are the fields that will be concatenated for regular expression matching. The available dictionary of fields for concatenation is entityId, entityType and servicePath. The order of these fields is important since the concatenation is made from left to right.
- regex: Java-like regular expression to be applied on the concatenated fields.
- destination: Name of the HDFS file or CKAN resource where the data will be effectively persisted. In the case of MySQL, Mongo and STH this sufixes the table/collection name. Please, have a look to doc/design/naming_conventions.md for more details.
- fiware_service_path: New
fiware-servicePath
replacing the notified one. The sinks will translate this into the name of the HDFS folder or CKAN package where the above destination entity will be placed. In the case of MySQL, Mongo and STH this prefixes the table/collection name. Please, have a look to doc/design/naming_conventions.md for more details.
For instance:
{
"grouping_rules": [
{
"id": 1,
"fields": [
"entityId",
"entityType"
],
"regex": "Room\.(\d*)Room",
"destination": "numeric_rooms",
"fiware_service_path": "rooms"
},
{
"id": 2,
"fields": [
"entityId",
"entityType"
],
"regex": "Room\.(\D*)Room",
"destination": "character_rooms",
"fiware_service_path": "rooms"
},
{
"id": 3,
"fields": [
"entityType",
"entityId"
],
"regex": "RoomRoom\.(\D*)",
"destination": "character_rooms",
"fiware_service_path": "rooms"
},
{
"id": 4,
"fields": [
"entityType"
],
"regex": "Room",
"destination": "other_rooms",
"fiware_service_path": "rooms"
}
]
}
The above rules set that:
- All the
Room
entities having their identifiers composed by aRoom.
and an integer (e.g.Room.12
) will be persisted in anumeric_rooms
destination within arooms
service parth (in the example, the concatenation is equals toRoom.12Room
). - All the
Room
entities having their identifiers composed by aRoom.
and any number of characters (no digits) (e.g.Room.left
) will be persisted in acharacter_rooms
destination within arooms
service path (in the example, the concatenation is equals toRoom.leftRoom
when appliying rule number 2, butRoomRoom.left
when applying rule number 3; nevertheless, from a semantic point of view they are the same rule). - All other rooms will go to
other_rooms
destination within arooms
service path.
Rules are tryed sequentially, and if any rules matches then the default destination for the notified entity is generated, i.e. the concatenation of the entity id, _
and the entity type; and the notified service path is maintained.
The grouping rules file is usually placed at [FLUME_HOME_DIR]/conf/
, and there exists a template within Cygnus distribution.
The usage of such an Interceptor is:
cygnusagent.sources.http-source.interceptors = gi <other-interceptors>
cygnusagent.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.GroupingInterceptor$Builder
cygnusagent.sources.http-source.interceptors.gi.grouping_rules_conf_file = [FLUME_HOME_DIR]/conf/grouping_rules.conf
It is very important to configure the absolute path to the grouping rules file.
##Contact
Francisco Romero Bueno ([email protected]) [Main contributor]
Fermín Galán Márquez ([email protected]) [Contributor and Orion Context Broker owner]
Germán Toro del Valle ([email protected]) [Contributor]
Iván Arias León ([email protected]) [Quality Assurance]