forked from jhrv/sensu-influxdb-extension
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinfluxdb-extension.rb
executable file
·139 lines (110 loc) · 4.83 KB
/
influxdb-extension.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#!/usr/bin/env ruby
require 'net/http'
require 'json'
module Sensu::Extension
class InfluxDB < Handler
@@extension_name = 'influxdb-extension'
# Returns the extension's identifier ('influxdb-extension').
# The same value is used in this file as the settings key and as the
# prefix of every log message.
def name
  @@extension_name
end
# Returns a short, human-readable summary of what this handler does.
def description
  "Transforms and sends metrics to InfluxDB"
end
# Reads this extension's settings, builds the InfluxDB write endpoint and
# initializes the HTTP client and the in-memory point buffer.
#
# Recognized settings (under the key returned by #name):
#   hostname, database            - required (checked by validate_config)
#   port                          - defaults to 8086
#   ssl                           - truthy enables https, defaults to false
#   precision                     - defaults to 's'
#   retention_policy              - optional, sent as the `rp` query parameter
#   username, password            - optional, only sent when both are present
#   buffer_size, buffer_max_age   - default to 100 points / 10 seconds
#
# NOTE(review): `settings` and `@logger` are not defined in this file; they
# are presumably supplied by the Sensu extension base class - confirm.
#
# Raises ArgumentError (from validate_config) when the configuration is
# missing or incomplete.
def post_init
  influxdb_config = settings[@@extension_name]
  validate_config(influxdb_config)

  hostname         = influxdb_config['hostname']
  port             = influxdb_config['port'] || 8086
  database         = influxdb_config['database']
  ssl              = influxdb_config['ssl'] || false
  precision        = influxdb_config['precision'] || 's'
  retention_policy = influxdb_config['retention_policy']
  username         = influxdb_config['username']
  password         = influxdb_config['password']

  @BUFFER_SIZE    = influxdb_config['buffer_size'] || 100
  @BUFFER_MAX_AGE = influxdb_config['buffer_max_age'] || 10

  # Build the query string from key/value pairs so that every value is
  # URL-encoded; raw interpolation breaks on characters such as '&' or ' '.
  query = [['db', database.to_s], ['precision', precision.to_s]]
  query << ['rp', retention_policy.to_s] unless retention_policy.nil?
  unless username.nil? || password.nil?
    query << ['u', username.to_s]
    query << ['p', password.to_s]
  end

  protocol = ssl ? 'https' : 'http'
  @uri = URI("#{protocol}://#{hostname}:#{port}/write?#{URI.encode_www_form(query)}")

  @http = Net::HTTP.new(@uri.host, @uri.port)
  # Choosing the https scheme alone does not enable TLS on Net::HTTP; it has
  # to be switched on explicitly, otherwise the request is sent in plaintext.
  @http.use_ssl = (@uri.scheme == 'https')

  @buffer = []
  @buffer_flushed = Time.now.to_i

  # Never write the password to the log: mask the `p` query parameter.
  loggable_uri = @uri.to_s.sub(/([?&]p=)[^&]*/, '\1[REDACTED]')
  @logger.info("#{@@extension_name}: successfully initialized config: hostname: #{hostname}, port: #{port}, database: #{database}, uri: #{loggable_uri}, username: #{username}, buffer_size: #{@BUFFER_SIZE}, buffer_max_age: #{@BUFFER_MAX_AGE}")
end
# Handler entry point: buffers one InfluxDB point per line of check output.
#
# The buffer is flushed first when it is too old or too big. The event is
# then parsed from JSON, client and check tags are merged (check tags win on
# key collisions) and every output line of the form
#   <measurement> <value> <timestamp>
# is turned into "<measurement><tags> value=<value> <timestamp>".
# Lines whose timestamp is not an integer are skipped and logged.
#
# Any error raised while flushing or parsing is logged and swallowed, so the
# block is always called with the "handler finished" message and status 0.
#
# event - JSON string describing the Sensu event.
def run(event)
  begin
    flush_buffer if buffer_too_old? || buffer_too_big?

    event = JSON.parse(event)
    client_tags = event['client']['tags'] || {}
    check_tags = event['check']['tags'] || {}
    tags = create_tags(client_tags.merge(check_tags))

    event['check']['output'].split(/\r\n|\n/).each do |line|
      # Blank lines carry no metric; skip them quietly instead of logging
      # an "invalid timestamp" error together with the whole event.
      next if line.strip.empty?

      # Argument-less split ignores leading/trailing whitespace. Splitting on
      # /\s+/ would yield an empty first field for an indented line and shift
      # every column by one.
      measurement, field_value, timestamp = line.split

      unless is_number?(timestamp)
        @logger.error("invalid timestamp, skipping line in event #{event}")
        next
      end

      @buffer.push("#{measurement}#{tags} value=#{field_value} #{timestamp}")
      @logger.debug("#{@@extension_name}: stored point in buffer (#{@buffer.length}/#{@BUFFER_SIZE})")
    end
  rescue => e
    # Include the exception class and message; the backtrace alone does not
    # say what actually went wrong.
    @logger.error("#{@@extension_name}: unable to post payload to influxdb for event #{event} - #{e.class}: #{e.message} - #{e.backtrace.to_s}")
  end
  yield("#{@@extension_name}: handler finished", 0)
