Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

fix: set error to ERR_SESSION_RESET to trigger meta query while se… #54

Merged
merged 5 commits into from
Sep 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ log.txt
rolling_log/
.vscode/
google-java-format-*
pegasus-*
14 changes: 10 additions & 4 deletions scripts/travis.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,23 @@ if [[ $(git status -s) ]]; then
exit 1
fi

PEGASUS_PKG="pegasus-tools-1.11.6-9f4e5ae-glibc2.12-release"
PEGASUS_PKG_URL="https://github.com/XiaoMi/pegasus/releases/download/v1.11.6/pegasus-tools-1.11.6-9f4e5ae-glibc2.12-release.tar.gz"

# start pegasus onebox environment
wget https://github.com/XiaoMi/pegasus/releases/download/v1.11.3/pegasus-1.11.3-b45cb06-linux-x86_64-release.zip
unzip pegasus-1.11.3-b45cb06-linux-x86_64-release.zip
cd pegasus-1.11.3-b45cb06-linux-x86_64-release
if [ ! -f $PEGASUS_PKG.tar.gz ]; then
wget $PEGASUS_PKG_URL
tar xvf $PEGASUS_PKG.tar.gz
fi
cd $PEGASUS_PKG

sed -i "s#https://github.com/xiaomi/pegasus-common/raw/master/zookeeper-3.4.6.tar.gz#https://github.com/XiaoMi/pegasus-common/releases/download/deps/zookeeper-3.4.6.tar.gz#" scripts/start_zk.sh
./run.sh start_onebox -w
cd ../

if ! mvn clean test
then
cd pegasus-1.11.3-b45cb06-linux-x86_64-release
cd $PEGASUS_PKG
./run.sh list_onebox
exit 1
fi
6 changes: 4 additions & 2 deletions src/main/java/com/xiaomi/infra/pegasus/client/PException.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
package com.xiaomi.infra.pegasus.client;

/**
* @author qinzuoyan
* <p>Pegasus exception.
* The generic type of exception thrown by all of the Pegasus APIs.
*
* <p>Common strategies of handling PException include retrying, or ignoring. We recommend you to
* log the exception for future debugging.
*/
public class PException extends Exception {
private static final long serialVersionUID = 4436491238550521203L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public ReplicationException(error_code.error_types t) {
}

public ReplicationException(error_code.error_types t, String message) {
super(t.name() + ": " + message);
super(t.name() + (message.isEmpty() ? "" : (": " + message)));
err_type = t;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;

/** Created by weijiesun on 17-9-13. */
public class ReplicaSession {
public static class RequestEntry {
public int sequenceId;
Expand Down Expand Up @@ -87,7 +86,7 @@ public int asyncSend(client_operator op, Runnable callbackFunc, long timeoutInMi
entry.callback = callbackFunc;
// NOTICE: must make sure the msg is put into the pendingResponse map BEFORE
// the timer task is scheduled.
pendingResponse.put(new Integer(entry.sequenceId), entry);
pendingResponse.put(entry.sequenceId, entry);
entry.timeoutTask = addTimer(entry.sequenceId, timeoutInMilliseconds);
entry.timeoutMs = timeoutInMilliseconds;

Expand Down Expand Up @@ -136,7 +135,7 @@ public void closeSession() {
}

public RequestEntry getAndRemoveEntry(int seqID) {
return pendingResponse.remove(new Integer(seqID));
return pendingResponse.remove(seqID);
}

public final String name() {
Expand Down Expand Up @@ -179,7 +178,7 @@ private void markSessionConnected(Channel activeChannel) {
synchronized (pendingSend) {
while (!pendingSend.isEmpty()) {
RequestEntry e = pendingSend.poll();
if (pendingResponse.get(new Integer(e.sequenceId)) != null) {
if (pendingResponse.get(e.sequenceId) != null) {
write(e, newCache);
} else {
logger.info("{}: {} is removed from pending, perhaps timeout", name(), e.sequenceId);
Expand Down Expand Up @@ -221,19 +220,17 @@ private void markSessionDisconnect() {
}
}

// Notify the RPC sender if failure occurred.
private void tryNotifyWithSequenceID(int seqID, error_types errno, boolean isTimeoutTask) {
logger.debug(
"{}: {} is notified with error {}, isTimeoutTask {}",
name(),
seqID,
errno.toString(),
isTimeoutTask);
RequestEntry entry = pendingResponse.remove(new Integer(seqID));
RequestEntry entry = pendingResponse.remove(seqID);
if (entry != null) {
if (!isTimeoutTask) entry.timeoutTask.cancel(true);
entry.op.rpc_error.errno = errno;
entry.callback.run();

if (errno == error_types.ERR_TIMEOUT) {
long firstTs = firstRecentTimedOutMs.get();
if (firstTs == 0) {
Expand All @@ -246,12 +243,15 @@ private void tryNotifyWithSequenceID(int seqID, error_types errno, boolean isTim
"{}: actively close the session because it's not responding for {} seconds",
name(),
sessionResetTimeWindowMs / 1000);
closeSession();
closeSession(); // maybe fail when the session is already disconnected.
errno = error_types.ERR_SESSION_RESET;
}
}
} else {
firstRecentTimedOutMs.set(0);
}
entry.op.rpc_error.errno = errno;
entry.callback.run();
} else {
logger.warn(
"{}: {} is removed by others, current error {}, isTimeoutTask {}",
Expand Down Expand Up @@ -284,6 +284,9 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception {
});
}

// Notify the RPC caller when times out. If the RPC finishes in time,
// this task will be cancelled.
// TODO(wutao1): call it addTimeoutTicker
private ScheduledFuture addTimer(final int seqID, long timeoutInMillseconds) {
return rpcGroup.schedule(
new Runnable() {
Expand Down Expand Up @@ -337,7 +340,7 @@ ConnState getState() {
}

interface MessageResponseFilter {
public boolean abandonIt(error_types err, TMessage header);
boolean abandonIt(error_types err, TMessage header);
}

MessageResponseFilter filter = null;
Expand Down