Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhanced age filter to perform limit calcs internally using dynamically fetched limit setting #4

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions .github/workflows/gem-push.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
name: Ruby Gem

on:
push:
branches: [ master ]
pull_request:
branches: [ master ]

jobs:
build:
name: Build + Publish
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2
- name: Set up Ruby 2.6
uses: actions/setup-ruby@v1
with:
ruby-version: 2.6.x

- name: Publish to GPR
run: |
mkdir -p $HOME/.gem
touch $HOME/.gem/credentials
chmod 0600 $HOME/.gem/credentials
printf -- "---\n:github: ${GEM_HOST_API_KEY}\n" > $HOME/.gem/credentials
gem build *.gemspec
gem push --KEY github --host https://rubygems.pkg.github.com/${OWNER} *.gem
env:
GEM_HOST_API_KEY: "Bearer ${{secrets.GITHUB_TOKEN}}"
OWNER: ${{ github.repository_owner }}

- name: Publish to RubyGems
run: |
mkdir -p $HOME/.gem
touch $HOME/.gem/credentials
chmod 0600 $HOME/.gem/credentials
printf -- "---\n:rubygems_api_key: ${GEM_HOST_API_KEY}\n" > $HOME/.gem/credentials
gem build *.gemspec
gem push *.gem
env:
GEM_HOST_API_KEY: "${{secrets.RUBYGEMS_AUTH_TOKEN}}"
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 1.1.0
- Add support for a limit service and perform the age criteria test internally

## 1.0.2
- Fix some documentation issues

Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
# logstash-filter-age
Filter to calculate age of an event based on when it was received by Logstash
Filter to calculate age of an event based on when it was received by Logstash.
It can optionally determine the limit by which an event is considered expired
and perform the calculation. This helps keep magic numbers out your logstash filter.
26 changes: 25 additions & 1 deletion docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ This filter calculates the age of an event by subtracting the event timestamp
from the current timestamp. This allows you to drop Logstash events that are
older than some threshold.

It also has the ability to perform a calculation if the event is older than
a limit value. If so, set a boolean field with the result. The limit value
can also be determined dynamically by age calling a limit service periodically.
Using elasticsearch cluster setttings works great for this.
This enables important limit critieria to be changed on the fly and picked
up and used in event filters; avoiding hard coded magic numbers and long
redeployments of new logstash config changes.

The historic approach is as follows.
If the hard coded magic number bothers you too, then try the other configuration.
[source,ruby]
filter {
age {}
Expand All @@ -35,6 +45,20 @@ filter {
}
}

[source,ruby]
filter {
age{
"url" => "https://foo.com/_cluster/settings?filter_path=persistent.cluster.metadata.logstash.filter.age.limit_secs"
"limit_path" => "persistent.cluster.metadata.logstash.filter.age.limit_secs"
"interval" => "5m"
"expired_target" => "[@metadata][expired]"
"age_limit_target" => "[@metadata][age_limit]"
}

if [@metadata][expired] {
drop {}
}
}

[id="plugins-{type}s-{plugin}-options"]
==== Age Filter Configuration Options
Expand Down Expand Up @@ -63,4 +87,4 @@ Define the target field for the event age, in seconds.


[id="plugins-{type}s-{plugin}-common-options"]
include::{include_path}/{type}.asciidoc[]
include::{include_path}/{type}.asciidoc[]
171 changes: 154 additions & 17 deletions lib/logstash/filters/age.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,174 @@

require 'logstash/filters/base'
require 'logstash/namespace'
require 'logstash/plugin_mixins/http_client'
require 'logstash/json'
require 'jruby/synchronized'
require 'rufus-scheduler'

# A simple filter for calculating the age of an event.
#
# This filter calculates the age of an event by subtracting the event timestamp
# from the current timestamp. This allows you to drop Logstash events that are
# older than some threshold.
#
# [source,ruby]
# filter {
# age {}
#
# if [@metadata][age] > 86400 {
# drop {}
# }
# }
#
class LogStash::Filters::Age < LogStash::Filters::Base
include LogStash::PluginMixins::HttpClient
include JRuby::Synchronized

config_name 'age'

# Define the target field for the event age, in seconds.
config :target, :default => '[@metadata][age]', :validate => :string

# There are two optional features that can be enabled:
#
# 1. Have age perform the calculation:
# if current time - event time > max_age_secs set the boolean value in
# the expired_target field.
# This feature is enabled by setting:
# a) a non zero value in the max_age_secs setting.
#
# 2. Have age determine age_limit to be used in the calculation:
# if current time - event time > age_limit max_age_secs
# set the boolean value in the expired_target field.
# This feature is enabled by setting:
# a) max_age_secs must be non zero
# b) url must be defined (the url to a limit service returning a json to a
# leaf level age limit value)
# c) user and password to the url service
# TODO: support other auth approaces
# d) limit_path defined as the lmit service json response body path to
# the age_limit value
# e) interval is the frequency between url requests to get the latest
# age limit.
# f) age_limit_target is the field to store the discovered age limit
#
# The max_age_secs is the default number of seconds beyond which the
# expired_target will be set to true (when the limit service url not found or
# there is no result)
config :max_age_secs, :default => 259200, :validate => :number

