Skip to content

Commit

Permalink
Use locket to ensure diego client hits an active bbs server
Browse files Browse the repository at this point in the history
Get the bosh instance id for the currently-active bbs process from locket, so that Diego::Client can use an instance-specific bosh-dns URL instead of one that resolves to all IPs in the instance group (i.e. fd0458bf-71f9-4df2-9ab1-5fb523a92e56.bbs.service.cf.internal, not bbs.service.cf.internal).

Mitigates the risk of Runner is unavailable errors during a cf push when the client is unable to reach a bbs instance.

bbs operates as a cluster. At any one time one instance is 'active', and able to serve responses to API calls, while others are inactive. An active instance holds the bbs lock from locket.

At present, whenever the Diego::Client needs to communicate with bbs it hits the URL passed in from config.diego.bbs.url. The default value passed in for this from the capi-release is https://bbs.service.cf.internal:8889. This domain is defined by cf-deployment as a bosh-dns alias for the diego-api instance group with query q-s4 (i.e. resolving to all instances, whether healthy or unhealthy, in a random order).

When all bbs instances are reachable, you either hit the active instance first, or you hit an inactive one (connection refused) and the http client tries the next IP in the list. But if the first IP in the list is unreachable, the request hangs until it times out (by default after 10s) and then fails.

The Diego::Client makes three attempts to reach the active bbs instance before giving up and raising an error. Say you've got two availability zones with one diego-api instance in each, and a network issue makes one unreachable. In such a case, every API call currently has a 1/8 chance of that the first IP in the list is the unreachable one three times in a row, which will cause cf push and other commands to fail.
  • Loading branch information
will-gant committed Oct 20, 2022
1 parent 603e8b1 commit 3c7806e
Show file tree
Hide file tree
Showing 12 changed files with 271 additions and 148 deletions.
1 change: 1 addition & 0 deletions config/cloud_controller.yml
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ locket:
ca_file: 'spec/fixtures/certs/bbs_ca.crt'
cert_file: 'spec/fixtures/certs/bbs_client.crt'
key_file: 'spec/fixtures/certs/bbs_client.key'
diego_client_timeout: 5

threadpool_size: 20
webserver: thin
Expand Down
9 changes: 9 additions & 0 deletions lib/cloud_controller/config_schemas/base/api_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ class ApiSchema < VCAP::Config
},
},

locket: {
host: String,
port: Integer,
ca_file: String,
cert_file: String,
key_file: String,
diego_client_timeout: Integer
},

log_audit_events: bool,

optional(:telemetry_log_path) => String, # path to log telemetry to, omit to disable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class DeploymentUpdaterSchema < VCAP::Config
optional(:lock_owner) => String
},

optional(:locket) => {
locket: {
host: String,
port: Integer,
ca_file: String,
Expand Down
9 changes: 9 additions & 0 deletions lib/cloud_controller/config_schemas/base/worker_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ class WorkerSchema < VCAP::Config
fog_aws_storage_options: Hash
},

locket: {
host: String,
port: Integer,
ca_file: String,
cert_file: String,
key_file: String,
diego_client_timeout: Integer
},

packages: {
max_package_size: Integer,
app_package_directory_key: String,
Expand Down
1 change: 1 addition & 0 deletions lib/cloud_controller/dependency_locator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ def build_bbs_client
connect_timeout: config.get(:diego, :bbs, :connect_timeout),
send_timeout: config.get(:diego, :bbs, :send_timeout),
receive_timeout: config.get(:diego, :bbs, :receive_timeout),
locket_config: config.get(:locket),
)
end

Expand Down
10 changes: 2 additions & 8 deletions lib/cloud_controller/deployment_updater/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,14 @@ def start
)
}

if config.get(:locket).nil?
loop(&update_step)
return
end

