diff --git a/.gitignore b/.gitignore
index 602124f201d..8be2a281542 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,6 +5,7 @@
 # git config --global core.excludesfile ~/.gitignore_global
 .generators
+*.db
 *.pyc
 *.rbc
 **.orig
diff --git a/Gemfile b/Gemfile
index 1698fff45dc..dd61c0f101b 100644
--- a/Gemfile
+++ b/Gemfile
@@ -122,6 +122,7 @@ group :development, :test do
   gem 'rubocop-performance', '~> 1.20.2', require: false
   gem 'rubocop-rails', '>= 2.5.2', require: false
   gem 'rubocop-rspec', require: false
+  gem 'sqlite3', require: false
 end
 
 group :test do
diff --git a/Gemfile.lock b/Gemfile.lock
index 3da9d90715f..586968a65db 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -681,6 +681,8 @@ GEM
     simpleidn (0.2.1)
       unf (~> 0.1.4)
     smart_properties (1.17.0)
+    sqlite3 (2.1.0)
+      mini_portile2 (~> 2.8.0)
     stringex (2.8.5)
     stringio (3.1.1)
     strong_migrations (2.0.0)
@@ -862,6 +864,7 @@ DEPENDENCIES
   simplecov (~> 0.22.0)
   simplecov-cobertura
   simplecov_json_formatter
+  sqlite3
   stringex
   strong_migrations (>= 0.4.2)
   tableparser
diff --git a/bin/query-cloudwatch b/bin/query-cloudwatch
index 4fd91877640..42ecfb40b42 100755
--- a/bin/query-cloudwatch
+++ b/bin/query-cloudwatch
@@ -9,6 +9,7 @@ require 'aws-sdk-cloudwatchlogs'
 require 'concurrent-ruby'
 require 'csv'
 require 'json'
+require 'sqlite3'
 require 'optparse'
 require 'optparse/time'
 
@@ -33,41 +34,173 @@ class QueryCloudwatch
     :num_threads,
     :wait_duration,
     :count_distinct,
+    :sqlite_database_file,
     keyword_init: true,
   )
 
+  class JsonOutput
+    attr_reader :stdout
+
+    def initialize(stdout:)
+      @stdout = stdout
+    end
+
+    def start
+      yield
+    end
+
+    def handle_row(row)
+      stdout.puts row.to_json
+    end
+  end
+
+  class CsvOutput
+    attr_reader :stdout
+
+    def initialize(stdout:)
+      @stdout = stdout
+    end
+
+    def start
+      yield
+    end
+
+    def handle_row(row)
+      stdout.puts row.values.to_csv
+    end
+  end
+
+  class SqliteOutput
+    attr_reader :filename, :invalid_message_json_count, :stderr
+
+    def initialize(filename:, stderr:)
+      @filename = filename
+      @stderr = stderr
+      @invalid_message_json_count = 0
+    end
+
+    def start
+      create_tables_if_needed
+      db.transaction do
+        yield
+      end
+    rescue
+      db&.interrupt
+      raise
+    ensure
+      if db
+        if invalid_message_json_count > 0
+          stderr.puts <<~END
+            WARNING: For #{invalid_message_json_count} event#{invalid_message_json_count == 1 ? '' : 's'}, @message did not contain valid JSON
+          END
+        end
+        stderr.puts <<~END
+          Wrote #{db.total_changes} rows to the 'events' table in #{filename}
+        END
+      end
+      close_database
+    end
+
+    def handle_row(row)
+      message = row['@message']
+      raise "Query must include @message in output when using --sqlite" unless message
+
+      timestamp = row['@timestamp']
+      raise "Query must include @timestamp in output when using --sqlite" unless timestamp
+
+      json = begin
+        JSON.parse(message)
+      rescue
+        @invalid_message_json_count += 1
+        nil
+      end
+
+      success = json&.dig('properties', 'event_properties', 'success')
+      success = success ? 1 : (success.nil? ? nil : 0)
+
+      # NOTE: Order matters here
+      row = {
+        id: json&.dig('id') || generate_id(timestamp:, message:),
+        timestamp:,
+        name: json&.dig('name'),
+        user_id: json&.dig('properties', 'user_id'),
+        success:,
+        message:,
+        log_stream: row.dig('@logStream'),
+        log: row.dig('@log'),
+      }
+
+      insert_statement.execute(row.values)
+    end
+
+    def insert_statement
+      @insert_statement ||= db.prepare(
+        <<~SQL
+          INSERT INTO
+            events (id, timestamp, name, user_id, success, message, log_stream, log)
+          VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+          ON CONFLICT (id) DO NOTHING
+        SQL
+      )
+    end
+
+    def db
+      @db ||= SQLite3::Database.new(filename)
+    end
+
+    def close_database
+      db&.close
+      @db = nil
+    end
+
+    def create_tables_if_needed
+      db.execute_batch(
+        <<~SQL
+          CREATE TABLE IF NOT EXISTS events (
+            id TEXT PRIMARY KEY NOT NULL,
+            timestamp TEXT NOT NULL,
+            name TEXT NULL,
+            user_id TEXT NULL,
+            success INTEGER NULL,
+            message TEXT NOT NULL,
+            log_stream TEXT NULL,
+            log TEXT NULL
+          );
+        SQL
+      )
+    end
+
+    def generate_id(timestamp:, message:)
+      hash = Digest::SHA256.hexdigest("#{timestamp}|#{message}")
+      "NOID-#{hash}"
+    end
+  end
+
   attr_reader :config
 
   def initialize(config)
     @config = config
   end
 
