Skip to content

Commit

Permalink
Test fix after runner bump to Java11
Browse files Browse the repository at this point in the history
  • Loading branch information
Abacn committed Oct 23, 2024
1 parent df3c45d commit dd73a2e
Show file tree
Hide file tree
Showing 17 changed files with 64 additions and 113 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/beam_PreCommit_Java.yml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ jobs:
github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
- name: Setup environment
uses: ./.github/actions/setup-environment-action
with:
java-version: 8
- name: run Java PreCommit script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,15 @@ static class ResultCoder extends AtomicCoder<WriteTables.Result> {
static final ResultCoder INSTANCE = new ResultCoder();

@Override
public void encode(Result value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream)
throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull
@Initialized IOException {
public void encode(Result value, OutputStream outStream)
throws CoderException, @UnknownKeyFor @NonNull @Initialized IOException {
StringUtf8Coder.of().encode(value.getTableName(), outStream);
BooleanCoder.of().encode(value.isFirstPane(), outStream);
}

@Override
public Result decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream)
throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull
@Initialized IOException {
public Result decode(InputStream inStream)
throws CoderException, @UnknownKeyFor @NonNull @Initialized IOException {
return new AutoValue_WriteTables_Result(
StringUtf8Coder.of().decode(inStream), BooleanCoder.of().decode(inStream));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,10 @@ interface RpcWriteAttempt extends RpcAttempt {
* provided {@code instant}.
*
* @param instant The intended start time of the next rpc
* @param <T> The type which will be sent in the request
* @param <ElementT> The {@link Element} type which the returned buffer will contain
* @return a new {@link FlushBuffer} which queued messages can be staged to before final flush
*/
<T, ElementT extends Element<T>> FlushBuffer<ElementT> newFlushBuffer(Instant instant);
<ElementT extends Element<?>> FlushBuffer<ElementT> newFlushBuffer(Instant instant);

/** Record the start time of sending the rpc. */
void recordRequestStart(Instant start, int numWrites);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ public boolean awaitSafeToProceed(Instant instant) throws InterruptedException {
}

@Override
public <T, ElementT extends Element<T>> FlushBufferImpl<T, ElementT> newFlushBuffer(
public <ElementT extends Element<?>> FlushBufferImpl<ElementT> newFlushBuffer(
Instant instantSinceEpoch) {
state.checkActive();
int availableWriteCountBudget = writeRampUp.getAvailableWriteCountBudget(instantSinceEpoch);
Expand Down Expand Up @@ -935,7 +935,7 @@ private static O11y create(
}
}

static class FlushBufferImpl<T, ElementT extends Element<T>> implements FlushBuffer<ElementT> {
static class FlushBufferImpl<ElementT extends Element<?>> implements FlushBuffer<ElementT> {

final int nextBatchMaxCount;
final long nextBatchMaxBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

/**
* An implementation of {@link TypedSchemaTransformProvider} for Pub/Sub reads configured using
Expand Down Expand Up @@ -313,19 +310,17 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
}

@Override
public @UnknownKeyFor @NonNull @Initialized String identifier() {
public String identifier() {
return "beam:schematransform:org.apache.beam:pubsub_read:v1";
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
inputCollectionNames() {
public List<String> inputCollectionNames() {
return Collections.emptyList();
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
outputCollectionNames() {
public List<String> outputCollectionNames() {
return Arrays.asList("output", "errors");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

/**
* An implementation of {@link TypedSchemaTransformProvider} for Pub/Sub reads configured using
Expand Down Expand Up @@ -248,19 +245,17 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
}

@Override
public @UnknownKeyFor @NonNull @Initialized String identifier() {
public String identifier() {
return "beam:schematransform:org.apache.beam:pubsub_write:v1";
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
inputCollectionNames() {
public List<String> inputCollectionNames() {
return Collections.singletonList("input");
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
outputCollectionNames() {
public List<String> outputCollectionNames() {
return Collections.singletonList("errors");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,7 @@
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -86,8 +83,7 @@ public class PubsubLiteReadSchemaTransformProvider
public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};

@Override
protected @UnknownKeyFor @NonNull @Initialized Class<PubsubLiteReadSchemaTransformConfiguration>
configurationClass() {
protected Class<PubsubLiteReadSchemaTransformConfiguration> configurationClass() {
return PubsubLiteReadSchemaTransformConfiguration.class;
}

Expand Down Expand Up @@ -192,8 +188,7 @@ public void finish(FinishBundleContext c) {
}

@Override
public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
PubsubLiteReadSchemaTransformConfiguration configuration) {
public SchemaTransform from(PubsubLiteReadSchemaTransformConfiguration configuration) {
if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) {
throw new IllegalArgumentException(
String.format(
Expand Down Expand Up @@ -399,19 +394,17 @@ public Uuid apply(SequencedMessage input) {
}

@Override
public @UnknownKeyFor @NonNull @Initialized String identifier() {
public String identifier() {
return "beam:schematransform:org.apache.beam:pubsublite_read:v1";
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
inputCollectionNames() {
public List<String> inputCollectionNames() {
return Collections.emptyList();
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
outputCollectionNames() {
public List<String> outputCollectionNames() {
return Arrays.asList("output", "errors");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,7 @@
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -81,8 +78,7 @@ public class PubsubLiteWriteSchemaTransformProvider
LoggerFactory.getLogger(PubsubLiteWriteSchemaTransformProvider.class);

@Override
protected @UnknownKeyFor @NonNull @Initialized Class<PubsubLiteWriteSchemaTransformConfiguration>
configurationClass() {
protected Class<PubsubLiteWriteSchemaTransformConfiguration> configurationClass() {
return PubsubLiteWriteSchemaTransformConfiguration.class;
}

Expand Down Expand Up @@ -172,8 +168,7 @@ public void finish() {
}

@Override
public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
PubsubLiteWriteSchemaTransformConfiguration configuration) {
public SchemaTransform from(PubsubLiteWriteSchemaTransformConfiguration configuration) {

if (!SUPPORTED_FORMATS.contains(configuration.getFormat())) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -317,19 +312,17 @@ public byte[] apply(Row input) {
}

@Override
public @UnknownKeyFor @NonNull @Initialized String identifier() {
public String identifier() {
return "beam:schematransform:org.apache.beam:pubsublite_write:v1";
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
inputCollectionNames() {
public List<String> inputCollectionNames() {
return Collections.singletonList("input");
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
outputCollectionNames() {
public List<String> outputCollectionNames() {
return Collections.singletonList("errors");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
Expand Down Expand Up @@ -128,19 +125,17 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
}

@Override
public @UnknownKeyFor @NonNull @Initialized String identifier() {
public String identifier() {
return "beam:schematransform:org.apache.beam:spanner_read:v1";
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
inputCollectionNames() {
public List<String> inputCollectionNames() {
return Collections.emptyList();
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
outputCollectionNames() {
public List<String> outputCollectionNames() {
return Collections.singletonList("output");
}

Expand Down Expand Up @@ -222,14 +217,12 @@ public static Builder builder() {
}

@Override
protected @UnknownKeyFor @NonNull @Initialized Class<SpannerReadSchemaTransformConfiguration>
configurationClass() {
protected Class<SpannerReadSchemaTransformConfiguration> configurationClass() {
return SpannerReadSchemaTransformConfiguration.class;
}

@Override
protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
SpannerReadSchemaTransformConfiguration configuration) {
protected SchemaTransform from(SpannerReadSchemaTransformConfiguration configuration) {
return new SpannerSchemaTransformRead(configuration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
Expand Down Expand Up @@ -113,14 +111,12 @@ public class SpannerWriteSchemaTransformProvider
SpannerWriteSchemaTransformProvider.SpannerWriteSchemaTransformConfiguration> {

@Override
protected @UnknownKeyFor @NonNull @Initialized Class<SpannerWriteSchemaTransformConfiguration>
configurationClass() {
protected Class<SpannerWriteSchemaTransformConfiguration> configurationClass() {
return SpannerWriteSchemaTransformConfiguration.class;
}

@Override
protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
SpannerWriteSchemaTransformConfiguration configuration) {
protected SchemaTransform from(SpannerWriteSchemaTransformConfiguration configuration) {
return new SpannerSchemaTransformWrite(configuration);
}

Expand Down Expand Up @@ -230,19 +226,17 @@ public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) {
}

@Override
public @UnknownKeyFor @NonNull @Initialized String identifier() {
public String identifier() {
return "beam:schematransform:org.apache.beam:spanner_write:v1";
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
inputCollectionNames() {
public List<String> inputCollectionNames() {
return Collections.singletonList("input");
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
outputCollectionNames() {
public List<String> outputCollectionNames() {
return Arrays.asList("post-write", "errors");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,7 @@
import org.apache.beam.vendor.grpc.v1p60p1.com.google.gson.Gson;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.slf4j.Logger;
Expand All @@ -80,8 +77,7 @@ public class SpannerChangestreamsReadSchemaTransformProvider
extends TypedSchemaTransformProvider<
SpannerChangestreamsReadSchemaTransformProvider.SpannerChangestreamsReadConfiguration> {
@Override
protected @UnknownKeyFor @NonNull @Initialized Class<SpannerChangestreamsReadConfiguration>
configurationClass() {
protected Class<SpannerChangestreamsReadConfiguration> configurationClass() {
return SpannerChangestreamsReadConfiguration.class;
}

Expand All @@ -94,7 +90,7 @@ public class SpannerChangestreamsReadSchemaTransformProvider
Schema.builder().addStringField("error").addNullableStringField("row").build();

@Override
public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
public SchemaTransform from(
SpannerChangestreamsReadSchemaTransformProvider.SpannerChangestreamsReadConfiguration
configuration) {
return new SchemaTransform() {
Expand Down Expand Up @@ -142,19 +138,17 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
}

@Override
public @UnknownKeyFor @NonNull @Initialized String identifier() {
public String identifier() {
return "beam:schematransform:org.apache.beam:spanner_cdc_read:v1";
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
inputCollectionNames() {
public List<String> inputCollectionNames() {
return Collections.emptyList();
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
outputCollectionNames() {
public List<String> outputCollectionNames() {
return Arrays.asList("output", "errors");
}

Expand Down
Loading

0 comments on commit dd73a2e

Please sign in to comment.