Merge pull request #241 from lap1817/feng-pan-synapse
[synapse] enable generator config update from ZK
jolynch authored Sep 6, 2017
2 parents 9226826 + 80bbf62 commit 117e26d
Showing 10 changed files with 403 additions and 82 deletions.
3 changes: 2 additions & 1 deletion Gemfile.lock
@@ -1,9 +1,10 @@
PATH
remote: .
specs:
synapse (0.14.7)
synapse (0.15.0)
aws-sdk (~> 1.39)
docker-api (~> 1.7)
hashdiff (~> 0.2.3)
logging (~> 1.8)
zk (~> 1.9.4)

10 changes: 6 additions & 4 deletions README.md
@@ -34,11 +34,11 @@ are proven routing components like [HAProxy](http://haproxy.1wt.eu/) or [NGINX](
For every external service that your application talks to, we assign a synapse local port on localhost.
Synapse creates a proxy from the local port to the service, and you reconfigure your application to talk to the proxy.

Under the hood, Synapse sports `service_watcher`s for service discovery and
Under the hood, Synapse supports `service_watcher`s for service discovery and
`config_generators` for configuring local state (e.g. load balancer configs)
based on that service discovery state.

Synapse supports service discovery with with pluggable `service_watcher`s which
Synapse supports service discovery with pluggable `service_watcher`s which
take care of signaling to the `config_generators` so that they can react and
reconfigure to point at available servers on the fly.

@@ -183,7 +183,7 @@ relevant routing component. For example if you want to only configure HAProxy an
not NGINX for a particular service, you would pass ``disabled`` to the `nginx` section
of that service's watcher config.

* [`haproxy`](#haproxysvc): how will the haproxy section for this service be configured
* [`haproxy`](#haproxysvc): how will the haproxy section for this service be configured. If the corresponding `watcher` is defined to use `zookeeper` and the service publishes its `haproxy` config in ZK, the `haproxy` hash can be filled in or updated from the data stored at the ZK node.
* [`nginx`](https://github.com/jolynch/synapse-nginx#service-watcher-config): how will the nginx section for this service be configured. **NOTE** to use this you must have the synapse-nginx [plugin](#plugins) installed.

The services hash may contain the following additional keys:
@@ -221,7 +221,7 @@ Given a `label_filters`: `[{ "label": "cluster", "value": "dev", "condition": "e

##### Zookeeper #####

This watcher retrieves a list of servers from zookeeper.
This watcher retrieves a list of servers, as well as service config data, from zookeeper.
It takes the following mandatory arguments:

* `method`: zookeeper
@@ -230,6 +230,8 @@ It takes the following mandatory arguments:

The watcher assumes that each node under `path` represents a service server.

The watcher assumes that the data (if any) retrieved at znode `path` is a hash, where each key is named by a valid `config_generator` (e.g. `haproxy`) and the value is a hash that configures that generator.
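
For illustration only (the values below are assumptions, not part of this commit), the data stored at the znode is typically JSON that decodes into a hash along these lines:

```ruby
# Hypothetical decoded contents of the znode at `path`: each top-level key
# names a config_generator, and its value is a hash of settings for it.
znode_data = {
  'haproxy' => {
    'port'           => 3212,                            # assumed local proxy port
    'server_options' => 'check inter 2s rise 3 fall 2',  # assumed HAProxy server options
    'listen'         => ['mode http'],
  },
}
```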

The following arguments are optional:

* `decode`: A hash containing configuration for how to decode the data found in zookeeper.
172 changes: 114 additions & 58 deletions lib/synapse/config_generator/haproxy.rb
@@ -5,6 +5,7 @@
require 'socket'
require 'digest/sha1'
require 'set'
require 'hashdiff'

class Synapse::ConfigGenerator
class Haproxy < BaseGenerator
@@ -801,6 +802,8 @@ class Haproxy < BaseGenerator
# should be enough for anyone right (famous last words)?
MAX_SERVER_ID = (2**16 - 1).freeze

attr_reader :server_id_map, :state_cache

def initialize(opts)
super(opts)

@@ -845,8 +848,11 @@ def initialize(opts)
@backends_cache = {}
@watcher_revisions = {}

@state_file_path = @opts['state_file_path']
@state_file_ttl = @opts.fetch('state_file_ttl', DEFAULT_STATE_FILE_TTL).to_i
@state_cache = HaproxyState.new(
@opts['state_file_path'],
@opts.fetch('state_file_ttl', DEFAULT_STATE_FILE_TTL).to_i,
self
)

# For giving consistent orders, even if they are random
@server_order_seed = @opts.fetch('server_order_seed', rand(2000)).to_i
@@ -907,15 +913,26 @@ def update_config(watchers)
end
end

def update_state_file(watchers)
@state_cache.update_state_file(watchers)
end

# generates a new config based on the state of the watchers
def generate_config(watchers)
new_config = generate_base_config
shared_frontend_lines = generate_shared_frontend

watchers.each do |watcher|
watcher_config = watcher.config_for_generator[name]
@watcher_configs[watcher.name] ||= parse_watcher_config(watcher)
next if watcher_config['disabled']
next if watcher_config.nil? || watcher_config.empty? || watcher_config['disabled']
@watcher_configs[watcher.name] = parse_watcher_config(watcher)

# if watcher_config has changed, trigger a restart
config_diff = HashDiff.diff(@state_cache.config_for_generator(watcher.name), watcher_config)
if !config_diff.empty?
log.info "synapse: restart required because config_for_generator changed. before: #{@state_cache.config_for_generator(watcher.name)}, after: #{watcher_config}"
@restart_required = true
end

regenerate = watcher.revision != @watcher_revisions[watcher.name] ||
@frontends_cache[watcher.name].nil? ||
@@ -1051,7 +1068,7 @@ def generate_backend_stanza(watcher, config)

# The ordering here is important. First we add all the backends in the
# disabled state...
seen.fetch(watcher.name, []).each do |backend_name, backend|
@state_cache.backends(watcher.name).each do |backend_name, backend|
backends[backend_name] = backend.merge('enabled' => false)
# We remember the haproxy_server_id from a previous reload here.
# Note though that if live servers below define haproxy_server_id
@@ -1308,74 +1325,113 @@ def construct_name(backend)
######################################
# methods for managing the state file
######################################
def seen
# if we don't support the state file, return nothing
return {} if @state_file_path.nil?
class HaproxyState
include Synapse::Logging

# if we've never needed the backends, now is the time to load them
@seen = read_state_file if @seen.nil?
# TODO: enable version in the Haproxy Cache File
KEY_WATCHER_CONFIG_FOR_GENERATOR = "watcher_config_for_generator"
NON_BACKENDS_KEYS = [KEY_WATCHER_CONFIG_FOR_GENERATOR]

@seen
end
def initialize(state_file_path, state_file_ttl, haproxy)
@state_file_path = state_file_path
@state_file_ttl = state_file_ttl
@haproxy = haproxy
end

def update_state_file(watchers)
# if we don't support the state file, do nothing
return if @state_file_path.nil?

log.info "synapse: writing state file"
timestamp = Time.now.to_i

# Remove stale backends
seen.each do |watcher_name, backends|
backends.each do |backend_name, backend|
ts = backend.fetch('timestamp', 0)
delta = (timestamp - ts).abs
if delta > @state_file_ttl
log.info "synapse: expiring #{backend_name} with age #{delta}"
backends.delete(backend_name)
end
def backends(watcher_name)
if seen.key?(watcher_name)
seen[watcher_name].select { |section, data| !NON_BACKENDS_KEYS.include?(section) }
else
{}
end
end

# Remove any services which no longer have any backends
seen.reject!{|watcher_name, backends| backends.keys.length == 0}
def config_for_generator(watcher_name)
cache_config = {}
if seen.key?(watcher_name) && seen[watcher_name].key?(KEY_WATCHER_CONFIG_FOR_GENERATOR)
cache_config = seen[watcher_name][KEY_WATCHER_CONFIG_FOR_GENERATOR]
end

# Add backends from watchers
watchers.each do |watcher|
seen[watcher.name] ||= {}
cache_config
end

watcher.backends.each do |backend|
backend_name = construct_name(backend)
data = {
'timestamp' => timestamp,
}
server_id = @server_id_map[watcher.name][backend_name].to_i
if server_id && server_id > 0 && server_id <= MAX_SERVER_ID
data['haproxy_server_id'] = server_id
def update_state_file(watchers)
# if we don't support the state file, do nothing
return if @state_file_path.nil?

log.info "synapse: writing state file"
timestamp = Time.now.to_i

# Remove stale backends
seen.each do |watcher_name, data|
backends(watcher_name).each do |backend_name, backend|
ts = backend.fetch('timestamp', 0)
delta = (timestamp - ts).abs
if delta > @state_file_ttl
log.info "synapse: expiring #{backend_name} with age #{delta}"
data.delete(backend_name)
end
end
end

seen[watcher.name][backend_name] = data.merge(backend)
# Remove any services which no longer have any backends
seen.reject!{|watcher_name, data| backends(watcher_name).keys.length == 0}

# Add backends and config from watchers
watchers.each do |watcher|
seen[watcher.name] ||= {}

watcher.backends.each do |backend|
backend_name = @haproxy.construct_name(backend)
data = {
'timestamp' => timestamp,
}
server_id = @haproxy.server_id_map[watcher.name][backend_name].to_i
if server_id && server_id > 0 && server_id <= MAX_SERVER_ID
data['haproxy_server_id'] = server_id
end

seen[watcher.name][backend_name] = data.merge(backend)
end

# Add config for generator from watcher
if watcher.config_for_generator.key?(@haproxy.name)
seen[watcher.name][KEY_WATCHER_CONFIG_FOR_GENERATOR] =
watcher.config_for_generator[@haproxy.name]
end
end

# write the data!
write_data_to_state_file(seen)
end

# write the data!
write_data_to_state_file(seen)
end
private

def read_state_file
# Some versions of JSON return nil on an empty file ...
JSON.load(File.read(@state_file_path)) || {}
rescue StandardError => e
# It's ok if the state file doesn't exist or contains invalid data
# The state file will be rebuilt automatically
{}
end
def seen
# if we don't support the state file, return nothing
return {} if @state_file_path.nil?

# if we've never needed the backends, now is the time to load them
@seen = read_state_file if @seen.nil?

@seen
end

# we do this atomically so the state file is always consistent
def write_data_to_state_file(data)
tmp_state_file_path = @state_file_path + ".tmp"
File.write(tmp_state_file_path, JSON.pretty_generate(data))
FileUtils.mv(tmp_state_file_path, @state_file_path)
def read_state_file
# Some versions of JSON return nil on an empty file ...
JSON.load(File.read(@state_file_path)) || {}
rescue StandardError => e
# It's ok if the state file doesn't exist or contains invalid data
# The state file will be rebuilt automatically
{}
end

# we do this atomically so the state file is always consistent
def write_data_to_state_file(data)
tmp_state_file_path = @state_file_path + ".tmp"
File.write(tmp_state_file_path, JSON.pretty_generate(data))
FileUtils.mv(tmp_state_file_path, @state_file_path)
end
end
end
end
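
The restart check added to `generate_config` above (and `update_config_for_generator` in the watcher changes below) relies on `HashDiff.diff` from the newly added `hashdiff` dependency. A small sketch of that check, using made-up config hashes:

```ruby
require 'hashdiff'

# Two hypothetical generator config hashes: the cached copy and a freshly
# discovered one in which only the port differs.
old_config = { 'port' => 3212, 'server_options' => 'check inter 2s' }
new_config = { 'port' => 3213, 'server_options' => 'check inter 2s' }

# With hashdiff ~> 0.2, HashDiff.diff returns an array of changes; an empty
# array means the hashes are equivalent, which is exactly the test used to
# decide whether a reload/reconfigure is needed.
diff = HashDiff.diff(old_config, new_config)
diff          # => [["~", "port", 3212, 3213]]
diff.empty?   # => false, so @restart_required would be set
```
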
44 changes: 40 additions & 4 deletions lib/synapse/service_watcher/base.rb
@@ -1,13 +1,14 @@
require 'synapse/log'
require 'set'
require 'hashdiff'

class Synapse::ServiceWatcher
class BaseWatcher
include Synapse::Logging

LEADER_WARN_INTERVAL = 30

attr_reader :name, :config_for_generator, :revision
attr_reader :name, :revision

def initialize(opts={}, synapse)
super()
@@ -99,6 +100,11 @@ def ping?
true
end

# deep clone the hash so that callers cannot mutate the watcher's copy
def config_for_generator
Marshal.load(Marshal.dump(@config_for_generator))
end
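
A quick illustration (not part of this commit) of why the accessor uses a Marshal round-trip rather than `dup`: `dup` is shallow, so nested hashes would still be shared with callers. The example values below are made up.

```ruby
# Shallow copy: nested hashes are shared, so a caller could mutate the
# watcher's internal config.
original = { 'haproxy' => { 'port' => 3212 } }
shallow  = original.dup
shallow['haproxy']['port'] = 9999
original['haproxy']['port']   # => 9999 -- the nested hash leaked

# Deep copy via Marshal: the caller gets an independent structure.
original = { 'haproxy' => { 'port' => 3212 } }
deep     = Marshal.load(Marshal.dump(original))
deep['haproxy']['port'] = 9999
original['haproxy']['port']   # => 3212 -- untouched
```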

def backends
filtered = backends_filtered_by_labels

@@ -152,7 +158,7 @@ def backends_filtered_by_labels
end
end

def set_backends(new_backends)
def set_backends(new_backends, new_config_for_generator = {})
# Aggregate and deduplicate all potential backend service instances.
new_backends = (new_backends + @default_servers) if @keep_default_servers
# Substitute backend_port_override for the provided port
@@ -165,7 +171,20 @@ def set_backends(new_backends)
[b['host'], b['port'], b.fetch('name', '')]
}

backends_updated = update_backends(new_backends)
config_updated = update_config_for_generator(new_config_for_generator)

if backends_updated || config_updated
reconfigure!
return true
else
return false
end
end

def update_backends(new_backends)
if new_backends.to_set == @backends.to_set
log.info "synapse: backends for service #{@name} do not change."
return false
end

@@ -192,11 +211,28 @@ def set_backends(new_backends)
@backends = new_backends
end

reconfigure!

return true
end

def update_config_for_generator(new_config_for_generator)
if new_config_for_generator.empty?
log.info "synapse: no config_for_generator data from #{name} for" \
" service #{@name}; keep existing config_for_generator: #{@config_for_generator.inspect}"
return false
else
log.info "synapse: discovered config_for_generator for service #{@name}"
diff = HashDiff.diff(new_config_for_generator, config_for_generator)

if diff.empty?
log.info "synapse: config_for_generator for service #{@name} does not change."
return false
else
@config_for_generator = new_config_for_generator
return true
end
end
end

# Subclasses should not invoke this directly; it's only exposed so that it
# can be overridden in subclasses.
def reconfigure!
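
The zookeeper service watcher (among the changed files not expanded on this page) is what feeds the new second argument of `set_backends`. The following is a rough, hypothetical sketch, not code from this commit, of how a zookeeper-backed watcher could combine child znodes (backends) with the parent znode's data (generator config); `parse_backend`, `@zk`, and `@discovery` are assumed names:

```ruby
require 'json'

# Hypothetical discovery pass for a zookeeper-backed watcher.
def discover
  path = @discovery['path']

  # Each child znode is assumed to describe one backend server.
  new_backends = @zk.children(path, watch: true).map do |child|
    data, _stat = @zk.get("#{path}/#{child}")
    parse_backend(data)  # assumed helper returning e.g. {'host' => ..., 'port' => ...}
  end

  # The parent znode's own data (if any) is the per-generator config hash
  # described in the README change above, e.g. {"haproxy" => {...}}.
  raw, _stat = @zk.get(path, watch: true)
  new_config_for_generator = (raw.nil? || raw.empty?) ? {} : JSON.parse(raw)

  # The updated set_backends accepts both and reconfigures only when either
  # the backend set or the generator config actually changed.
  set_backends(new_backends, new_config_for_generator)
end
```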
