Skip to content

Commit

Permalink
Merge pull request #1766 from wfernandes/json_plugin_manages_processes
Browse files Browse the repository at this point in the history
Json plugin manages processes
tjvman authored Oct 19, 2017
2 parents 446fcb7 + 43346b0 commit 67591f0
Showing 2 changed files with 171 additions and 37 deletions.
106 changes: 95 additions & 11 deletions src/bosh-monitor/lib/bosh/monitor/plugins/json.rb
Original file line number Diff line number Diff line change
@@ -1,29 +1,113 @@
require 'thread'

module Bosh::Monitor::Plugins

class Json < Base
attr_reader :processes

def initialize(options = {})
super(options)
@process_manager = options.fetch('process_manager', Bosh::Monitor::Plugins::ProcessManager.new(glob: '/var/vcap/jobs/*/bin/bosh-monitor/*', logger: logger))
end

def run
@process_manager.start
end

def process(event)
@process_manager.send_event event
end
end

class ProcessManager

def initialize(options)
@bin_glob = options.fetch(:glob)
@logger = options.fetch(:logger)
@check_interval = options.fetch(:check_interval, 60)
@restart_wait = options.fetch(:restart_wait, 1)

@lock = Mutex.new
@processes = {}
end

def start
unless EM.reactor_running?
logger.error("JSON delivery agent can only be started when event loop is running")
@logger.error("JSON Plugin can only be started when event loop is running")
return false
end

@processes = Dir[bin_glob].map do |bin|
EventMachine::DeferrableChildProcess.open(bin)
end
start_processes

EventMachine.add_periodic_timer(@check_interval) { start_processes }
end

def process(event)
event_json = event.to_json
@processes.each do |process|
process.send_data "#{event_json}\n"
def send_event(event)
@lock.synchronize do
@processes.each do |_, process|
process.send_data "#{event.to_json}\n"
end

@logger.debug("JSON Plugin: Sent to #{@processes.size} managed processes")
end
end

private

def bin_glob
options.fetch('bin_glob', '/var/vcap/jobs/*/bin/bosh-monitor/*')
def start_processes
@lock.synchronize do
new_binaries = Dir[@bin_glob] - @processes.keys
new_binaries.each do |bin|
@processes[bin] = start_process(bin)
@logger.info("JSON Plugin: Started process #{bin}")
end
end
end

def restart_process(bin)
@lock.synchronize do
@processes[bin] = start_process(bin)
@logger.info("JSON Plugin: Restarted process #{bin}")
end
end

def start_process(bin)
process = Bosh::Monitor::Plugins::DeferrableChildProcess.open(bin)
process.errback do
EventMachine.add_timer(@restart_wait) { restart_process bin }
end

process
end
end

# EM's DeferrableChildProcess does not give an opportunity
# to get the exit status. So we are implementing our own unbind logic to handle the exit status.
# This way we can execute our process restart on the err callback (errback).
# https://stackoverflow.com/a/12092647
class DeferrableChildProcess < EventMachine::Connection
include EventMachine::Deferrable

def initialize
super
@data = []
end

def self.open cmd
EventMachine.popen(cmd, DeferrableChildProcess)
end

def receive_data data
@data << data
end

def unbind
status = get_status
if status.exitstatus != 0
fail(status)
else
succeed(@data.join, status)
end
end
end

end
102 changes: 76 additions & 26 deletions src/bosh-monitor/spec/unit/bosh/monitor/plugins/json_spec.rb
Original file line number Diff line number Diff line change
@@ -1,53 +1,103 @@
require 'spec_helper'

describe Bhm::Plugins::Json do
subject(:plugin) { Bhm::Plugins::Json.new(options) }
let(:process_manager) { instance_double(Bosh::Monitor::Plugins::ProcessManager) }

let(:options) do
{
"bin_glob" => `which cat`.chomp
}
subject(:plugin) { Bhm::Plugins::Json.new({'process_manager' => process_manager}) }

