Skip to content

Commit

Permalink
Parallelize supervisor stop logic to make it run faster (apache#17535)
Browse files Browse the repository at this point in the history
- Add new method `Supervisor.stopAsync`
- Implement `SeekableStreamSupervisor.stopAsync()` to use a shutdown executor
- Call `stopAsync` from `SupervisorManager`
  • Loading branch information
georgew5656 authored Dec 18, 2024
1 parent a44ab10 commit 9ff1173
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.Tasks;
Expand All @@ -39,10 +41,12 @@
import org.apache.druid.segment.incremental.ParseExceptionReport;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

/**
* Manages the creation and lifetime of {@link Supervisor}.
Expand Down Expand Up @@ -212,11 +216,12 @@ public void start()
public void stop()
{
Preconditions.checkState(started, "SupervisorManager not started");

List<ListenableFuture<Void>> stopFutures = new ArrayList<>();
synchronized (lock) {
log.info("Stopping [%d] supervisors", supervisors.keySet().size());
for (String id : supervisors.keySet()) {
try {
supervisors.get(id).lhs.stop(false);
stopFutures.add(supervisors.get(id).lhs.stopAsync());
SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
if (autoscaler != null) {
autoscaler.stop();
Expand All @@ -226,6 +231,18 @@ public void stop()
log.warn(e, "Caught exception while stopping supervisor [%s]", id);
}
}
log.info("Waiting for [%d] supervisors to shutdown", stopFutures.size());
try {
FutureUtils.coalesce(stopFutures).get();
}
catch (Exception e) {
log.warn(
e,
"Stopped [%d] out of [%d] supervisors. Remaining supervisors will be killed.",
stopFutures.stream().filter(Future::isDone).count(),
stopFutures.size()
);
}
supervisors.clear();
autoscalers.clear();
started = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap;
Expand Down Expand Up @@ -1103,6 +1104,19 @@ public void stop(boolean stopGracefully)
}
}

/**
 * Stops this supervisor non-gracefully on a dedicated single-threaded executor so that the caller
 * does not block while the stop logic runs.
 * <p>
 * A new executor is created for every call and is shut down by the submitted task itself once
 * {@code stop(false)} has finished, whether it returned normally or threw.
 *
 * @return a future that completes with {@code null} once {@code stop(false)} has returned, or fails
 *         with the exception thrown by it
 */
@Override
public ListenableFuture<Void> stopAsync()
{
  final ListeningExecutorService shutdownExec = MoreExecutors.listeningDecorator(
      Execs.singleThreaded("supervisor-shutdown-" + StringUtils.encodeForFormat(supervisorId) + "--%d")
  );
  return shutdownExec.submit(() -> {
    try {
      stop(false);
      return null;
    }
    finally {
      // Release the one-shot executor even when stop() throws; otherwise shutdown() is never
      // requested and the executor's worker thread is left waiting for work that never comes.
      shutdownExec.shutdown();
    }
  });
}

@Override
public void reset(@Nullable final DataSourceMetadata dataSourceMetadata)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexing.common.TaskLockType;
Expand Down Expand Up @@ -130,7 +131,9 @@ public void testCreateUpdateAndRemoveSupervisor()
verifyAll();

resetAll();
supervisor3.stop(false);
SettableFuture<Void> stopFuture = SettableFuture.create();
stopFuture.set(null);
EasyMock.expect(supervisor3.stopAsync()).andReturn(stopFuture);
replayAll();

manager.stop();
Expand Down Expand Up @@ -361,7 +364,7 @@ public void testStopThrowsException()

EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
supervisor1.stop(false);
supervisor1.stopAsync();
EasyMock.expectLastCall().andThrow(new RuntimeException("RTE"));
replayAll();

Expand Down Expand Up @@ -511,7 +514,9 @@ public void testCreateSuspendResumeAndStopSupervisor()

// mock manager shutdown to ensure supervisor 3 stops
resetAll();
supervisor3.stop(false);
SettableFuture<Void> stopFuture = SettableFuture.create();
stopFuture.set(null);
EasyMock.expect(supervisor3.stopAsync()).andReturn(stopFuture);
replayAll();

manager.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,28 @@ public void testStopping()
verifyAll();
}

