diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml index c43c6a401663..8660f3bf7146 100644 --- a/hbase-client/pom.xml +++ b/hbase-client/pom.xml @@ -193,6 +193,11 @@ mockito-core test + + org.hamcrest + hamcrest-library + test + org.apache.commons commons-crypto diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 1a12677ac0e1..b79fee0ac53c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -27,6 +27,7 @@ import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; +import io.opentelemetry.api.trace.Span; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -36,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -44,9 +46,11 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder; import org.apache.hadoop.hbase.client.ConnectionUtils.Converter; +import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; @@ -218,37 +222,49 @@ private CompletableFuture get(Get get, int replicaId) { .replicaId(replicaId).call(); } + private TableOperationSpanBuilder newTableOperationSpanBuilder() { + return new TableOperationSpanBuilder().setTableName(tableName); + } + @Override public CompletableFuture get(Get get) { + final Supplier supplier = newTableOperationSpanBuilder() + .setOperation(get); return tracedFuture( () -> timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(), RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs, conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()), - "AsyncTable.get", tableName); + supplier); } @Override public CompletableFuture put(Put put) { validatePut(put, conn.connConf.getMaxKeyValueSize()); + final Supplier supplier = newTableOperationSpanBuilder() + .setOperation(put); return tracedFuture(() -> this. newCaller(put, writeRpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl. voidMutate(controller, loc, stub, put, RequestConverter::buildMutateRequest)) - .call(), "AsyncTable.put", tableName); + .call(), supplier); } @Override public CompletableFuture delete(Delete delete) { + final Supplier supplier = newTableOperationSpanBuilder() + .setOperation(delete); return tracedFuture( () -> this. newCaller(delete, writeRpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl. voidMutate(controller, loc, stub, delete, RequestConverter::buildMutateRequest)) .call(), - "AsyncTable.delete", tableName); + supplier); } @Override public CompletableFuture append(Append append) { checkHasFamilies(append); + final Supplier supplier = newTableOperationSpanBuilder() + .setOperation(append); return tracedFuture(() -> { long nonceGroup = conn.getNonceGenerator().getNonceGroup(); long nonce = conn.getNonceGenerator().newNonce(); @@ -257,12 +273,14 @@ public CompletableFuture append(Append append) { controller, loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) .call(); - }, "AsyncTable.append", tableName); + }, supplier); } @Override public CompletableFuture increment(Increment increment) { checkHasFamilies(increment); + final Supplier supplier = newTableOperationSpanBuilder() + .setOperation(increment); return tracedFuture(() -> { long nonceGroup = conn.getNonceGenerator().getNonceGroup(); long nonce = conn.getNonceGenerator().newNonce(); @@ -271,7 +289,7 @@ public CompletableFuture increment(Increment increment) { controller, loc, stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) .call(); - }, "AsyncTable.increment", tableName); + }, supplier); } private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { @@ -329,6 +347,8 @@ private void preCheck() { public CompletableFuture thenPut(Put put) { validatePut(put, conn.connConf.getMaxKeyValueSize()); preCheck(); + final Supplier supplier = newTableOperationSpanBuilder() + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return tracedFuture( () -> RawAsyncTableImpl.this. newCaller(row, put.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put, @@ -336,12 +356,14 @@ public CompletableFuture thenPut(Put put) { null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE), (c, r) -> r.getProcessed())) .call(), - "AsyncTable.CheckAndMutateBuilder.thenPut", tableName); + supplier); } @Override public CompletableFuture thenDelete(Delete delete) { preCheck(); + final Supplier supplier = newTableOperationSpanBuilder() + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return tracedFuture( () -> RawAsyncTableImpl.this. newCaller(row, delete.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, @@ -349,23 +371,25 @@ public CompletableFuture thenDelete(Delete delete) { null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE), (c, r) -> r.getProcessed())) .call(), - "AsyncTable.CheckAndMutateBuilder.thenDelete", tableName); + supplier); } @Override - public CompletableFuture thenMutate(RowMutations mutation) { + public CompletableFuture thenMutate(RowMutations mutations) { preCheck(); - validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize()); + validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); + final Supplier supplier = newTableOperationSpanBuilder() + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return tracedFuture( () -> RawAsyncTableImpl.this - . newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs) + . newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub, - mutation, + mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value, null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE), CheckAndMutateResult::isSuccess)) .call(), - "AsyncTable.CheckAndMutateBuilder.thenMutate", tableName); + supplier); } } @@ -397,6 +421,8 @@ public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { @Override public CompletableFuture thenPut(Put put) { validatePut(put, conn.connConf.getMaxKeyValueSize()); + final Supplier supplier = newTableOperationSpanBuilder() + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return tracedFuture( () -> RawAsyncTableImpl.this. newCaller(row, put.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, @@ -405,11 +431,13 @@ public CompletableFuture thenPut(Put put) { filter, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE), (c, r) -> r.getProcessed())) .call(), - "AsyncTable.CheckAndMutateWithFilterBuilder.thenPut", tableName); + supplier); } @Override public CompletableFuture thenDelete(Delete delete) { + final Supplier supplier = newTableOperationSpanBuilder() + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return tracedFuture( () -> RawAsyncTableImpl.this. newCaller(row, delete.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, @@ -417,22 +445,24 @@ public CompletableFuture thenDelete(Delete delete) { timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE), (c, r) -> r.getProcessed())) .call(), - "AsyncTable.CheckAndMutateWithFilterBuilder.thenDelete", tableName); + supplier); } @Override - public CompletableFuture thenMutate(RowMutations mutation) { - validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize()); + public CompletableFuture thenMutate(RowMutations mutations) { + validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); + final Supplier supplier = newTableOperationSpanBuilder() + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return tracedFuture( () -> RawAsyncTableImpl.this - . newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs) + . newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub, - mutation, + mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null, filter, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE), CheckAndMutateResult::isSuccess)) .call(), - "AsyncTable.CheckAndMutateWithFilterBuilder.thenMutate", tableName); + supplier); } } @@ -443,6 +473,8 @@ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) @Override public CompletableFuture checkAndMutate(CheckAndMutate checkAndMutate) { + final Supplier supplier = newTableOperationSpanBuilder() + .setOperation(checkAndMutate); return tracedFuture(() -> { if (checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete || @@ -488,16 +520,18 @@ public CompletableFuture checkAndMutate(CheckAndMutate che "CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName())); return future; } - }, "AsyncTable.checkAndMutate", tableName); + }, supplier); } @Override public List> checkAndMutate(List checkAndMutates) { + final Supplier supplier = newTableOperationSpanBuilder() + .setOperation(checkAndMutates); return tracedFutures( () -> batch(checkAndMutates, rpcTimeoutNs).stream() .map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()), - "AsyncTable.checkAndMutateList", tableName); + supplier); } // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse, @@ -548,6 +582,8 @@ public CompletableFuture mutateRow(RowMutations mutations) { validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); long nonceGroup = conn.getNonceGenerator().getNonceGroup(); long nonce = conn.getNonceGenerator().newNonce(); + final Supplier supplier = newTableOperationSpanBuilder() + .setOperation(mutations); return tracedFuture( () -> this . newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs) @@ -555,7 +591,7 @@ public CompletableFuture mutateRow(RowMutations mutations) { mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce), resp -> resp)) .call(), - "AsyncTable.mutateRow", tableName); + supplier); } private Scan setDefaultScanConfig(Scan scan) { @@ -591,6 +627,8 @@ public ResultScanner getScanner(Scan scan) { @Override public CompletableFuture> scanAll(Scan scan) { + final Supplier supplier = newTableOperationSpanBuilder() + .setOperation(scan); return tracedFuture(() -> { CompletableFuture> future = new CompletableFuture<>(); List scanResults = new ArrayList<>(); @@ -612,27 +650,35 @@ public void onComplete() { } }); return future; - }, "AsyncTable.scanAll", tableName); + }, supplier); } @Override public List> get(List gets) { - return tracedFutures(() -> batch(gets, readRpcTimeoutNs), "AsyncTable.getList", tableName); + final Supplier supplier = newTableOperationSpanBuilder() + .setOperation(gets); + return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier); } @Override public List> put(List puts) { - return tracedFutures(() -> voidMutate(puts), "AsyncTable.putList", tableName); + final Supplier supplier = newTableOperationSpanBuilder() + .setOperation(puts); + return tracedFutures(() -> voidMutate(puts), supplier); } @Override public List> delete(List deletes) { - return tracedFutures(() -> voidMutate(deletes), "AsyncTable.deleteList", tableName); + final Supplier supplier = newTableOperationSpanBuilder() + .setOperation(deletes); + return tracedFutures(() -> voidMutate(deletes), supplier); } @Override public List> batch(List actions) { - return tracedFutures(() -> batch(actions, rpcTimeoutNs), "AsyncTable.batch", tableName); + final Supplier supplier = newTableOperationSpanBuilder() + .setOperation(actions); + return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier); } private List> voidMutate(List actions) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java new file mode 100644 index 000000000000..aaa53610321e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client.trace; + +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_NAME; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_OPERATION; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NAMESPACE_KEY; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.TABLE_KEY; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanKind; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionCoprocessorServiceExec; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.Operation; +import org.apache.hadoop.hbase.trace.TraceUtil; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Construct {@link io.opentelemetry.api.trace.Span} instances originating from + * "table operations" -- the verbs in our public API that interact with data in tables. + */ +@InterfaceAudience.Private +public class TableOperationSpanBuilder implements Supplier { + + // n.b. The results of this class are tested implicitly by way of the likes of + // `TestAsyncTableTracing` and friends. + + private static final String unknown = "UNKNOWN"; + + private TableName tableName; + private final Map, Object> attributes = new HashMap<>(); + + @Override public Span get() { + return build(); + } + + public TableOperationSpanBuilder setOperation(final Scan scan) { + return setOperation(valueFrom(scan)); + } + + public TableOperationSpanBuilder setOperation(final Row row) { + return setOperation(valueFrom(row)); + } + + @SuppressWarnings("unused") + public TableOperationSpanBuilder setOperation(final Collection operations) { + return setOperation(Operation.BATCH); + } + + public TableOperationSpanBuilder setOperation(final Operation operation) { + attributes.put(DB_OPERATION, operation.name()); + return this; + } + + public TableOperationSpanBuilder setTableName(final TableName tableName) { + this.tableName = tableName; + attributes.put(NAMESPACE_KEY, tableName.getNamespaceAsString()); + attributes.put(DB_NAME, tableName.getNamespaceAsString()); + attributes.put(TABLE_KEY, tableName.getNameAsString()); + return this; + } + + @SuppressWarnings("unchecked") + public Span build() { + final String name = attributes.getOrDefault(DB_OPERATION, unknown) + + " " + + (tableName != null ? tableName.getNameWithNamespaceInclAsString() : unknown); + final SpanBuilder builder = TraceUtil.getGlobalTracer() + .spanBuilder(name) + // TODO: what about clients embedded in Master/RegionServer/Gateways/&c? + .setSpanKind(SpanKind.CLIENT); + attributes.forEach((k, v) -> builder.setAttribute((AttributeKey) k, v)); + return builder.startSpan(); + } + + private static Operation valueFrom(final Scan scan) { + if (scan == null) { + return null; + } + return Operation.SCAN; + } + + private static Operation valueFrom(final Row row) { + if (row == null) { + return null; + } + if (row instanceof Append) { + return Operation.APPEND; + } + if (row instanceof CheckAndMutate) { + return Operation.CHECK_AND_MUTATE; + } + if (row instanceof Delete) { + return Operation.DELETE; + } + if (row instanceof Get) { + return Operation.GET; + } + if (row instanceof Increment) { + return Operation.INCREMENT; + } + if (row instanceof Put) { + return Operation.PUT; + } + if (row instanceof RegionCoprocessorServiceExec) { + return Operation.COPROC_EXEC; + } + if (row instanceof RowMutations) { + return Operation.BATCH; + } + return null; + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java index a1770e7004ab..0377db61a4a7 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java @@ -17,15 +17,24 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NAMESPACE_KEY; -import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.TABLE_KEY; +import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; - +import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; @@ -43,14 +52,18 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.MatcherPredicate; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.hamcrest.Matcher; +import org.hamcrest.core.IsAnything; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -59,10 +72,8 @@ import org.junit.experimental.categories.Category; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; - import org.apache.hbase.thirdparty.com.google.common.io.Closeables; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; @@ -135,11 +146,17 @@ public Void answer(InvocationOnMock invocation) throws Throwable { @Override public Void answer(InvocationOnMock invocation) throws Throwable { - ClientProtos.MultiResponse resp = - ClientProtos.MultiResponse.newBuilder() - .addRegionActionResult(RegionActionResult.newBuilder().addResultOrException( - ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result())))) - .build(); + ClientProtos.MultiRequest req = invocation.getArgument(1); + ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder(); + for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) { + RegionActionResult.Builder raBuilder = RegionActionResult.newBuilder(); + for (ClientProtos.Action ignored : regionAction.getActionList()) { + raBuilder.addResultOrException( + ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result()))); + } + builder.addRegionActionResult(raBuilder); + } + ClientProtos.MultiResponse resp = builder.build(); RpcCallback done = invocation.getArgument(2); ForkJoinPool.commonPool().execute(() -> done.run(resp)); return null; @@ -219,49 +236,73 @@ public void tearDown() throws IOException { Closeables.close(conn, true); } - private void assertTrace(String methodName) { - Waiter.waitFor(CONF, 1000, - () -> traceRule.getSpans().stream() - .anyMatch(span -> span.getName().equals("AsyncTable." + methodName) && - span.getKind() == SpanKind.INTERNAL && span.hasEnded())); - SpanData data = traceRule.getSpans().stream() - .filter(s -> s.getName().equals("AsyncTable." + methodName)).findFirst().get(); - assertEquals(StatusCode.OK, data.getStatus().getStatusCode()); - TableName tableName = table.getName(); - assertEquals(tableName.getNamespaceAsString(), data.getAttributes().get(NAMESPACE_KEY)); - assertEquals(tableName.getNameAsString(), data.getAttributes().get(TABLE_KEY)); + /** + * All {@link Span}s generated from table data access operations over {@code tableName} should + * include these attributes. + */ + private static Matcher buildBaseAttributesMatcher(TableName tableName) { + return hasAttributes(allOf( + containsEntry("db.name", tableName.getNamespaceAsString()), + containsEntry("db.hbase.namespace", tableName.getNamespaceAsString()), + containsEntry("db.hbase.table", tableName.getNameAsString()))); + } + + private void assertTrace(String tableOperation) { + assertTrace(tableOperation, new IsAnything<>()); + } + + private void assertTrace(String tableOperation, Matcher matcher) { + final TableName tableName = table.getName(); + final Matcher spanLocator = allOf( + hasName(containsString(tableOperation)), hasEnded()); + final String expectedName = tableOperation + " " + tableName.getNameWithNamespaceInclAsString(); + + Waiter.waitFor(CONF, 1000, new MatcherPredicate<>( + "waiting for span to emit", + () -> traceRule.getSpans(), hasItem(spanLocator))); + SpanData data = traceRule.getSpans() + .stream() + .filter(spanLocator::matches) + .findFirst() + .orElseThrow(AssertionError::new); + assertThat(data, allOf( + hasName(expectedName), + hasKind(SpanKind.CLIENT), + hasStatusWithCode(StatusCode.OK), + buildBaseAttributesMatcher(tableName), + matcher)); } @Test public void testExists() { table.exists(new Get(Bytes.toBytes(0))).join(); - assertTrace("get"); + assertTrace("GET"); } @Test public void testGet() { table.get(new Get(Bytes.toBytes(0))).join(); - assertTrace("get"); + assertTrace("GET"); } @Test public void testPut() { table.put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join(); - assertTrace("put"); + assertTrace("PUT"); } @Test public void testDelete() { table.delete(new Delete(Bytes.toBytes(0))).join(); - assertTrace("delete"); + assertTrace("DELETE"); } @Test public void testAppend() { table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join(); - assertTrace("append"); + assertTrace("APPEND"); } @Test @@ -270,21 +311,21 @@ public void testIncrement() { .increment( new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1)) .join(); - assertTrace("increment"); + assertTrace("INCREMENT"); } @Test public void testIncrementColumnValue1() { table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1) .join(); - assertTrace("increment"); + assertTrace("INCREMENT"); } @Test public void testIncrementColumnValue2() { table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, Durability.ASYNC_WAL).join(); - assertTrace("increment"); + assertTrace("INCREMENT"); } @Test @@ -292,7 +333,7 @@ public void testCheckAndMutate() { table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0)) .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) .build(new Delete(Bytes.toBytes(0)))).join(); - assertTrace("checkAndMutate"); + assertTrace("CHECK_AND_MUTATE"); } @Test @@ -302,7 +343,7 @@ public void testCheckAndMutateList() { .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) .build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0])) .join(); - assertTrace("checkAndMutateList"); + assertTrace("BATCH"); } @Test @@ -310,19 +351,100 @@ public void testCheckAndMutateAll() { table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0)) .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) .build(new Delete(Bytes.toBytes(0))))).join(); - assertTrace("checkAndMutateList"); + assertTrace("BATCH"); + } + + private void testCheckAndMutateBuilder(Row op) { + AsyncTable.CheckAndMutateBuilder builder = + table.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")) + .qualifier(Bytes.toBytes("cq")) + .ifEquals(Bytes.toBytes("v")); + if (op instanceof Put) { + Put put = (Put) op; + builder.thenPut(put).join(); + } else if (op instanceof Delete) { + Delete delete = (Delete) op; + builder.thenDelete(delete).join(); + } else if (op instanceof RowMutations) { + RowMutations mutations = (RowMutations) op; + builder.thenMutate(mutations).join(); + } else { + fail("unsupported CheckAndPut operation " + op); + } + assertTrace("CHECK_AND_MUTATE"); + } + + @Test + public void testCheckAndMutateBuilderThenPut() { + Put put = new Put(Bytes.toBytes(0)) + .addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v")); + testCheckAndMutateBuilder(put); + } + + @Test + public void testCheckAndMutateBuilderThenDelete() { + testCheckAndMutateBuilder(new Delete(Bytes.toBytes(0))); + } + + @Test + public void testCheckAndMutateBuilderThenMutations() throws IOException { + RowMutations mutations = new RowMutations(Bytes.toBytes(0)) + .add(new Put(Bytes.toBytes(0)) + .addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v"))) + .add(new Delete(Bytes.toBytes(0))); + testCheckAndMutateBuilder(mutations); + } + + private void testCheckAndMutateWithFilterBuilder(Row op) { + // use of `PrefixFilter` is completely arbitrary here. + AsyncTable.CheckAndMutateWithFilterBuilder builder = + table.checkAndMutate(Bytes.toBytes(0), new PrefixFilter(Bytes.toBytes(0))); + if (op instanceof Put) { + Put put = (Put) op; + builder.thenPut(put).join(); + } else if (op instanceof Delete) { + Delete delete = (Delete) op; + builder.thenDelete(delete).join(); + } else if (op instanceof RowMutations) { + RowMutations mutations = (RowMutations) op; + builder.thenMutate(mutations).join(); + } else { + fail("unsupported CheckAndPut operation " + op); + } + assertTrace("CHECK_AND_MUTATE"); + } + + @Test + public void testCheckAndMutateWithFilterBuilderThenPut() { + Put put = new Put(Bytes.toBytes(0)) + .addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v")); + testCheckAndMutateWithFilterBuilder(put); + } + + @Test + public void testCheckAndMutateWithFilterBuilderThenDelete() { + testCheckAndMutateWithFilterBuilder(new Delete(Bytes.toBytes(0))); + } + + @Test + public void testCheckAndMutateWithFilterBuilderThenMutations() throws IOException { + RowMutations mutations = new RowMutations(Bytes.toBytes(0)) + .add(new Put(Bytes.toBytes(0)) + .addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v"))) + .add(new Delete(Bytes.toBytes(0))); + testCheckAndMutateWithFilterBuilder(mutations); } @Test public void testMutateRow() throws IOException { table.mutateRow(new RowMutations(Bytes.toBytes(0)).add(new Delete(Bytes.toBytes(0)))); - assertTrace("mutateRow"); + assertTrace("BATCH"); } @Test - public void testScanAll() throws IOException { + public void testScanAll() { table.scanAll(new Scan().setCaching(1).setMaxResultSize(1).setLimit(1)).join(); - assertTrace("scanAll"); + assertTrace("SCAN"); } @Test @@ -331,13 +453,13 @@ public void testExistsList() { .allOf( table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) .join(); - assertTrace("getList"); + assertTrace("BATCH"); } @Test public void testExistsAll() { table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join(); - assertTrace("getList"); + assertTrace("BATCH"); } @Test @@ -345,13 +467,13 @@ public void testGetList() { CompletableFuture .allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) .join(); - assertTrace("getList"); + assertTrace("BATCH"); } @Test public void testGetAll() { table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join(); - assertTrace("getList"); + assertTrace("BATCH"); } @Test @@ -360,14 +482,14 @@ public void testPutList() { .allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0])) .join(); - assertTrace("putList"); + assertTrace("BATCH"); } @Test public void testPutAll() { table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))).join(); - assertTrace("putList"); + assertTrace("BATCH"); } @Test @@ -376,13 +498,13 @@ public void testDeleteList() { .allOf( table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) .join(); - assertTrace("deleteList"); + assertTrace("BATCH"); } @Test public void testDeleteAll() { table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); - assertTrace("deleteList"); + assertTrace("BATCH"); } @Test @@ -391,13 +513,13 @@ public void testBatch() { .allOf( table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) .join(); - assertTrace("batch"); + assertTrace("BATCH"); } @Test public void testBatchAll() { table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); - assertTrace("batch"); + assertTrace("BATCH"); } @Test diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java new file mode 100644 index 000000000000..c3bf3bee59e5 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.trace.hamcrest; + +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasProperty; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; + +/** + * Helper methods for matching against instances of {@link io.opentelemetry.api.common.Attributes}. + */ +public final class AttributesMatchers { + + private AttributesMatchers() { } + + public static Matcher containsEntry( + Matcher> keyMatcher, + Matcher valueMatcher + ) { + return new IsAttributesContaining<>(keyMatcher, valueMatcher); + } + + public static Matcher containsEntry(AttributeKey key, T value) { + return containsEntry(equalTo(key), equalTo(value)); + } + + public static Matcher containsEntry(String key, String value) { + return containsEntry(AttributeKey.stringKey(key), value); + } + + private static final class IsAttributesContaining extends TypeSafeMatcher { + private final Matcher> keyMatcher; + private final Matcher valueMatcher; + + private IsAttributesContaining( + final Matcher> keyMatcher, + final Matcher valueMatcher + ) { + this.keyMatcher = keyMatcher; + this.valueMatcher = valueMatcher; + } + + @Override + protected boolean matchesSafely(Attributes item) { + return item.asMap().entrySet().stream().anyMatch(e -> allOf( + hasProperty("key", keyMatcher), + hasProperty("value", valueMatcher)) + .matches(e)); + } + + @Override + public void describeMismatchSafely(Attributes item, Description mismatchDescription) { + mismatchDescription + .appendText("Attributes was ") + .appendValueList("[", ", ", "]", item.asMap().entrySet()); + } + + @Override + public void describeTo(Description description) { + description + .appendText("Attributes containing [") + .appendDescriptionOf(keyMatcher) + .appendText("->") + .appendDescriptionOf(valueMatcher) + .appendText("]"); + } + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java new file mode 100644 index 000000000000..2839e7c597c7 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.trace.hamcrest; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import org.hamcrest.Description; +import org.hamcrest.FeatureMatcher; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; + +/** + * Helper methods for matching against instances of {@link SpanData}. + */ +public final class SpanDataMatchers { + + private SpanDataMatchers() { } + + public static Matcher hasAttributes(Matcher matcher) { + return new FeatureMatcher( + matcher, "SpanData having attributes that ", "attributes" + ) { + @Override protected Attributes featureValueOf(SpanData item) { + return item.getAttributes(); + } + }; + } + + public static Matcher hasEnded() { + return new TypeSafeMatcher() { + @Override protected boolean matchesSafely(SpanData item) { + return item.hasEnded(); + } + @Override public void describeTo(Description description) { + description.appendText("SpanData that hasEnded"); + } + }; + } + + public static Matcher hasKind(SpanKind kind) { + return new FeatureMatcher( + equalTo(kind), "SpanData with kind that", "SpanKind") { + @Override protected SpanKind featureValueOf(SpanData item) { + return item.getKind(); + } + }; + } + + public static Matcher hasName(String name) { + return hasName(equalTo(name)); + } + + public static Matcher hasName(Matcher matcher) { + return new FeatureMatcher(matcher, "SpanKind with a name that", "name") { + @Override protected String featureValueOf(SpanData item) { + return item.getName(); + } + }; + } + + public static Matcher hasStatusWithCode(StatusCode statusCode) { + final Matcher matcher = is(equalTo(statusCode)); + return new TypeSafeMatcher() { + @Override protected boolean matchesSafely(SpanData item) { + final StatusData statusData = item.getStatus(); + return statusData != null + && statusData.getStatusCode() != null + && matcher.matches(statusData.getStatusCode()); + } + @Override public void describeTo(Description description) { + description.appendText("SpanData with StatusCode that ").appendDescriptionOf(matcher); + } + }; + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java index 1eb5d820d998..90c3c858a706 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java @@ -28,7 +28,9 @@ */ @InterfaceAudience.Private public final class HBaseSemanticAttributes { + public static final AttributeKey DB_NAME = SemanticAttributes.DB_NAME; public static final AttributeKey NAMESPACE_KEY = SemanticAttributes.DB_HBASE_NAMESPACE; + public static final AttributeKey DB_OPERATION = SemanticAttributes.DB_OPERATION; public static final AttributeKey TABLE_KEY = AttributeKey.stringKey("db.hbase.table"); public static final AttributeKey> REGION_NAMES_KEY = AttributeKey.stringArrayKey("db.hbase.regions"); @@ -44,5 +46,23 @@ public final class HBaseSemanticAttributes { AttributeKey.booleanKey("db.hbase.rowlock.readlock"); public static final AttributeKey WAL_IMPL = AttributeKey.stringKey("db.hbase.wal.impl"); + /** + * These are values used with {@link #DB_OPERATION}. They correspond with the implementations of + * {@code org.apache.hadoop.hbase.client.Operation}, as well as + * {@code org.apache.hadoop.hbase.client.CheckAndMutate}, and "MULTI", meaning a batch of multiple + * operations. + */ + public enum Operation { + APPEND, + BATCH, + CHECK_AND_MUTATE, + COPROC_EXEC, + DELETE, + GET, + INCREMENT, + PUT, + SCAN, + } + private HBaseSemanticAttributes() { } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java index e360705916dd..706d4891c618 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java @@ -91,9 +91,11 @@ public static Span createClientSpan(String name) { /** * Trace an asynchronous operation for a table. */ - public static CompletableFuture tracedFuture(Supplier> action, - String spanName, TableName tableName) { - Span span = createTableSpan(spanName, tableName); + public static CompletableFuture tracedFuture( + Supplier> action, + Supplier spanSupplier + ) { + Span span = spanSupplier.get(); try (Scope scope = span.makeCurrent()) { CompletableFuture future = action.get(); endSpan(future, span); @@ -119,8 +121,10 @@ public static CompletableFuture tracedFuture(Supplier List> tracedFutures( - Supplier>> action, String spanName, TableName tableName) { - Span span = createTableSpan(spanName, tableName); + Supplier>> action, + Supplier spanSupplier + ) { + Span span = spanSupplier.get(); try (Scope scope = span.makeCurrent()) { List> futures = action.get(); endSpan(CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])), span);