Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race conditions in updating RemoteTaskStats #11044

Merged
merged 1 commit into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import static com.google.common.base.Strings.isNullOrEmpty;
Expand All @@ -51,7 +50,6 @@
import static java.util.Objects.requireNonNull;

class ContinuousTaskStatusFetcher
implements SimpleHttpResponseCallback<TaskStatus>
{
private static final Logger log = Logger.get(ContinuousTaskStatusFetcher.class);

Expand All @@ -67,8 +65,6 @@ class ContinuousTaskStatusFetcher
private final RequestErrorTracker errorTracker;
private final RemoteTaskStats stats;

private final AtomicLong currentRequestStartNanos = new AtomicLong();

@GuardedBy("this")
private boolean running;

Expand Down Expand Up @@ -154,61 +150,66 @@ private synchronized void scheduleNextRequest()

errorTracker.startRequest();
future = httpClient.executeAsync(request, createFullJsonResponseHandler(taskStatusCodec));
currentRequestStartNanos.set(System.nanoTime());
Futures.addCallback(future, new SimpleHttpResponseHandler<>(this, request.getUri(), stats), executor);
Futures.addCallback(future, new SimpleHttpResponseHandler<>(new TaskStatusResponseCallback(), request.getUri(), stats), executor);
}

TaskStatus getTaskStatus()
{
return taskStatus.get();
}