-  def run(stdout: STDOUT)
+  def run(stdout: STDOUT, stderr: STDERR)
     if config.count_distinct
       values = {}
       fetch { |row| values[row[config.count_distinct]] = true }
       stdout.puts values.size
-    else
-      fetch { |row| stdout.puts format_response(row) }
+      return
     end
+
+    output = create_output(stdout:, stderr:)
+
+    output.start do
+      fetch { |row| output.handle_row(row) }
+    end
+
   rescue Interrupt
     # catch interrupts (ctrl-c) and directly exit, this skips printing the backtrace
     exit 1
   end
 
-  # Relies on the fact that hashes preserve insertion order
-  # @param [Hash] row
-  # @return [String]
-  def format_response(row)
-    case config.format
-    when :csv
-      row.values.to_csv
-    when :json
-      row.to_json
-    else
-      raise "unknown format #{config.format}"
-    end
-  end
 
   def fetch(&block)
     cloudwatch_client.fetch(
@@ -90,6 +223,19 @@ class QueryCloudwatch
     )
   end
 
+  def create_output(stdout:, stderr:)
+    case config.format
+    when :csv
+      CsvOutput.new(stdout:)
+    when :json
+      JsonOutput.new(stdout:)
+    when :sqlite
+      SqliteOutput.new(filename: config.sqlite_database_file, stderr:)
+    else
+      raise "unknown format #{config.format}"
+    end
+  end
+
   # @return [Config]
   def self.parse!(argv:, stdin:, stdout:, now: Time.now)
     config = Config.new(
@@ -235,6 +381,11 @@ class QueryCloudwatch
       config.format = :csv
     end
 
+    opts.on('--sqlite [FILENAME]', 'load output into the given SQLite database (events.db by default). Output MUST include @message and @timestamp') do |filename|
+      config.format = :sqlite
+      config.sqlite_database_file = filename || 'events.db'
+    end
+
     opts.on(
       '--[no-]complete',
       'whether or not to split query slices if exactly 10k rows are returned, defaults to off',
@@ -280,6 +431,10 @@ class QueryCloudwatch
       end
     end
 
+    if config.format == :sqlite
+      errors << "ERROR: can't do --count-distinct with --sqlite" if config.count_distinct
+    end
+
     if config.count_distinct
       config.complete = true
       config.query = [
diff --git a/spec/bin/query-cloudwatch_spec.rb b/spec/bin/query-cloudwatch_spec.rb
index 1e44c3e69db..c17ac2e9d57 100644
--- a/spec/bin/query-cloudwatch_spec.rb
+++ b/spec/bin/query-cloudwatch_spec.rb
@@ -1,4 +1,5 @@
 require 'rails_helper'
+require 'sqlite3'
 load Rails.root.join('bin/query-cloudwatch')
 
 RSpec.describe QueryCloudwatch do
@@ -165,6 +166,32 @@
       end
     end
 
+    context 'with --sqlite' do
+      let(:argv) { required_parameters + %w[--sqlite] }
+
+      it 'sets sqlite_database_file to events.db by default' do
+        config = parse!
+        expect(config.sqlite_database_file).to eql('events.db')
+      end
+
+      context 'with a database file' do
+        let(:argv) { required_parameters + %w[--sqlite foo.db] }
+        it 'sets sqlite_database_file appropriately' do
+          config = parse!
+          expect(config.sqlite_database_file).to eql('foo.db')
+        end
+      end
+
+      context 'with --count-distinct' do
+        let(:argv) { super() + %w[--count-distinct foo] }
+        it 'errors out' do
+          expect(QueryCloudwatch).to receive(:exit).with(1)
+          parse!
+          expect(stdout.string).to include("can't do --count-distinct with --sqlite")
+        end
+      end
+    end
+
     context 'with --slice' do
       let(:argv) { required_parameters + %w[--slice 3mon] }
 
@@ -239,13 +266,28 @@ def build_stdin_with_query(query)
         wait_duration: 0,
         query: 'fields @timestamp, @message',
         format: format,
+        sqlite_database_file: 'events.db',
         count_distinct: count_distinct,
         num_threads: Reporting::CloudwatchClient::DEFAULT_NUM_THREADS,
       )
     end
     let(:query_cloudwatch) { QueryCloudwatch.new(config) }
     let(:stdout) { StringIO.new }
-    subject(:run) { query_cloudwatch.run(stdout:) }
+    let(:stderr) { StringIO.new }
+    let(:query_results) do
+      [
+        [
+          { field: '@timestamp', value: 'timestamp-1' },
+          { field: '@message', value: 'message-1' },
+        ],
+        [
+          { field: '@timestamp', value: 'timestamp-2' },
+          { field: '@message', value: 'message-2' },
+        ],
+      ]
+    end
+
+    subject(:run) { query_cloudwatch.run(stdout:, stderr:) }
 
     before do
       Aws.config[:cloudwatchlogs] = {
@@ -256,16 +298,7 @@ def build_stdin_with_query(query)
         get_query_results: [
           {
             status: 'Complete',
-            results: [
-              [
-                { field: '@timestamp', value: 'timestamp-1' },
-                { field: '@message', value: 'message-1' },
-              ],
-              [
-                { field: '@timestamp', value: 'timestamp-2' },
-                { field: '@message', value: 'message-2' },
-              ],
-            ],
+            results: query_results,
           },
         ],
       },
@@ -292,6 +325,237 @@ def build_stdin_with_query(query)
       end
     end
 
+    context 'with sqlite format' do
+      let(:format) { :sqlite }
+
+      let(:db) do
+        SQLite3::Database.new(':memory:')
+      end
+
+      before do
+        allow_any_instance_of(QueryCloudwatch::SqliteOutput).to receive(:db).
+          and_return(db)
+        allow_any_instance_of(QueryCloudwatch::SqliteOutput).to receive(:close_database)
+      end
+
+      it 'does not output on stdout' do
+        run
+        expect(stdout.string).to eql('')
+      end
+
+      context 'with invalid json in @message' do
+        let(:message_1) { 'message 1 here, not at all json' }
+        let(:message_2) { 'message 2 here, not at all json' }
+
+        let(:query_results) do
+          [
+            [
+              { field: '@timestamp', value: '2024-01-11 22:26:50.336' },
+              { field: '@message', value: message_1 },
+            ],
+            [
+              { field: '@timestamp', value: '"2024-01-02 03:42:50.451",' },
+              { field: '@message', value: message_2 },
+            ],
+          ]
+        end
+
+        it 'generates ids that start with NOID- for those events' do
+          run
+          expect(db.get_first_value('SELECT COUNT(*) FROM events')).to eql(2)
+
+          actual_ids = db.query('SELECT id FROM events') do |results|
+            results.map do |row|
+              row.first
+            end
+          end
+
+          expect(actual_ids).to all(start_with('NOID-'))
+        end
+
+        it 'outputs warnings + summary on stderr' do
+          run
+          expect(stderr.string).to eql <<~STR
+            WARNING: For 2 events, @message did not contain valid JSON
+            Wrote 2 rows to the 'events' table in events.db
+          STR
+        end
+
+        context 'when two messages are identical' do
+          let(:query_results) do
+            [
+              [
+                { field: '@timestamp', value: '2024-01-11 22:26:50.336' },
+                { field: '@message', value: message_1 },
+              ],
+              [
+                { field: '@timestamp', value: '2024-01-11 22:26:50.336' },
+                { field: '@message', value: message_1 },
+              ],
+            ]
+          end
+          it 'only inserts 1 record' do
+            run
+            expect(db.get_first_value('SELECT COUNT(*) FROM events')).to eql(1)
+          end
+        end
+      end
+
+      context 'with valid JSON in @message' do
+        let(:message_1) do
+          JSON.parse(<<~JSON)
+            {
+              "id": "message_1",
+              "name": "IdV: doc auth image upload vendor submitted",
+              "properties": {
+                "event_properties": {
+                  "success": true,
+                  "errors": {},
+                  "exception": null
+                },
+                "user_id": "user_1"
+              }
+            }
+          JSON
+        end
+
+        let(:message_2) do
+          JSON.parse(<<~JSON)
+            {
+              "id": "message_2",
"name": "IdV: doc auth image upload vendor submitted", + "properties": { + "event_properties": { + "success": false, + "errors": {}, + "exception": null + }, + "user_id": "user_2" + } + } + JSON + end + + let(:query_results) do + [ + [ + { field: '@timestamp', value: '2024-01-11 22:26:50.336' }, + { field: '@message', value: JSON.generate(message_1) }, + ], + [ + { field: '@timestamp', value: '"2024-01-02 03:42:50.451",' }, + { field: '@message', value: JSON.generate(message_2) }, + ], + ] + end + + it 'inserts 2 rows in events table' do + run + expect(db.get_first_value('SELECT COUNT(*) FROM events')).to eql(2) + end + + context 'when two messages have same id' do + let(:message_2) { message_1 } + + it 'only inserts 1 row' do + run + expect(db.get_first_value('SELECT COUNT(*) FROM events')).to eql(1) + end + end + end + + context 'when query does not return @timestamp' do + let(:query_results) do + [ + [ + { field: '@message', value: '{}' }, + ], + [ + { field: '@message', value: '{}' }, + ], + ] + end + + it 'errors out' do + expect do + run + end.to raise_error 'Query must include @timestamp in output when using --sqlite' + end + end + + context 'when query does not return @message' do + let(:query_results) do + [ + [ + { field: '@timestamp', value: '2024-01-11 22:26:50.336' }, + ], + [ + { field: '@timestamp', value: '2024-01-11 22:26:50.336' }, + ], + ] + end + + it 'errors out' do + expect do + run + end.to raise_error 'Query must include @message in output when using --sqlite' + end + end + + context 'when query returns @log' do + let(:query_results) do + [ + [ + { field: '@timestamp', value: '2024-01-11 22:26:50.336' }, + { field: '@message', value: '{}' }, + { field: '@log', value: 'my log' }, + ], + [ + { field: '@timestamp', value: '"2024-01-02 03:42:50.451",' }, + { field: '@message', value: '{}' }, + { field: '@log', value: 'my other log' }, + ], + ] + end + + it 'adds them to the log column' do + run + actual_log_values = db.query('SELECT log FROM events') do |results| + results.map(&:first) + end.sort + + expect(actual_log_values).to eql(['my log', 'my other log']) + end + end + + context 'when query returns @logStream' do + let(:query_results) do + [ + [ + { field: '@timestamp', value: '2024-01-11 22:26:50.336' }, + { field: '@message', value: '{}' }, + { field: '@logStream', value: 'my log stream' }, + ], + [ + { field: '@timestamp', value: '"2024-01-02 03:42:50.451",' }, + { field: '@message', value: '{}' }, + { field: '@logStream', value: 'my other log stream' }, + ], + ] + end + + it 'adds them to the log_stream column' do + run + + actual_log_stream_values = db.query('SELECT log_stream FROM events') do |results| + results.map(&:first) + end.sort + + expect(actual_log_stream_values).to eql(['my log stream', 'my other log stream']) + end + end + end + context 'with count distinct' do let(:count_distinct) { '@message' }