Skip to content

Commit

Permalink
Merge pull request #473 from splitio/iff-update-splitworker
Browse files Browse the repository at this point in the history
added iff compression and decoding message
  • Loading branch information
chillaq authored Jun 5, 2023
2 parents cb9e20b + 3eecb05 commit 4c12107
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 26 deletions.
1 change: 1 addition & 0 deletions lib/splitclient-rb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
require 'splitclient-rb/clients/split_client'
require 'splitclient-rb/managers/split_manager'
require 'splitclient-rb/helpers/thread_helper'
require 'splitclient-rb/helpers/decryption_helper'
require 'splitclient-rb/split_factory'
require 'splitclient-rb/split_factory_builder'
require 'splitclient-rb/split_config'
Expand Down
25 changes: 25 additions & 0 deletions lib/splitclient-rb/helpers/decryption_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# frozen_string_literal: true

module SplitIoClient
NO_COMPRESSION = 0
GZIP_COMPRESSION = 1
ZLIB_COMPRESSION = 2

module Helpers
class DecryptionHelper
def self.get_encoded_definition(compression, data)
case compression
when NO_COMPRESSION
Base64.decode64(data)
when GZIP_COMPRESSION
gz = Zlib::GzipReader.new(StringIO.new(Base64.decode64(data)))
gz.read
when ZLIB_COMPRESSION
Zlib::Inflate.inflate(Base64.decode64(data))
else
raise StandardError, 'Compression flag value is incorrect'
end
end
end
end
end
8 changes: 2 additions & 6 deletions lib/splitclient-rb/sse/notification_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,12 @@ def process(incoming_notification)

def process_split_update(notification)
@config.logger.debug("SPLIT UPDATE notification received: #{notification}") if @config.debug_enabled
@splits_worker.add_to_queue(notification.data['changeNumber'])
@splits_worker.add_to_queue(notification)
end

def process_split_kill(notification)
@config.logger.debug("SPLIT KILL notification received: #{notification}") if @config.debug_enabled
change_number = notification.data['changeNumber']
default_treatment = notification.data['defaultTreatment']
split_name = notification.data['splitName']

@splits_worker.kill_split(change_number, split_name, default_treatment)
@splits_worker.add_to_queue(notification)
end

def process_segment_update(notification)
Expand Down
60 changes: 46 additions & 14 deletions lib/splitclient-rb/sse/workers/splits_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ module SplitIoClient
module SSE
module Workers
class SplitsWorker
def initialize(synchronizer, config, splits_repository)
def initialize(synchronizer, config, feature_flags_repository)
@synchronizer = synchronizer
@config = config
@splits_repository = splits_repository
@feature_flags_repository = feature_flags_repository
@queue = Queue.new
@running = Concurrent::AtomicBoolean.new(false)
end
Expand All @@ -32,25 +32,57 @@ def stop
SplitIoClient::Helpers::ThreadHelper.stop(:split_update_worker, @config)
end

def add_to_queue(change_number)
@config.logger.debug("feature_flags_worker add to queue #{change_number}")
@queue.push(change_number)
def add_to_queue(notification)
@config.logger.debug("feature_flags_worker add to queue #{notification.data['changeNumber']}")
@queue.push(notification)
end

def kill_split(change_number, split_name, default_treatment)
return if @splits_repository.get_change_number.to_i > change_number
private

def update_feature_flag(notification)
return if @feature_flags_repository.get_change_number.to_i > notification.data['changeNumber']

@config.logger.debug("feature_flags_worker kill #{split_name}, #{change_number}")
@splits_repository.kill(change_number, split_name, default_treatment)
add_to_queue(change_number)
if @feature_flags_repository.get_change_number == notification.data['pcn']
begin
@feature_flags_repository.add_split(
JSON.parse(
SplitIoClient::Helpers::DecryptionHelper.get_encoded_definition(
notification.data['c'],
notification.data['d']
),
symbolize_names: true
)
)
@feature_flags_repository.set_change_number(notification.data['changeNumber'])
return
rescue StandardError => e
@config.logger.debug("Failed to update Split: #{e.inspect}") if @config.debug_enabled
end
end
@synchronizer.fetch_splits(notification.data['changeNumber'])
end

