Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Diego::Client targets active bbs instance with id from locket #3002

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/cloud_controller.yml
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ locket:
ca_file: 'spec/fixtures/certs/bbs_ca.crt'
cert_file: 'spec/fixtures/certs/bbs_client.crt'
key_file: 'spec/fixtures/certs/bbs_client.key'
diego_client_timeout: 5

threadpool_size: 20
webserver: thin
Expand Down
9 changes: 9 additions & 0 deletions lib/cloud_controller/config_schemas/base/api_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ class ApiSchema < VCAP::Config
},
},

locket: {
host: String,
port: Integer,
ca_file: String,
cert_file: String,
key_file: String,
diego_client_timeout: Integer
},

log_audit_events: bool,

optional(:telemetry_log_path) => String, # path to log telemetry to, omit to disable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class DeploymentUpdaterSchema < VCAP::Config
optional(:lock_owner) => String
},

optional(:locket) => {
locket: {
host: String,
port: Integer,
ca_file: String,
Expand Down
9 changes: 9 additions & 0 deletions lib/cloud_controller/config_schemas/base/worker_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ class WorkerSchema < VCAP::Config
fog_aws_storage_options: Hash
},

locket: {
will-gant marked this conversation as resolved.
Show resolved Hide resolved
host: String,
port: Integer,
ca_file: String,
cert_file: String,
key_file: String,
diego_client_timeout: Integer
},

packages: {
max_package_size: Integer,
app_package_directory_key: String,
Expand Down
1 change: 1 addition & 0 deletions lib/cloud_controller/dependency_locator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ def build_bbs_client
connect_timeout: config.get(:diego, :bbs, :connect_timeout),
send_timeout: config.get(:diego, :bbs, :send_timeout),
receive_timeout: config.get(:diego, :bbs, :receive_timeout),
locket_config: config.get(:locket),
)
end

Expand Down
20 changes: 8 additions & 12 deletions lib/cloud_controller/deployment_updater/scheduler.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
require 'cloud_controller/deployment_updater/dispatcher'
require 'locket/lock_worker'
require 'locket/lock_runner'
require 'locket/client'

module VCAP::CloudController
module DeploymentUpdater
Expand All @@ -19,23 +19,19 @@ def start
)
}

if config.get(:locket).nil?
loop(&update_step)
return
end

lock_runner = Locket::LockRunner.new(
key: config.get(:deployment_updater, :lock_key),
owner: config.get(:deployment_updater, :lock_owner),
locket_client = Locket::Client.new(
host: config.get(:locket, :host),
port: config.get(:locket, :port),
client_ca_path: config.get(:locket, :ca_file),
client_key_path: config.get(:locket, :key_file),
client_cert_path: config.get(:locket, :cert_file),
)
lock_worker = Locket::LockWorker.new(lock_runner)

lock_worker.acquire_lock_and_repeatedly_call(&update_step)
lock_worker = Locket::LockWorker.new(locket_client)
lock_worker.acquire_lock_and_repeatedly_call(
owner: config.get(:deployment_updater, :lock_owner),
key: config.get(:deployment_updater, :lock_key),
&update_step
)
end
end

Expand Down
69 changes: 51 additions & 18 deletions lib/diego/client.rb
Original file line number Diff line number Diff line change
@@ -1,27 +1,30 @@
require 'diego/bbs/bbs'
require 'diego/errors'
require 'diego/routes'
require 'uri'
require 'locket/client'

module Diego
class Client
PROTOBUF_HEADER = { 'Content-Type'.freeze => 'application/x-protobuf'.freeze }.freeze

def initialize(url:, ca_cert_file:, client_cert_file:, client_key_file:,
connect_timeout:, send_timeout:, receive_timeout:)
connect_timeout:, send_timeout:, receive_timeout:, locket_config:)
ENV['PB_IGNORE_DEPRECATIONS'] ||= 'true'
@client = build_client(
url,
ca_cert_file,
client_cert_file,
client_key_file,
connect_timeout,
send_timeout,
receive_timeout)
@locket_config = locket_config
@base_url = url
end

def ping
response = with_request_error_handling do
client.post(Routes::PING)
client.post(URI.join(bbs_url, Routes::PING))
end

validate_status!(response: response, statuses: [200])
Expand All @@ -32,7 +35,7 @@ def upsert_domain(domain:, ttl:)
request = protobuf_encode!({ domain: domain, ttl: ttl.to_i }, Bbs::Models::UpsertDomainRequest)

response = with_request_error_handling do
client.post(Routes::UPSERT_DOMAIN, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::UPSERT_DOMAIN), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -43,7 +46,7 @@ def desire_task(task_definition:, domain:, task_guid:)
request = protobuf_encode!({ task_definition: task_definition, domain: domain, task_guid: task_guid }, Bbs::Models::DesireTaskRequest)

