[synapse] enable generator config update from ZK #241

Merged · 9 commits · Sep 6, 2017
Changes from 8 commits
3 changes: 2 additions & 1 deletion Gemfile.lock
@@ -1,9 +1,10 @@
PATH
remote: .
specs:
synapse (0.14.7)
synapse (0.15.1)
Collaborator: Let's just go to 0.15.0

aws-sdk (~> 1.39)
docker-api (~> 1.7)
hashdiff (~> 0.2.3)
logging (~> 1.8)
zk (~> 1.9.4)

10 changes: 6 additions & 4 deletions README.md
@@ -34,11 +34,11 @@ are proven routing components like [HAProxy](http://haproxy.1wt.eu/) or [NGINX](
For every external service that your application talks to, we assign a synapse local port on localhost.
Synapse creates a proxy from the local port to the service, and you reconfigure your application to talk to the proxy.

Under the hood, Synapse sports `service_watcher`s for service discovery and
Under the hood, Synapse supports `service_watcher`s for service discovery and
`config_generators` for configuring local state (e.g. load balancer configs)
based on that service discovery state.

Synapse supports service discovery with with pluggable `service_watcher`s which
Synapse supports service discovery with pluggable `service_watcher`s which
take care of signaling to the `config_generators` so that they can react and
reconfigure to point at available servers on the fly.

@@ -183,7 +183,7 @@ relevant routing component. For example if you want to only configure HAProxy and
not NGINX for a particular service, you would pass ``disabled`` to the `nginx` section
of that service's watcher config.

* [`haproxy`](#haproxysvc): how will the haproxy section for this service be configured
* [`haproxy`](#haproxysvc): how will the haproxy section for this service be configured. If the corresponding `watcher` is defined to use `zookeeper` and the service publishes its `haproxy` config on ZK, the `haproxy` hash can be filled/updated via data from the ZK node.
* [`nginx`](https://github.com/jolynch/synapse-nginx#service-watcher-config): how will the nginx section for this service be configured. **NOTE** to use this you must have the synapse-nginx [plugin](#plugins) installed.

The services hash may contain the following additional keys:
@@ -221,7 +221,7 @@ Given a `label_filters`: `[{ "label": "cluster", "value": "dev", "condition": "e

##### Zookeeper #####

This watcher retrieves a list of servers from zookeeper.
This watcher retrieves a list of servers and also service config data from zookeeper.
It takes the following mandatory arguments:

* `method`: zookeeper
@@ -230,6 +230,8 @@ It takes the following mandatory arguments:

The watcher assumes that each node under `path` represents a service server.

The watcher assumes that the data (if any) stored at znode `path` is a hash, where each key names a valid `config_generator` (e.g. `haproxy`) and the value is a hash that configures that generator.

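As a hedged illustration (the key names and values below are hypothetical, not taken from this PR), the decoded znode data might look like this:

```ruby
require 'json'

# Hypothetical example of data stored at the watcher's znode `path`:
# each top-level key names a config_generator, and its value is the
# hash handed to that generator.
raw = JSON.generate(
  'haproxy' => {
    'port'           => 3213,
    'server_options' => 'check inter 2s rise 3 fall 2',
    'listen'         => ['mode http'],
  }
)

config = JSON.parse(raw)
puts config['haproxy']['port']  # => 3213
```

The watcher would pass `config['haproxy']` through to the `haproxy` config_generator.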
The following arguments are optional:

* `decode`: A hash containing configuration for how to decode the data found in zookeeper.
171 changes: 113 additions & 58 deletions lib/synapse/config_generator/haproxy.rb
@@ -5,6 +5,7 @@
require 'socket'
require 'digest/sha1'
require 'set'
require 'hashdiff'

class Synapse::ConfigGenerator
class Haproxy < BaseGenerator
@@ -801,6 +802,8 @@ class Haproxy < BaseGenerator
# should be enough for anyone right (famous last words)?
MAX_SERVER_ID = (2**16 - 1).freeze

attr_reader :server_id_map, :state_cache

def initialize(opts)
super(opts)

@@ -845,8 +848,11 @@ def initialize(opts)
@backends_cache = {}
@watcher_revisions = {}

@state_file_path = @opts['state_file_path']
@state_file_ttl = @opts.fetch('state_file_ttl', DEFAULT_STATE_FILE_TTL).to_i
@state_cache = HaproxyState.new(
@opts['state_file_path'],
@opts.fetch('state_file_ttl', DEFAULT_STATE_FILE_TTL).to_i,
self
)

# For giving consistent orders, even if they are random
@server_order_seed = @opts.fetch('server_order_seed', rand(2000)).to_i
@@ -907,15 +913,26 @@ def update_config(watchers)
end
end

def update_state_file(watchers)
@state_cache.update_state_file(watchers)
end

# generates a new config based on the state of the watchers
def generate_config(watchers)
new_config = generate_base_config
shared_frontend_lines = generate_shared_frontend

watchers.each do |watcher|
watcher_config = watcher.config_for_generator[name]
@watcher_configs[watcher.name] ||= parse_watcher_config(watcher)
next if watcher_config['disabled']
next if watcher_config.nil? || watcher_config.empty? || watcher_config['disabled']
Collaborator:
Interesting, so we're changing the previous behavior that empty watcher configs would be present without a listening port (e.g. a bare backend). Is this intentional?

I think if it's just HAProxy this is potentially reasonable (typically you have backend options for bare backends but ...)

Contributor (author):

It looks like the parse_watcher_config function (the next line) tries to get fields from watcher_config['listen'] and will throw an exception if there is no 'listen' field, so I thought watcher_config should at least be non-empty.

What is a valid use case when the watcher_config is completely empty?

@watcher_configs[watcher.name] = parse_watcher_config(watcher)

# if watcher_config is changed, trigger restart
config_diff = HashDiff.diff(@state_cache.config_for_generator(watcher.name), watcher_config)
if !config_diff.empty?
log.info "synapse: restart required because config_for_generator changed. before: #{@state_cache.config_for_generator(watcher.name)}, after: #{watcher_config}"
@restart_required = true
end

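The restart check above compares the cached generator config against the one just read from the watcher. A minimal sketch of that decision, assuming a plain Hash comparison in place of `HashDiff.diff` from the hashdiff gem (which additionally reports which keys changed, for the log message):

```ruby
# Sketch of the restart decision: if the generator config cached in the
# state file differs from the config the watcher just delivered, a
# restart is required so HAProxy picks up the new settings.
cached  = { 'port' => 3213, 'server_options' => 'check inter 2s' }
current = { 'port' => 3214, 'server_options' => 'check inter 2s' }

restart_required = (cached != current)
puts restart_required  # => true
```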
regenerate = watcher.revision != @watcher_revisions[watcher.name] ||
@frontends_cache[watcher.name].nil? ||
@@ -1051,7 +1068,7 @@ def generate_backend_stanza(watcher, config)

# The ordering here is important. First we add all the backends in the
# disabled state...
seen.fetch(watcher.name, []).each do |backend_name, backend|
@state_cache.backends(watcher).each do |backend_name, backend|
backends[backend_name] = backend.merge('enabled' => false)
# We remember the haproxy_server_id from a previous reload here.
# Note though that if live servers below define haproxy_server_id
@@ -1308,74 +1325,112 @@ def construct_name(backend)
######################################
# methods for managing the state file
######################################
def seen
# if we don't support the state file, return nothing
return {} if @state_file_path.nil?
class HaproxyState
include Synapse::Logging

# if we've never needed the backends, now is the time to load them
@seen = read_state_file if @seen.nil?
KEY_WATCHER_CONFIG_FOR_GENERATOR = "watcher_config_for_generator"
Collaborator:
Could we achieve the same end perhaps slightly cleaner by versioning the state file and just supporting version 1 or the new version 2? E.g. in the new state files we write out there is a synapse_state_version key? If the key's there and it's == version 2 we can parse out well defined fields, otherwise we treat it as a list of backends.

What do you think?

Contributor (author):
I'd prefer to do a versioned file once we start introducing breaking schemas into the cache. This particular change is intended to be non-breaking: the field is added under each watcher and is optional for each watcher to have. So it seems to me that a file-level version does not help much here, since we would still need logic such as checking whether the watcher has the config_for_generator field. Does that make sense to you? Or am I missing some of your points?

Collaborator:
I think the main part that's a bit hacky to me is that there is a special key (watcher_config_for_generator) that we effectively treat as a version check. This is fine; I just feel like having a well-defined schema for what's in these hashes in each version would make reasoning about the state file easier, as opposed to this change, which makes it so that we implicitly support two versions of the state file. This is not a strong objection, just raising the point (feel free to add a TODO or just not do it, heh; if I care enough I can always go back and change it).

I am all for backwards compatibility, and you could achieve something similar with a synapse_state_file_version key, which if found would indicate this newer formatting.

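A hedged sketch of the reviewer's suggestion (the `synapse_state_file_version` key and `state_file_version` helper are hypothetical, not part of this PR): the parser would branch on an explicit version key instead of probing for the special `watcher_config_for_generator` key.

```ruby
require 'json'

# Hypothetical: read a schema version out of the state file, defaulting
# to 1 for files written before the version key existed.
def state_file_version(raw)
  state = JSON.load(raw) || {}
  state.fetch('synapse_state_file_version', 1).to_i
end

old_format = JSON.generate('svc' => { 'backend1' => {} })
new_format = JSON.generate('synapse_state_file_version' => 2, 'svc' => {})

puts state_file_version(old_format)  # => 1
puts state_file_version(new_format)  # => 2
```

With a version ≥ 2 the parser could rely on a well-defined schema (backends plus per-watcher config); version 1 would be treated as a plain hash of backends, preserving backwards compatibility.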
NON_BACKENDS_KEYS = [KEY_WATCHER_CONFIG_FOR_GENERATOR]

@seen
end
def initialize(state_file_path, state_file_ttl, haproxy)
@state_file_path = state_file_path
@state_file_ttl = state_file_ttl
@haproxy = haproxy
end

def update_state_file(watchers)
# if we don't support the state file, do nothing
return if @state_file_path.nil?

log.info "synapse: writing state file"
timestamp = Time.now.to_i

# Remove stale backends
seen.each do |watcher_name, backends|
backends.each do |backend_name, backend|
ts = backend.fetch('timestamp', 0)
delta = (timestamp - ts).abs
if delta > @state_file_ttl
log.info "synapse: expiring #{backend_name} with age #{delta}"
backends.delete(backend_name)
end
def backends(watcher_name)
if seen.key?(watcher_name)
seen[watcher_name].select { |section, data| !NON_BACKENDS_KEYS.include?(section) }
else
{}
end
end

# Remove any services which no longer have any backends
seen.reject!{|watcher_name, backends| backends.keys.length == 0}
def config_for_generator(watcher_name)
cache_config = {}
if seen.key?(watcher_name) && seen[watcher_name].key?(KEY_WATCHER_CONFIG_FOR_GENERATOR)
cache_config = seen[watcher_name][KEY_WATCHER_CONFIG_FOR_GENERATOR]
end

# Add backends from watchers
watchers.each do |watcher|
seen[watcher.name] ||= {}
cache_config
end

watcher.backends.each do |backend|
backend_name = construct_name(backend)
data = {
'timestamp' => timestamp,
}
server_id = @server_id_map[watcher.name][backend_name].to_i
if server_id && server_id > 0 && server_id <= MAX_SERVER_ID
data['haproxy_server_id'] = server_id
def update_state_file(watchers)
# if we don't support the state file, do nothing
return if @state_file_path.nil?

log.info "synapse: writing state file"
timestamp = Time.now.to_i

# Remove stale backends
seen.each do |watcher_name, data|
backends(watcher_name).each do |backend_name, backend|
ts = backend.fetch('timestamp', 0)
delta = (timestamp - ts).abs
if delta > @state_file_ttl
log.info "synapse: expiring #{backend_name} with age #{delta}"
data.delete(backend_name)
end
end
end

seen[watcher.name][backend_name] = data.merge(backend)
# Remove any services which no longer have any backends
seen.reject!{|watcher_name, data| backends(watcher_name).keys.length == 0}

# Add backends and config from watchers
watchers.each do |watcher|
seen[watcher.name] ||= {}

watcher.backends.each do |backend|
backend_name = @haproxy.construct_name(backend)
data = {
'timestamp' => timestamp,
}
server_id = @haproxy.server_id_map[watcher.name][backend_name].to_i
if server_id && server_id > 0 && server_id <= MAX_SERVER_ID
data['haproxy_server_id'] = server_id
end

seen[watcher.name][backend_name] = data.merge(backend)
end

# Add config for generator from watcher
if watcher.config_for_generator.key?(@haproxy.name)
seen[watcher.name][KEY_WATCHER_CONFIG_FOR_GENERATOR] =
watcher.config_for_generator[@haproxy.name]
end
end

# write the data!
write_data_to_state_file(seen)
end

# write the data!
write_data_to_state_file(seen)
end
private

def read_state_file
# Some versions of JSON return nil on an empty file ...
JSON.load(File.read(@state_file_path)) || {}
rescue StandardError => e
# It's ok if the state file doesn't exist or contains invalid data
# The state file will be rebuilt automatically
{}
end
def seen
# if we don't support the state file, return nothing
return {} if @state_file_path.nil?

# if we've never needed the backends, now is the time to load them
@seen = read_state_file if @seen.nil?

@seen
end

# we do this atomically so the state file is always consistent
def write_data_to_state_file(data)
tmp_state_file_path = @state_file_path + ".tmp"
File.write(tmp_state_file_path, JSON.pretty_generate(data))
FileUtils.mv(tmp_state_file_path, @state_file_path)
def read_state_file
# Some versions of JSON return nil on an empty file ...
JSON.load(File.read(@state_file_path)) || {}
rescue StandardError => e
# It's ok if the state file doesn't exist or contains invalid data
# The state file will be rebuilt automatically
{}
end

# we do this atomically so the state file is always consistent
def write_data_to_state_file(data)
tmp_state_file_path = @state_file_path + ".tmp"
File.write(tmp_state_file_path, JSON.pretty_generate(data))
FileUtils.mv(tmp_state_file_path, @state_file_path)
end
end
end
end
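The write-then-rename in `write_data_to_state_file` above is what makes the state file update atomic: the full payload goes to a temp path, then a rename moves it into place, and rename is atomic on POSIX filesystems, so a concurrent reader never observes a half-written file. A standalone sketch (file names are illustrative):

```ruby
require 'json'
require 'fileutils'
require 'tmpdir'

# Sketch of the atomic state-file write: write everything to a temp
# path, then rename it over the real path in a single step.
Dir.mktmpdir do |dir|
  state_file = File.join(dir, 'state.json')
  tmp_path   = state_file + '.tmp'
  File.write(tmp_path, JSON.pretty_generate('watcher' => { 'backend1' => {} }))
  FileUtils.mv(tmp_path, state_file)
  puts JSON.load(File.read(state_file)).keys.inspect  # => ["watcher"]
end
```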
44 changes: 40 additions & 4 deletions lib/synapse/service_watcher/base.rb
@@ -1,13 +1,14 @@
require 'synapse/log'
require 'set'
require 'hashdiff'

class Synapse::ServiceWatcher
class BaseWatcher
include Synapse::Logging

LEADER_WARN_INTERVAL = 30

attr_reader :name, :config_for_generator, :revision
attr_reader :name, :revision

def initialize(opts={}, synapse)
super()
@@ -99,6 +100,11 @@ def ping?
true
end

# deep clone the hash to protect its readonly property
def config_for_generator
Marshal.load( Marshal.dump(@config_for_generator))
end

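The Marshal round-trip above returns a deep copy, so callers cannot mutate the watcher's internal hash through the value returned by `config_for_generator`; a plain `dup` would not be enough because it shares nested hashes. A quick sketch:

```ruby
# Deep copy via Marshal: mutating the copy leaves the original intact.
original = { 'haproxy' => { 'port' => 3213 } }
deep = Marshal.load(Marshal.dump(original))
deep['haproxy']['port'] = 9999
puts original['haproxy']['port']  # => 3213 (untouched)

# By contrast, a shallow dup shares the nested hash, so the mutation
# leaks back into the original.
shallow = original.dup
shallow['haproxy']['port'] = 9999
puts original['haproxy']['port']  # => 9999
```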
def backends
filtered = backends_filtered_by_labels

@@ -152,7 +158,7 @@ def backends_filtered_by_labels
end
end

def set_backends(new_backends)
def set_backends(new_backends, new_config_for_generator = {})
# Aggregate and deduplicate all potential backend service instances.
new_backends = (new_backends + @default_servers) if @keep_default_servers
# Substitute backend_port_override for the provided port
@@ -165,7 +171,20 @@ def set_backends(new_backends)
[b['host'], b['port'], b.fetch('name', '')]
}

backends_updated = update_backends(new_backends)
config_updated = update_config_for_generator(new_config_for_generator)

if backends_updated || config_updated
reconfigure!
return true
else
return false
end
end

def update_backends(new_backends)
if new_backends.to_set == @backends.to_set
log.info "synapse: backends for service #{@name} do not change."
return false
end

@@ -192,11 +211,28 @@ def set_backends(new_backends)
@backends = new_backends
end

reconfigure!

return true
end

def update_config_for_generator(new_config_for_generator)
if new_config_for_generator.empty?
log.info "synapse: no config_for_generator data from #{name} for" \
" service #{@name}; keep existing config_for_generator: #{@config_for_generator.inspect}"
return false
else
log.info "synapse: discovered config_for_generator for service #{@name}"
diff = HashDiff.diff(new_config_for_generator, config_for_generator)

if diff.empty?
log.info "synapse: config_for_generator for service #{@name} does not change."
return false
else
@config_for_generator = new_config_for_generator
return true
end
end
end

# Subclasses should not invoke this directly; it's only exposed so that it
# can be overridden in subclasses.
def reconfigure!