Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use serializable exception in GCP listeners #33657

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;

Expand All @@ -33,7 +34,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
Expand All @@ -53,7 +53,8 @@ public interface GlobalCheckpointListener {
* Callback when the global checkpoint is updated or the shard is closed. If the shard is closed, the value of the global checkpoint
* will be set to {@link org.elasticsearch.index.seqno.SequenceNumbers#UNASSIGNED_SEQ_NO} and the exception will be non-null and an
* instance of {@link IndexShardClosedException }. If the listener timed out waiting for notification then the exception will be
* non-null and an instance of {@link TimeoutException}. If the global checkpoint is updated, the exception will be null.
* non-null and an instance of {@link ElasticsearchTimeoutException}. If the global checkpoint is updated, the exception will be
* null.
*
* @param globalCheckpoint the updated global checkpoint
* @param e if non-null, the shard is closed or the listener timed out
Expand Down Expand Up @@ -96,8 +97,8 @@ public interface GlobalCheckpointListener {
* shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global
* checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated or the shard
* is closed. A listener must re-register after one of these events to receive subsequent events. Callers may add a timeout to be
* notified after if the timeout elapses. In this case, the listener will be notified with a {@link TimeoutException}. Passing null for
* the timeout means no timeout will be associated to the listener.
* notified after if the timeout elapses. In this case, the listener will be notified with a {@link ElasticsearchTimeoutException}.
* Passing null for the timeout means no timeout will be associated to the listener.
*
* @param currentGlobalCheckpoint the current global checkpoint known to the listener
* @param listener the listener
Expand Down Expand Up @@ -140,7 +141,7 @@ synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpoint
removed = listeners != null && listeners.remove(listener) != null;
}
if (removed) {
final TimeoutException e = new TimeoutException(timeout.getStringRep());
final ElasticsearchTimeoutException e = new ElasticsearchTimeoutException(timeout.getStringRep());
logger.trace("global checkpoint listener timed out", e);
executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, e));
}
Expand Down Expand Up @@ -225,7 +226,7 @@ private void notifyListener(final GlobalCheckpointListener listener, final long
} else if (e instanceof IndexShardClosedException) {
logger.warn("error notifying global checkpoint listener of closed shard", caught);
} else {
assert e instanceof TimeoutException : e;
assert e instanceof ElasticsearchTimeoutException : e;
logger.warn("error notifying global checkpoint listener of timeout", caught);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
Expand All @@ -42,7 +43,6 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -512,10 +512,11 @@ public void testTimeout() throws InterruptedException {
try {
notified.set(true);
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
assertThat(e, instanceOf(TimeoutException.class));
assertThat(e, instanceOf(ElasticsearchTimeoutException.class));
assertThat(e, hasToString(containsString(timeout.getStringRep())));
final ArgumentCaptor<String> message = ArgumentCaptor.forClass(String.class);
final ArgumentCaptor<TimeoutException> t = ArgumentCaptor.forClass(TimeoutException.class);
final ArgumentCaptor<ElasticsearchTimeoutException> t =
ArgumentCaptor.forClass(ElasticsearchTimeoutException.class);
verify(mockLogger).trace(message.capture(), t.capture());
assertThat(message.getValue(), equalTo("global checkpoint listener timed out"));
assertThat(t.getValue(), hasToString(containsString(timeout.getStringRep())));
Expand Down Expand Up @@ -547,7 +548,7 @@ public void testTimeoutNotificationUsesExecutor() throws InterruptedException {
try {
notified.set(true);
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
assertThat(e, instanceOf(TimeoutException.class));
assertThat(e, instanceOf(ElasticsearchTimeoutException.class));
} finally {
latch.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.index.shard;

import org.apache.lucene.store.LockObtainFailedException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -89,7 +90,6 @@
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -798,7 +798,7 @@ public void testGlobalCheckpointListenerTimeout() throws InterruptedException {
notified.set(true);
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
assertNotNull(e);
assertThat(e, instanceOf(TimeoutException.class));
assertThat(e, instanceOf(ElasticsearchTimeoutException.class));
assertThat(e.getMessage(), equalTo(timeout.getStringRep()));
} finally {
latch.countDown();
Expand Down