Skip to content

Commit

Permalink
[pinpoint-apm#9176] Add executeAsync API to WrappedPinotConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Sep 22, 2022
1 parent a589358 commit 6904b75
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.navercorp.pinpoint.metric.common.pinot;

import org.apache.pinot.client.PinotResultSet;
import org.apache.pinot.client.ResultSetGroup;

import java.sql.ResultSet;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ResultSetFuture implements Future<ResultSet> {
private final Future<ResultSetGroup> resultSetGroupFuture;

public ResultSetFuture(Future<ResultSetGroup> resultSetGroupFuture) {
this.resultSetGroupFuture = Objects.requireNonNull(resultSetGroupFuture, "resultSetGroupFuture");
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return this.resultSetGroupFuture.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return this.resultSetGroupFuture.isCancelled();
}

@Override
public boolean isDone() {
return this.resultSetGroupFuture.isDone();
}

@Override
public ResultSet get() throws InterruptedException, ExecutionException {
ResultSetGroup resultSetGroup = this.resultSetGroupFuture.get();
return toResultSet(resultSetGroup);
}

@Override
public ResultSet get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
ResultSetGroup resultSetGroup = this.resultSetGroupFuture.get(timeout, unit);
return toResultSet(resultSetGroup);
}

private static PinotResultSet toResultSet(ResultSetGroup resultSetGroup) {
if (resultSetGroup == null) {
// return null or empty??
return PinotResultSet.empty();
}
if (resultSetGroup.getResultSetCount() == 0) {
return PinotResultSet.empty();
}
return new PinotResultSet(resultSetGroup.getResultSet(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package com.navercorp.pinpoint.metric.common.pinot;

import org.apache.pinot.client.PinotConnection;
import org.apache.pinot.client.Request;
import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.client.utils.DriverUtils;

import java.sql.Array;
import java.sql.Blob;
Expand All @@ -26,6 +29,7 @@
import java.sql.DatabaseMetaData;
import java.sql.NClob;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLClientInfoException;
import java.sql.SQLException;
import java.sql.SQLWarning;
Expand All @@ -37,19 +41,46 @@
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;

/**
* @author Hyunjoon Cho
*/
public class WrappedPinotConnection implements Connection {

private static final String QUERY_FORMAT = "sql";
private static final String LIMIT_STATEMENT = "LIMIT";

private final PinotConnection connection;
private boolean close;

// Associated with setFetchSize
private int maxRows = 1024 * 100;

public WrappedPinotConnection(PinotConnection connection) {
this.connection = Objects.requireNonNull(connection, "connection");
}

/**
* Non-standard API
*/
public Future<ResultSet> executeAsync(String sql) {
sql = checkLimitStatement(sql, maxRows);

org.apache.pinot.client.Connection session = connection.getSession();

Request request = new Request(QUERY_FORMAT, sql);
Future<ResultSetGroup> future = session.executeAsync(request);
return new ResultSetFuture(future);
}

private String checkLimitStatement(String query, int maxRows) {
if (!DriverUtils.queryContainsLimitStatement(query)) {
return query.concat(" " + LIMIT_STATEMENT + " " + maxRows);
}
return query;
}

@Override
public Statement createStatement() throws SQLException {
return connection.createStatement();
Expand Down Expand Up @@ -322,4 +353,6 @@ public <T> T unwrap(Class<T> iface) throws SQLException {
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return iface.isAssignableFrom(PinotConnection.class);
}


}

0 comments on commit 6904b75

Please sign in to comment.