Skip to content

Commit

Permalink
spotless
Browse files Browse the repository at this point in the history
  • Loading branch information
steveniemitz committed May 19, 2021
1 parent c9cc655 commit feb16f1
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@
@Experimental
public interface WorkerMetricsReceiver {
void receiverCounterUpdates(List<CounterUpdate> updates);
default void receiveTopologyUpdate(String systemName, String userName) { }

default void receiveTopologyUpdate(String systemName, String userName) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.beam.runners.dataflow.worker;

import static org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics.StreamingSystemCounterNames.*;
import static org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics.StreamingSystemCounterNames.MEMORY_MONITOR_IS_THRASHING;
import static org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics.StreamingSystemCounterNames.MEMORY_MONITOR_NUM_PUSHBACKS;

import com.google.api.services.dataflow.model.MapTask;
import com.google.api.services.dataflow.model.WorkItem;
Expand Down Expand Up @@ -65,9 +67,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics.StreamingSystemCounterNames.MEMORY_MONITOR_IS_THRASHING;
import static org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics.StreamingSystemCounterNames.MEMORY_MONITOR_NUM_PUSHBACKS;

/**
* This is a semi-abstract harness for executing WorkItem tasks in Java workers. Concrete
* implementations need to implement a WorkUnitClient.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,14 +552,12 @@ private static class StageInfo {

void updateWatermarkMetrics(Instant inputDataWatermark, Instant outputDataWatermark) {
long nowMillis = DateTimeUtils.currentTimeMillis();
long watermarkLag =
Math.max(0, nowMillis - inputDataWatermark.getMillis());
long watermarkLag = Math.max(0, nowMillis - inputDataWatermark.getMillis());
inputWatermarkLag.getAndReset();
inputWatermarkLag.addValue(watermarkLag);

if (outputDataWatermark != null) {
long outputWatermarkLagMillis =
Math.max(0, nowMillis - outputDataWatermark.getMillis());
long outputWatermarkLagMillis = Math.max(0, nowMillis - outputDataWatermark.getMillis());
outputWatermarkLag.getAndReset();
outputWatermarkLag.addValue(outputWatermarkLagMillis);
}
Expand Down Expand Up @@ -1000,8 +998,7 @@ public void run() {
}
},
60_000,
60_000
);
60_000);

if (windmillServiceEnabled) {
// Schedule the background getConfig thread. Blocks until windmillServer stub is ready.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public long getInvalidateRequests() {
public long getInvalidatesFromInconsistentToken() {
return invalidatesFromInconsistentToken.sum();
}

public CacheStats getCacheStats() {
return stateCache.stats();
}
Expand Down Expand Up @@ -258,9 +258,9 @@ public String getStateFamily() {
@SuppressWarnings("nullness") // Unsure how to annotate lambda return allowing null.
@Nullable
StateCacheEntry entry = localCache.computeIfAbsent(id, key -> stateCache.getIfPresent(key));

if (entry != null) {
cacheHits.increment();
cacheHits.increment();
return entry.get(namespace, address);
} else {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ private WriteResult(
}

/**
* Returns a {@link PCollection} containing the {@link TableDestinations}s that were
* successfully inserted.
* Returns a {@link PCollection} containing the {@link TableDestinations}s that were successfully
* inserted.
*
* <p>Successful Inserts are only produced when using batch inserts.
*/
Expand Down

0 comments on commit feb16f1

Please sign in to comment.