Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added iff compression and decoding message #473

Merged
merged 9 commits into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/splitclient-rb/sse/notification_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def process(incoming_notification)

def process_split_update(notification)
@config.logger.debug("SPLIT UPDATE notification received: #{notification}") if @config.debug_enabled
@splits_worker.add_to_queue(notification.data['changeNumber'])
@splits_worker.split_update(notification)
chillaq marked this conversation as resolved.
Show resolved Hide resolved
end

def process_split_kill(notification)
Expand Down
26 changes: 26 additions & 0 deletions lib/splitclient-rb/sse/workers/splits_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@ def stop
SplitIoClient::Helpers::ThreadHelper.stop(:split_update_worker, @config)
end

def split_update(notification)
chillaq marked this conversation as resolved.
Show resolved Hide resolved
chillaq marked this conversation as resolved.
Show resolved Hide resolved
if @splits_repository.get_change_number() == notification.data['pcn']
chillaq marked this conversation as resolved.
Show resolved Hide resolved
begin
@new_split = JSON.parse(get_encoded_definition(notification), symbolize_names: true)
@splits_repository.add_split(@new_split)
@splits_repository.set_change_number(notification.data['changeNumber'])
return
rescue Exception => e
@config.logger.debug("Failed to update Split: #{e.inspect}") if @config.debug_enabled
end
end
add_to_queue(notification.data['changeNumber'])
end

def add_to_queue(change_number)
@config.logger.debug("feature_flags_worker add to queue #{change_number}")
@queue.push(change_number)
Expand All @@ -47,6 +61,18 @@ def kill_split(change_number, split_name, default_treatment)

private

def get_encoded_definition(notification)
chillaq marked this conversation as resolved.
Show resolved Hide resolved
case notification.data[:c]
when 0
return Base64.decode64(notification.data[:d])
when 1
gz = Zlib::GzipReader.new(StringIO.new(Base64.decode64(notification.data[:d])))
return gz.read
when 2
return Zlib::Inflate.inflate(Base64.decode64(notification.data[:d]))
end
end

def perform
while (change_number = @queue.pop)
@config.logger.debug("feature_flags_worker change_number dequeue #{change_number}")
Expand Down
20 changes: 20 additions & 0 deletions spec/sse/workers/splits_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require 'spec_helper'
require 'http_server_mock'
require 'byebug'

