Skip to content

Commit

Permalink
HBASE-26474 Implement connection-level attributes (#3952)
Browse files Browse the repository at this point in the history
Add support for `db.system`, `db.connection_string`, `db.user`.

Signed-off-by: Duo Zhang <[email protected]>
  • Loading branch information
ndimiduk authored Jan 5, 2022
1 parent baeb51f commit 5cb7ed8
Show file tree
Hide file tree
Showing 20 changed files with 467 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,11 @@ public CompletableFuture<ServerName> getActiveMaster() {
getClass().getSimpleName() + ".getClusterId");
}

@Override
public String getConnectionString() {
return "unimplemented";
}

@Override
public void close() {
trace(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
* The implementation of AsyncConnection.
*/
@InterfaceAudience.Private
class AsyncConnectionImpl implements AsyncConnection {
public class AsyncConnectionImpl implements AsyncConnection {

private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);

Expand Down Expand Up @@ -198,6 +198,14 @@ synchronized ChoreService getChoreService() {
return choreService;
}

public User getUser() {
return user;
}

public ConnectionRegistry getConnectionRegistry() {
return registry;
}

@Override
public Configuration getConfiguration() {
return conf;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -20,24 +20,27 @@
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REGION_NAMES_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY;
import static org.apache.hadoop.hbase.trace.TraceUtil.createSpan;
import static org.apache.hadoop.hbase.trace.TraceUtil.createTableSpan;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.trace.ConnectionSpanBuilder;
import org.apache.hadoop.hbase.client.trace.TableSpanBuilder;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -96,9 +99,12 @@ private boolean isMeta(TableName tableName) {
return TableName.isMetaTableName(tableName);
}

private <T> CompletableFuture<T> tracedLocationFuture(Supplier<CompletableFuture<T>> action,
Function<T, List<String>> getRegionNames, TableName tableName, String methodName) {
Span span = createTableSpan("AsyncRegionLocator." + methodName, tableName);
private <T> CompletableFuture<T> tracedLocationFuture(
Supplier<CompletableFuture<T>> action,
Function<T, List<String>> getRegionNames,
Supplier<Span> spanSupplier
) {
final Span span = spanSupplier.get();
try (Scope scope = span.makeCurrent()) {
CompletableFuture<T> future = action.get();
FutureUtils.addListener(future, (resp, error) -> {
Expand All @@ -117,18 +123,30 @@ private <T> CompletableFuture<T> tracedLocationFuture(Supplier<CompletableFuture
}
}

private List<String> getRegionName(RegionLocations locs) {
List<String> names = new ArrayList<>();
for (HRegionLocation loc : locs.getRegionLocations()) {
if (loc != null) {
names.add(loc.getRegion().getRegionNameAsString());
}
private static List<String> getRegionNames(RegionLocations locs) {
if (locs == null || locs.getRegionLocations() == null) {
return Collections.emptyList();
}
return names;
return Arrays.stream(locs.getRegionLocations())
.filter(Objects::nonNull)
.map(HRegionLocation::getRegion)
.map(RegionInfo::getRegionNameAsString)
.collect(Collectors.toList());
}

private static List<String> getRegionNames(HRegionLocation location) {
return Optional.ofNullable(location)
.map(HRegionLocation::getRegion)
.map(RegionInfo::getRegionNameAsString)
.map(Collections::singletonList)
.orElseGet(Collections::emptyList);
}

CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
RegionLocateType type, boolean reload, long timeoutNs) {
final Supplier<Span> supplier = new TableSpanBuilder(conn)
.setName("AsyncRegionLocator.getRegionLocations")
.setTableName(tableName);
return tracedLocationFuture(() -> {
CompletableFuture<RegionLocations> future = isMeta(tableName) ?
metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload) :
Expand All @@ -138,11 +156,14 @@ CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
"ms) waiting for region locations for " + tableName + ", row='" +
Bytes.toStringBinary(row) + "'");
}, this::getRegionName, tableName, "getRegionLocations");
}, AsyncRegionLocator::getRegionNames, supplier);
}

CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
final Supplier<Span> supplier = new TableSpanBuilder(conn)
.setName("AsyncRegionLocator.getRegionLocation")
.setTableName(tableName);
return tracedLocationFuture(() -> {
// meta region can not be split right now so we always call the same method.
// Change it later if the meta table can have more than one regions.
Expand Down Expand Up @@ -173,8 +194,7 @@ CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[]
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
"ms) waiting for region location for " + tableName + ", row='" +
Bytes.toStringBinary(row) + "', replicaId=" + replicaId);
}, loc -> Arrays.asList(loc.getRegion().getRegionNameAsString()), tableName,
"getRegionLocation");
}, AsyncRegionLocator::getRegionNames, supplier);
}

CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
Expand Down Expand Up @@ -202,31 +222,38 @@ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
}

