Skip to content

Commit

Permalink
Use Redis for rate limiters
Browse files Browse the repository at this point in the history
- Moves current implementation to ExpiringRequestCounter::InMemoryStore
  and ConcurrentRequestCounter::InMemoryStore.
- Adds ExpiringRequestCounter::RedisStore and ConcurrentRequestCounter::
  RedisStore.
- Automatically uses the Redis-based rate limiter when 'redis.socket' is
  configured.

Co-authored-by: Johannes Haass <[email protected]>
  • Loading branch information
philippthun and johha committed Jul 3, 2023
1 parent 1750fec commit 4539e59
Show file tree
Hide file tree
Showing 8 changed files with 329 additions and 91 deletions.
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ gem 'kubeclient'
gem 'loggregator_emitter', '~> 5.0'
gem 'membrane', '~> 1.0'
gem 'mime-types', '~> 3.4'
gem 'mock_redis'
gem 'multi_json'
gem 'multipart-parser'
gem 'net-ssh'
Expand All @@ -31,6 +32,7 @@ gem 'posix-spawn', '~> 0.3.15'
gem 'public_suffix'
gem 'psych', '>= 4.0.4'
gem 'rake'
gem 'redis'
gem 'retryable'
gem 'rfc822'
gem 'rubyzip', '>= 1.3.0'
Expand Down
9 changes: 9 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ GEM
simplecov (<= 0.13)
coderay (1.1.3)
concurrent-ruby (1.2.2)
connection_pool (2.3.0)
cookiejar (0.3.3)
crack (0.4.5)
rexml
Expand Down Expand Up @@ -308,6 +309,8 @@ GEM
mini_mime (1.1.2)
mini_portile2 (2.8.2)
minitest (5.18.1)
mock_redis (0.34.0)
ruby2_keywords
ms_rest (0.6.4)
concurrent-ruby (~> 1.0)
faraday (~> 0.9)
Expand Down Expand Up @@ -385,6 +388,10 @@ GEM
ffi (~> 1.0)
rbs (2.8.4)
recursive-open-struct (1.1.3)
redis (5.0.5)
redis-client (>= 0.9.0)
redis-client (0.11.1)
connection_pool
regexp_parser (2.8.1)
representable (3.2.0)
declarative (< 0.1.0)
Expand Down Expand Up @@ -580,6 +587,7 @@ DEPENDENCIES
machinist (~> 1.0.6)
membrane (~> 1.0)
mime-types (~> 3.4)
mock_redis
multi_json
multipart-parser
mysql2 (~> 0.5.5)
Expand All @@ -600,6 +608,7 @@ DEPENDENCIES
rack-test
railties (~> 6.1.7, >= 6.1.7.3)
rake
redis
retryable
rfc822
roodi
Expand Down
4 changes: 4 additions & 0 deletions lib/cloud_controller/config_schemas/base/api_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ class ApiSchema < VCAP::Config
optional(:ca_cert_path) => String,
},

optional(:redis) => {
socket: String
},