describe SplitIoClient::SSE::Workers::SplitsWorker do
subject { SplitIoClient::SSE::Workers::SplitsWorker }
Expand All @@ -16,6 +17,9 @@
let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config) }
let(:telemetry_runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) }
let(:split_fetcher) { SplitIoClient::Cache::Fetchers::SplitFetcher.new(splits_repository, api_key, config, telemetry_runtime_producer) }
let(:event_split_update_no_compression) { SplitIoClient::SSE::EventSource::StreamData.new("data", 123, {"type":"SPLIT_UPDATE","changeNumber":5564531221,"pcn":1234,"c": 0,"d":"eyJ0cmFmZmljVHlwZU5hbWUiOiJ1c2VyIiwiaWQiOiIzM2VhZmE1MC0xYTY1LTExZWQtOTBkZi1mYTMwZDk2OTA0NDUiLCJuYW1lIjoiYmlsYWxfc3BsaXQiLCJ0cmFmZmljQWxsb2NhdGlvbiI6MTAwLCJ0cmFmZmljQWxsb2NhdGlvblNlZWQiOi0xMzY0MTE5MjgyLCJzZWVkIjotNjA1OTM4ODQzLCJzdGF0dXMiOiJBQ1RJVkUiLCJraWxsZWQiOmZhbHNlLCJkZWZhdWx0VHJlYXRtZW50Ijoib2ZmIiwiY2hhbmdlTnVtYmVyIjoxNjg0MzQwOTA4NDc1LCJhbGdvIjoyLCJjb25maWd1cmF0aW9ucyI6e30sImNvbmRpdGlvbnMiOlt7ImNvbmRpdGlvblR5cGUiOiJST0xMT1VUIiwibWF0Y2hlckdyb3VwIjp7ImNvbWJpbmVyIjoiQU5EIiwibWF0Y2hlcnMiOlt7ImtleVNlbGVjdG9yIjp7InRyYWZmaWNUeXBlIjoidXNlciJ9LCJtYXRjaGVyVHlwZSI6IklOX1NFR01FTlQiLCJuZWdhdGUiOmZhbHNlLCJ1c2VyRGVmaW5lZFNlZ21lbnRNYXRjaGVyRGF0YSI6eyJzZWdtZW50TmFtZSI6ImJpbGFsX3NlZ21lbnQifX1dfSwicGFydGl0aW9ucyI6W3sidHJlYXRtZW50Ijoib24iLCJzaXplIjowfSx7InRyZWF0bWVudCI6Im9mZiIsInNpemUiOjEwMH1dLCJsYWJlbCI6ImluIHNlZ21lbnQgYmlsYWxfc2VnbWVudCJ9LHsiY29uZGl0aW9uVHlwZSI6IlJPTExPVVQiLCJtYXRjaGVyR3JvdXAiOnsiY29tYmluZXIiOiJBTkQiLCJtYXRjaGVycyI6W3sia2V5U2VsZWN0b3IiOnsidHJhZmZpY1R5cGUiOiJ1c2VyIn0sIm1hdGNoZXJUeXBlIjoiQUxMX0tFWVMiLCJuZWdhdGUiOmZhbHNlfV19LCJwYXJ0aXRpb25zIjpbeyJ0cmVhdG1lbnQiOiJvbiIsInNpemUiOjB9LHsidHJlYXRtZW50Ijoib2ZmIiwic2l6ZSI6MTAwfV0sImxhYmVsIjoiZGVmYXVsdCBydWxlIn1dfQ=="}, 'test') }
let(:event_split_update_gzip_compression) { SplitIoClient::SSE::EventSource::StreamData.new("data", 123, {"type":"SPLIT_UPDATE","changeNumber":5564531221,"pcn":1234,"c": 1,"d":"H4sIAAkVZWQC/8WST0+DQBDFv0qzZ0ig/BF6a2xjGismUk2MaZopzOKmy9Isy0EbvrtDwbY2Xo233Tdv5se85cCMBs5FtvrYYwIlsglratTMYiKns+chcAgc24UwsF0Xczt2cm5z8Jw8DmPH9wPyqr5zKyTITb2XwpA4TJ5KWWVgRKXYxHWcX/QUkVi264W+68bjaGyxupdCJ4i9KPI9UgyYpibI9Ha1eJnT/J2QsnNxkDVaLEcOjTQrjWBKVIasFefky95BFZg05Zb2mrhh5I9vgsiL44BAIIuKTeiQVYqLotHHLyLOoT1quRjub4fztQuLxj89LpePzytClGCyd9R3umr21ErOcitUh2PTZHY29HN2+JGixMxUujNfvMB3+u2pY1AXySad3z3Mk46msACDp8W7jhly4uUpFt3qD33vDAx0gLpXkx+P1GusbdcE24M2F4uaywwVEWvxSa1Oa13Vjvn2RXradm0xCVuUVBJqNCBGV0DrX4OcLpeb+/lreh3jH8Uw/JQj3UhkxPgCCurdEnADAAA="}, 'test') }
let(:event_split_update_zlib_compression) { SplitIoClient::SSE::EventSource::StreamData.new("data", 123, {"type":"SPLIT_UPDATE","changeNumber":5564531221,"pcn":1234,"c": 2,"d":"eJzEUtFq20AQ/JUwz2c4WZZr3ZupTQh1FKjcQinGrKU95cjpZE6nh9To34ssJ3FNX0sfd3Zm53b2TgietDbF9vXIGdUMha5lDwFTQiGOmTQlchLRPJlEEZeTVJZ6oimWZTpP5WyWQMCNyoOxZPft0ZoA8TZ5aW1TUDCNg4qk/AueM5dQkyiez6IonS6mAu0IzWWSxovFLBZoA4WuhcLy8/bh+xoCL8bagaXJtixQsqbOhq1nCjW7AIVGawgUz+Qqzrr6wB4qmi9m00/JIk7TZCpAtmqgpgJF47SpOn9+UQt16s9YaS71z9NHOYQFha9Pm83Tty0EagrFM/t733RHqIFZH4wb7LDMVh+Ecc4Lv+ZsuQiNH8hXF3hLv39XXNCHbJ+v7x/X2eDmuKLA74sPihVr47jMuRpWfxy1Kwo0GLQjmv1xpBFD3+96gSP5cLVouM7QQaA1vxhK9uKmd853bEZS9jsBSwe2UDDu7mJxd2Mo/muQy81m/2X9I7+N8R/FcPmUd76zjH7X/w4AAP//90glTw=="}, 'test') }
let(:synchronizer) do
segments_repository = SplitIoClient::Cache::Repositories::SegmentsRepository.new(config)
telemetry_api = SplitIoClient::Api::TelemetryApi.new(config, api_key, telemetry_runtime_producer)
Expand Down Expand Up @@ -129,6 +133,22 @@
end
end

context 'update split notification' do
it 'decode and decompress split update data' do
worker = subject.new(synchronizer, config, splits_repository)
worker.start
split_definition = JSON.parse(worker.send(:get_encoded_definition, event_split_update_no_compression))
expect(split_definition['name'] == 'bilal_split')

split_definition = JSON.parse(worker.send(:get_encoded_definition, event_split_update_gzip_compression))
expect(split_definition['name'] == 'bilal_split')

split_definition = JSON.parse(worker.send(:get_encoded_definition, event_split_update_zlib_compression))
expect(split_definition['name'] == 'bilal_split')

end
end

private

def mock_split_changes(splits_json)
Expand Down