Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dataflow Streaming] Code micro optimizations (1/N) #33580

Merged
merged 13 commits into from
Jan 22, 2025
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ public boolean throwExceptionsForLargeOutput() {
}

public boolean workIsFailed() {
return Optional.ofNullable(work).map(Work::isFailed).orElse(false);
if (work != null) return work.isFailed();
arunpandianp marked this conversation as resolved.
Show resolved Hide resolved
return false;
}

public void start(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.worker;

import com.google.auto.value.AutoValue;
import java.util.Objects;
import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.TextFormat;
Expand Down Expand Up @@ -45,4 +46,9 @@ public final String toString() {
return String.format(
"%s: %s-%d", computationId(), TextFormat.escapeBytes(key()), shardingKey());
}

@Override
public final int hashCode() {
return Objects.hash(shardingKey(), computationId());
arunpandianp marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
Expand All @@ -33,8 +32,6 @@
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.ExposedByteArrayInputStream;
import org.apache.beam.sdk.util.ExposedByteArrayOutputStream;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
Expand All @@ -54,6 +51,7 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
class WindmillTimerInternals implements TimerInternals {

private static final Instant OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE =
GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1));

Expand Down Expand Up @@ -406,36 +404,27 @@ private static boolean useNewTimerTagEncoding(TimerData timerData) {
*/
public static ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) {
String tagString;
ExposedByteArrayOutputStream out = new ExposedByteArrayOutputStream();
try {
if (useNewTimerTagEncoding(timerData)) {
tagString =
new StringBuilder()
.append(prefix.byteString().toStringUtf8()) // this never ends with a slash
.append(
timerData.getNamespace().stringKey()) // this must begin and end with a slash
.append('+')
.append(timerData.getTimerId()) // this is arbitrary; currently unescaped
.append('+')
.append(timerData.getTimerFamilyId())
.toString();
out.write(tagString.getBytes(StandardCharsets.UTF_8));
} else {
// Timers without timerFamily would have timerFamily would be an empty string
tagString =
new StringBuilder()
.append(prefix.byteString().toStringUtf8()) // this never ends with a slash
.append(
timerData.getNamespace().stringKey()) // this must begin and end with a slash
.append('+')
.append(timerData.getTimerId()) // this is arbitrary; currently unescaped
.toString();
out.write(tagString.getBytes(StandardCharsets.UTF_8));
}
return ByteString.readFrom(new ExposedByteArrayInputStream(out.toByteArray()));
} catch (IOException e) {
throw new RuntimeException(e);
if (useNewTimerTagEncoding(timerData)) {
tagString =
new StringBuilder()
.append(prefix.byteString().toStringUtf8()) // this never ends with a slash
.append(timerData.getNamespace().stringKey()) // this must begin and end with a slash
.append('+')
.append(timerData.getTimerId()) // this is arbitrary; currently unescaped
.append('+')
.append(timerData.getTimerFamilyId())
.toString();
} else {
// Timers without timerFamily would have timerFamily would be an empty string
tagString =
new StringBuilder()
.append(prefix.byteString().toStringUtf8()) // this never ends with a slash
.append(timerData.getNamespace().stringKey()) // this must begin and end with a slash
.append('+')
.append(timerData.getTimerId()) // this is arbitrary; currently unescaped
.toString();
}
return ByteString.copyFromUtf8(tagString);
arunpandianp marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
Expand Down Expand Up @@ -77,14 +76,15 @@ public final class ActiveWorkState {
* activated in {@link #activateWorkForKey(ExecutableWork)}, and decremented when work is
* completed in {@link #completeWorkAndGetNextWorkForKey(ShardedKey, WorkId)}.
*/
private final AtomicReference<GetWorkBudget> activeGetWorkBudget;
@GuardedBy("this")
private GetWorkBudget activeGetWorkBudget;

private ActiveWorkState(
Map<ShardedKey, Deque<ExecutableWork>> activeWork,
WindmillStateCache.ForComputation computationStateCache) {
this.activeWork = activeWork;
this.computationStateCache = computationStateCache;
this.activeGetWorkBudget = new AtomicReference<>(GetWorkBudget.noBudget());
this.activeGetWorkBudget = GetWorkBudget.noBudget();
}

static ActiveWorkState create(WindmillStateCache.ForComputation computationStateCache) {
Expand Down Expand Up @@ -219,14 +219,12 @@ synchronized ImmutableList<RefreshableWork> getRefreshableWork(Instant refreshDe
.collect(toImmutableList());
}

private void incrementActiveWorkBudget(Work work) {
activeGetWorkBudget.updateAndGet(
getWorkBudget -> getWorkBudget.apply(1, work.getWorkItem().getSerializedSize()));
private synchronized void incrementActiveWorkBudget(Work work) {
activeGetWorkBudget = activeGetWorkBudget.apply(1, work.getWorkItem().getSerializedSize());
}

private void decrementActiveWorkBudget(Work work) {
activeGetWorkBudget.updateAndGet(
getWorkBudget -> getWorkBudget.subtract(1, work.getWorkItem().getSerializedSize()));
private synchronized void decrementActiveWorkBudget(Work work) {
arunpandianp marked this conversation as resolved.
Show resolved Hide resolved
activeGetWorkBudget = activeGetWorkBudget.subtract(1, work.getWorkItem().getSerializedSize());
}

/**
Expand Down Expand Up @@ -332,7 +330,9 @@ private synchronized ImmutableMap<ShardedKey, WorkId> getStuckCommitsAt(
* {@link ActiveWorkState}, and not committed back to Windmill.
*/
GetWorkBudget currentActiveWorkBudget() {
return activeGetWorkBudget.get();
synchronized (this) {
return activeGetWorkBudget;
arunpandianp marked this conversation as resolved.
Show resolved Hide resolved
}
}

synchronized void printActiveWork(PrintWriter writer, Instant now) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,9 @@ public static ShardedKey create(ByteString key, long shardingKey) {
public final String toString() {
return String.format("%016x", shardingKey());
}

@Override
public final int hashCode() {
return Long.hashCode(shardingKey());
arunpandianp marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading