Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reintroduce slot supplier & add many tests #2143

Merged
merged 28 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
b45d063
Revert "Revert configurable slot provider (#2134)"
Sushisource Jul 9, 2024
61a6523
Add tests for worker slots
Quinn-With-Two-Ns Jul 4, 2024
df96e8a
Fix new tests by publishing metrics on start & setting permit properl…
Sushisource Jul 9, 2024
50c0ac5
Add tests for small counts / hitting failed slot acquisition
Sushisource Jul 10, 2024
11cbb4b
More tests
Sushisource Jul 10, 2024
f06f231
Add counting slot supplier for tests
Sushisource Jul 10, 2024
39b419f
License headers
Sushisource Jul 10, 2024
cc81648
Ensure markUsed is called on eager activities too
Sushisource Jul 10, 2024
198d23e
Avoid having to set metric scope on tracking supplier after construction
Sushisource Jul 11, 2024
d0006b1
Don't emit available metric unless max slots is sensible
Sushisource Jul 11, 2024
b953a32
Short WFT timeout was unnecessary & could cause opposite problem of m…
Sushisource Jul 12, 2024
5a1b47c
Provide correct release reason in case of handler errors
Sushisource Jul 12, 2024
26fdcb4
Fix flaky hang in test
Sushisource Jul 12, 2024
5064110
Add used metric
Sushisource Jul 17, 2024
d9e785d
Add local activities to resource tuner test
Sushisource Jul 17, 2024
d1ceb94
Throw if permit is not set when marking used
Sushisource Jul 17, 2024
e411584
Add timeouts to interface
Sushisource Jul 17, 2024
c1a9481
Change exception type
Sushisource Jul 17, 2024
b8f79f1
Fix problem with schedule-to-start LA timeouts
Sushisource Jul 17, 2024
bd58c11
Verify slots aren't exceeded without custom tuner
Sushisource Jul 17, 2024
d9d6a59
Change interface to overload tryReserveSlot
Sushisource Jul 17, 2024
f97b2da
Deal with possibility of calling markUsed multiple times
Sushisource Jul 18, 2024
60f4ab0
More closely mimic old behavior w/ backpressure
Sushisource Jul 18, 2024
1af5890
Change double-mark-used check / license header
Sushisource Jul 18, 2024
412b196
add experimental tag to new metric
Sushisource Jul 18, 2024
061d066
Move releasing to handle
Sushisource Jul 19, 2024
93ed3fd
A few last bits of review feedback
Sushisource Jul 22, 2024
0f8e0b6
Make sure first poll doesn't go through until after initial slot check
Sushisource Jul 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ jobs:
USE_DOCKER_SERVICE: false
run: ./gradlew --no-daemon test -x checkLicenseMain -x checkLicenses -x spotlessCheck -x spotlessApply -x spotlessJava -P edgeDepsTest

- name: Run independent resource tuner test
env:
USER: unittest
USE_DOCKER_SERVICE: false
run: ./gradlew --no-daemon temporal-sdk:testResourceIndependent -x checkLicenseMain -x checkLicenses -x spotlessCheck -x spotlessApply -x spotlessJava -P edgeDepsTest

- name: Publish Test Report
uses: mikepenz/action-junit-report@v4
if: success() || failure() # always run even if the previous step fails
Expand Down
13 changes: 13 additions & 0 deletions temporal-sdk/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,16 @@ task registerNamespace(type: JavaExec) {
}

test.dependsOn 'registerNamespace'

// The default `test` task leaves out everything in the IndependentResourceBasedTests
// JUnit category; the testResourceIndependent task in this file includes that category.
test {
    useJUnit {
        excludeCategories('io.temporal.worker.IndependentResourceBasedTests')
    }
}