response = with_request_error_handling do
client.post(Routes::DESIRE_TASK, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::DESIRE_TASK), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -54,7 +57,7 @@ def task_by_guid(task_guid)
request = protobuf_encode!({ task_guid: task_guid }, Bbs::Models::TaskByGuidRequest)

response = with_request_error_handling do
client.post(Routes::TASK_BY_GUID, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::TASK_BY_GUID), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -65,7 +68,7 @@ def tasks(domain: '', cell_id: '')
request = protobuf_encode!({ domain: domain, cell_id: cell_id }, Bbs::Models::TasksRequest)

response = with_request_error_handling do
client.post(Routes::LIST_TASKS, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::LIST_TASKS), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -76,7 +79,7 @@ def cancel_task(task_guid)
request = protobuf_encode!({ task_guid: task_guid }, Bbs::Models::TaskGuidRequest)

response = with_request_error_handling do
client.post(Routes::CANCEL_TASK, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::CANCEL_TASK), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -87,7 +90,7 @@ def desire_lrp(lrp)
request = protobuf_encode!({ desired_lrp: lrp }, Bbs::Models::DesireLRPRequest)

response = with_request_error_handling do
client.post(Routes::DESIRE_LRP, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::DESIRE_LRP), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -98,7 +101,7 @@ def desired_lrp_by_process_guid(process_guid)
request = protobuf_encode!({ process_guid: process_guid }, Bbs::Models::DesiredLRPByProcessGuidRequest)

response = with_request_error_handling do
client.post(Routes::DESIRED_LRP_BY_PROCESS_GUID, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::DESIRED_LRP_BY_PROCESS_GUID), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -109,7 +112,7 @@ def update_desired_lrp(process_guid, lrp_update)
request = protobuf_encode!({ process_guid: process_guid, update: lrp_update }, Bbs::Models::UpdateDesiredLRPRequest)

response = with_request_error_handling do
client.post(Routes::UPDATE_DESIRED_LRP, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::UPDATE_DESIRED_LRP), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -120,7 +123,7 @@ def remove_desired_lrp(process_guid)
request = protobuf_encode!({ process_guid: process_guid }, Bbs::Models::RemoveDesiredLRPRequest)

response = with_request_error_handling do
client.post(Routes::REMOVE_DESIRED_LRP, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::REMOVE_DESIRED_LRP), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -131,7 +134,7 @@ def retire_actual_lrp(actual_lrp_key)
request = protobuf_encode!({ actual_lrp_key: actual_lrp_key }, Bbs::Models::RetireActualLRPRequest)

response = with_request_error_handling do
client.post(Routes::RETIRE_ACTUAL_LRP, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::RETIRE_ACTUAL_LRP), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -142,7 +145,7 @@ def desired_lrp_scheduling_infos(domain)
request = protobuf_encode!({ domain: domain }, Bbs::Models::DesiredLRPsRequest)

response = with_request_error_handling do
client.post(Routes::DESIRED_LRP_SCHEDULING_INFOS, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::DESIRED_LRP_SCHEDULING_INFOS), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -153,7 +156,7 @@ def actual_lrps_by_process_guid(process_guid)
request = protobuf_encode!({ process_guid: process_guid }, Bbs::Models::ActualLRPsRequest)

