Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Java] Feat: Subscription state handler + fix source + view #16

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 162 additions & 0 deletions java/Subscribe.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.*;
import java.sql.ResultSet;
import java.sql.Statement;

public class Subscribe {

private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize?sslmode=require";
private final String user = "MATERIALIZE_USERNAME";
private final String password = "MATERIALIZE_PASSWORD";

/**
* Connect to Materialize
*
* @return a Connection object
*/
public Connection connect() throws SQLException {
Properties props = new Properties();
props.setProperty("user", user);
props.setProperty("password", password);

return DriverManager.getConnection(url, props);

}

public void subscribe() {
try (Connection conn = connect()) {

Statement stmt = conn.createStatement();
stmt.execute("BEGIN");
stmt.execute("DECLARE c CURSOR FOR SUBSCRIBE (SELECT sum FROM counter_sum) WITH (PROGRESS);");
State<HashMap<String, BigDecimal>> state = new State<>(false);
List<Update<HashMap<String, BigDecimal>>> buffer = new ArrayList<>();

while (true) {
ResultSet rs = stmt.executeQuery("FETCH ALL c");
boolean updated = false;
while (rs.next()) {
// Map row fields
long ts = rs.getLong("mz_timestamp");
boolean progress = rs.getBoolean("mz_progressed");
int diff = rs.getInt("mz_diff");

HashMap<String, BigDecimal> rowData = new HashMap<>();
rowData.put("sum", rs.getBigDecimal("sum"));

// When a progress is detected, get the last values
if (progress) {
if (updated) {
updated = false;

// Update the state with the last data
state.update(buffer, ts);
buffer.clear();

System.out.println(state.getState());
}
} else {
updated = true;
buffer.add(new Update<>(rowData, diff));
}
}
}
} catch (SQLException ex) {
System.out.println(ex.getMessage());
}
}

public static void main(String[] args) {
Subscribe app = new Subscribe();
app.subscribe();
}
}

/*
* State class to handle updates from a subscription.
*/
record Update<T>(T value, int diff) {
public T getValue() {
return value;
}

public int getDiff() {
return diff;
}
joacoc marked this conversation as resolved.
Show resolved Hide resolved
}

class State<T> {
private final HashMap<T, Integer> state;
private long timestamp;
private boolean valid;
private List<Update<T>> history;

public State(boolean collectHistory) {
state = new HashMap<>();
timestamp = 0;
valid = true;
if (collectHistory) {
history = new ArrayList<>();
}
}

public List<T> getState() {
List<T> list = new ArrayList<>();

for (Map.Entry<T, Integer> entry : state.entrySet()) {
T value = entry.getKey();
int count = entry.getValue();

for (int i = 0; i < count; i++) {
list.add(value);
}
}

return list;
}

public List<Update<T>> getHistory() {
return history;
}

private void validate(long timestamp) {
if (!valid) {
throw new RuntimeException("Invalid state.");
} else if (timestamp < this.timestamp) {
System.err.println("Invalid timestamp.");
valid = false;
throw new RuntimeException(String.format(
"Update with timestamp (%d) is lower than the last timestamp (%d). Invalid state.", timestamp, this.timestamp));
}
}

private void process(Update<T> update) {
T value = update.getValue();
int diff = update.getDiff();

int count = state.containsKey(value) ? state.get(value) + diff : diff;

if (count <= 0) {
state.remove(value);
} else {
state.put(value, count);
}

if (history != null) {
history.add(update);
}
}

public void update(List<Update<T>> updates, long timestamp) {
if (!updates.isEmpty()) {
validate(timestamp);
this.timestamp = timestamp;
updates.forEach(this::process);
}
}
}

12 changes: 5 additions & 7 deletions java/connection.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

public class App {
public class Connection {

private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize";
private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize?sslmode=require";
private final String user = "MATERIALIZE_USERNAME";
private final String password = "MATERIALIZE_PASSWORD";

Expand All @@ -14,12 +13,11 @@ public class App {
*
* @return a Connection object
*/
public Connection connect() {
public java.sql.Connection connect() {
Properties props = new Properties();
props.setProperty("user", user);
props.setProperty("password", password);
props.setProperty("ssl","true");
Connection conn = null;
java.sql.Connection conn = null;
try {
conn = DriverManager.getConnection(url, props);
System.out.println("Connected to Materialize successfully!");
Expand All @@ -31,7 +29,7 @@ public Connection connect() {
}

public static void main(String[] args) {
App app = new App();
Connection app = new Connection();
app.connect();
}
}
7 changes: 3 additions & 4 deletions java/insert.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import java.sql.Statement;
import java.sql.PreparedStatement;

public class App {
public class Insert {

private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize";
private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize?sslmode=require";
private final String user = "MATERIALIZE_USERNAME";
private final String password = "MATERIALIZE_PASSWORD";

Expand All @@ -21,7 +21,6 @@ public Connection connect() throws SQLException {
Properties props = new Properties();
props.setProperty("user", user);
props.setProperty("password", password);
props.setProperty("ssl","true");

return DriverManager.getConnection(url, props);

Expand All @@ -44,7 +43,7 @@ public void insert() {
}

public static void main(String[] args) {
App app = new App();
Insert app = new Insert();
app.insert();
}
}
7 changes: 3 additions & 4 deletions java/query.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import java.sql.ResultSet;
import java.sql.Statement;

public class App {
public class Query {

private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize";
private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize?sslmode=require";
private final String user = "MATERIALIZE_USERNAME";
private final String password = "MATERIALIZE_PASSWORD";

Expand All @@ -20,7 +20,6 @@ public Connection connect() throws SQLException {
Properties props = new Properties();
props.setProperty("user", user);
props.setProperty("password", password);
props.setProperty("ssl","true");

return DriverManager.getConnection(url, props);

Expand All @@ -42,7 +41,7 @@ public void query() {
}

public static void main(String[] args) {
App app = new App();
Query app = new Query();
app.query();
}
}
14 changes: 7 additions & 7 deletions java/source.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import java.sql.Statement;
import java.sql.PreparedStatement;

public class App {
public class Source {

private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize";
private final String url = "jdbc:postgresql://MATERIALIZE_HOST:6875/materialize?sslmode=require";
private final String user = "MATERIALIZE_USERNAME";
private final String password = "MATERIALIZE_PASSWORD";

Expand All @@ -21,16 +21,16 @@ public Connection connect() throws SQLException {
Properties props = new Properties();
props.setProperty("user", user);
props.setProperty("password", password);
props.setProperty("ssl","true");

return DriverManager.getConnection(url, props);

}

public void source() {

String SQL = "CREATE SOURCE counter FROM "
+ "LOAD GENERATOR COUNTER";
String SQL = "CREATE SOURCE IF NOT EXISTS counter"
+ "FROM LOAD GENERATOR COUNTER"
+ "(TICK INTERVAL '500ms')"
+ "WITH (SIZE = '3xsmall');";

try (Connection conn = connect()) {
Statement st = conn.createStatement();
Expand All @@ -43,7 +43,7 @@ public void source() {
}

public static void main(String[] args) {
App app = new App();
Source app = new Source();
app.source();
}
}
50 changes: 0 additions & 50 deletions java/subscribe.java

This file was deleted.

Loading