Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

refactor: bound minimal value for socket connect timeout #74

Merged
merged 6 commits into from
Nov 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1774,13 +1774,16 @@ public void handleReplicaException(
+ ",gpid=("
+ gPid.toString()
+ ")"
+ ",timeout="
+ timeout
+ "ms"
+ "]";
switch (op.rpc_error.errno) {
case ERR_SESSION_RESET:
message = " Disconnected from the replica-server due to internal error!";
break;
case ERR_TIMEOUT:
message = " The operation timeout is " + timeout + "ms!";
message = " The operation is timed out!";
break;
case ERR_OBJECT_NOT_FOUND:
message = " The replica server doesn't serve this partition!";
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import java.util.Properties;

public abstract class Cluster {
public static final int MIN_SOCK_CONNECT_TIMEOUT = 1000;

public static final String PEGASUS_META_SERVERS_KEY = "meta_servers";

public static final String PEGASUS_OPERATION_TIMEOUT_KEY = "operation_timeout";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.rpc.async;

import static java.lang.Math.max;

import com.xiaomi.infra.pegasus.base.rpc_address;
import com.xiaomi.infra.pegasus.metrics.MetricsManager;
import com.xiaomi.infra.pegasus.rpc.Cluster;
Expand Down Expand Up @@ -83,7 +85,9 @@ public ReplicaSession getReplicaSession(rpc_address address) {
synchronized (this) {
ss = replicaSessions.get(address);
if (ss != null) return ss;
ss = new ReplicaSession(address, replicaGroup, operationTimeout);
ss =
new ReplicaSession(
address, replicaGroup, max(operationTimeout, Cluster.MIN_SOCK_CONNECT_TIMEOUT));
replicaSessions.put(address, ss);
return ss;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ public void testHandleReplicationException() throws Exception {
op.rpc_error.errno = error_code.error_types.ERR_OBJECT_NOT_FOUND;

// set failure in promise, the exception is thrown as ExecutionException.
int timeout = 1000;
PegasusTable pegasusTable = new PegasusTable(null, table);
pegasusTable.handleReplicaException(promise, op, table, 1000);
pegasusTable.handleReplicaException(promise, op, table, timeout);
try {
promise.get();
} catch (ExecutionException e) {
Expand All @@ -69,8 +70,8 @@ public void testHandleReplicationException() throws Exception {

String msg =
String.format(
"com.xiaomi.infra.pegasus.client.PException: {version}: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_OBJECT_NOT_FOUND: [table=temp,operation=put,replicaServer=%s,gpid=(%s)] The replica server doesn't serve this partition!",
server, gpid.toString());
"com.xiaomi.infra.pegasus.client.PException: {version}: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_OBJECT_NOT_FOUND: [table=temp,operation=put,replicaServer=%s,gpid=(%s),timeout=%dms] The replica server doesn't serve this partition!",
server, gpid.toString(), timeout);
Assert.assertEquals(e.getMessage(), msg);
return;
} catch (InterruptedException e) {
Expand Down Expand Up @@ -102,7 +103,7 @@ public void testTimeOutIsZero() throws Exception {

String msg =
String.format(
"com.xiaomi.infra.pegasus.client.PException: {version}: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_TIMEOUT: [table=temp,operation=put,replicaServer=%s,gpid=(%s)] The operation timeout is 1000ms!",
"com.xiaomi.infra.pegasus.client.PException: {version}: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_TIMEOUT: [table=temp,operation=put,replicaServer=%s,gpid=(%s),timeout=1000ms] The operation is timed out!",
server, gpid.toString());
Assert.assertEquals(e.getMessage(), msg);
}
Expand Down