Skip to content

Commit

Permalink
Sqlite support for query-cloudwatch (#11291)
Browse files Browse the repository at this point in the history
* Add '*.db' to .gitignore

Common extension used for SQLite databases.

* Add sqlite3 dependency in development only

* Support '--format sqlite' in query-cloudwatch

Update bin/query-cloudwatch to support SQLite as an output option.

Queries that include '@timestamp' and '@message' will have their results added to the 'events' table in a Sqlite database (events.db by default)

* Slightly smoother interrupt handling

* Rework output.start to take a block

* Fix variable names

* Add some tests for --sqlite option

* More tests + non-events.log support

- Don't require that @message be json
- Generate IDs if needed
- Also store @log and @logstream
- And also, tests!

* Include sqlite3 in test env

* changelog: Internal, Tooling, Add SQLite support to query-cloudwatch command

* Say "user_id" instead of "uuid"

Go for consistency with Cloudwatch over consistency with the IdP

* whoops, missed one

* Use a stable ID when none present in @message

Generate a SHA265 hash of timestamp + message as an ID.
  • Loading branch information
matthinz authored and colter-nattrass committed Oct 23, 2024
1 parent a2dbea2 commit 8cd17f8
Show file tree
Hide file tree
Showing 5 changed files with 451 additions and 27 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# git config --global core.excludesfile ~/.gitignore_global

.generators
*.db
*.pyc
*.rbc
**.orig
Expand Down
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ group :development, :test do
gem 'rubocop-performance', '~> 1.20.2', require: false
gem 'rubocop-rails', '>= 2.5.2', require: false
gem 'rubocop-rspec', require: false
gem 'sqlite3', require: false
end

group :test do
Expand Down
3 changes: 3 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,8 @@ GEM
simpleidn (0.2.1)
unf (~> 0.1.4)
smart_properties (1.17.0)
sqlite3 (2.1.0)
mini_portile2 (~> 2.8.0)
stringex (2.8.5)
stringio (3.1.1)
strong_migrations (2.0.0)
Expand Down Expand Up @@ -862,6 +864,7 @@ DEPENDENCIES
simplecov (~> 0.22.0)
simplecov-cobertura
simplecov_json_formatter
sqlite3
stringex
strong_migrations (>= 0.4.2)
tableparser
Expand Down
187 changes: 171 additions & 16 deletions bin/query-cloudwatch
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require 'aws-sdk-cloudwatchlogs'
require 'concurrent-ruby'
require 'csv'
require 'json'
require 'sqlite3'
require 'optparse'
require 'optparse/time'

Expand All @@ -33,41 +34,173 @@ class QueryCloudwatch
:num_threads,
:wait_duration,
:count_distinct,
:sqlite_database_file,
keyword_init: true,
)

class JsonOutput
attr_reader :stdout

def initialize(stdout:)
@stdout = stdout
end

def start
yield
end

def handle_row(row)
stdout.puts row.to_json
end
end

class CsvOutput
attr_reader :stdout

def initialize(stdout:)
@stdout = stdout
end

def start
yield
end

def handle_row(row)
stdout.puts row.values.to_csv
end
end

class SqliteOutput
attr_reader :filename, :invalid_message_json_count, :stderr

def initialize(filename:, stderr:)
@filename = filename
@stderr = stderr
@invalid_message_json_count = 0
end

def start
create_tables_if_needed
db.transaction do
yield
end
rescue
db&.interrupt
raise
ensure
if db
if invalid_message_json_count > 0
stderr.puts <<~END
WARNING: For #{invalid_message_json_count} event#{invalid_message_json_count == 1 ? '' : 's'}, @message did not contain valid JSON
END
end
stderr.puts <<~END
Wrote #{db.total_changes.inspect} rows to the 'events' table in #{filename}
END
end
close_database
end

def handle_row(row)
message = row['@message']
raise "Query must include @message in output when using --sqlite" unless message

timestamp = row['@timestamp']
raise "Query must include @timestamp in output when using --sqlite" unless timestamp

json = begin
JSON.parse(message)
rescue
@invalid_message_json_count += 1
nil
end

success = json&.dig('properties', 'event_properties', 'success')
success = success ? 1 : (success.nil? ? nil : 0)

# NOTE: Order matters here
row = {
id: json&.dig('id') || generate_id(timestamp:,message:),
timestamp:,
name:json&.dig('name'),
user_id: json&.dig('properties', 'user_id'),
success:,
message:,
log_stream: row.dig('@logStream'),
log: row.dig('@log'),
}

insert_statement.execute(row.values)
end

def insert_statement
@insert_statement ||= db.prepare(
<<~SQL
INSERT INTO
events (id, timestamp, name, user_id, success, message, log_stream, log)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (id) DO NOTHING
SQL
)
end

def db
@db ||= SQLite3::Database.new(filename)
end

def close_database
db&.close
@db = nil
end

def create_tables_if_needed
db.execute_batch(
<<~SQL
CREATE TABLE IF NOT EXISTS events (
id TEXT PRIMARY KEY NOT NULL,
timestamp TEXT NOT NULL,
name TEXT NULL,
user_id TEXT NULL,
success INTEGER NULL,
message TEXT NOT NULL,
log_stream TEXT NULL,
log TEXT NULL
);
SQL
)
end

def generate_id(timestamp:,message:)
hash = Digest::SHA256.hexdigest("#{timestamp}|#{message}")
"NOID-#{hash}"
end
end

attr_reader :config

def initialize(config)
@config = config
end

def run(stdout: STDOUT)
def run(stdout: STDOUT, stderr: STDERR)
if config.count_distinct
values = {}
fetch { |row| values[row[config.count_distinct]] = true }
stdout.puts values.size
else
fetch { |row| stdout.puts format_response(row) }
return
end

output = create_output(stdout:, stderr:)

output.start do
fetch { |row| output.handle_row(row) }
end

rescue Interrupt
# catch interrupts (ctrl-c) and directly exit, this skips printing the backtrace
exit 1
end

# Relies on the fact that hashes preserve insertion order
# @param [Hash] row
# @return [String]
def format_response(row)
case config.format
when :csv
row.values.to_csv
when :json
row.to_json
else
raise "unknown format #{config.format}"
end
end

def fetch(&block)
cloudwatch_client.fetch(
Expand All @@ -90,6 +223,19 @@ class QueryCloudwatch
)
end

def create_output(stdout:, stderr:)
case config.format
when :csv
CsvOutput.new(stdout:)
when :json
JsonOutput.new(stdout:)
when :sqlite
SqliteOutput.new(filename: config.sqlite_database_file, stderr:)
else
raise "unknown format #{config.format}"
end
end

# @return [Config]
def self.parse!(argv:, stdin:, stdout:, now: Time.now)
config = Config.new(
Expand Down Expand Up @@ -235,6 +381,11 @@ class QueryCloudwatch
config.format = :csv
end

opts.on('--sqlite [FILENAME]', 'load output into the given Sqlite database (events.db by default). Output MUST include @message and @timestamp') do |filename|
config.format = :sqlite
config.sqlite_database_file = filename || 'events.db'
end

opts.on(
'--[no-]complete',
'whether or not to split query slices if exactly 10k rows are returned, defaults to off',
Expand Down Expand Up @@ -280,6 +431,10 @@ class QueryCloudwatch
end
end

if config.format == :sqlite
errors << "ERROR: can't do --count-distinct with --sqlite" if config.count_distinct
end

if config.count_distinct
config.complete = true
config.query = [
Expand Down
Loading

0 comments on commit 8cd17f8

Please sign in to comment.