response = with_request_error_handling do
client.post(Routes::ACTUAL_LRPS, request, PROTOBUF_HEADER)
client.post(URI.join(bbs_url, Routes::ACTUAL_LRPS), request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
Expand All @@ -164,13 +167,28 @@ def with_request_error_handling(&blk)
tries ||= 3
yield
rescue => e
@cached_active_bbs_id = nil
retry unless (tries -= 1).zero?
raise RequestError.new(e.message)
end

# Builds the URL requests are sent to: the configured base URL with the active
# BBS instance id prepended to its host as an extra leading label.
# Returns the resulting URL as a String.
def bbs_url
  URI(base_url).tap { |endpoint|
    endpoint.host = "#{active_bbs_id}.#{endpoint.host}"
  }.to_s
end

# Returns the id of the active BBS instance, memoized in @cached_active_bbs_id.
#
# On a cache miss the id is looked up via latest_active_bbs_id. A lookup that
# raises GRPC::BadStatus is attempted up to three times in total; the error
# from the final attempt is re-raised unchanged.
def active_bbs_id
  return @cached_active_bbs_id if @cached_active_bbs_id

  attempts_left = 3
  begin
    @cached_active_bbs_id = latest_active_bbs_id
  rescue GRPC::BadStatus
    attempts_left -= 1
    retry if attempts_left.positive?
    # Bare raise re-raises the GRPC::BadStatus from the last attempt.
    raise
  end
end

private

attr_reader :client
attr_reader :client, :base_url, :locket_config

def protobuf_encode!(hash, protobuf_message_class)
# See below link to understand proto3 message encoding
Expand All @@ -190,15 +208,30 @@ def protobuf_decode!(message, protobuf_decoder)
raise DecodeError.new(e.message)
end

def build_client(url, ca_cert_file, client_cert_file, client_key_file,
def build_client(ca_cert_file, client_cert_file, client_key_file,
connect_timeout, send_timeout, receive_timeout)
client = HTTPClient.new(base_url: url)
client = HTTPClient.new
client.connect_timeout = connect_timeout
client.send_timeout = send_timeout
client.receive_timeout = receive_timeout
client.ssl_config.set_client_cert_file(client_cert_file, client_key_file)
client.ssl_config.set_trust_ca(ca_cert_file)
client
end

# Fetches the Locket entry stored under the key 'bbs' and returns the owner of
# its resource. active_bbs_id caches this value as the active BBS instance id.
def latest_active_bbs_id
  bbs_entry = locket_client.fetch(key: 'bbs')
  bbs_entry.resource.owner
end

# Builds a new Locket::Client from locket_config on every call (no memoization).
# Each client option is read from the locket_config key it is mapped to below.
def locket_client
  config_key_for = {
    host: :host,
    port: :port,
    client_ca_path: :ca_file,
    client_key_path: :key_file,
    client_cert_path: :cert_file,
    timeout: :diego_client_timeout
  }
  client_options = config_key_for.transform_values { |config_key| locket_config[config_key] }
  Locket::Client.new(**client_options)
end
end
end
28 changes: 19 additions & 9 deletions lib/locket/lock_runner.rb → lib/locket/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,34 @@
require 'locket/locket_services_pb'

module Locket
class LockRunner
class Client
class Error < StandardError
end

# Creates a Locket gRPC stub for "host:port" secured with the given TLS material.
#
# client_ca_path, client_key_path and client_cert_path are paths to files whose
# contents are read and handed to GRPC::Core::ChannelCredentials.
# timeout is forwarded to the stub unchanged; it defaults to nil.
def initialize(host:, port:, client_ca_path:, client_cert_path:, client_key_path:, timeout: nil)
  # File.read closes the file once its contents are loaded; File.open(...).read
  # left the handle open until garbage collection.
  client_ca = File.read(client_ca_path)
  client_key = File.read(client_key_path)
  client_cert = File.read(client_cert_path)

  @service = Models::Locket::Stub.new(
    "#{host}:#{port}",
    GRPC::Core::ChannelCredentials.new(client_ca, client_key, client_cert),
    timeout: timeout
  )
  @lock_acquired = false
end

@key = key
@owner = owner
# Sends a fetch request for +key+ to the Locket service and returns the
# service's response as-is.
def fetch(key:)
  request = build_fetch_request(key: key)
  service.fetch(request)
end

def start
def start(owner:, key:)
raise Error.new('Cannot start more than once') if @thread

@thread = Thread.new do
loop do
begin
service.lock(build_lock_request)
service.lock(build_lock_request(owner: owner, key: key))
logger.debug("Acquired lock '#{key}' for owner '#{owner}'")
@lock_acquired = true
rescue GRPC::BadStatus => e
Expand All @@ -50,9 +52,9 @@ def lock_acquired?

private

attr_reader :service, :lock_acquired, :key, :owner
attr_reader :service, :lock_acquired

def build_lock_request
def build_lock_request(owner:, key:)
Models::LockRequest.new(
{
resource: {
Expand All @@ -65,6 +67,14 @@ def build_lock_request
)
end

# Wraps +key+ in a Models::FetchRequest, passing the attributes as a single
# positional hash.
def build_fetch_request(key:)
  attributes = { key: key }
  Models::FetchRequest.new(attributes)
end

def logger
@logger ||= Steno.logger('cc.locket-client')
end
Expand Down
10 changes: 5 additions & 5 deletions lib/locket/lock_worker.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
module Locket
class LockWorker
def initialize(lock_runner)
@lock_runner = lock_runner
def initialize(client)
@client = client
end

def acquire_lock_and_repeatedly_call(&block)
@lock_runner.start
def acquire_lock_and_repeatedly_call(owner:, key:, &block)
@client.start(owner: owner, key: key)
loop do
if @lock_runner.lock_acquired?
if @client.lock_acquired?
yield block
else
sleep 1
Expand Down
Loading