diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..82632cb --- /dev/null +++ b/LICENSE @@ -0,0 +1,13 @@ +Copyright 2016 Engine Yard, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/README.md b/README.md index f26cbbf..a9ed6e7 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,21 @@ This plugin is used to decorate all log entries with kubernetes metadata. ### [fluent-plugin-elasticsearch](https://github.com/uken/fluent-plugin-elasticsearch) Allows fluentd to send log data to an elastic search cluster. You must specify an `ELASTICSEARCH_HOST` environment variable for this plugin to work. +* `ELASTICSEARCH_HOST="some.host"` +* `ELASTICSEARCH_SCHEME="http/https"` +* `ELASTICSEARCH_PORT="9200"` +* `ELASTICSEARCH_USER="username"` +* `ELASTICSEARCH_PASSWORD="password"` +* `ELASTICSEARCH_LOGSTASH_FORMAT="true/false"` - Creates indexes in the format `index_prefix-YYYY.MM.DD` +* `ELASTICSEARCH_TARGET_INDEX_KEY="kubernetes.namespace_name"` - Allows the index name to come from within the log message map. See example message format below. This allows the user to have an index per namespace, container name, or other dynamic value. +* `ELASTICSEARCH_TARGET_TYPE_KEY="some.key"` - Allows the user to set _type to a custom value found in the map. +* `ELASTICSEARCH_INCLUDE_TAG_KEY="true/false"` - Merge the fluentd tag back into the log message map. +* `ELASTICSEARCH_INDEX_NAME="fluentd"` - Set the index name where all events will be sent. 
+* `ELASTICSEARCH_LOGSTASH_PREFIX="logstash"` - Set the logstash prefix variable which is used when you want to use logstash format without specifying `ELASTICSEARCH_TARGET_INDEX_KEY`. +* `ELASTICSEARCH_TIME_KEY=""` - specify where the plugin can find the timestamp used for the `@timestamp` field +* `ELASTICSEARCH_TIME_KEY_FORMAT=""` - specify the format of `ELASTICSEARCH_TIME_KEY` +* `ELASTICSEARCH_TIME_KEY_EXCLUDE_TIMESTAMP=""` - If `ELASTICSEARCH_TIME_KEY` is specified, don't set `@timestamp` + ### [fluent-plugin-remote_syslog](https://github.com/dlackty/fluent-plugin-remote_syslog) This plugin allows `fluentd` to send data to a remote syslog endpoint like [papertrail](http://papertrailapp.com). You can configure `fluentd` to talk to multiple remote syslog endpoints by using the following scheme: * `SYSLOG_HOST_1=some.host` diff --git a/rootfs/Dockerfile b/rootfs/Dockerfile index 3fadcae..712f187 100644 --- a/rootfs/Dockerfile +++ b/rootfs/Dockerfile @@ -13,7 +13,7 @@ RUN buildDeps='g++ gcc make ruby-dev'; \ bundle install --gemfile=/opt/fluentd/deis-output/Gemfile && \ rake --rakefile=/opt/fluentd/deis-output/Rakefile build && \ fluent-gem install --no-document fluent-plugin-kubernetes_metadata_filter -v 0.25.3 && \ - fluent-gem install --no-document fluent-plugin-elasticsearch -v 1.6.0 && \ + fluent-gem install --no-document fluent-plugin-elasticsearch -v 1.7.0 && \ fluent-gem install --no-document fluent-plugin-remote_syslog -v 0.3.2 && \ fluent-gem install --no-document fluent-plugin-sumologic-mattk42 -v 0.0.4 && \ fluent-gem install --no-document influxdb -v 0.3.2 && \ diff --git a/rootfs/opt/fluentd/sbin/stores/elastic_search b/rootfs/opt/fluentd/sbin/stores/elastic_search index 66e0e61..70709fc 100755 --- a/rootfs/opt/fluentd/sbin/stores/elastic_search +++ b/rootfs/opt/fluentd/sbin/stores/elastic_search @@ -9,6 +9,18 @@ FLUENTD_BUFFER_CHUNK_LIMIT=${FLUENTD_BUFFER_CHUNK_LIMIT:-8m} FLUENTD_BUFFER_QUEUE_LIMIT=${FLUENTD_BUFFER_QUEUE_LIMIT:-8192} 
FLUENTD_BUFFER_TYPE=${FLUENTD_BUFFER_TYPE:-memory} FLUENTD_BUFFER_PATH=${FLUENTD_BUFFER_PATH:-/var/fluentd/buffer} +ELASTICSEARCH_LOGSTASH_FORMAT=${ELASTICSEARCH_LOGSTASH_FORMAT:-true} +# ELASTICSEARCH_LOGSTASH_PREFIX=${ELASTICSEARCH_LOGSTASH_PREFIX:-"logstash"} +# ELASTICSEARCH_TARGET_INDEX_KEY=${TARGET_INDEX_KEY:-""} +# ELASTICSEARCH_TARGET_TYPE_KEY=${TARGET_TYPE_KEY:-""} +# ELASTICSEARCH_INCLUDE_TAG_KEY=${INCLUDE_TAG_KEY:-false} +# ELASTICSEARCH_INDEX_NAME=${ELASTICSEARCH_INDEX_NAME:-"fluentd"} +# ELASTICSEARCH_TIME_KEY=${ELASTICSEARCH_TIME_KEY:-""} +# ELASTICSEARCH_TIME_KEY_FORMAT=${ELASTICSEARCH_TIME_KEY_FORMAT:-""} +# ELASTICSEARCH_TIME_KEY_EXCLUDE_TIMESTAMP=${ELASTICSEARCH_TIME_KEY_EXCLUDE_TIMESTAMP:-""} + + + if [ -n "$ELASTICSEARCH_HOST" ] then @@ -19,20 +31,27 @@ then cat << EOF >> $FLUENTD_CONF @type elasticsearch - include_tag_key true - time_key time host ${ELASTICSEARCH_HOST} - port ${ELASTICSEARCH_PORT} - scheme ${ELASTICSEARCH_SCHEME} + $([ -n "${ELASTICSEARCH_SCHEME}" ] && echo scheme ${ELASTICSEARCH_SCHEME}) + $([ -n "${ELASTICSEARCH_PORT}" ] && echo port ${ELASTICSEARCH_PORT}) $([ -n "${ELASTICSEARCH_USER}" ] && echo user ${ELASTICSEARCH_USER}) $([ -n "${ELASTICSEARCH_PASSWORD}" ] && echo password ${ELASTICSEARCH_PASSWORD}) + $([ -n "$ELASTICSEARCH_TIME_KEY_FORMAT" ] && echo time_key_format ${ELASTICSEARCH_TIME_KEY_FORMAT}) + $([ -n "$ELASTICSEARCH_TIME_KEY" ] && echo time_key ${ELASTICSEARCH_TIME_KEY}) + $([ -n "$ELASTICSEARCH_TIME_KEY_EXCLUDE_TIMESTAMP" ] && echo time_key_exclude_timestamp ${ELASTICSEARCH_TIME_KEY_EXCLUDE_TIMESTAMP}) + $([ -n "$ELASTICSEARCH_LOGSTASH_PREFIX" ] && echo logstash_prefix ${ELASTICSEARCH_LOGSTASH_PREFIX}) + $([ -n "$ELASTICSEARCH_INDEX_NAME" ] && echo index_name ${ELASTICSEARCH_INDEX_NAME}) + $([ -n "$ELASTICSEARCH_INCLUDE_TAG_KEY" ] && echo include_tag_key ${ELASTICSEARCH_INCLUDE_TAG_KEY}) + $([ -n "$ELASTICSEARCH_TARGET_INDEX_KEY" ] && echo target_index_key ${ELASTICSEARCH_TARGET_INDEX_KEY}) + $([ -n 
"$ELASTICSEARCH_TARGET_TYPE_KEY" ] && echo target_type_key ${ELASTICSEARCH_TARGET_TYPE_KEY}) + logstash_format ${ELASTICSEARCH_LOGSTASH_FORMAT} buffer_type ${FLUENTD_BUFFER_TYPE} $([ "${FLUENTD_BUFFER_TYPE}" == "file" ] && echo buffer_path ${FLUENTD_BUFFER_PATH}) + $([ "${FLUENTD_DISABLE_RETRY_LIMIT}" == "true" ] && echo disable_retry_limit) buffer_chunk_limit ${FLUENTD_BUFFER_CHUNK_LIMIT} buffer_queue_limit ${FLUENTD_BUFFER_QUEUE_LIMIT} flush_interval ${FLUENTD_FLUSH_INTERVAL} retry_limit ${FLUENTD_RETRY_LIMIT} - $([ "${FLUENTD_DISABLE_RETRY_LIMIT}" == "true" ] && echo disable_retry_limit) retry_wait ${FLUENTD_RETRY_WAIT} max_retry_wait ${FLUENTD_MAX_RETRY_WAIT} num_threads ${FLUENTD_FLUSH_THREADS} diff --git a/rootfs/var/lib/gems/2.3.0/gems/fluent-plugin-elasticsearch-1.7.0/lib/fluent/plugin/out_elasticsearch.rb b/rootfs/var/lib/gems/2.3.0/gems/fluent-plugin-elasticsearch-1.7.0/lib/fluent/plugin/out_elasticsearch.rb new file mode 100644 index 0000000..5596c0b --- /dev/null +++ b/rootfs/var/lib/gems/2.3.0/gems/fluent-plugin-elasticsearch-1.7.0/lib/fluent/plugin/out_elasticsearch.rb @@ -0,0 +1,370 @@ +# encoding: UTF-8 +require 'date' +require 'excon' +require 'elasticsearch' +require 'json' +require 'uri' +begin + require 'strptime' +rescue LoadError +end + +require_relative 'elasticsearch_index_template' + +class Fluent::ElasticsearchOutput < Fluent::BufferedOutput + class ConnectionFailure < StandardError; end + + Fluent::Plugin.register_output('elasticsearch', self) + + config_param :host, :string, :default => 'localhost' + config_param :port, :integer, :default => 9200 + config_param :user, :string, :default => nil + config_param :password, :string, :default => nil, :secret => true + config_param :path, :string, :default => nil + config_param :scheme, :string, :default => 'http' + config_param :hosts, :string, :default => nil + config_param :target_index_key, :string, :default => nil + config_param :target_type_key, :string, :default => nil + config_param 
:time_key_format, :string, :default => nil + config_param :logstash_format, :bool, :default => false + config_param :logstash_prefix, :string, :default => "logstash" + config_param :logstash_dateformat, :string, :default => "%Y.%m.%d" + config_param :utc_index, :bool, :default => true + config_param :type_name, :string, :default => "fluentd" + config_param :index_name, :string, :default => "fluentd" + config_param :id_key, :string, :default => nil + config_param :write_operation, :string, :default => "index" + config_param :parent_key, :string, :default => nil + config_param :routing_key, :string, :default => nil + config_param :request_timeout, :time, :default => 5 + config_param :reload_connections, :bool, :default => true + config_param :reload_on_failure, :bool, :default => false + config_param :resurrect_after, :time, :default => 60 + config_param :time_key, :string, :default => nil + config_param :time_key_exclude_timestamp, :bool, :default => false + config_param :ssl_verify , :bool, :default => true + config_param :client_key, :string, :default => nil + config_param :client_cert, :string, :default => nil + config_param :client_key_pass, :string, :default => nil + config_param :ca_file, :string, :default => nil + config_param :remove_keys, :string, :default => nil + config_param :remove_keys_on_update, :string, :default => "" + config_param :remove_keys_on_update_key, :string, :default => nil + config_param :flatten_hashes, :bool, :default => false + config_param :flatten_hashes_separator, :string, :default => "_" + config_param :template_name, :string, :default => nil + config_param :template_file, :string, :default => nil + + include Fluent::SetTagKeyMixin + include Fluent::ElasticsearchIndexTemplate + config_set_default :include_tag_key, false + + def initialize + super + @time_parser = TimeParser.new(@time_key_format, @router) + end + + def configure(conf) + super + @time_parser = TimeParser.new(@time_key_format, @router) + + if @remove_keys + 
@remove_keys = @remove_keys.split(/\s*,\s*/) + end + + if @target_index_key && @target_index_key.is_a?(String) + @target_index_key = @target_index_key.split '.' + end + + if @target_type_key && @target_type_key.is_a?(String) + @target_type_key = @target_type_key.split '.' + end + + if @remove_keys_on_update && @remove_keys_on_update.is_a?(String) + @remove_keys_on_update = @remove_keys_on_update.split ',' + end + + if @template_name && @template_file + template_install(@template_name, @template_file) + end + end + + def start + super + end + + # once fluent v0.14 is released we might be able to use + # Fluent::Parser::TimeParser, but it doesn't quite do what we want - if gives + # [sec,nsec] where as we want something we can call `strftime` on... + class TimeParser + def initialize(time_key_format, router) + @time_key_format = time_key_format + @router = router + @parser = if time_key_format + begin + # Strptime doesn't support all formats, but for those it does it's + # blazingly fast. 
+ strptime = Strptime.new(time_key_format) + Proc.new { |value| strptime.exec(value).to_datetime } + rescue + # Can happen if Strptime doesn't recognize the format; or + # if strptime couldn't be required (because it's not installed -- it's + # ruby 2 only) + Proc.new { |value| DateTime.strptime(value, time_key_format) } + end + else + Proc.new { |value| DateTime.parse(value) } + end + end + + def parse(value, event_time) + @parser.call(value) + rescue => e + @router.emit_error_event("Fluent::ElasticsearchOutput::TimeParser.error", Fluent::Engine.now, {'time' => event_time, 'format' => @time_key_format, 'value' => value }, e) + return Time.at(event_time).to_datetime + end + end + + def client + @_es ||= begin + excon_options = { client_key: @client_key, client_cert: @client_cert, client_key_pass: @client_key_pass } + adapter_conf = lambda {|f| f.adapter :excon, excon_options } + transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(get_connection_options.merge( + options: { + reload_connections: @reload_connections, + reload_on_failure: @reload_on_failure, + resurrect_after: @resurrect_after, + retry_on_failure: 5, + transport_options: { + request: { timeout: @request_timeout }, + ssl: { verify: @ssl_verify, ca_file: @ca_file } + } + }), &adapter_conf) + es = Elasticsearch::Client.new transport: transport + + begin + raise ConnectionFailure, "Can not reach Elasticsearch cluster (#{connection_options_description})!" unless es.ping + rescue *es.transport.host_unreachable_exceptions => e + raise ConnectionFailure, "Can not reach Elasticsearch cluster (#{connection_options_description})! #{e.message}" + end + + log.info "Connection opened to Elasticsearch cluster => #{connection_options_description}" + es + end + end + + def get_connection_options + raise "`password` must be present if `user` is present" if @user && !@password + + hosts = if @hosts + @hosts.split(',').map do |host_str| + # Support legacy hosts format host:port,host:port,host:port... 
+ if host_str.match(%r{^[^:]+(\:\d+)?$}) + { + host: host_str.split(':')[0], + port: (host_str.split(':')[1] || @port).to_i, + scheme: @scheme + } + else + # New hosts format expects URLs such as http://logs.foo.com,https://john:pass@logs2.foo.com/elastic + uri = URI(host_str) + %w(user password path).inject(host: uri.host, port: uri.port, scheme: uri.scheme) do |hash, key| + hash[key.to_sym] = uri.public_send(key) unless uri.public_send(key).nil? || uri.public_send(key) == '' + hash + end + end + end.compact + else + [{host: @host, port: @port, scheme: @scheme}] + end.each do |host| + host.merge!(user: @user, password: @password) if !host[:user] && @user + host.merge!(path: @path) if !host[:path] && @path + end + + { + hosts: hosts + } + end + + def connection_options_description + get_connection_options[:hosts].map do |host_info| + attributes = host_info.dup + attributes[:password] = 'obfuscated' if attributes.has_key?(:password) + attributes.inspect + end.join(', ') + end + + def format(tag, time, record) + [tag, time, record].to_msgpack + end + + def shutdown + super + end + + def append_record_to_messages(op, meta, record, msgs) + case op + when "update", "upsert" + if meta.has_key?("_id") + msgs << { "update" => meta } + msgs << update_body(record, op) + end + when "create" + if meta.has_key?("_id") + msgs << { "create" => meta } + msgs << record + end + when "index" + msgs << { "index" => meta } + msgs << record + end + end + + def update_body(record, op) + update = remove_keys(record) + body = { "doc" => update } + if op == "upsert" + if update == record + body["doc_as_upsert"] = true + else + body["upsert"] = record + end + end + body + end + + def remove_keys(record) + keys = record[@remove_keys_on_update_key] || @remove_keys_on_update || [] + record.delete(@remove_keys_on_update_key) + return record unless keys.any? 
+ record = record.dup + keys.each { |key| record.delete(key) } + record + end + + def flatten_record(record, prefix=[]) + ret = {} + if record.is_a? Hash + record.each { |key, value| + ret.merge! flatten_record(value, prefix + [key.to_s]) + } + elsif record.is_a? Array + # Don't mess with arrays, leave them unprocessed + ret.merge!({prefix.join(@flatten_hashes_separator) => record}) + else + return {prefix.join(@flatten_hashes_separator) => record} + end + ret + end + + def write(chunk) + bulk_message = [] + + chunk.msgpack_each do |tag, time, record| + if @flatten_hashes + record = flatten_record(record) + end + next unless record.is_a? Hash + + dt = get_time(record, time) + target_index = get_target_index(record, dt) + + + target_type_parent, target_type_child_key = get_parent_of(record, @target_type_key) + if target_type_parent && target_type_parent[target_type_child_key] + target_type = target_type_parent.delete(target_type_child_key) + else + target_type = @type_name + end + + meta = {"_index" => target_index, "_type" => target_type} + + @meta_config_map ||= { 'id_key' => '_id', 'parent_key' => '_parent', 'routing_key' => '_routing' } + @meta_config_map.each_pair do |config_name, meta_key| + record_key = self.instance_variable_get("@#{config_name}") + meta[meta_key] = record[record_key] if record_key && record[record_key] + end + + if @remove_keys + @remove_keys.each { |key| record.delete(key) } + end + log = record["log"] + record["log"] = log.gsub(/\e\[\d*m{1}/, "").lstrip if log + append_record_to_messages(@write_operation, meta, record, bulk_message) + end + + send(bulk_message) unless bulk_message.empty? 
+ bulk_message.clear + end + + def get_time(record, time) + if record.has_key?("@timestamp") + dt = record["@timestamp"] + dt = @time_parser.parse(record["@timestamp"], time) + elsif record.has_key?(@time_key) + dt = @time_parser.parse(record[@time_key], time) + record['@timestamp'] = record[@time_key] unless time_key_exclude_timestamp + else + if time.class == Time + dt = Time.at(time).to_datetime + else + dt = Time.now.to_datetime + end + record.merge!({"@timestamp" => dt.to_s}) + end + return dt + end + + # Get the target_index + # If we have passed in a target_index_key we should use that value first. + # --- We should also check to see if the user wants this in logstash format meaning we need to append the date to the end + # Next if the user only wants this in logstash format and hasn't provided a target_index_key, then use the logstash prefix + # Last just use the index_name that was passed in to the plugin at configuration. + def get_target_index(record, dt) + target_index_parent, target_index_child_key = get_parent_of(record, @target_index_key) + if target_index_parent && target_index_parent[target_index_child_key] + target_index = target_index_parent.delete(target_index_child_key) + if @logstash_format + target_index = "#{target_index}-#{dt.strftime(@logstash_dateformat)}" + end + elsif @logstash_format && !@target_index_key + dt = dt.new_offset(0) if @utc_index + target_index = "#{@logstash_prefix}-#{dt.strftime(@logstash_dateformat)}" + else + target_index = @index_name + end + + # Change target_index to lower-case since Elasticsearch doesn't + # allow upper-case characters in index names. 
+ target_index = target_index.downcase + if @include_tag_key + record.merge!(@tag_key => tag) + end + return target_index + end + + # returns [parent, child_key] of child described by path array in record's tree + # returns [nil, child_key] if path doesnt exist in record + def get_parent_of(record, path) + return [nil, nil] unless path + + parent_object = path[0..-2].reduce(record) { |a, e| a.is_a?(Hash) ? a[e] : nil } + [parent_object, path[-1]] + end + + def send(data) + retries = 0 + begin + client.bulk body: data + rescue *client.transport.host_unreachable_exceptions => e + if retries < 2 + retries += 1 + @_es = nil + log.warn "Could not push logs to Elasticsearch, resetting connection and trying again. #{e.message}" + sleep 2**retries + retry + end + raise ConnectionFailure, "Could not push logs to Elasticsearch after #{retries} retries. #{e.message}" + end + end +end