Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Rust] Feat: Add state + fix sources + views #21

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ edition = "2021"

[dependencies]

postgres = "0.19.3"
postgres = "0.19.3"
openssl = "0.10.38"
postgres-openssl = "0.5.0"
rust_decimal = { version = "1.28", features = ["db-postgres"] }
30 changes: 20 additions & 10 deletions rust/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
use postgres::{Client, NoTls, Error};
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
use postgres::{Client, Error};
use postgres_openssl::MakeTlsConnector;

/// Create a client using localhost
pub(crate) fn create_client() -> Result<Client, Error> {
let config = "postgres://materialize@localhost:6875/materialize";
Client::connect(config, NoTls)
let mut builder = SslConnector::builder(SslMethod::tls()).expect("Error creating builder.");
builder.set_verify(SslVerifyMode::NONE);
let connector = MakeTlsConnector::new(builder.build());

let config = "postgres://materialize@localhost:6875/materialize?sslmode=require";
Client::connect(config, connector)
}

// ----------------------------------
// Alternative way to create a client
// ----------------------------------
// pub(crate) fn create_client_with_config() -> Result<Client, Error> {
// Config::new()
// .host("localhost")
// .port(6875)
// .dbname("materialize")
// .user("materialize")
// .connect(NoTls)
// }
// let mut builder = SslConnector::builder(SslMethod::tls()).expect("Error creating builder.");
// builder.set_verify(SslVerifyMode::NONE);
// let connector = MakeTlsConnector::new(builder.build());

// Config::new()
// .host("localhost")
// .port(6875)
// .dbname("materialize")
// .user("materialize")
// .connect(connector)
// }
9 changes: 6 additions & 3 deletions rust/src/insert.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use postgres::{Error};
use postgres::Error;

use crate::connection::create_client;

Expand All @@ -9,5 +9,8 @@ pub(crate) fn insert() -> Result<u64, Error> {
let code = "GH";
let name = "Ghana";

client.execute("INSERT INTO countries(code, name) VALUES($1, $2)", &[&code, &name])
}
client.execute(
"INSERT INTO countries(code, name) VALUES($1, $2)",
&[&code, &name],
)
}
10 changes: 5 additions & 5 deletions rust/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use crate::view::create_materialized_view;
use insert::insert;
use query::run_query;
use source::create_source;
use table::create_table;
use crate::view::create_materialized_view;
use subscribe::subscribe;
use table::create_table;

mod connection;
mod insert;
mod query;
mod source;
mod state;
mod subscribe;
mod view;
mod connection;
mod table;

mod view;

