Skip to content

Commit

Permalink
Use redis 5.2.0 API
Browse files Browse the repository at this point in the history
  • Loading branch information
jlledom committed May 8, 2024
1 parent 5125eed commit 9fe39fe
Show file tree
Hide file tree
Showing 17 changed files with 82 additions and 81 deletions.
4 changes: 2 additions & 2 deletions lib/3scale/backend/alert_limit.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class AlertLimit
attr_accessor :service_id, :value

def save
storage.sadd(key_allowed_set(service_id), value.to_i) if valid?
storage.sadd?(key_allowed_set(service_id), value.to_i) if valid?
end

def to_hash
Expand All @@ -32,7 +32,7 @@ def self.save(service_id, value)
end

def self.delete(service_id, value)
storage.srem(key_allowed_set(service_id), value.to_i) if valid_value?(value)
storage.srem?(key_allowed_set(service_id), value.to_i) if valid_value?(value)
end

def self.valid_value?(value)
Expand Down
12 changes: 6 additions & 6 deletions lib/3scale/backend/alerts.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,15 @@ def update_utilization(service_id, app_id, max_utilization, max_record, timestam

keys = alert_keys(service_id, app_id, discrete)

already_alerted, allowed = storage.pipelined do
storage.get(keys[:already_notified])
storage.sismember(keys[:allowed], discrete)
already_alerted, allowed = storage.pipelined do |pipeline|
pipeline.get(keys[:already_notified])
pipeline.sismember(keys[:allowed], discrete)
end

if already_alerted.nil? && allowed && discrete.to_i > 0
next_id, _ = storage.pipelined do
storage.incr(keys[:current_id])
storage.setex(keys[:already_notified], ALERT_TTL, "1")
next_id, _ = storage.pipelined do |pipeline|
pipeline.incr(keys[:current_id])
pipeline.setex(keys[:already_notified], ALERT_TTL, "1")
end

alert = { :id => next_id,
Expand Down
40 changes: 20 additions & 20 deletions lib/3scale/backend/application.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def extract_id!(service_id, app_id, user_key)
end

def exists?(service_id, id)
storage.exists(storage_key(service_id, id, :state))
storage.exists?(storage_key(service_id, id, :state))
end
memoize :exists?

Expand All @@ -102,9 +102,9 @@ def delete(service_id, id)
end

def delete_data(service_id, id)
storage.pipelined do
delete_set(service_id, id)
delete_attributes(service_id, id)
storage.pipelined do |pipeline|
delete_set(pipeline, service_id, id)
delete_attributes(pipeline, service_id, id)
end
end

Expand Down Expand Up @@ -137,12 +137,12 @@ def id_by_key_storage_key(service_id, key)
encode_key("application/service_id:#{service_id}/key:#{key}/id")
end

def delete_set(service_id, id)
storage.srem(applications_set_key(service_id), id)
def delete_set(client, service_id, id)
client.srem(applications_set_key(service_id), id)
end

def delete_attributes(service_id, id)
storage.del(
def delete_attributes(client, service_id, id)
client.del(
ATTRIBUTES.map do |f|
storage_key(service_id, id, f)
end
Expand All @@ -166,9 +166,9 @@ def with_app_id_from_params(service_id, app_id, user_key)
def save
raise ApplicationHasNoState.new(id) if !state

storage.pipelined do
persist_attributes
persist_set
storage.pipelined do |pipeline|
persist_attributes(pipeline)
persist_set(pipeline)
end

self.class.clear_cache(service_id, id)
Expand Down Expand Up @@ -260,7 +260,7 @@ def create_key(value = nil)
def delete_key(value)
db_key = storage_key(:keys)
invalidate_cache([:smembers, :scard, :sismember], db_key)
storage.srem(db_key, value)
storage.srem?(db_key, value)
end

def has_keys?
Expand All @@ -279,7 +279,7 @@ def has_key?(value)
db_key = storage_key(:keys)
key = Memoizer.build_key(self.class, :sismember, db_key, value)
Memoizer.memoize_block(key) do
storage.sismember(db_key, value)
storage.sismember(db_key, value.to_s)
end
end

Expand Down Expand Up @@ -319,15 +319,15 @@ def has_referrer_filters?

private

def persist_attributes
storage.set(storage_key(:state), state.to_s) if state
storage.set(storage_key(:plan_id), plan_id) if plan_id
storage.set(storage_key(:plan_name), plan_name) if plan_name
storage.set(storage_key(:redirect_url), redirect_url) if redirect_url
def persist_attributes(client)
client.set(storage_key(:state), state.to_s) if state
client.set(storage_key(:plan_id), plan_id) if plan_id
client.set(storage_key(:plan_name), plan_name) if plan_name
client.set(storage_key(:redirect_url), redirect_url) if redirect_url
end

def persist_set
storage.sadd(applications_set_key(service_id), id)
def persist_set(client)
client.sadd(applications_set_key(service_id), id)
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions lib/3scale/backend/application_events.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ def self.generate(applications)
private

def self.first_traffic(service_id, application_id)
if storage.sadd(Stats::Keys.set_of_apps_with_traffic(service_id),
encode_key(application_id))
if storage.sadd?(Stats::Keys.set_of_apps_with_traffic(service_id),
encode_key(application_id))
EventStorage.store(:first_traffic,
{ service_id: service_id,
application_id: application_id,
Expand Down
10 changes: 5 additions & 5 deletions lib/3scale/backend/event_storage.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ def delete(id)
(id > 0) ? storage.zremrangebyscore(events_queue_key, id, id) : 0
end

def size
storage.zcard(events_queue_key)
def size(strg = storage)
strg.zcard(events_queue_key)
end

def ping_if_not_empty
Expand Down Expand Up @@ -90,9 +90,9 @@ def events_hook_uri
def pending_ping?
## the queue is not empty and more than timeout has passed
## since the front-end was notified
events_set_size, can_ping = storage.pipelined do
size
storage.set(events_ping_key, '1'.freeze, ex: PING_TTL, nx: true)
events_set_size, can_ping = storage.pipelined do |pipeline|
size(pipeline)
pipeline.set(events_ping_key, '1'.freeze, ex: PING_TTL, nx: true)
end

can_ping && events_set_size > 0
Expand Down
32 changes: 16 additions & 16 deletions lib/3scale/backend/metric.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ def id_set_key(service_id)

def save
old_name = self.class.load_name(service_id, id)
storage.pipelined do
save_attributes
save_to_list
storage.pipelined do |pipeline|
save_attributes(pipeline)
save_to_list(pipeline)
remove_reverse_mapping(service_id, old_name) if old_name != name
end

Expand Down Expand Up @@ -196,12 +196,12 @@ def delete(service_id, id)
return false unless name and not name.empty?
clear_cache(service_id, id, name)

storage.pipelined do
storage.srem(id_set_key(service_id), id)
storage.pipelined do |pipeline|
pipeline.srem(id_set_key(service_id), id)

storage.del(key(service_id, id, :name),
key(service_id, id, :parent_id),
id_key(service_id, name))
pipeline.del(key(service_id, id, :name),
key(service_id, id, :parent_id),
id_key(service_id, name))
end

true
Expand All @@ -223,9 +223,9 @@ def hierarchy_ids(service_id)
ids = load_all_ids(service_id)
parent_ids_keys = ids.map { |id| key(service_id, id, :parent_id) }

parent_ids = storage.pipelined do
parent_ids = storage.pipelined do |pipeline|
parent_ids_keys.each_slice(PIPELINED_SLICE_SIZE).map do |slice|
storage.mget(slice)
pipeline.mget(slice)
end
end.flatten

Expand All @@ -247,14 +247,14 @@ def remove_reverse_mapping(service_id, name)
storage.del id_key(service_id, name)
end

def save_attributes
storage.set(id_key(service_id, name), id)
storage.set(key(service_id, id, :name), name)
storage.set(key(service_id, id, :parent_id), parent_id) if parent_id
def save_attributes(client)
client.set(id_key(service_id, name), id)
client.set(key(service_id, id, :name), name)
client.set(key(service_id, id, :parent_id), parent_id) if parent_id
end

def save_to_list
storage.sadd(id_set_key(service_id), id)
def save_to_list(client)
client.sadd(id_set_key(service_id), id)
end

def save_children
Expand Down
4 changes: 2 additions & 2 deletions lib/3scale/backend/service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def delete_by_id(service_id)
end

def exists?(service_id)
storage.exists(storage_key(service_id, 'provider_key'))
storage.exists?(storage_key(service_id, 'provider_key'))
end

def get_service(id)
Expand Down Expand Up @@ -277,7 +277,7 @@ def persist_attribute(attribute, value, ignore_nils = false)
def persist_sets
storage.sadd storage_key_by_provider(:ids), id
storage.sadd encode_key("services_set"), id
storage.sadd encode_key("provider_keys_set"), provider_key
storage.sadd encode_key("provider_keys_set"), provider_key unless provider_key.nil?
end

end
Expand Down
2 changes: 1 addition & 1 deletion lib/3scale/backend/service_token.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def delete(service_token, service_id)
end

def exists?(service_token, service_id)
storage.exists(key(service_token, service_id))
storage.exists?(key(service_token, service_id))
end
memoize :exists?

Expand Down
8 changes: 4 additions & 4 deletions lib/3scale/backend/stats/aggregator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ def aggregate(transactions)
touched_apps = {}

transactions.each_slice(PIPELINED_SLICE_SIZE) do |slice|
storage.pipelined do
storage.pipelined do |pipeline|
slice.each do |transaction|
aggregate_all(transaction)
aggregate_all(transaction, pipeline)
touched_apps.merge!(touched_relation(transaction))
end
end
Expand All @@ -57,9 +57,9 @@ def aggregate(transactions)
touched_apps
end

def aggregate_all(transaction)
def aggregate_all(transaction, client)
[Aggregators::ResponseCode, Aggregators::Usage].each do |aggregator|
aggregator.aggregate(transaction)
aggregator.aggregate(transaction, client)
end
end

Expand Down
13 changes: 7 additions & 6 deletions lib/3scale/backend/stats/aggregators/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ module Base
# @param [Time] timestamp
# @param [Array] keys array of {(service|application|user) => "key"}
# @param [Symbol] cmd
def aggregate_values(value, timestamp, keys, cmd)
# @param [Redis] client
def aggregate_values(value, timestamp, keys, cmd, client = storage)
keys_for_bucket = []

keys.each do |metric_type, prefix_key|
Expand All @@ -23,9 +24,9 @@ def aggregate_values(value, timestamp, keys, cmd)
# memory because for rate-limiting and stats, a key of set to 0
# is equivalent to a key that does not exist.
if cmd == :set && value == 0
storage.del(key)
client.del(key)
else
store_key(cmd, key, value, expire_time)
store_key(client, cmd, key, value, expire_time)
end

unless Stats::PeriodCommons::EXCLUDED_FOR_BUCKETS.include?(granularity)
Expand Down Expand Up @@ -55,9 +56,9 @@ def granularities(metric_type)
metric_type == :service ? Stats::PeriodCommons::SERVICE_GRANULARITIES : Stats::PeriodCommons::EXPANDED_GRANULARITIES
end

def store_key(cmd, key, value, expire_time = nil)
storage.send(cmd, key, value)
storage.expire(key, expire_time) if expire_time
def store_key(client, cmd, key, value, expire_time = nil)
client.send(cmd, key, value)
client.expire(key, expire_time) if expire_time
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions lib/3scale/backend/stats/aggregators/response_code.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ class << self
include Keys
include Base

def aggregate(transaction)
def aggregate(transaction, client = storage)
keys_for_multiple_codes = keys_for_response_code(transaction)
timestamp = transaction.timestamp

keys_for_multiple_codes.each do |keys|
aggregate_values(1, timestamp, keys, :incrby)
aggregate_values(1, timestamp, keys, :incrby, client)
end
end

Expand Down
4 changes: 2 additions & 2 deletions lib/3scale/backend/stats/aggregators/usage.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ class << self
# Aggregates the usage of a transaction.
#
# @param [Transaction] transaction
def aggregate(transaction)
def aggregate(transaction, client = storage)
transaction.usage.each do |metric_id, raw_value|
metric_keys = Keys.transaction_keys(transaction, :metric, metric_id)
cmd = storage_cmd(raw_value)
value = Backend::Usage.get_from raw_value

aggregate_values(value, transaction.timestamp, metric_keys, cmd)
aggregate_values(value, transaction.timestamp, metric_keys, cmd, client)
end
end

Expand Down
4 changes: 2 additions & 2 deletions lib/3scale/backend/stats/cleaner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,9 @@ def delete_keys(redis_conn, services, log_deleted_keys)
end

def remove_services_from_delete_set(services)
storage.pipelined do
storage.pipelined do |pipeline|
services.each do |service|
storage.srem(KEY_SERVICES_TO_DELETE, service)
pipeline.srem(KEY_SERVICES_TO_DELETE, service)
end
end
end
Expand Down
12 changes: 6 additions & 6 deletions lib/3scale/backend/transactor/notify_batcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ def notify_batch(provider_key, usage)
end

def get_batch(num_elements)
storage.pipelined do
storage.lrange(key_for_notifications_batch, 0, num_elements - 1)
storage.ltrim(key_for_notifications_batch, num_elements, -1)
storage.pipelined do |pipeline|
pipeline.lrange(key_for_notifications_batch, 0, num_elements - 1)
pipeline.ltrim(key_for_notifications_batch, num_elements, -1)
end.first
end

Expand Down Expand Up @@ -122,9 +122,9 @@ def enqueue_notify_job(provider_key, usage, timestamp, enqueue_ts)
if ThreeScale::Backend.test?
module Test
def get_full_batch
storage.pipelined do
storage.lrange(key_for_notifications_batch, 0, -1)
storage.del(key_for_notifications_batch)
storage.pipelined do |pipeline|
pipeline.lrange(key_for_notifications_batch, 0, -1)
pipeline.del(key_for_notifications_batch)
end.first
end

Expand Down
4 changes: 2 additions & 2 deletions lib/3scale/backend/usage_limit.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ def save(attributes)
service_id = attributes[:service_id]
plan_id = attributes[:plan_id]
prefix = key_prefix(service_id, plan_id, attributes[:metric_id])
storage.pipelined do
storage.pipelined do |pipeline|
PERIODS.each do |period|
p_val = attributes[period.to_sym]
p_val and storage.set(key_for_period(prefix, period), p_val)
p_val and pipeline.set(key_for_period(prefix, period), p_val)
end
end
clear_cache(service_id, plan_id)
Expand Down
Loading

0 comments on commit 9fe39fe

Please sign in to comment.