void clearCache(TableName tableName) {
Supplier<Span> supplier = new TableSpanBuilder(conn)
.setName("AsyncRegionLocator.clearCache")
.setTableName(tableName);
TraceUtil.trace(() -> {
LOG.debug("Clear meta cache for {}", tableName);
if (tableName.equals(META_TABLE_NAME)) {
metaRegionLocator.clearCache();
} else {
nonMetaRegionLocator.clearCache(tableName);
}
}, () -> createTableSpan("AsyncRegionLocator.clearCache", tableName));
}, supplier);
}

void clearCache(ServerName serverName) {
Supplier<Span> supplier = new ConnectionSpanBuilder(conn)
.setName("AsyncRegionLocator.clearCache")
.addAttribute(SERVER_NAME_KEY, serverName.getServerName());
TraceUtil.trace(() -> {
LOG.debug("Clear meta cache for {}", serverName);
metaRegionLocator.clearCache(serverName);
nonMetaRegionLocator.clearCache(serverName);
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
}, () -> createSpan("AsyncRegionLocator.clearCache").setAttribute(SERVER_NAME_KEY,
serverName.getServerName()));
}, supplier);
}

void clearCache() {
Supplier<Span> supplier = new ConnectionSpanBuilder(conn)
.setName("AsyncRegionLocator.clearCache");
TraceUtil.trace(() -> {
metaRegionLocator.clearCache();
nonMetaRegionLocator.clearCache();
}, "AsyncRegionLocator.clearCache");
}, supplier);
}

AsyncNonMetaRegionLocator getNonMetaRegionLocator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* Internal use only.
*/
@InterfaceAudience.Private
interface ConnectionRegistry extends Closeable {
public interface ConnectionRegistry extends Closeable {

/**
* Get the location of meta region(s).
Expand All @@ -48,6 +48,13 @@ interface ConnectionRegistry extends Closeable {
*/
CompletableFuture<ServerName> getActiveMaster();

/**
* Return the connection string associated with this registry instance. This value is
* informational, used for annotating traces. Values returned may not be valid for establishing a
* working cluster connection.
*/
String getConnectionString();

/**
* Closes this instance and releases any system resources associated with it
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,12 @@ public static Set<ServerName> parseMasterAddrs(Configuration conf) throws Unknow
return masterAddrs;
}

private final String connectionString;

MasterRegistry(Configuration conf) throws IOException {
super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS,
MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS, MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES);
connectionString = getConnectionString(conf);
}

@Override
Expand All @@ -102,6 +105,15 @@ protected CompletableFuture<Set<ServerName>> fetchEndpoints() {
return getMasters();
}

@Override
public String getConnectionString() {
return connectionString;
}

static String getConnectionString(Configuration conf) throws UnknownHostException {
return getMasterAddr(conf);
}

/**
* Builds the default master address end point if it is not specified in the configuration.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ private CompletableFuture<Result> get(Get get, int replicaId) {
}

private TableOperationSpanBuilder newTableOperationSpanBuilder() {
return new TableOperationSpanBuilder().setTableName(tableName);
return new TableOperationSpanBuilder(conn).setTableName(tableName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -72,9 +73,23 @@ public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {

private static final char ADDRS_CONF_SEPARATOR = ',';

private final String connectionString;

RpcConnectionRegistry(Configuration conf) throws IOException {
super(conf, HEDGED_REQS_FANOUT_KEY, INITIAL_REFRESH_DELAY_SECS, PERIODIC_REFRESH_INTERVAL_SECS,
MIN_SECS_BETWEEN_REFRESHES);
connectionString = buildConnectionString(conf);
}

private String buildConnectionString(Configuration conf) throws UnknownHostException {
final String configuredBootstrapNodes = conf.get(BOOTSTRAP_NODES);
if (StringUtils.isBlank(configuredBootstrapNodes)) {
return MasterRegistry.getConnectionString(conf);
}
return Splitter.on(ADDRS_CONF_SEPARATOR)
.trimResults()
.splitToStream(configuredBootstrapNodes)
.collect(Collectors.joining(String.valueOf(ADDRS_CONF_SEPARATOR)));
}

@Override
Expand All @@ -91,6 +106,11 @@ protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOExcepti
}
}

@Override
public String getConnectionString() {
return connectionString;
}

private static Set<ServerName> transformServerNames(GetBootstrapNodesResponse resp) {
return resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
.collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,13 @@ public CompletableFuture<ServerName> getActiveMaster() {
"ZKConnectionRegistry.getActiveMaster");
}

@Override
public String getConnectionString() {
final String serverList = zk.getConnectString();
final String baseZNode = znodePaths.baseZNode;
return serverList + ":" + baseZNode;
}

@Override
public void close() {
zk.close();
Expand Down
Loading

0 comments on commit 5cb7ed8

Please sign in to comment.