Test fix after runner bump to Java11 #32909

Merged (2 commits, Oct 23, 2024)

Changes from all commits:
@@ -76,10 +76,8 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -110,17 +108,13 @@ static class ResultCoder extends AtomicCoder<WriteTables.Result> {
static final ResultCoder INSTANCE = new ResultCoder();

@Override
public void encode(Result value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream)
throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull
@Initialized IOException {
public void encode(Result value, OutputStream outStream) throws CoderException, IOException {
StringUtf8Coder.of().encode(value.getTableName(), outStream);
BooleanCoder.of().encode(value.isFirstPane(), outStream);
}

@Override
public Result decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream)
throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull
@Initialized IOException {
public Result decode(InputStream inStream) throws CoderException, IOException {
return new AutoValue_WriteTables_Result(
StringUtf8Coder.of().decode(inStream), BooleanCoder.of().decode(inStream));
}
@@ -200,11 +200,10 @@ interface RpcWriteAttempt extends RpcAttempt {
* provided {@code instant}.
*
* @param instant The intended start time of the next rpc
* @param <T> The type which will be sent in the request
* @param <ElementT> The {@link Element} type which the returned buffer will contain
* @return a new {@link FlushBuffer} which queued messages can be staged to before final flush
*/
<T, ElementT extends Element<T>> FlushBuffer<ElementT> newFlushBuffer(Instant instant);
<ElementT extends Element<?>> FlushBuffer<ElementT> newFlushBuffer(Instant instant);
Member:
This seems like a regression in functionality. Now the type can no longer guarantee that it is a FlushBuffer<Element<T>>.

@Abacn (Contributor, Author), Oct 24, 2024:

Admittedly, this is a compile-time regression, but it does not matter at runtime because generics are erased.

RpcWriteAttempt, RpcWriteAttempt.FlushBuffer, and RpcWriteAttempt.Element are package-private interfaces, so users won't be affected by the change.
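To make the trade-off concrete, here is a minimal, self-contained sketch. It is not the real Beam code: Element, FlushBuffer, and all class names below are hypothetical stand-ins for the package-private RpcWriteAttempt types. It shows that the old signature threads the payload type T through to the returned buffer, while the new wildcard signature only requires Element<?>; both forms compile at typical call sites and erase to the same runtime signature.

```java
// Hypothetical stand-ins for the package-private RpcWriteAttempt.Element /
// RpcWriteAttempt.FlushBuffer types; names and bodies are illustrative only.
import java.util.ArrayList;
import java.util.List;

public class FlushBufferGenericsSketch {

  interface Element<T> {
    T getValue();
  }

  interface FlushBuffer<ElementT> {
    boolean offer(ElementT element);
  }

  // Old shape: the payload type T is threaded through the signature, so the compiler
  // knows the returned buffer holds elements whose payload type is exactly T.
  static <T, ElementT extends Element<T>> FlushBuffer<ElementT> newFlushBufferOld() {
    return new SimpleBuffer<>();
  }

  // New shape: only Element<?> is required. The link to a concrete payload type T is
  // no longer expressed, which is the compile-time regression noted above; after type
  // erasure both methods have the same runtime signature.
  static <ElementT extends Element<?>> FlushBuffer<ElementT> newFlushBufferNew() {
    return new SimpleBuffer<>();
  }

  static final class SimpleBuffer<ElementT> implements FlushBuffer<ElementT> {
    private final List<ElementT> staged = new ArrayList<>();

    @Override
    public boolean offer(ElementT element) {
      return staged.add(element);
    }
  }

  static final class StringElement implements Element<String> {
    private final String value;

    StringElement(String value) {
      this.value = value;
    }

    @Override
    public String getValue() {
      return value;
    }
  }

  public static void main(String[] args) {
    // Both call sites compile and behave identically at runtime.
    FlushBuffer<StringElement> oldStyle = newFlushBufferOld();
    FlushBuffer<StringElement> newStyle = newFlushBufferNew();
    oldStyle.offer(new StringElement("a"));
    newStyle.offer(new StringElement("b"));
    System.out.println("both buffers accepted one element each");
  }
}
```

Design-wise, call sites can still pin a concrete element type (e.g. FlushBuffer<StringElement>) under either signature; what is lost is only the method-level guarantee that the buffer's element payload matches a single T.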


/** Record the start time of sending the rpc. */
void recordRequestStart(Instant start, int numWrites);
@@ -386,7 +386,7 @@ public boolean awaitSafeToProceed(Instant instant) throws InterruptedException {
}

@Override
public <T, ElementT extends Element<T>> FlushBufferImpl<T, ElementT> newFlushBuffer(
public <ElementT extends Element<?>> FlushBufferImpl<ElementT> newFlushBuffer(
Instant instantSinceEpoch) {
state.checkActive();
int availableWriteCountBudget = writeRampUp.getAvailableWriteCountBudget(instantSinceEpoch);
Expand Down Expand Up @@ -935,7 +935,7 @@ private static O11y create(
}
}

static class FlushBufferImpl<T, ElementT extends Element<T>> implements FlushBuffer<ElementT> {
static class FlushBufferImpl<ElementT extends Element<?>> implements FlushBuffer<ElementT> {

final int nextBatchMaxCount;
final long nextBatchMaxBytes;
@@ -43,10 +43,7 @@
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

/**
* An implementation of {@link TypedSchemaTransformProvider} for Pub/Sub reads configured using
@@ -313,19 +310,17 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
}

@Override
public @UnknownKeyFor @NonNull @Initialized String identifier() {
public String identifier() {
return "beam:schematransform:org.apache.beam:pubsub_read:v1";
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
inputCollectionNames() {
public List<String> inputCollectionNames() {
return Collections.emptyList();
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
outputCollectionNames() {
public List<String> outputCollectionNames() {
return Arrays.asList("output", "errors");
}
}
@@ -44,9 +44,6 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

/**
* An implementation of {@link TypedSchemaTransformProvider} for Pub/Sub reads configured using
@@ -248,19 +245,17 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
}

@Override
public @UnknownKeyFor @NonNull @Initialized String identifier() {
public String identifier() {
return "beam:schematransform:org.apache.beam:pubsub_write:v1";
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
inputCollectionNames() {
public List<String> inputCollectionNames() {
return Collections.singletonList("input");
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
outputCollectionNames() {
public List<String> outputCollectionNames() {
return Collections.singletonList("errors");
}
}
@@ -63,10 +63,7 @@
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -86,8 +83,7 @@ public class PubsubLiteReadSchemaTransformProvider
public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};

@Override
protected @UnknownKeyFor @NonNull @Initialized Class<PubsubLiteReadSchemaTransformConfiguration>
configurationClass() {
protected Class<PubsubLiteReadSchemaTransformConfiguration> configurationClass() {
return PubsubLiteReadSchemaTransformConfiguration.class;
}

@@ -192,8 +188,7 @@ public void finish(FinishBundleContext c) {
}

@Override
public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
PubsubLiteReadSchemaTransformConfiguration configuration) {
public SchemaTransform from(PubsubLiteReadSchemaTransformConfiguration configuration) {
if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) {
throw new IllegalArgumentException(
String.format(
@@ -399,19 +394,17 @@ public Uuid apply(SequencedMessage input) {
}

@Override
public @UnknownKeyFor @NonNull @Initialized String identifier() {
public String identifier() {
return "beam:schematransform:org.apache.beam:pubsublite_read:v1";
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
inputCollectionNames() {
public List<String> inputCollectionNames() {
return Collections.emptyList();
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
outputCollectionNames() {
public List<String> outputCollectionNames() {
return Arrays.asList("output", "errors");
}

@@ -60,10 +60,7 @@
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -81,8 +78,7 @@ public class PubsubLiteWriteSchemaTransformProvider
LoggerFactory.getLogger(PubsubLiteWriteSchemaTransformProvider.class);

@Override
protected @UnknownKeyFor @NonNull @Initialized Class<PubsubLiteWriteSchemaTransformConfiguration>
configurationClass() {
protected Class<PubsubLiteWriteSchemaTransformConfiguration> configurationClass() {
return PubsubLiteWriteSchemaTransformConfiguration.class;
}

@@ -172,8 +168,7 @@ public void finish() {
}

@Override
public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
PubsubLiteWriteSchemaTransformConfiguration configuration) {
public SchemaTransform from(PubsubLiteWriteSchemaTransformConfiguration configuration) {

if (!SUPPORTED_FORMATS.contains(configuration.getFormat())) {
throw new IllegalArgumentException(
@@ -317,19 +312,17 @@ public byte[] apply(Row input) {
}

@Override
public @UnknownKeyFor @NonNull @Initialized String identifier() {
public String identifier() {
return "beam:schematransform:org.apache.beam:pubsublite_write:v1";
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
inputCollectionNames() {
public List<String> inputCollectionNames() {
return Collections.singletonList("input");
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
outputCollectionNames() {
public List<String> outputCollectionNames() {
return Collections.singletonList("errors");
}

@@ -40,9 +40,6 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
@@ -128,19 +125,17 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
}

@Override
public @UnknownKeyFor @NonNull @Initialized String identifier() {
public String identifier() {
return "beam:schematransform:org.apache.beam:spanner_read:v1";
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
inputCollectionNames() {
public List<String> inputCollectionNames() {
return Collections.emptyList();
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
outputCollectionNames() {
public List<String> outputCollectionNames() {
return Collections.singletonList("output");
}

@@ -222,14 +217,12 @@ public static Builder builder() {
}

@Override
protected @UnknownKeyFor @NonNull @Initialized Class<SpannerReadSchemaTransformConfiguration>
configurationClass() {
protected Class<SpannerReadSchemaTransformConfiguration> configurationClass() {
return SpannerReadSchemaTransformConfiguration.class;
}

@Override
protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
SpannerReadSchemaTransformConfiguration configuration) {
protected SchemaTransform from(SpannerReadSchemaTransformConfiguration configuration) {
return new SpannerSchemaTransformRead(configuration);
}
}
@@ -51,9 +51,7 @@
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
@@ -113,14 +111,12 @@ public class SpannerWriteSchemaTransformProvider
SpannerWriteSchemaTransformProvider.SpannerWriteSchemaTransformConfiguration> {

@Override
protected @UnknownKeyFor @NonNull @Initialized Class<SpannerWriteSchemaTransformConfiguration>
configurationClass() {
protected Class<SpannerWriteSchemaTransformConfiguration> configurationClass() {
return SpannerWriteSchemaTransformConfiguration.class;
}

@Override
protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
SpannerWriteSchemaTransformConfiguration configuration) {
protected SchemaTransform from(SpannerWriteSchemaTransformConfiguration configuration) {
return new SpannerSchemaTransformWrite(configuration);
}

@@ -230,19 +226,17 @@ public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) {
}

@Override
public @UnknownKeyFor @NonNull @Initialized String identifier() {
public String identifier() {
return "beam:schematransform:org.apache.beam:spanner_write:v1";
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
inputCollectionNames() {
public List<String> inputCollectionNames() {
return Collections.singletonList("input");
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
outputCollectionNames() {
public List<String> outputCollectionNames() {
return Arrays.asList("post-write", "errors");
}

@@ -66,10 +66,7 @@
import org.apache.beam.vendor.grpc.v1p60p1.com.google.gson.Gson;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -80,8 +77,7 @@ public class SpannerChangestreamsReadSchemaTransformProvider
extends TypedSchemaTransformProvider<
SpannerChangestreamsReadSchemaTransformProvider.SpannerChangestreamsReadConfiguration> {
@Override
protected @UnknownKeyFor @NonNull @Initialized Class<SpannerChangestreamsReadConfiguration>
configurationClass() {
protected Class<SpannerChangestreamsReadConfiguration> configurationClass() {
return SpannerChangestreamsReadConfiguration.class;
}

@@ -94,7 +90,7 @@ public class SpannerChangestreamsReadSchemaTransformProvider
Schema.builder().addStringField("error").addNullableStringField("row").build();

@Override
public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
public SchemaTransform from(
SpannerChangestreamsReadSchemaTransformProvider.SpannerChangestreamsReadConfiguration
configuration) {
return new SchemaTransform() {
@@ -142,19 +138,17 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
}

@Override
public @UnknownKeyFor @NonNull @Initialized String identifier() {
public String identifier() {
return "beam:schematransform:org.apache.beam:spanner_cdc_read:v1";
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
inputCollectionNames() {
public List<String> inputCollectionNames() {
return Collections.emptyList();
}

@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
outputCollectionNames() {
public List<String> outputCollectionNames() {
return Arrays.asList("output", "errors");
}