fn main() {
create_source().expect("Error creating source.");
Expand Down
14 changes: 10 additions & 4 deletions rust/src/query.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use crate::connection::create_client;

/// Run a query over the table.
pub(crate) fn run_query () {
pub(crate) fn run_query() {
let mut client = create_client().expect("Error creating client.");

let results = client.query("SELECT code, name FROM countries;", &[]).expect("Error running query.");
let results = client
.query("SELECT code, name FROM countries;", &[])
.expect("Error running query.");

for row in results {
println!("{:} - {:}", row.get::<usize, String>(0), row.get::<usize, String>(1));
};
println!(
"{:} - {:}",
row.get::<usize, String>(0),
row.get::<usize, String>(1)
);
}
}
20 changes: 12 additions & 8 deletions rust/src/source.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use postgres::{Error};
use postgres::Error;

use crate::connection::create_client;

/// Creates a PUBNUB source
/// Creates a load generator source
pub(crate) fn create_source() -> Result<u64, Error> {
let mut client = create_client().expect("Error creating client.");

client.execute("
CREATE SOURCE IF NOT EXISTS market_orders_raw FROM PUBNUB
SUBSCRIBE KEY 'sub-c-4377ab04-f100-11e3-bffd-02ee2ddab7fe'
CHANNEL 'pubnub-market-orders'
", &[])
}
client.execute(
"
CREATE SOURCE IF NOT EXISTS counter
FROM LOAD GENERATOR COUNTER
(TICK INTERVAL '500ms')
WITH (SIZE = '3xsmall');
",
&[],
)
}
95 changes: 95 additions & 0 deletions rust/src/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#[derive(Debug, Clone)]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think if this makes it to documentation/example code rather than a demo we could do a bit better. What is the surrounding context of this? Does it end up in our documentation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tagging @morsapaes since I'm unsure about the future of this code.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The examples in this repo are used as a basis for the existing integration guides in the documentation (no Rust one in there yet). We plan to make these runnable end-to-end, at some point.

pub(crate) struct Update<T>
where
T: std::hash::Hash + Eq,
{
pub(crate) value: T,
pub(crate) diff: i64,
}

pub(crate) struct State<T>
where
T: std::hash::Hash + Eq,
{
state: std::collections::HashMap<T, i64>,
timestamp: i64,
valid: bool,
history: Option<Vec<Update<T>>>,
}

impl<T> State<T>
where
T: std::hash::Hash + Eq + Clone,
{
pub(crate) fn new(collect_history: bool) -> State<T> {
State {
state: std::collections::HashMap::new(),
timestamp: 0,
valid: true,
history: if collect_history {
Some(Vec::new())
} else {
None
},
}
}

pub fn get_state(&self) -> Vec<T> {
let mut list: Vec<T> = Vec::new();

for (key, value) in &self.state {
for _ in 0..*value {
list.push(key.clone());
}
}

list
}

pub fn get_history(&self) -> Option<&Vec<Update<T>>> {
self.history.as_ref()
}

fn validate(&self, timestamp: i64) -> Result<(), String> {
if !self.valid {
Err("Invalid state.".to_string())
} else if timestamp < self.timestamp {
eprintln!("Invalid timestamp.");
// &mut self.valid = false;
Err(format!(
"Update with timestamp ({}) is lower than the last timestamp ({}). Invalid state.",
timestamp, self.timestamp
))
} else {
Ok(())
}
}

fn process(&mut self, update: Update<T>) {
let update_clone = update.clone();
let value = update.value;
let diff = update.diff;
let count = self.state.get(&value).map_or(diff, |&v| v + diff);

if count <= 0 {
self.state.remove(&value);
} else {
self.state.insert(value, count);
}

if let Some(history) = &mut self.history {
history.push(update_clone);
}
}

pub fn update(&mut self, updates: Vec<Update<T>>, timestamp: i64) -> Result<(), String> {
if !updates.is_empty() {
self.validate(timestamp)?;
self.timestamp = timestamp;
updates
.iter()
.for_each(|update| self.process(update.clone()));
}
Ok(())
}
}
54 changes: 48 additions & 6 deletions rust/src/subscribe.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,60 @@
use std::{thread::sleep, time::Duration};

use crate::connection::create_client;
use rust_decimal::{prelude::ToPrimitive, Decimal};

/// Run a subscribe over the PUBNUB materialized view
use crate::{
connection::create_client,
state::{State, Update},
};

#[derive(Hash, Eq, PartialEq, Debug, Clone)]
struct Sum {
sum: Decimal,
}

/// Run a subscribe over the materialized view
pub(crate) fn subscribe() {
let mut client = create_client().expect("Error creating client.");
let mut transaction = client.transaction().expect("Error creating transaction.");
transaction.execute("DECLARE c CURSOR FOR SUBSCRIBE (SELECT symbol, bid_price::text FROM market_orders) WITH (SNAPSHOT = false);", &[]).expect("Error creating cursor.");
transaction
.execute(
"DECLARE c CURSOR FOR SUBSCRIBE (SELECT sum FROM counter_sum) WITH (PROGRESS);",
&[],
)
.expect("Error creating cursor.");
let display_history = false;
let mut state: State<Sum> = State::new(display_history);
let mut buffer: Vec<Update<Sum>> = Vec::new();

loop {
let results = transaction.query("FETCH ALL c;", &[]).expect("Error running fetch.");
let results = transaction
.query("FETCH ALL c;", &[])
.expect("Error running fetch.");
for row in results {
println!("{:} - {:}", row.get::<usize, String>(2), row.get::<usize, String>(3));
let ts = row.get::<_, Decimal>("mz_timestamp").to_i64().unwrap();
let progress = row.get::<_, bool>("mz_progressed");

if progress {
// Update the state
state.update(buffer.clone(), ts).unwrap();

// Display new state:
println!("State: {:?}", state.get_state());
if (display_history) {
println!("History: {:?}", state.get_history());
}

// Clear the buffer
buffer.clear();
} else {
let diff = row.get::<_, Option<i64>>("mz_diff").unwrap();
let sum = row.get::<_, Option<Decimal>>("sum").unwrap();
buffer.push(Update {
value: Sum { sum },
diff,
})
}
}
sleep(Duration::from_millis(200));
}
}
}
11 changes: 7 additions & 4 deletions rust/src/table.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use postgres::{Error};
use postgres::Error;

use crate::connection::create_client;

/// Create a simple table
pub(crate) fn create_table() -> Result<u64, Error> {
let mut client = create_client().expect("Error creating client.");

client.execute("
client.execute(
"
CREATE TABLE IF NOT EXISTS countries(code TEXT, name TEXT);
", &[])
}
",
&[],
)
}
21 changes: 11 additions & 10 deletions rust/src/view.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use postgres::{Error};
use postgres::Error;

use crate::connection::create_client;

/// Creates a materialized view over the PUBNUB source
/// Creates a materialized view over the source
pub(crate) fn create_materialized_view() -> Result<u64, Error> {
let mut client = create_client().expect("Error creating client.");

client.execute("
CREATE MATERIALIZED VIEW IF NOT EXISTS market_orders AS
SELECT
val->>'symbol' AS symbol,
(val->'bid_price')::float AS bid_price
FROM (SELECT text::jsonb AS val FROM market_orders_raw)
", &[])
}
client.execute(
"
CREATE MATERIALIZED VIEW IF NOT EXISTS counter_sum AS
SELECT sum(counter)
FROM counter;`
",
&[],
)
}