Skip to content

Commit

Permalink
ARROW-17785: [Java] Suppress flakiness from gRPC in JDBC driver tests (
Browse files Browse the repository at this point in the history
…apache#14210)

I couldn't reproduce it, so I added a suppression instead. 

In both cases, the error is that the server is uncontactable. That shouldn't happen, but I changed the tests to also bind to port 0 instead of using a potentially flaky free port finder.

Authored-by: David Li <[email protected]>
Signed-off-by: David Li <[email protected]>
  • Loading branch information
lidavidm authored Sep 23, 2022
1 parent bed6f8e commit dfdd0ce
Show file tree
Hide file tree
Showing 10 changed files with 36 additions and 74 deletions.
6 changes: 0 additions & 6 deletions java/flight/flight-sql-jdbc-driver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,6 @@
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>me.alexpanov</groupId>
<artifactId>free-port-finder</artifactId>
<version>1.1.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>commons-io</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightRuntimeException;
import org.apache.arrow.flight.FlightStatusCode;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.auth2.BearerCredentialWriter;
Expand All @@ -49,12 +50,14 @@
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.calcite.avatica.Meta.StatementType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A {@link FlightSqlClient} handler.
*/
public final class ArrowFlightSqlClientHandler implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(ArrowFlightSqlClientHandler.class);
private final FlightSqlClient sqlClient;
private final Set<CallOption> options = new HashSet<>();

Expand Down Expand Up @@ -189,7 +192,18 @@ public Schema getDataSetSchema() {

@Override
public void close() {
preparedStatement.close(getOptions());
try {
preparedStatement.close(getOptions());
} catch (FlightRuntimeException fre) {
// ARROW-17785: suppress exceptions caused by flaky gRPC layer
if (fre.status().code().equals(FlightStatusCode.UNAVAILABLE) ||
(fre.status().code().equals(FlightStatusCode.INTERNAL) &&
fre.getMessage().contains("Connection closed after GOAWAY"))) {
LOGGER.warn("Supressed error closing PreparedStatement", fre);
return;
}
throw fre;
}
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ public class ArrowFlightJdbcConnectionPoolDataSourceTest {
.build();

FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder()
.host("localhost")
.randomPort()
.authentication(authentication)
.producer(PRODUCER)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ public class ArrowFlightJdbcDriverTest {
new UserPasswordAuthentication.Builder().user("user1", "pass1").user("user2", "pass2")
.build();

FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder().host("localhost").randomPort()
.authentication(authentication).producer(PRODUCER).build();
FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder()
.authentication(authentication)
.producer(PRODUCER)
.build();
}

private BufferAllocator allocator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ public class ArrowFlightJdbcFactoryTest {
new UserPasswordAuthentication.Builder().user("user1", "pass1").user("user2", "pass2")
.build();

FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder().host("localhost").randomPort()
.authentication(authentication).producer(PRODUCER).build();
FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder()
.authentication(authentication)
.producer(PRODUCER)
.build();
}

private BufferAllocator allocator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ public class ConnectionTest {
.user(userTest, passTest)
.build();

FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder().host("localhost").randomPort()
.authentication(authentication).producer(PRODUCER).build();
FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder()
.authentication(authentication)
.producer(PRODUCER)
.build();
}

private BufferAllocator allocator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ public class ConnectionTlsTest {
.build();

FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder()
.host("localhost")
.randomPort()
.authentication(authentication)
.useEncryption(certKey.cert, certKey.key)
.producer(PRODUCER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import me.alexpanov.net.FreePortFinder;

/**
* Utility class for unit tests that need to instantiate a {@link FlightServer}
* and interact with it.
Expand Down Expand Up @@ -95,8 +93,6 @@ public static FlightServerTestRule createStandardTestRule(final FlightSqlProduce
.build();

return new Builder()
.host("localhost")
.randomPort()
.authentication(authentication)
.producer(producer)
.build();
Expand All @@ -106,11 +102,6 @@ ArrowFlightJdbcDataSource createDataSource() {
return ArrowFlightJdbcDataSource.createNewDataSource(properties);
}

ArrowFlightJdbcDataSource createDataSource(String token) {
properties.put("token", token);
return ArrowFlightJdbcDataSource.createNewDataSource(properties);
}

public ArrowFlightJdbcConnectionPoolDataSource createConnectionPoolDataSource() {
return ArrowFlightJdbcConnectionPoolDataSource.createNewDataSource(properties);
}
Expand Down Expand Up @@ -159,9 +150,8 @@ public Statement apply(Statement base, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
try (FlightServer flightServer =
getStartServer(location ->
initiateServer(location), 3)) {
try (FlightServer flightServer = getStartServer(location -> initiateServer(location), 3)) {
properties.put("port", flightServer.getPort());
LOGGER.info("Started " + FlightServer.class.getName() + " as " + flightServer);
base.evaluate();
} finally {
Expand All @@ -174,12 +164,9 @@ public void evaluate() throws Throwable {
private FlightServer getStartServer(CheckedFunction<Location, FlightServer> newServerFromLocation,
int retries)
throws IOException {

final Deque<ReflectiveOperationException> exceptions = new ArrayDeque<>();

for (; retries > 0; retries--) {
final Location location = Location.forGrpcInsecure(config.getHost(), config.getPort());
final FlightServer server = newServerFromLocation.apply(location);
final FlightServer server = newServerFromLocation.apply(Location.forGrpcInsecure("localhost", 0));
try {
Method start = server.getClass().getMethod("start");
start.setAccessible(true);
Expand All @@ -189,9 +176,7 @@ private FlightServer getStartServer(CheckedFunction<Location, FlightServer> newS
exceptions.add(e);
}
}

exceptions.forEach(
e -> LOGGER.error("Failed to start a new " + FlightServer.class.getName() + ".", e));
exceptions.forEach(e -> LOGGER.error("Failed to start FlightServer", e));
throw new IOException(exceptions.pop().getCause());
}

Expand Down Expand Up @@ -223,44 +208,14 @@ public void close() throws Exception {
* Builder for {@link FlightServerTestRule}.
*/
public static final class Builder {
private final Properties properties = new Properties();
private final Properties properties;
private FlightSqlProducer producer;
private Authentication authentication;
private CertKeyPair certKeyPair;

/**
* Sets the host for the server rule.
*
* @param host the host value.
* @return the Builder.
*/
public Builder host(final String host) {
properties.put(ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.HOST.camelName(),
host);
return this;
}

/**
* Sets a random port to be used by the server rule.
*
* @return the Builder.
*/
public Builder randomPort() {
properties.put(ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.PORT.camelName(),
FreePortFinder.findFreeLocalPort());
return this;
}

/**
* Sets a specific port to be used by the server rule.
*
* @param port the port value.
* @return the Builder.
*/
public Builder port(final int port) {
properties.put(ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.PORT.camelName(),
port);
return this;
public Builder() {
this.properties = new Properties();
this.properties.put("host", "localhost");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ public void testShouldRunSelectQuerySettingLargeMaxRowLimit() throws Exception {
try (Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(
CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD)) {

final long maxRowsLimit = 3;
statement.setLargeMaxRows(maxRowsLimit);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ public class TokenAuthenticationTest {

static {
FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder()
.host("localhost")
.randomPort()
.authentication(new TokenAuthentication.Builder()
.token("1234")
.build())
Expand Down

0 comments on commit dfdd0ce

Please sign in to comment.