Skip to content

Commit

Permalink
Change scheduler's LockRunner into a locket client
Browse files Browse the repository at this point in the history
Renamed this to Locket::Client and changed its initialization args so it can be used where the 'owner' of a key isn't already known. This enables Diego::Client to use it to retrieve the bosh instance id of the bbs server that is currently active.
  • Loading branch information
will-gant committed Oct 20, 2022
1 parent 4bc537c commit c98d0e0
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 51 deletions.
14 changes: 8 additions & 6 deletions lib/cloud_controller/deployment_updater/scheduler.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
require 'cloud_controller/deployment_updater/dispatcher'
require 'locket/lock_worker'
require 'locket/lock_runner'
require 'locket/client'

module VCAP::CloudController
module DeploymentUpdater
Expand All @@ -24,18 +24,20 @@ def start
return
end

lock_runner = Locket::LockRunner.new(
key: config.get(:deployment_updater, :lock_key),
owner: config.get(:deployment_updater, :lock_owner),
client = Locket::Client.new(
host: config.get(:locket, :host),
port: config.get(:locket, :port),
client_ca_path: config.get(:locket, :ca_file),
client_key_path: config.get(:locket, :key_file),
client_cert_path: config.get(:locket, :cert_file),
)
lock_worker = Locket::LockWorker.new(lock_runner)
lock_worker = Locket::LockWorker.new(client)

lock_worker.acquire_lock_and_repeatedly_call(&update_step)
lock_worker.acquire_lock_and_repeatedly_call(
owner: config.get(:deployment_updater, :lock_owner),
key: config.get(:deployment_updater, :lock_key),
&update_step
)
end
end

Expand Down
13 changes: 5 additions & 8 deletions lib/locket/lock_runner.rb → lib/locket/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
require 'locket/locket_services_pb'

module Locket
class LockRunner
class Client
class Error < StandardError
end

def initialize(key:, owner:, host:, port:, client_ca_path:, client_cert_path:, client_key_path:)
def initialize(host:, port:, client_ca_path:, client_cert_path:, client_key_path:)
client_ca = File.open(client_ca_path).read
client_key = File.open(client_key_path).read
client_cert = File.open(client_cert_path).read
Expand All @@ -16,18 +16,15 @@ def initialize(key:, owner:, host:, port:, client_ca_path:, client_cert_path:, c
GRPC::Core::ChannelCredentials.new(client_ca, client_key, client_cert)
)
@lock_acquired = false

@key = key
@owner = owner
end

def start
def start(owner:, key:)
raise Error.new('Cannot start more than once') if @thread

@thread = Thread.new do
loop do
begin
service.lock(build_lock_request)
service.lock(build_lock_request(owner: owner, key: key))
logger.debug("Acquired lock '#{key}' for owner '#{owner}'")
@lock_acquired = true
rescue GRPC::BadStatus => e
Expand All @@ -52,7 +49,7 @@ def lock_acquired?

attr_reader :service, :lock_acquired, :key, :owner

def build_lock_request
def build_lock_request(owner:, key:)
Models::LockRequest.new(
{
resource: {
Expand Down
10 changes: 5 additions & 5 deletions lib/locket/lock_worker.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
module Locket
class LockWorker
def initialize(lock_runner)
@lock_runner = lock_runner
def initialize(client)
@client = client
end

def acquire_lock_and_repeatedly_call(&block)
@lock_runner.start
def acquire_lock_and_repeatedly_call(owner:, key:, &block)
@client.start(owner: owner, key: key)
loop do
if @lock_runner.lock_acquired?
if @client.lock_acquired?
yield block
else
sleep 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ module VCAP::CloudController
end

describe '#start' do
let(:lock_runner) { instance_double(Locket::LockRunner, start: nil, lock_acquired?: nil) }
let(:client) { instance_double(Locket::Client, start: nil, lock_acquired?: nil) }
let(:lock_worker) { instance_double(Locket::LockWorker) }
let(:logger) { instance_double(Steno::Logger, info: nil, debug: nil, error: nil) }
let(:statsd_client) { instance_double(Statsd) }
let(:prometheus_updater) { instance_double(VCAP::CloudController::Metrics::PrometheusUpdater) }

before do
allow(Locket::LockRunner).to receive(:new).and_return(lock_runner)
allow(Locket::Client).to receive(:new).and_return(client)
allow(Locket::LockWorker).to receive(:new).and_return(lock_worker)
allow(Steno).to receive(:logger).and_return(logger)

Expand All @@ -43,20 +43,18 @@ module VCAP::CloudController
allow(prometheus_updater).to receive(:report_deployment_duration)
end

it 'correctly configures a LockRunner and uses it to initialize a LockWorker' do
it 'correctly configures a Client and uses it to initialize a LockWorker' do
DeploymentUpdater::Scheduler.start

expect(Locket::LockRunner).to have_received(:new).with(
key: TestConfig.config_instance.get(:deployment_updater, :lock_key),
owner: TestConfig.config_instance.get(:deployment_updater, :lock_owner),
expect(Locket::Client).to have_received(:new).with(
host: TestConfig.config_instance.get(:locket, :host),
port: TestConfig.config_instance.get(:locket, :port),
client_ca_path: TestConfig.config_instance.get(:locket, :ca_file),
client_key_path: TestConfig.config_instance.get(:locket, :key_file),
client_cert_path: TestConfig.config_instance.get(:locket, :cert_file),
)

expect(Locket::LockWorker).to have_received(:new).with(lock_runner)
expect(Locket::LockWorker).to have_received(:new).with(client)
end

context 'when locket is not configured' do
Expand All @@ -73,8 +71,8 @@ module VCAP::CloudController
it 'doesnt start any lock machinery' do
DeploymentUpdater::Scheduler.start

expect(Locket::LockRunner).not_to have_received(:new)
expect(Locket::LockWorker).not_to have_received(:new).with(lock_runner)
expect(Locket::Client).not_to have_received(:new)
expect(Locket::LockWorker).not_to have_received(:new).with(client)
end

it 'runs the DeploymentUpdater::Dispatcher sleeps for the configured frequency' do
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
require 'support/paths'
require 'locket/lock_runner'
require 'locket/client'
require 'rspec/wait'

RSpec.describe Locket::LockRunner do
RSpec.describe Locket::Client do
let(:locket_service) { instance_double(Models::Locket::Stub) }
let(:key) { 'lock-key' }
let(:owner) { 'lock-owner' }
Expand All @@ -22,9 +22,7 @@
end

let(:client) do
Locket::LockRunner.new(
key: key,
owner: owner,
Locket::Client.new(
host: host,
port: port,
client_ca_path: client_ca_path,
Expand Down Expand Up @@ -56,19 +54,19 @@
allow(locket_service).to receive(:lock)
allow(client).to receive(:sleep)

client.start
client.start(owner: owner, key: key)

wait_for(locket_service).to have_received(:lock).with(lock_request).at_least(3).times
end

it 'raises an error when restarted after it has already been started' do
allow(locket_service).to receive(:lock)

client.start
client.start(owner: owner, key: key)

expect {
client.start
}.to raise_error(Locket::LockRunner::Error, 'Cannot start more than once')
client.start(owner: owner, key: key)
}.to raise_error(Locket::Client::Error, 'Cannot start more than once')
end
end

Expand All @@ -92,7 +90,7 @@
client.instance_variable_set(:@lock_acquired, true)
allow(locket_service).to receive(:lock).and_raise(error)

client.start
client.start(owner: owner, key: key)

wait_for(fake_logger).to have_received(:debug).with("Failed to acquire lock '#{key}' for owner '#{owner}': #{error.message}").at_least(:once)
wait_for(-> { client.lock_acquired? }).to be(false)
Expand All @@ -103,7 +101,7 @@
it 'reports that it has a lock' do
allow(locket_service).to receive(:lock).and_return(Models::LockResponse)

client.start
client.start(owner: owner, key: key)

wait_for(fake_logger).to have_received(:debug).with("Acquired lock '#{key}' for owner '#{owner}'").at_least(:once)
wait_for(-> { client.lock_acquired? }).to be(true)
Expand Down
26 changes: 14 additions & 12 deletions spec/unit/lib/locket/lock_worker_spec.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
require 'spec_helper'
require 'locket/lock_worker'
require 'locket/lock_runner'
require 'locket/client'

RSpec.describe Locket::LockWorker do
let(:lock_runner) { instance_double(Locket::LockRunner, start: nil, lock_acquired?: nil) }
subject(:lock_worker) { Locket::LockWorker.new(lock_runner) }
let(:client) { instance_double(Locket::Client, start: nil, lock_acquired?: nil) }
let(:key) { 'lock-key' }
let(:owner) { 'lock-owner' }
subject(:lock_worker) { Locket::LockWorker.new(client) }

describe '#acquire_lock_and' do
before do
Expand All @@ -13,32 +15,32 @@
allow(lock_worker).to receive(:sleep) # dont use real time please
end

it 'should start the LockRunner' do
lock_worker.acquire_lock_and_repeatedly_call {}
it 'should start the Client' do
lock_worker.acquire_lock_and_repeatedly_call(owner: owner, key: key, &{})

expect(lock_runner).to have_received(:start)
expect(client).to have_received(:start)
end

describe 'when it does not have the lock' do
it 'does not yield to the block if it does not have the lock' do
allow(lock_runner).to receive(:lock_acquired?).and_return(false)
allow(client).to receive(:lock_acquired?).and_return(false)

expect { |b| lock_worker.acquire_lock_and_repeatedly_call(&b) }.not_to yield_control
expect { |b| lock_worker.acquire_lock_and_repeatedly_call(owner: owner, key: key, &b) }.not_to yield_control
end

it 'sleeps before attempting to check the lock status again' do
allow(lock_runner).to receive(:lock_acquired?).and_return(false)
allow(client).to receive(:lock_acquired?).and_return(false)

lock_worker.acquire_lock_and_repeatedly_call {}
lock_worker.acquire_lock_and_repeatedly_call(owner: owner, key: key, &{})
expect(lock_worker).to have_received(:sleep).with(1)
end
end

describe 'when it does not have the lock' do
it 'yields to the block if it does have the lock' do
allow(lock_runner).to receive(:lock_acquired?).and_return(true)
allow(client).to receive(:lock_acquired?).and_return(true)

expect { |b| lock_worker.acquire_lock_and_repeatedly_call(&b) }.to yield_control
expect { |b| lock_worker.acquire_lock_and_repeatedly_call(owner: owner, key: key, &b) }.to yield_control
end
end
end
Expand Down

0 comments on commit c98d0e0

Please sign in to comment.