diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java index 96182ca4b296..56d7ce152af0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java @@ -17,15 +17,22 @@ */ package org.apache.hadoop.hbase.client; +import java.io.IOException; +import java.util.List; +import java.util.Optional; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.GsonUtil; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.gson.Gson; import org.apache.hbase.thirdparty.com.google.gson.JsonObject; +import org.apache.hbase.thirdparty.com.google.gson.JsonParser; import org.apache.hbase.thirdparty.com.google.gson.JsonSerializer; /** @@ -36,24 +43,48 @@ @InterfaceStability.Evolving final public class OnlineLogRecord extends LogEntry { + private static final Logger LOG = LoggerFactory.getLogger(OnlineLogRecord.class.getName()); + // used to convert object to pretty printed format // used by toJsonPrettyPrint() private static final Gson GSON = - GsonUtil.createGson().setPrettyPrinting().registerTypeAdapter(OnlineLogRecord.class, - (JsonSerializer) (slowLogPayload, type, jsonSerializationContext) -> { - Gson gson = new Gson(); - JsonObject jsonObj = (JsonObject) gson.toJsonTree(slowLogPayload); - if (slowLogPayload.getMultiGetsCount() == 0) { - jsonObj.remove("multiGetsCount"); - } - if (slowLogPayload.getMultiMutationsCount() == 0) { - jsonObj.remove("multiMutationsCount"); - } - if (slowLogPayload.getMultiServiceCalls() == 0) { - jsonObj.remove("multiServiceCalls"); + GsonUtil.createGson().setPrettyPrinting().registerTypeAdapter(Operation.class, + (JsonSerializer) (operation, type, jsonSerializationContext) -> { + String jsonString = null; + try { + jsonString = operation.toJSON(); + } catch (IOException e) { + LOG.warn("Unable to serialize operation {}", operation, e); } - return jsonObj; - }).create(); + return JsonParser.parseString(jsonString == null ? HConstants.EMPTY_STRING : jsonString); + }).registerTypeAdapter(OnlineLogRecord.class, + (JsonSerializer) (slowLogPayload, type, jsonSerializationContext) -> { + Gson gson = new Gson(); + JsonObject jsonObj = (JsonObject) gson.toJsonTree(slowLogPayload); + if (slowLogPayload.getMultiGetsCount() == 0) { + jsonObj.remove("multiGetsCount"); + } + if (slowLogPayload.getMultiMutationsCount() == 0) { + jsonObj.remove("multiMutationsCount"); + } + if (slowLogPayload.getMultiServiceCalls() == 0) { + jsonObj.remove("multiServiceCalls"); + } + if (!slowLogPayload.getScan().isPresent()) { + jsonObj.remove("scan"); + } + if (!slowLogPayload.getMulti().isPresent()) { + jsonObj.remove("multi"); + } + if (!slowLogPayload.getGet().isPresent()) { + jsonObj.remove("get"); + } + if (!slowLogPayload.getMutate().isPresent()) { + jsonObj.remove("mutate"); + } + return jsonObj; + }) + .create(); private final long startTime; private final int processingTime; @@ -71,6 +102,10 @@ final public class OnlineLogRecord extends LogEntry { private final int multiGetsCount; private final int multiMutationsCount; private final int multiServiceCalls; + private final Optional scan; + private final Optional> multi; + private final Optional get; + private final Optional mutate; public long getStartTime() { return startTime; @@ -128,11 +163,52 @@ public int getMultiServiceCalls() { return multiServiceCalls; } + /** + * If {@value org.apache.hadoop.hbase.HConstants#SLOW_LOG_OPERATION_MESSAGE_PAYLOAD_ENABLED} is + * enabled then this value may be present and should represent the Scan that produced the given + * {@link OnlineLogRecord}. This value should only be present if {@link #getMulti()}, + * {@link #getGet()}, and {@link #getMutate()} are empty + */ + public Optional getScan() { + return scan; + } + + /** + * If {@value org.apache.hadoop.hbase.HConstants#SLOW_LOG_OPERATION_MESSAGE_PAYLOAD_ENABLED} is + * enabled then this value may be present and should represent the MultiRequest that produced the + * given {@link OnlineLogRecord}. This value should only be present if {@link #getScan}, + * {@link #getGet()}, and {@link #getMutate()} are empty + */ + public Optional> getMulti() { + return multi; + } + + /** + * If {@value org.apache.hadoop.hbase.HConstants#SLOW_LOG_OPERATION_MESSAGE_PAYLOAD_ENABLED} is + * enabled then this value may be present and should represent the Get that produced the given + * {@link OnlineLogRecord}. This value should only be present if {@link #getScan()}, + * {@link #getMulti()} ()}, and {@link #getMutate()} are empty + */ + public Optional getGet() { + return get; + } + + /** + * If {@value org.apache.hadoop.hbase.HConstants#SLOW_LOG_OPERATION_MESSAGE_PAYLOAD_ENABLED} is + * enabled then this value may be present and should represent the Mutation that produced the + * given {@link OnlineLogRecord}. This value should only be present if {@link #getScan}, + * {@link #getMulti()} ()}, and {@link #getGet()} ()} are empty + */ + public Optional getMutate() { + return mutate; + } + private OnlineLogRecord(final long startTime, final int processingTime, final int queueTime, final long responseSize, final String clientAddress, final String serverClass, final String methodName, final String callDetails, final String param, final String regionName, final String userName, final int multiGetsCount, final int multiMutationsCount, - final int multiServiceCalls) { + final int multiServiceCalls, final Optional scan, final Optional> multi, + final Optional get, final Optional mutate) { this.startTime = startTime; this.processingTime = processingTime; this.queueTime = queueTime; @@ -147,6 +223,10 @@ private OnlineLogRecord(final long startTime, final int processingTime, final in this.multiGetsCount = multiGetsCount; this.multiMutationsCount = multiMutationsCount; this.multiServiceCalls = multiServiceCalls; + this.scan = scan; + this.multi = multi; + this.get = get; + this.mutate = mutate; } public static class OnlineLogRecordBuilder { @@ -164,6 +244,10 @@ public static class OnlineLogRecordBuilder { private int multiGetsCount; private int multiMutationsCount; private int multiServiceCalls; + private Optional scan = Optional.empty(); + private Optional> multi = Optional.empty(); + private Optional get = Optional.empty(); + private Optional mutate = Optional.empty(); public OnlineLogRecordBuilder setStartTime(long startTime) { this.startTime = startTime; @@ -235,10 +319,30 @@ public OnlineLogRecordBuilder setMultiServiceCalls(int multiServiceCalls) { return this; } + public OnlineLogRecordBuilder setScan(Scan scan) { + this.scan = Optional.of(scan); + return this; + } + + public OnlineLogRecordBuilder setMulti(List multi) { + this.multi = Optional.of(multi); + return this; + } + + public OnlineLogRecordBuilder setGet(Get get) { + this.get = Optional.of(get); + return this; + } + + public OnlineLogRecordBuilder setMutate(Mutation mutate) { + this.mutate = Optional.of(mutate); + return this; + } + public OnlineLogRecord build() { return new OnlineLogRecord(startTime, processingTime, queueTime, responseSize, clientAddress, serverClass, methodName, callDetails, param, regionName, userName, multiGetsCount, - multiMutationsCount, multiServiceCalls); + multiMutationsCount, multiServiceCalls, scan, multi, get, mutate); } } @@ -261,7 +365,8 @@ public boolean equals(Object o) { .append(multiServiceCalls, that.multiServiceCalls).append(clientAddress, that.clientAddress) .append(serverClass, that.serverClass).append(methodName, that.methodName) .append(callDetails, that.callDetails).append(param, that.param) - .append(regionName, that.regionName).append(userName, that.userName).isEquals(); + .append(regionName, that.regionName).append(userName, that.userName).append(scan, that.scan) + .append(multi, that.multi).append(get, that.get).append(mutate, that.mutate).isEquals(); } @Override @@ -269,7 +374,8 @@ public int hashCode() { return new HashCodeBuilder(17, 37).append(startTime).append(processingTime).append(queueTime) .append(responseSize).append(clientAddress).append(serverClass).append(methodName) .append(callDetails).append(param).append(regionName).append(userName).append(multiGetsCount) - .append(multiMutationsCount).append(multiServiceCalls).toHashCode(); + .append(multiMutationsCount).append(multiServiceCalls).append(scan).append(multi).append(get) + .append(mutate).toHashCode(); } @Override @@ -286,7 +392,8 @@ public String toString() { .append("callDetails", callDetails).append("param", param).append("regionName", regionName) .append("userName", userName).append("multiGetsCount", multiGetsCount) .append("multiMutationsCount", multiMutationsCount) - .append("multiServiceCalls", multiServiceCalls).toString(); + .append("multiServiceCalls", multiServiceCalls).append("scan", scan).append("multi", multi) + .append("get", get).append("mutate", mutate).toString(); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java index b1460c0b116c..e15be9b23581 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java @@ -23,6 +23,8 @@ import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + /** * SlowLog params object that contains detailed info as params and region name : to be used for * filter purpose @@ -32,15 +34,63 @@ public class SlowLogParams { private final String regionName; private final String params; + private final ClientProtos.Scan scan; + private final ClientProtos.MultiRequest multi; + private final ClientProtos.Get get; + private final ClientProtos.MutationProto mutate; + + public SlowLogParams(String regionName, String params, ClientProtos.Scan scan) { + this.regionName = regionName; + this.params = params; + this.scan = scan; + this.multi = null; + this.get = null; + this.mutate = null; + } + + public SlowLogParams(String regionName, String params, ClientProtos.MultiRequest multi) { + this.regionName = regionName; + this.params = params; + this.scan = null; + this.multi = multi; + this.get = null; + this.mutate = null; + } + + public SlowLogParams(String regionName, String params, ClientProtos.Get get) { + this.regionName = regionName; + this.params = params; + this.scan = null; + this.multi = null; + this.get = get; + this.mutate = null; + } + + public SlowLogParams(String regionName, String params, ClientProtos.MutationProto mutate) { + this.regionName = regionName; + this.params = params; + this.scan = null; + this.multi = null; + this.get = null; + this.mutate = mutate; + } public SlowLogParams(String regionName, String params) { this.regionName = regionName; this.params = params; + this.scan = null; + this.multi = null; + this.get = null; + this.mutate = null; } public SlowLogParams(String params) { this.regionName = StringUtils.EMPTY; this.params = params; + this.scan = null; + this.multi = null; + this.get = null; + this.mutate = null; } public String getRegionName() { @@ -51,9 +101,26 @@ public String getParams() { return params; } + public ClientProtos.Scan getScan() { + return scan; + } + + public ClientProtos.MultiRequest getMulti() { + return multi; + } + + public ClientProtos.Get getGet() { + return get; + } + + public ClientProtos.MutationProto getMutate() { + return mutate; + } + @Override public String toString() { return new ToStringBuilder(this).append("regionName", regionName).append("params", params) + .append("scan", scan).append("multi", multi).append("get", get).append("mutate", mutate) .toString(); } @@ -67,11 +134,13 @@ public boolean equals(Object o) { } SlowLogParams that = (SlowLogParams) o; return new EqualsBuilder().append(regionName, that.regionName).append(params, that.params) - .isEquals(); + .append(scan, that.scan).append(multi, that.multi).append(get, that.get) + .append(mutate, that.mutate).isEquals(); } @Override public int hashCode() { - return new HashCodeBuilder(17, 37).append(regionName).append(params).toHashCode(); + return new HashCodeBuilder(17, 37).append(regionName).append(params).append(scan).append(multi) + .append(get).append(mutate).toHashCode(); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index ca38a91e74a6..ad414718da9b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -87,6 +87,7 @@ import org.apache.hadoop.hbase.client.LogEntry; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.OnlineLogRecord; +import org.apache.hadoop.hbase.client.Operation; import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfoBuilder; @@ -129,6 +130,8 @@ import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams; import org.apache.hbase.thirdparty.com.google.gson.JsonArray; @@ -229,6 +232,8 @@ @InterfaceAudience.Private // TODO: some clients (Hive, etc) use this class public final class ProtobufUtil { + private static final Logger LOG = LoggerFactory.getLogger(ProtobufUtil.class.getName()); + private ProtobufUtil() { } @@ -1072,6 +1077,21 @@ public static ClientProtos.Scan toScan(final Scan scan) throws IOException { return scanBuilder.build(); } + public static List toMulti(MultiRequest multiRequest) throws IOException { + List operations = new ArrayList<>(); + for (RegionAction regionAction : multiRequest.getRegionActionList()) { + List actions = regionAction.getActionList(); + for (ClientProtos.Action action : actions) { + if (action.hasGet()) { + operations.add(ProtobufUtil.toGet(action.getGet())); + } else if (action.hasMutation()) { + operations.add(ProtobufUtil.toMutation(action.getMutation())); + } + } + } + return operations; + } + /** * Convert a protocol buffer Scan to a client Scan * @param proto the protocol buffer Scan to convert @@ -2139,10 +2159,13 @@ private static String getStringForByteString(ByteString bs) { /** * Return SlowLogParams to maintain recent online slowlog responses - * @param message Message object {@link Message} + * @param message Message object {@link Message} + * @param slowLogOperationMessagePayloadEnabled whether to include the {@link Message} in the + * payload * @return SlowLogParams with regionName(for filter queries) and params */ - public static SlowLogParams getSlowLogParams(Message message) { + public static SlowLogParams getSlowLogParams(Message message, + boolean slowLogOperationMessagePayloadEnabled) { if (message == null) { return null; } @@ -2150,30 +2173,48 @@ public static SlowLogParams getSlowLogParams(Message message) { ScanRequest scanRequest = (ScanRequest) message; String regionName = getStringForByteString(scanRequest.getRegion().getValue()); String params = TextFormat.shortDebugString(message); - return new SlowLogParams(regionName, params); + if (slowLogOperationMessagePayloadEnabled) { + return new SlowLogParams(regionName, params, scanRequest.getScan()); + } else { + return new SlowLogParams(regionName, params); + } } else if (message instanceof MutationProto) { MutationProto mutationProto = (MutationProto) message; String params = "type= " + mutationProto.getMutateType().toString(); - return new SlowLogParams(params); + if (slowLogOperationMessagePayloadEnabled) { + return new SlowLogParams(null, params, mutationProto); + } else { + return new SlowLogParams(params); + } } else if (message instanceof GetRequest) { GetRequest getRequest = (GetRequest) message; String regionName = getStringForByteString(getRequest.getRegion().getValue()); String params = "region= " + regionName + ", row= " + getStringForByteString(getRequest.getGet().getRow()); - return new SlowLogParams(regionName, params); + if (slowLogOperationMessagePayloadEnabled) { + return new SlowLogParams(regionName, params, getRequest.getGet()); + } else { + return new SlowLogParams(regionName, params); + } } else if (message instanceof MultiRequest) { MultiRequest multiRequest = (MultiRequest) message; - int actionsCount = multiRequest.getRegionActionList().stream() - .mapToInt(ClientProtos.RegionAction::getActionCount).sum(); RegionAction actions = multiRequest.getRegionActionList().get(0); String regionName = getStringForByteString(actions.getRegion().getValue()); - String params = "region= " + regionName + ", for " + actionsCount + " action(s)"; - return new SlowLogParams(regionName, params); + String params = getShortTextFormat(multiRequest); + if (slowLogOperationMessagePayloadEnabled) { + return new SlowLogParams(regionName, params, multiRequest); + } else { + return new SlowLogParams(regionName, params); + } } else if (message instanceof MutateRequest) { MutateRequest mutateRequest = (MutateRequest) message; String regionName = getStringForByteString(mutateRequest.getRegion().getValue()); String params = "region= " + regionName; - return new SlowLogParams(regionName, params); + if (slowLogOperationMessagePayloadEnabled) { + return new SlowLogParams(regionName, params, mutateRequest.getMutation()); + } else { + return new SlowLogParams(regionName, params); + } } else if (message instanceof CoprocessorServiceRequest) { CoprocessorServiceRequest coprocessorServiceRequest = (CoprocessorServiceRequest) message; String params = "coprocessorService= " + coprocessorServiceRequest.getCall().getServiceName() @@ -3365,7 +3406,7 @@ public static Set toCompactedStoreFiles(byte[] bytes) throws IOException * @return SlowLog Payload for client usecase */ private static LogEntry getSlowLogRecord(final TooSlowLog.SlowLogPayload slowLogPayload) { - OnlineLogRecord onlineLogRecord = + OnlineLogRecord.OnlineLogRecordBuilder onlineLogRecordBuilder = new OnlineLogRecord.OnlineLogRecordBuilder().setCallDetails(slowLogPayload.getCallDetails()) .setClientAddress(slowLogPayload.getClientAddress()) .setMethodName(slowLogPayload.getMethodName()) @@ -3376,8 +3417,31 @@ private static LogEntry getSlowLogRecord(final TooSlowLog.SlowLogPayload slowLog .setQueueTime(slowLogPayload.getQueueTime()).setRegionName(slowLogPayload.getRegionName()) .setResponseSize(slowLogPayload.getResponseSize()) .setServerClass(slowLogPayload.getServerClass()).setStartTime(slowLogPayload.getStartTime()) - .setUserName(slowLogPayload.getUserName()).build(); - return onlineLogRecord; + .setUserName(slowLogPayload.getUserName()); + if (slowLogPayload.hasScan()) { + onlineLogRecordBuilder.setScan(catchAll(() -> ProtobufUtil.toScan(slowLogPayload.getScan()))); + } + if (slowLogPayload.hasMulti()) { + onlineLogRecordBuilder + .setMulti(catchAll(() -> ProtobufUtil.toMulti(slowLogPayload.getMulti()))); + } + if (slowLogPayload.hasGet()) { + onlineLogRecordBuilder.setGet(catchAll(() -> ProtobufUtil.toGet(slowLogPayload.getGet()))); + } + if (slowLogPayload.hasMutate()) { + onlineLogRecordBuilder + .setMutate(catchAll(() -> ProtobufUtil.toMutation(slowLogPayload.getMutate()))); + } + return onlineLogRecordBuilder.build(); + } + + private static T catchAll(Callable callable) { + try { + return callable.call(); + } catch (Exception e) { + LOG.warn("Suppressing exception", e); + } + return null; } /** diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 26c105b40cb6..214b542c1453 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1564,6 +1564,10 @@ public enum OperationStatusCode { // Default 10 mins. public static final int DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION = 10 * 60 * 1000; + public static final String SLOW_LOG_OPERATION_MESSAGE_PAYLOAD_ENABLED = + "hbase.slowlog.operation.message.payload.enabled"; + public static final boolean SLOW_LOG_OPERATION_MESSAGE_PAYLOAD_ENABLED_DEFAULT = false; + public static final String SHELL_TIMESTAMP_FORMAT_EPOCH_KEY = "hbase.shell.timestamp.format.epoch"; diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/TooSlowLog.proto b/hbase-protocol-shaded/src/main/protobuf/server/region/TooSlowLog.proto index 36ed9d504a71..77fd7d23546d 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/region/TooSlowLog.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/region/TooSlowLog.proto @@ -27,6 +27,8 @@ option java_outer_classname = "TooSlowLog"; option java_generate_equals_and_hash = true; option optimize_for = SPEED; +import "client/Client.proto"; + message SlowLogPayload { required int64 start_time = 1; required int32 processing_time = 2; @@ -43,6 +45,10 @@ message SlowLogPayload { optional int32 multi_mutations = 13 [default = 0]; optional int32 multi_service_calls = 14 [default = 0]; required Type type = 15; + optional Scan scan = 16; + optional MultiRequest multi = 17; + optional Get get = 18; + optional MutationProto mutate = 19; // SLOW_LOG is RPC call slow in nature whereas LARGE_LOG is RPC call quite large. // Majority of times, slow logs are also large logs and hence, ALL is combination of diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java index 86b24e9d975e..c80ebd4d4502 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java @@ -65,10 +65,14 @@ public class SlowLogQueueService implements NamedQueueService { private final boolean isSlowLogTableEnabled; private final SlowLogPersistentService slowLogPersistentService; private final Queue slowLogQueue; + private final boolean slowLogOperationMessagePayloadEnabled; public SlowLogQueueService(Configuration conf) { this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED); + this.slowLogOperationMessagePayloadEnabled = + conf.getBoolean(HConstants.SLOW_LOG_OPERATION_MESSAGE_PAYLOAD_ENABLED, + HConstants.SLOW_LOG_OPERATION_MESSAGE_PAYLOAD_ENABLED_DEFAULT); if (!isOnlineLogProviderEnabled) { this.isSlowLogTableEnabled = false; @@ -128,7 +132,8 @@ public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) { long endTime = EnvironmentEdgeManager.currentTime(); int processingTime = (int) (endTime - startTime); int qTime = (int) (startTime - receiveTime); - final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param); + final SlowLogParams slowLogParams = + ProtobufUtil.getSlowLogParams(param, slowLogOperationMessagePayloadEnabled); int numGets = 0; int numMutations = 0; int numServiceCalls = 0; @@ -151,7 +156,7 @@ public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) { final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY); final String methodDescriptorName = methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY; - TooSlowLog.SlowLogPayload slowLogPayload = TooSlowLog.SlowLogPayload.newBuilder() + TooSlowLog.SlowLogPayload.Builder slowLogPayloadBuilder = TooSlowLog.SlowLogPayload.newBuilder() .setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")") .setClientAddress(clientAddress).setMethodName(methodDescriptorName).setMultiGets(numGets) .setMultiMutations(numMutations).setMultiServiceCalls(numServiceCalls) @@ -159,7 +164,22 @@ public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) { .setProcessingTime(processingTime).setQueueTime(qTime) .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY) .setResponseSize(responseSize).setServerClass(className).setStartTime(startTime).setType(type) - .setUserName(userName).build(); + .setUserName(userName); + if (slowLogParams != null) { + if (slowLogParams.getScan() != null) { + slowLogPayloadBuilder.setScan(slowLogParams.getScan()); + } + if (slowLogParams.getMulti() != null) { + slowLogPayloadBuilder.setMulti(slowLogParams.getMulti()); + } + if (slowLogParams.getGet() != null) { + slowLogPayloadBuilder.setGet(slowLogParams.getGet()); + } + if (slowLogParams.getMutate() != null) { + slowLogPayloadBuilder.setMutate(slowLogParams.getMutate()); + } + } + TooSlowLog.SlowLogPayload slowLogPayload = slowLogPayloadBuilder.build(); slowLogQueue.add(slowLogPayload); if (isSlowLogTableEnabled) { if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java index 406748fab630..89661416cace 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java @@ -99,7 +99,7 @@ private boolean confirmPayloadParams(int i, int j, List slowLogP } @Test - public void testOnlieSlowLogConsumption() throws Exception { + public void testOnlineSlowLogConsumption() throws Exception { Configuration conf = applySlowLogRecorderConf(8); Constructor constructor = @@ -277,6 +277,118 @@ && confirmPayloadParams(12, 142, slowLogPayloads) Assert.assertEquals(slowLogPayloads.size(), 0); } + @Test + public void testOnlineSlowLogOperationMessagePayloadDefaultDisabled() throws Exception { + Configuration conf = applySlowLogRecorderConf(1); + conf.unset(HConstants.SLOW_LOG_OPERATION_MESSAGE_PAYLOAD_ENABLED); + Constructor constructor = + NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); + constructor.setAccessible(true); + namedQueueRecorder = constructor.newInstance(conf); + AdminProtos.SlowLogResponseRequest request = + AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); + + Assert.assertEquals(getSlowLogPayloads(request).size(), 0); + LOG.debug("Initially ringbuffer of Slow Log records is empty"); + RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_1", "client_1", "class_1"); + namedQueueRecorder.addRecord(rpcLogDetails); + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { + Optional slowLogPayload = getSlowLogPayloads(request).stream().findAny(); + if (slowLogPayload.isPresent()) { + Message message = getMessage(slowLogPayload.get()); + return message == null; + } + return false; + })); + } + + @Test + public void testOnlineSlowLogOperationMessagePayloadExplicitlyDisabled() throws Exception { + Configuration conf = applySlowLogRecorderConf(1); + conf.setBoolean(HConstants.SLOW_LOG_OPERATION_MESSAGE_PAYLOAD_ENABLED, false); + Constructor constructor = + NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); + constructor.setAccessible(true); + namedQueueRecorder = constructor.newInstance(conf); + AdminProtos.SlowLogResponseRequest request = + AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); + + Assert.assertEquals(getSlowLogPayloads(request).size(), 0); + LOG.debug("Initially ringbuffer of Slow Log records is empty"); + RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_1", "client_1", "class_1"); + namedQueueRecorder.addRecord(rpcLogDetails); + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { + Optional slowLogPayload = getSlowLogPayloads(request).stream().findAny(); + if (slowLogPayload.isPresent()) { + Message message = getMessage(slowLogPayload.get()); + return message == null; + } + return false; + })); + } + + @Test + public void testOnlineSlowLogOperationMessagePayloadEnabled() throws Exception { + i = 0; // set this so that the RpcCall and expected Message are deterministic + Configuration conf = applySlowLogRecorderConf(1); + conf.setBoolean(HConstants.SLOW_LOG_OPERATION_MESSAGE_PAYLOAD_ENABLED, true); + Constructor constructor = + NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); + constructor.setAccessible(true); + namedQueueRecorder = constructor.newInstance(conf); + AdminProtos.SlowLogResponseRequest request = + AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); + Message expectedMessage = + ClientProtos.MutationProto.newBuilder().setRow(ByteString.copyFromUtf8("row123")).build(); + + Assert.assertEquals(getSlowLogPayloads(request).size(), 0); + LOG.debug("Initially ringbuffer of Slow Log records is empty"); + RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_1", "client_1", "class_1"); + namedQueueRecorder.addRecord(rpcLogDetails); + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { + Optional slowLogPayload = getSlowLogPayloads(request).stream().findAny(); + if (slowLogPayload.map(TestNamedQueueRecorder::hasMessage).orElse(false)) { + Message message = getMessage(slowLogPayload.get()); + return message.equals(expectedMessage); + } + return false; + })); + } + + private static boolean hasMessage(SlowLogPayload slowLogPayload) { + int trueCount = 0; + if (slowLogPayload.hasScan()) { + trueCount++; + } + if (slowLogPayload.hasMulti()) { + trueCount++; + } + if (slowLogPayload.hasGet()) { + trueCount++; + } + if (slowLogPayload.hasMutate()) { + trueCount++; + } + Assert.assertTrue(trueCount <= 1); + return trueCount > 0; + } + + private static Message getMessage(SlowLogPayload slowLogPayload) { + if (slowLogPayload.hasScan()) { + return slowLogPayload.getScan(); + } + if (slowLogPayload.hasMulti()) { + return slowLogPayload.getMulti(); + } + if (slowLogPayload.hasGet()) { + return slowLogPayload.getGet(); + } + if (slowLogPayload.hasMutate()) { + return slowLogPayload.getMutate(); + } + return null; + } + @Test public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception { Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();