Kinesis Elasticsearch Sink Setup 0.1.0
🚧 The documentation for our Elasticsearch Loader can be found on the Snowplow documentation site.
This documentation is for versions 0.1.0 up to 0.3.0 of the Kinesis Elasticsearch Sink.
If you are using Snowplow Kinesis Enrich to write enriched Snowplow events to one stream and bad events to another, you can use the Kinesis Elasticsearch Sink to read events from either of those streams and write them to Elasticsearch.
First off, install and set up Elasticsearch version 1.4.0. For more information, check out the installation guide.
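For reference, once you have obtained the Elasticsearch 1.4.0 tarball (the exact download location for this old release may vary, so treat this only as a sketch), a single node can be started like so:
$ tar -xzf elasticsearch-1.4.0.tar.gz
$ cd elasticsearch-1.4.0
$ ./bin/elasticsearch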
Elasticsearch keeps a lot of files open simultaneously, so you will need to increase the maximum number of files a user can have open. To do this:
sudo vim /etc/security/limits.conf
Append the following lines to the file:
{{USERNAME}} soft nofile 32000
{{USERNAME}} hard nofile 32000
Where {{USERNAME}} is the name of the user running Elasticsearch. You will need to log out and restart Elasticsearch before the new file limit takes effect.
To check that this new limit has taken effect you can run the following command from the terminal:
curl localhost:9200/_nodes/process?pretty
If max_file_descriptors equals 32000, Elasticsearch is running with the new limit.
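The response will look something like this (heavily abridged); the value to check is max_file_descriptors under each node's process section:
{
  "cluster_name" : "elasticsearch",
  "nodes" : {
    "<node-id>" : {
      ...
      "process" : {
        ...
        "max_file_descriptors" : 32000
      }
    }
  }
}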
Use the following request to create the mapping for the enriched event type:
curl -XPUT 'http://localhost:9200/snowplow' -d '{
    "settings": {
        "analysis": {
            "analyzer": {
                "default": {
                    "type": "keyword"
                }
            }
        }
    },
    "mappings": {
        "enriched": {
            "_timestamp" : {
                "enabled" : "yes",
                "path" : "collector_tstamp"
            },
            "_ttl": {
                "enabled": true,
                "default": "604800000"
            },
            "properties": {
                "geo_location": {
                    "type": "geo_point"
                }
            }
        }
    }
}'
Elasticsearch will then treat the collector_tstamp field as the timestamp and the geo_location field as a "geo_point". Documents will be automatically deleted one week (604800000 milliseconds) after their collector_tstamp.
This initialization sets the default analyzer to "keyword". This means that string fields will not be split into separate tokens for the purposes of searching. This saves space and ensures that URL fields are handled correctly.
If you want to tokenize specific string fields, you can change the "properties" field in the mapping like this:
curl -XPUT 'http://localhost:9200/snowplow' -d '{
    "settings": {
        "analysis": {
            "analyzer": {
                "default": {
                    "type": "keyword"
                }
            }
        }
    },
    "mappings": {
        "enriched": {
            "_timestamp" : {
                "enabled" : "yes",
                "path" : "collector_tstamp"
            },
            "_ttl": {
                "enabled": true,
                "default": "604800000"
            },
            "properties": {
                "geo_location": {
                    "type": "geo_point"
                },
                "field_to_tokenize": {
                    "type": "string",
                    "analyzer": "english"
                }
            }
        }
    }
}'
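To double-check that the index and mapping were created as intended, you can ask Elasticsearch to return the mapping for the snowplow index:
curl -XGET 'http://localhost:9200/snowplow/_mapping?pretty'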
You can choose to either:
- Download the Kinesis Elasticsearch Sink executable jarfile, or:
- Compile it from source
To get a local copy, you can download the executable jarfile directly from our Hosted Assets bucket on Amazon S3 - please see our Hosted assets page for details.
You will need to set the executable flag on the file:
$ chmod +x snowplow-elasticsearch-sink-0.1.0
Alternatively, you can build it from the source files. To do so, you will need Scala and sbt installed.
First, clone the Snowplow repo:
$ git clone https://github.com/snowplow/snowplow.git
Navigate into the Kinesis Elasticsearch Sink folder:
$ cd 4-storage/kinesis-elasticsearch-sink
Use sbt to resolve dependencies, compile the source, and build an assembled fat jar file with all dependencies:
$ sbt assembly
The jar file will be saved as snowplow-elasticsearch-sink-0.1.0 in the target/scala-2.10 subdirectory. It is now ready to be deployed.
The sink is configured using a HOCON file. These are the fields:
- source: Change this from "kinesis" to "stdin" to run the sink in local mode. You can pipe in the output of Stream Enrich and the sink will log the resulting JSONs to stdout and any events which couldn't be converted to JSON to stderr (see the local-mode example after this list).
- sink: Change this from "kinesis" to "stdouterr" to write to stdout rather than Elasticsearch and stderr rather than Kinesis. Log messages will still be logged to stdout, so make sure to filter them out if you intend to pipe the output of the sink to another process.
- aws/access-key and aws/secret-key: Change these to your AWS credentials. You can alternatively leave them as "default", in which case the DefaultAWSCredentialsProviderChain will be used.
- kinesis/in/stream-name: The name of the input Kinesis stream.
- kinesis/in/stream-type: "good" if the input stream contains successfully enriched events; "bad" if it contains bad rows.
- kinesis/in/initial-position: Where to start reading from the stream the first time the app is run. "TRIM_HORIZON" for as far back as possible, "LATEST" for as recent as possible.
- kinesis/out/stream-name: The name of the output Kinesis stream. Records which cannot be converted to JSON or can be converted but are rejected by Elasticsearch get sent here. If this stream doesn't exist already it will be created automatically.
- kinesis/out/shards: If the out stream doesn't exist, create it with this many shards.
- kinesis/region: The Kinesis region name to use.
- kinesis/app-name: Unique identifier for the app which ensures that if it is stopped and restarted, it will restart at the correct location.
- location/index: The Elasticsearch index name.
- location/type: The Elasticsearch type name.
- buffer/byte-limit: Whenever the total size of the buffered records exceeds this number, they will all be sent to Elasticsearch.
- buffer/record-limit: Whenever the total number of buffered records exceeds this number, they will all be sent to Elasticsearch.
- buffer/time-limit: If this much time passes without the buffer being flushed, it will be flushed regardless of the byte and record limits.
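To illustrate local mode, you could pipe Stream Enrich straight into the sink. The Stream Enrich jarfile name and its configuration are placeholders here, and both configuration files need their source/sink fields set to the stdin/stdout values described above:
$ ./snowplow-stream-enrich-x.y.z --config enrich.conf | ./snowplow-elasticsearch-sink-0.1.0 --config sink.conf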
An example is available in the repo.
You will need to configure the names of the input and output streams.
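For illustration only, a configuration following the field paths above might look roughly like this; the exact nesting and default values are assumptions, so base your real file on the example in the repo:
# Illustrative sketch -- nesting inferred from the field paths above;
# stream names and buffer values are placeholders.
sink {
  source = "kinesis"                 # "stdin" for local mode
  sink = "kinesis"                   # "stdouterr" for local mode

  aws {
    access-key = "default"           # "default" uses the DefaultAWSCredentialsProviderChain
    secret-key = "default"
  }

  kinesis {
    in {
      stream-name = "enriched-good-events"          # placeholder
      stream-type = "good"
      initial-position = "TRIM_HORIZON"
    }
    out {
      stream-name = "elasticsearch-failed-records"  # placeholder; created automatically if missing
      shards = 1
    }
    region = "us-east-1"
    app-name = "snowplow-elasticsearch-sink"
  }

  location {
    index = "snowplow"
    type = "enriched"
  }

  buffer {
    byte-limit = 4500000     # placeholder values
    record-limit = 500
    time-limit = 60000
  }
}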
The Kinesis Elasticsearch Sink is an executable jarfile which should be runnable from any Unix-like shell environment. Simply provide the configuration file as a parameter:
$ ./snowplow-elasticsearch-sink-0.1.0 --config my.conf
This will start the process of reading events from Kinesis and writing them to an Elasticsearch cluster.
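Once events are flowing, you can sanity-check that documents are arriving in the index by counting documents of the enriched type:
curl 'http://localhost:9200/snowplow/enriched/_count?pretty'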