Skip to content

Commit

Permalink
Change interface to overload tryReserveSlot
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Jul 17, 2024
1 parent edebcc7 commit 765dfca
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.temporal.worker.MetricsType;
import io.temporal.worker.tuning.*;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -102,9 +101,7 @@ public ActivityTask poll() {
new SlotReservationData(
pollRequest.getTaskQueue().getName(),
pollRequest.getIdentity(),
pollRequest.getWorkerVersionCapabilities().getBuildId()),
Long.MAX_VALUE,
TimeUnit.MILLISECONDS);
pollRequest.getWorkerVersionCapabilities().getBuildId()));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,13 +279,14 @@ private boolean submitANewExecution(
SlotReservationData reservationCtx =
new SlotReservationData(taskQueue, options.getIdentity(), options.getBuildId());
if (acceptanceTimeoutMs <= 0) {
permit = slotSupplier.reserveSlot(reservationCtx, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
permit = slotSupplier.reserveSlot(reservationCtx);
} else {
try {
permit =
slotSupplier.reserveSlot(
reservationCtx, acceptanceTimeoutMs, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
Optional<SlotPermit> maybePermit =
slotSupplier.tryReserveSlot(
reservationCtx, acceptanceTimeoutMs, TimeUnit.MILLISECONDS);
if (maybePermit.isPresent()) {
permit = maybePermit.get();
} else {
// In the event that we timed out waiting for a permit *because of schedule to start* we
// still want to proceed with the "attempt" with a null permit, which will then
// immediately fail with the s2s timeout.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ public TrackingSlotSupplier(SlotSupplier<SI> inner, Scope metricsScope) {
publishSlotsMetric();
}

public SlotPermit reserveSlot(SlotReservationData dat, long timeout, TimeUnit timeUnit)
public SlotPermit reserveSlot(SlotReservationData dat)
throws InterruptedException, TimeoutException {
SlotPermit p = inner.reserveSlot(createCtx(dat), timeout, timeUnit);
SlotPermit p = inner.reserveSlot(createCtx(dat));
issuedSlots.incrementAndGet();
return p;
}
Expand All @@ -65,6 +65,15 @@ public Optional<SlotPermit> tryReserveSlot(SlotReservationData dat) {
return p;
}

public Optional<SlotPermit> tryReserveSlot(
SlotReservationData dat, long timeout, TimeUnit timeUnit) throws InterruptedException {
Optional<SlotPermit> p = inner.tryReserveSlot(createCtx(dat), timeout, timeUnit);
if (p.isPresent()) {
issuedSlots.incrementAndGet();
}
return p;
}

public void markSlotUsed(SI slotInfo, SlotPermit permit) {
if (permit == null) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import io.temporal.worker.MetricsType;
import io.temporal.worker.tuning.*;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -128,9 +127,7 @@ public WorkflowTask poll() {
new SlotReservationData(
pollRequest.getTaskQueue().getName(),
pollRequest.getIdentity(),
pollRequest.getWorkerVersionCapabilities().getBuildId()),
Long.MAX_VALUE,
TimeUnit.MILLISECONDS);
pollRequest.getWorkerVersionCapabilities().getBuildId()));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,9 @@ public FixedSizeSlotSupplier(int numSlots) {
}

@Override
public SlotPermit reserveSlot(SlotReserveContext<SI> ctx, long timeout, TimeUnit timeUnit)
throws InterruptedException, TimeoutException {
if (executorSlotsSemaphore.tryAcquire(timeout, timeUnit)) {
return new SlotPermit();
}
throw new TimeoutException("Timed out waiting for a slot to become available");
public SlotPermit reserveSlot(SlotReserveContext<SI> ctx) throws InterruptedException {
executorSlotsSemaphore.acquire();
return new SlotPermit();
}

@Override
Expand All @@ -58,6 +55,20 @@ public Optional<SlotPermit> tryReserveSlot(SlotReserveContext<SI> ctx) {
return Optional.empty();
}

@Override
public Optional<SlotPermit> tryReserveSlot(
SlotReserveContext<SI> ctx, long timeout, TimeUnit timeUnit) {
try {
boolean gotOne = executorSlotsSemaphore.tryAcquire(timeout, timeUnit);
if (gotOne) {
return Optional.of(new SlotPermit());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return Optional.empty();
}

@Override
public void markSlotUsed(SlotMarkUsedContext<SI> ctx) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Implements a {@link SlotSupplier} based on resource usage for a particular slot type. */
@Experimental
Expand Down Expand Up @@ -113,12 +112,31 @@ private ResourceBasedSlotSupplier(
}

@Override
public SlotPermit reserveSlot(SlotReserveContext<SI> ctx, long timeout, TimeUnit timeUnit)
throws InterruptedException, TimeoutException {
public SlotPermit reserveSlot(SlotReserveContext<SI> ctx) throws InterruptedException {
return tryReserveSlot(ctx, Long.MAX_VALUE, TimeUnit.MILLISECONDS)
.orElseThrow(() -> new IllegalStateException("Waited infinite time to reserve a slot"));
}

@Override
public Optional<SlotPermit> tryReserveSlot(SlotReserveContext<SI> ctx) {
int numIssued = ctx.getNumIssuedSlots();
if (numIssued < options.getMinimumSlots()
|| (timeSinceLastSlotIssued().compareTo(options.getRampThrottle()) > 0
&& numIssued < options.getMaximumSlots()
&& resourceController.pidDecision())) {
lastSlotIssuedAt = Instant.now();
return Optional.of(new SlotPermit());
}
return Optional.empty();
}

@Override
public Optional<SlotPermit> tryReserveSlot(
SlotReserveContext<SI> ctx, long timeout, TimeUnit timeUnit) throws InterruptedException {
Instant started = Instant.now();
while (started.plusMillis(timeUnit.toMillis(timeout)).isAfter(Instant.now())) {
if (ctx.getNumIssuedSlots() < options.getMinimumSlots()) {
return new SlotPermit();
return Optional.of(new SlotPermit());
} else {
Duration mustWaitFor;
try {
Expand All @@ -132,25 +150,12 @@ public SlotPermit reserveSlot(SlotReserveContext<SI> ctx, long timeout, TimeUnit

Optional<SlotPermit> permit = tryReserveSlot(ctx);
if (permit.isPresent()) {
return permit.get();
return permit;
} else {
Thread.sleep(10);
}
}
}
throw new TimeoutException("Timed out waiting for a slot to become available");
}

@Override
public Optional<SlotPermit> tryReserveSlot(SlotReserveContext<SI> ctx) {
int numIssued = ctx.getNumIssuedSlots();
if (numIssued < options.getMinimumSlots()
|| (timeSinceLastSlotIssued().compareTo(options.getRampThrottle()) > 0
&& numIssued < options.getMaximumSlots()
&& resourceController.pidDecision())) {
lastSlotIssuedAt = Instant.now();
return Optional.of(new SlotPermit());
}
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,15 @@
public interface SlotSupplier<SI extends SlotInfo> {
/**
* This function is called before polling for new tasks. Your implementation should block until a
* slot is available then return a permit to use that slot. If the provided timeout is reached
* before a slot is available, a {@link TimeoutException} must be thrown.
* slot is available then return a permit to use that slot.
*
* @param ctx The context for slot reservation.
* @param timeout The maximum amount of time to wait for a slot to become available.
* @param timeUnit The time unit for the timeout.
* @return A permit to use the slot which may be populated with your own data.
* @throws InterruptedException The worker may choose to interrupt the thread in order to cancel
* the reservation, or during shutdown. You may perform cleanup, and then should rethrow the
* exception.
*/
SlotPermit reserveSlot(SlotReserveContext<SI> ctx, long timeout, TimeUnit timeUnit)
throws InterruptedException, TimeoutException;
SlotPermit reserveSlot(SlotReserveContext<SI> ctx) throws InterruptedException, TimeoutException;

/**
* This function is called when trying to reserve slots for "eager" workflow and activity tasks.
Expand All @@ -63,6 +59,22 @@ SlotPermit reserveSlot(SlotReserveContext<SI> ctx, long timeout, TimeUnit timeUn
*/
Optional<SlotPermit> tryReserveSlot(SlotReserveContext<SI> ctx);

/**
* This function is called specifically when trying to reserve slots for local activities. Since
* they may time out before getting a chance to run, if slots are unavailable, a timeout is
* necessary. If the provided timeout is reached before a slot is available, then the
* implementation must return an empty optional.
*
* @param ctx The context for slot reservation.
* @param timeout The maximum amount of time to wait for a slot to become available.
* @param timeUnit The time unit for the timeout.
* @return Maybe a permit to use the slot which may be populated with your own data.
* @throws InterruptedException If the thread is interrupted while waiting for a slot to become
* available.
*/
Optional<SlotPermit> tryReserveSlot(SlotReserveContext<SI> ctx, long timeout, TimeUnit timeUnit)
throws InterruptedException;

/**
* This function is called once a slot is actually being used to process some task, which may be
* some time after the slot was reserved originally. For example, if there is no work for a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ public void supplierIsCalledAppropriately() throws InterruptedException, Timeout
src -> {
usedSlotsWhenCalled.set(src.getUsedSlots().size());
return true;
}),
anyLong(),
any()))
})))
.thenReturn(new SlotPermit());

StickyQueueBalancer stickyQueueBalancer = new StickyQueueBalancer(5, true);
Expand Down Expand Up @@ -121,7 +119,7 @@ public void supplierIsCalledAppropriately() throws InterruptedException, Timeout

if (throwOnPoll) {
assertThrows(RuntimeException.class, poller::poll);
verify(mockSupplier, times(1)).reserveSlot(any(), anyLong(), any());
verify(mockSupplier, times(1)).reserveSlot(any());
verify(mockSupplier, times(1)).releaseSlot(any());
assertEquals(0, trackingSS.getUsedSlots().size());
} else {
Expand All @@ -131,8 +129,7 @@ public void supplierIsCalledAppropriately() throws InterruptedException, Timeout
// where the slot *is* used.
assertEquals(0, usedSlotsWhenCalled.get());
verify(mockSupplier, times(1))
.reserveSlot(
argThat(arg -> Objects.equals(arg.getTaskQueue(), TASK_QUEUE)), anyLong(), any());
.reserveSlot(argThat(arg -> Objects.equals(arg.getTaskQueue(), TASK_QUEUE)));
verify(mockSupplier, times(0)).releaseSlot(any());
assertEquals(1, trackingSS.getUsedSlots().size());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.temporal.worker.tuning.*;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class CountingSlotSupplier<SI extends SlotInfo> extends FixedSizeSlotSupplier<SI> {
Expand All @@ -35,9 +34,8 @@ public CountingSlotSupplier(int numSlots) {
}

@Override
public SlotPermit reserveSlot(SlotReserveContext<SI> ctx, long timeout, TimeUnit timeUnit)
throws InterruptedException, TimeoutException {
SlotPermit p = super.reserveSlot(ctx, timeout, timeUnit);
public SlotPermit reserveSlot(SlotReserveContext<SI> ctx) throws InterruptedException {
SlotPermit p = super.reserveSlot(ctx);
reservedCount.incrementAndGet();
return p;
}
Expand All @@ -51,6 +49,16 @@ public Optional<SlotPermit> tryReserveSlot(SlotReserveContext<SI> ctx) {
return p;
}

@Override
public Optional<SlotPermit> tryReserveSlot(
SlotReserveContext<SI> ctx, long timeout, TimeUnit timeUnit) {
Optional<SlotPermit> p = super.tryReserveSlot(ctx, timeout, timeUnit);
if (p.isPresent()) {
reservedCount.incrementAndGet();
}
return p;
}

@Override
public void releaseSlot(SlotReleaseContext<SI> ctx) {
super.releaseSlot(ctx);
Expand Down

0 comments on commit 765dfca

Please sign in to comment.