diff --git a/.travis.yml b/.travis.yml index 0e17288..357f378 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,16 +1,19 @@ language: ruby rvm: - - 2.4.1 - - 2.2 + - 2.5.3 + - 2.4.5 + - 2.3.8 env: - global: - - ETCD_VERSION=v3.2.0 + - ETCD_VERSION=v3.1.20 + - ETCD_VERSION=v3.2.25 +# v3.3.10 is not working for whatever reason (at least in travis, spec passes +# locally for me) +# - ETCD_VERSION=v3.3.10 install: - bundle install - - wget https://github.com/coreos/etcd/releases/download/$ETCD_VERSION/etcd-$ETCD_VERSION-linux-amd64.tar.gz -O etcd.tar.gz --no-check-certificate - - tar zxvf etcd.tar.gz - - export PATH=$PATH:etcd-$ETCD_VERSION-linux-amd64 + - bundle exec rake download-etcd + - export PATH="$(dirname $(find /tmp -name 'etcd')):$PATH" script: bundle exec rspec diff --git a/README.md b/README.md index 5c8eef0..b723dad 100644 --- a/README.md +++ b/README.md @@ -164,6 +164,23 @@ conn.watch('boom') do |events| end ``` +## Locks +```ruby +# First, get yourself a lease +lease_id = conn.lease_grant(100)['ID'] + +# Attempt to lock distibuted lock 'foo', wait at most 10 seconds +lock_key = conn.lock('foo', lease_id, timeout: 10).key + +# Unlock the 'foo' lock using the key returned from `lock` +conn.unlock(key) + +# Perform a critical section while holding the lock 'hello' +conn.with_lock('hello', lease_id) do + puts "kitty!" +end +``` + ## Alarms ```ruby # List all active Alarms diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..ac1f79b --- /dev/null +++ b/Rakefile @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +ETCD_VERSION = ENV["ETCD_VERSION"] || "v3.2.0" +ETCD_URL = "https://github.com/coreos/etcd/releases/download/#{ETCD_VERSION}/etcd-#{ETCD_VERSION}-linux-amd64.tar.gz" + +require "tmpdir" + +desc "Download etcd for it can be used in rspec" +task :"download-etcd" do + tmpdir = Dir.mktmpdir + system("wget", ETCD_URL, "-O", "#{tmpdir}/etcd.tar.gz") || exit(1) + system(*%W{tar -C #{tmpdir} -zxvf #{tmpdir}/etcd.tar.gz}) || exit(1) + + puts "Etcd downloaded and extracted. Add it to the path:" + puts " export PATH=\"#{tmpdir}/etcd-#{ETCD_VERSION}-linux-amd64:$PATH\"" +end diff --git a/etcdv3.gemspec b/etcdv3.gemspec index 58019a1..7268b1b 100644 --- a/etcdv3.gemspec +++ b/etcdv3.gemspec @@ -14,6 +14,8 @@ Gem::Specification.new do |s| s.files = `git ls-files`.split("\n") s.test_files = `git ls-files -- {test,spec,features}/*`.split("\n") - s.add_dependency("grpc", "~> 1.6.0") - s.add_development_dependency("rspec", "~> 3.6.0") + s.add_dependency("grpc", "~> 1.17") + s.add_development_dependency("pry-byebug", "~> 3.6") + s.add_development_dependency("rake", "~> 12.3") + s.add_development_dependency("rspec", "~> 3.6") end diff --git a/lib/etcdv3.rb b/lib/etcdv3.rb index c520335..4b0a11d 100644 --- a/lib/etcdv3.rb +++ b/lib/etcdv3.rb @@ -2,6 +2,7 @@ require 'uri' require 'etcdv3/etcdrpc/rpc_services_pb' +require 'etcdv3/etcdrpc/v3lock_services_pb' require 'etcdv3/auth' require 'etcdv3/kv/requests' require 'etcdv3/kv/transaction' @@ -9,6 +10,7 @@ require 'etcdv3/maintenance' require 'etcdv3/lease' require 'etcdv3/watch' +require 'etcdv3/lock' require 'etcdv3/connection' require 'etcdv3/connection_wrapper' @@ -84,6 +86,37 @@ def get(key, opts={}) @conn.handle(:kv, 'get', [key, opts]) end + # Locks distributed lock with the given name. The lock will unlock automatically + # when lease with the given ID expires. If this is not desirable, provide a non-expiring + # lease ID as an argument. + # name - string + # lease_id - integer + # optional :timeout - integer + def lock(name, lease_id, timeout: nil) + @conn.handle(:lock, 'lock', [name, lease_id, {timeout: timeout}]) + end + + # Unlock distributed lock using the key previously obtained from lock. + # key - string + # optional :timeout - integer + def unlock(key, timeout: nil) + @conn.handle(:lock, 'unlock', [key, {timeout: timeout}]) + end + + # Yield into the critical section while holding lock with the given + # name. The lock will be unlocked even if the block throws. + # name - string + # lease_id - integer + # optional :timeout - integer + def with_lock(name, lease_id, timeout: nil) + key = lock(name, lease_id, timeout: timeout).key + begin + yield + ensure + unlock(key, timeout: timeout) + end + end + # Inserts a new key. # key - string # value - string diff --git a/lib/etcdv3/auth.rb b/lib/etcdv3/auth.rb index 315fb4d..0167870 100644 --- a/lib/etcdv3/auth.rb +++ b/lib/etcdv3/auth.rb @@ -1,6 +1,7 @@ class Etcdv3 class Auth + include GRPC::Core::TimeConsts PERMISSIONS = { :read => Authpb::Permission::Type::READ, @@ -122,7 +123,7 @@ def generate_token(user, password, timeout: nil) private def deadline(timeout) - Time.now.to_f + (timeout || @timeout) + from_relative_time(timeout || @timeout) end end end diff --git a/lib/etcdv3/connection.rb b/lib/etcdv3/connection.rb index f925399..634d20d 100644 --- a/lib/etcdv3/connection.rb +++ b/lib/etcdv3/connection.rb @@ -6,7 +6,8 @@ class Connection kv: Etcdv3::KV, maintenance: Etcdv3::Maintenance, lease: Etcdv3::Lease, - watch: Etcdv3::Watch + watch: Etcdv3::Watch, + lock: Etcdv3::Lock, } attr_reader :endpoint, :hostname, :handlers, :credentials diff --git a/lib/etcdv3/etcdrpc/annotations_pb.rb b/lib/etcdv3/etcdrpc/annotations_pb.rb new file mode 100644 index 0000000..5d20070 --- /dev/null +++ b/lib/etcdv3/etcdrpc/annotations_pb.rb @@ -0,0 +1,13 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: annotations.proto + +require 'google/protobuf' + +#require 'http_pb' +Google::Protobuf::DescriptorPool.generated_pool.build do +end + +module Google + module Api + end +end diff --git a/lib/etcdv3/etcdrpc/rpc_pb.rb b/lib/etcdv3/etcdrpc/rpc_pb.rb index 61eb671..8d5f135 100644 --- a/lib/etcdv3/etcdrpc/rpc_pb.rb +++ b/lib/etcdv3/etcdrpc/rpc_pb.rb @@ -5,6 +5,7 @@ require_relative 'kv_pb' require_relative 'auth_pb' +require_relative 'annotations_pb' Google::Protobuf::DescriptorPool.generated_pool.build do add_message "etcdserverpb.ResponseHeader" do optional :cluster_id, :uint64, 1 diff --git a/lib/etcdv3/etcdrpc/rpc_services_pb.rb b/lib/etcdv3/etcdrpc/rpc_services_pb.rb index 7780ab7..40e8501 100644 --- a/lib/etcdv3/etcdrpc/rpc_services_pb.rb +++ b/lib/etcdv3/etcdrpc/rpc_services_pb.rb @@ -1,5 +1,7 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! # Source: rpc.proto for package 'etcdserverpb' + +require 'grpc' require_relative 'rpc_pb' module Etcdserverpb diff --git a/lib/etcdv3/etcdrpc/v3lock_pb.rb b/lib/etcdv3/etcdrpc/v3lock_pb.rb new file mode 100644 index 0000000..ea9b59d --- /dev/null +++ b/lib/etcdv3/etcdrpc/v3lock_pb.rb @@ -0,0 +1,30 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: v3lock.proto + +require 'google/protobuf' + +require_relative 'annotations_pb' +require_relative 'rpc_pb' +Google::Protobuf::DescriptorPool.generated_pool.build do + add_message "v3lockpb.LockRequest" do + optional :name, :bytes, 1 + optional :lease, :int64, 2 + end + add_message "v3lockpb.LockResponse" do + optional :header, :message, 1, "etcdserverpb.ResponseHeader" + optional :key, :bytes, 2 + end + add_message "v3lockpb.UnlockRequest" do + optional :key, :bytes, 1 + end + add_message "v3lockpb.UnlockResponse" do + optional :header, :message, 1, "etcdserverpb.ResponseHeader" + end +end + +module V3lockpb + LockRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("v3lockpb.LockRequest").msgclass + LockResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("v3lockpb.LockResponse").msgclass + UnlockRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("v3lockpb.UnlockRequest").msgclass + UnlockResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("v3lockpb.UnlockResponse").msgclass +end diff --git a/lib/etcdv3/etcdrpc/v3lock_services_pb.rb b/lib/etcdv3/etcdrpc/v3lock_services_pb.rb new file mode 100644 index 0000000..2c92448 --- /dev/null +++ b/lib/etcdv3/etcdrpc/v3lock_services_pb.rb @@ -0,0 +1,25 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# Source: v3lock.proto for package 'v3lockpb' + +require 'grpc' +require_relative 'v3lock_pb' + +module V3lockpb + module Lock + class Service + + include GRPC::GenericService + + self.marshal_class_method = :encode + self.unmarshal_class_method = :decode + self.service_name = 'v3lockpb.Lock' + + # Lock acquires a distributed shared lock on a given named lock. + rpc :Lock, LockRequest, LockResponse + # Unlock takes a key returned by Lock and releases the hold on lock. + rpc :Unlock, UnlockRequest, UnlockResponse + end + + Stub = Service.rpc_stub_class + end +end diff --git a/lib/etcdv3/kv.rb b/lib/etcdv3/kv.rb index afd4693..ba1ebdb 100644 --- a/lib/etcdv3/kv.rb +++ b/lib/etcdv3/kv.rb @@ -2,6 +2,7 @@ class Etcdv3 class KV include Etcdv3::KV::Requests + include GRPC::Core::TimeConsts def initialize(hostname, credentials, timeout, metadata={}) @stub = Etcdserverpb::KV::Stub.new(hostname, credentials) @@ -36,7 +37,7 @@ def transaction(block, timeout: nil) private def deadline(timeout) - Time.now.to_f + (timeout || @timeout) + from_relative_time(timeout || @timeout) end def generate_request_ops(requests) diff --git a/lib/etcdv3/lease.rb b/lib/etcdv3/lease.rb index 37c64ff..462c3f6 100644 --- a/lib/etcdv3/lease.rb +++ b/lib/etcdv3/lease.rb @@ -1,5 +1,7 @@ class Etcdv3 class Lease + include GRPC::Core::TimeConsts + def initialize(hostname, credentials, timeout, metadata={}) @stub = Etcdserverpb::Lease::Stub.new(hostname, credentials) @timeout = timeout @@ -31,7 +33,7 @@ def lease_keep_alive_once(id, timeout: nil) private def deadline(timeout) - Time.now.to_f + (timeout || @timeout) + from_relative_time(timeout || @timeout) end end end diff --git a/lib/etcdv3/lock.rb b/lib/etcdv3/lock.rb new file mode 100644 index 0000000..a439d61 --- /dev/null +++ b/lib/etcdv3/lock.rb @@ -0,0 +1,27 @@ +class Etcdv3 + class Lock + include GRPC::Core::TimeConsts + + def initialize(hostname, credentials, timeout, metadata = {}) + @stub = V3lockpb::Lock::Stub.new(hostname, credentials) + @timeout = timeout + @metadata = metadata + end + + def lock(name, lease_id, timeout: nil) + request = V3lockpb::LockRequest.new(name: name, lease: lease_id) + @stub.lock(request, deadline: deadline(timeout)) + end + + def unlock(key, timeout: nil) + request = V3lockpb::UnlockRequest.new(key: key) + @stub.unlock(request, deadline: deadline(timeout)) + end + + private + + def deadline(timeout) + from_relative_time(timeout || @timeout) + end + end +end diff --git a/lib/etcdv3/protos/v3lock.proto b/lib/etcdv3/protos/v3lock.proto new file mode 100644 index 0000000..92ab1d3 --- /dev/null +++ b/lib/etcdv3/protos/v3lock.proto @@ -0,0 +1,55 @@ +syntax = "proto3"; +package v3lockpb; + +import "annotations.proto"; +import "rpc.proto"; +import "gogo.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.unmarshaler_all) = true; + +service Lock { + // Lock acquires a distributed shared lock on a given named lock. + rpc Lock(LockRequest) returns (LockResponse) { + option (google.api.http) = { + post: "/v3alpha/lock/lock" + body: "*" + }; + } + + // Unlock takes a key returned by Lock and releases the hold on lock. + rpc Unlock(UnlockRequest) returns (UnlockResponse) { + option (google.api.http) = { + post: "/v3alpha/lock/unlock" + body: "*" + }; + } +} + +message LockRequest { + // name is the identifier for the distributed shared lock to be acquired. + bytes name = 1; + // lease is the ID of the lease that will be attached to ownership of the + // lock. If the lease expires or is revoked and currently holds the lock, + // the lock is automatically released. Calls to Lock with the same lease will + // be treated as a single acquisition; locking twice with the same lease is a + // no-op. + int64 lease = 2; +} + +message LockResponse { + etcdserverpb.ResponseHeader header = 1; + // key is a key that will exist on etcd for the duration that the Lock caller + // owns the lock. Users should not modify this key or the lock may exhibit + // undefined behavior. + bytes key = 2; +} + +message UnlockRequest { + // key is the lock ownership key granted by Lock. + bytes key = 1; +} + +message UnlockResponse { + etcdserverpb.ResponseHeader header = 1; +} diff --git a/spec/etcdv3/lock_spec.rb b/spec/etcdv3/lock_spec.rb new file mode 100644 index 0000000..39ac574 --- /dev/null +++ b/spec/etcdv3/lock_spec.rb @@ -0,0 +1,23 @@ +require 'spec_helper' + +# Locking is not implemented in etcd v3.1.X +unless $instance.version < Gem::Version.new("3.2.0") + describe Etcdv3::Lock do + let(:stub) { local_stub(Etcdv3::Lock, 1) } + let(:lease_stub) { local_stub(Etcdv3::Lease, 1) } + + it_should_behave_like "a method with a GRPC timeout", described_class, :unlock, :unlock, 'foo' + #it_should_behave_like "a method with a GRPC timeout", described_class, :lock, :lock, 'foo' + + describe '#lock' do + let(:lease_id) { lease_stub.lease_grant(10)['ID'] } + subject { stub.lock('foo', lease_id) } + it { is_expected.to be_an_instance_of(V3lockpb::LockResponse) } + end + + describe '#unlock' do + subject { stub.unlock('foo') } + it { is_expected.to be_an_instance_of(V3lockpb::UnlockResponse) } + end + end +end diff --git a/spec/etcdv3_spec.rb b/spec/etcdv3_spec.rb index 60858fd..8e0a50f 100644 --- a/spec/etcdv3_spec.rb +++ b/spec/etcdv3_spec.rb @@ -1,8 +1,9 @@ require 'spec_helper' describe Etcdv3 do - context 'Insecure connection without Auth' do + let(:lease_stub) { local_stub(Etcdv3::Lease, 1) } + context 'Insecure connection without Auth' do let(:conn) { local_connection } describe '#initialize' do @@ -103,6 +104,26 @@ it_should_behave_like "Etcdv3 instance using a timeout", :get, 'apple' end + # Locking is not implemented in etcd v3.1.X + unless $instance.version < Gem::Version.new("3.2.0") + describe '#lock' do + let(:lease_id) { lease_stub.lease_grant(10)['ID'] } + subject { conn.lock('bar', lease_id) } + it { is_expected.to be_an_instance_of(V3lockpb::LockResponse) } + end + + describe '#with_lock' do + let(:lease_id) { lease_stub.lease_grant(10)['ID'] } + let(:lease_id_2) { lease_stub.lease_grant(15)['ID'] } + it 'locks' do + conn.with_lock('foobar', lease_id) do + expect { conn.lock('foobar', lease_id_2, timeout: 0.1) } + .to raise_error(GRPC::DeadlineExceeded) + end + end + end + end + describe '#put' do subject { conn.put('test', 'value') } it { is_expected.to_not be_nil } diff --git a/spec/helpers/shared_examples_for_timeout.rb b/spec/helpers/shared_examples_for_timeout.rb index ad06437..03f49b2 100644 --- a/spec/helpers/shared_examples_for_timeout.rb +++ b/spec/helpers/shared_examples_for_timeout.rb @@ -1,22 +1,23 @@ shared_examples_for "a method with a GRPC timeout" do |stub_class, method_name, expectation_target, *args| + include GRPC::Core::TimeConsts + context "#{stub_class} timeouts for #{method_name}" do let(:handler) { local_stub(stub_class, 5) } let(:client_stub) { handler.instance_variable_get "@stub"} it 'uses the timeout value' do - start_time = Time.now - deadline_time = start_time.to_f + 5 - allow(Time).to receive(:now).and_return(start_time) + deadline = from_relative_time(5) + allow(handler).to receive(:deadline).with(nil).and_return(deadline) + allow(handler).to receive(:deadline).with(5).and_return(deadline) - expect(client_stub).to receive(expectation_target).with(anything, hash_including(deadline: deadline_time)).and_call_original + expect(client_stub).to receive(expectation_target).with(anything, hash_including(deadline: deadline)).and_call_original handler.public_send(method_name, *args) end it "can have a seperate timeout passed in" do - start_time = Time.now - deadline_time = start_time.to_f + 1 - allow(Time).to receive(:now).and_return(start_time) - expect(client_stub).to receive(expectation_target).with(anything, hash_including(deadline: deadline_time)).and_call_original + deadline = from_relative_time(1) + allow(handler).to receive(:deadline).with(1).and_return(deadline) + expect(client_stub).to receive(expectation_target).with(anything, hash_including(deadline: deadline)).and_call_original handler.public_send(method_name, *args, timeout: 1) end diff --git a/spec/helpers/test_instance.rb b/spec/helpers/test_instance.rb index 929d06c..b6bf525 100644 --- a/spec/helpers/test_instance.rb +++ b/spec/helpers/test_instance.rb @@ -13,6 +13,8 @@ class PortInUseException < StandardError; end MINIMUM_VERSION = Gem::Version.new('3.0.0') + attr_accessor :version + def initialize @pids = [] @tmpdir = Dir.mktmpdir diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 9e86231..49d4005 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -11,6 +11,8 @@ require 'helpers/connections' require 'helpers/shared_examples_for_timeout' +$instance = Helpers::TestInstance.new + RSpec.configure do |config| config.include(Helpers::Connections) @@ -22,12 +24,11 @@ end config.shared_context_metadata_behavior = :apply_to_host_groups - instance = Helpers::TestInstance.new config.before(:suite) do - # $stderr = File.open(File::NULL, "w") - instance.start + $stderr = File.open(File::NULL, "w") + $instance.start end config.after(:suite) do - instance.stop + $instance.stop end end