staging: {
timeout_in_seconds: Integer,
minimum_staging_memory_mb: Integer,
Expand Down
64 changes: 53 additions & 11 deletions middleware/base_rate_limiter.rb
Original file line number Diff line number Diff line change
@@ -1,30 +1,72 @@
require 'mixins/client_ip'
require 'mixins/user_reset_interval'
require 'redis'

module CloudFoundry
module Middleware
class ExpiringRequestCounter
include CloudFoundry::Middleware::UserResetInterval

Counter = Struct.new(:value, :expires_at)

def initialize(key_prefix)
def initialize(key_prefix, redis_connection_pool_size: nil)
@key_prefix = key_prefix
@mutex = Mutex.new
@data = {}
@redis_connection_pool_size = redis_connection_pool_size
end

def increment(user_guid, reset_interval_in_minutes, logger)
key = "#{@key_prefix}:#{user_guid}"
expires_in = next_expires_in(user_guid, reset_interval_in_minutes)
@mutex.synchronize do
if !@data.key?(key) || (ttl = @data[key].expires_at - Time.now.to_i) <= 0 # not existing or expired
@data[key] = Counter.new(1, Time.now.to_i + expires_in)
[1, expires_in]
else
[@data[key].value += 1, ttl]
store.increment(key, expires_in, logger)
end

private

def store
return @store if defined?(@store)

redis_socket = VCAP::CloudController::Config.config.get(:redis, :socket)
@store = redis_socket.nil? ? InMemoryStore.new : RedisStore.new(redis_socket, @redis_connection_pool_size)
end

class InMemoryStore
Counter = Struct.new(:value, :expires_at)

def initialize
@mutex = Mutex.new
@data = {}
end

def increment(key, expires_in, _)
@mutex.synchronize do
if !@data.key?(key) || (ttl = @data[key].expires_at - Time.now.to_i) <= 0 # not existing or expired
@data[key] = Counter.new(1, Time.now.to_i + expires_in)
[1, expires_in]
else
[@data[key].value += 1, ttl]
end
end
end
end

class RedisStore
def initialize(socket, connection_pool_size)
connection_pool_size ||= VCAP::CloudController::Config.config.get(:puma, :max_threads) || 1
@redis = ConnectionPool::Wrapper.new(size: connection_pool_size) do
Redis.new(timeout: 1, path: socket)
end
end

def increment(key, expires_in, logger)
_, count_str, ttl_int = @redis.multi do |transaction|
transaction.set(key, 0, ex: expires_in, nx: true) # nx => set only if not exists
transaction.incr(key)
transaction.ttl(key)
end

[count_str.to_i, ttl_int]
rescue Redis::BaseError => e
logger.error("Redis error: #{e.inspect}")
[1, expires_in]
end
end
end

Expand Down
69 changes: 60 additions & 9 deletions middleware/service_broker_rate_limiter.rb
Original file line number Diff line number Diff line change
@@ -1,26 +1,77 @@
require 'concurrent-ruby'
require 'redis'

module CloudFoundry
module Middleware
class ConcurrentRequestCounter
def initialize(key_prefix)
def initialize(key_prefix, redis_connection_pool_size: nil)
@key_prefix = key_prefix
@mutex = Mutex.new
@data = {}
@redis_connection_pool_size = redis_connection_pool_size
end

def try_increment?(user_guid, max_concurrent_requests, logger)
key = "#{@key_prefix}:#{user_guid}"
@mutex.synchronize do
@data[key] = Concurrent::Semaphore.new(max_concurrent_requests) unless @data.key?(key)
@data[key].try_acquire
end
store.try_increment?(key, max_concurrent_requests, logger)
end

def decrement(user_guid, logger)
key = "#{@key_prefix}:#{user_guid}"
@mutex.synchronize do
@data[key].release if @data.key?(key)
store.decrement(key, logger)
end

private

def store
return @store if defined?(@store)

redis_socket = VCAP::CloudController::Config.config.get(:redis, :socket)
@store = redis_socket.nil? ? InMemoryStore.new : RedisStore.new(redis_socket, @redis_connection_pool_size)
end

class InMemoryStore
def initialize
@mutex = Mutex.new
@data = {}
end

def try_increment?(key, max_concurrent_requests, _)
@mutex.synchronize do
@data[key] = Concurrent::Semaphore.new(max_concurrent_requests) unless @data.key?(key)
@data[key].try_acquire
end
end

def decrement(key, _)
@mutex.synchronize do
@data[key].release if @data.key?(key)
end
end
end

class RedisStore
def initialize(socket, connection_pool_size)
connection_pool_size ||= VCAP::CloudController::Config.config.get(:puma, :max_threads) || 1
@redis = ConnectionPool::Wrapper.new(size: connection_pool_size) do
Redis.new(timeout: 1, path: socket)
end
end

def try_increment?(key, max_concurrent_requests, logger)
count_str = @redis.incr(key)
return true if count_str.to_i <= max_concurrent_requests

@redis.decr(key)
false
rescue Redis::BaseError => e
logger.error("Redis error: #{e.inspect}")
true
end

def decrement(key, logger)
count_str = @redis.decr(key)
@redis.incr(key) if count_str.to_i < 0
rescue Redis::BaseError => e
logger.error("Redis error: #{e.inspect}")
end
end
end
Expand Down
4 changes: 4 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
SPEC_HELPER_LOADED = true
require 'rubygems'
require 'mock_redis'

begin
require 'spork'
Expand Down Expand Up @@ -169,6 +170,9 @@

VCAP::CloudController::SecurityContext.clear
allow_any_instance_of(VCAP::CloudController::UaaTokenDecoder).to receive(:uaa_issuer).and_return(UAAIssuer::ISSUER)

mock_redis = MockRedis.new
allow(Redis).to receive(:new).and_return(mock_redis)
end

rspec_config.around :each do |example|
Expand Down
118 changes: 87 additions & 31 deletions spec/unit/middleware/rate_limiter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -323,42 +323,98 @@ module Middleware
end

RSpec.describe ExpiringRequestCounter do
let(:expiring_request_counter) { ExpiringRequestCounter.new('test') }
let(:stubbed_expires_in) { 30.minutes.to_i }
let(:user_guid) { SecureRandom.uuid }
let(:reset_interval_in_minutes) { 60 }
let(:logger) { double('logger') }

before do
allow(expiring_request_counter).to receive(:next_expires_in).and_return(stubbed_expires_in)
end
store_implementations = []
store_implementations << :in_memory # Test the ExpiringRequestCounter::InMemoryStore
store_implementations << :mock_redis # Test the ExpiringRequestCounter::RedisStore with MockRedis

store_implementations.each do |store_implementation|
describe store_implementation do
let(:expiring_request_counter) { ExpiringRequestCounter.new('test') }
let(:stubbed_expires_in) { 30.minutes.to_i }
let(:store) { expiring_request_counter.instance_variable_get(:@store) }
let(:user_guid) { SecureRandom.uuid }
let(:reset_interval_in_minutes) { 60 }
let(:logger) { double('logger') }

describe '#initialize' do
it 'sets the @key_prefix' do
expect(expiring_request_counter.instance_variable_get(:@key_prefix)).to eq('test')
end
end
before do
TestConfig.override(redis: { socket: 'foo' }, puma: { max_threads: 123 }) unless store_implementation == :in_memory

describe '#increment' do
it 'calls next_expires_in with the given user guid and reset interval' do
expiring_request_counter.increment(user_guid, reset_interval_in_minutes, logger)
expect(expiring_request_counter).to have_received(:next_expires_in).with(user_guid, reset_interval_in_minutes)
end
allow(ConnectionPool::Wrapper).to receive(:new).and_call_original
expiring_request_counter.send(:store) # instantiate counter and store implementation
allow(expiring_request_counter).to receive(:next_expires_in).and_return(stubbed_expires_in)
end

it 'returns count=1 and expires_in for a new user' do
count, expires_in = expiring_request_counter.increment(user_guid, reset_interval_in_minutes, logger)
expect(count).to eq(1)
expect(expires_in).to eq(stubbed_expires_in)
end
describe '#initialize' do
it 'sets the @key_prefix' do
expect(expiring_request_counter.instance_variable_get(:@key_prefix)).to eq('test')
end

it 'returns count=2 and expires_in minus the elapsed time for a recurring user' do
expiring_request_counter.increment(user_guid, reset_interval_in_minutes, logger)
it 'instantiates the appropriate store class' do
if store_implementation == :in_memory
expect(store).to be_kind_of(ExpiringRequestCounter::InMemoryStore)
else
expect(store).to be_kind_of(ExpiringRequestCounter::RedisStore)
end
end

it 'uses a connection pool size that equals the maximum puma threads' do
skip('Not relevant for InMemoryStore') if store_implementation == :in_memory

expect(ConnectionPool::Wrapper).to have_received(:new).with(size: 123)
end

elapsed_seconds = 10
Timecop.travel(Time.now + elapsed_seconds.seconds) do
count, expires_in = expiring_request_counter.increment(user_guid, reset_interval_in_minutes, logger)
expect(count).to eq(2)
expect(expires_in).to eq(stubbed_expires_in - elapsed_seconds)
context 'with custom connection pool size' do
let(:expiring_request_counter) { ExpiringRequestCounter.new('test', redis_connection_pool_size: 456) }

it 'uses the provided connection pool size' do
skip('Not relevant for InMemoryStore') if store_implementation == :in_memory

expect(ConnectionPool::Wrapper).to have_received(:new).with(size: 456)
end
end
end

describe '#increment' do
it 'calls next_expires_in with the given user guid and reset interval' do
expiring_request_counter.increment(user_guid, reset_interval_in_minutes, logger)
expect(expiring_request_counter).to have_received(:next_expires_in).with(user_guid, reset_interval_in_minutes)
end

it 'calls @store.increment with the prefixed user guid, stubbed expires_in and given logger' do
allow(store).to receive(:increment).and_call_original
expiring_request_counter.increment(user_guid, reset_interval_in_minutes, logger)
expect(store).to have_received(:increment).with("test:#{user_guid}", stubbed_expires_in, logger)
end

it 'returns count=1 and expires_in for a new user' do
count, expires_in = expiring_request_counter.increment(user_guid, reset_interval_in_minutes, logger)
expect(count).to eq(1)
expect(expires_in).to eq(stubbed_expires_in)
end

it 'returns count=2 and expires_in minus the elapsed time for a recurring user' do
expiring_request_counter.increment(user_guid, reset_interval_in_minutes, logger)

elapsed_seconds = 10
Timecop.travel(Time.now + elapsed_seconds.seconds) do
count, expires_in = expiring_request_counter.increment(user_guid, reset_interval_in_minutes, logger)
expect(count).to eq(2)
expect(expires_in).to eq(stubbed_expires_in - elapsed_seconds)
end
end

it 'returns count=1 and expires_in in case of a Redis error' do
skip('Not relevant for InMemoryStore') if store_implementation == :in_memory

allow_any_instance_of(MockRedis::TransactionWrapper).to receive(:multi).and_raise(Redis::ConnectionError)
allow_any_instance_of(Redis).to receive(:multi).and_raise(Redis::ConnectionError)
allow(logger).to receive(:error)

count, expires_in = expiring_request_counter.increment(user_guid, reset_interval_in_minutes, logger)
expect(count).to eq(1)
expect(expires_in).to eq(stubbed_expires_in)
expect(logger).to have_received(:error).with(/Redis error/)
end
end
end
end
Expand Down
Loading

0 comments on commit 4539e59

Please sign in to comment.