client = Locket::Client.new(
locket_client = Locket::Client.new(
host: config.get(:locket, :host),
port: config.get(:locket, :port),
client_ca_path: config.get(:locket, :ca_file),
client_key_path: config.get(:locket, :key_file),
client_cert_path: config.get(:locket, :cert_file),
)
lock_worker = Locket::LockWorker.new(client)

lock_worker = Locket::LockWorker.new(locket_client)
lock_worker.acquire_lock_and_repeatedly_call(
owner: config.get(:deployment_updater, :lock_owner),
key: config.get(:deployment_updater, :lock_key),
Expand Down
70 changes: 52 additions & 18 deletions lib/diego/client.rb
Original file line number Diff line number Diff line change
@@ -1,27 +1,30 @@
require 'diego/bbs/bbs'
require 'diego/errors'
require 'diego/routes'
require 'uri'
require 'locket/client'

module Diego
class Client
PROTOBUF_HEADER = { 'Content-Type'.freeze => 'application/x-protobuf'.freeze }.freeze

def initialize(url:, ca_cert_file:, client_cert_file:, client_key_file:,
connect_timeout:, send_timeout:, receive_timeout:)
connect_timeout:, send_timeout:, receive_timeout:, locket_config:)
ENV['PB_IGNORE_DEPRECATIONS'] ||= 'true'
@client = build_client(
url,
ca_cert_file,
client_cert_file,
client_key_file,
connect_timeout,
send_timeout,
receive_timeout)
@locket_config = locket_config
@base_url = url
end

def ping
response = with_request_error_handling do
client.post(Routes::PING)
client.post(URI.join(bbs_url, Routes::PING))
end

validate_status!(response: response, statuses: [200])
Expand All @@ -32,7 +35,7 @@ def upsert_domain(domain:, ttl:)
request = protobuf_encode!({ domain: domain, ttl: ttl.to_i }, Bbs::Models::UpsertDomainRequest)

response = with_request_error_handling do
client.post(Routes::UPSERT_DOMAIN, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::UPSERT_DOMAIN), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -43,7 +46,7 @@ def desire_task(task_definition:, domain:, task_guid:)
request = protobuf_encode!({ task_definition: task_definition, domain: domain, task_guid: task_guid }, Bbs::Models::DesireTaskRequest)

response = with_request_error_handling do
client.post(Routes::DESIRE_TASK, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::DESIRE_TASK), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -54,7 +57,7 @@ def task_by_guid(task_guid)
request = protobuf_encode!({ task_guid: task_guid }, Bbs::Models::TaskByGuidRequest)

response = with_request_error_handling do
client.post(Routes::TASK_BY_GUID, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::TASK_BY_GUID), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -65,7 +68,7 @@ def tasks(domain: '', cell_id: '')
request = protobuf_encode!({ domain: domain, cell_id: cell_id }, Bbs::Models::TasksRequest)

response = with_request_error_handling do
client.post(Routes::LIST_TASKS, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::LIST_TASKS), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -76,7 +79,7 @@ def cancel_task(task_guid)
request = protobuf_encode!({ task_guid: task_guid }, Bbs::Models::TaskGuidRequest)

response = with_request_error_handling do
client.post(Routes::CANCEL_TASK, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::CANCEL_TASK), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -87,7 +90,7 @@ def desire_lrp(lrp)
request = protobuf_encode!({ desired_lrp: lrp }, Bbs::Models::DesireLRPRequest)

response = with_request_error_handling do
client.post(Routes::DESIRE_LRP, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::DESIRE_LRP), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -98,7 +101,7 @@ def desired_lrp_by_process_guid(process_guid)
request = protobuf_encode!({ process_guid: process_guid }, Bbs::Models::DesiredLRPByProcessGuidRequest)

response = with_request_error_handling do
client.post(Routes::DESIRED_LRP_BY_PROCESS_GUID, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::DESIRED_LRP_BY_PROCESS_GUID), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -109,7 +112,7 @@ def update_desired_lrp(process_guid, lrp_update)
request = protobuf_encode!({ process_guid: process_guid, update: lrp_update }, Bbs::Models::UpdateDesiredLRPRequest)

response = with_request_error_handling do
client.post(Routes::UPDATE_DESIRED_LRP, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::UPDATE_DESIRED_LRP), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -120,7 +123,7 @@ def remove_desired_lrp(process_guid)
request = protobuf_encode!({ process_guid: process_guid }, Bbs::Models::RemoveDesiredLRPRequest)

response = with_request_error_handling do
client.post(Routes::REMOVE_DESIRED_LRP, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::REMOVE_DESIRED_LRP), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -131,7 +134,7 @@ def retire_actual_lrp(actual_lrp_key)
request = protobuf_encode!({ actual_lrp_key: actual_lrp_key }, Bbs::Models::RetireActualLRPRequest)

response = with_request_error_handling do
client.post(Routes::RETIRE_ACTUAL_LRP, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::RETIRE_ACTUAL_LRP), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -142,7 +145,7 @@ def desired_lrp_scheduling_infos(domain)
request = protobuf_encode!({ domain: domain }, Bbs::Models::DesiredLRPsRequest)

response = with_request_error_handling do
client.post(Routes::DESIRED_LRP_SCHEDULING_INFOS, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::DESIRED_LRP_SCHEDULING_INFOS), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -153,7 +156,7 @@ def actual_lrps_by_process_guid(process_guid)
request = protobuf_encode!({ process_guid: process_guid }, Bbs::Models::ActualLRPsRequest)

response = with_request_error_handling do
client.post(Routes::ACTUAL_LRPS, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::ACTUAL_LRPS), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -164,13 +167,29 @@ def with_request_error_handling(&blk)
tries ||= 3
yield
rescue => e
@cached_active_bbs_id = nil
retry unless (tries -= 1).zero?
raise RequestError.new(e.message)
end

def bbs_url
uri = URI(base_url)
uri.host = "#{active_bbs_id}.#{uri.host}"
uri.to_s
end

def active_bbs_id
tries ||= 3
@cached_active_bbs_id ||= latest_active_bbs_id
rescue GRPC::BadStatus => e
retry unless (tries -= 1).zero?
raise e
end

private

attr_reader :client
attr_reader :client, :base_url, :locket_config
attr_accessor :cached_active_bbs_id

def protobuf_encode!(hash, protobuf_message_class)
# See below link to understand proto3 message encoding
Expand All @@ -190,15 +209,30 @@ def protobuf_decode!(message, protobuf_decoder)
raise DecodeError.new(e.message)
end

def build_client(url, ca_cert_file, client_cert_file, client_key_file,
def build_client(ca_cert_file, client_cert_file, client_key_file,
connect_timeout, send_timeout, receive_timeout)
client = HTTPClient.new(base_url: url)
client = HTTPClient.new
client.connect_timeout = connect_timeout
client.send_timeout = send_timeout
client.receive_timeout = receive_timeout
client.ssl_config.set_client_cert_file(client_cert_file, client_key_file)
client.ssl_config.set_trust_ca(ca_cert_file)
client
end

def latest_active_bbs_id
locket_client.fetch(key: 'bbs').resource.owner
end

def locket_client
Locket::Client.new(
host: locket_config[:host],
port: locket_config[:port],
client_ca_path: locket_config[:ca_file],
client_key_path: locket_config[:key_file],
client_cert_path: locket_config[:cert_file],
timeout: locket_config[:diego_client_timeout]
)
end
end
end
19 changes: 16 additions & 3 deletions lib/locket/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,23 @@ class Client
class Error < StandardError
end

def initialize(host:, port:, client_ca_path:, client_cert_path:, client_key_path:)
def initialize(host:, port:, client_ca_path:, client_cert_path:, client_key_path:, timeout: nil)
client_ca = File.open(client_ca_path).read
client_key = File.open(client_key_path).read
client_cert = File.open(client_cert_path).read

@service = Models::Locket::Stub.new(
"#{host}:#{port}",
GRPC::Core::ChannelCredentials.new(client_ca, client_key, client_cert)
GRPC::Core::ChannelCredentials.new(client_ca, client_key, client_cert),
timeout: timeout
)
@lock_acquired = false
end

def fetch(key:)
service.fetch(build_fetch_request(key: key))
end

def start(owner:, key:)
raise Error.new('Cannot start more than once') if @thread

Expand Down Expand Up @@ -47,7 +52,7 @@ def lock_acquired?

private

attr_reader :service, :lock_acquired, :key, :owner
attr_reader :service, :lock_acquired

def build_lock_request(owner:, key:)
Models::LockRequest.new(
Expand All @@ -62,6 +67,14 @@ def build_lock_request(owner:, key:)
)
end

def build_fetch_request(key:)
Models::FetchRequest.new(
{
key: key
}
)
end

def logger
@logger ||= Steno.logger('cc.locket-client')
end
Expand Down
Loading

0 comments on commit 3c7806e

Please sign in to comment.