# Define the elasticsearch url to the limit service
config :url, :default => '', :validate => :string

# The response to the limit url will be a json with a nested structure the ends
# in a numeric age limit . The limit_path is a dot delimited representation
# of the nested json returned in the limit service response body.
config :limit_path,
:default => 'persistent.cluster.metadata.logstash.filter.age.limit_secs',
:validate => :string

# The expired_target field is true when the event is older than age_limit
config :expired_target,
:default => '[@metadata][expired]',
:validate => :string

# The age_limit_target is the name of the field whose value is the number of
# seconds actually used in the calculated result stored in expired_target
# When url is defined and the limit is found, then this is the discovered value
config :age_limit_target,
:default => '[@metadata][age_limit]',
:validate => :string

# The interval between calls to the limit service given by the url
config :interval, :default => "60s", :validate => :string

# user and password (and other options) come from the http client mixin
# Note that the password needs to be dereferenced using @password.value

public
def register
# Nothing to do here
if url != ''
@logger.debug('age filter is configured to use a limit service')
@split_limit_path = limit_path.split(".")
@scheduler = Rufus::Scheduler.new

request_limit()

@scheduler.every @interval do
request_limit()
end
else
@logger.debug('age filter is not configured to use a limit service')
@age_limit = @max_age_secs.to_f
end
end

public
def filter(event)
event.set(@target, Time.now.to_f - event.timestamp.to_f)

delta = Time.now.to_f - event.timestamp.to_f
event.set(@target, delta)

if delta > @age_limit
event.set(@expired_target, true)
else
event.set(@expired_target, false)
end

event.set(@age_limit_target, @age_limit)

# filter_matched should go in the last line of our successful code
filter_matched(event)
end

private
def request_limit
begin

options = {auth: {user: @user, password: @password.value},
request_timeout: @request_timeout, socket_timeout: @socket_timeout,
connect_timeout: @connect_timeout, automatic_retries: @automatic_retries}

code, response_headers, response_body = request_http(@url, options)

rescue => e
client_error = e
end

if client_error
@logger.error('error during HTTP request',
:url => @url,
:client_error => client_error.message)

elsif !code.between?(200, 299)
@logger.error('error during HTTP request',
:url => @url,
:code => code,
:response => response_body)
else
process_response(response_body)
end
end

def request_http(url, options = {})
@logger.info('age making request_http with arguments', :url => url)
response = client.http("get", url, options)
[response.code, response.headers, response.body]
end

def process_response(body)
begin
parsed = LogStash::Json.load(body).to_hash

@split_limit_path.each do |field|
break if !parsed
parsed = parsed.dig(field)
end

if parsed
@age_limit = parsed.to_f
if @age_limit <= 0
@age_limit = @max_age_secs.to_f
@logger.info('age response parsed non numeric',
:age_limit => @age_limit, :parsed => parsed)
else
@logger.info('age response parsed numeric',
:age_limit => @age_limit, :parsed => parsed)
end
else
@age_limit = @max_age_secs.to_f
@logger.info('age response parsed false (using max_age_secs)',
:age_limit => @age_limit, :parsed => parsed, :max_age_secs => @max_age_secs)
end

rescue => e
@logger.warn('JSON parsing error', :message => e.message, :body => body)
end
end
end
4 changes: 2 additions & 2 deletions logstash-filter-age.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|
s.name = 'logstash-filter-age'
s.version = '1.0.2'
s.licenses = ['Apache License (2.0)']
s.version = '1.1.0'
s.licenses = ['Apache-2.0']
s.summary = 'A Logstash filter for calculating the age of an event.'
s.description = 'This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program'
s.homepage = 'https://github.com/joshuaspence/logstash-filter-json'
Expand Down
4 changes: 4 additions & 0 deletions testing/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
FROM docker.elastic.co/logstash/logstash:7.9.2
RUN rm -f /usr/share/logstash/pipeline/logstash.conf
ADD pipeline/ /usr/share/logstash/pipeline/
ADD config/ /usr/share/logstash/config/
28 changes: 28 additions & 0 deletions testing/config/logstash.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#logstash.conf
input {
file {
path => "/app/input.log"
}
}

filter {
age{
"url" => "https://foo.com/_cluster/settings?filter_path=persistent.cluster.metadata.logstash.filter.age.limit_secs"
"limit_path" => "persistent.cluster.metadata.logstash.filter.age.limit_secs"
"user" => "trump"
"password" => "biden"
"request_timeout" => 2
"socket_timeout" => 1
"connect_timeout" => 1
"automatic_retries" => 1
}
}

output {
file {
path => "/app/output.log"
codec => rubydebug {
metadata => true
}
}
}
3 changes: 3 additions & 0 deletions testing/config/logstash.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#http.host: "0.0.0.0"
xpack.monitoring.enabled: false
#xpack.monitoring.elasticsearch.hosts: [ "http://elasticsearch:9200" ]