diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PException.java b/src/main/java/com/xiaomi/infra/pegasus/client/PException.java index 7692ec17..4483bcf3 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PException.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PException.java @@ -4,6 +4,7 @@ package com.xiaomi.infra.pegasus.client; import com.xiaomi.infra.pegasus.base.error_code; +import com.xiaomi.infra.pegasus.client.PegasusTable.Request; import com.xiaomi.infra.pegasus.rpc.ReplicationException; import java.util.concurrent.TimeoutException; @@ -40,13 +41,14 @@ static PException threadInterrupted(String tableName, InterruptedException e) { String.format("[table=%s] Thread was interrupted: %s", tableName, e.getMessage()))); } - static PException timeout(String tableName, int timeout, TimeoutException e) { + static PException timeout( + String metaList, String tableName, Request request, int timeout, TimeoutException e) { return new PException( new ReplicationException( error_code.error_types.ERR_TIMEOUT, String.format( - "[table=%s, timeout=%dms] Timeout on Future await: %s", - tableName, timeout, e.getMessage()))); + "[metaServer=%s, table=%s, request=%s, timeout=%dms] Timeout on Future await: %s", + metaList, tableName, request.toString(), timeout, e.getMessage()))); } private static String loadVersion() { diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java index 99f0eb6f..a00e7c58 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java @@ -32,6 +32,7 @@ public class PegasusClient implements PegasusClientInterface { private final Properties config; private final ConcurrentHashMap tableMap; private final Object tableMapLock; + private final String[] metaList; private Cluster cluster; private static class PegasusHasher implements KeyHasher { @@ -88,6 +89,7 @@ public PegasusClient(Properties config) throws PException { this.cluster = Cluster.createCluster(config); this.tableMap = new ConcurrentHashMap(); this.tableMapLock = new Object(); + this.metaList = cluster.getMetaList(); this.enableWriteLimit = Boolean.parseBoolean( config.getProperty(PEGASUS_ENABLE_WRITE_LIMIT, PEGASUS_ENABLE_WRITE_LIMIT_DEF)); @@ -98,6 +100,10 @@ public boolean isWriteLimitEnabled() { return enableWriteLimit; } + String getMetaList() { + return Arrays.toString(metaList); + } + @Override public void finalize() { close(); diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index 4da5a98f..ecec2f9d 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -34,11 +34,13 @@ public class PegasusTable implements PegasusTableInterface { private Table table; private int defaultTimeout; private WriteLimiter writeLimiter; + private String metaList; public PegasusTable(PegasusClient client, Table table) { this.table = table; this.defaultTimeout = table.getDefaultTimeout(); this.writeLimiter = new WriteLimiter(client.isWriteLimitEnabled()); + this.metaList = client.getMetaList(); } @Override @@ -83,7 +85,7 @@ public Future asyncSortKeyCount(byte[] hashKey, int timeout) { public void onCompletion(client_operator clientOP) { rrdb_sortkey_count_operator op = (rrdb_sortkey_count_operator) clientOP; if (op.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicaException(promise, op, table, timeout); + handleReplicaException(new Request(hashKey), promise, op, table, timeout); } else if (op.get_response().error != 0) { promise.setFailure(new PException("rocksdb error: " + op.get_response().error)); } else { @@ -110,7 +112,7 @@ public Future asyncGet(byte[] hashKey, byte[] sortKey, int timeout /* ms public void onCompletion(client_operator clientOP) { rrdb_get_operator gop = (rrdb_get_operator) clientOP; if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicaException(promise, op, table, timeout); + handleReplicaException(new Request(hashKey, sortKey), promise, op, table, timeout); } else if (gop.get_response().error == 1) { // rocksdb::kNotFound promise.setSuccess(null); } else if (gop.get_response().error != 0) { @@ -160,7 +162,7 @@ public Future asyncSet( public void onCompletion(client_operator clientOP) { rrdb_put_operator gop = (rrdb_put_operator) clientOP; if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicaException(promise, op, table, timeout); + handleReplicaException(new Request(hashKey, sortKey), promise, op, table, timeout); } else if (gop.get_response().error != 0) { promise.setFailure(new PException("rocksdb error: " + gop.get_response().error)); } else { @@ -242,7 +244,8 @@ private Future asyncMultiGet( public void onCompletion(client_operator clientOP) { rrdb_multi_get_operator gop = (rrdb_multi_get_operator) clientOP; if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicaException(promise, op, table, timeout); + handleReplicaException( + new Request(hashKey, sortKeyBlobs.size()), promise, op, table, timeout); } else if (gop.get_response().error != 0 && gop.get_response().error != 7) { // rocksdb::Status::kOk && rocksdb::Status::kIncomplete promise.setFailure(new PException("rocksdb error: " + gop.get_response().error)); @@ -333,7 +336,8 @@ public Future asyncMultiGet( public void onCompletion(client_operator clientOP) { rrdb_multi_get_operator gop = (rrdb_multi_get_operator) clientOP; if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicaException(promise, op, table, timeout); + handleReplicaException( + new Request(hashKey, maxFetchCount), promise, op, table, timeout); } else if (gop.get_response().error != 0 && gop.get_response().error != 7) { // rocksdb::Status::kOk && rocksdb::Status::kIncomplete promise.setFailure(new PException("rocksdb error: " + gop.get_response().error)); @@ -456,7 +460,8 @@ public Future asyncMultiSet( public void onCompletion(client_operator clientOP) { rrdb_multi_put_operator op2 = (rrdb_multi_put_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicaException(promise, op, table, timeout); + handleReplicaException( + new Request(hashKey, values_blob.size()), promise, op, table, timeout); } else if (op2.get_response().error != 0) { promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); } else { @@ -490,7 +495,7 @@ public Future asyncDel(byte[] hashKey, byte[] sortKey, int timeout) { public void onCompletion(client_operator clientOP) { rrdb_remove_operator op2 = (rrdb_remove_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicaException(promise, op, table, timeout); + handleReplicaException(new Request(hashKey, sortKey), promise, op, table, timeout); } else if (op2.get_response().error != 0) { promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); } else { @@ -542,7 +547,8 @@ public Future asyncMultiDel(byte[] hashKey, final List sortKeys, i public void onCompletion(client_operator clientOP) { rrdb_multi_remove_operator op2 = (rrdb_multi_remove_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicaException(promise, op, table, timeout); + handleReplicaException( + new Request(hashKey, sortKeyBlobs.size()), promise, op, table, timeout); } else if (op2.get_response().error != 0) { promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); } else { @@ -579,7 +585,7 @@ public Future asyncIncr( public void onCompletion(client_operator clientOP) { rrdb_incr_operator op2 = (rrdb_incr_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicaException(promise, op, table, timeout); + handleReplicaException(new Request(hashKey, sortKey), promise, op, table, timeout); } else if (op2.get_response().error != 0) { promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); } else { @@ -668,7 +674,7 @@ public Future asyncCheckAndSet( public void onCompletion(client_operator clientOP) { rrdb_check_and_set_operator op2 = (rrdb_check_and_set_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicaException(promise, op, table, timeout); + handleReplicaException(new Request(hashKey, setSortKey), promise, op, table, timeout); } else if (op2.get_response().error != 0 && op2.get_response().error != 13) { // 13 : kTryAgain promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); @@ -759,7 +765,12 @@ public Future asyncCheckAndMutate( public void onCompletion(client_operator clientOP) { rrdb_check_and_mutate_operator op2 = (rrdb_check_and_mutate_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicaException(promise, op, table, timeout); + handleReplicaException( + new Request(hashKey, mutations.getMutations().size()), + promise, + op, + table, + timeout); } else if (op2.get_response().error != 0 && op2.get_response().error != 13) { // 13 : kTryAgain promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); @@ -847,7 +858,7 @@ public Future asyncCompareExchange( public void onCompletion(client_operator clientOP) { rrdb_check_and_set_operator op2 = (rrdb_check_and_set_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicaException(promise, op, table, timeout); + handleReplicaException(new Request(hashKey, sortKey), promise, op, table, timeout); } else if (op2.get_response().error != 0 && op2.get_response().error != 13) { // 13 : kTryAgain promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); @@ -888,7 +899,7 @@ public Future asyncTTL(byte[] hashKey, byte[] sortKey, int timeout) { public void onCompletion(client_operator clientOP) { rrdb_ttl_operator op2 = (rrdb_ttl_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicaException(promise, op, table, timeout); + handleReplicaException(new Request(hashKey, sortKey), promise, op, table, timeout); } else if (op2.get_response().error != 0 && op2.get_response().error != 1) { promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); } else { @@ -911,7 +922,8 @@ public boolean exist(byte[] hashKey, byte[] sortKey, int timeout) throws PExcept } catch (InterruptedException e) { throw PException.threadInterrupted(table.getTableName(), e); } catch (TimeoutException e) { - throw PException.timeout(table.getTableName(), timeout, e); + throw PException.timeout( + metaList, table.getTableName(), new Request(hashKey, sortKey), timeout, e); } catch (ExecutionException e) { throw new PException(e); } @@ -925,7 +937,7 @@ public long sortKeyCount(byte[] hashKey, int timeout) throws PException { } catch (InterruptedException e) { throw PException.threadInterrupted(table.getTableName(), e); } catch (TimeoutException e) { - throw PException.timeout(table.getTableName(), timeout, e); + throw PException.timeout(metaList, table.getTableName(), new Request(hashKey), timeout, e); } catch (ExecutionException e) { throw new PException(e); } @@ -939,7 +951,8 @@ public byte[] get(byte[] hashKey, byte[] sortKey, int timeout) throws PException } catch (InterruptedException e) { throw PException.threadInterrupted(table.getTableName(), e); } catch (TimeoutException e) { - throw PException.timeout(table.getTableName(), timeout, e); + throw PException.timeout( + metaList, table.getTableName(), new Request(hashKey, sortKey), timeout, e); } catch (ExecutionException e) { throw new PException(e); } @@ -1012,13 +1025,15 @@ public MultiGetResult multiGet( byte[] hashKey, List sortKeys, int maxFetchCount, int maxFetchSize, int timeout) throws PException { if (timeout <= 0) timeout = defaultTimeout; + int count = sortKeys == null ? 0 : sortKeys.size(); try { return asyncMultiGet(hashKey, sortKeys, maxFetchCount, maxFetchSize, timeout) .get(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw PException.threadInterrupted(table.getTableName(), e); } catch (TimeoutException e) { - throw PException.timeout(table.getTableName(), timeout, e); + throw PException.timeout( + metaList, table.getTableName(), new Request(hashKey, count), timeout, e); } catch (ExecutionException e) { throw new PException(e); } @@ -1028,12 +1043,14 @@ public MultiGetResult multiGet( public MultiGetResult multiGet(byte[] hashKey, List sortKeys, int timeout) throws PException { if (timeout <= 0) timeout = defaultTimeout; + int count = sortKeys == null ? 0 : sortKeys.size(); try { return asyncMultiGet(hashKey, sortKeys, timeout).get(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw PException.threadInterrupted(table.getTableName(), e); } catch (TimeoutException e) { - throw PException.timeout(table.getTableName(), timeout, e); + throw PException.timeout( + metaList, table.getTableName(), new Request(hashKey, count), timeout, e); } catch (ExecutionException e) { throw new PException(e); } @@ -1057,7 +1074,8 @@ public MultiGetResult multiGet( } catch (InterruptedException e) { throw PException.threadInterrupted(table.getTableName(), e); } catch (TimeoutException e) { - throw PException.timeout(table.getTableName(), timeout, e); + throw PException.timeout( + metaList, table.getTableName(), new Request(hashKey, maxFetchCount), timeout, e); } catch (ExecutionException e) { throw new PException(e); } @@ -1078,7 +1096,7 @@ public MultiGetResult multiGet( } catch (InterruptedException e) { throw PException.threadInterrupted(table.getTableName(), e); } catch (TimeoutException e) { - throw PException.timeout(table.getTableName(), timeout, e); + throw PException.timeout(metaList, table.getTableName(), new Request(hashKey), timeout, e); } catch (ExecutionException e) { throw new PException(e); } @@ -1161,7 +1179,8 @@ public MultiGetSortKeysResult multiGetSortKeys( } catch (InterruptedException e) { throw PException.threadInterrupted(table.getTableName(), e); } catch (TimeoutException e) { - throw PException.timeout(table.getTableName(), timeout, e); + throw PException.timeout( + metaList, table.getTableName(), new Request(hashKey, maxFetchCount), timeout, e); } catch (ExecutionException e) { throw new PException(e); } @@ -1175,7 +1194,7 @@ public MultiGetSortKeysResult multiGetSortKeys(byte[] hashKey, int timeout) thro } catch (InterruptedException e) { throw PException.threadInterrupted(table.getTableName(), e); } catch (TimeoutException e) { - throw PException.timeout(table.getTableName(), timeout, e); + throw PException.timeout(metaList, table.getTableName(), new Request(hashKey), timeout, e); } catch (ExecutionException e) { throw new PException(e); } @@ -1190,7 +1209,8 @@ public void set(byte[] hashKey, byte[] sortKey, byte[] value, int ttlSeconds, in } catch (InterruptedException e) { throw PException.threadInterrupted(table.getTableName(), e); } catch (TimeoutException e) { - throw PException.timeout(table.getTableName(), timeout, e); + throw PException.timeout( + metaList, table.getTableName(), new Request(hashKey, sortKey), timeout, e); } catch (ExecutionException e) { throw new PException(e); } @@ -1204,7 +1224,8 @@ public void set(byte[] hashKey, byte[] sortKey, byte[] value, int timeout) throw } catch (InterruptedException e) { throw PException.threadInterrupted(table.getTableName(), e); } catch (TimeoutException e) { - throw PException.timeout(table.getTableName(), timeout, e); + throw PException.timeout( + metaList, table.getTableName(), new Request(hashKey, sortKey), timeout, e); } catch (ExecutionException e) { throw new PException(e); } @@ -1259,12 +1280,14 @@ public void multiSet( byte[] hashKey, List> values, int ttlSeconds, int timeout) throws PException { if (timeout <= 0) timeout = defaultTimeout; + int count = values == null ? 0 : values.size(); try { asyncMultiSet(hashKey, values, ttlSeconds, timeout).get(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw PException.threadInterrupted(table.getTableName(), e); } catch (TimeoutException e) { - throw PException.timeout(table.getTableName(), timeout, e); + throw PException.timeout( + metaList, table.getTableName(), new Request(hashKey, count), timeout, e); } catch (ExecutionException e) { throw new PException(e); } @@ -1274,12 +1297,14 @@ public void multiSet( public void multiSet(byte[] hashKey, List> values, int timeout) throws PException { if (timeout <= 0) timeout = defaultTimeout; + int count = values == null ? 0 : values.size(); try { asyncMultiSet(hashKey, values, timeout).get(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw PException.threadInterrupted(table.getTableName(), e); } catch (TimeoutException e) { - throw PException.timeout(table.getTableName(), timeout, e); + throw PException.timeout( + metaList, table.getTableName(), new Request(hashKey, count), timeout, e); } catch (ExecutionException e) { throw new PException(e); } @@ -1353,7 +1378,8 @@ public void del(byte[] hashKey, byte[] sortKey, int timeout) throws PException { } catch (InterruptedException e) { throw PException.threadInterrupted(table.getTableName(), e); } catch (TimeoutException e) { - throw PException.timeout(table.getTableName(), timeout, e); + throw PException.timeout( + metaList, table.getTableName(), new Request(hashKey, sortKey), timeout, e); } catch (ExecutionException e) { throw new PException(e); } @@ -1412,12 +1438,14 @@ public int batchDel2(List> keys, List results, @Override public void multiDel(byte[] hashKey, List sortKeys, int timeout) throws PException { if (timeout <= 0) timeout = defaultTimeout; + int count = sortKeys == null ? 0 : sortKeys.size(); try { asyncMultiDel(hashKey, sortKeys, timeout).get(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw PException.threadInterrupted(table.getTableName(), e); } catch (TimeoutException e) { - throw PException.timeout(table.getTableName(), timeout, e); + throw PException.timeout( + metaList, table.getTableName(), new Request(hashKey, count), timeout, e); } catch (ExecutionException e) { throw new PException(e); } @@ -1572,7 +1600,8 @@ public long incr(byte[] hashKey, byte[] sortKey, long increment, int ttlSeconds, } catch (InterruptedException e) { throw PException.threadInterrupted(table.getTableName(), e); } catch (TimeoutException e) { - throw PException.timeout(table.getTableName(), timeout, e); + throw PException.timeout( + metaList, table.getTableName(), new Request(hashKey, sortKey), timeout, e); } catch (ExecutionException e) { throw new PException(e); } @@ -1586,7 +1615,8 @@ public long incr(byte[] hashKey, byte[] sortKey, long increment, int timeout) th } catch (InterruptedException e) { throw PException.threadInterrupted(table.getTableName(), e); } catch (TimeoutException e) { - throw PException.timeout(table.getTableName(), timeout, e); + throw PException.timeout( + metaList, table.getTableName(), new Request(hashKey, sortKey), timeout, e); } catch (ExecutionException e) { throw new PException(e); } @@ -1618,7 +1648,8 @@ public CheckAndSetResult checkAndSet( } catch (InterruptedException e) { throw PException.threadInterrupted(table.getTableName(), e); } catch (TimeoutException e) { - throw PException.timeout(table.getTableName(), timeout, e); + throw PException.timeout( + metaList, table.getTableName(), new Request(hashKey, setSortKey), timeout, e); } catch (ExecutionException e) { throw new PException(e); } @@ -1635,6 +1666,7 @@ public CheckAndMutateResult checkAndMutate( int timeout) throws PException { if (timeout <= 0) timeout = defaultTimeout; + int count = (mutations == null || mutations.isEmpty()) ? 1 : mutations.getMutations().size(); try { return asyncCheckAndMutate( hashKey, checkSortKey, checkType, checkOperand, mutations, options, timeout) @@ -1642,7 +1674,8 @@ public CheckAndMutateResult checkAndMutate( } catch (InterruptedException e) { throw PException.threadInterrupted(table.getTableName(), e); } catch (TimeoutException e) { - throw PException.timeout(table.getTableName(), timeout, e); + throw PException.timeout( + metaList, table.getTableName(), new Request(hashKey, count), timeout, e); } catch (ExecutionException e) { throw new PException(e); } @@ -1665,7 +1698,8 @@ public CompareExchangeResult compareExchange( } catch (InterruptedException e) { throw PException.threadInterrupted(table.getTableName(), e); } catch (TimeoutException e) { - throw PException.timeout(table.getTableName(), timeout, e); + throw PException.timeout( + metaList, table.getTableName(), new Request(hashKey, sortKey), timeout, e); } catch (ExecutionException e) { throw new PException(e); } @@ -1679,7 +1713,8 @@ public int ttl(byte[] hashKey, byte[] sortKey, int timeout) throws PException { } catch (InterruptedException e) { throw PException.threadInterrupted(table.getTableName(), e); } catch (TimeoutException e) { - throw PException.timeout(table.getTableName(), timeout, e); + throw PException.timeout( + metaList, table.getTableName(), new Request(hashKey, sortKey), timeout, e); } catch (ExecutionException e) { throw new PException(e); } @@ -1788,7 +1823,7 @@ public List getUnorderedScanners( } public void handleReplicaException( - DefaultPromise promise, client_operator op, Table table, int timeout) { + Request request, DefaultPromise promise, client_operator op, Table table, int timeout) { if (timeout <= 0) timeout = defaultTimeout; gpid gPid = op.get_gpid(); ReplicaConfiguration replicaConfiguration = @@ -1806,19 +1841,15 @@ public void handleReplicaException( String message = ""; String header = - "[table=" - + table.getTableName() - + ",operation=" - + op.name() - + ",replicaServer=" - + replicaServer - + ",gpid=(" - + gPid.toString() - + ")" - + ",timeout=" - + timeout - + "ms" - + "]"; + String.format( + "[metaServer=%s,table=%s,operation=%s,request=%s,replicaServer=%s,gpid=(%s),timeout=%dms]", + metaList, + table.getTableName(), + op.name(), + request.toString(), + replicaServer, + gPid.toString(), + timeout); switch (op.rpc_error.errno) { case ERR_SESSION_RESET: message = " Disconnected from the replica-server due to internal error!"; @@ -1835,6 +1866,8 @@ public void handleReplicaException( case ERR_INVALID_STATE: message = " The target replica is not primary!"; break; + case ERR_INVALID_DATA: + message = " The request maybe too large!"; } promise.setFailure( new PException(new ReplicationException(op.rpc_error.errno, header + message))); @@ -1843,4 +1876,45 @@ public void handleReplicaException( private void handleWriteLimiterException(DefaultPromise promise, String message) { promise.setFailure(new PException("Exceed write limit threshold:" + message)); } + + static class Request { + byte[] hashKey = null; + byte[] sortKey = null; + int sortKeyCount = 0; + + Request(byte[] hashKey) { + this.hashKey = hashKey; + } + + Request(byte[] hashKey, byte[] sortKey) { + this.hashKey = hashKey; + this.sortKey = sortKey; + } + + Request(byte[] hashKey, int sortKeyCount) { + this.hashKey = hashKey; + this.sortKeyCount = sortKeyCount; + } + + private String getSubstring(byte[] key) { + String keyStr = key == null ? "" : new String(key); + return keyStr.length() < 32 ? keyStr : keyStr.substring(0, 32); + } + + @Override + public String toString() { + if (sortKey != null) { + return String.format( + "[hashKey[:32]=\"%s\",sortKey[:32]=\"%s\"]", + getSubstring(hashKey), getSubstring(sortKey)); + } + + if (sortKeyCount > 0) { + return String.format( + "[hashKey[:32]=\"%s\",sortKeyCount=%d]", getSubstring(hashKey), sortKeyCount); + } + + return String.format("[hashKey[:32]=\"%s\"]", getSubstring(hashKey)); + } + } } diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java index 86047180..edcaa02d 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java @@ -9,6 +9,7 @@ import com.xiaomi.infra.pegasus.base.error_code; import com.xiaomi.infra.pegasus.base.error_code.error_types; import com.xiaomi.infra.pegasus.base.gpid; +import com.xiaomi.infra.pegasus.client.PegasusTable.Request; import com.xiaomi.infra.pegasus.operator.rrdb_put_operator; import com.xiaomi.infra.pegasus.rpc.ClusterOptions; import com.xiaomi.infra.pegasus.rpc.TableOptions; @@ -21,20 +22,26 @@ import org.junit.Test; public class TestPException { + private String metaList = "127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603"; + private Request request = new Request("hashKey".getBytes(), "sortKey".getBytes()); + @Test public void testThreadInterrupted() throws Exception { PException ex = PException.threadInterrupted("test", new InterruptedException("intxxx")); - Assert.assertEquals( - "{version}: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_THREAD_INTERRUPTED: [table=test] Thread was interrupted: intxxx", - ex.getMessage()); + String exceptionInfo = + "{version}: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_THREAD_INTERRUPTED: [table=test] Thread was interrupted: intxxx"; + Assert.assertEquals(exceptionInfo, ex.getMessage()); } @Test public void testTimeout() throws Exception { - PException ex = PException.timeout("test", 1000, new TimeoutException("tmxxx")); - Assert.assertEquals( - "{version}: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_TIMEOUT: [table=test, timeout=1000ms] Timeout on Future await: tmxxx", - ex.getMessage()); + PException ex = + PException.timeout(metaList, "test", request, 1000, new TimeoutException("tmxxx")); + String exceptionInfo = + String.format( + "{version}: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_TIMEOUT: [metaServer=%s, table=test, request=%s, timeout=1000ms] Timeout on Future await: tmxxx", + metaList, request.toString()); + Assert.assertEquals(exceptionInfo, ex.getMessage()); } @Test @@ -63,7 +70,7 @@ public void testHandleReplicationException() throws Exception { int timeout = 1000; PegasusClient client = (PegasusClient) PegasusClientFactory.getSingletonClient(); PegasusTable pegasusTable = new PegasusTable(client, table); - pegasusTable.handleReplicaException(promise, op, table, timeout); + pegasusTable.handleReplicaException(request, promise, op, table, timeout); try { promise.get(); } catch (ExecutionException e) { @@ -73,8 +80,8 @@ public void testHandleReplicationException() throws Exception { String msg = String.format( - "com.xiaomi.infra.pegasus.client.PException: {version}: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_OBJECT_NOT_FOUND: [table=temp,operation=put,replicaServer=%s,gpid=(%s),timeout=%dms] The replica server doesn't serve this partition!", - server, gpid.toString(), timeout); + "com.xiaomi.infra.pegasus.client.PException: {version}: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_OBJECT_NOT_FOUND: [metaServer=%s,table=temp,operation=put,request=%s,replicaServer=%s,gpid=(%s),timeout=%dms] The replica server doesn't serve this partition!", + client.getMetaList(), request.toString(), server, gpid.toString(), timeout); Assert.assertEquals(e.getMessage(), msg); return; } catch (InterruptedException e) { @@ -98,7 +105,7 @@ public void testTimeOutIsZero() throws Exception { PegasusClient client = (PegasusClient) PegasusClientFactory.getSingletonClient(); PegasusTable pegasusTable = new PegasusTable(client, table); - pegasusTable.handleReplicaException(promise, op, table, 0); + pegasusTable.handleReplicaException(request, promise, op, table, 0); try { promise.get(); } catch (Exception e) { @@ -108,8 +115,8 @@ public void testTimeOutIsZero() throws Exception { String msg = String.format( - "com.xiaomi.infra.pegasus.client.PException: {version}: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_TIMEOUT: [table=temp,operation=put,replicaServer=%s,gpid=(%s),timeout=1000ms] The operation is timed out!", - server, gpid.toString()); + "com.xiaomi.infra.pegasus.client.PException: {version}: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_TIMEOUT: [metaServer=%s,table=temp,operation=put,request=%s,replicaServer=%s,gpid=(%s),timeout=1000ms] The operation is timed out!", + client.getMetaList(), request.toString(), server, gpid.toString()); Assert.assertEquals(e.getMessage(), msg); } }