Skip to content

Commit

Permalink
Fix client threads leak in Cassandra connector shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed May 14, 2024
1 parent 3a31b48 commit d892919
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 13 deletions.
12 changes: 6 additions & 6 deletions plugin/trino-cassandra/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down Expand Up @@ -148,12 +154,6 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>concurrent</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.airlift.json.JsonBinder.jsonBinder;
import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;
import static io.trino.plugin.base.ClosingBinder.closingBinder;
import static io.trino.plugin.base.ssl.SslUtils.createSSLContext;
import static io.trino.plugin.cassandra.CassandraErrorCode.CASSANDRA_SSL_INITIALIZATION_FAILURE;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -88,6 +89,8 @@ public void configure(Binder binder)

jsonCodecBinder(binder).bindListJsonCodec(ExtraColumnMetadata.class);
jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);

closingBinder(binder).registerCloseable(CassandraSession.class);
}

public static final class TypeDeserializer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
Expand Down Expand Up @@ -76,7 +77,6 @@
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Suppliers.memoize;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.getOnlyElement;
Expand Down Expand Up @@ -105,9 +105,13 @@ public class CassandraSession

private final CassandraTypeManager cassandraTypeManager;
private final JsonCodec<List<ExtraColumnMetadata>> extraColumnMetadataCodec;
private final Supplier<CqlSession> session;
private final Duration noHostAvailableRetryTimeout;

@GuardedBy("this")
private Supplier<CqlSession> sessionSupplier;
@GuardedBy("this")
private CqlSession session;

public CassandraSession(
CassandraTypeManager cassandraTypeManager,
JsonCodec<List<ExtraColumnMetadata>> extraColumnMetadataCodec,
Expand All @@ -117,7 +121,16 @@ public CassandraSession(
this.cassandraTypeManager = requireNonNull(cassandraTypeManager, "cassandraTypeManager is null");
this.extraColumnMetadataCodec = requireNonNull(extraColumnMetadataCodec, "extraColumnMetadataCodec is null");
this.noHostAvailableRetryTimeout = requireNonNull(noHostAvailableRetryTimeout, "noHostAvailableRetryTimeout is null");
this.session = memoize(sessionSupplier::get);
this.sessionSupplier = requireNonNull(sessionSupplier, "sessionSupplier is null");
}

private synchronized CqlSession session()
{
if (session == null) {
checkState(sessionSupplier != null, "already closed");
session = sessionSupplier.get();
}
return session;
}

public Version getCassandraVersion()
Expand Down Expand Up @@ -559,12 +572,12 @@ private void checkSizeEstimatesTableExist()

private <T> T executeWithSession(SessionCallable<T> sessionCallable)
{
ReconnectionPolicy reconnectionPolicy = session.get().getContext().getReconnectionPolicy();
ReconnectionPolicy reconnectionPolicy = session().getContext().getReconnectionPolicy();
ReconnectionPolicy.ReconnectionSchedule schedule = reconnectionPolicy.newControlConnectionSchedule(false);
long deadline = System.currentTimeMillis() + noHostAvailableRetryTimeout.toMillis();
while (true) {
try {
return sessionCallable.executeWithSession(session.get());
return sessionCallable.executeWithSession(session());
}
catch (AllNodesFailedException e) {
long timeLeft = deadline - System.currentTimeMillis();
Expand Down Expand Up @@ -611,9 +624,13 @@ private List<DataType> getTypeArguments(DataType dataType)
}

@Override
public void close()
public synchronized void close()
{
session.get().close();
sessionSupplier = null;
if (session != null) {
session.close();
session = null;
}
}

private interface SessionCallable<T>
Expand Down

0 comments on commit d892919

Please sign in to comment.