private
def kill_feature_flag(notification)
return if @feature_flags_repository.get_change_number.to_i > notification.data['changeNumber']

@config.logger.debug("feature_flags_worker kill #{notification.data['splitName']}, #{notification.data['changeNumber']}")
@feature_flags_repository.kill(
notification.data['changeNumber'],
notification.data['splitName'],
notification.data['defaultTreatment']
)
@synchronizer.fetch_splits(notification.data['changeNumber'])
end

def perform
while (change_number = @queue.pop)
@config.logger.debug("feature_flags_worker change_number dequeue #{change_number}")
@synchronizer.fetch_splits(change_number)
while (notification = @queue.pop)
@config.logger.debug("feature_flags_worker change_number dequeue #{notification.data['changeNumber']}")
case notification.data['type']
when SSE::EventSource::EventTypes::SPLIT_UPDATE
update_feature_flag(notification)
when SSE::EventSource::EventTypes::SPLIT_KILL
kill_feature_flag(notification)
end
end
end

Expand Down
41 changes: 35 additions & 6 deletions spec/sse/workers/splits_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require 'spec_helper'
require 'http_server_mock'
require 'byebug'

describe SplitIoClient::SSE::Workers::SplitsWorker do
subject { SplitIoClient::SSE::Workers::SplitsWorker }
Expand All @@ -16,6 +17,9 @@
let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config) }
let(:telemetry_runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) }
let(:split_fetcher) { SplitIoClient::Cache::Fetchers::SplitFetcher.new(splits_repository, api_key, config, telemetry_runtime_producer) }
let(:event_split_update_no_compression) { SplitIoClient::SSE::EventSource::StreamData.new("data", 123, JSON.parse('{"type":"SPLIT_UPDATE","changeNumber":5564531221,"pcn":1234,"c": 0,"d":"eyJ0cmFmZmljVHlwZU5hbWUiOiJ1c2VyIiwiaWQiOiIzM2VhZmE1MC0xYTY1LTExZWQtOTBkZi1mYTMwZDk2OTA0NDUiLCJuYW1lIjoiYmlsYWxfc3BsaXQiLCJ0cmFmZmljQWxsb2NhdGlvbiI6MTAwLCJ0cmFmZmljQWxsb2NhdGlvblNlZWQiOi0xMzY0MTE5MjgyLCJzZWVkIjotNjA1OTM4ODQzLCJzdGF0dXMiOiJBQ1RJVkUiLCJraWxsZWQiOmZhbHNlLCJkZWZhdWx0VHJlYXRtZW50Ijoib2ZmIiwiY2hhbmdlTnVtYmVyIjoxNjg0MzQwOTA4NDc1LCJhbGdvIjoyLCJjb25maWd1cmF0aW9ucyI6e30sImNvbmRpdGlvbnMiOlt7ImNvbmRpdGlvblR5cGUiOiJST0xMT1VUIiwibWF0Y2hlckdyb3VwIjp7ImNvbWJpbmVyIjoiQU5EIiwibWF0Y2hlcnMiOlt7ImtleVNlbGVjdG9yIjp7InRyYWZmaWNUeXBlIjoidXNlciJ9LCJtYXRjaGVyVHlwZSI6IklOX1NFR01FTlQiLCJuZWdhdGUiOmZhbHNlLCJ1c2VyRGVmaW5lZFNlZ21lbnRNYXRjaGVyRGF0YSI6eyJzZWdtZW50TmFtZSI6ImJpbGFsX3NlZ21lbnQifX1dfSwicGFydGl0aW9ucyI6W3sidHJlYXRtZW50Ijoib24iLCJzaXplIjowfSx7InRyZWF0bWVudCI6Im9mZiIsInNpemUiOjEwMH1dLCJsYWJlbCI6ImluIHNlZ21lbnQgYmlsYWxfc2VnbWVudCJ9LHsiY29uZGl0aW9uVHlwZSI6IlJPTExPVVQiLCJtYXRjaGVyR3JvdXAiOnsiY29tYmluZXIiOiJBTkQiLCJtYXRjaGVycyI6W3sia2V5U2VsZWN0b3IiOnsidHJhZmZpY1R5cGUiOiJ1c2VyIn0sIm1hdGNoZXJUeXBlIjoiQUxMX0tFWVMiLCJuZWdhdGUiOmZhbHNlfV19LCJwYXJ0aXRpb25zIjpbeyJ0cmVhdG1lbnQiOiJvbiIsInNpemUiOjB9LHsidHJlYXRtZW50Ijoib2ZmIiwic2l6ZSI6MTAwfV0sImxhYmVsIjoiZGVmYXVsdCBydWxlIn1dfQ=="}'), 'test') }
let(:event_split_update_gzip_compression) { SplitIoClient::SSE::EventSource::StreamData.new("data", 123, JSON.parse('{"type":"SPLIT_UPDATE","changeNumber":5564531221,"pcn":1234,"c": 1,"d":"H4sIAAkVZWQC/8WST0+DQBDFv0qzZ0ig/BF6a2xjGismUk2MaZopzOKmy9Isy0EbvrtDwbY2Xo233Tdv5se85cCMBs5FtvrYYwIlsglratTMYiKns+chcAgc24UwsF0Xczt2cm5z8Jw8DmPH9wPyqr5zKyTITb2XwpA4TJ5KWWVgRKXYxHWcX/QUkVi264W+68bjaGyxupdCJ4i9KPI9UgyYpibI9Ha1eJnT/J2QsnNxkDVaLEcOjTQrjWBKVIasFefky95BFZg05Zb2mrhh5I9vgsiL44BAIIuKTeiQVYqLotHHLyLOoT1quRjub4fztQuLxj89LpePzytClGCyd9R3umr21ErOcitUh2PTZHY29HN2+JGixMxUujNfvMB3+u2pY1AXySad3z3Mk46msACDp8W7jhly4uUpFt3qD33vDAx0gLpXkx+P1GusbdcE24M2F4uaywwVEWvxSa1Oa13Vjvn2RXradm0xCVuUVBJqNCBGV0DrX4OcLpeb+/lreh3jH8Uw/JQj3UhkxPgCCurdEnADAAA="}'), 'test') }
let(:event_split_update_zlib_compression) { SplitIoClient::SSE::EventSource::StreamData.new("data", 123, JSON.parse('{"type":"SPLIT_UPDATE","changeNumber":5564531221,"pcn":1234,"c": 2,"d":"eJzEUtFq20AQ/JUwz2c4WZZr3ZupTQh1FKjcQinGrKU95cjpZE6nh9To34ssJ3FNX0sfd3Zm53b2TgietDbF9vXIGdUMha5lDwFTQiGOmTQlchLRPJlEEZeTVJZ6oimWZTpP5WyWQMCNyoOxZPft0ZoA8TZ5aW1TUDCNg4qk/AueM5dQkyiez6IonS6mAu0IzWWSxovFLBZoA4WuhcLy8/bh+xoCL8bagaXJtixQsqbOhq1nCjW7AIVGawgUz+Qqzrr6wB4qmi9m00/JIk7TZCpAtmqgpgJF47SpOn9+UQt16s9YaS71z9NHOYQFha9Pm83Tty0EagrFM/t733RHqIFZH4wb7LDMVh+Ecc4Lv+ZsuQiNH8hXF3hLv39XXNCHbJ+v7x/X2eDmuKLA74sPihVr47jMuRpWfxy1Kwo0GLQjmv1xpBFD3+96gSP5cLVouM7QQaA1vxhK9uKmd853bEZS9jsBSwe2UDDu7mJxd2Mo/muQy81m/2X9I7+N8R/FcPmUd76zjH7X/w4AAP//90glTw=="}'), 'test') }
let(:synchronizer) do
segments_repository = SplitIoClient::Cache::Repositories::SegmentsRepository.new(config)
telemetry_api = SplitIoClient::Api::TelemetryApi.new(config, api_key, telemetry_runtime_producer)
Expand Down Expand Up @@ -45,7 +49,7 @@