end
# Builds the tag section of an InfluxDB line-protocol point.
#
# tags - Hash of tag name => tag value.
#
# Returns a String such as ",dc=eu,host=web01" (leading comma included so it
# can be appended directly to the measurement name), or "" when there are no
# usable tags or the tag string could not be built (the error is logged).
def create_tags(tags)
  begin
    # Line protocol requires commas, equals signs and spaces inside tag keys
    # and tag values to be backslash-escaped; unescaped they corrupt the point.
    escape = lambda { |text| text.to_s.gsub(/[,= ]/) { |char| "\\#{char}" } }

    # sorting tags alphabetically in order to increase influxdb performance
    pairs = tags.sort.map do |tag, value|
      next if value.to_s.empty? # skips tags without values
      ",#{escape.call(tag)}=#{escape.call(value)}"
    end
    tag_string = pairs.compact.join

    @logger.debug("#{@@extension_name}: created tags: #{tag_string}")
    tag_string
  rescue => e
    @logger.error("#{@@extension_name}: unable to create tag string from #{tags} - #{e.class}: #{e.message} - #{e.backtrace.to_s}")
    ""
  end
end
# POSTs a line-protocol payload to the configured InfluxDB write endpoint.
#
# payload - String containing one point per line.
#
# Returns the Net::HTTP response object. Non-2xx responses are logged at
# error level so that rejected writes are visible; they are not raised.
# Network-level exceptions from Net::HTTP propagate to the caller.
def send_to_influxdb(payload)
  request = Net::HTTP::Post.new(@uri.request_uri)
  request.body = payload

  # Log the endpoint without its query string: the query may carry the
  # `u`/`p` credentials and must not end up in log files.
  endpoint = "#{@uri.scheme}://#{@uri.host}:#{@uri.port}#{@uri.path}"
  @logger.debug("#{@@extension_name}: writing payload #{payload} to endpoint #{endpoint}")

  response = @http.request(request)
  if response.is_a?(Net::HTTPSuccess)
    @logger.debug("#{@@extension_name}: influxdb http response code = #{response.code}, body = #{response.body}")
  else
    @logger.error("#{@@extension_name}: influxdb rejected write, http response code = #{response.code}, body = #{response.body}")
  end
  response
end
# Sends all buffered points to InfluxDB as one newline-separated payload,
# then empties the buffer and records the flush time (epoch seconds).
#
# An empty buffer is not sent - there is nothing to write, so the HTTP
# round-trip is skipped - but the flush time is still reset so the age check
# does not fire again on every subsequent event.
#
# If send_to_influxdb raises, the buffer and flush time are left untouched.
def flush_buffer
  send_to_influxdb(@buffer.join("\n")) unless @buffer.empty?
  @buffer = []
  @buffer_flushed = Time.now.to_i
end
# True when at least @BUFFER_MAX_AGE seconds have elapsed since the buffer
# was last flushed (both values are compared as whole epoch seconds).
def buffer_too_old?
  (Time.now.to_i - @buffer_flushed) >= @BUFFER_MAX_AGE
end
# True once the number of buffered points has reached @BUFFER_SIZE.
def buffer_too_big?
  @buffer.size >= @BUFFER_SIZE
end
# Verifies that the extension configuration exists and contains every
# required setting ("hostname" and "database").
#
# A setting counts as missing when it is nil, empty or whitespace-only; a
# blank hostname or database would otherwise only fail later, when the
# write URI is built, with a far less helpful error.
#
# config - Hash of settings for this extension, or nil.
#
# Raises ArgumentError when config is nil or a required setting is missing.
def validate_config(config)
  if config.nil?
    raise ArgumentError, "no configuration for #{@@extension_name} provided. exiting..."
  end

  ["hostname", "database"].each do |required_setting|
    next unless config[required_setting].to_s.strip.empty?
    raise ArgumentError, "required setting #{required_setting} not provided to extension. this should be provided as json element with key '#{@@extension_name}'. exiting..."
  end
end
# Tells whether input is a plain base-10 integer, optionally negative.
#
# Used to validate the timestamp column of a metric line. Only decimal
# digits are accepted: Integer() would also accept forms such as "0x1A",
# "0b101" or "1_000", which are not valid timestamps.
#
# input - String (or any object responding to to_s); nil is treated as "".
#
# Returns true or false, never nil.
def is_number?(input)
  !(input.to_s =~ /\A-?\d+\z/).nil?
end
end
end