// Runs only the tests in the IndependentResourceBasedTests JUnit category (the ones the
// default `test` task excludes), in a single forked test JVM.
task testResourceIndependent(type: Test) {
    // maxParallelForks is a property of the Test task itself, not of the JUnit options
    // object configured by useJUnit { }, so it is set at task level.
    maxParallelForks = 1
    useJUnit {
        includeCategories 'io.temporal.worker.IndependentResourceBasedTests'
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.activity;

import io.temporal.activity.ActivityInfo;
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponseOrBuilder;

/**
 * Static helper that turns an activity poll response into an {@link ActivityInfo}.
 *
 * <p>NOTE(review): presumably this exists so code outside this package can obtain an {@code
 * ActivityInfo} without referencing {@code ActivityInfoImpl} directly — confirm against the
 * visibility of {@code ActivityInfoImpl}.
 */
public class ActivityPollResponseToInfo {
  /**
   * Creates an {@code ActivityInfoImpl} from the supplied arguments and returns it as an {@link
   * ActivityInfo}.
   *
   * @param response the activity poll response to wrap
   * @param namespace passed through to the {@code ActivityInfoImpl} constructor
   * @param activityTaskQueue passed through to the {@code ActivityInfoImpl} constructor
   * @param local passed through to the {@code ActivityInfoImpl} constructor; NOTE(review): the name
   *     suggests it marks a local activity — confirm
   * @return the newly constructed activity info
   */
  public static ActivityInfo toActivityInfoImpl(
      PollActivityTaskQueueResponseOrBuilder response,
      String namespace,
      String activityTaskQueue,
      boolean local) {
    // The fifth constructor argument is always null here; its meaning is defined by
    // ActivityInfoImpl, which is not visible in this view.
    final ActivityInfo info =
        new ActivityInfoImpl(response, namespace, activityTaskQueue, local, null);
    return info;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.MetricsType;
import io.temporal.worker.tuning.*;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -45,7 +45,7 @@ final class ActivityPollTask implements Poller.PollTask<ActivityTask> {
private static final Logger log = LoggerFactory.getLogger(ActivityPollTask.class);

private final WorkflowServiceStubs service;
private final Semaphore pollSemaphore;
private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;
private final Scope metricsScope;
private final PollActivityTaskQueueRequest pollRequest;

Expand All @@ -57,11 +57,11 @@ public ActivityPollTask(
@Nullable String buildId,
boolean useBuildIdForVersioning,
double activitiesPerSecond,
Semaphore pollSemaphore,
@Nonnull TrackingSlotSupplier<ActivitySlotInfo> slotSupplier,
@Nonnull Scope metricsScope,
@Nonnull Supplier<GetSystemInfoResponse.Capabilities> serverCapabilities) {
this.service = Objects.requireNonNull(service);
this.pollSemaphore = pollSemaphore;
this.slotSupplier = slotSupplier;
this.metricsScope = Objects.requireNonNull(metricsScope);

PollActivityTaskQueueRequest.Builder pollRequest =
Expand Down Expand Up @@ -92,13 +92,22 @@ public ActivityTask poll() {
log.trace("poll request begin: " + pollRequest);
}
PollActivityTaskQueueResponse response;
SlotPermit permit;
boolean isSuccessful = false;

try {
pollSemaphore.acquire();
permit =
slotSupplier.reserveSlot(
new SlotReservationData(
pollRequest.getTaskQueue().getName(),
pollRequest.getIdentity(),
pollRequest.getWorkerVersionCapabilities().getBuildId()));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} catch (Exception e) {
log.warn("Error while trying to reserve a slot for an activity", e.getCause());
return null;
}

try {
Expand All @@ -118,9 +127,12 @@ public ActivityTask poll() {
ProtobufTimeUtils.toM3Duration(
response.getStartedTime(), response.getCurrentAttemptScheduledTime()));
isSuccessful = true;
return new ActivityTask(response, pollSemaphore::release);
return new ActivityTask(
response,
permit,
() -> slotSupplier.releaseSlot(SlotReleaseReason.taskComplete(), permit));
} finally {
if (!isSuccessful) pollSemaphore.release();
if (!isSuccessful) slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,21 @@
package io.temporal.internal.worker;

import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponseOrBuilder;
import io.temporal.worker.tuning.SlotPermit;
import io.temporal.workflow.Functions;
import javax.annotation.Nonnull;

public final class ActivityTask {
private final @Nonnull PollActivityTaskQueueResponseOrBuilder response;
private final @Nonnull SlotPermit permit;
private final @Nonnull Functions.Proc completionCallback;

public ActivityTask(
@Nonnull PollActivityTaskQueueResponseOrBuilder response,
@Nonnull SlotPermit permit,
@Nonnull Functions.Proc completionCallback) {
this.response = response;
this.permit = permit;
this.completionCallback = completionCallback;
}

Expand All @@ -48,4 +52,9 @@ public PollActivityTaskQueueResponseOrBuilder getResponse() {
public Functions.Proc getCompletionCallback() {
return completionCallback;
}

@Nonnull
public SlotPermit getPermit() {
return permit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributesOrBuilder;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.workflowservice.v1.*;
import io.temporal.internal.activity.ActivityPollResponseToInfo;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.internal.logging.LoggerTag;
import io.temporal.internal.retryer.GrpcRetryer;
Expand All @@ -39,9 +40,10 @@
import io.temporal.serviceclient.rpcretry.DefaultStubServiceOperationRpcRetryOptions;
import io.temporal.worker.MetricsType;
import io.temporal.worker.WorkerMetricsTag;
import io.temporal.worker.tuning.*;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
Expand All @@ -64,16 +66,16 @@ final class ActivityWorker implements SuspendableWorker {
private final Scope workerMetricsScope;
private final GrpcRetryer grpcRetryer;
private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions;
private final int executorSlots;
private final Semaphore executorSlotsSemaphore;
private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;

public ActivityWorker(
@Nonnull WorkflowServiceStubs service,
@Nonnull String namespace,
@Nonnull String taskQueue,
double taskQueueActivitiesPerSecond,
@Nonnull SingleWorkerOptions options,
@Nonnull ActivityTaskHandler handler) {
@Nonnull ActivityTaskHandler handler,
@Nonnull SlotSupplier<ActivitySlotInfo> slotSupplier) {
this.service = Objects.requireNonNull(service);
this.namespace = Objects.requireNonNull(namespace);
this.taskQueue = Objects.requireNonNull(taskQueue);
Expand All @@ -87,8 +89,8 @@ public ActivityWorker(
this.replyGrpcRetryerOptions =
new GrpcRetryer.GrpcRetryerOptions(
DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null);
this.executorSlots = options.getTaskExecutorThreadPoolSize();
this.executorSlotsSemaphore = new Semaphore(executorSlots);

this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope);
}

@Override
Expand All @@ -101,8 +103,7 @@ public boolean start() {
options.getIdentity(),
new TaskHandlerImpl(handler),
pollerOptions,
options.getTaskExecutorThreadPoolSize(),
workerMetricsScope,
slotSupplier.maximumSlots().orElse(Integer.MAX_VALUE),
true);
poller =
new Poller<>(
Expand All @@ -115,7 +116,7 @@ public boolean start() {
options.getBuildId(),
options.isUsingBuildIdForVersioning(),
taskQueueActivitiesPerSecond,
executorSlotsSemaphore,
this.slotSupplier,
workerMetricsScope,
service.getServerCapabilities()),
this.pollTaskExecutor,
Expand All @@ -131,14 +132,14 @@ public boolean start() {

@Override
public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
String semaphoreName = this + "#executorSlotsSemaphore";
String supplierName = this + "#executorSlots";
return poller
.shutdown(shutdownManager, interruptTasks)
.thenCompose(
ignore ->
!interruptTasks
? shutdownManager.waitForSemaphorePermitsReleaseUntimed(
executorSlotsSemaphore, executorSlots, semaphoreName)
? shutdownManager.waitForSupplierPermitsReleasedUnlimited(
slotSupplier, supplierName)
: CompletableFuture.completedFuture(null))
.thenCompose(
ignore ->
Expand Down Expand Up @@ -224,6 +225,15 @@ private TaskHandlerImpl(ActivityTaskHandler handler) {
@Override
public void handle(ActivityTask task) throws Exception {
PollActivityTaskQueueResponseOrBuilder pollResponse = task.getResponse();

slotSupplier.markSlotUsed(
new ActivitySlotInfo(
ActivityPollResponseToInfo.toActivityInfoImpl(
pollResponse, namespace, taskQueue, false),
options.getIdentity(),
options.getBuildId()),
task.getPermit());

Scope metricsScope =
workerMetricsScope.tagged(
ImmutableMap.of(
Expand Down Expand Up @@ -416,23 +426,34 @@ private void logExceptionDuringResultReporting(

private final class EagerActivityDispatcherImpl implements EagerActivityDispatcher {
@Override
public boolean tryReserveActivitySlot(
public Optional<SlotPermit> tryReserveActivitySlot(
ScheduleActivityTaskCommandAttributesOrBuilder commandAttributes) {
return WorkerLifecycleState.ACTIVE.equals(ActivityWorker.this.getLifecycleState())
&& Objects.equals(
commandAttributes.getTaskQueue().getName(), ActivityWorker.this.taskQueue)
&& ActivityWorker.this.executorSlotsSemaphore.tryAcquire();
if (!WorkerLifecycleState.ACTIVE.equals(ActivityWorker.this.getLifecycleState())
|| !Objects.equals(
commandAttributes.getTaskQueue().getName(), ActivityWorker.this.taskQueue)) {
return Optional.empty();
}
return ActivityWorker.this.slotSupplier.tryReserveSlot(
new SlotReservationData(
ActivityWorker.this.taskQueue, options.getIdentity(), options.getBuildId()));
}

@Override
public void releaseActivitySlotReservations(int slotCounts) {
ActivityWorker.this.executorSlotsSemaphore.release(slotCounts);
public void releaseActivitySlotReservations(Iterable<SlotPermit> permits) {
for (SlotPermit permit : permits) {
ActivityWorker.this.slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit);
}
}

@Override
public void dispatchActivity(PollActivityTaskQueueResponse activity) {
public void dispatchActivity(PollActivityTaskQueueResponse activity, SlotPermit permit) {
ActivityWorker.this.pollTaskExecutor.process(
new ActivityTask(activity, ActivityWorker.this.executorSlotsSemaphore::release));
new ActivityTask(
activity,
permit,
() ->
ActivityWorker.this.slotSupplier.releaseSlot(
SlotReleaseReason.taskComplete(), permit)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,33 @@

import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributesOrBuilder;
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
import io.temporal.worker.tuning.SlotPermit;
import java.util.Optional;

public interface EagerActivityDispatcher {
boolean tryReserveActivitySlot(ScheduleActivityTaskCommandAttributesOrBuilder commandAttributes);
Optional<SlotPermit> tryReserveActivitySlot(
ScheduleActivityTaskCommandAttributesOrBuilder commandAttributes);

void releaseActivitySlotReservations(int slotCounts);
void releaseActivitySlotReservations(Iterable<SlotPermit> permits);

void dispatchActivity(PollActivityTaskQueueResponse activity);
void dispatchActivity(PollActivityTaskQueueResponse activity, SlotPermit permit);

class NoopEagerActivityDispatcher implements EagerActivityDispatcher {
@Override
public boolean tryReserveActivitySlot(
public Optional<SlotPermit> tryReserveActivitySlot(
ScheduleActivityTaskCommandAttributesOrBuilder commandAttributes) {
return false;
return Optional.empty();
}

@Override
public void releaseActivitySlotReservations(int slotCounts) {
if (slotCounts > 0)
public void releaseActivitySlotReservations(Iterable<SlotPermit> permits) {
if (permits.iterator().hasNext())
throw new IllegalStateException(
"Trying to release activity slots on a NoopEagerActivityDispatcher");
}

@Override
public void dispatchActivity(PollActivityTaskQueueResponse activity) {
public void dispatchActivity(PollActivityTaskQueueResponse activity, SlotPermit permit) {
throw new IllegalStateException(
"Trying to dispatch activity on a NoopEagerActivityDispatcher");
}
Expand Down
Loading
Loading