From 0223a6778f7b1e36915a8e7dfeeb2c38c333acf0 Mon Sep 17 00:00:00 2001 From: Joaquin Colacci Date: Thu, 23 Feb 2023 19:28:45 +0100 Subject: [PATCH 1/3] Feat: subscription state handling + ssl_mode = require + source + view fix --- java/connection.java | 1 - java/insert.java | 1 - java/query.java | 1 - java/source.java | 10 +-- java/subscribe.java | 187 +++++++++++++++++++++++++++++++++++++++++-- java/view.java | 11 +-- 6 files changed, 190 insertions(+), 21 deletions(-) diff --git a/java/connection.java b/java/connection.java index 464d477..56931de 100644 --- a/java/connection.java +++ b/java/connection.java @@ -18,7 +18,6 @@ public Connection connect() { Properties props = new Properties(); props.setProperty("user", user); props.setProperty("password", password); - props.setProperty("ssl","true"); Connection conn = null; try { conn = DriverManager.getConnection(url, props); diff --git a/java/insert.java b/java/insert.java index 9816587..d4e1030 100644 --- a/java/insert.java +++ b/java/insert.java @@ -21,7 +21,6 @@ public Connection connect() throws SQLException { Properties props = new Properties(); props.setProperty("user", user); props.setProperty("password", password); - props.setProperty("ssl","true"); return DriverManager.getConnection(url, props); diff --git a/java/query.java b/java/query.java index 8a5d4c4..4e75d66 100644 --- a/java/query.java +++ b/java/query.java @@ -20,7 +20,6 @@ public Connection connect() throws SQLException { Properties props = new Properties(); props.setProperty("user", user); props.setProperty("password", password); - props.setProperty("ssl","true"); return DriverManager.getConnection(url, props); diff --git a/java/source.java b/java/source.java index f390cae..cf73725 100644 --- a/java/source.java +++ b/java/source.java @@ -8,7 +8,7 @@ public class App { - private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize"; + private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize?ssl_mode=require"; private final String user = "MATERIALIZE_USERNAME"; private final String password = "MATERIALIZE_PASSWORD"; @@ -21,16 +21,16 @@ public Connection connect() throws SQLException { Properties props = new Properties(); props.setProperty("user", user); props.setProperty("password", password); - props.setProperty("ssl","true"); return DriverManager.getConnection(url, props); } public void source() { - - String SQL = "CREATE SOURCE counter FROM " - + "LOAD GENERATOR COUNTER"; + String SQL = "CREATE SOURCE IF NOT EXISTS counter" + + "FROM LOAD GENERATOR COUNTER" + + "(TICK INTERVAL '500ms')" + + "WITH (SIZE = '3xsmall');"; try (Connection conn = connect()) { Statement st = conn.createStatement(); diff --git a/java/subscribe.java b/java/subscribe.java index 5a56b96..b22cab5 100644 --- a/java/subscribe.java +++ b/java/subscribe.java @@ -4,10 +4,15 @@ import java.util.Properties; import java.sql.ResultSet; import java.sql.Statement; +import java.util.Map; +import java.util.HashMap; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; public class App { - private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize"; + private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize?ssl_mode=require"; private final String user = "MATERIALIZE_USERNAME"; private final String password = "MATERIALIZE_PASSWORD"; @@ -20,7 +25,6 @@ public Connection connect() throws SQLException { Properties props = new Properties(); props.setProperty("user", user); props.setProperty("password", password); - props.setProperty("ssl","true"); return DriverManager.getConnection(url, props); @@ -31,11 +35,36 @@ public void subscribe() { Statement stmt = conn.createStatement(); stmt.execute("BEGIN"); - stmt.execute("DECLARE c CURSOR FOR SUBSCRIBE my_view"); + stmt.execute("DECLARE c CURSOR FOR SUBSCRIBE (SELECT sum FROM counter_sum) WITH (PROGRESS);"); + State state = new State>(false); + while (true) { ResultSet rs = stmt.executeQuery("FETCH ALL c"); - if(rs.next()) { - System.out.println(rs.getString(1) + " " + rs.getString(2) + " " + rs.getString(3)); + boolean updated = false; + while (rs.next()) { + // Map row fields + long ts = rs.getLong("mz_timestamp"); + boolean progress = rs.getBoolean("mz_progressed"); + int diff = rs.getInt("mz_diff"); + + Map rowData = new HashMap<>(); + rowData.put("sum", rs.getInt("sum")); + + // When a progress is detected, get the last values + if (progress) { + if (updated) { + updated = false; + System.out.println(state.getValues()); + } + } else { + // Update the state with the last data + updated = true; + try { + state.update(new Update>(null, rowData, diff), ts); + } catch (Exception e) { + e.printStackTrace(); + } + } } } } catch (SQLException ex) { @@ -47,4 +76,150 @@ public static void main(String[] args) { App app = new App(); app.subscribe(); } -} \ No newline at end of file +} + +/* + * State class to handle updates from a subscription. + */ +class Update { + private final Optional key; + private final T value; + private final int diff; + + public Update(T value, int diff) { + this.key = Optional.empty(); + this.value = value; + this.diff = diff; + } + + public Update(String key, T value, int diff) { + this.key = Optional.ofNullable(key); + this.value = value; + this.diff = diff; + } + + public Optional getKey() { + return key; + } + + public T getValue() { + return value; + } + + public int getDiff() { + return diff; + } +} + +class State { + + private Map state; + private Map stateCount; + private long timestamp; + private boolean valid; + private List> history; + + public State(boolean collectHistory) { + this.state = new HashMap<>(); + this.stateCount = new HashMap<>(); + this.timestamp = 0; + this.valid = true; + if (collectHistory) { + this.history = new ArrayList<>(); + } + } + + public Object get(String key) { + return this.state.get(key); + } + + public List getKeys() { + return new ArrayList<>(this.state.keySet()); + } + + public List getValues() { + return new ArrayList(this.state.values()); + } + + public boolean isValid() { + return this.valid; + } + + public long getTimestamp() { + return this.timestamp; + } + + public Map getState() { + return this.state; + } + + public List> getHistory() { + return this.history; + } + + private void applyDiff(String key, int diff) { + // Count value starts as a null + if (!this.stateCount.containsKey(key)) { + this.stateCount.put(key, diff); + } else { + this.stateCount.put(key, this.stateCount.get(key) + diff); + } + } + + private String hash(T value) { + return value.toString(); // You can use any other hash function + } + + private void validate(long timestamp) { + if (!this.valid) { + throw new IllegalStateException("Invalid state."); + } else if (timestamp < this.timestamp) { + System.err.println("Invalid timestamp."); + this.valid = false; + throw new IllegalStateException( + "Update with timestamp (" + timestamp + ") is lower than the last timestamp (" + + this.timestamp + "). Invalid state."); + } + } + + private void process(Update update) { + String key = ""; + if (update.getKey().isPresent()) { + key = update.getKey().get(); + } else { + key = this.hash(update.getValue()); + } + this.applyDiff(key, update.getDiff()); + int count = this.stateCount.get(key); + + if (history != null) { + this.history.add(update); + } + + if (count <= 0) { + this.state.remove(key); + this.stateCount.remove(key); + } else { + this.state.put(key, update.getValue()); + } + } + + public void update(Update update, long timestamp) { + this.validate(timestamp); + this.timestamp = timestamp; + this.process(update); + } + + public void batchUpdate(List> updates, long timestamp) { + if (updates != null && !updates.isEmpty()) { + this.validate(timestamp); + this.timestamp = timestamp; + updates.forEach(this::process); + } + } + + @Override + public String toString() { + return state.toString(); + } +} diff --git a/java/view.java b/java/view.java index cda21df..0a53387 100644 --- a/java/view.java +++ b/java/view.java @@ -8,7 +8,7 @@ public class App { - private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize"; + private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize?ssl_mode=require"; private final String user = "MATERIALIZE_USERNAME"; private final String password = "MATERIALIZE_PASSWORD"; @@ -21,18 +21,15 @@ public Connection connect() throws SQLException { Properties props = new Properties(); props.setProperty("user", user); props.setProperty("password", password); - props.setProperty("ssl","true"); return DriverManager.getConnection(url, props); } public void view() { - String SQL = "CREATE VIEW market_orders_2 AS " - + "SELECT " - + " val->>'symbol' AS symbol, " - + " (val->'bid_price')::float AS bid_price " - + "FROM (SELECT text::jsonb AS val FROM market_orders_raw_2)"; + String SQL = "CREATE MATERIALIZED VIEW IF NOT EXISTS counter_sum AS" + + "SELECT sum(counter)" + + "FROM counter;"; try (Connection conn = connect()) { Statement st = conn.createStatement(); From 11b6035c8324bf2558e43c985d58bb0d15788ffd Mon Sep 17 00:00:00 2001 From: Joaquin Colacci Date: Mon, 27 Feb 2023 13:41:09 +0100 Subject: [PATCH 2/3] Update state refactor + fix sslmode + classes name --- java/Subscribe.java | 162 +++++++++++++++++++++++++++++++ java/connection.java | 11 +-- java/insert.java | 6 +- java/query.java | 6 +- java/source.java | 6 +- java/subscribe.java | 225 ------------------------------------------- java/view.java | 6 +- 7 files changed, 179 insertions(+), 243 deletions(-) create mode 100644 java/Subscribe.java delete mode 100644 java/subscribe.java diff --git a/java/Subscribe.java b/java/Subscribe.java new file mode 100644 index 0000000..e78735d --- /dev/null +++ b/java/Subscribe.java @@ -0,0 +1,162 @@ +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.*; +import java.sql.ResultSet; +import java.sql.Statement; + +public class Subscribe { + + private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize?sslmode=require"; + private final String user = "MATERIALIZE_USERNAME"; + private final String password = "MATERIALIZE_PASSWORD"; + + /** + * Connect to Materialize + * + * @return a Connection object + */ + public Connection connect() throws SQLException { + Properties props = new Properties(); + props.setProperty("user", user); + props.setProperty("password", password); + + return DriverManager.getConnection(url, props); + + } + + public void subscribe() { + try (Connection conn = connect()) { + + Statement stmt = conn.createStatement(); + stmt.execute("BEGIN"); + stmt.execute("DECLARE c CURSOR FOR SUBSCRIBE (SELECT sum FROM counter_sum) WITH (PROGRESS);"); + State> state = new State<>(false); + List>> buffer = new ArrayList<>(); + + while (true) { + ResultSet rs = stmt.executeQuery("FETCH ALL c"); + boolean updated = false; + while (rs.next()) { + // Map row fields + long ts = rs.getLong("mz_timestamp"); + boolean progress = rs.getBoolean("mz_progressed"); + int diff = rs.getInt("mz_diff"); + + HashMap rowData = new HashMap<>(); + rowData.put("sum", rs.getBigDecimal("sum")); + + // When a progress is detected, get the last values + if (progress) { + if (updated) { + updated = false; + + // Update the state with the last data + state.update(buffer, ts); + buffer.clear(); + + System.out.println(state.getState()); + } + } else { + updated = true; + buffer.add(new Update<>(rowData, diff)); + } + } + } + } catch (SQLException ex) { + System.out.println(ex.getMessage()); + } + } + + public static void main(String[] args) { + Subscribe app = new Subscribe(); + app.subscribe(); + } +} + +/* + * State class to handle updates from a subscription. + */ +record Update(T value, int diff) { + public T getValue() { + return value; + } + + public int getDiff() { + return diff; + } +} + +class State { + private final HashMap state; + private long timestamp; + private boolean valid; + private List> history; + + public State(boolean collectHistory) { + state = new HashMap<>(); + timestamp = 0; + valid = true; + if (collectHistory) { + history = new ArrayList<>(); + } + } + + public List getState() { + List list = new ArrayList<>(); + + for (Map.Entry entry : state.entrySet()) { + T value = entry.getKey(); + int count = entry.getValue(); + + for (int i = 0; i < count; i++) { + list.add(value); + } + } + + return list; + } + + public List> getHistory() { + return history; + } + + private void validate(long timestamp) { + if (!valid) { + throw new RuntimeException("Invalid state."); + } else if (timestamp < this.timestamp) { + System.err.println("Invalid timestamp."); + valid = false; + throw new RuntimeException(String.format( + "Update with timestamp (%d) is lower than the last timestamp (%d). Invalid state.", timestamp, this.timestamp)); + } + } + + private void process(Update update) { + T value = update.getValue(); + int diff = update.getDiff(); + + int count = state.containsKey(value) ? state.get(value) + diff : diff; + + if (count <= 0) { + state.remove(value); + } else { + state.put(value, count); + } + + if (history != null) { + history.add(update); + } + } + + public void update(List> updates, long timestamp) { + if (!updates.isEmpty()) { + validate(timestamp); + this.timestamp = timestamp; + updates.forEach(this::process); + } + } +} + diff --git a/java/connection.java b/java/connection.java index 56931de..8b5fb59 100644 --- a/java/connection.java +++ b/java/connection.java @@ -1,11 +1,10 @@ -import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.Properties; -public class App { +public class Connection { - private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize"; + private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize?sslmode=require"; private final String user = "MATERIALIZE_USERNAME"; private final String password = "MATERIALIZE_PASSWORD"; @@ -14,11 +13,11 @@ public class App { * * @return a Connection object */ - public Connection connect() { + public java.sql.Connection connect() { Properties props = new Properties(); props.setProperty("user", user); props.setProperty("password", password); - Connection conn = null; + java.sql.Connection conn = null; try { conn = DriverManager.getConnection(url, props); System.out.println("Connected to Materialize successfully!"); @@ -30,7 +29,7 @@ public Connection connect() { } public static void main(String[] args) { - App app = new App(); + Connection app = new Connection(); app.connect(); } } \ No newline at end of file diff --git a/java/insert.java b/java/insert.java index d4e1030..0eff108 100644 --- a/java/insert.java +++ b/java/insert.java @@ -6,9 +6,9 @@ import java.sql.Statement; import java.sql.PreparedStatement; -public class App { +public class Insert { - private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize"; + private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize?sslmode=require"; private final String user = "MATERIALIZE_USERNAME"; private final String password = "MATERIALIZE_PASSWORD"; @@ -43,7 +43,7 @@ public void insert() { } public static void main(String[] args) { - App app = new App(); + Insert app = new Insert(); app.insert(); } } \ No newline at end of file diff --git a/java/query.java b/java/query.java index 4e75d66..0f65643 100644 --- a/java/query.java +++ b/java/query.java @@ -5,9 +5,9 @@ import java.sql.ResultSet; import java.sql.Statement; -public class App { +public class Query { - private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize"; + private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize?sslmode=require"; private final String user = "MATERIALIZE_USERNAME"; private final String password = "MATERIALIZE_PASSWORD"; @@ -41,7 +41,7 @@ public void query() { } public static void main(String[] args) { - App app = new App(); + Query app = new Query(); app.query(); } } \ No newline at end of file diff --git a/java/source.java b/java/source.java index cf73725..f2d62a7 100644 --- a/java/source.java +++ b/java/source.java @@ -6,9 +6,9 @@ import java.sql.Statement; import java.sql.PreparedStatement; -public class App { +public class Source { - private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize?ssl_mode=require"; + private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize?sslmode=require"; private final String user = "MATERIALIZE_USERNAME"; private final String password = "MATERIALIZE_PASSWORD"; @@ -43,7 +43,7 @@ public void source() { } public static void main(String[] args) { - App app = new App(); + Source app = new Source(); app.source(); } } \ No newline at end of file diff --git a/java/subscribe.java b/java/subscribe.java deleted file mode 100644 index b22cab5..0000000 --- a/java/subscribe.java +++ /dev/null @@ -1,225 +0,0 @@ -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.util.Properties; -import java.sql.ResultSet; -import java.sql.Statement; -import java.util.Map; -import java.util.HashMap; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; - -public class App { - - private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize?ssl_mode=require"; - private final String user = "MATERIALIZE_USERNAME"; - private final String password = "MATERIALIZE_PASSWORD"; - - /** - * Connect to Materialize - * - * @return a Connection object - */ - public Connection connect() throws SQLException { - Properties props = new Properties(); - props.setProperty("user", user); - props.setProperty("password", password); - - return DriverManager.getConnection(url, props); - - } - - public void subscribe() { - try (Connection conn = connect()) { - - Statement stmt = conn.createStatement(); - stmt.execute("BEGIN"); - stmt.execute("DECLARE c CURSOR FOR SUBSCRIBE (SELECT sum FROM counter_sum) WITH (PROGRESS);"); - State state = new State>(false); - - while (true) { - ResultSet rs = stmt.executeQuery("FETCH ALL c"); - boolean updated = false; - while (rs.next()) { - // Map row fields - long ts = rs.getLong("mz_timestamp"); - boolean progress = rs.getBoolean("mz_progressed"); - int diff = rs.getInt("mz_diff"); - - Map rowData = new HashMap<>(); - rowData.put("sum", rs.getInt("sum")); - - // When a progress is detected, get the last values - if (progress) { - if (updated) { - updated = false; - System.out.println(state.getValues()); - } - } else { - // Update the state with the last data - updated = true; - try { - state.update(new Update>(null, rowData, diff), ts); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - } - } catch (SQLException ex) { - System.out.println(ex.getMessage()); - } - } - - public static void main(String[] args) { - App app = new App(); - app.subscribe(); - } -} - -/* - * State class to handle updates from a subscription. - */ -class Update { - private final Optional key; - private final T value; - private final int diff; - - public Update(T value, int diff) { - this.key = Optional.empty(); - this.value = value; - this.diff = diff; - } - - public Update(String key, T value, int diff) { - this.key = Optional.ofNullable(key); - this.value = value; - this.diff = diff; - } - - public Optional getKey() { - return key; - } - - public T getValue() { - return value; - } - - public int getDiff() { - return diff; - } -} - -class State { - - private Map state; - private Map stateCount; - private long timestamp; - private boolean valid; - private List> history; - - public State(boolean collectHistory) { - this.state = new HashMap<>(); - this.stateCount = new HashMap<>(); - this.timestamp = 0; - this.valid = true; - if (collectHistory) { - this.history = new ArrayList<>(); - } - } - - public Object get(String key) { - return this.state.get(key); - } - - public List getKeys() { - return new ArrayList<>(this.state.keySet()); - } - - public List getValues() { - return new ArrayList(this.state.values()); - } - - public boolean isValid() { - return this.valid; - } - - public long getTimestamp() { - return this.timestamp; - } - - public Map getState() { - return this.state; - } - - public List> getHistory() { - return this.history; - } - - private void applyDiff(String key, int diff) { - // Count value starts as a null - if (!this.stateCount.containsKey(key)) { - this.stateCount.put(key, diff); - } else { - this.stateCount.put(key, this.stateCount.get(key) + diff); - } - } - - private String hash(T value) { - return value.toString(); // You can use any other hash function - } - - private void validate(long timestamp) { - if (!this.valid) { - throw new IllegalStateException("Invalid state."); - } else if (timestamp < this.timestamp) { - System.err.println("Invalid timestamp."); - this.valid = false; - throw new IllegalStateException( - "Update with timestamp (" + timestamp + ") is lower than the last timestamp (" + - this.timestamp + "). Invalid state."); - } - } - - private void process(Update update) { - String key = ""; - if (update.getKey().isPresent()) { - key = update.getKey().get(); - } else { - key = this.hash(update.getValue()); - } - this.applyDiff(key, update.getDiff()); - int count = this.stateCount.get(key); - - if (history != null) { - this.history.add(update); - } - - if (count <= 0) { - this.state.remove(key); - this.stateCount.remove(key); - } else { - this.state.put(key, update.getValue()); - } - } - - public void update(Update update, long timestamp) { - this.validate(timestamp); - this.timestamp = timestamp; - this.process(update); - } - - public void batchUpdate(List> updates, long timestamp) { - if (updates != null && !updates.isEmpty()) { - this.validate(timestamp); - this.timestamp = timestamp; - updates.forEach(this::process); - } - } - - @Override - public String toString() { - return state.toString(); - } -} diff --git a/java/view.java b/java/view.java index 0a53387..530fa41 100644 --- a/java/view.java +++ b/java/view.java @@ -6,9 +6,9 @@ import java.sql.Statement; import java.sql.PreparedStatement; -public class App { +public class View { - private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize?ssl_mode=require"; + private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize?sslmode=require"; private final String user = "MATERIALIZE_USERNAME"; private final String password = "MATERIALIZE_PASSWORD"; @@ -42,7 +42,7 @@ public void view() { } public static void main(String[] args) { - App app = new App(); + View app = new View(); app.view(); } } \ No newline at end of file From 9c6dde46452f99b7379b3e26affc03aab53bf649 Mon Sep 17 00:00:00 2001 From: Joaquin Colacci Date: Wed, 1 Mar 2023 15:44:01 +0100 Subject: [PATCH 3/3] Fix: Remove Record methods --- java/Subscribe.java | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/java/Subscribe.java b/java/Subscribe.java index e78735d..be3b15c 100644 --- a/java/Subscribe.java +++ b/java/Subscribe.java @@ -24,7 +24,6 @@ public Connection connect() throws SQLException { props.setProperty("password", password); return DriverManager.getConnection(url, props); - } public void subscribe() { @@ -79,15 +78,7 @@ public static void main(String[] args) { /* * State class to handle updates from a subscription. */ -record Update(T value, int diff) { - public T getValue() { - return value; - } - - public int getDiff() { - return diff; - } -} +record Update(T value, int diff) {}; class State { private final HashMap state; @@ -135,8 +126,8 @@ private void validate(long timestamp) { } private void process(Update update) { - T value = update.getValue(); - int diff = update.getDiff(); + T value = update.value(); + int diff = update.diff(); int count = state.containsKey(value) ? state.get(value) + diff : diff;