@Override
public void success(TaskStatus value)
private class TaskStatusResponseCallback
implements SimpleHttpResponseCallback<TaskStatus>
{
try (SetThreadName ignored = new SetThreadName("ContinuousTaskStatusFetcher-%s", taskId)) {
updateStats(currentRequestStartNanos.get());
try {
updateTaskStatus(value);
errorTracker.requestSucceeded();
}
finally {
scheduleNextRequest();
private final long requestStartNanos = System.nanoTime();

@Override
public void success(TaskStatus value)
{
try (SetThreadName ignored = new SetThreadName("ContinuousTaskStatusFetcher-%s", taskId)) {
updateStats(requestStartNanos);
try {
updateTaskStatus(value);
errorTracker.requestSucceeded();
}
finally {
scheduleNextRequest();
}
}
}
}

@Override
public void failed(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("ContinuousTaskStatusFetcher-%s", taskId)) {
updateStats(currentRequestStartNanos.get());
try {
// if task not already done, record error
TaskStatus taskStatus = getTaskStatus();
if (!taskStatus.getState().isDone()) {
errorTracker.requestFailed(cause);
@Override
public void failed(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("ContinuousTaskStatusFetcher-%s", taskId)) {
updateStats(requestStartNanos);
try {
// if task not already done, record error
TaskStatus taskStatus = getTaskStatus();
if (!taskStatus.getState().isDone()) {
errorTracker.requestFailed(cause);
}
}
catch (Error e) {
onFail.accept(e);
throw e;
}
catch (RuntimeException e) {
onFail.accept(e);
}
finally {
scheduleNextRequest();
}
}
catch (Error e) {
onFail.accept(e);
throw e;
}
catch (RuntimeException e) {
onFail.accept(e);
}
finally {
scheduleNextRequest();
}
}
}

@Override
public void fatal(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("ContinuousTaskStatusFetcher-%s", taskId)) {
updateStats(currentRequestStartNanos.get());
onFail.accept(cause);
@Override
public void fatal(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("ContinuousTaskStatusFetcher-%s", taskId)) {
updateStats(requestStartNanos);
onFail.accept(cause);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.net.URI;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
Expand All @@ -45,7 +44,6 @@
import static java.util.Objects.requireNonNull;

class DynamicFiltersFetcher
implements SimpleHttpResponseCallback<VersionedDynamicFilterDomains>
{
private final TaskId taskId;
private final URI taskUri;
Expand All @@ -57,7 +55,6 @@ class DynamicFiltersFetcher
private final RequestErrorTracker errorTracker;
private final RemoteTaskStats stats;
private final DynamicFilterService dynamicFilterService;
private final AtomicLong currentRequestStartNanos = new AtomicLong();

@GuardedBy("this")
private long dynamicFiltersVersion = INITIAL_DYNAMIC_FILTERS_VERSION;
Expand Down Expand Up @@ -158,52 +155,57 @@ private synchronized void fetchDynamicFiltersIfNecessary()

errorTracker.startRequest();
future = httpClient.executeAsync(request, createFullJsonResponseHandler(dynamicFilterDomainsCodec));
currentRequestStartNanos.set(System.nanoTime());
addCallback(future, new SimpleHttpResponseHandler<>(this, request.getUri(), stats), executor);
addCallback(future, new SimpleHttpResponseHandler<>(new DynamicFiltersResponseCallback(), request.getUri(), stats), executor);
}

@Override
public void success(VersionedDynamicFilterDomains newDynamicFilterDomains)
private class DynamicFiltersResponseCallback
implements SimpleHttpResponseCallback<VersionedDynamicFilterDomains>
{
try (SetThreadName ignored = new SetThreadName("DynamicFiltersFetcher-%s", taskId)) {
updateStats(currentRequestStartNanos.get());
try {
updateDynamicFilterDomains(newDynamicFilterDomains);
errorTracker.requestSucceeded();
}
finally {
fetchDynamicFiltersIfNecessary();
private final long requestStartNanos = System.nanoTime();

@Override
public void success(VersionedDynamicFilterDomains newDynamicFilterDomains)
{
try (SetThreadName ignored = new SetThreadName("DynamicFiltersFetcher-%s", taskId)) {
updateStats(requestStartNanos);
try {
updateDynamicFilterDomains(newDynamicFilterDomains);
errorTracker.requestSucceeded();
}
finally {
fetchDynamicFiltersIfNecessary();
}
}
}
}

@Override
public void failed(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("DynamicFiltersFetcher-%s", taskId)) {
updateStats(currentRequestStartNanos.get());
try {
errorTracker.requestFailed(cause);
}
catch (Error e) {
onFail.accept(e);
throw e;
}
catch (RuntimeException e) {
onFail.accept(e);
}
finally {
fetchDynamicFiltersIfNecessary();
@Override
public void failed(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("DynamicFiltersFetcher-%s", taskId)) {
updateStats(requestStartNanos);
try {
errorTracker.requestFailed(cause);
}
catch (Error e) {
onFail.accept(e);
throw e;
}
catch (RuntimeException e) {
onFail.accept(e);
}
finally {
fetchDynamicFiltersIfNecessary();
}
}
}
}

@Override
public void fatal(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("DynamicFiltersFetcher-%s", taskId)) {
updateStats(currentRequestStartNanos.get());
onFail.accept(cause);
@Override
public void fatal(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("DynamicFiltersFetcher-%s", taskId)) {
updateStats(requestStartNanos);
onFail.accept(cause);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class TaskInfoFetcher
implements SimpleHttpResponseCallback<TaskInfo>
{
private final TaskId taskId;
private final Consumer<Throwable> onFail;
Expand All @@ -67,10 +66,6 @@ public class TaskInfoFetcher
private final RequestErrorTracker errorTracker;

private final boolean summarizeTaskInfo;

@GuardedBy("this")
private final AtomicLong currentRequestStartNanos = new AtomicLong();

private final RemoteTaskStats stats;

@GuardedBy("this")
Expand Down Expand Up @@ -212,8 +207,7 @@ private synchronized void sendNextRequest()

errorTracker.startRequest();
future = httpClient.executeAsync(request, createFullJsonResponseHandler(taskInfoCodec));
currentRequestStartNanos.set(System.nanoTime());
Futures.addCallback(future, new SimpleHttpResponseHandler<>(this, request.getUri(), stats), executor);
Futures.addCallback(future, new SimpleHttpResponseHandler<>(new TaskInfoResponseCallback(), request.getUri(), stats), executor);
}

synchronized void updateTaskInfo(TaskInfo newTaskInfo)
Expand Down Expand Up @@ -247,49 +241,51 @@ synchronized void updateTaskInfo(TaskInfo newTaskInfo)
}
}

@Override
public void success(TaskInfo newValue)
private class TaskInfoResponseCallback
implements SimpleHttpResponseCallback<TaskInfo>
{
try (SetThreadName ignored = new SetThreadName("TaskInfoFetcher-%s", taskId)) {
lastUpdateNanos.set(System.nanoTime());
private final long requestStartNanos = System.nanoTime();

long startNanos;
synchronized (this) {
startNanos = this.currentRequestStartNanos.get();
@Override
public void success(TaskInfo newValue)
{
try (SetThreadName ignored = new SetThreadName("TaskInfoFetcher-%s", taskId)) {
lastUpdateNanos.set(System.nanoTime());

updateStats(requestStartNanos);
errorTracker.requestSucceeded();
updateTaskInfo(newValue);
}
updateStats(startNanos);
errorTracker.requestSucceeded();
updateTaskInfo(newValue);
}
}

@Override
public void failed(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("TaskInfoFetcher-%s", taskId)) {
lastUpdateNanos.set(System.nanoTime());

try {
// if task not already done, record error
if (!isDone(getTaskInfo())) {
errorTracker.requestFailed(cause);
@Override
public void failed(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("TaskInfoFetcher-%s", taskId)) {
lastUpdateNanos.set(System.nanoTime());

try {
// if task not already done, record error
if (!isDone(getTaskInfo())) {
errorTracker.requestFailed(cause);
}
}
catch (Error e) {
onFail.accept(e);
throw e;
}
catch (RuntimeException e) {
onFail.accept(e);
}
}
catch (Error e) {
onFail.accept(e);
throw e;
}
catch (RuntimeException e) {
onFail.accept(e);
}
}
}

@Override
public void fatal(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("TaskInfoFetcher-%s", taskId)) {
onFail.accept(cause);
@Override
public void fatal(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("TaskInfoFetcher-%s", taskId)) {
onFail.accept(cause);
}
}
}

Expand Down