/**
 * Starts a supervisor, runs one internal iteration, then stops it through {@code stopAsync()} and
 * waits for the returned future before verifying the recorded mock expectations.
 * <p>
 * NOTE(review): despite the test name, the path exercised here is {@code stopAsync()}, which is
 * described elsewhere in this change as a non-graceful stop - confirm the name is intentional.
 */
@Test
public void testStopGracefully() throws Exception
{
// Stubbed collaborators; anyTimes() means the call count of these is not constrained.
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();

// Calls made before replayAll(); presumably these record the cleanup calls expected from the
// stop path (listener unregistration and closing of the clients) - verify against the mock setup.
taskRunner.unregisterListener("testSupervisorId");
indexTaskClient.close();
recordSupplier.close();

replayAll();
SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

supervisor.start();
supervisor.runInternal();
ListenableFuture<Void> stopFuture = supervisor.stopAsync();
// Block until the asynchronous stop has finished so that verifyAll() sees all of its calls.
stopFuture.get();
verifyAll();
}

@Test
public void testStoppingGracefully()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.segment.incremental.ParseExceptionReport;

Expand All @@ -44,6 +46,22 @@ public interface Supervisor
*/
void stop(boolean stopGracefully);

/**
 * Begins a non-graceful shutdown of the supervisor and returns a future that completes once the
 * shutdown has finished.
 * <p>
 * This default implementation invokes {@code stop(false)} on the calling thread, so the returned
 * future is already complete when it is handed back: with {@code null} if the stop returned
 * normally, or with the thrown exception if it failed.
 */
default ListenableFuture<Void> stopAsync()
{
  final SettableFuture<Void> result = SettableFuture.create();
  try {
    stop(false);
  }
  catch (Exception e) {
    // Surface the failure through the future instead of throwing to the caller.
    result.setException(e);
    return result;
  }
  result.set(null);
  return result;
}

SupervisorReport getStatus();

SupervisorStateManager.State getState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.junit.Test;

import javax.annotation.Nullable;
import java.util.concurrent.Future;

public class StreamSupervisorTest
{
Expand Down Expand Up @@ -100,4 +101,74 @@ public int getActiveTaskGroupsCount()
ex.getMessage()
);
}

@Test
public void testDefaultStopAsync()
{
  // Anonymous stream supervisor that does not override stopAsync(); only stop() and getState()
  // carry logic, every other method is a stub.
  final StreamSupervisor supervisor = new StreamSupervisor()
  {
    private SupervisorStateManager.State currentState = SupervisorStateManager.BasicState.RUNNING;

    @Override
    public void stop(boolean stopGracefully)
    {
      // Record that stop() was reached so the test can observe it through getState().
      currentState = SupervisorStateManager.BasicState.STOPPING;
    }

    @Override
    public SupervisorStateManager.State getState()
    {
      return currentState;
    }

    @Override
    public void start()
    {
      // no-op
    }

    @Override
    public SupervisorReport getStatus()
    {
      return null;
    }

    @Override
    public LagStats computeLagStats()
    {
      return null;
    }

    @Override
    public int getActiveTaskGroupsCount()
    {
      return 0;
    }

    @Override
    public void reset(@Nullable DataSourceMetadata dataSourceMetadata)
    {
      // no-op
    }

    @Override
    public void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
    {
      // no-op
    }

    @Override
    public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
    {
      // no-op
    }
  };

  final Future<Void> future = supervisor.stopAsync();

  // The returned future must already be complete ...
  Assert.assertTrue(future.isDone());

  // ... and stop() must have been invoked by stopAsync().
  Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, supervisor.getState());
}
}

0 comments on commit 9ff1173

Please sign in to comment.