diff --git a/Gemfile.lock b/Gemfile.lock index 6799fffe..4b75b347 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,9 +1,10 @@ PATH remote: . specs: - synapse (0.14.7) + synapse (0.15.0) aws-sdk (~> 1.39) docker-api (~> 1.7) + hashdiff (~> 0.2.3) logging (~> 1.8) zk (~> 1.9.4) diff --git a/README.md b/README.md index efad0e1e..e8c88b39 100644 --- a/README.md +++ b/README.md @@ -34,11 +34,11 @@ are proven routing components like [HAProxy](http://haproxy.1wt.eu/) or [NGINX]( For every external service that your application talks to, we assign a synapse local port on localhost. Synapse creates a proxy from the local port to the service, and you reconfigure your application to talk to the proxy. -Under the hood, Synapse sports `service_watcher`s for service discovery and +Under the hood, Synapse supports `service_watcher`s for service discovery and `config_generators` for configuring local state (e.g. load balancer configs) based on that service discovery state. -Synapse supports service discovery with with pluggable `service_watcher`s which +Synapse supports service discovery with pluggable `service_watcher`s which take care of signaling to the `config_generators` so that they can react and reconfigure to point at available servers on the fly. @@ -183,7 +183,7 @@ relevant routing component. For example if you want to only configure HAProxy an not NGINX for a particular service, you would pass ``disabled`` to the `nginx` section of that service's watcher config. -* [`haproxy`](#haproxysvc): how will the haproxy section for this service be configured +* [`haproxy`](#haproxysvc): how will the haproxy section for this service be configured. If the corresponding `watcher` is defined to use `zookeeper` and the service publishes its `haproxy` configure on ZK, the `haproxy` hash can be filled/updated via data from the ZK node. * [`nginx`](https://github.com/jolynch/synapse-nginx#service-watcher-config): how will the nginx section for this service be configured. **NOTE** to use this you must have the synapse-nginx [plugin](#plugins) installed. The services hash may contain the following additional keys: @@ -221,7 +221,7 @@ Given a `label_filters`: `[{ "label": "cluster", "value": "dev", "condition": "e ##### Zookeeper ##### -This watcher retrieves a list of servers from zookeeper. +This watcher retrieves a list of servers and also service config data from zookeeper. It takes the following mandatory arguments: * `method`: zookeeper @@ -230,6 +230,8 @@ It takes the following mandatory arguments: The watcher assumes that each node under `path` represents a service server. +The watcher assumes that the data (if any) retrieved at znode `path` is a hash, where each key is named by a valid `config_generator` (e.g. `haproxy`) and the value is a hash that configs the generator. + The following arguments are optional: * `decode`: A hash containing configuration for how to decode the data found in zookeeper. diff --git a/lib/synapse/config_generator/haproxy.rb b/lib/synapse/config_generator/haproxy.rb index a7024a19..01a5d303 100644 --- a/lib/synapse/config_generator/haproxy.rb +++ b/lib/synapse/config_generator/haproxy.rb @@ -5,6 +5,7 @@ require 'socket' require 'digest/sha1' require 'set' +require 'hashdiff' class Synapse::ConfigGenerator class Haproxy < BaseGenerator @@ -801,6 +802,8 @@ class Haproxy < BaseGenerator # should be enough for anyone right (famous last words)? MAX_SERVER_ID = (2**16 - 1).freeze + attr_reader :server_id_map, :state_cache + def initialize(opts) super(opts) @@ -845,8 +848,11 @@ def initialize(opts) @backends_cache = {} @watcher_revisions = {} - @state_file_path = @opts['state_file_path'] - @state_file_ttl = @opts.fetch('state_file_ttl', DEFAULT_STATE_FILE_TTL).to_i + @state_cache = HaproxyState.new( + @opts['state_file_path'], + @opts.fetch('state_file_ttl', DEFAULT_STATE_FILE_TTL).to_i, + self + ) # For giving consistent orders, even if they are random @server_order_seed = @opts.fetch('server_order_seed', rand(2000)).to_i @@ -907,6 +913,10 @@ def update_config(watchers) end end + def update_state_file(watchers) + @state_cache.update_state_file(watchers) + end + # generates a new config based on the state of the watchers def generate_config(watchers) new_config = generate_base_config @@ -914,8 +924,15 @@ def generate_config(watchers) watchers.each do |watcher| watcher_config = watcher.config_for_generator[name] - @watcher_configs[watcher.name] ||= parse_watcher_config(watcher) - next if watcher_config['disabled'] + next if watcher_config.nil? || watcher_config.empty? || watcher_config['disabled'] + @watcher_configs[watcher.name] = parse_watcher_config(watcher) + + # if watcher_config is changed, trigger restart + config_diff = HashDiff.diff(@state_cache.config_for_generator(watcher.name), watcher_config) + if !config_diff.empty? + log.info "synapse: restart required because config_for_generator changed. before: #{@state_cache.config_for_generator(watcher.name)}, after: #{watcher_config}" + @restart_required = true + end regenerate = watcher.revision != @watcher_revisions[watcher.name] || @frontends_cache[watcher.name].nil? || @@ -1051,7 +1068,7 @@ def generate_backend_stanza(watcher, config) # The ordering here is important. First we add all the backends in the # disabled state... - seen.fetch(watcher.name, []).each do |backend_name, backend| + @state_cache.backends(watcher).each do |backend_name, backend| backends[backend_name] = backend.merge('enabled' => false) # We remember the haproxy_server_id from a previous reload here. # Note though that if live servers below define haproxy_server_id @@ -1308,74 +1325,113 @@ def construct_name(backend) ###################################### # methods for managing the state file ###################################### - def seen - # if we don't support the state file, return nothing - return {} if @state_file_path.nil? + class HaproxyState + include Synapse::Logging - # if we've never needed the backends, now is the time to load them - @seen = read_state_file if @seen.nil? + # TODO: enable version in the Haproxy Cache File + KEY_WATCHER_CONFIG_FOR_GENERATOR = "watcher_config_for_generator" + NON_BACKENDS_KEYS = [KEY_WATCHER_CONFIG_FOR_GENERATOR] - @seen - end + def initialize(state_file_path, state_file_ttl, haproxy) + @state_file_path = state_file_path + @state_file_ttl = state_file_ttl + @haproxy = haproxy + end - def update_state_file(watchers) - # if we don't support the state file, do nothing - return if @state_file_path.nil? - - log.info "synapse: writing state file" - timestamp = Time.now.to_i - - # Remove stale backends - seen.each do |watcher_name, backends| - backends.each do |backend_name, backend| - ts = backend.fetch('timestamp', 0) - delta = (timestamp - ts).abs - if delta > @state_file_ttl - log.info "synapse: expiring #{backend_name} with age #{delta}" - backends.delete(backend_name) - end + def backends(watcher_name) + if seen.key?(watcher_name) + seen[watcher_name].select { |section, data| !NON_BACKENDS_KEYS.include?(section) } + else + {} end end - # Remove any services which no longer have any backends - seen.reject!{|watcher_name, backends| backends.keys.length == 0} + def config_for_generator(watcher_name) + cache_config = {} + if seen.key?(watcher_name) && seen[watcher_name].key?(KEY_WATCHER_CONFIG_FOR_GENERATOR) + cache_config = seen[watcher_name][KEY_WATCHER_CONFIG_FOR_GENERATOR] + end - # Add backends from watchers - watchers.each do |watcher| - seen[watcher.name] ||= {} + cache_config + end - watcher.backends.each do |backend| - backend_name = construct_name(backend) - data = { - 'timestamp' => timestamp, - } - server_id = @server_id_map[watcher.name][backend_name].to_i - if server_id && server_id > 0 && server_id <= MAX_SERVER_ID - data['haproxy_server_id'] = server_id + def update_state_file(watchers) + # if we don't support the state file, do nothing + return if @state_file_path.nil? + + log.info "synapse: writing state file" + timestamp = Time.now.to_i + + # Remove stale backends + seen.each do |watcher_name, data| + backends(watcher_name).each do |backend_name, backend| + ts = backend.fetch('timestamp', 0) + delta = (timestamp - ts).abs + if delta > @state_file_ttl + log.info "synapse: expiring #{backend_name} with age #{delta}" + data.delete(backend_name) + end end + end - seen[watcher.name][backend_name] = data.merge(backend) + # Remove any services which no longer have any backends + seen.reject!{|watcher_name, data| backends(watcher_name).keys.length == 0} + + # Add backends and config from watchers + watchers.each do |watcher| + seen[watcher.name] ||= {} + + watcher.backends.each do |backend| + backend_name = @haproxy.construct_name(backend) + data = { + 'timestamp' => timestamp, + } + server_id = @haproxy.server_id_map[watcher.name][backend_name].to_i + if server_id && server_id > 0 && server_id <= MAX_SERVER_ID + data['haproxy_server_id'] = server_id + end + + seen[watcher.name][backend_name] = data.merge(backend) + end + + # Add config for generator from watcher + if watcher.config_for_generator.key?(@haproxy.name) + seen[watcher.name][KEY_WATCHER_CONFIG_FOR_GENERATOR] = + watcher.config_for_generator[@haproxy.name] + end end + + # write the data! + write_data_to_state_file(seen) end - # write the data! - write_data_to_state_file(seen) - end + private - def read_state_file - # Some versions of JSON return nil on an empty file ... - JSON.load(File.read(@state_file_path)) || {} - rescue StandardError => e - # It's ok if the state file doesn't exist or contains invalid data - # The state file will be rebuilt automatically - {} - end + def seen + # if we don't support the state file, return nothing + return {} if @state_file_path.nil? + + # if we've never needed the backends, now is the time to load them + @seen = read_state_file if @seen.nil? + + @seen + end - # we do this atomically so the state file is always consistent - def write_data_to_state_file(data) - tmp_state_file_path = @state_file_path + ".tmp" - File.write(tmp_state_file_path, JSON.pretty_generate(data)) - FileUtils.mv(tmp_state_file_path, @state_file_path) + def read_state_file + # Some versions of JSON return nil on an empty file ... + JSON.load(File.read(@state_file_path)) || {} + rescue StandardError => e + # It's ok if the state file doesn't exist or contains invalid data + # The state file will be rebuilt automatically + {} + end + + # we do this atomically so the state file is always consistent + def write_data_to_state_file(data) + tmp_state_file_path = @state_file_path + ".tmp" + File.write(tmp_state_file_path, JSON.pretty_generate(data)) + FileUtils.mv(tmp_state_file_path, @state_file_path) + end end end end diff --git a/lib/synapse/service_watcher/base.rb b/lib/synapse/service_watcher/base.rb index e1276ab9..4de6b1dd 100644 --- a/lib/synapse/service_watcher/base.rb +++ b/lib/synapse/service_watcher/base.rb @@ -1,5 +1,6 @@ require 'synapse/log' require 'set' +require 'hashdiff' class Synapse::ServiceWatcher class BaseWatcher @@ -7,7 +8,7 @@ class BaseWatcher LEADER_WARN_INTERVAL = 30 - attr_reader :name, :config_for_generator, :revision + attr_reader :name, :revision def initialize(opts={}, synapse) super() @@ -99,6 +100,11 @@ def ping? true end + # deep clone the hash to protect its readonly property + def config_for_generator + Marshal.load( Marshal.dump(@config_for_generator)) + end + def backends filtered = backends_filtered_by_labels @@ -152,7 +158,7 @@ def backends_filtered_by_labels end end - def set_backends(new_backends) + def set_backends(new_backends, new_config_for_generator = {}) # Aggregate and deduplicate all potential backend service instances. new_backends = (new_backends + @default_servers) if @keep_default_servers # Substitute backend_port_override for the provided port @@ -165,7 +171,20 @@ def set_backends(new_backends) [b['host'], b['port'], b.fetch('name', '')] } + backends_updated = update_backends(new_backends) + config_updated = update_config_for_generator(new_config_for_generator) + + if backends_updated || config_updated + reconfigure! + return true + else + return false + end + end + + def update_backends(new_backends) if new_backends.to_set == @backends.to_set + log.info "synapse: backends for service #{@name} do not change." return false end @@ -192,11 +211,28 @@ def set_backends(new_backends) @backends = new_backends end - reconfigure! - return true end + def update_config_for_generator(new_config_for_generator) + if new_config_for_generator.empty? + log.info "synapse: no config_for_generator data from #{name} for" \ + " service #{@name}; keep existing config_for_generator: #{@config_for_generator.inspect}" + return false + else + log.info "synapse: discovered config_for_generator for service #{@name}" + diff = HashDiff.diff(new_config_for_generator, config_for_generator) + + if diff.empty? + log.info "synapse: config_for_generator for service #{@name} does not change." + return false + else + @config_for_generator = new_config_for_generator + return true + end + end + end + # Subclasses should not invoke this directly; it's only exposed so that it # can be overridden in subclasses. def reconfigure! diff --git a/lib/synapse/service_watcher/zookeeper.rb b/lib/synapse/service_watcher/zookeeper.rb index f3b11501..aec8a827 100644 --- a/lib/synapse/service_watcher/zookeeper.rb +++ b/lib/synapse/service_watcher/zookeeper.rb @@ -157,7 +157,15 @@ def discover end end - set_backends(new_backends) + node = @zk.get(@discovery['path'], :watch => true) + begin + new_config_for_generator = parse_service_config(node.first) + rescue StandardError => e + log.error "synapse: invalid config data in ZK node at #{@discovery['path']}: #{e}" + new_config_for_generator = {} + end + + set_backends(new_backends, new_config_for_generator) end # sets up zookeeper callbacks if the data at the discovery path changes @@ -260,6 +268,37 @@ def deserialize_service_instance(data) return host, port, name, weight, haproxy_server_options, labels end + + def parse_service_config(data) + log.debug "synapse: deserializing process data" + if data.nil? || data.empty? + decoded = {} + else + decoded = @decode_method.call(data) + end + + new_generator_config = {} + # validate the config. if the config is not empty: + # each key should be named by one of the available generators + # each value should be a hash (could be empty) + decoded.collect.each do |generator_name, generator_config| + if !@synapse.available_generators.keys.include?(generator_name) + log.error "synapse: invalid generator name in ZK node at #{@discovery['path']}:" \ + " #{generator_name}" + next + else + if generator_config.nil? || !generator_config.is_a?(Hash) + log.warn "synapse: invalid generator config in ZK node at #{@discovery['path']}" \ + " for generator #{generator_name}" + new_generator_config[generator_name] = {} + else + new_generator_config[generator_name] = generator_config + end + end + end + + return new_generator_config + end end end diff --git a/lib/synapse/version.rb b/lib/synapse/version.rb index c54f399f..94c6d5e9 100644 --- a/lib/synapse/version.rb +++ b/lib/synapse/version.rb @@ -1,3 +1,3 @@ module Synapse - VERSION = "0.14.7" + VERSION = "0.15.0" end diff --git a/spec/lib/synapse/haproxy_spec.rb b/spec/lib/synapse/haproxy_spec.rb index 788f6e1f..fa3633cf 100644 --- a/spec/lib/synapse/haproxy_spec.rb +++ b/spec/lib/synapse/haproxy_spec.rb @@ -33,6 +33,28 @@ class MockWatcher; end; mockWatcher end + let(:mockwatcher_with_non_haproxy_config) do + mockWatcher = double(Synapse::ServiceWatcher) + allow(mockWatcher).to receive(:name).and_return('example_service2') + backends = [{ 'host' => 'somehost', 'port' => 5555, 'haproxy_server_options' => 'id 12 backup'}] + allow(mockWatcher).to receive(:backends).and_return(backends) + allow(mockWatcher).to receive(:config_for_generator).and_return({ + 'unknown' => {'server_options' => "check inter 2000 rise 3 fall 2"} + }) + mockWatcher + end + + let(:mockwatcher_with_empty_haproxy_config) do + mockWatcher = double(Synapse::ServiceWatcher) + allow(mockWatcher).to receive(:name).and_return('example_service2') + backends = [{ 'host' => 'somehost', 'port' => 5555, 'haproxy_server_options' => 'id 12 backup'}] + allow(mockWatcher).to receive(:backends).and_return(backends) + allow(mockWatcher).to receive(:config_for_generator).and_return({ + 'haproxy' => {} + }) + mockWatcher + end + let(:mockwatcher_with_server_id) do mockWatcher = double(Synapse::ServiceWatcher) allow(mockWatcher).to receive(:name).and_return('server_id_svc') @@ -316,6 +338,46 @@ class MockWatcher; end; subject.update_config(watchers) end end + + context 'if watcher has empty or nil config_for_generator[haproxy]' do + let(:watchers) { [mockwatcher, mockwatcher_with_non_haproxy_config, mockwatcher_with_empty_haproxy_config] } + + it 'does not generate config for those watchers' do + allow(subject).to receive(:parse_watcher_config).and_return({}) + expect(subject).to receive(:generate_frontend_stanza).exactly(:once).with(mockwatcher, nil) + expect(subject).to receive(:generate_backend_stanza).exactly(:once).with(mockwatcher, nil) + subject.update_config(watchers) + end + end + + context 'if watcher has a new different config_for_generator[haproxy]' do + let(:watchers) { [mockwatcher] } + let(:socket_file_path) { ['socket_file_path1', 'socket_file_path2'] } + + before do + config['haproxy']['do_writes'] = true + config['haproxy']['do_reloads'] = true + config['haproxy']['do_socket'] = true + config['haproxy']['socket_file_path'] = socket_file_path + end + + it 'trigger restart' do + allow(subject).to receive(:parse_watcher_config).and_return({}) + allow(subject).to receive(:write_config).and_return(nil) + + # set config_for_generator in state_cache to {} + allow(subject.state_cache).to receive(:config_for_generator).and_return({}) + + # make sure @restart_required is not triggered in other places + allow(subject).to receive(:update_backends_at).and_return(nil) + allow(subject).to receive(:generate_frontend_stanza).exactly(:once).with(mockwatcher, nil).and_return([]) + allow(subject).to receive(:generate_backend_stanza).exactly(:once).with(mockwatcher, nil).and_return([]) + + expect(subject).to receive(:restart) + + subject.update_config(watchers) + end + end end describe '#tick' do @@ -329,31 +391,58 @@ class MockWatcher; end; describe '#update_state_file' do let(:watchers) { [mockwatcher, mockwatcher_with_server_options] } + let(:watchers_with_non_haproxy_config) { [mockwatcher_with_non_haproxy_config] } let(:state_file_ttl) { 60 } # seconds before do config['haproxy']['state_file_path'] = '/statefile' config['haproxy']['state_file_ttl'] = state_file_ttl - allow(subject).to receive(:write_data_to_state_file) + allow(subject.state_cache).to receive(:write_data_to_state_file) end it 'adds backends along with timestamps' do subject.update_state_file(watchers) - data = subject.send(:seen) watcher_names = watchers.map{ |w| w.name } - expect(data.keys).to contain_exactly(*watcher_names) + expect(subject.state_cache.send(:seen).keys).to contain_exactly(*watcher_names) watchers.each do |watcher| backend_names = watcher.backends.map{ |b| subject.construct_name(b) } - expect(data[watcher.name].keys).to contain_exactly(*backend_names) + data = subject.state_cache.backends(watcher.name) + expect(data.keys).to contain_exactly(*backend_names) backend_names.each do |backend_name| - expect(data[watcher.name][backend_name]).to include('timestamp') + expect(data[backend_name]).to include('timestamp') end end end + it 'adds config_for_generator from watcher' do + subject.update_state_file(watchers) + + watcher_names = watchers.map{ |w| w.name } + expect(subject.state_cache.send(:seen).keys).to contain_exactly(*watcher_names) + + watchers.each do |watcher| + watcher_config_for_generator = watcher.config_for_generator + data = subject.state_cache.config_for_generator(watcher.name) + expect(data).to eq(watcher_config_for_generator["haproxy"]) + end + end + + it 'does not add config_for_generator of other generators from watcher' do + subject.update_state_file(watchers_with_non_haproxy_config) + + watcher_names = watchers_with_non_haproxy_config.map{ |w| w.name } + expect(subject.state_cache.send(:seen).keys).to contain_exactly(*watcher_names) + + watchers_with_non_haproxy_config.each do |watcher| + watcher_config_for_generator = watcher.config_for_generator + data = subject.state_cache.config_for_generator(watcher.name) + expect(data).to eq({}) + end + end + context 'when the state file contains backends not in the watcher' do it 'keeps them in the config' do subject.update_state_file(watchers) @@ -363,7 +452,7 @@ class MockWatcher; end; allow(watcher).to receive(:backends).and_return([]) end subject.update_state_file(watchers) - end.to_not change { subject.send(:seen) } + end.to_not change { subject.state_cache.send(:seen) } end context 'if those backends are stale' do @@ -377,9 +466,9 @@ class MockWatcher; end; # the final +1 puts us over the expiry limit Timecop.travel(Time.now + state_file_ttl + 1) do subject.update_state_file(watchers) - data = subject.send(:seen) watchers.each do |watcher| - expect(data[watcher.name]).to be_empty + data = subject.state_cache.backends(watcher.name) + expect(data).to be_empty end end end diff --git a/spec/lib/synapse/service_watcher_base_spec.rb b/spec/lib/synapse/service_watcher_base_spec.rb index 6af6127e..ddc9e7e0 100644 --- a/spec/lib/synapse/service_watcher_base_spec.rb +++ b/spec/lib/synapse/service_watcher_base_spec.rb @@ -53,6 +53,22 @@ def remove_arg(name) {'name' => 'server1', 'host' => 'server1', 'port' => 123}, {'name' => 'server2', 'host' => 'server2', 'port' => 123} ] + config_for_generator = { + "haproxy" => { + "frontend" => [ + "binding ::1:1111" + ], + "listen" => [ + "mode http", + "option httpchk GET /health", + "timeout client 300s", + "timeout server 300s", + "option httplog" + ], + "port" => 1111, + "server_options" => "check inter 60s fastinter 2s downinter 5s rise 3 fall 2", + } + } let(:args) { testargs.merge({'default_servers' => default_servers}) } it 'sets backends' do @@ -61,6 +77,20 @@ def remove_arg(name) expect(subject.backends).to eq(backends) end + it 'sets backends with config for generator' do + expect(subject).to receive(:'reconfigure!').exactly(:once) + expect(subject.send(:set_backends, backends, config_for_generator)).to equal(true) + expect(subject.backends).to eq(backends) + expect(subject.config_for_generator).to eq(config_for_generator) + end + + it 'calls reconfigure for duplicate backends but different config_for_generator' do + allow(subject).to receive(:backends).and_return(backends) + expect(subject).to receive(:'reconfigure!').exactly(:once) + expect(subject.send(:set_backends, backends, config_for_generator)).to equal(true) + expect(subject.config_for_generator).to eq(config_for_generator) + end + it 'removes duplicate backends' do expect(subject).to receive(:'reconfigure!').exactly(:once) duplicate_backends = backends + backends @@ -74,6 +104,19 @@ def remove_arg(name) expect(subject.backends).to eq(default_servers) end + it 'keeps the current config_for_generator if no config discovered from ZK' do + expect(subject).to receive(:'reconfigure!').exactly(:once) + # set config_for_generator to some valid config + expect(subject.send(:set_backends, backends, config_for_generator)).to equal(true) + expect(subject.backends).to eq(backends) + expect(subject.config_for_generator).to eq(config_for_generator) + + # re-set config_for_generator to empty + expect(subject.send(:set_backends, backends, {})).to equal(false) + expect(subject.backends).to eq(backends) + expect(subject.config_for_generator).to eq(config_for_generator) + end + context 'with no default_servers' do let(:args) { remove_arg 'default_servers' } it 'uses previous backends if no default_servers set' do @@ -98,12 +141,14 @@ def remove_arg(name) end end - it 'calls reconfigure only once for duplicate backends' do + it 'calls reconfigure only once for duplicate backends and config_for_generator' do expect(subject).to receive(:'reconfigure!').exactly(:once) - expect(subject.send(:set_backends, backends)).to equal(true) + expect(subject.send(:set_backends, backends, config_for_generator)).to equal(true) expect(subject.backends).to eq(backends) - expect(subject.send(:set_backends, backends)).to equal(false) + expect(subject.config_for_generator).to eq(config_for_generator) + expect(subject.send(:set_backends, backends, config_for_generator)).to equal(false) expect(subject.backends).to eq(backends) + expect(subject.config_for_generator).to eq(config_for_generator) end context 'with keep_default_servers set' do diff --git a/spec/lib/synapse/service_watcher_zookeeper_spec.rb b/spec/lib/synapse/service_watcher_zookeeper_spec.rb index 5aa91fdb..f7d30e99 100644 --- a/spec/lib/synapse/service_watcher_zookeeper_spec.rb +++ b/spec/lib/synapse/service_watcher_zookeeper_spec.rb @@ -29,11 +29,52 @@ 'labels' => { 'az' => 'us-east-1a' } } end + let(:config_for_generator_haproxy) do + { + "frontend" => [ + "binding ::1:1111" + ], + "listen" => [ + "mode http", + "option httpchk GET /health", + "timeout client 300s", + "timeout server 300s", + "option httplog" + ], + "port" => 1111, + "server_options" => "check inter 60s fastinter 2s downinter 5s rise 3 fall 2", + } + end + let(:config_for_generator) do + { + "haproxy" => config_for_generator_haproxy, + "unknown_generator" => { + "key" => "value" + } + } + end + let(:config_for_generator_invalid) do + { + "haproxy" => "value", + } + end let(:service_data_string) { service_data.to_json } let(:deserialized_service_data) { [ service_data['host'], service_data['port'], service_data['name'], service_data['weight'], service_data['haproxy_server_options'], service_data['labels'] ] } + let(:config_for_generator_string) { [config_for_generator.to_json] } + let(:parsed_config_for_generator) do + { + "haproxy" => config_for_generator_haproxy + } + end + let(:config_for_generator_invalid_string) { config_for_generator_invalid.to_json } + let(:parsed_config_for_generator_invalid) do + { + "haproxy" => {} + } + end context 'ZookeeperWatcher' do let(:discovery) { { 'method' => 'zookeeper', 'hosts' => 'somehost', 'path' => 'some/path' } } @@ -49,15 +90,24 @@ expect(subject.send(:deserialize_service_instance, service_data_string)).to eql(deserialized_service_data) end + it 'decodes config data correctly' do + expect(subject.send(:parse_service_config, config_for_generator_string.first)).to eql(parsed_config_for_generator) + end + + it 'decodes invalid config data correctly' do + expect(subject.send(:parse_service_config, config_for_generator_invalid_string)).to eql(parsed_config_for_generator_invalid) + end + it 'reacts to zk push events' do expect(subject).to receive(:watch) expect(subject).to receive(:discover).and_call_original + expect(mock_zk).to receive(:get).with('some/path', {:watch=>true}).and_return(config_for_generator_string) expect(mock_zk).to receive(:children).with('some/path', {:watch=>true}).and_return( ["test_child_1"] ) expect(mock_zk).to receive(:get).with('some/path/test_child_1').and_return(mock_node) subject.instance_variable_set('@zk', mock_zk) - expect(subject).to receive(:set_backends).with([service_data.merge({'id' => 1})]) + expect(subject).to receive(:set_backends).with([service_data.merge({'id' => 1})], parsed_config_for_generator) subject.send(:watcher_callback).call end @@ -67,9 +117,11 @@ expect(mock_zk).to receive(:children).with('some/path', {:watch=>true}).and_return( ["test_child_1"] ) + expect(mock_zk).to receive(:get).with('some/path', {:watch=>true}).and_return("") expect(mock_zk).to receive(:get).with('some/path/test_child_1').and_raise(ZK::Exceptions::NoNode) + subject.instance_variable_set('@zk', mock_zk) - expect(subject).to receive(:set_backends).with([]) + expect(subject).to receive(:set_backends).with([],{}) subject.send(:watcher_callback).call end end diff --git a/synapse.gemspec b/synapse.gemspec index 84a430f0..19b7605a 100644 --- a/synapse.gemspec +++ b/synapse.gemspec @@ -25,6 +25,7 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency "docker-api", "~> 1.7" gem.add_runtime_dependency "zk", "~> 1.9.4" gem.add_runtime_dependency "logging", "~> 1.8" + gem.add_runtime_dependency "hashdiff", "~> 0.2.3" gem.add_development_dependency "rake" gem.add_development_dependency "rspec", "~> 3.1.0"