Skip to content

Commit

Permalink
Improvements to ClientContext for ensureOpen (#5258)
Browse files Browse the repository at this point in the history
These changes are small quality fixes to ensure that
ClientContext.ensureOpen is used when it is needed, and not used when it
isn't. This fixes an issue seen where the client RPC timeout value is
being retrieved from a supplier in a thread pool when returning RPC
transports after a client is closed. In these cases, ensureOpen does not
need to be checked. However, there were a few context API methods where
it was not checked but should have been.

Also, improved the close method to ensure close activities are only
called at most once, and made private and renamed an internal method to
get the client properties from the ClientInfo object, so it's more clear
which properties the method is returning and isn't exposed for misuse.
ctubbsii authored Jan 22, 2025

Unverified

This commit is not signed, but one or more authors requires that any commit attributed to them is signed.
1 parent 27e5fe7 commit e1038d4
Showing 8 changed files with 40 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -142,7 +142,7 @@ public class ClientContext implements AccumuloClient {
private ThriftTransportPool thriftTransportPool;
private ZookeeperLockChecker zkLockChecker;

private volatile boolean closed = false;
private final AtomicBoolean closed = new AtomicBoolean();

private SecurityOperations secops = null;
private final TableOperationsImpl tableops;
@@ -157,22 +157,21 @@ public class ClientContext implements AccumuloClient {
private final Supplier<ZooSession> zooSession;

private void ensureOpen() {
if (closed) {
if (closed.get()) {
throw new IllegalStateException("This client was closed.");
}
}

private ScanServerSelector createScanServerSelector() {
String clazz = ClientProperty.SCAN_SERVER_SELECTOR.getValue(info.getProperties());
String clazz = ClientProperty.SCAN_SERVER_SELECTOR.getValue(getClientProperties());
try {
Class<? extends ScanServerSelector> impl =
Class.forName(clazz).asSubclass(ScanServerSelector.class);
ScanServerSelector scanServerSelector = impl.getDeclaredConstructor().newInstance();

Map<String,String> sserverProps = new HashMap<>();
ClientProperty
.getPrefix(info.getProperties(), ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey())
.forEach((k, v) -> {
ClientProperty.getPrefix(getClientProperties(),
ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey()).forEach((k, v) -> {
sserverProps.put(
k.toString()
.substring(ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey().length()),
@@ -311,9 +310,8 @@ public AuthenticationToken getAuthenticationToken() {
return getCredentials().getToken();
}

public Properties getProperties() {
ensureOpen();
return info.getProperties();
private Properties getClientProperties() {
return info.getClientProperties();
}

/**
@@ -396,7 +394,7 @@ static BatchWriterConfig getBatchWriterConfig(Properties props) {
public synchronized BatchWriterConfig getBatchWriterConfig() {
ensureOpen();
if (batchWriterConfig == null) {
batchWriterConfig = getBatchWriterConfig(info.getProperties());
batchWriterConfig = getBatchWriterConfig(getClientProperties());
}
return batchWriterConfig;
}
@@ -405,6 +403,7 @@ public synchronized BatchWriterConfig getBatchWriterConfig() {
* @return map of live scan server addresses to lock uuids.
*/
public Map<String,Pair<UUID,String>> getScanServers() {
ensureOpen();
Map<String,Pair<UUID,String>> liveScanServers = new HashMap<>();
String root = this.getZooKeeperRoot() + Constants.ZSSERVERS;
var addrs = this.getZooCache().getChildren(root);
@@ -455,7 +454,7 @@ static ConditionalWriterConfig getConditionalWriterConfig(Properties props) {
public synchronized ConditionalWriterConfig getConditionalWriterConfig() {
ensureOpen();
if (conditionalWriterConfig == null) {
conditionalWriterConfig = getConditionalWriterConfig(info.getProperties());
conditionalWriterConfig = getConditionalWriterConfig(getClientProperties());
}
return conditionalWriterConfig;
}
@@ -621,6 +620,7 @@ public Map<String,TableId> getTableNameToIdMap() {
}

public Map<NamespaceId,String> getNamespaceIdToNameMap() {
ensureOpen();
return Namespaces.getIdToNameMap(this);
}

@@ -694,14 +694,15 @@ public BatchScanner createBatchScanner(String tableName, Authorizations authoriz
throws TableNotFoundException {
ensureOpen();
Integer numQueryThreads =
ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS.getInteger(getProperties());
ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS.getInteger(getClientProperties());
Objects.requireNonNull(numQueryThreads);
return createBatchScanner(tableName, authorizations, numQueryThreads);
}

@Override
public BatchScanner createBatchScanner(String tableName)
throws TableNotFoundException, AccumuloSecurityException, AccumuloException {
ensureOpen();
Authorizations auths = securityOperations().getUserAuthorizations(getPrincipal());
return createBatchScanner(tableName, auths);
}
@@ -718,7 +719,6 @@ public BatchDeleter createBatchDeleter(String tableName, Authorizations authoriz
@Override
public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations,
int numQueryThreads) throws TableNotFoundException {
ensureOpen();
return createBatchDeleter(tableName, authorizations, numQueryThreads, new BatchWriterConfig());
}

@@ -773,7 +773,7 @@ public Scanner createScanner(String tableName, Authorizations authorizations)
checkArgument(authorizations != null, "authorizations is null");
Scanner scanner =
new ScannerImpl(this, requireNotOffline(getTableId(tableName), tableName), authorizations);
Integer batchSize = ClientProperty.SCANNER_BATCH_SIZE.getInteger(getProperties());
Integer batchSize = ClientProperty.SCANNER_BATCH_SIZE.getInteger(getClientProperties());
if (batchSize != null) {
scanner.setBatchSize(batchSize);
}
@@ -829,7 +829,7 @@ public synchronized InstanceOperations instanceOperations() {
public Properties properties() {
ensureOpen();
Properties result = new Properties();
getProperties().forEach((key, value) -> {
getClientProperties().forEach((key, value) -> {
if (!key.equals(ClientProperty.AUTH_TOKEN.getKey())) {
result.setProperty((String) key, (String) value);
}
@@ -844,23 +844,24 @@ public AuthenticationToken token() {

@Override
public synchronized void close() {
closed = true;
if (zooKeeperOpened.get()) {
zooSession.get().close();
}
if (thriftTransportPool != null) {
thriftTransportPool.shutdown();
}
if (tableZooHelper != null) {
tableZooHelper.close();
}
if (scannerReadaheadPool != null) {
scannerReadaheadPool.shutdownNow(); // abort all tasks, client is shutting down
}
if (cleanupThreadPool != null) {
cleanupThreadPool.shutdown(); // wait for shutdown tasks to execute
if (closed.compareAndSet(false, true)) {
if (zooKeeperOpened.get()) {
zooSession.get().close();
}
if (thriftTransportPool != null) {
thriftTransportPool.shutdown();
}
if (tableZooHelper != null) {
tableZooHelper.close();
}
if (scannerReadaheadPool != null) {
scannerReadaheadPool.shutdownNow(); // abort all tasks, client is shutting down
}
if (cleanupThreadPool != null) {
cleanupThreadPool.shutdown(); // wait for shutdown tasks to execute
}
singletonReservation.close();
}
singletonReservation.close();
}

public static class ClientBuilderImpl<T>
@@ -896,7 +897,7 @@ public static AccumuloClient buildClient(ClientBuilderImpl<AccumuloClient> cbi)
try {
// ClientContext closes reservation unless a RuntimeException is thrown
ClientInfo info = cbi.getClientInfo();
AccumuloConfiguration config = ClientConfConverter.toAccumuloConf(info.getProperties());
var config = ClientConfConverter.toAccumuloConf(info.getClientProperties());
return new ClientContext(reservation, info, config, cbi.getUncaughtExceptionHandler());
} catch (RuntimeException e) {
reservation.close();
@@ -1080,8 +1081,7 @@ public ZooSession getZooSession() {
}

protected long getTransportPoolMaxAgeMillis() {
ensureOpen();
return ClientProperty.RPC_TRANSPORT_IDLE_TIMEOUT.getTimeInMillis(getProperties());
return ClientProperty.RPC_TRANSPORT_IDLE_TIMEOUT.getTimeInMillis(getClientProperties());
}

public synchronized ThriftTransportPool getTransportPool() {
@@ -1108,6 +1108,7 @@ public synchronized ZookeeperLockChecker getTServerLockChecker() {
}

public NamespaceMapping getNamespaces() {
ensureOpen();
return namespaces;
}

Original file line number Diff line number Diff line change
@@ -80,7 +80,7 @@ public interface ClientInfo {
/**
* @return All Accumulo client properties set for this connection
*/
Properties getProperties();
Properties getClientProperties();

/**
* @return hadoop Configuration
Original file line number Diff line number Diff line change
@@ -101,7 +101,7 @@ public String getPrincipal() {
}

@Override
public Properties getProperties() {
public Properties getClientProperties() {
Properties result = new Properties();
properties.forEach((key, value) -> result.setProperty((String) key, (String) value));
return result;
Original file line number Diff line number Diff line change
@@ -144,7 +144,7 @@ public AccumuloClient createAccumuloClient(String user, AuthenticationToken toke

@Override
public Properties getClientProperties() {
return info.getProperties();
return info.getClientProperties();
}

@Override
Original file line number Diff line number Diff line change
@@ -193,7 +193,7 @@ public boolean saslEnabled() {
}

@Override
public Properties getProperties() {
public Properties getClientProperties() {
Properties properties = ClientConfConverter.toProperties(getSiteConfiguration());
properties.setProperty(ClientProperty.INSTANCE_ZOOKEEPERS.getKey(), getZooKeepers());
properties.setProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT.getKey(),
Original file line number Diff line number Diff line change
@@ -21,8 +21,6 @@
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;

import java.util.Properties;

import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
@@ -43,7 +41,6 @@ public static ServerContext get() {
ConfigurationCopy conf = new ConfigurationCopy(DefaultConfiguration.getInstance());
conf.set(Property.INSTANCE_VOLUMES, "file:///");
expect(context.getConfiguration()).andReturn(conf).anyTimes();
expect(context.getProperties()).andReturn(new Properties()).anyTimes();
return context;
}

Original file line number Diff line number Diff line change
@@ -35,7 +35,6 @@
import java.net.ServerSocket;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Properties;

import org.apache.accumulo.core.clientImpl.thrift.ClientService.Iface;
import org.apache.accumulo.core.clientImpl.thrift.ClientService.Processor;
@@ -70,7 +69,6 @@ public void createMockServerContext() {
expect(context.getZooSession()).andReturn(zk).anyTimes();
expect(zk.asReader()).andReturn(null).anyTimes();
expect(zk.asReaderWriter()).andReturn(null).anyTimes();
expect(context.getProperties()).andReturn(new Properties()).anyTimes();
expect(context.getZooKeepers()).andReturn("").anyTimes();
expect(context.getInstanceName()).andReturn("instance").anyTimes();
expect(context.getZooKeepersSessionTimeOut()).andReturn(1).anyTimes();
Original file line number Diff line number Diff line change
@@ -79,7 +79,7 @@ public static void main(final String[] args) throws AccumuloException, TableNotF
default:
throw new RuntimeException("Incorrect usage; expected to be run by test only");
}
try (AccumuloClient client = Accumulo.newClient().from(context.getProperties())
try (AccumuloClient client = Accumulo.newClient().from(context.properties())
.as(creds.getPrincipal(), creds.getToken()).build()) {
client.securityOperations().authenticateUser(creds.getPrincipal(), creds.getToken());
try (Scanner scan =

0 comments on commit e1038d4

Please sign in to comment.