worker = subject.new(synchronizer, config, splits_repository)
worker.start
worker.add_to_queue(1_506_703_262_919)
worker.add_to_queue(SplitIoClient::SSE::EventSource::StreamData.new("SPLIT_UPDATE", 123, JSON.parse('{"type":"SPLIT_UPDATE","changeNumber":1506703262919}'), 'test'))

sleep 1

Expand All @@ -60,7 +64,7 @@

worker = subject.new(synchronizer, config, splits_repository)
worker.start
worker.add_to_queue(1_506_703_262_918)
worker.add_to_queue(SplitIoClient::SSE::EventSource::StreamData.new("SPLIT_UPDATE", 123, JSON.parse('{"type":"SPLIT_UPDATE","changeNumber":1506703262918}'), 'test'))
sleep 1

expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262916')).to have_been_made.once
Expand All @@ -71,15 +75,15 @@

worker = subject.new(synchronizer, config, splits_repository)
worker.start
worker.add_to_queue(1_506_703_262_916)
worker.add_to_queue(SplitIoClient::SSE::EventSource::StreamData.new("SPLIT_UPDATE", 123, JSON.parse('{"type":"SPLIT_UPDATE","changeNumber":1506703262916}'), 'test'))
sleep 1

expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262916')).to have_been_made.times(0)
end

