From ba3fd854dd0d38e9a2a83326b3aa6e5f579b4e28 Mon Sep 17 00:00:00 2001 From: Joaquin Colacci Date: Wed, 1 Mar 2023 15:08:52 +0100 Subject: [PATCH] Refactor: Add state handler for subscriptions + fix sqls for sources and views --- rust/Cargo.toml | 5 ++- rust/src/connection.rs | 30 ++++++++----- rust/src/insert.rs | 9 ++-- rust/src/main.rs | 10 ++--- rust/src/query.rs | 14 +++++-- rust/src/source.rs | 20 +++++---- rust/src/state.rs | 95 ++++++++++++++++++++++++++++++++++++++++++ rust/src/subscribe.rs | 54 +++++++++++++++++++++--- rust/src/table.rs | 11 +++-- rust/src/view.rs | 21 +++++----- 10 files changed, 218 insertions(+), 51 deletions(-) create mode 100644 rust/src/state.rs diff --git a/rust/Cargo.toml b/rust/Cargo.toml index a5dd97d..2142515 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -7,4 +7,7 @@ edition = "2021" [dependencies] -postgres = "0.19.3" \ No newline at end of file +postgres = "0.19.3" +openssl = "0.10.38" +postgres-openssl = "0.5.0" +rust_decimal = { version = "1.28", features = ["db-postgres"] } diff --git a/rust/src/connection.rs b/rust/src/connection.rs index ffea2eb..2524e66 100644 --- a/rust/src/connection.rs +++ b/rust/src/connection.rs @@ -1,19 +1,29 @@ -use postgres::{Client, NoTls, Error}; +use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; +use postgres::{Client, Error}; +use postgres_openssl::MakeTlsConnector; /// Create a client using localhost pub(crate) fn create_client() -> Result { - let config = "postgres://materialize@localhost:6875/materialize"; - Client::connect(config, NoTls) + let mut builder = SslConnector::builder(SslMethod::tls()).expect("Error creating builder."); + builder.set_verify(SslVerifyMode::NONE); + let connector = MakeTlsConnector::new(builder.build()); + + let config = "postgres://materialize@localhost:6875/materialize?sslmode=require"; + Client::connect(config, connector) } // ---------------------------------- // Alternative way to create a client // ---------------------------------- // pub(crate) fn create_client_with_config() -> Result { -// Config::new() -// .host("localhost") -// .port(6875) -// .dbname("materialize") -// .user("materialize") -// .connect(NoTls) -// } \ No newline at end of file +// let mut builder = SslConnector::builder(SslMethod::tls()).expect("Error creating builder."); +// builder.set_verify(SslVerifyMode::NONE); +// let connector = MakeTlsConnector::new(builder.build()); + +// Config::new() +// .host("localhost") +// .port(6875) +// .dbname("materialize") +// .user("materialize") +// .connect(connector) +// } diff --git a/rust/src/insert.rs b/rust/src/insert.rs index bd58c28..1f14dd8 100644 --- a/rust/src/insert.rs +++ b/rust/src/insert.rs @@ -1,4 +1,4 @@ -use postgres::{Error}; +use postgres::Error; use crate::connection::create_client; @@ -9,5 +9,8 @@ pub(crate) fn insert() -> Result { let code = "GH"; let name = "Ghana"; - client.execute("INSERT INTO countries(code, name) VALUES($1, $2)", &[&code, &name]) -} \ No newline at end of file + client.execute( + "INSERT INTO countries(code, name) VALUES($1, $2)", + &[&code, &name], + ) +} diff --git a/rust/src/main.rs b/rust/src/main.rs index c6c6c9e..c23628c 100644 --- a/rust/src/main.rs +++ b/rust/src/main.rs @@ -1,18 +1,18 @@ +use crate::view::create_materialized_view; use insert::insert; use query::run_query; use source::create_source; -use table::create_table; -use crate::view::create_materialized_view; use subscribe::subscribe; +use table::create_table; +mod connection; mod insert; mod query; mod source; +mod state; mod subscribe; -mod view; -mod connection; mod table; - +mod view; fn main() { create_source().expect("Error creating source."); diff --git a/rust/src/query.rs b/rust/src/query.rs index e27cdea..a8f6d9f 100644 --- a/rust/src/query.rs +++ b/rust/src/query.rs @@ -1,12 +1,18 @@ use crate::connection::create_client; /// Run a query over the table. -pub(crate) fn run_query () { +pub(crate) fn run_query() { let mut client = create_client().expect("Error creating client."); - let results = client.query("SELECT code, name FROM countries;", &[]).expect("Error running query."); + let results = client + .query("SELECT code, name FROM countries;", &[]) + .expect("Error running query."); for row in results { - println!("{:} - {:}", row.get::(0), row.get::(1)); - }; + println!( + "{:} - {:}", + row.get::(0), + row.get::(1) + ); + } } diff --git a/rust/src/source.rs b/rust/src/source.rs index 662634c..5573ed4 100644 --- a/rust/src/source.rs +++ b/rust/src/source.rs @@ -1,14 +1,18 @@ -use postgres::{Error}; +use postgres::Error; use crate::connection::create_client; -/// Creates a PUBNUB source +/// Creates a load generator source pub(crate) fn create_source() -> Result { let mut client = create_client().expect("Error creating client."); - client.execute(" - CREATE SOURCE IF NOT EXISTS market_orders_raw FROM PUBNUB - SUBSCRIBE KEY 'sub-c-4377ab04-f100-11e3-bffd-02ee2ddab7fe' - CHANNEL 'pubnub-market-orders' - ", &[]) -} \ No newline at end of file + client.execute( + " + CREATE SOURCE IF NOT EXISTS counter + FROM LOAD GENERATOR COUNTER + (TICK INTERVAL '500ms') + WITH (SIZE = '3xsmall'); + ", + &[], + ) +} diff --git a/rust/src/state.rs b/rust/src/state.rs new file mode 100644 index 0000000..75510f3 --- /dev/null +++ b/rust/src/state.rs @@ -0,0 +1,95 @@ +#[derive(Debug, Clone)] +pub(crate) struct Update +where + T: std::hash::Hash + Eq, +{ + pub(crate) value: T, + pub(crate) diff: i64, +} + +pub(crate) struct State +where + T: std::hash::Hash + Eq, +{ + state: std::collections::HashMap, + timestamp: i64, + valid: bool, + history: Option>>, +} + +impl State +where + T: std::hash::Hash + Eq + Clone, +{ + pub(crate) fn new(collect_history: bool) -> State { + State { + state: std::collections::HashMap::new(), + timestamp: 0, + valid: true, + history: if collect_history { + Some(Vec::new()) + } else { + None + }, + } + } + + pub fn get_state(&self) -> Vec { + let mut list: Vec = Vec::new(); + + for (key, value) in &self.state { + for _ in 0..*value { + list.push(key.clone()); + } + } + + list + } + + pub fn get_history(&self) -> Option<&Vec>> { + self.history.as_ref() + } + + fn validate(&self, timestamp: i64) -> Result<(), String> { + if !self.valid { + Err("Invalid state.".to_string()) + } else if timestamp < self.timestamp { + eprintln!("Invalid timestamp."); + // &mut self.valid = false; + Err(format!( + "Update with timestamp ({}) is lower than the last timestamp ({}). Invalid state.", + timestamp, self.timestamp + )) + } else { + Ok(()) + } + } + + fn process(&mut self, update: Update) { + let update_clone = update.clone(); + let value = update.value; + let diff = update.diff; + let count = self.state.get(&value).map_or(diff, |&v| v + diff); + + if count <= 0 { + self.state.remove(&value); + } else { + self.state.insert(value, count); + } + + if let Some(history) = &mut self.history { + history.push(update_clone); + } + } + + pub fn update(&mut self, updates: Vec>, timestamp: i64) -> Result<(), String> { + if !updates.is_empty() { + self.validate(timestamp)?; + self.timestamp = timestamp; + updates + .iter() + .for_each(|update| self.process(update.clone())); + } + Ok(()) + } +} diff --git a/rust/src/subscribe.rs b/rust/src/subscribe.rs index 00ae6f7..99149a9 100644 --- a/rust/src/subscribe.rs +++ b/rust/src/subscribe.rs @@ -1,18 +1,60 @@ use std::{thread::sleep, time::Duration}; -use crate::connection::create_client; +use rust_decimal::{prelude::ToPrimitive, Decimal}; -/// Run a subscribe over the PUBNUB materialized view +use crate::{ + connection::create_client, + state::{State, Update}, +}; + +#[derive(Hash, Eq, PartialEq, Debug, Clone)] +struct Sum { + sum: Decimal, +} + +/// Run a subscribe over the materialized view pub(crate) fn subscribe() { let mut client = create_client().expect("Error creating client."); let mut transaction = client.transaction().expect("Error creating transaction."); - transaction.execute("DECLARE c CURSOR FOR SUBSCRIBE (SELECT symbol, bid_price::text FROM market_orders) WITH (SNAPSHOT = false);", &[]).expect("Error creating cursor."); + transaction + .execute( + "DECLARE c CURSOR FOR SUBSCRIBE (SELECT sum FROM counter_sum) WITH (PROGRESS);", + &[], + ) + .expect("Error creating cursor."); + let display_history = false; + let mut state: State = State::new(display_history); + let mut buffer: Vec> = Vec::new(); loop { - let results = transaction.query("FETCH ALL c;", &[]).expect("Error running fetch."); + let results = transaction + .query("FETCH ALL c;", &[]) + .expect("Error running fetch."); for row in results { - println!("{:} - {:}", row.get::(2), row.get::(3)); + let ts = row.get::<_, Decimal>("mz_timestamp").to_i64().unwrap(); + let progress = row.get::<_, bool>("mz_progressed"); + + if progress { + // Update the state + state.update(buffer.clone(), ts).unwrap(); + + // Display new state: + println!("State: {:?}", state.get_state()); + if (display_history) { + println!("History: {:?}", state.get_history()); + } + + // Clear the buffer + buffer.clear(); + } else { + let diff = row.get::<_, Option>("mz_diff").unwrap(); + let sum = row.get::<_, Option>("sum").unwrap(); + buffer.push(Update { + value: Sum { sum }, + diff, + }) + } } sleep(Duration::from_millis(200)); } -} \ No newline at end of file +} diff --git a/rust/src/table.rs b/rust/src/table.rs index ea5a04d..107bdcd 100644 --- a/rust/src/table.rs +++ b/rust/src/table.rs @@ -1,4 +1,4 @@ -use postgres::{Error}; +use postgres::Error; use crate::connection::create_client; @@ -6,7 +6,10 @@ use crate::connection::create_client; pub(crate) fn create_table() -> Result { let mut client = create_client().expect("Error creating client."); - client.execute(" + client.execute( + " CREATE TABLE IF NOT EXISTS countries(code TEXT, name TEXT); - ", &[]) -} \ No newline at end of file + ", + &[], + ) +} diff --git a/rust/src/view.rs b/rust/src/view.rs index 9d0f9cf..2b1ca62 100644 --- a/rust/src/view.rs +++ b/rust/src/view.rs @@ -1,16 +1,17 @@ -use postgres::{Error}; +use postgres::Error; use crate::connection::create_client; -/// Creates a materialized view over the PUBNUB source +/// Creates a materialized view over the source pub(crate) fn create_materialized_view() -> Result { let mut client = create_client().expect("Error creating client."); - client.execute(" - CREATE MATERIALIZED VIEW IF NOT EXISTS market_orders AS - SELECT - val->>'symbol' AS symbol, - (val->'bid_price')::float AS bid_price - FROM (SELECT text::jsonb AS val FROM market_orders_raw) - ", &[]) -} \ No newline at end of file + client.execute( + " + CREATE MATERIALIZED VIEW IF NOT EXISTS counter_sum AS + SELECT sum(counter) + FROM counter;` + ", + &[], + ) +}