Skip to content

Commit

Permalink
HBASE-26472 Adhere to semantic conventions regarding table data opera…
Browse files Browse the repository at this point in the history
…tions

Follows the guidance outlined in https://github.com/open-telemetry/opentelemetry-specification/blob/3e380e2/specification/trace/semantic_conventions/database.dm

* all table data operations are assumed to be of type CLIENT
* populate `db.name` and `db.operation` attributes
* name table data operation spans as `db.operation` `db.name`:`db.hbase.table`
  note: this implementation deviates from the recommended `db.name`.`db.sql.table` and instead
  uses HBase's native String representation of namespace:tablename.

Signed-off-by: Duo Zhang <[email protected]>
Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
  • Loading branch information
ndimiduk authored Dec 14, 2021
1 parent a36d41a commit 8f5a12f
Show file tree
Hide file tree
Showing 8 changed files with 601 additions and 78 deletions.
5 changes: 5 additions & 0 deletions hbase-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -27,6 +27,7 @@
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import io.opentelemetry.api.trace.Span;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -36,6 +37,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
Expand All @@ -44,9 +46,11 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
import org.apache.hadoop.hbase.client.ConnectionUtils.Converter;
import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -218,37 +222,49 @@ private CompletableFuture<Result> get(Get get, int replicaId) {
.replicaId(replicaId).call();
}

private TableOperationSpanBuilder newTableOperationSpanBuilder() {
return new TableOperationSpanBuilder().setTableName(tableName);
}

@Override
public CompletableFuture<Result> get(Get get) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(get);
return tracedFuture(
() -> timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(),
RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs,
conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()),
"AsyncTable.get", tableName);
supplier);
}

@Override
public CompletableFuture<Void> put(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(put);
return tracedFuture(() -> this.<Void, Put> newCaller(put, writeRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
put, RequestConverter::buildMutateRequest))
.call(), "AsyncTable.put", tableName);
.call(), supplier);
}

@Override
public CompletableFuture<Void> delete(Delete delete) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(delete);
return tracedFuture(
() -> this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
stub, delete, RequestConverter::buildMutateRequest))
.call(),
"AsyncTable.delete", tableName);
supplier);
}

@Override
public CompletableFuture<Result> append(Append append) {
checkHasFamilies(append);
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(append);
return tracedFuture(() -> {
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
Expand All @@ -257,12 +273,14 @@ public CompletableFuture<Result> append(Append append) {
controller, loc, stub, append, RequestConverter::buildMutateRequest,
RawAsyncTableImpl::toResult))
.call();
}, "AsyncTable.append", tableName);
}, supplier);
}

@Override
public CompletableFuture<Result> increment(Increment increment) {
checkHasFamilies(increment);
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(increment);
return tracedFuture(() -> {
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
Expand All @@ -271,7 +289,7 @@ public CompletableFuture<Result> increment(Increment increment) {
controller, loc, stub, increment, RequestConverter::buildMutateRequest,
RawAsyncTableImpl::toResult))
.call();
}, "AsyncTable.increment", tableName);
}, supplier);
}

private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
Expand Down Expand Up @@ -329,43 +347,49 @@ private void preCheck() {
public CompletableFuture<Boolean> thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
preCheck();
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call(),
"AsyncTable.CheckAndMutateBuilder.thenPut", tableName);
supplier);
}

@Override
public CompletableFuture<Boolean> thenDelete(Delete delete) {
preCheck();
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call(),
"AsyncTable.CheckAndMutateBuilder.thenDelete", tableName);
supplier);
}

@Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
preCheck();
validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize());
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return tracedFuture(
() -> RawAsyncTableImpl.this
.<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
mutation,
mutations,
(rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value,
null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
CheckAndMutateResult::isSuccess))
.call(),
"AsyncTable.CheckAndMutateBuilder.thenMutate", tableName);
supplier);
}
}

Expand Down Expand Up @@ -397,6 +421,8 @@ public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
@Override
public CompletableFuture<Boolean> thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
Expand All @@ -405,34 +431,38 @@ public CompletableFuture<Boolean> thenPut(Put put) {
filter, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call(),
"AsyncTable.CheckAndMutateWithFilterBuilder.thenPut", tableName);
supplier);
}

@Override
public CompletableFuture<Boolean> thenDelete(Delete delete) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter,
timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call(),
"AsyncTable.CheckAndMutateWithFilterBuilder.thenDelete", tableName);
supplier);
}

@Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize());
public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return tracedFuture(
() -> RawAsyncTableImpl.this
.<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
mutation,
mutations,
(rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null, filter,
timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
CheckAndMutateResult::isSuccess))
.call(),
"AsyncTable.CheckAndMutateWithFilterBuilder.thenMutate", tableName);
supplier);
}
}

Expand All @@ -443,6 +473,8 @@ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter)

@Override
public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(checkAndMutate);
return tracedFuture(() -> {
if (checkAndMutate.getAction() instanceof Put ||
checkAndMutate.getAction() instanceof Delete ||
Expand Down Expand Up @@ -488,16 +520,18 @@ public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate che
"CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName()));
return future;
}
}, "AsyncTable.checkAndMutate", tableName);
}, supplier);
}

@Override
public List<CompletableFuture<CheckAndMutateResult>>
checkAndMutate(List<CheckAndMutate> checkAndMutates) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(checkAndMutates);
return tracedFutures(
() -> batch(checkAndMutates, rpcTimeoutNs).stream()
.map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()),
"AsyncTable.checkAndMutateList", tableName);
supplier);
}

// We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
Expand Down Expand Up @@ -548,14 +582,16 @@ public CompletableFuture<Result> mutateRow(RowMutations mutations) {
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(mutations);
return tracedFuture(
() -> this
.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs)
.action((controller, loc, stub) -> this.<Result, Result> mutateRow(controller, loc, stub,
mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce),
resp -> resp))
.call(),
"AsyncTable.mutateRow", tableName);
supplier);
}

private Scan setDefaultScanConfig(Scan scan) {
Expand Down Expand Up @@ -591,6 +627,8 @@ public ResultScanner getScanner(Scan scan) {

@Override
public CompletableFuture<List<Result>> scanAll(Scan scan) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(scan);
return tracedFuture(() -> {
CompletableFuture<List<Result>> future = new CompletableFuture<>();
List<Result> scanResults = new ArrayList<>();
Expand All @@ -612,27 +650,35 @@ public void onComplete() {
}
});
return future;
}, "AsyncTable.scanAll", tableName);
}, supplier);
}

@Override
public List<CompletableFuture<Result>> get(List<Get> gets) {
return tracedFutures(() -> batch(gets, readRpcTimeoutNs), "AsyncTable.getList", tableName);
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(gets);
return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier);
}

@Override
public List<CompletableFuture<Void>> put(List<Put> puts) {
return tracedFutures(() -> voidMutate(puts), "AsyncTable.putList", tableName);
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(puts);
return tracedFutures(() -> voidMutate(puts), supplier);
}

@Override
public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
return tracedFutures(() -> voidMutate(deletes), "AsyncTable.deleteList", tableName);
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(deletes);
return tracedFutures(() -> voidMutate(deletes), supplier);
}

@Override
public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
return tracedFutures(() -> batch(actions, rpcTimeoutNs), "AsyncTable.batch", tableName);
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(actions);
return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier);
}

private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) {
Expand Down
Loading

0 comments on commit 8f5a12f

Please sign in to comment.