it "send events to the process manager" do
expect(process_manager).to receive(:start)
plugin.run

heartbeat = make_heartbeat(timestamp: Time.now.to_i)

expect(process_manager).to receive(:send_event).with(heartbeat)
plugin.process(heartbeat)
end
end

let(:process) { double(:process).as_null_object }
describe Bhm::Plugins::ProcessManager do
subject(:process_manager) do
Bhm::Plugins::ProcessManager.new({
glob: '/*/json-plugin/*',
logger: double('logger').as_null_object,
check_interval: 0.2,
restart_wait: 0.1
})
end

it "doesn't start if event loop isn't running" do
expect(plugin.run).to be(false)
expect(process_manager.start).to be(false)
end

it "sends alerts as JSON" do
alert = make_alert(timestamp: Time.now.to_i)
it "starts processes that match the glob" do
allow(Dir).to receive(:[]).with('/*/json-plugin/*').and_return(['/plugin'])

expect(EventMachine::DeferrableChildProcess).to receive(:open).with("/bin/cat").and_return(process)
process = double('some-process').as_null_object
expect(Bosh::Monitor::Plugins::DeferrableChildProcess).to receive(:open).once.with('/plugin').and_return(process)
allow(EventMachine).to receive(:defer).and_yield

EM.run do
plugin.run

expect(process).to receive(:send_data) do |payload|
json = JSON.parse(payload)
expect(json['kind']).to eq 'alert'
end
plugin.process(alert)
process_manager.start

EM.stop
end
end

it "sends heartbeat metrics as JSON" do
heartbeat = make_heartbeat(timestamp: Time.now.to_i)
it "restarts processes when they die" do
allow(Dir).to receive(:[]).with('/*/json-plugin/*').and_return(['/non-existent-plugin'])
expect(Bosh::Monitor::Plugins::DeferrableChildProcess).to receive(:open).at_least(2).times.with('/non-existent-plugin').and_call_original

expect(EventMachine::DeferrableChildProcess).to receive(:open).with("/bin/cat").and_return(process)
EM.run do
allow(EventMachine).to receive(:add_timer).with(0.1).twice.and_yield
expect(EventMachine).to receive(:add_timer) { EM.stop }
process_manager.start
end
end

it "detects and starts new processes" do
expect(Dir).to receive(:[]).with('/*/json-plugin/*').and_return([], ['/plugin'])
expect(Bosh::Monitor::Plugins::DeferrableChildProcess).to receive(:open).with('/plugin')

EM.run do
plugin.run
process_manager.start

EM.add_timer(5) do
# By this time the test is failing
puts("Timeout canceling the event machine")
EM.stop
end

expect(process).to receive(:send_data) do |payload|
json = JSON.parse(payload)
expect(json['kind']).to eq 'heartbeat'
EM.add_periodic_timer(0.5) do
if process_manager.instance_variable_get(:@processes).size == 1
EM.stop
end
end
plugin.process(heartbeat)
end
end


it "sends events to all managed processes as JSON" do
alert = make_alert(timestamp: Time.now.to_i)

expect(Dir).to receive(:[]).with('/*/json-plugin/*').and_return(['/process-a', '/process-b'])

process_a = double('process-a').as_null_object
allow(Bosh::Monitor::Plugins::DeferrableChildProcess).to receive(:open).with('/process-a').and_return(process_a)

process_b = double('process-b').as_null_object
allow(Bosh::Monitor::Plugins::DeferrableChildProcess).to receive(:open).with('/process-b').and_return(process_b)

EM.run do
process_manager.start

process_manager.send_event(alert)

expect(process_a).to have_received(:send_data).with("#{alert.to_json}\n")
expect(process_b).to have_received(:send_data).with("#{alert.to_json}\n")

EM.stop
end
end
end

end

0 comments on commit 67591f0

Please sign in to comment.