Skip to content

Commit

Permalink
HBASE-27536: improve slowlog payload
Browse files Browse the repository at this point in the history
  • Loading branch information
Ray Mattingly committed Dec 23, 2022
1 parent dcfde79 commit 654dd5f
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ final public class OnlineLogRecord extends LogEntry {
private final int multiGetsCount;
private final int multiMutationsCount;
private final int multiServiceCalls;
private final String operationJson;

public long getStartTime() {
return startTime;
Expand Down Expand Up @@ -128,11 +129,15 @@ public int getMultiServiceCalls() {
return multiServiceCalls;
}

public String getOperationJson() {
return operationJson;
}

private OnlineLogRecord(final long startTime, final int processingTime, final int queueTime,
final long responseSize, final String clientAddress, final String serverClass,
final String methodName, final String callDetails, final String param, final String regionName,
final String userName, final int multiGetsCount, final int multiMutationsCount,
final int multiServiceCalls) {
final int multiServiceCalls, final String operationJson) {
this.startTime = startTime;
this.processingTime = processingTime;
this.queueTime = queueTime;
Expand All @@ -147,6 +152,7 @@ private OnlineLogRecord(final long startTime, final int processingTime, final in
this.multiGetsCount = multiGetsCount;
this.multiMutationsCount = multiMutationsCount;
this.multiServiceCalls = multiServiceCalls;
this.operationJson = operationJson;
}

public static class OnlineLogRecordBuilder {
Expand All @@ -164,6 +170,7 @@ public static class OnlineLogRecordBuilder {
private int multiGetsCount;
private int multiMutationsCount;
private int multiServiceCalls;
private String operationJson;

public OnlineLogRecordBuilder setStartTime(long startTime) {
this.startTime = startTime;
Expand Down Expand Up @@ -235,10 +242,15 @@ public OnlineLogRecordBuilder setMultiServiceCalls(int multiServiceCalls) {
return this;
}

public OnlineLogRecordBuilder setOperationJson(String operationJson) {
this.operationJson = operationJson;
return this;
}

public OnlineLogRecord build() {
return new OnlineLogRecord(startTime, processingTime, queueTime, responseSize, clientAddress,
serverClass, methodName, callDetails, param, regionName, userName, multiGetsCount,
multiMutationsCount, multiServiceCalls);
multiMutationsCount, multiServiceCalls, operationJson);
}
}

Expand All @@ -261,15 +273,16 @@ public boolean equals(Object o) {
.append(multiServiceCalls, that.multiServiceCalls).append(clientAddress, that.clientAddress)
.append(serverClass, that.serverClass).append(methodName, that.methodName)
.append(callDetails, that.callDetails).append(param, that.param)
.append(regionName, that.regionName).append(userName, that.userName).isEquals();
.append(regionName, that.regionName).append(userName, that.userName)
.append(operationJson, that.operationJson).isEquals();
}

@Override
public int hashCode() {
return new HashCodeBuilder(17, 37).append(startTime).append(processingTime).append(queueTime)
.append(responseSize).append(clientAddress).append(serverClass).append(methodName)
.append(callDetails).append(param).append(regionName).append(userName).append(multiGetsCount)
.append(multiMutationsCount).append(multiServiceCalls).toHashCode();
.append(multiMutationsCount).append(multiServiceCalls).append(operationJson).toHashCode();
}

@Override
Expand All @@ -286,7 +299,8 @@ public String toString() {
.append("callDetails", callDetails).append("param", param).append("regionName", regionName)
.append("userName", userName).append("multiGetsCount", multiGetsCount)
.append("multiMutationsCount", multiMutationsCount)
.append("multiServiceCalls", multiServiceCalls).toString();
.append("multiServiceCalls", multiServiceCalls).append("operationJson", operationJson)
.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,24 @@ public class SlowLogParams {

private final String regionName;
private final String params;
private final String operationJson;

public SlowLogParams(String regionName, String params, String operationJson) {
this.regionName = regionName;
this.params = params;
this.operationJson = operationJson;
}

public SlowLogParams(String regionName, String params) {
this.regionName = regionName;
this.params = params;
this.operationJson = StringUtils.EMPTY;
}

public SlowLogParams(String params) {
this.regionName = StringUtils.EMPTY;
this.params = params;
this.operationJson = StringUtils.EMPTY;
}

public String getRegionName() {
Expand All @@ -51,6 +60,10 @@ public String getParams() {
return params;
}

public String getOperationJson() {
return operationJson;
}

@Override
public String toString() {
return new ToStringBuilder(this).append("regionName", regionName).append("params", params)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
Expand Down Expand Up @@ -87,6 +88,7 @@
import org.apache.hadoop.hbase.client.LogEntry;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.OnlineLogRecord;
import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
Expand Down Expand Up @@ -129,6 +131,8 @@
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
import org.apache.hbase.thirdparty.com.google.gson.JsonArray;
Expand Down Expand Up @@ -229,6 +233,8 @@
@InterfaceAudience.Private // TODO: some clients (Hive, etc) use this class
public final class ProtobufUtil {

private static final Logger LOG = LoggerFactory.getLogger(ProtobufUtil.class.getName());

private ProtobufUtil() {
}

Expand Down Expand Up @@ -2142,38 +2148,41 @@ private static String getStringForByteString(ByteString bs) {
* @param message Message object {@link Message}
* @return SlowLogParams with regionName(for filter queries) and params
*/
public static SlowLogParams getSlowLogParams(Message message) {
public static SlowLogParams getSlowLogParams(Message message, boolean slowLogOperationJsonEnabled,
int maxCols) {
if (message == null) {
return null;
}
if (message instanceof ScanRequest) {
ScanRequest scanRequest = (ScanRequest) message;
String regionName = getStringForByteString(scanRequest.getRegion().getValue());
String params = TextFormat.shortDebugString(message);
return new SlowLogParams(regionName, params);
return new SlowLogParams(regionName, params, toJson(slowLogOperationJsonEnabled, maxCols,
() -> ProtobufUtil.toScan(scanRequest.getScan())));
} else if (message instanceof MutationProto) {
MutationProto mutationProto = (MutationProto) message;
String params = "type= " + mutationProto.getMutateType().toString();
return new SlowLogParams(params);
return new SlowLogParams(params,
toJson(slowLogOperationJsonEnabled, maxCols, () -> ProtobufUtil.toMutation(mutationProto)));
} else if (message instanceof GetRequest) {
GetRequest getRequest = (GetRequest) message;
String regionName = getStringForByteString(getRequest.getRegion().getValue());
String params =
"region= " + regionName + ", row= " + getStringForByteString(getRequest.getGet().getRow());
return new SlowLogParams(regionName, params);
return new SlowLogParams(regionName, params, toJson(slowLogOperationJsonEnabled, maxCols,
() -> ProtobufUtil.toGet(getRequest.getGet())));
} else if (message instanceof MultiRequest) {
MultiRequest multiRequest = (MultiRequest) message;
int actionsCount = multiRequest.getRegionActionList().stream()
.mapToInt(ClientProtos.RegionAction::getActionCount).sum();
RegionAction actions = multiRequest.getRegionActionList().get(0);
String regionName = getStringForByteString(actions.getRegion().getValue());
String params = "region= " + regionName + ", for " + actionsCount + " action(s)";
String params = getShortTextFormat(multiRequest);
return new SlowLogParams(regionName, params);
} else if (message instanceof MutateRequest) {
MutateRequest mutateRequest = (MutateRequest) message;
String regionName = getStringForByteString(mutateRequest.getRegion().getValue());
String params = "region= " + regionName;
return new SlowLogParams(regionName, params);
return new SlowLogParams(regionName, params, toJson(slowLogOperationJsonEnabled, maxCols,
() -> ProtobufUtil.toMutation(mutateRequest.getMutation())));
} else if (message instanceof CoprocessorServiceRequest) {
CoprocessorServiceRequest coprocessorServiceRequest = (CoprocessorServiceRequest) message;
String params = "coprocessorService= " + coprocessorServiceRequest.getCall().getServiceName()
Expand All @@ -2184,6 +2193,19 @@ public static SlowLogParams getSlowLogParams(Message message) {
return new SlowLogParams(params);
}

private static String toJson(boolean slowLogOperationJsonEnabled, int maxCols,
Callable<Operation> operation) {
if (!slowLogOperationJsonEnabled) {
return StringUtils.EMPTY;
}
try {
return operation.call().toJSON(maxCols);
} catch (Exception e) {
LOG.warn("Exception when deriving operation JSON", e);
}
return StringUtils.EMPTY;
}

/**
* Print out some subset of a MutationProto rather than all of it and its data
* @param proto Protobuf to print out
Expand Down Expand Up @@ -3376,7 +3398,8 @@ private static LogEntry getSlowLogRecord(final TooSlowLog.SlowLogPayload slowLog
.setQueueTime(slowLogPayload.getQueueTime()).setRegionName(slowLogPayload.getRegionName())
.setResponseSize(slowLogPayload.getResponseSize())
.setServerClass(slowLogPayload.getServerClass()).setStartTime(slowLogPayload.getStartTime())
.setUserName(slowLogPayload.getUserName()).build();
.setUserName(slowLogPayload.getUserName())
.setOperationJson(slowLogPayload.getOperationJson()).build();
return onlineLogRecord;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1564,6 +1564,14 @@ public enum OperationStatusCode {
// Default 10 mins.
public static final int DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION = 10 * 60 * 1000;

public static final String SLOW_LOG_OPERATION_JSON_MAX_COLS =
"hbase.regionserver.slowlog.operation.json.max.cols";
public static final int SLOW_LOG_OPERATION_JSON_MAX_COLS_DEFAULT = 50;

public static final String SLOW_LOG_OPERATION_JSON_ENABLED =
"hbase.regionserver.slowlog.operation.json.enabled";
public static final boolean SLOW_LOG_OPERATION_JSON_ENABLED_DEFAULT = false;

public static final String SHELL_TIMESTAMP_FORMAT_EPOCH_KEY =
"hbase.shell.timestamp.format.epoch";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ message SlowLogPayload {
optional int32 multi_mutations = 13 [default = 0];
optional int32 multi_service_calls = 14 [default = 0];
required Type type = 15;
optional string operation_json = 16;

// SLOW_LOG is RPC call slow in nature whereas LARGE_LOG is RPC call quite large.
// Majority of times, slow logs are also large logs and hence, ALL is combination of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,16 @@ public class SlowLogQueueService implements NamedQueueService {
private final boolean isSlowLogTableEnabled;
private final SlowLogPersistentService slowLogPersistentService;
private final Queue<TooSlowLog.SlowLogPayload> slowLogQueue;
private final int slowLogOperationJsonMaxCols;
private final boolean slowLogOperationJsonEnabled;

public SlowLogQueueService(Configuration conf) {
this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
this.slowLogOperationJsonMaxCols = conf.getInt(HConstants.SLOW_LOG_OPERATION_JSON_MAX_COLS,
HConstants.SLOW_LOG_OPERATION_JSON_MAX_COLS_DEFAULT);
this.slowLogOperationJsonEnabled = conf.getBoolean(HConstants.SLOW_LOG_OPERATION_JSON_ENABLED,
HConstants.SLOW_LOG_OPERATION_JSON_ENABLED_DEFAULT);

if (!isOnlineLogProviderEnabled) {
this.isSlowLogTableEnabled = false;
Expand Down Expand Up @@ -128,7 +134,8 @@ public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
long endTime = EnvironmentEdgeManager.currentTime();
int processingTime = (int) (endTime - startTime);
int qTime = (int) (startTime - receiveTime);
final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param);
final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param,
slowLogOperationJsonEnabled, slowLogOperationJsonMaxCols);
int numGets = 0;
int numMutations = 0;
int numServiceCalls = 0;
Expand Down Expand Up @@ -159,7 +166,9 @@ public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
.setProcessingTime(processingTime).setQueueTime(qTime)
.setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY)
.setResponseSize(responseSize).setServerClass(className).setStartTime(startTime).setType(type)
.setUserName(userName).build();
.setUserName(userName).setOperationJson(
slowLogParams != null ? slowLogParams.getOperationJson() : StringUtils.EMPTY)
.build();
slowLogQueue.add(slowLogPayload);
if (isSlowLogTableEnabled) {
if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseClassTestRule;
Expand Down Expand Up @@ -99,7 +100,7 @@ private boolean confirmPayloadParams(int i, int j, List<SlowLogPayload> slowLogP
}

@Test
public void testOnlieSlowLogConsumption() throws Exception {
public void testOnlineSlowLogConsumption() throws Exception {

Configuration conf = applySlowLogRecorderConf(8);
Constructor<NamedQueueRecorder> constructor =
Expand Down Expand Up @@ -277,6 +278,83 @@ && confirmPayloadParams(12, 142, slowLogPayloads)
Assert.assertEquals(slowLogPayloads.size(), 0);
}

@Test
public void testOnlineSlowLogOperationJsonDefaultDisabled() throws Exception {
Configuration conf = applySlowLogRecorderConf(1);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build();

Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_1", "client_1", "class_1");
namedQueueRecorder.addRecord(rpcLogDetails);
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny();
if (slowLogPayload.map(SlowLogPayload::hasOperationJson).orElse(false)) {
String operationJson = slowLogPayload.get().getOperationJson();
return operationJson.equals(StringUtils.EMPTY);
}
return false;
}));
}

@Test
public void testOnlineSlowLogOperationJsonExplicitlyDisabled() throws Exception {
Configuration conf = applySlowLogRecorderConf(1);
conf.setBoolean(HConstants.SLOW_LOG_OPERATION_JSON_ENABLED, false);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build();

Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_1", "client_1", "class_1");
namedQueueRecorder.addRecord(rpcLogDetails);
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny();
if (slowLogPayload.map(SlowLogPayload::hasOperationJson).orElse(false)) {
String operationJson = slowLogPayload.get().getOperationJson();
return operationJson.equals(StringUtils.EMPTY);
}
return false;
}));
}

@Test
public void testOnlineSlowLogOperationJsonEnabled() throws Exception {
i = 1; // set this so that the RpcCall and expected OperationJSON are deterministic
Configuration conf = applySlowLogRecorderConf(1);
conf.setBoolean(HConstants.SLOW_LOG_OPERATION_JSON_ENABLED, true);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build();
String expectedOperationJson =
"{\"totalColumns\":0,\"row\":\"row123\",\"families\":{},\"ts\":\"9223372036854775807\"}";

Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
RpcLogDetails rpcLogDetails = getRpcLogDetails("userName_1", "client_1", "class_1");
namedQueueRecorder.addRecord(rpcLogDetails);
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny();
if (slowLogPayload.map(SlowLogPayload::hasOperationJson).orElse(false)) {
String operationJson = slowLogPayload.get().getOperationJson();
return operationJson.equals(expectedOperationJson);
}
return false;
}));
}

@Test
public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception {
Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
Expand Down

0 comments on commit 654dd5f

Please sign in to comment.