[synapse] enable generator config update from ZK #241

Merged: 9 commits, Sep 6, 2017
Changes from 3 commits
3 changes: 2 additions & 1 deletion Gemfile.lock
@@ -1,9 +1,10 @@
PATH
remote: .
specs:
synapse (0.14.7)
synapse (0.15.1)
Collaborator

Let's just go to 0.15.0

aws-sdk (~> 1.39)
docker-api (~> 1.7)
hashdiff (~> 0.2.3)
logging (~> 1.8)
zk (~> 1.9.4)

168 changes: 111 additions & 57 deletions lib/synapse/config_generator/haproxy.rb
@@ -5,6 +5,7 @@
require 'socket'
require 'digest/sha1'
require 'set'
require 'hashdiff'

class Synapse::ConfigGenerator
class Haproxy < BaseGenerator
@@ -801,6 +802,8 @@ class Haproxy < BaseGenerator
# should be enough for anyone right (famous last words)?
MAX_SERVER_ID = (2**16 - 1).freeze

attr_reader :server_id_map, :state_cache

def initialize(opts)
super(opts)

@@ -845,8 +848,11 @@ def initialize(opts)
@backends_cache = {}
@watcher_revisions = {}

@state_file_path = @opts['state_file_path']
@state_file_ttl = @opts.fetch('state_file_ttl', DEFAULT_STATE_FILE_TTL).to_i
@state_cache = HaproxyState.new(
@opts['state_file_path'],
@opts.fetch('state_file_ttl', DEFAULT_STATE_FILE_TTL).to_i,
self
)

# For giving consistent orders, even if they are random
@server_order_seed = @opts.fetch('server_order_seed', rand(2000)).to_i
@@ -907,15 +913,25 @@ def update_config(watchers)
end
end

def update_state_file(watchers)
@state_cache.update_state_file(watchers)
end

# generates a new config based on the state of the watchers
def generate_config(watchers)
new_config = generate_base_config
shared_frontend_lines = generate_shared_frontend

watchers.each do |watcher|
watcher_config = watcher.config_for_generator[name]
next if watcher_config.nil? || watcher_config.empty? || watcher_config['disabled']
Collaborator

Interesting, so we're changing the previous behavior where empty watcher configs would be present without a listening port (e.g. a bare backend). Is this intentional?

I think if it's just HAProxy this is potentially reasonable (typically you have backend options for bare backends, but ...)

Contributor Author

It looks like the parse_watcher_config function (called on the next line) tries to read fields from watcher_config['listen'], and will throw an exception if there is no 'listen' field. So I thought watcher_config should at least be non-empty.

What is a valid use case where watcher_config is completely empty?

@watcher_configs[watcher.name] ||= parse_watcher_config(watcher)
next if watcher_config['disabled']

# if watcher_config is changed, trigger restart
config_diff = HashDiff.diff(@state_cache.config_for_generator(watcher.name), watcher_config)
if !config_diff.empty?
@restart_required = true
end

regenerate = watcher.revision != @watcher_revisions[watcher.name] ||
@frontends_cache[watcher.name].nil? ||
@@ -1051,7 +1067,7 @@ def generate_backend_stanza(watcher, config)

# The ordering here is important. First we add all the backends in the
# disabled state...
seen.fetch(watcher.name, []).each do |backend_name, backend|
@state_cache.backends(watcher).each do |backend_name, backend|
backends[backend_name] = backend.merge('enabled' => false)
# We remember the haproxy_server_id from a previous reload here.
# Note though that if live servers below define haproxy_server_id
@@ -1308,74 +1324,112 @@ def construct_name(backend)
######################################
# methods for managing the state file
######################################
def seen
# if we don't support the state file, return nothing
return {} if @state_file_path.nil?
class HaproxyState
include Synapse::Logging

# if we've never needed the backends, now is the time to load them
@seen = read_state_file if @seen.nil?
KEY_WATCHER_CONFIG_FOR_GENERATOR = "watcher_config_for_generator"
Collaborator

Could we achieve the same end a bit more cleanly by versioning the state file and supporting just version 1 and the new version 2? E.g. the new state files we write out would contain a synapse_state_version key. If the key is there and it's == version 2, we can parse out well-defined fields; otherwise we treat it as a list of backends.

What do you think?

Contributor Author

I'd prefer to introduce a versioned file once we start introducing breaking schemas into the cache. This particular change is intended to be non-breaking: the field is added under each watcher and is optional for each watcher to have. So it seems to me that a file-level version does not help much here, since we would still need logic such as checking whether the watcher has the config_for_generator field. Does that make sense to you? Or am I missing some of your points?

Collaborator

I think the main part that's a bit hacky to me is that there is a special key (watcher_config_for_generator) that we effectively treat as a version check. This is fine; I just feel that having a well-defined schema for what's in these hashes in each version would make reasoning about the state file easier (as opposed to this change, which implicitly supports two versions of the state file). This is not a strong objection, just raising the point (feel free to add a TODO or just not do it, heh; if I care enough I can always go back and change it).

I am all for backwards compatibility, and you could achieve something similar with a synapse_state_file_version key, which if found would indicate the newer formatting.
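The versioned-file approach discussed in this thread could look roughly like the sketch below. The synapse_state_file_version key and the parse_state helper are hypothetical, taken from the suggestion above rather than from the merged code:

```ruby
require 'json'

# Hypothetical sketch of the reviewer's suggestion: branch on an explicit
# version marker instead of probing for 'watcher_config_for_generator'
# under each watcher. Not part of this PR.
STATE_FILE_VERSION = 2

def parse_state(raw)
  data = JSON.load(raw) || {}
  if data['synapse_state_file_version'].to_i >= STATE_FILE_VERSION
    # New format: drop the version marker; remaining keys follow the
    # well-defined per-watcher schema (backends plus generator config).
    data.reject { |k, _| k == 'synapse_state_file_version' }
  else
    # Old format: every key is a watcher name mapping to bare backends.
    data
  end
rescue JSON::ParserError
  {} # corrupt or empty state files are simply rebuilt
end
```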

NON_BACKENDS_KEYS = [KEY_WATCHER_CONFIG_FOR_GENERATOR]

@seen
end
def initialize(state_file_path, state_file_ttl, haproxy)
@state_file_path = state_file_path
@state_file_ttl = state_file_ttl
@haproxy = haproxy
end

def update_state_file(watchers)
# if we don't support the state file, do nothing
return if @state_file_path.nil?

log.info "synapse: writing state file"
timestamp = Time.now.to_i

# Remove stale backends
seen.each do |watcher_name, backends|
backends.each do |backend_name, backend|
ts = backend.fetch('timestamp', 0)
delta = (timestamp - ts).abs
if delta > @state_file_ttl
log.info "synapse: expiring #{backend_name} with age #{delta}"
backends.delete(backend_name)
end
def backends(watcher_name)
if seen.key?(watcher_name)
seen[watcher_name].select { |section, data| !NON_BACKENDS_KEYS.include?(section) }
else
{}
end
end

# Remove any services which no longer have any backends
seen.reject!{|watcher_name, backends| backends.keys.length == 0}
def config_for_generator(watcher_name)
cache_config = {}
if seen.key?(watcher_name) && seen[watcher_name].key?(KEY_WATCHER_CONFIG_FOR_GENERATOR)
cache_config = seen[watcher_name][KEY_WATCHER_CONFIG_FOR_GENERATOR]
end

# Add backends from watchers
watchers.each do |watcher|
seen[watcher.name] ||= {}
cache_config
end

watcher.backends.each do |backend|
backend_name = construct_name(backend)
data = {
'timestamp' => timestamp,
}
server_id = @server_id_map[watcher.name][backend_name].to_i
if server_id && server_id > 0 && server_id <= MAX_SERVER_ID
data['haproxy_server_id'] = server_id
def update_state_file(watchers)
# if we don't support the state file, do nothing
return if @state_file_path.nil?

log.info "synapse: writing state file"
timestamp = Time.now.to_i

# Remove stale backends
seen.each do |watcher_name, data|
backends(watcher_name).each do |backend_name, backend|
ts = backend.fetch('timestamp', 0)
delta = (timestamp - ts).abs
if delta > @state_file_ttl
log.info "synapse: expiring #{backend_name} with age #{delta}"
data.delete(backend_name)
end
end
end

seen[watcher.name][backend_name] = data.merge(backend)
# Remove any services which no longer have any backends
seen.reject!{|watcher_name, data| backends(watcher_name).keys.length == 0}

# Add backends and config from watchers
watchers.each do |watcher|
seen[watcher.name] ||= {}

watcher.backends.each do |backend|
backend_name = @haproxy.construct_name(backend)
data = {
'timestamp' => timestamp,
}
server_id = @haproxy.server_id_map[watcher.name][backend_name].to_i
if server_id && server_id > 0 && server_id <= MAX_SERVER_ID
data['haproxy_server_id'] = server_id
end

seen[watcher.name][backend_name] = data.merge(backend)
end

# Add config for generator from watcher
if watcher.config_for_generator.key?(@haproxy.name)
seen[watcher.name][KEY_WATCHER_CONFIG_FOR_GENERATOR] =
watcher.config_for_generator[@haproxy.name]
end
end

# write the data!
write_data_to_state_file(seen)
end

# write the data!
write_data_to_state_file(seen)
end
private

def read_state_file
# Some versions of JSON return nil on an empty file ...
JSON.load(File.read(@state_file_path)) || {}
rescue StandardError => e
# It's ok if the state file doesn't exist or contains invalid data
# The state file will be rebuilt automatically
{}
end
def seen
# if we don't support the state file, return nothing
return {} if @state_file_path.nil?

# if we've never needed the backends, now is the time to load them
@seen = read_state_file if @seen.nil?

@seen
end

# we do this atomically so the state file is always consistent
def write_data_to_state_file(data)
tmp_state_file_path = @state_file_path + ".tmp"
File.write(tmp_state_file_path, JSON.pretty_generate(data))
FileUtils.mv(tmp_state_file_path, @state_file_path)
def read_state_file
# Some versions of JSON return nil on an empty file ...
JSON.load(File.read(@state_file_path)) || {}
rescue StandardError => e
# It's ok if the state file doesn't exist or contains invalid data
# The state file will be rebuilt automatically
{}
end

# we do this atomically so the state file is always consistent
def write_data_to_state_file(data)
tmp_state_file_path = @state_file_path + ".tmp"
File.write(tmp_state_file_path, JSON.pretty_generate(data))
FileUtils.mv(tmp_state_file_path, @state_file_path)
end
end
end
end
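The write_data_to_state_file helper in this diff uses the write-to-temp-then-rename pattern. A standalone sketch of the same technique (the helper name is illustrative, not part of synapse):

```ruby
require 'json'
require 'fileutils'
require 'tmpdir'

# Sketch of the atomic-write pattern used by write_data_to_state_file:
# write the full payload to a sibling ".tmp" file, then rename it over
# the real path. rename(2) is atomic within a filesystem, so a concurrent
# reader sees either the old state file or the new one, never a partial
# write.
def write_state_atomically(path, data)
  tmp_path = path + '.tmp'
  File.write(tmp_path, JSON.pretty_generate(data))
  FileUtils.mv(tmp_path, path)
end
```

Paired with the TTL-based expiry in update_state_file, this keeps restarts cheap: HAProxy comes back with the last known backends marked disabled until watchers re-report them.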
37 changes: 34 additions & 3 deletions lib/synapse/service_watcher/base.rb
@@ -1,5 +1,6 @@
require 'synapse/log'
require 'set'
require 'hashdiff'

class Synapse::ServiceWatcher
class BaseWatcher
@@ -152,7 +153,7 @@ def backends_filtered_by_labels
end
end

def set_backends(new_backends)
def set_backends(new_backends, new_config_for_generator = {})
# Aggregate and deduplicate all potential backend service instances.
new_backends = (new_backends + @default_servers) if @keep_default_servers
# Substitute backend_port_override for the provided port
@@ -165,7 +166,20 @@ def set_backends(new_backends)
[b['host'], b['port'], b.fetch('name', '')]
}

backends_updated = update_backends(new_backends)
config_updated = update_config_for_generator(new_config_for_generator)

if backends_updated || config_updated
reconfigure!
return true
else
return false
end
end

def update_backends(new_backends)
if new_backends.to_set == @backends.to_set
log.info "synapse: backends for service #{@name} do not change."
return false
end

@@ -192,11 +206,28 @@ def set_backends(new_backends)
@backends = new_backends
end

reconfigure!

return true
end

def update_config_for_generator(new_config_for_generator)
if new_config_for_generator.empty?
log.info "synapse: no config_for_generator data from #{name} for" \
" service #{@name}; keep existing config_for_generator: #{@config_for_generator.inspect}"
return false
else
log.info "synapse: discovered config_for_generator for service #{@name}"
diff = HashDiff.diff(new_config_for_generator, config_for_generator)

if diff.empty?
log.info "synapse: config_for_generator for service #{@name} does not change."
return false
else
@config_for_generator = new_config_for_generator
return true
end
end
end

# Subclasses should not invoke this directly; it's only exposed so that it
# can be overridden in subclasses.
def reconfigure!
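The control flow this file adds to set_backends (two independent change checks feeding a single reconfigure!) can be modeled in miniature. This is a sketch, not the real BaseWatcher; plain Hash and Set equality stand in for HashDiff.diff, and an empty new config means "keep what we have", mirroring update_config_for_generator:

```ruby
require 'set'

# Miniature model of the set_backends flow in base.rb: reconfigure fires
# when either the backend set or the generator config changes, and at
# most once per call even if both change.
class WatcherModel
  attr_reader :reconfigures

  def initialize
    @backends = []
    @config_for_generator = {}
    @reconfigures = 0
  end

  def set_backends(new_backends, new_config = {})
    backends_updated = new_backends.to_set != @backends.to_set
    @backends = new_backends if backends_updated

    # empty config payload means "no data from discovery; keep existing"
    config_updated = !new_config.empty? && new_config != @config_for_generator
    @config_for_generator = new_config if config_updated

    if backends_updated || config_updated
      @reconfigures += 1
      true
    else
      false
    end
  end
end
```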
41 changes: 40 additions & 1 deletion lib/synapse/service_watcher/zookeeper.rb
@@ -157,7 +157,15 @@ def discover
end
end

set_backends(new_backends)
node = @zk.get(@discovery['path'], :watch => true)
begin
new_config_for_generator = parse_service_config(node)
rescue StandardError => e
log.error "synapse: invalid config data in ZK node at #{@discovery['path']}: #{e}"
new_config_for_generator = {}
end

set_backends(new_backends, new_config_for_generator)
end

# sets up zookeeper callbacks if the data at the discovery path changes
@@ -260,6 +268,37 @@ def deserialize_service_instance(data)

return host, port, name, weight, haproxy_server_options, labels
end

def parse_service_config(data)
log.debug "synapse: deserializing process data"
if data.nil? || data.empty?
decoded = {}
else
decoded = @decode_method.call(data)
end

new_generator_config = {}
# validate the config. if the config is not empty:
# each key should be named by one of the available generators
# each value should be a hash (could be empty)
decoded.collect.each do |generator_name, generator_config|
if [email protected]_generators.keys.include?(generator_name)
log.error "synapse: invalid generator name in ZK node at #{@discovery['path']}:" \
" #{generator_name}"
next
else
if generator_config.nil? || !generator_config.is_a?(Hash)
log.warn "synapse: invalid generator config in ZK node at #{@discovery['path']}" \
" for generator #{generator_name}"
new_generator_config[generator_name] = {}
else
new_generator_config[generator_name] = generator_config
end
end
end

return new_generator_config
end
end
end
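The validation loop in parse_service_config can be isolated as a small sketch. AVAILABLE_GENERATORS here stands in for @synapse.available_generators.keys and is an assumption of this example:

```ruby
# Isolated sketch of the validation in parse_service_config: keep only
# keys naming a known generator, and coerce any non-Hash value to an
# empty Hash so a malformed ZK node cannot poison the generator config.
AVAILABLE_GENERATORS = ['haproxy'].freeze

def validate_generator_config(decoded)
  decoded.each_with_object({}) do |(generator_name, generator_config), out|
    # unknown generator names are dropped (the real code logs an error)
    next unless AVAILABLE_GENERATORS.include?(generator_name)

    out[generator_name] =
      generator_config.is_a?(Hash) ? generator_config : {}
  end
end
```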

2 changes: 1 addition & 1 deletion lib/synapse/version.rb
@@ -1,3 +1,3 @@
module Synapse
VERSION = "0.14.7"
VERSION = "0.15.1"
end