Skip to content

Commit

Permalink
add session_id connection property and expose more change events
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicwu authored and rernas35 committed May 4, 2022
1 parent e0a80ff commit d00ffee
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ protected Mutation(ClickHouseRequest<?> request, boolean sealed) {

this.options.putAll(request.options);
this.settings.putAll(request.settings);

this.sessionId = request.sessionId;
}

@Override
Expand Down Expand Up @@ -90,13 +88,13 @@ public Mutation data(String file, ClickHouseCompression compression) {

final ClickHouseRequest<?> self = this;
final String fileName = ClickHouseChecker.nonEmpty(file, "File");
this.input = ClickHouseDeferredValue.of(() -> {
this.input = changeProperty(PROP_DATA, this.input, ClickHouseDeferredValue.of(() -> {
try {
return ClickHouseInputStream.of(new FileInputStream(fileName), 123, compression);
} catch (FileNotFoundException e) {
throw new IllegalArgumentException(e);
}
});
}));
return this;
}

Expand All @@ -119,7 +117,8 @@ public Mutation data(InputStream input) {
public Mutation data(ClickHouseInputStream input) {
checkSealed();

this.input = ClickHouseDeferredValue.of(input, ClickHouseInputStream.class);
this.input = changeProperty(PROP_DATA, this.input,
ClickHouseDeferredValue.of(input, ClickHouseInputStream.class));

return this;
}
Expand All @@ -133,7 +132,7 @@ public Mutation data(ClickHouseInputStream input) {
public Mutation data(ClickHouseDeferredValue<ClickHouseInputStream> input) {
checkSealed();

this.input = input;
this.input = changeProperty(PROP_DATA, this.input, input);

return this;
}
Expand Down Expand Up @@ -162,16 +161,7 @@ public ClickHouseResponse sendAndWait() throws ClickHouseException {
@Override
public Mutation table(String table, String queryId) {
checkSealed();

this.queryId = queryId;

String sql = "INSERT INTO " + ClickHouseChecker.nonBlank(table, "table");
if (!sql.equals(this.sql)) {
this.sql = sql;
this.preparedQuery = null;
resetCache();
}

super.query("INSERT INTO " + ClickHouseChecker.nonBlank(table, "table"), queryId);
return this;
}

Expand All @@ -190,7 +180,6 @@ public Mutation seal() {

req.input = input;
req.queryId = queryId;
req.sessionId = sessionId;
req.sql = sql;

req.preparedQuery = preparedQuery;
Expand All @@ -202,6 +191,11 @@ public Mutation seal() {

private static final long serialVersionUID = 4990313525960702287L;

static final String PROP_DATA = "data";
static final String PROP_PREPARED_QUERY = "preparedQuery";
static final String PROP_QUERY = "query";
static final String PROP_QUERY_ID = "queryId";

private final boolean sealed;

private transient ClickHouseClient client;
Expand All @@ -216,7 +210,6 @@ public Mutation seal() {

protected transient ClickHouseDeferredValue<ClickHouseInputStream> input;
protected String queryId;
protected String sessionId;
protected String sql;
protected ClickHouseParameterizedQuery preparedQuery;

Expand Down Expand Up @@ -245,6 +238,13 @@ protected ClickHouseRequest(ClickHouseClient client, Function<ClickHouseNodeSele
this.namedParameters = new HashMap<>();
}

protected <T> T changeProperty(String property, T oldValue, T newValue) {
if (changeListener != null && !Objects.equals(oldValue, newValue)) {
changeListener.propertyChanged(this, property, oldValue, newValue);
}
return newValue;
}

protected void checkSealed() {
if (sealed) {
throw new IllegalStateException("Sealed request is immutable");
Expand Down Expand Up @@ -291,7 +291,6 @@ public ClickHouseRequest<SelfT> copy() {
req.namedParameters.putAll(namedParameters);
req.input = input;
req.queryId = queryId;
req.sessionId = sessionId;
req.sql = sql;
req.preparedQuery = preparedQuery;
return req;
Expand Down Expand Up @@ -412,7 +411,8 @@ public Optional<String> getQueryId() {
*/
public ClickHouseParameterizedQuery getPreparedQuery() {
if (preparedQuery == null) {
preparedQuery = ClickHouseParameterizedQuery.of(getConfig(), getQuery());
preparedQuery = changeProperty(PROP_PREPARED_QUERY, preparedQuery,
ClickHouseParameterizedQuery.of(getConfig(), getQuery()));
}

return preparedQuery;
Expand All @@ -433,6 +433,7 @@ public Map<String, Object> getSettings() {
* @return session id
*/
public Optional<String> getSessionId() {
String sessionId = (String) getConfig().getOption(ClickHouseClientOption.SESSION_ID);
return ClickHouseChecker.isNullOrEmpty(sessionId) ? Optional.empty() : Optional.of(sessionId);
}

Expand Down Expand Up @@ -681,12 +682,7 @@ public SelfT external(Collection<ClickHouseExternalTable> tables) {
@SuppressWarnings("unchecked")
public SelfT format(ClickHouseFormat format) {
checkSealed();

if (format == null) {
removeOption(ClickHouseClientOption.FORMAT);
} else {
option(ClickHouseClientOption.FORMAT, format);
}
option(ClickHouseClientOption.FORMAT, format);
return (SelfT) this;
}

Expand Down Expand Up @@ -739,11 +735,7 @@ public SelfT options(Map<ClickHouseOption, Serializable> options) {
m.putAll(this.options);
if (options != null) {
for (Entry<ClickHouseOption, Serializable> e : options.entrySet()) {
if (e.getValue() == null) {
removeOption(e.getKey());
} else {
option(e.getKey(), e.getValue());
}
option(e.getKey(), e.getValue());
m.remove(e.getKey());
}
}
Expand Down Expand Up @@ -1078,35 +1070,35 @@ public SelfT query(String sql) {
public SelfT query(ClickHouseParameterizedQuery query, String queryId) {
checkSealed();

if (!ClickHouseChecker.nonNull(query, "query").equals(this.preparedQuery)) {
this.preparedQuery = query;
this.sql = query.getOriginalQuery();
if (!ClickHouseChecker.nonNull(query, PROP_QUERY).equals(this.preparedQuery)) {
this.preparedQuery = changeProperty(PROP_PREPARED_QUERY, this.preparedQuery, query);
this.sql = changeProperty(PROP_QUERY, this.sql, query.getOriginalQuery());
resetCache();
}

this.queryId = queryId;
this.queryId = changeProperty(PROP_QUERY_ID, this.queryId, queryId);

return (SelfT) this;
}

/**
* Sets query and optinally query id.
*
* @param sql non-empty query
* @param query non-empty query
* @param queryId query id, null means no query id
* @return the request itself
*/
@SuppressWarnings("unchecked")
public SelfT query(String sql, String queryId) {
public SelfT query(String query, String queryId) {
checkSealed();

if (!ClickHouseChecker.nonBlank(sql, "sql").equals(this.sql)) {
this.sql = sql;
this.preparedQuery = null;
if (!ClickHouseChecker.nonBlank(query, PROP_QUERY).equals(this.sql)) {
this.sql = changeProperty(PROP_QUERY, this.sql, query);
this.preparedQuery = changeProperty(PROP_PREPARED_QUERY, this.preparedQuery, null);
resetCache();
}

this.queryId = queryId;
this.queryId = changeProperty(PROP_QUERY_ID, this.queryId, queryId);

return (SelfT) this;
}
Expand All @@ -1121,12 +1113,9 @@ public SelfT query(String sql, String queryId) {
public SelfT clearSession() {
checkSealed();

removeOption(ClickHouseClientOption.SESSION_ID);
removeOption(ClickHouseClientOption.SESSION_CHECK);
removeOption(ClickHouseClientOption.SESSION_TIMEOUT);
if (this.sessionId != null) {
this.sessionId = null;
resetCache();
}

return (SelfT) this;
}
Expand Down Expand Up @@ -1178,12 +1167,9 @@ public SelfT session(String sessionId, Integer timeout) {
public SelfT session(String sessionId, Boolean check, Integer timeout) {
checkSealed();

option(ClickHouseClientOption.SESSION_ID, sessionId);
option(ClickHouseClientOption.SESSION_CHECK, check);
option(ClickHouseClientOption.SESSION_TIMEOUT, timeout);
if (!Objects.equals(this.sessionId, sessionId)) {
this.sessionId = sessionId;
resetCache();
}

return (SelfT) this;
}
Expand Down Expand Up @@ -1263,12 +1249,7 @@ public SelfT table(String table, String queryId) {
@SuppressWarnings("unchecked")
public SelfT use(String database) {
checkSealed();

if (database == null) {
removeOption(ClickHouseClientOption.DATABASE);
} else {
option(ClickHouseClientOption.DATABASE, database);
}
option(ClickHouseClientOption.DATABASE, database);
return (SelfT) this;
}

Expand Down Expand Up @@ -1384,11 +1365,10 @@ public SelfT reset() {
}
this.namedParameters.clear();

this.input = null;
this.sql = null;
this.preparedQuery = null;
this.queryId = null;
this.sessionId = null;
this.input = changeProperty(PROP_DATA, this.input, null);
this.sql = changeProperty(PROP_QUERY, this.sql, null);
this.preparedQuery = changeProperty(PROP_PREPARED_QUERY, this.preparedQuery, null);
this.queryId = changeProperty(PROP_QUERY_ID, this.queryId, null);

resetCache();

Expand All @@ -1414,7 +1394,6 @@ public ClickHouseRequest<SelfT> seal() {

req.input = input;
req.queryId = queryId;
req.sessionId = sessionId;
req.sql = sql;
req.preparedQuery = preparedQuery;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ public enum ClickHouseClientOption implements ClickHouseOption {
* Server version.
*/
SERVER_VERSION("server_version", "", "Server version."),
/**
* Session id.
*/
SESSION_ID("session_id", "", "Session id"),
/**
* Whether to check if session id is validate.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,25 @@ default void optionChanged(T source, ClickHouseOption option, Serializable oldVa
}

/**
* Triggered when ClickHouse setting(declared on client-side) was changed.
* Removing a setting is same as reseting its value to {@code null}.
* Triggered when property of {@code source} was changed.
*
* @param source source of the event
* @param setting the changed setting, which should never be null
* @param property name of the changed property, which should never be null
* @param oldValue old option value, which could be null
* @param newValue new option value, which could be null
*/
default void settingChanged(T source, String setting, Serializable oldValue, Serializable newValue) {
default void propertyChanged(T source, String property, Object oldValue, Object newValue) {
}

/**
* Triggered when property of {@code source} was changed.
* Triggered when ClickHouse setting(declared on client-side) was changed.
* Removing a setting is same as reseting its value to {@code null}.
*
* @param source source of the event
* @param property name of the changed property, which should never be null
* @param setting the changed setting, which should never be null
* @param oldValue old option value, which could be null
* @param newValue new option value, which could be null
*/
// default void propertyChanged(T source, String property, Object oldValue,
// Object newValue) {
// }
default void settingChanged(T source, String setting, Serializable oldValue, Serializable newValue) {
}
}
Loading

0 comments on commit d00ffee

Please sign in to comment.