it 'without start, must not fetch' do
worker = subject.new(synchronizer, config, splits_repository)
worker.add_to_queue(1_506_703_262_918)
worker.add_to_queue(SplitIoClient::SSE::EventSource::StreamData.new("SPLIT_UPDATE", 123, JSON.parse('{"type":"SPLIT_UPDATE","changeNumber":1506703262918}'), 'test'))

expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262916')).to have_been_made.times(0)
end
Expand All @@ -102,7 +106,7 @@

worker = subject.new(synchronizer, config, splits_repository)
worker.start
worker.kill_split(1_506_703_262_918, 'FACUNDO_TEST', 'on')
worker.send :kill_feature_flag, SplitIoClient::SSE::EventSource::StreamData.new("SPLIT_KILL", 123, JSON.parse('{"splitName":"FACUNDO_TEST", "defaultTreatment":"on", "type":"SPLIT_KILL","changeNumber":1506703262918}'), 'test')

sleep(1)

Expand All @@ -117,7 +121,7 @@
worker = subject.new(synchronizer, config, splits_repository)

worker.start
worker.kill_split(1_506_703_262_916, 'FACUNDO_TEST', 'on')
worker.send :kill_feature_flag, SplitIoClient::SSE::EventSource::StreamData.new("SPLIT_KILL", 123, JSON.parse('{"splitName":"FACUNDO_TEST", "defaultTreatment":"on", "type":"SPLIT_KILL","changeNumber":1506703262916}'), 'test')

sleep(1)

Expand All @@ -129,6 +133,31 @@
end
end

context 'instant ff update split notification' do
it 'decode and decompress split update data' do
worker = subject.new(synchronizer, config, splits_repository)
worker.start
splits_repository.set_change_number(1234)
worker.add_to_queue(event_split_update_no_compression)
sleep 1
split = splits_repository.get_split('bilal_split')
expect(split[:name] == 'bilal_split')

splits_repository.set_change_number(1234)
worker.add_to_queue(event_split_update_gzip_compression)
sleep 1
split = splits_repository.get_split('bilal_split')
expect(split[:name] == 'bilal_split')

splits_repository.set_change_number(1234)
worker.add_to_queue(event_split_update_zlib_compression)
sleep 1
split = splits_repository.get_split('bilal_split')
expect(split[:name] == 'bilal_split')

end
end

private

def mock_split_changes(splits_json)
Expand Down
1 change: 1 addition & 0 deletions splitclient-rb.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Gem::Specification.new do |spec|
spec.add_development_dependency 'timecop', '~> 0.9'
spec.add_development_dependency 'webmock', '~> 3.14'
spec.add_development_dependency 'webrick', '~> 1.7'
spec.add_development_dependency 'byebug', '~> 11.1'

spec.add_runtime_dependency 'bitarray', '~> 1.3'
spec.add_runtime_dependency 'concurrent-ruby', '~> 1.0'
Expand Down

0 comments on commit 4c12107

Please sign in to comment.