[Java] Feat: Subscription state handler + fix source + view#16
[Java] Feat: Subscription state handler + fix source + view#16
Conversation
sjwiesman
left a comment
There was a problem hiding this comment.
There are a few nits, but stepping back, which version of Java are you targeting? In modern versions, it would be more idiomatic to use the Stream API along with Update being a record class.
|
Thank you @sjwiesman. The code has been updated, and now With regards to |
|
Happy to approve this, but I just want to show you how I think you'd write this in idiomatic java 17 package com.materialize.subscribe;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
public class Subscribe {
public static <T> Stream<ChangeBatch<T>> subscribe(Connection conn, String sql, Function<ResultSet, T> parser) throws SQLException {
if (sql.endsWith(";")) {
sql = sql.substring(0, sql.length() - 1);
}
Statement statement = conn.createStatement();
statement.execute("BEGIN");
statement.execute(String.format("DECLARE c CURSOR FOR SUBSCRIBE (%s) WITH (PROGRESS);", sql));
return Stream.generate(() -> {
try {
return statement.executeQuery("FETCH ALL c");
} catch (SQLException e) {
throw new RuntimeException(e);
}
})
.mapMulti(new ChangeBatcher<>(parser))
.onClose(() -> {
try {
statement.close();
} catch (SQLException e) {
throw new RuntimeException(e);
}
});
}
public record ChangeBatch<T>(long timestamp, List<T> inserts, List<T> retractions) { }
private static class ChangeBatcher<T> implements BiConsumer<ResultSet, Consumer<ChangeBatch<T>>> {
private final Function<ResultSet, T> parser;
private ChangeBatcher(Function<ResultSet, T> parser) {
this.parser = parser;
}
@Override
public void accept(ResultSet rs, Consumer<ChangeBatch<T>> out) {
ChangeBatch<T> batch = null;
try {
while (rs.next()) {
boolean progress = rs.getBoolean("mz_progressed");
long ts = rs.getLong("mz_timestamp");
int diff = rs.getInt("mz_diff");
if (progress) {
if (batch != null) {
out.accept(batch);
}
batch = null;
continue;
}
T row = parser.apply(rs);
if (row == null) {
continue;
}
if (batch == null) {
batch = new ChangeBatch<>(ts, new ArrayList<>(), new ArrayList<>());
}
if (diff == 1) {
batch.inserts().add(row);
} else {
batch.retractions().add(row);
}
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
} |
|
@sjwiesman I got a bit lost on how to embed it in the current code and output the state. Feel free to apply any updates you think would be a best practice! (The only part I can identify it needs a touch is |
Java version https://github.com/MaterializeInc/developer-experience/issues/217#event-8578542249
sslmode=require