diff --git a/Cargo.lock b/Cargo.lock index 0ff4833..38d83c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -190,6 +190,7 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", + "serde", "wasm-bindgen", "windows-targets", ] @@ -1266,7 +1267,9 @@ dependencies = [ "hostname", "local-ip-address", "log", + "regex", "retry", + "serde", "serde_json", "sqlparser", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 12537c8..80c8921 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ log = { version = "0.4.20", features = [] } env_logger = "0.10.0" clap = { version = "4.4.4", features = ["derive","env"] } uuid = {version = "1.4.1", features = [ "v4", "serde" ]} -chrono = "0.4.31" +chrono = {version = "0.4.31", features = [ "serde" ]} fs-tail = "0.1.4" elasticsearch = "8.5.0-alpha.1" serde_json = "1.0.107" @@ -20,6 +20,33 @@ retry = "2.0.0" hostname = "0.3.1" local-ip-address = "0.5.6" clap_complete = "4.4.3" +regex = "1.9.6" +serde = "1.0.188" [build-dependencies] clap_mangen = "0.2.14" + + +[package.metadata.deb] +maintainer = "cargo-deb developers " +copyright = "2017, cargo-deb developers." +license-file = ["LICENSE", "2"] +extended-description = "example project for cargo-deb" +depends = "$auto" +section = "utils" +priority = "optional" +assets = [ + ["target/release/sql_normalizer", "usr/bin/", "755"], +] +default-features = false +features = ["example_debian_build"] + +[package.metadata.deb.variants.debug] +assets = [ + ["target/release/sql_normalizer", "usr/bin/", "755"], +] + +[features] +default = ["example_non_debian_build"] +example_non_debian_build = [] +example_debian_build = [] diff --git a/src/anonymize.rs b/src/anonymize.rs index f26c45a..4aeaff7 100644 --- a/src/anonymize.rs +++ b/src/anonymize.rs @@ -1,239 +1,151 @@ -use sqlparser::ast::{Expr, Query, Select, SetExpr, Statement, Value, Values, Offset, OrderByExpr, Function, FunctionArg, FunctionArgExpr}; + +use sqlparser::ast::{Expr, Query, Select, SetExpr, Statement, Offset, Value, FunctionArg}; use log::{debug}; +use sqlparser::ast::FunctionArgExpr::Expr as OtherExpr; +use crate::parser::Command; + -fn selection_changer(selection: Option) -> Option { +fn selection_changer(selection: &mut Expr) -> &mut Expr { debug!("Selection Changer: {:?}", selection); match selection { - Some(Expr::BinaryOp { left, op, right }) => { - Some(Expr::BinaryOp { - left: selection_changer(Some(*left)).map(Box::new).unwrap(), - op, - right: selection_changer(Some(*right)).map(Box::new).unwrap(), - }) - } - Some(Expr::Like { negated, expr, pattern, escape_char }) => { - Some(Expr::Like { - negated, - expr, - pattern: selection_changer(Some(*pattern)).map(Box::new).unwrap(), - escape_char, - }) + Expr::BinaryOp { left, right, .. } => { + *left = Box::new(selection_changer(left).to_owned()); + *right= Box::new(selection_changer(right).to_owned()); + }, + Expr::Like { pattern, .. } => { + *pattern = Box::new(selection_changer(pattern).to_owned()); } - Some(Expr::Value(_value)) => { - Some(Expr::Value(Value::Placeholder("?".to_string()))) + Expr::Value(value) => { + *value = Value::Placeholder("?".to_string()); } - Some(Expr::InList { expr, list: _, negated }) => { - Some(Expr::InList { - expr, - list: vec![Expr::Value(Value::Placeholder("?".to_string()))], - negated, - }) + Expr::InList { list , .. } => { + *list = vec![Expr::Value(Value::Placeholder("?".to_string()))]; } - Some(Expr::Between { expr, negated, low, high }) => { - Some(Expr::Between { - expr, - negated, - low: selection_changer(Some(*low.clone())).map(Box::new).unwrap(), - high: selection_changer(Some(*high.clone())).map(Box::new).unwrap(), - }) + Expr::Between { low, high, .. } => { + *low = Box::new(selection_changer(low).to_owned()); + *high = Box::new(selection_changer(high).to_owned()); } - Some(Expr::Subquery(query)) => { - Some(Expr::Subquery( - Box::new(matcher(&query)) - )) + Expr::Subquery(query) => { + *query = Box::new(matcher(query).to_owned()); } - Some(Expr::Nested(nested)) => { - Some(Expr::Nested( - Box::new(selection_changer(Some(*nested)).unwrap()) - )) + Expr::Nested(nested) => { + *nested = Box::new(selection_changer(nested).to_owned()); } - Some(Expr::Function(function)) => { - let Function { name, mut args, over, distinct, special, order_by } = function; - { - if args.len() > 1 { - args = args.drain(0..2).collect(); - } - + Expr::Function(function) => { + if function.args.len() > 1 { + function.args = function.args.drain(0..2).collect(); + } - for arg in args.iter_mut() { - *arg = match arg { - FunctionArg::Unnamed(f_arg) => { - FunctionArg::Unnamed(match f_arg { - FunctionArgExpr::Expr(f_arg_expr) => { - FunctionArgExpr::Expr(selection_changer(Some(f_arg_expr.clone())).unwrap()) - } - _ => { - f_arg.clone() - } - }) + for arg in function.args.iter_mut() { + match arg { + FunctionArg::Unnamed(ref mut f_arg) => { + let OtherExpr(f_arg_expr) = f_arg else { panic!("{}", f_arg) }; + { + *f_arg_expr = selection_changer(f_arg_expr).to_owned(); } - FunctionArg::Named { name, arg } => { - FunctionArg::Named { - name: name.clone(), - arg: match arg { - FunctionArgExpr::Expr(f_arg_expr) => { - FunctionArgExpr::Expr(selection_changer(Some(f_arg_expr.clone())).unwrap()) - } - _ => { - arg.clone() - } - }, - } + }, + FunctionArg::Named { ref mut arg, .. } => { + let OtherExpr(f_arg_expr) = arg else { panic!("{}", arg) }; + { + *f_arg_expr = selection_changer(f_arg_expr).to_owned(); } } } - - - Some(Expr::Function(Function { - name, - args, - over, - distinct, - special, - order_by, - })) } + } - _ => { - selection - } - } + _ => {} + }; + selection } -fn matcher(query: &Query) -> Query { +fn matcher(query: &mut Query) -> &mut Query { debug!("matcher: {:?}", query); - let mut query = query.clone(); - let replaced_body = match &*query.body { - SetExpr::Values(values) => { - let mut replaced_rows = values.rows.clone(); - for xx in replaced_rows.iter_mut() { + match &mut *query.body { + SetExpr::Values(values) => { + for xx in values.rows.iter_mut() { for yy in xx.iter_mut() { - *yy = selection_changer(Some(yy.clone())).unwrap(); + *yy = selection_changer(yy).to_owned(); } }; - SetExpr::Values(Values { explicit_row: false, rows: replaced_rows }) } SetExpr::Select(select) => { - let select = select.clone(); - SetExpr::Select(Box::new(Select { - distinct: select.distinct, - top: None, - projection: select.projection, - into: None, - from: select.from, - // Add or modify other fields as needed - // For example, you can add a WHERE clause as follows: - lateral_views: select.lateral_views, - selection: selection_changer(select.selection), - // ... - group_by: select.group_by, - cluster_by: select.cluster_by, - distribute_by: select.distribute_by, - sort_by: select.sort_by, - having: None, - named_window: select.named_window, - qualify: None, - })) - } + let Select { selection, .. } = select.as_mut(); + { + if !selection.is_none() { + *selection = Some(selection_changer(selection.as_mut().unwrap()).to_owned()); + } + } - _ => { - *query.body } + _ => () }; - let replaced_offset = match query.offset { - Some(Offset { value, rows }) => { - Some(Offset { value: selection_changer(Some(value)).unwrap(), rows }) - } - _ => { - query.offset + if query.offset.is_some() { + let Offset { value, .. } = query.offset.as_mut().unwrap(); + { + *value = selection_changer(value).to_owned(); } - }; - - for order_by in query.order_by.iter_mut() { - *order_by = { - OrderByExpr { - expr: selection_changer(Some(order_by.expr.clone())).unwrap(), - asc: order_by.asc, - nulls_first: order_by.nulls_first, - } - } } + for order_by in query.order_by.iter_mut() { + order_by.expr = selection_changer(&mut order_by.expr).to_owned() + } - Query { - with: query.with, - body: Box::new(replaced_body), - order_by: query.order_by, - limit: selection_changer(query.limit), - offset: replaced_offset, - fetch: query.fetch, - locks: query.locks, + if query.limit.is_some() { + query.limit = Some(selection_changer(query.limit.as_mut().unwrap()).to_owned()); } + + query } -#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)] +#[derive(Debug, Clone)] pub struct Replaced { - pub statement_type: String, + pub statement_type: Command, pub statement: Statement } pub fn rec(statement: &mut Statement) -> Replaced { debug!("rec: {:?}", statement); - let typed; + let typed ; match statement { Statement::Query(query) => { - *statement = Statement::Query(Box::new(matcher(query))); - typed = "query"; - } - Statement::Explain { describe_alias, analyze, verbose, statement: explain_statement, format } => { - - *statement = Statement::Explain { - describe_alias: *describe_alias, - analyze: *analyze, - verbose: *verbose, - statement: Box::new(rec(explain_statement).statement.clone()), - format: *format - }; - typed = "explain"; - } + *query = Box::new(matcher(query).to_owned()); + typed = Command::Query; + }, + Statement::Explain { statement: explain_statement, .. } => { + *explain_statement = Box::new(rec(explain_statement).statement.clone()); + typed = Command::Explain; + }, + Statement::Insert { source,.. } => { + *source = Box::new(matcher(source).to_owned()); + typed = Command::Insert; + }, + Statement::Update { selection,assignments, .. } => { + + *selection = Some(selection_changer(selection.as_mut().unwrap()).clone()); + + for assigment in assignments.iter_mut() { + assigment.value = selection_changer(&mut assigment.value).to_owned(); + } + + typed = Command::Update; + }, + Statement::Delete { selection, .. } => { + + *selection = Some(selection_changer(selection.as_mut().unwrap()).clone()); + typed = Command::Delete; - Statement::Insert { or, into, table_name, columns, overwrite, source, partitioned, after_columns, table, on, returning } => { - - *statement = Statement::Insert { - or: *or, - into: *into, - table_name: table_name.clone(), - columns: columns.to_vec(), - overwrite: *overwrite, - source: Box::new(matcher(source)), - partitioned: partitioned.clone(), - after_columns: after_columns.to_vec(), - table: *table, - on: on.clone(), - returning: returning.clone(), - }; - typed = "insert"; - } - Statement::Update { table, assignments, from, selection, returning } => { - *statement = Statement::Update { - table: table.clone(), - assignments: assignments.to_vec(), - from: from.clone(), - selection: selection_changer(selection.clone()), - returning: returning.clone(), - }; - typed = "update"; }, _ => { - typed = "other"; + typed = Command::Other; } }; Replaced { - statement_type: typed.to_string(), + statement_type: typed, statement: statement.clone(), } } diff --git a/src/build.rs b/src/build.rs new file mode 100644 index 0000000..33ab581 --- /dev/null +++ b/src/build.rs @@ -0,0 +1,15 @@ +fn main() -> std::io::Result<()> { + let out_dir = std::path::PathBuf::from(std::env::var_os("OUT_DIR").ok_or(std::io::ErrorKind::NotFound)?); + + let cmd = clap::Command::new("mybin") + .arg(clap::arg!(-n --name )) + .arg(clap::arg!(-c --count )); + + let man = clap_mangen::Man::new(cmd); + let mut buffer: Vec = Default::default(); + man.render(&mut buffer)?; + + std::fs::write(out_dir.join("mybin.1"), buffer)?; + + Ok(()) +} \ No newline at end of file diff --git a/src/cli.rs b/src/cli.rs index 0144a3d..4802f91 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,26 +1,7 @@ -use std::path::PathBuf; use clap::{Args, Parser, Subcommand}; use elasticsearch::http::Url; use log::LevelFilter; - -#[derive(Parser)] -#[command(author, version, about, long_about = None)] -pub struct Client { - /// Turn debugging information on - #[arg(short, long, default_value="error", global=true)] - pub log_level: LevelFilter, - - /// Commands - #[command(subcommand)] - pub command: Commands, -} - -#[derive(Subcommand)] -pub enum Commands { - /// Adds files to myapp - Send(AddArgs), -} - +use std::path::PathBuf; #[derive(Args,Debug)] pub struct AddArgs { @@ -47,28 +28,58 @@ pub struct AddArgs { #[clap(value_enum)] pub input: Input, - /// Sets a custom config file + /// Sets a input file path #[arg(short = 'f', long, value_name = "FILE", required_if_eq_any([("input", "general"), ("input", "slow")]))] pub input_file: Option, - /// Sets a custom config file + /// Sets a output file path #[arg(short = 'o', long, value_name = "FILE", required_if_eq("output", "file"), default_value="output.txt")] pub output_file: Option, + /// Sets a push size + #[arg(short='s', long, default_value = "1000")] + pub elastic_push_size: u16, + + /// Sets a push seconds + #[arg(short='c', long, default_value = "15")] + pub elastic_push_seconds: u16, + + /// Sets Elastic password. + #[arg(short='n', long, default_value = "mysql_logs", env = "ELASTIC_INDEX")] + pub elastic_index_name: Option, + + + } -#[derive(clap::ValueEnum, Clone, Debug)] -pub enum Output { - File, - Elastic +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +pub struct Client { + /// Turn debugging information on + #[arg(short, long, default_value="error", global=true)] + pub log_level: LevelFilter, + + /// Commands + #[command(subcommand)] + pub command: Commands, +} + +#[derive(Subcommand)] +pub enum Commands { + Send(AddArgs), } #[derive(clap::ValueEnum, Clone, Debug)] pub enum Input { Slow, - General, - Syslog + General +} + +#[derive(clap::ValueEnum, Clone, Debug, PartialEq)] +pub enum Output { + File, + Elastic } diff --git a/src/elastic.rs b/src/elastic.rs index b330bd0..88b5842 100644 --- a/src/elastic.rs +++ b/src/elastic.rs @@ -1,63 +1,39 @@ -#![crate_name = "elastic"] - +use chrono::{DateTime, FixedOffset, Local}; +use crate::{cli}; +use crate::cli::Commands::Send; use elasticsearch::{BulkParts, Elasticsearch}; -use elasticsearch::http::transport::{SingleNodeConnectionPool, TransportBuilder}; +use elasticsearch::auth::Credentials; use elasticsearch::http::request::JsonBody; use elasticsearch::http::response::Response; - +use elasticsearch::http::transport::{SingleNodeConnectionPool, TransportBuilder}; +use log::{error, info}; +use crate::parser::LogEntry; use serde_json::{json, Value}; use uuid::Uuid; -use std::env; -use chrono::{DateTime, FixedOffset, Local}; -use elasticsearch::auth::Credentials; -use log::error; -use crate::{cli, parser}; -use crate::cli::Commands::Send; -pub async fn collect(mut body: Vec, user: &str, date: &str, sql: String, statement: String) -> Vec { - let id = Uuid::new_v4(); +pub async fn collect(mut body: Vec, log_entry: &LogEntry) -> Vec { + let _id = Uuid::new_v4(); let local_time: DateTime = Local::now(); - let log_time = parser::parse_timestamp(date.clone()).unwrap(); + let Send(client) = cli::cli().command; let datetime_with_timezone = - DateTime::::from_naive_utc_and_offset(log_time, - local_time.offset().clone()); - - body.push(json!({"index": {"_id": id}})); - - body.push(json!({ - "id": id, - "host": hostname(), - "statement": statement, - "ip": local_ip_address::local_ip().unwrap(), - "user": user, - "log_time": date, - "sql": sql - })); + DateTime::::from_naive_utc_and_offset(log_entry.timestamp, + *local_time.offset()); + body.push(json!({"index": {"_id": log_entry.id }})); - if body.len() > 1000 - || local_time.signed_duration_since(datetime_with_timezone).num_seconds() > 15 { + body.push(json!(log_entry)); + if body.len() > client.elastic_push_size as usize + || local_time.signed_duration_since(datetime_with_timezone).num_seconds() > client.elastic_push_seconds as i64 { match send(body.to_vec()).await { Ok(_) => {} Err(e) => error!("{:?}", e) } body.truncate(0); } - body -} -fn hostname() -> Option { - match env::var("HOSTNAME") { - Ok(val) => Some(val), - Err(_) => { - match hostname::get() { - Ok(host) => Some(host.to_string_lossy().into_owned()), - Err(_) => None, - } - } - } + body } async fn elastic_connect() -> Result> { @@ -68,8 +44,6 @@ async fn elastic_connect() -> Result> let conn_pool = SingleNodeConnectionPool::new(name.elastic_host.clone().unwrap()); let mut transport = TransportBuilder::new(conn_pool) .disable_proxy(); - - if name.elastic_user.is_some() { transport = transport.auth(Credentials::Basic(name.elastic_user.clone().unwrap(), name.elastic_password.clone().unwrap())); } @@ -82,14 +56,20 @@ async fn elastic_connect() -> Result> async fn send(body: Vec) -> Result> { let mut request_body: Vec> = Vec::with_capacity(body.len()); + info!("Elastic send statistic - {}",body.len()); + for body_datum in body { request_body.push(body_datum.into()) } + let client = elastic_connect().await?; + let cli = cli::cli(); + + let Send(options) = cli.command; let response = client - .bulk(BulkParts::Index("mysql_logs")) + .bulk(BulkParts::Index(options.elastic_index_name.unwrap().as_str())) .body(request_body) .send() .await?; diff --git a/src/main.rs b/src/main.rs index 38b8389..14b50f9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,10 +6,7 @@ mod elastic; use std::env; use std::fs::File; -use log::{error, info}; - - -use sqlparser::dialect::MySqlDialect; +use log::{ error}; use std::io::{BufRead, Write}; use env_logger::{Builder, Target}; @@ -19,28 +16,18 @@ use env_logger::{Builder, Target}; #[allow(unused_imports)] use std::io::{self, Read}; use fs_tail::TailedFile; -use sqlparser::parser::Parser; -use elastic::collect; use serde_json::Value; use cli::Commands::Send; +use crate::cli::Output; +use crate::elastic::collect; +use crate::parser::{ MultiLine}; #[tokio::main] async fn main() -> Result<(), Box> { - - - - // log::set_max_level(LevelFilter::Debug); - // env_logger::init(); - - - let cli = cli::cli(); - let dialect = MySqlDialect {}; - - match env::var("RUST_LOG") { Err(_) => { Builder::new() @@ -63,57 +50,49 @@ async fn main() -> Result<(), Box> { Ok(file) => { let file = TailedFile::new(file); let locked = file.lock(); + let mut file_write ; + + file_write = File::create(name.output_file.clone().unwrap()).unwrap(); + - let mut file_write = File::create(name.output_file.clone().unwrap()).unwrap(); let mut collected_data: Vec = Vec::new(); + let mut log_entries: MultiLine = MultiLine {log_entries: Vec::new(), multi_line:false, sql: "".to_string(), temp_entry: None}; for line in locked.lines() { - if let Some(log_entry) = parser::parse_mysql_log_entry(&line.unwrap()) { - match parser::parse_timestamp(&log_entry.timestamp) { - Ok(_parsed_timestamp) => { - match Parser::parse_sql(&dialect, &log_entry.sql_query) { - Ok(mut ast) => { - for statement in ast.iter_mut() { - let replaced = anonymize::rec(statement); - *statement = replaced.statement; - match name.output { - cli::Output::Elastic => { - collected_data = collect(collected_data,&log_entry.user, &log_entry.timestamp, statement.to_string(), replaced.statement_type).await; - }, - cli::Output::File => { - File::write(&mut file_write, (statement.to_string() + "\n").as_bytes()).expect("TODO: panic message"); - } - } - - }; - - info!("Modified SQL Tree: {:?}", ast); - info!("Modified SQL: {:?}", ast[0].to_string()); - } - Err(err) => { - error!("Error parsing timestamp: {}", err); - } + match name.input { + cli::Input::General => { + log_entries = parser::parse_mysql_log_entry(&line.unwrap(), log_entries); + }, + cli::Input::Slow => { + log_entries = parser::parse_mysql_slow_log_entry(&line.unwrap(), log_entries); + + } + } + + for log_entry in log_entries.log_entries.iter_mut() { + match name.output { + Output::File => { + if !log_entry.replaced_query.is_empty() { + log_entry.original_query = "".to_string(); + let _ = File::write(&mut file_write,(serde_json::to_string(&log_entry).unwrap() + "\n").as_bytes()); + log_entry.replaced_query = "".to_string(); } - } - Err(err) => { - error!("Error parsing timestamp: {}", err); + }, + Output::Elastic => { + log_entry.original_query = "".to_string(); + collected_data = collect(collected_data, log_entry).await; } } } + let _ = File::flush(&mut file_write); } - file_write.flush().expect("TODO: panic message"); + }, }; } } } - // match cli.debug { - // 0 => println!("Debug mode is off"), - // 1 => println!("Debug mode is kind of on"), - // 2 => println!("Debug mode is on"), - // _ => println!("Don't be crazy"), - // } Ok(()) } diff --git a/src/parser.rs b/src/parser.rs index be4d7a1..ebd332a 100644 --- a/src/parser.rs +++ b/src/parser.rs @@ -1,39 +1,241 @@ +use std::env; +use std::net::IpAddr; use chrono::{NaiveDateTime, ParseError}; +use log::{error}; +use regex::Regex; +use sqlparser::dialect::MySqlDialect; +use sqlparser::parser::{Parser, ParserError}; +use uuid::Uuid; +use crate::{anonymize, cli}; +use serde::Serialize; +#[derive(Debug, Serialize, Clone)] +pub struct MultiLine { + pub log_entries: Vec, + pub temp_entry: Option, + pub multi_line: bool, + pub sql: String +} + +pub fn parse_mysql_log_entry(log_line: &str, mut multilines: MultiLine) -> MultiLine { + + let mut parts: Vec<&str> = log_line.split_whitespace().collect(); + + let new_line = multilines.sql.clone(); + + match parse_timestamp(parts[0]) { + Ok(_) => { + parts = new_line.split_whitespace().collect(); + multilines = MultiLine {log_entries: Vec::new(), multi_line: false, sql: log_line.to_string(), temp_entry: None}; + }, + Err(_) => { + multilines.multi_line = true; + multilines.sql += log_line; + parts.truncate(0); + } + } -pub fn parse_mysql_log_entry(log_line: &str) -> Option { - // Example: MySQL general log format "[timestamp] user@host: SQL query" - // You should customize this logic to match your log format - let parts: Vec<&str> = log_line.split_whitespace().collect(); + let mut log_entries: Vec = Vec::new(); - if parts.len() >= 4 { - let timestamp = parts[0].trim_matches('[').trim_matches(']'); - let user_host = parts[1]; + if parts.len() >= 4 { + let timestamp = parts[0]; + let _id = parts[1]; + let _command = parts[1]; let sql_query = parts[3..].join(" "); - // Extract user from "user@host" - let user_parts: Vec<&str> = user_host.split('@').collect(); - let user = user_parts.first().unwrap_or(&"").trim(); + match anonymize_sql(sql_query.to_string()) { + Ok(result) => for ast in result { + let mut entry = LogEntry::default(); + + entry.timestamp = parse_timestamp(timestamp).unwrap(); + entry.command = ast.statement_type; + // entry.original_query = sql_query.to_string(); + entry.replaced_query = ast.statement.to_string(); + log_entries.push(entry); + }, + Err(_error) => { + // error!("{} - {}", log_line.to_string(),error); + } + } + } + multilines.log_entries = log_entries; + multilines +} + +pub fn parse_mysql_slow_log_entry(log_line: &str, mut multilines: MultiLine) -> MultiLine { + let re = Regex::new(r"^# Time: (.+)$|^# User@Host: (.+)Id:|^# Query_time: (.+)$|^SET timestamp=(.+);$").unwrap(); + let mut log_entries: Vec = Vec::new(); + let mut log_entry = LogEntry::default(); + if multilines.temp_entry.is_some() { + log_entry = multilines.temp_entry.clone().unwrap(); + } + if let Some(captures) = re.captures(log_line) { + if let Some(match_time) = captures.get(1) { + if multilines.multi_line && log_entry.timestamp != NaiveDateTime::default() { + // log_entry[0].original_query = log_line.to_string(); + + + match anonymize_sql(log_entry.original_query.clone()) { + Ok(result) => { + for ast in result { + let mut c_entry = log_entry.clone(); + c_entry.replaced_query = ast.statement.to_string(); + c_entry.command = ast.statement_type; + log_entries.push(c_entry); + } + }, + Err(_error) => { + + } + } + + } + multilines.multi_line = false; + log_entry.original_query = "".to_string(); + + + log_entry = LogEntry::default(); + log_entry.timestamp = parse_timestamp(match_time.as_str()).unwrap(); + } + if let Some(match_time) = captures.get(2) { + log_entry.user = Some(match_time.as_str().trim().to_string()); + } + if let Some(match_time) = captures.get(3) { + let line_re = Regex::new(r"^(.+)Lock_time:(.+)Rows_sent:(.+)Rows_examined:\s(.+)").unwrap(); + let parse_info = line_re.captures(match_time.as_str()).unwrap(); + log_entry.query_time = Some(parse_info.get(1).unwrap().as_str().trim().parse::().unwrap()); + log_entry.lock_time = Some(parse_info.get(2).unwrap().as_str().trim().parse::().unwrap()); + log_entry.row_sent = Some(parse_info.get(3).unwrap().as_str().trim().parse::().unwrap()); + let last_part: Vec<&str> = parse_info.get(4).unwrap().as_str().split_whitespace().collect(); + log_entry.row_examined = Some(last_part.first().unwrap().parse::().unwrap()); + } + + if let Some(_match_time) = captures.get(4) { + multilines.multi_line = true; + } + + multilines.temp_entry = Some(log_entry); + multilines.log_entries = log_entries; + multilines - Some(LogEntry { - timestamp: timestamp.to_string(), - user: user.to_string(), - sql_query: sql_query.to_string(), - }) } else { - None + log_entry.original_query += log_line.replace("\t","").as_str(); + multilines.temp_entry = Some(log_entry); + multilines } } -#[derive(Debug)] +#[derive(Debug, Serialize, Clone)] pub struct LogEntry { - pub timestamp: String, - pub user: String, - pub sql_query: String, + pub timestamp: NaiveDateTime, + pub id: Uuid, + pub command: Command, + pub replaced_query: String, + pub original_query: String, + pub host: String, + pub ip: IpAddr, + pub user: Option, + pub query_time: Option, + pub lock_time: Option, + pub row_sent: Option, + pub row_examined: Option, + +} + +#[derive(Debug, Clone, Serialize,PartialEq)] +pub enum Command { + Query, + Insert, + Update, + Other, + Explain, + InsertIgnore, + Delete, + DeleteWithLimit, + None +} + + +fn hostname() -> Option { + match env::var("HOSTNAME") { + Ok(val) => Some(val), + Err(_) => { + match hostname::get() { + Ok(host) => Some(host.to_string_lossy().into_owned()), + Err(_) => None, + } + } + } +} + +impl Default for LogEntry { + fn default() -> LogEntry { + LogEntry { + timestamp: Default::default(), + id: Uuid::new_v4(), + command: Command::Other, + replaced_query: "".to_string(), + original_query: "".to_string(), + host: hostname().unwrap(), + ip: local_ip_address::local_ip().unwrap(), + user: None, + query_time: None, + lock_time: None, + row_sent: None, + row_examined: None, + } + } } pub fn parse_timestamp(timestamp_str: &str) -> Result { let format_str = "%Y-%m-%dT%H:%M:%S%.6fZ"; // Adjust this format to match your timestamp format NaiveDateTime::parse_from_str(timestamp_str, format_str) +} + + +fn anonymize_sql(mut sql: String) -> Result, ParserError> { + let cli = cli::cli(); + + let dialect = MySqlDialect {}; + + let cli::Commands::Send(_name) = cli.command; + let mut command = Command::None; + sql = sql.replace(" "," "); + let re = Regex::new(r"LIMIT ([0-9])").unwrap(); + + if !sql.starts_with('#') { + if sql.to_uppercase().contains("INSERT IGNORE INTO") { + + sql = sql.replace("INSERT IGNORE INTO", "INSERT INTO"); + command = Command::InsertIgnore; + + } + if sql.to_uppercase().starts_with("DELETE ") { + sql = re.replace(&sql,"").to_string(); + command = Command::DeleteWithLimit; + } + + return match Parser::parse_sql(&dialect, sql.as_str()) { + Ok(mut ast) => { + let mut replaced = Vec::new(); + for statement in ast.iter_mut() { + let mut changed = anonymize::rec(statement); + if command != Command::None { + changed.statement_type = command.to_owned(); + } + replaced.push(changed); + }; + // sql = ast[0].to_string(); + // info!("Modified SQL: {:?}", ast.); + Ok(replaced) + } + Err(err) => { + error!("Error parsing sql: {} - {}", err,sql); + Err(err) + } + } + } + + Ok(Vec::new()) } \ No newline at end of file