diff --git a/.github/workflows/cut_release_branch.yml b/.github/workflows/cut_release_branch.yml index 4c03e45913b4..3fe8e37d8b7c 100644 --- a/.github/workflows/cut_release_branch.yml +++ b/.github/workflows/cut_release_branch.yml @@ -31,11 +31,11 @@ on: RELEASE_VERSION: description: Beam version of current release (branch being cut) required: true - default: '2.XX.0' + default: '2.XX' NEXT_VERSION: description: Next release version required: true - default: '2.XX.0' + default: '2.XX' CREATE_RELEASE_BRANCH: description: Whether to cut the release branch. You shouldn't skip this unless it has already been completed successfully (yes/no) required: true diff --git a/contributor-docs/release-guide.md b/contributor-docs/release-guide.md index 63bd2fd3f97a..6deb13d182f4 100644 --- a/contributor-docs/release-guide.md +++ b/contributor-docs/release-guide.md @@ -196,7 +196,7 @@ The following must be manually done or confirmed: - [ ] The `master` branch has the SNAPSHOT/dev version incremented. - [ ] The release branch has the SNAPSHOT/dev version to be released. - [ ] The Dataflow container image should be modified to the version to be released. -- [ ] Due to current limitation in the workflow, you must navigate to https://github.com/apache/beam/actions/workflows/beam_Release_NightlySnapshot.yml and click "Run workflow" and select the branch just created (release-2.xx.0) to build a snapshot. +- [ ] Due to a current limitation in the workflow, you must navigate to https://github.com/apache/beam/actions/workflows/beam_Release_NightlySnapshot.yml and click "Run workflow" and select the branch just created (release-2.xx) to build a snapshot. - [ ] Manually update `CHANGES.md` on `master` by adding a new section for the next release ([example](https://github.com/apache/beam/commit/96ab1fb3fe07acf7f7dc9d8c829ae36890d1535c)). @@ -1398,19 +1398,7 @@ To drive down time to resolution, this can happen in parallel to any discussion The above document assumes a minor version bump cut off of the master branch. If you want to do a patch release cut off of a previous release branch, use the following steps instead: -- Create a new release branch: - -``` -git clone https://github.com/apache/beam -cd beam -git fetch origin release-2.XX.0 -git checkout release-2.XX.0 -git checkout -b release-2.XX.1 -git push origin release-2.XX.1 -``` - -- Add a PR to add the new release branch to the set of protected branches in .asf.yml - [example PR](https://github.com/apache/beam/pull/30832) -- Add a PR to bump the Dataflow containers versions - [example PR](https://github.com/apache/beam/pull/30827) +- Add a PR to bump the Dataflow container versions on the release branch (`release-2.XX`) - [example PR](https://github.com/apache/beam/pull/30827) - Create PRs to cherry-pick any desired commits to the release branch - Follow the normal release steps, starting with [release branch stabilization](#stabilize-the-release-branch), to build/vote/validate/finalize the release candidate that are listed above. See Voting and Finalization of a Patch Release to see differences in the voting process. - Depending on the nature of the issue, certain post-release finalization steps may be skipped. 
For example, if an issue doesn’t dramatically impact the Beam Playground, consider skipping that step diff --git a/examples/notebooks/beam-ml/run_inference_huggingface.ipynb b/examples/notebooks/beam-ml/run_inference_huggingface.ipynb index 2e4556fd310c..fa6831f9c6e3 100644 --- a/examples/notebooks/beam-ml/run_inference_huggingface.ipynb +++ b/examples/notebooks/beam-ml/run_inference_huggingface.ipynb @@ -296,6 +296,27 @@ } ] }, + { + "cell_type": "markdown", + "source": [ + "### Try with a different model\n", + "One of the best parts of using RunInference is how easy it is to swap in different models. For example, if we wanted to use a larger model like DeepSeek-R1-Distill-Llama-8B outside of Colab (which has very tight memory constraints and limited GPU access), all we need to change is our ModelHandler:\n", + "\n", + "```\n", + "model_handler = HuggingFacePipelineModelHandler(\n", + " task=PipelineTask.Translation_XX_to_YY,\n", + " model = \"deepseek-ai/DeepSeek-R1-Distill-Llama-8B\",\n", + " load_pipeline_args={'framework': 'pt'},\n", + " inference_args={'max_length': 400}\n", + ")\n", + "```\n", + "\n", + "We can then run the exact same pipeline code and Beam will take care of the rest." + ], + "metadata": { + "id": "LWNM81ivoZcF" + } + }, { "cell_type": "markdown", "source": [ diff --git a/release/src/main/groovy/MobileGamingCommands.groovy b/release/src/main/groovy/MobileGamingCommands.groovy index d1fd1d8319a8..eeac968f5763 100644 --- a/release/src/main/groovy/MobileGamingCommands.groovy +++ b/release/src/main/groovy/MobileGamingCommands.groovy @@ -30,7 +30,7 @@ class MobileGamingCommands { SparkRunner: "spark-runner", FlinkRunner: "flink-runner"] - public static final EXECUTION_TIMEOUT_IN_MINUTES = 40 + public static final EXECUTION_TIMEOUT_IN_MINUTES = 60 // Lists used to verify team names generated in the LeaderBoard example. // This list should be kept sync with COLORS in org.apache.beam.examples.complete.game.injector.Injector. diff --git a/release/src/main/scripts/choose_rc_commit.sh b/release/src/main/scripts/choose_rc_commit.sh index c8a1777f17bd..cb6a7755372d 100755 --- a/release/src/main/scripts/choose_rc_commit.sh +++ b/release/src/main/scripts/choose_rc_commit.sh @@ -106,9 +106,11 @@ SCRIPT_DIR=$(dirname $0) RC_TAG="v${RELEASE}-RC${RC}" +RELEASE_BRANCH="release-$(cut -d '.' -f 1,2 <<< $RELEASE)" + if [[ "$CLONE" == yes ]] ; then CLONE_DIR=`mktemp -d` - git clone "$GIT_REPO" "$CLONE_DIR" --single-branch --branch "release-$RELEASE" --shallow-exclude master + git clone "$GIT_REPO" "$CLONE_DIR" --single-branch --branch "$RELEASE_BRANCH" --shallow-exclude master else echo "Not cloning repo; assuming working dir is the desired repo. To run with a fresh clone, run with --clone." 
CLONE_DIR=$PWD diff --git a/runners/core-java/build.gradle b/runners/core-java/build.gradle index 3a988a8248c0..ea7989873712 100644 --- a/runners/core-java/build.gradle +++ b/runners/core-java/build.gradle @@ -42,7 +42,6 @@ dependencies { implementation project(path: ":model:pipeline", configuration: "shadow") implementation project(path: ":sdks:java:core", configuration: "shadow") implementation project(path: ":model:job-management", configuration: "shadow") - implementation library.java.google_api_services_dataflow implementation library.java.vendored_guava_32_1_2_jre implementation library.java.joda_time implementation library.java.vendored_grpc_1_69_0 @@ -53,6 +52,5 @@ dependencies { testImplementation library.java.junit testImplementation library.java.mockito_core testImplementation library.java.slf4j_api - testImplementation(library.java.google_api_services_dataflow) testRuntimeOnly library.java.slf4j_simple } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java index 7e3af8da6d77..44995b979dd0 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java @@ -17,19 +17,8 @@ */ package org.apache.beam.runners.core.metrics; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.api.services.dataflow.model.Base2Exponent; -import com.google.api.services.dataflow.model.BucketOptions; -import com.google.api.services.dataflow.model.DataflowHistogramValue; -import com.google.api.services.dataflow.model.Linear; import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; import java.util.Set; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.DoubleCoder; @@ -37,14 +26,10 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.util.ByteStringOutputStream; -import org.apache.beam.sdk.util.HistogramData; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.joda.time.Instant; -// TODO(#33093): Refactor out DataflowHistogramValue to be runner agnostic, and rename to -// remove Dataflow reference. - /** A set of functions used to encode and decode common monitoring info types. */ public class MonitoringInfoEncodings { private static final Coder VARINT_CODER = VarLongCoder.of(); @@ -178,98 +163,4 @@ public static double decodeDoubleCounter(ByteString payload) { throw new RuntimeException(e); } } - - /** Encodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. 
*/ - public static ByteString encodeInt64Histogram(HistogramData inputHistogram) { - try { - int numberOfBuckets = inputHistogram.getBucketType().getNumBuckets(); - - DataflowHistogramValue outputHistogram2 = new DataflowHistogramValue(); - - if (inputHistogram.getBucketType() instanceof HistogramData.LinearBuckets) { - HistogramData.LinearBuckets buckets = - (HistogramData.LinearBuckets) inputHistogram.getBucketType(); - Linear linear = new Linear(); - linear.setNumberOfBuckets(numberOfBuckets); - linear.setWidth(buckets.getWidth()); - linear.setStart(buckets.getStart()); - outputHistogram2.setBucketOptions(new BucketOptions().setLinear(linear)); - } else if (inputHistogram.getBucketType() instanceof HistogramData.ExponentialBuckets) { - HistogramData.ExponentialBuckets buckets = - (HistogramData.ExponentialBuckets) inputHistogram.getBucketType(); - Base2Exponent base2Exp = new Base2Exponent(); - base2Exp.setNumberOfBuckets(numberOfBuckets); - base2Exp.setScale(buckets.getScale()); - outputHistogram2.setBucketOptions(new BucketOptions().setExponential(base2Exp)); - } else { - throw new HistogramParsingException( - "Unable to encode Int64 Histogram, bucket is not recognized"); - } - - outputHistogram2.setCount(inputHistogram.getTotalCount()); - - List bucketCounts = new ArrayList<>(); - - Arrays.stream(inputHistogram.getBucketCount()) - .forEach( - val -> { - bucketCounts.add(val); - }); - - outputHistogram2.setBucketCounts(bucketCounts); - - ObjectMapper objectMapper = new ObjectMapper(); - String jsonString = objectMapper.writeValueAsString(outputHistogram2); - - return ByteString.copyFromUtf8(jsonString); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - static class HistogramParsingException extends RuntimeException { - public HistogramParsingException(String message) { - super(message); - } - } - - /** Decodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. 
*/ - public static HistogramData decodeInt64Histogram(ByteString payload) { - try { - ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - JsonNode jsonNode = objectMapper.readTree(payload.toStringUtf8()); // parse afterwards - DataflowHistogramValue newHist = new DataflowHistogramValue(); - newHist.setCount(jsonNode.get("count").asLong()); - - List bucketCounts = new ArrayList<>(); - Iterator itr = jsonNode.get("bucketCounts").iterator(); - while (itr.hasNext()) { - Long item = itr.next().asLong(); - bucketCounts.add(item); - } - newHist.setBucketCounts(bucketCounts); - - if (jsonNode.get("bucketOptions").has("linear")) { - Linear linear = new Linear(); - JsonNode linearNode = jsonNode.get("bucketOptions").get("linear"); - linear.setNumberOfBuckets(linearNode.get("numberOfBuckets").asInt()); - linear.setWidth(linearNode.get("width").asDouble()); - linear.setStart(linearNode.get("start").asDouble()); - newHist.setBucketOptions(new BucketOptions().setLinear(linear)); - } else if (jsonNode.get("bucketOptions").has("exponential")) { - Base2Exponent base2Exp = new Base2Exponent(); - JsonNode expNode = jsonNode.get("bucketOptions").get("exponential"); - base2Exp.setNumberOfBuckets(expNode.get("numberOfBuckets").asInt()); - base2Exp.setScale(expNode.get("scale").asInt()); - newHist.setBucketOptions(new BucketOptions().setExponential(base2Exp)); - } else { - throw new HistogramParsingException( - "Unable to parse Int64 Histogram, bucket is not recognized"); - } - return new HistogramData(newHist); - } catch (IOException e) { - throw new RuntimeException(e); - } - } } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java index 100cd4a0e0c2..dde180b150de 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java @@ -17,43 +17,30 @@ */ package org.apache.beam.runners.core.metrics; -import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.HistogramParsingException; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeDoubleCounter; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge; -import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Histogram; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeStringSet; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeDoubleCounter; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeDoubleDistribution; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge; -import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Histogram; import static 
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet; import static org.junit.Assert.assertEquals; import java.util.Collections; -import org.apache.beam.sdk.testing.ExpectedLogs; -import org.apache.beam.sdk.util.HistogramData; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.joda.time.Instant; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; /** Tests for {@link MonitoringInfoEncodings}. */ @RunWith(JUnit4.class) public class MonitoringInfoEncodingsTest { - @Rule - public ExpectedLogs monitoringInfoCodingsExpectedLogs = - ExpectedLogs.none(MonitoringInfoEncodings.class); - - @Rule public ExpectedException thrown = ExpectedException.none(); - @Test public void testInt64DistributionEncoding() { DistributionData data = DistributionData.create(1L, 2L, 3L, 4L); @@ -118,36 +105,4 @@ public void testDoubleCounterEncoding() { assertEquals(ByteString.copyFrom(new byte[] {0x3f, (byte) 0xf0, 0, 0, 0, 0, 0, 0}), payload); assertEquals(1.0, decodeDoubleCounter(payload), 0.001); } - - @Test - public void testHistgramInt64EncodingLinearHist() { - HistogramData.BucketType buckets = HistogramData.LinearBuckets.of(0, 5, 5); - - HistogramData inputHistogram = new HistogramData(buckets); - inputHistogram.record(5, 10, 15, 20); - ByteString payload = encodeInt64Histogram(inputHistogram); - - assertEquals(inputHistogram, decodeInt64Histogram(payload)); - } - - @Test - public void testHistgramInt64EncodingExpHist() { - HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 10); - HistogramData inputHistogram = new HistogramData(buckets); - inputHistogram.record(2, 4, 8, 16, 32); - ByteString payload = encodeInt64Histogram(inputHistogram); - assertEquals(inputHistogram, decodeInt64Histogram(payload)); - } - - @Test - public void testHistgramInt64EncodingUnsupportedBucket() { - thrown.expect(HistogramParsingException.class); - thrown.expectMessage("Unable to encode Int64 Histogram, bucket is not recognized"); - - HistogramData.BucketType buckets = HistogramData.UnsupportedBuckets.of(); - - HistogramData inputHistogram = new HistogramData(buckets); - inputHistogram.record(2, 4, 8, 16, 32); - encodeInt64Histogram(inputHistogram); - } } diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 8ed01df20f0b..104a20883afd 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -53,8 +53,8 @@ evaluationDependsOn(":sdks:java:container:java11") ext.dataflowLegacyEnvironmentMajorVersion = '8' ext.dataflowFnapiEnvironmentMajorVersion = '8' -ext.dataflowLegacyContainerVersion = 'beam-master-20240930' -ext.dataflowFnapiContainerVersion = 'beam-master-20240930' +ext.dataflowLegacyContainerVersion = 'beam-master-20250128' +ext.dataflowFnapiContainerVersion = 'beam-master-20250128' ext.dataflowContainerBaseRepository = 'gcr.io/cloud-dataflow/v1beta3' processResources { @@ -639,11 +639,12 @@ task googleCloudPlatformLegacyWorkerIntegrationTest(type: Test, dependsOn: copyG useJUnit { excludeCategories "org.apache.beam.sdk.testing.UsesKms" filter { - // Only needs to run on direct runner + // Error handling tests, only needs to run on direct runner excludeTestsMatching 
'org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteIT.testE2EBigtableWriteWithEmptyMutationFailures' excludeTestsMatching 'org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteIT.testE2EBigtableWriteWithEmptyRowFailures' excludeTestsMatching 'org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteIT.testE2EBigtableWriteWithInvalidTimestampFailures' excludeTestsMatching 'org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteIT.testE2EBigtableWriteWithOversizedQualifierFailures' + excludeTestsMatching 'org.apache.beam.sdk.io.gcp.spanner.SpannerReadIT.testReadFailsBadSession' } } } @@ -697,11 +698,12 @@ task googleCloudPlatformRunnerV2IntegrationTest(type: Test) { testClassesDirs = files(project(":sdks:java:io:google-cloud-platform").sourceSets.test.output.classesDirs) useJUnit { filter { - // Only needs to run on direct runner + // Error handling tests, only needs to run on direct runner excludeTestsMatching 'org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteIT.testE2EBigtableWriteWithEmptyMutationFailures' excludeTestsMatching 'org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteIT.testE2EBigtableWriteWithEmptyRowFailures' excludeTestsMatching 'org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteIT.testE2EBigtableWriteWithInvalidTimestampFailures' excludeTestsMatching 'org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteIT.testE2EBigtableWriteWithOversizedQualifierFailures' + excludeTestsMatching 'org.apache.beam.sdk.io.gcp.spanner.SpannerReadIT.testReadFailsBadSession' } } } diff --git a/sdks/go.mod b/sdks/go.mod index 0df32846943b..1d7ee337b5da 100644 --- a/sdks/go.mod +++ b/sdks/go.mod @@ -23,19 +23,19 @@ module github.com/apache/beam/sdks/v2 go 1.22.10 require ( - cloud.google.com/go/bigquery v1.65.0 + cloud.google.com/go/bigquery v1.66.0 cloud.google.com/go/bigtable v1.35.0 cloud.google.com/go/datastore v1.20.0 cloud.google.com/go/profiler v0.4.2 cloud.google.com/go/pubsub v1.45.3 cloud.google.com/go/spanner v1.73.0 cloud.google.com/go/storage v1.50.0 - github.com/aws/aws-sdk-go-v2 v1.33.0 - github.com/aws/aws-sdk-go-v2/config v1.29.1 - github.com/aws/aws-sdk-go-v2/credentials v1.17.54 - github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.52 - github.com/aws/aws-sdk-go-v2/service/s3 v1.73.2 - github.com/aws/smithy-go v1.22.1 + github.com/aws/aws-sdk-go-v2 v1.34.0 + github.com/aws/aws-sdk-go-v2/config v1.29.2 + github.com/aws/aws-sdk-go-v2/credentials v1.17.55 + github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.54 + github.com/aws/aws-sdk-go-v2/service/s3 v1.74.1 + github.com/aws/smithy-go v1.22.2 github.com/docker/go-connections v0.5.0 github.com/dustin/go-humanize v1.0.1 github.com/go-sql-driver/mysql v1.8.1 @@ -58,7 +58,7 @@ require ( golang.org/x/sync v0.10.0 golang.org/x/sys v0.29.0 golang.org/x/text v0.21.0 - google.golang.org/api v0.217.0 + google.golang.org/api v0.218.0 google.golang.org/genproto v0.0.0-20250115164207-1a7da9e5054f google.golang.org/grpc v1.69.4 google.golang.org/protobuf v1.36.4 @@ -136,19 +136,19 @@ require ( github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect github.com/apache/thrift v0.21.0 // indirect github.com/aws/aws-sdk-go v1.55.5 // indirect - github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 // indirect - github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.24 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.28 // indirect - 
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.28 // indirect - github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect - github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.28 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.5.2 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.9 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.9 // indirect - github.com/aws/aws-sdk-go-v2/service/sso v1.24.11 // indirect - github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.10 // indirect - github.com/aws/aws-sdk-go-v2/service/sts v1.33.9 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.8 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.25 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.29 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.29 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.2 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.29 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.5.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.10 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.10 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.24.12 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.11 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.33.10 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cncf/xds/go v0.0.0-20241223141626-cff3c89139a3 // indirect diff --git a/sdks/go.sum b/sdks/go.sum index a281d77ef766..b7e3a1f97f83 100644 --- a/sdks/go.sum +++ b/sdks/go.sum @@ -135,8 +135,8 @@ cloud.google.com/go/bigquery v1.47.0/go.mod h1:sA9XOgy0A8vQK9+MWhEQTY6Tix87M/Zur cloud.google.com/go/bigquery v1.48.0/go.mod h1:QAwSz+ipNgfL5jxiaK7weyOhzdoAy1zFm0Nf1fysJac= cloud.google.com/go/bigquery v1.49.0/go.mod h1:Sv8hMmTFFYBlt/ftw2uN6dFdQPzBlREY9yBh7Oy7/4Q= cloud.google.com/go/bigquery v1.50.0/go.mod h1:YrleYEh2pSEbgTBZYMJ5SuSr0ML3ypjRB1zgf7pvQLU= -cloud.google.com/go/bigquery v1.65.0 h1:ZZ1EOJMHTYf6R9lhxIXZJic1qBD4/x9loBIS+82moUs= -cloud.google.com/go/bigquery v1.65.0/go.mod h1:9WXejQ9s5YkTW4ryDYzKXBooL78u5+akWGXgJqQkY6A= +cloud.google.com/go/bigquery v1.66.0 h1:cDM3xEUUTf6RDepFEvNZokCysGFYoivHHTIZOWXbV2E= +cloud.google.com/go/bigquery v1.66.0/go.mod h1:Cm1hMRzZ8teV4Nn8KikgP8bT9jd54ivP8fvXWZREmG4= cloud.google.com/go/bigtable v1.35.0 h1:UEacPwaejN2mNbz67i1Iy3G812rxtgcs6ePj1TAg7dw= cloud.google.com/go/bigtable v1.35.0/go.mod h1:EabtwwmTcOJFXp+oMZAT/jZkyDIjNwrv53TrS4DGrrM= cloud.google.com/go/billing v1.4.0/go.mod h1:g9IdKBEFlItS8bTtlrZdVLWSSdSyFUZKXNS02zKMOZY= @@ -745,83 +745,83 @@ github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= github.com/aws/aws-sdk-go-v2 v1.16.2/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX+OET6TLNNU= github.com/aws/aws-sdk-go-v2 v1.23.0/go.mod h1:i1XDttT4rnf6vxc9AuskLc6s7XBee8rlLilKlc03uAA= 
-github.com/aws/aws-sdk-go-v2 v1.33.0 h1:Evgm4DI9imD81V0WwD+TN4DCwjUMdc94TrduMLbgZJs= -github.com/aws/aws-sdk-go-v2 v1.33.0/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U= +github.com/aws/aws-sdk-go-v2 v1.34.0 h1:9iyL+cjifckRGEVpRKZP3eIxVlL06Qk1Tk13vreaVQU= +github.com/aws/aws-sdk-go-v2 v1.34.0/go.mod h1:JgstGg0JjWU1KpVJjD5H0y0yyAIpSdKEq556EI6yOOM= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1/go.mod h1:n8Bs1ElDD2wJ9kCRTczA83gYbBmjSwZp3umc6zF4EeM= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.1/go.mod h1:t8PYl/6LzdAqsU4/9tz28V/kU+asFePvpOMkdul0gEQ= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 h1:lL7IfaFzngfx0ZwUGOZdsFFnQ5uLvR0hWqqhyE7Q9M8= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7/go.mod h1:QraP0UcVlQJsmHfioCrveWOC1nbiWUl3ej08h4mXWoc= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.8 h1:zAxi9p3wsZMIaVCdoiQp2uZ9k1LsZvmAnoTBeZPXom0= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.8/go.mod h1:3XkePX5dSaxveLAYY7nsbsZZrKxCyEuE5pM4ziFxyGg= github.com/aws/aws-sdk-go-v2/config v1.15.3/go.mod h1:9YL3v07Xc/ohTsxFXzan9ZpFpdTOFl4X65BAKYaz8jg= github.com/aws/aws-sdk-go-v2/config v1.25.3/go.mod h1:tAByZy03nH5jcq0vZmkcVoo6tRzRHEwSFx3QW4NmDw8= -github.com/aws/aws-sdk-go-v2/config v1.29.1 h1:JZhGawAyZ/EuJeBtbQYnaoftczcb2drR2Iq36Wgz4sQ= -github.com/aws/aws-sdk-go-v2/config v1.29.1/go.mod h1:7bR2YD5euaxBhzt2y/oDkt3uNRb6tjFp98GlTFueRwk= +github.com/aws/aws-sdk-go-v2/config v1.29.2 h1:JuIxOEPcSKpMB0J+khMjznG9LIhIBdmqNiEcPclnwqc= +github.com/aws/aws-sdk-go-v2/config v1.29.2/go.mod h1:HktTHregOZwNSM/e7WTfVSu9RCX+3eOv+6ij27PtaYs= github.com/aws/aws-sdk-go-v2/credentials v1.11.2/go.mod h1:j8YsY9TXTm31k4eFhspiQicfXPLZ0gYXA50i4gxPE8g= github.com/aws/aws-sdk-go-v2/credentials v1.16.2/go.mod h1:sDdvGhXrSVT5yzBDR7qXz+rhbpiMpUYfF3vJ01QSdrc= -github.com/aws/aws-sdk-go-v2/credentials v1.17.54 h1:4UmqeOqJPvdvASZWrKlhzpRahAulBfyTJQUaYy4+hEI= -github.com/aws/aws-sdk-go-v2/credentials v1.17.54/go.mod h1:RTdfo0P0hbbTxIhmQrOsC/PquBZGabEPnCaxxKRPSnI= +github.com/aws/aws-sdk-go-v2/credentials v1.17.55 h1:CDhKnDEaGkLA5ZszV/qw5uwN5M8rbv9Cl0JRN+PRsaM= +github.com/aws/aws-sdk-go-v2/credentials v1.17.55/go.mod h1:kPD/vj+RB5MREDUky376+zdnjZpR+WgdBBvwrmnlmKE= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.3/go.mod h1:uk1vhHHERfSVCUnqSqz8O48LBYDSC+k6brng09jcMOk= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.4/go.mod h1:t4i+yGHMCcUNIX1x7YVYa6bH/Do7civ5I6cG/6PMfyA= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.24 h1:5grmdTdMsovn9kPZPI23Hhvp0ZyNm5cRO+IZFIYiAfw= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.24/go.mod h1:zqi7TVKTswH3Ozq28PkmBmgzG1tona7mo9G2IJg4Cis= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.25 h1:kU7tmXNaJ07LsyN3BUgGqAmVmQtq0w6duVIHAKfp0/w= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.25/go.mod h1:OiC8+OiqrURb1wrwmr/UbOVLFSWEGxjinj5C299VQdo= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.3/go.mod h1:0dHuD2HZZSiwfJSy1FO5bX1hQ1TxVV1QXXjpn3XUE44= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.14.0/go.mod h1:UcgIwJ9KHquYxs6Q5skC9qXjhYMK+JASDYcXQ4X7JZE= -github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.52 h1:6kI83R98XOnnyzHv9g9KTYXFawMyeQq8NeEERWMAwJk= -github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.52/go.mod 
h1:Juj7unpf3CIrWpEyJZhRJ6rJl9IYX7Hd8HOlwaZq/LE= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.54 h1:6BWOAho3Cgdy4cmNJ4HWY8VZgqODEU7Gw78XXireNZI= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.54/go.mod h1:n+t/oyYErOV3jf/GxNTVlizSM9RMV1yH7jvcIvld3Do= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.9/go.mod h1:AnVH5pvai0pAF4lXRq0bmhbes1u9R8wTE+g+183bZNM= github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.3/go.mod h1:7sGSz1JCKHWWBHq98m6sMtWQikmYPpxjqOydDemiVoM= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.28 h1:igORFSiH3bfq4lxKFkTSYDhJEUCYo6C8VKiWJjYwQuQ= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.28/go.mod h1:3So8EA/aAYm36L7XIvCVwLa0s5N0P7o2b1oqnx/2R4g= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.29 h1:Ej0Rf3GMv50Qh4G4852j2djtoDb7AzQ7MuQeFHa3D70= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.29/go.mod h1:oeNTC7PwJNoM5AznVr23wxhLnuJv0ZDe5v7w0wqIs9M= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.3/go.mod h1:ssOhaLpRlh88H3UmEcsBoVKq309quMvm3Ds8e9d4eJM= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.3/go.mod h1:ify42Rb7nKeDDPkFjKn7q1bPscVPu/+gmHH8d2c+anU= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.28 h1:1mOW9zAUMhTSrMDssEHS/ajx8JcAj/IcftzcmNlmVLI= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.28/go.mod h1:kGlXVIWDfvt2Ox5zEaNglmq0hXPHgQFNMix33Tw22jA= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.29 h1:6e8a71X+9GfghragVevC5bZqvATtc3mAMgxpSNbgzF0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.29/go.mod h1:c4jkZiQ+BWpNqq7VtrxjwISrLrt/VvPq3XiopkUIolI= github.com/aws/aws-sdk-go-v2/internal/ini v1.3.10/go.mod h1:8DcYQcz0+ZJaSxANlHIsbbi6S+zMwjwdDqwW3r9AzaE= github.com/aws/aws-sdk-go-v2/internal/ini v1.7.1/go.mod h1:6fQQgfuGmw8Al/3M2IgIllycxV7ZW7WCdVSqfBeUiCY= -github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= -github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.2 h1:Pg9URiobXy85kgFev3og2CuOZ8JZUBENF+dcgWBaYNk= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.2/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.3/go.mod h1:5yzAuE9i2RkVAttBl8yxZgQr5OCq4D5yDnG7j9x2L0U= -github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.28 h1:7kpeALOUeThs2kEjlAxlADAVfxKmkYAedlpZ3kdoSJ4= -github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.28/go.mod h1:pyaOYEdp1MJWgtXLy6q80r3DhsVdOIOZNB9hdTcJIvI= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.29 h1:g9OUETuxA8i/Www5Cby0R3WSTe7ppFTZXHVLNskNS4w= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.29/go.mod h1:CQk+koLR1QeY1+vm7lqNfFii07DEderKq6T3F1L2pyc= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.1/go.mod h1:GeUru+8VzrTXV/83XyMJ80KpH8xO89VPoUileyNQ+tc= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.1/go.mod h1:l9ymW25HOqymeU2m1gbUQ3rUIsTwKs8gYHXkqDQUhiI= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 h1:iXtILhvDxB6kPvEXgsDhGaZCSC6LQET5ZHSdJozeI0Y= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1/go.mod h1:9nu0fVANtYiAePIBh2/pFUSwtJ402hLnp854CNoDOeE= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.2 
h1:D4oz8/CzT9bAEYtVhSBmFj2dNOtaHOtMKc2vHBwYizA= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.2/go.mod h1:Za3IHqTQ+yNcRHxu1OFucBh0ACZT4j4VQFF0BqpZcLY= github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.3/go.mod h1:Seb8KNmD6kVTjwRjVEgOT5hPin6sq+v4C2ycJQDwuH8= github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.3/go.mod h1:R+/S1O4TYpcktbVwddeOYg+uwUfLhADP2S/x4QwsCTM= -github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.5.2 h1:e6um6+DWYQP1XCa+E9YVtG/9v1qk5lyAOelMOVwSyO8= -github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.5.2/go.mod h1:dIW8puxSbYLSPv/ju0d9A3CpwXdtqvJtYKDMVmPLOWE= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.5.3 h1:EP1ITDgYVPM2dL1bBBntJ7AW5yTjuWGz9XO+CZwpALU= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.5.3/go.mod h1:5lWNWeAgWenJ/BZ/CP9k9DjLbC0pjnM045WjXRPPi14= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.3/go.mod h1:wlY6SVjuwvh3TVRpTqdy4I1JpBFLX4UGeKZdWntaocw= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.3/go.mod h1:Owv1I59vaghv1Ax8zz8ELY8DN7/Y0rGS+WWAmjgi950= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.9 h1:TQmKDyETFGiXVhZfQ/I0cCFziqqX58pi4tKJGYGFSz0= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.9/go.mod h1:HVLPK2iHQBUx7HfZeOQSEu3v2ubZaAY2YPbAm5/WUyY= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.10 h1:hN4yJBGswmFTOVYqmbz1GBs9ZMtQe8SrYxPwrkrlRv8= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.10/go.mod h1:TsxON4fEZXyrKY+D+3d2gSTyJkGORexIYab9PTf56DA= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.3/go.mod h1:Bm/v2IaN6rZ+Op7zX+bOUMdL4fsrYZiD0dsjLhNKwZc= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.3/go.mod h1:KZgs2ny8HsxRIRbDwgvJcHHBZPOzQr/+NtGwnP+w2ec= -github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.9 h1:2aInXbh02XsbO0KobPGMNXyv2QP73VDKsWPNJARj/+4= -github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.9/go.mod h1:dgXS1i+HgWnYkPXqNoPIPKeUsUUYHaUbThC90aDnNiE= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.10 h1:fXoWC2gi7tdJYNTPnnlSGzEVwewUchOi8xVq/dkg8Qs= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.10/go.mod h1:cvzBApD5dVazHU8C2rbBQzzzsKc8m5+wNJ9mCRZLKPc= github.com/aws/aws-sdk-go-v2/service/kms v1.16.3/go.mod h1:QuiHPBqlOFCi4LqdSskYYAWpQlx3PKmohy+rE2F+o5g= github.com/aws/aws-sdk-go-v2/service/s3 v1.26.3/go.mod h1:g1qvDuRsJY+XghsV6zg00Z4KJ7DtFFCx8fJD2a491Ak= github.com/aws/aws-sdk-go-v2/service/s3 v1.43.0/go.mod h1:NXRKkiRF+erX2hnybnVU660cYT5/KChRD4iUgJ97cI8= -github.com/aws/aws-sdk-go-v2/service/s3 v1.73.2 h1:F3h8VYq9ZLBXYurmwrT8W0SPhgCcU0q+0WZJfT1dFt0= -github.com/aws/aws-sdk-go-v2/service/s3 v1.73.2/go.mod h1:jGJ/v7FIi7Ys9t54tmEFnrxuaWeJLpwNgKp2DXAVhOU= +github.com/aws/aws-sdk-go-v2/service/s3 v1.74.1 h1:9LawY3cDJ3HE+v2GMd5SOkNLDwgN4K7TsCjyVBYu/L4= +github.com/aws/aws-sdk-go-v2/service/s3 v1.74.1/go.mod h1:hHnELVnIHltd8EOF3YzahVX6F6y2C6dNqpRj1IMkS5I= github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.15.4/go.mod h1:PJc8s+lxyU8rrre0/4a0pn2wgwiDvOEzoOjcJUBr67o= github.com/aws/aws-sdk-go-v2/service/sns v1.17.4/go.mod h1:kElt+uCcXxcqFyc+bQqZPFD9DME/eC6oHBXvFzQ9Bcw= github.com/aws/aws-sdk-go-v2/service/sqs v1.18.3/go.mod 
h1:skmQo0UPvsjsuYYSYMVmrPc1HWCbHUJyrCEp+ZaLzqM= github.com/aws/aws-sdk-go-v2/service/ssm v1.24.1/go.mod h1:NR/xoKjdbRJ+qx0pMR4mI+N/H1I1ynHwXnO6FowXJc0= github.com/aws/aws-sdk-go-v2/service/sso v1.11.3/go.mod h1:7UQ/e69kU7LDPtY40OyoHYgRmgfGM4mgsLYtcObdveU= github.com/aws/aws-sdk-go-v2/service/sso v1.17.2/go.mod h1:/pE21vno3q1h4bbhUOEi+6Zu/aT26UK2WKkDXd+TssQ= -github.com/aws/aws-sdk-go-v2/service/sso v1.24.11 h1:kuIyu4fTT38Kj7YCC7ouNbVZSSpqkZ+LzIfhCr6Dg+I= -github.com/aws/aws-sdk-go-v2/service/sso v1.24.11/go.mod h1:Ro744S4fKiCCuZECXgOi760TiYylUM8ZBf6OGiZzJtY= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.12 h1:kznaW4f81mNMlREkU9w3jUuJvU5g/KsqDV43ab7Rp6s= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.12/go.mod h1:bZy9r8e0/s0P7BSDHgMLXK2KvdyRRBIQ2blKlvLt0IU= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.20.0/go.mod h1:dWqm5G767qwKPuayKfzm4rjzFmVjiBFbOJrpSPnAMDs= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.10 h1:l+dgv/64iVlQ3WsBbnn+JSbkj01jIi+SM0wYsj3y/hY= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.10/go.mod h1:Fzsj6lZEb8AkTE5S68OhcbBqeWPsR8RnGuKPr8Todl8= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.11 h1:mUwIpAvILeKFnRx4h1dEgGEFGuV8KJ3pEScZWVFYuZA= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.11/go.mod h1:JDJtD+b8HNVv71axz8+S5492KM8wTzHRFpMKQbPlYxw= github.com/aws/aws-sdk-go-v2/service/sts v1.16.3/go.mod h1:bfBj0iVmsUyUg4weDB4NxktD9rDGeKSVWnjTnwbx9b8= github.com/aws/aws-sdk-go-v2/service/sts v1.25.3/go.mod h1:4EqRHDCKP78hq3zOnmFXu5k0j4bXbRFfCh/zQ6KnEfQ= -github.com/aws/aws-sdk-go-v2/service/sts v1.33.9 h1:BRVDbewN6VZcwr+FBOszDKvYeXY1kJ+GGMCcpghlw0U= -github.com/aws/aws-sdk-go-v2/service/sts v1.33.9/go.mod h1:f6vjfZER1M17Fokn0IzssOTMT2N8ZSq+7jnNF0tArvw= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.10 h1:g9d+TOsu3ac7SgmY2dUf1qMgu/uJVTlQ4VCbH6hRxSw= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.10/go.mod h1:WZfNmntu92HO44MVZAubQaz3qCuIdeOdog2sADfU6hU= github.com/aws/smithy-go v1.11.2/go.mod h1:3xHYmszWVx2c0kIwQeEVf9uSm4fYZt67FBJnwub1bgM= github.com/aws/smithy-go v1.17.0/go.mod h1:NukqUGpCZIILqqiV0NIjeFh24kd/FAa4beRb6nbIUPE= -github.com/aws/smithy-go v1.22.1 h1:/HPHZQ0g7f4eUeK6HKglFz8uwVfZKgoI25rb/J+dnro= -github.com/aws/smithy-go v1.22.1/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= +github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ= +github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/bobg/gcsobj v0.1.2/go.mod h1:vS49EQ1A1Ib8FgrL58C8xXYZyOCR2TgzAdopy6/ipa8= github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= @@ -2027,8 +2027,8 @@ google.golang.org/api v0.108.0/go.mod h1:2Ts0XTHNVWxypznxWOYUeI4g3WdP9Pk2Qk58+a/ google.golang.org/api v0.110.0/go.mod h1:7FC4Vvx1Mooxh8C5HWjzZHcavuS2f6pmJpZx60ca7iI= google.golang.org/api v0.111.0/go.mod h1:qtFHvU9mhgTJegR31csQ+rwxyUTHOKFqCKWp1J0fdw0= google.golang.org/api v0.114.0/go.mod h1:ifYI2ZsFK6/uGddGfAD5BMxlnkBqCmqHSDUVi45N5Yg= -google.golang.org/api v0.217.0 h1:GYrUtD289o4zl1AhiTZL0jvQGa2RDLyC+kX1N/lfGOU= -google.golang.org/api v0.217.0/go.mod h1:qMc2E8cBAbQlRypBTBWHklNJlaZZJBwDv81B1Iu8oSI= +google.golang.org/api v0.218.0 h1:x6JCjEWeZ9PFCRe9z0FBrNwj7pB7DOAqT35N+IPnAUA= +google.golang.org/api 
v0.218.0/go.mod h1:5VGHBAkxrA/8EFjLVEYmMUJ8/8+gWWQ3s4cFH0FxG2M= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= diff --git a/sdks/go/pkg/beam/transforms/sql/sql_test.go b/sdks/go/pkg/beam/transforms/sql/sql_test.go index 58d801f45f0b..851495015fe8 100644 --- a/sdks/go/pkg/beam/transforms/sql/sql_test.go +++ b/sdks/go/pkg/beam/transforms/sql/sql_test.go @@ -16,10 +16,14 @@ package sql import ( - "github.com/apache/beam/sdks/v2/go/pkg/beam" - "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/sql/sqlx" "reflect" "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" + "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/sql/sqlx" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" ) func TestOptions_Add(t *testing.T) { @@ -94,3 +98,152 @@ func TestExpansionAddr(t *testing.T) { t.Errorf("The function that ExpansionAddr(%v) returned did not work correctly. For the expansionAddr field in options, got %v, want %v", test.addr, o.expansionAddr, test.addr) } } + +// TestOutputType tests the OutputType option for setting the output type +// in an SQL transformation in Beam. It verifies both cases: when +// components are provided and when they are not. The test checks if +// the 'outType' field in the options is set correctly based on the +// output type and components. +func TestOutputType(t *testing.T) { + testCases := []struct { + name string + typ reflect.Type + components []typex.FullType + wantNil bool + }{ + { + name: "output_type_without_components", + typ: reflect.TypeOf(int64(0)), + }, + { + name: "output_type_with_components", + typ: reflect.TypeOf(int64(0)), + components: []typex.FullType{typex.New(reflect.TypeOf(""))}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + o := &options{} + + var opt Option + if len(tc.components) > 0 { + opt = OutputType(tc.typ, tc.components...) + } else { + opt = OutputType(tc.typ) + } + + opt(o) + + var expected typex.FullType + if len(tc.components) > 0 { + expected = typex.New(tc.typ, tc.components...) + } else { + expected = typex.New(tc.typ) + } + + opts := cmp.Options{ + cmp.Comparer(func(x, y typex.FullType) bool { + // Compare only the type and components + return x.Type() == y.Type() + }), + } + if d := cmp.Diff(expected, o.outType, opts); d != "" { + t.Errorf("OutputType() failed: (-want, +got)\n%s", d) + } + }) + } +} + +// TestTransform tests the behavior of the Transform function +// in the context of SQL transformations. It checks that the function +// panics when an output type is missing. +func TestTransform_MissingOutputType(t *testing.T) { + p := beam.NewPipeline() + s := p.Root() + col := beam.Create(s, 1, 2, 3) + + defer func() { + r := recover() + if r == nil { + t.Error("Transform() with missing output type should panic") + } + if msg, ok := r.(string); !ok || msg != "output type must be specified for sql.Transform" { + t.Errorf("Transform() unexpected panic message: %v", r) + } + }() + + Transform(s, "SELECT value FROM test", Input("test", col)) +} + +// TestMultipleOptions tests applying multiple options at once +// and verifying that they are all correctly applied to the options object. 
+func TestMultipleOptions(t *testing.T) { + testCases := []struct { + name string + inputName string + dialect string + expansionAddr string + typ reflect.Type + customOpt sqlx.Option + }{ + { + name: "all_options", + inputName: "test", + dialect: "zetasql", + expansionAddr: "localhost:8080", + typ: reflect.TypeOf(int64(0)), + customOpt: sqlx.Option{Urn: "test"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + p := beam.NewPipeline() + s := p.Root() + col := beam.Create(s, 1, 2, 3) + + o := &options{ + inputs: make(map[string]beam.PCollection), + } + + opts := []Option{ + Input(tc.inputName, col), + Dialect(tc.dialect), + ExpansionAddr(tc.expansionAddr), + OutputType(tc.typ), + } + + for _, opt := range opts { + opt(o) + } + o.Add(tc.customOpt) + + // Construct the expected options struct + expected := &options{ + inputs: map[string]beam.PCollection{ + tc.inputName: col, + }, + dialect: tc.dialect, + expansionAddr: tc.expansionAddr, + outType: typex.New(tc.typ), + customs: []sqlx.Option{tc.customOpt}, + } + + // Define a custom comparer for typex.FullType + fullTypeComparer := cmp.Comparer(func(x, y typex.FullType) bool { + return x.Type() == y.Type() // Compare only the underlying reflect.Type + }) + + if d := cmp.Diff( + expected, + o, + cmp.AllowUnexported(options{}), + cmpopts.IgnoreUnexported(beam.PCollection{}), + fullTypeComparer, // Use the custom comparer for typex.FullType + ); d != "" { + t.Errorf("Options mismatch: (-want, +got)\n%s", d) + } + }) + } +} diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index 527e13949ecf..57e47cef6502 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -75,7 +75,6 @@ dependencies { permitUnusedDeclared library.java.antlr permitUsedUndeclared library.java.antlr_runtime // Required to load constants from the model, e.g. 
max timestamp for global window - provided library.java.google_api_services_dataflow shadow project(path: ":model:pipeline", configuration: "shadow") shadow project(path: ":model:fn-execution", configuration: "shadow") shadow project(path: ":model:job-management", configuration: "shadow") diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java index 6bcdea0c0ab6..0a3650ca133b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java @@ -198,7 +198,8 @@ public static void verifyDeterministic(Coder target, String message, Iterable } } - public static long getEncodedElementByteSizeUsingCoder(Coder target, T value) throws Exception { + public static long getEncodedElementByteSizeUsingCoder(Coder target, T value) + throws Exception { return target.getEncodedElementByteSize(value); } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java index c1ac4bcfba23..65ccda06be65 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.util; -import com.google.api.services.dataflow.model.DataflowHistogramValue; import com.google.auto.value.AutoValue; import com.google.auto.value.extension.memoized.Memoized; import java.io.Serializable; @@ -25,8 +24,6 @@ import java.util.Arrays; import java.util.Objects; import javax.annotation.concurrent.GuardedBy; -import org.apache.beam.sdk.annotations.Internal; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.DoubleMath; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.IntMath; import org.checkerframework.checker.nullness.qual.Nullable; @@ -77,41 +74,6 @@ public HistogramData(BucketType bucketType) { this.sumOfSquaredDeviations = 0; } - /** - * Create a histogram from DataflowHistogramValue proto. - * - * @param histogramProto DataflowHistogramValue proto used to populate stats for the histogram. 
- */ - public HistogramData(DataflowHistogramValue histogramProto) { - int numBuckets; - if (histogramProto.getBucketOptions().getLinear() != null) { - double start = histogramProto.getBucketOptions().getLinear().getStart(); - double width = histogramProto.getBucketOptions().getLinear().getWidth(); - numBuckets = histogramProto.getBucketOptions().getLinear().getNumberOfBuckets(); - this.bucketType = LinearBuckets.of(start, width, numBuckets); - this.buckets = new long[bucketType.getNumBuckets()]; - - int idx = 0; - for (long val : histogramProto.getBucketCounts()) { - this.buckets[idx] = val; - this.numBoundedBucketRecords += val; - idx++; - } - } else { - // Assume it's a exponential histogram if its not linear - int scale = histogramProto.getBucketOptions().getExponential().getScale(); - numBuckets = histogramProto.getBucketOptions().getExponential().getNumberOfBuckets(); - this.bucketType = ExponentialBuckets.of(scale, numBuckets); - this.buckets = new long[bucketType.getNumBuckets()]; - int idx = 0; - for (long val : histogramProto.getBucketCounts()) { - this.buckets[idx] = val; - this.numBoundedBucketRecords += val; - idx++; - } - } - } - public BucketType getBucketType() { return this.bucketType; } @@ -331,10 +293,6 @@ public synchronized long getTopBucketCount() { return numTopRecords; } - public synchronized long[] getBucketCount() { - return buckets; - } - public synchronized double getTopBucketMean() { return numTopRecords == 0 ? 0 : topRecordsSum / numTopRecords; } @@ -615,42 +573,6 @@ public double getRangeTo() { // Note: equals() and hashCode() are implemented by the AutoValue. } - /** Used for testing unsupported Bucket formats. */ - @AutoValue - @Internal - @VisibleForTesting - public abstract static class UnsupportedBuckets implements BucketType { - - public static UnsupportedBuckets of() { - return new AutoValue_HistogramData_UnsupportedBuckets(0); - } - - @Override - public int getBucketIndex(double value) { - return 0; - } - - @Override - public double getBucketSize(int index) { - return 0; - } - - @Override - public double getAccumulatedBucketSize(int index) { - return 0; - } - - @Override - public double getRangeFrom() { - return 0; - } - - @Override - public double getRangeTo() { - return 0; - } - } - @Override public synchronized boolean equals(@Nullable Object object) { if (object instanceof HistogramData) { diff --git a/sdks/java/harness/build.gradle b/sdks/java/harness/build.gradle index 53a3359bfdc4..b213a716dcf9 100644 --- a/sdks/java/harness/build.gradle +++ b/sdks/java/harness/build.gradle @@ -30,7 +30,6 @@ dependencies { provided project(path: ":model:pipeline", configuration: "shadow") provided project(path: ":sdks:java:core", configuration: "shadow") provided project(path: ":sdks:java:transform-service:launcher") - provided library.java.google_api_services_dataflow provided library.java.avro provided library.java.jackson_databind provided library.java.joda_time @@ -80,5 +79,4 @@ dependencies { shadowTest project(path: ":sdks:java:core", configuration: "shadowTest") shadowTestRuntimeClasspath library.java.slf4j_jdk14 permitUnusedDeclared library.java.avro - permitUnusedDeclared library.java.google_api_services_dataflow } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java index 856c29b0c535..d55501d3e583 100644 --- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java @@ -44,15 +44,16 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Functions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; -import org.joda.time.Days; -import org.joda.time.Instant; -import org.joda.time.ReadableInstant; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Bytes; /** * Utility methods for converting Avro {@link GenericRecord} objects to dynamic protocol message, * for use with the Storage write API. */ public class AvroGenericRecordToStorageApiProto { + + private static final org.joda.time.LocalDate EPOCH_DATE = new org.joda.time.LocalDate(1970, 1, 1); + static final Map PRIMITIVE_TYPES = ImmutableMap.builder() .put(Schema.Type.INT, TableFieldSchema.Type.INT64) @@ -67,14 +68,37 @@ public class AvroGenericRecordToStorageApiProto { .build(); // A map of supported logical types to the protobuf field type. - static final Map LOGICAL_TYPES = - ImmutableMap.builder() - .put(LogicalTypes.date().getName(), TableFieldSchema.Type.DATE) - .put(LogicalTypes.decimal(1).getName(), TableFieldSchema.Type.BIGNUMERIC) - .put(LogicalTypes.timestampMicros().getName(), TableFieldSchema.Type.TIMESTAMP) - .put(LogicalTypes.timestampMillis().getName(), TableFieldSchema.Type.TIMESTAMP) - .put(LogicalTypes.uuid().getName(), TableFieldSchema.Type.STRING) - .build(); + static Optional logicalTypes(LogicalType logicalType) { + switch (logicalType.getName()) { + case "date": + return Optional.of(TableFieldSchema.Type.DATE); + case "time-micros": + return Optional.of(TableFieldSchema.Type.TIME); + case "time-millis": + return Optional.of(TableFieldSchema.Type.TIME); + case "decimal": + LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; + int scale = decimal.getScale(); + int precision = decimal.getPrecision(); + if (scale > 9 || precision - scale > 29) { + return Optional.of(TableFieldSchema.Type.BIGNUMERIC); + } else { + return Optional.of(TableFieldSchema.Type.NUMERIC); + } + case "timestamp-micros": + return Optional.of(TableFieldSchema.Type.TIMESTAMP); + case "timestamp-millis": + return Optional.of(TableFieldSchema.Type.TIMESTAMP); + case "local-timestamp-micros": + return Optional.of(TableFieldSchema.Type.DATETIME); + case "local-timestamp-millis": + return Optional.of(TableFieldSchema.Type.DATETIME); + case "uuid": + return Optional.of(TableFieldSchema.Type.STRING); + default: + return Optional.empty(); + } + } static final Map> PRIMITIVE_ENCODERS = ImmutableMap.>builder() @@ -92,16 +116,15 @@ public class AvroGenericRecordToStorageApiProto { // A map of supported logical types to their encoding functions. 
static final Map> LOGICAL_TYPE_ENCODERS = ImmutableMap.>builder() - .put(LogicalTypes.date().getName(), (logicalType, value) -> convertDate(value)) - .put( - LogicalTypes.decimal(1).getName(), AvroGenericRecordToStorageApiProto::convertDecimal) - .put( - LogicalTypes.timestampMicros().getName(), - (logicalType, value) -> convertTimestamp(value, true)) - .put( - LogicalTypes.timestampMillis().getName(), - (logicalType, value) -> convertTimestamp(value, false)) - .put(LogicalTypes.uuid().getName(), (logicalType, value) -> convertUUID(value)) + .put("date", (logicalType, value) -> convertDate(value)) + .put("time-micros", (logicalType, value) -> convertTime(value, true)) + .put("time-millis", (logicalType, value) -> convertTime(value, false)) + .put("decimal", AvroGenericRecordToStorageApiProto::convertDecimal) + .put("timestamp-micros", (logicalType, value) -> convertTimestamp(value, true)) + .put("timestamp-millis", (logicalType, value) -> convertTimestamp(value, false)) + .put("local-timestamp-micros", (logicalType, value) -> convertDateTime(value, true)) + .put("local-timestamp-millis", (logicalType, value) -> convertDateTime(value, false)) + .put("uuid", (logicalType, value) -> convertUUID(value)) .build(); static String convertUUID(Object value) { @@ -115,18 +138,33 @@ static String convertUUID(Object value) { } static Long convertTimestamp(Object value, boolean micros) { - if (value instanceof ReadableInstant) { - return ((ReadableInstant) value).getMillis() * (micros ? 1000 : 1); + if (value instanceof org.joda.time.ReadableInstant) { + return ((org.joda.time.ReadableInstant) value).getMillis() * 1_000L; + } else if (value instanceof java.time.Instant) { + java.time.Instant instant = (java.time.Instant) value; + long seconds = instant.getEpochSecond(); + int nanos = instant.getNano(); + + if (seconds < 0 && nanos > 0) { + long ms = Math.multiplyExact(seconds + 1, 1_000_000L); + long adjustment = (nanos / 1_000L) - 1_000_000L; + return Math.addExact(ms, adjustment); + } else { + long ms = Math.multiplyExact(seconds, 1_000_000L); + return Math.addExact(ms, nanos / 1_000L); + } } else { Preconditions.checkArgument( - value instanceof Long, "Expecting a value as Long type (millis)."); - return (Long) value; + value instanceof Long, "Expecting a value as Long type (timestamp)."); + return (micros ? 
1 : 1_000L) * ((Long) value); } } static Integer convertDate(Object value) { - if (value instanceof ReadableInstant) { - return Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays(); + if (value instanceof org.joda.time.LocalDate) { + return org.joda.time.Days.daysBetween(EPOCH_DATE, (org.joda.time.LocalDate) value).getDays(); + } else if (value instanceof java.time.LocalDate) { + return (int) ((java.time.LocalDate) value).toEpochDay(); } else { Preconditions.checkArgument( value instanceof Integer, "Expecting a value as Integer type (days)."); @@ -134,15 +172,62 @@ static Integer convertDate(Object value) { } } + static Long convertTime(Object value, boolean micros) { + if (value instanceof org.joda.time.LocalTime) { + return 1_000L * (long) ((org.joda.time.LocalTime) value).getMillisOfDay(); + } else if (value instanceof java.time.LocalTime) { + return java.util.concurrent.TimeUnit.NANOSECONDS.toMicros( + ((java.time.LocalTime) value).toNanoOfDay()); + } else { + if (micros) { + Preconditions.checkArgument( + value instanceof Long, "Expecting a value as Long type (time)."); + return (Long) value; + } else { + Preconditions.checkArgument( + value instanceof Integer, "Expecting a value as Integer type (time)."); + return 1_000L * (Integer) value; + } + } + } + + static Long convertDateTime(Object value, boolean micros) { + if (value instanceof org.joda.time.LocalDateTime) { + // we should never come here as local-timestamp has been added after joda deprecation + // implement nonetheless for consistency + org.joda.time.DateTime dateTime = + ((org.joda.time.LocalDateTime) value).toDateTime(org.joda.time.DateTimeZone.UTC); + return 1_000L * dateTime.getMillis(); + } else if (value instanceof java.time.LocalDateTime) { + java.time.Instant instant = + ((java.time.LocalDateTime) value).toInstant(java.time.ZoneOffset.UTC); + return convertTimestamp(instant, micros); + } else { + Preconditions.checkArgument( + value instanceof Long, "Expecting a value as Long type (local-timestamp)."); + return (micros ? 
1 : 1_000L) * ((Long) value); + } + } + static ByteString convertDecimal(LogicalType logicalType, Object value) { - ByteBuffer byteBuffer = (ByteBuffer) value; - BigDecimal bigDecimal = - new Conversions.DecimalConversion() - .fromBytes( - byteBuffer.duplicate(), - Schema.create(Schema.Type.NULL), // dummy schema, not used - logicalType); - return BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bigDecimal); + ByteBuffer byteBuffer; + if (value instanceof BigDecimal) { + // BigDecimalByteStringEncoder does not support parametrized NUMERIC/BIGNUMERIC + byteBuffer = + new Conversions.DecimalConversion() + .toBytes( + (BigDecimal) value, + Schema.create(Schema.Type.NULL), // dummy schema, not used + logicalType); + } else { + Preconditions.checkArgument( + value instanceof ByteBuffer, "Expecting a value as ByteBuffer type (decimal)."); + byteBuffer = (ByteBuffer) value; + } + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.duplicate().get(bytes); + Bytes.reverse(bytes); + return ByteString.copyFrom(bytes); } static ByteString convertBytes(Object value) { @@ -223,7 +308,7 @@ public static DynamicMessage messageFromGenericRecord( return builder.build(); } - private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field field) { + private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Schema.Field field) { @Nullable Schema schema = field.schema(); Preconditions.checkNotNull(schema, "Unexpected null schema!"); if (StorageApiCDC.COLUMNS.contains(field.name())) { @@ -292,10 +377,12 @@ private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field field) break; default: elementType = TypeWithNullability.create(schema).getType(); + Optional logicalType = + Optional.ofNullable(LogicalTypes.fromSchema(elementType)); @Nullable TableFieldSchema.Type primitiveType = - Optional.ofNullable(LogicalTypes.fromSchema(elementType)) - .map(logicalType -> LOGICAL_TYPES.get(logicalType.getName())) + logicalType + .flatMap(AvroGenericRecordToStorageApiProto::logicalTypes) .orElse(PRIMITIVE_TYPES.get(elementType.getType())); if (primitiveType == null) { throw new RuntimeException("Unsupported type " + elementType.getType()); @@ -303,6 +390,21 @@ private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field field) // a scalar will be required by default, if defined as part of union then // caller will set nullability requirements builder = builder.setType(primitiveType); + // parametrized types + if (logicalType.isPresent() && logicalType.get().getName().equals("decimal")) { + LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType.get(); + int precision = decimal.getPrecision(); + int scale = decimal.getScale(); + if (!(precision == 38 && scale == 9) // NUMERIC + && !(precision == 77 && scale == 38) // BIGNUMERIC + ) { + // parametrized type + builder = builder.setPrecision(precision); + if (scale != 0) { + builder = builder.setScale(scale); + } + } + } } if (builder.getMode() != TableFieldSchema.Mode.REPEATED) { if (TypeWithNullability.create(schema).isNullable()) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java index c073dec4752d..371b867ffd5e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java +++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import com.google.cloud.bigquery.storage.v1.BigDecimalByteStringEncoder; import com.google.protobuf.ByteString; import com.google.protobuf.DescriptorProtos.DescriptorProto; import com.google.protobuf.DescriptorProtos.FieldDescriptorProto; @@ -28,6 +29,7 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; import java.math.BigDecimal; +import java.math.RoundingMode; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; @@ -50,8 +52,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Functions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; -import org.joda.time.Days; -import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -105,23 +105,32 @@ enum TestEnum { private static final Schema LOGICAL_TYPES_SCHEMA = SchemaBuilder.record("LogicalTypesRecord") .fields() - .name("decimalValue") - .type(LogicalTypes.decimal(1, 1).addToSchema(Schema.create(Schema.Type.BYTES))) + .name("numericValue") + .type(LogicalTypes.decimal(2, 1).addToSchema(Schema.create(Schema.Type.BYTES))) + .noDefault() + .name("bigNumericValue") + .type(LogicalTypes.decimal(77, 38).addToSchema(Schema.create(Schema.Type.BYTES))) .noDefault() .name("dateValue") .type(LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))) .noDefault() + .name("timeMicrosValue") + .type(LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG))) + .noDefault() + .name("timeMillisValue") + .type(LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT))) + .noDefault() .name("timestampMicrosValue") .type(LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG))) .noDefault() .name("timestampMillisValue") .type(LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))) .noDefault() - .name("timestampMicrosAsInstantValue") - .type(LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG))) + .name("localTimestampMicrosValue") + .type(LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG))) .noDefault() - .name("timestampMillisAsInstantValue") - .type(LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))) + .name("localTimestampMillisValue") + .type(LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG))) .noDefault() .name("uuidValue") .type(LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING))) @@ -213,50 +222,71 @@ enum TestEnum { DescriptorProto.newBuilder() .addField( FieldDescriptorProto.newBuilder() - .setName("decimalvalue") + .setName("numericvalue") .setNumber(1) .setType(Type.TYPE_BYTES) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() - .setName("datevalue") + .setName("bignumericvalue") .setNumber(2) - .setType(Type.TYPE_INT32) + .setType(Type.TYPE_BYTES) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() - .setName("timestampmicrosvalue") + .setName("datevalue") .setNumber(3) - .setType(Type.TYPE_INT64) + .setType(Type.TYPE_INT32) .setLabel(Label.LABEL_OPTIONAL) 
.build()) .addField( FieldDescriptorProto.newBuilder() - .setName("timestampmillisvalue") + .setName("timemicrosvalue") .setNumber(4) .setType(Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() - .setName("timestampmicrosasinstantvalue") + .setName("timemillisvalue") .setNumber(5) .setType(Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() - .setName("timestampmillisasinstantvalue") + .setName("timestampmicrosvalue") .setNumber(6) .setType(Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() - .setName("uuidvalue") + .setName("timestampmillisvalue") .setNumber(7) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("localtimestampmicrosvalue") + .setNumber(8) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("localtimestampmillisvalue") + .setNumber(9) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("uuidvalue") + .setNumber(10) .setType(Type.TYPE_STRING) .setLabel(Label.LABEL_OPTIONAL) .build()) @@ -306,7 +336,9 @@ enum TestEnum { .endRecord(); private static GenericRecord baseRecord; - private static GenericRecord logicalTypesRecord; + private static GenericRecord rawLogicalTypesRecord; + private static GenericRecord jodaTimeLogicalTypesRecord; + private static GenericRecord javaTimeLogicalTypesRecord; private static Map baseProtoExpectedFields; private static Map logicalTypesProtoExpectedFields; private static GenericRecord nestedRecord; @@ -314,15 +346,15 @@ enum TestEnum { static { try { byte[] md5 = MessageDigest.getInstance("MD5").digest(BYTES); - Instant now = Instant.now(); + baseRecord = new GenericRecordBuilder(BASE_SCHEMA) - .set("bytesValue", BYTES) + .set("bytesValue", ByteBuffer.wrap(BYTES)) .set("byteBufferValue", ByteBuffer.wrap(BYTES)) - .set("intValue", (int) 3) - .set("longValue", (long) 4) - .set("floatValue", (float) 3.14) - .set("doubleValue", (double) 2.68) + .set("intValue", 3) + .set("longValue", 4L) + .set("floatValue", 3.14f) + .set("doubleValue", 2.68) .set("stringValue", "I am a string. 
Hear me roar.") .set("booleanValue", true) .set("arrayValue", ImmutableList.of("one", "two", "red", "blue")) @@ -334,48 +366,102 @@ enum TestEnum { "fixedValue", new GenericData.Fixed(BASE_SCHEMA.getField("fixedValue").schema(), md5)) .build(); - BigDecimal bd = BigDecimal.valueOf(1.1D); + + BigDecimal numeric = BigDecimal.valueOf(4.2); // Logical type is of precision=2 and scale 1 + ByteString numericBytes = ByteString.copyFrom(new byte[] {42}); + BigDecimal bigNumeric = BigDecimal.valueOf(4.2).setScale(38, RoundingMode.UNNECESSARY); + ByteString bigNumericBytes = + BigDecimalByteStringEncoder.encodeToBigNumericByteString(bigNumeric); UUID uuid = UUID.randomUUID(); - logicalTypesRecord = + + rawLogicalTypesRecord = new GenericRecordBuilder(LOGICAL_TYPES_SCHEMA) .set( - "decimalValue", + "numericValue", new Conversions.DecimalConversion() .toBytes( - bd, - Schema.create(Schema.Type.NULL), - LogicalTypes.decimal(bd.precision(), bd.scale()))) - .set("dateValue", now) - .set("timestampMicrosValue", now.getMillis() * 1000) - .set("timestampMicrosAsInstantValue", now) - .set("timestampMillisValue", now.getMillis()) - .set("timestampMillisAsInstantValue", now) + numeric, + Schema.create(Schema.Type.NULL), // dummy schema, not used, + LogicalTypes.decimal(2, 1))) + .set( + "bigNumericValue", + new Conversions.DecimalConversion() + .toBytes( + bigNumeric, + Schema.create(Schema.Type.NULL), // dummy schema, not used, + LogicalTypes.decimal(77, 38))) + .set("dateValue", 42) + .set("timeMicrosValue", 42_000_000L) + .set("timeMillisValue", 42_000) // expects int + .set("timestampMicrosValue", 42_000_000L) + .set("timestampMillisValue", 42_000L) + .set("localTimestampMicrosValue", 42_000_000L) + .set("localTimestampMillisValue", 42_000L) + .set("uuidValue", uuid.toString()) + .build(); + + jodaTimeLogicalTypesRecord = + new GenericRecordBuilder(LOGICAL_TYPES_SCHEMA) + .set("numericValue", numeric) + .set("bigNumericValue", bigNumeric) + .set("dateValue", new org.joda.time.LocalDate(1970, 1, 1).plusDays(42)) + .set("timeMicrosValue", org.joda.time.LocalTime.fromMillisOfDay(42_000L)) + .set("timeMillisValue", org.joda.time.LocalTime.fromMillisOfDay(42_000L)) + .set("timestampMicrosValue", org.joda.time.Instant.ofEpochSecond(42L)) + .set("timestampMillisValue", org.joda.time.Instant.ofEpochSecond(42L)) + .set( + "localTimestampMicrosValue", + new org.joda.time.LocalDateTime(42_000L, org.joda.time.DateTimeZone.UTC)) + .set( + "localTimestampMillisValue", + new org.joda.time.LocalDateTime(42_000L, org.joda.time.DateTimeZone.UTC)) .set("uuidValue", uuid) .build(); + + javaTimeLogicalTypesRecord = + new GenericRecordBuilder(LOGICAL_TYPES_SCHEMA) + .set("numericValue", numeric) + .set("bigNumericValue", bigNumeric) + .set("dateValue", java.time.LocalDate.ofEpochDay(42L)) + .set("timeMicrosValue", java.time.LocalTime.ofSecondOfDay(42L)) + .set("timeMillisValue", java.time.LocalTime.ofSecondOfDay(42L)) + .set("timestampMicrosValue", java.time.Instant.ofEpochSecond(42L)) + .set("timestampMillisValue", java.time.Instant.ofEpochSecond(42L)) + .set( + "localTimestampMicrosValue", + java.time.LocalDateTime.ofEpochSecond(42L, 0, java.time.ZoneOffset.UTC)) + .set( + "localTimestampMillisValue", + java.time.LocalDateTime.ofEpochSecond(42L, 0, java.time.ZoneOffset.UTC)) + .set("uuidValue", uuid) + .build(); + baseProtoExpectedFields = ImmutableMap.builder() .put("bytesvalue", ByteString.copyFrom(BYTES)) .put("bytebuffervalue", ByteString.copyFrom(BYTES)) - .put("intvalue", (long) 3) - .put("longvalue", (long) 4) - 
.put("floatvalue", (double) 3.14) - .put("doublevalue", (double) 2.68) + .put("intvalue", 3L) + .put("longvalue", 4L) + .put("floatvalue", 3.14) + .put("doublevalue", 2.68) .put("stringvalue", "I am a string. Hear me roar.") .put("booleanvalue", true) .put("arrayvalue", ImmutableList.of("one", "two", "red", "blue")) .put("enumvalue", TEST_ENUM_STRS[1]) .put("fixedvalue", ByteString.copyFrom(md5)) .build(); + logicalTypesProtoExpectedFields = ImmutableMap.builder() - .put("decimalvalue", BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bd)) - .put( - "datevalue", - Days.daysBetween(Instant.EPOCH.toDateTime(), now.toDateTime()).getDays()) - .put("timestampmicrosvalue", now.getMillis() * 1000) - .put("timestampmicrosasinstantvalue", now.getMillis() * 1000) - .put("timestampmillisvalue", now.getMillis()) - .put("timestampmillisasinstantvalue", now.getMillis()) + .put("numericvalue", numericBytes) + .put("bignumericvalue", bigNumericBytes) + .put("datevalue", 42) + .put("timemicrosvalue", 42_000_000L) + .put("timemillisvalue", 42_000_000L) + .put("timestampmicrosvalue", 42_000_000L) + .put("timestampmillisvalue", 42_000_000L) + .put("localtimestampmicrosvalue", 42_000_000L) + .put("localtimestampmillisvalue", 42_000_000L) .put("uuidvalue", uuid.toString()) .build(); nestedRecord = @@ -534,7 +620,35 @@ public void testCdcFields() throws Exception { } @Test - public void testMessageFromGenericRecordLogicalTypes() throws Exception { + public void testMessageFromGenericRecordRawLogicalTypes() throws Exception { + Descriptors.Descriptor descriptor = + TableRowToStorageApiProto.getDescriptorFromTableSchema( + AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(LOGICAL_TYPES_SCHEMA), + true, + false); + DynamicMessage msg = + AvroGenericRecordToStorageApiProto.messageFromGenericRecord( + descriptor, rawLogicalTypesRecord, null, -1); + assertEquals(10, msg.getAllFields().size()); + assertBaseRecord(msg, logicalTypesProtoExpectedFields); + } + + @Test + public void testMessageFromGenericRecordJodaTimeLogicalTypes() throws Exception { + Descriptors.Descriptor descriptor = + TableRowToStorageApiProto.getDescriptorFromTableSchema( + AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(LOGICAL_TYPES_SCHEMA), + true, + false); + DynamicMessage msg = + AvroGenericRecordToStorageApiProto.messageFromGenericRecord( + descriptor, jodaTimeLogicalTypesRecord, null, -1); + assertEquals(10, msg.getAllFields().size()); + assertBaseRecord(msg, logicalTypesProtoExpectedFields); + } + + @Test + public void testMessageFromGenericRecordJavaTimeLogicalTypes() throws Exception { Descriptors.Descriptor descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema( AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(LOGICAL_TYPES_SCHEMA), @@ -542,8 +656,8 @@ public void testMessageFromGenericRecordLogicalTypes() throws Exception { false); DynamicMessage msg = AvroGenericRecordToStorageApiProto.messageFromGenericRecord( - descriptor, logicalTypesRecord, null, -1); - assertEquals(7, msg.getAllFields().size()); + descriptor, javaTimeLogicalTypesRecord, null, -1); + assertEquals(10, msg.getAllFields().size()); assertBaseRecord(msg, logicalTypesProtoExpectedFields); } diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallTest.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallTest.java index b369a62ae78d..b942e4207aed 100644 --- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallTest.java +++ 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallTest.java @@ -358,7 +358,9 @@ private static class CallerExceedsTimeout implements Caller { private final Duration timeout; CallerExceedsTimeout(Duration timeout) { - this.timeout = timeout.plus(Duration.standardSeconds(1L)); + // The timeout testing is flaky so we set a sleep time with a minute padding beyond what + // should trigger the timeout. + this.timeout = timeout.plus(Duration.standardMinutes(1L)); } @Override @@ -397,7 +399,9 @@ private static class SetupExceedsTimeout implements SetupTeardown { private final Duration timeout; private SetupExceedsTimeout(Duration timeout) { - this.timeout = timeout.plus(Duration.standardSeconds(1L)); + // The timeout testing is flaky so we set a sleep time with a minute padding beyond what + // should trigger the timeout. + this.timeout = timeout.plus(Duration.standardMinutes(1L)); } @Override @@ -443,7 +447,9 @@ private static class TeardownExceedsTimeout implements SetupTeardown { private final Duration timeout; private TeardownExceedsTimeout(Duration timeout) { - this.timeout = timeout.plus(Duration.standardSeconds(1L)); + // The timeout testing is flaky so we set a sleep time with a minute padding beyond what + // should trigger the timeout. + this.timeout = timeout.plus(Duration.standardMinutes(1L)); } @Override diff --git a/sdks/python/apache_beam/__init__.py b/sdks/python/apache_beam/__init__.py index af88934b0e71..650b639760dc 100644 --- a/sdks/python/apache_beam/__init__.py +++ b/sdks/python/apache_beam/__init__.py @@ -89,7 +89,7 @@ from apache_beam import metrics from apache_beam import typehints from apache_beam import version -from apache_beam.pipeline import Pipeline +from apache_beam.pipeline import * from apache_beam.transforms import * from apache_beam.pvalue import PCollection from apache_beam.pvalue import Row diff --git a/sdks/python/apache_beam/examples/complete/distribopt.py b/sdks/python/apache_beam/examples/complete/distribopt.py index 89c312fcbf5e..304a89cd100b 100644 --- a/sdks/python/apache_beam/examples/complete/distribopt.py +++ b/sdks/python/apache_beam/examples/complete/distribopt.py @@ -221,13 +221,28 @@ def _optimize_production_parameters(sim): # Run L-BFGS-B optimizer result = minimize(lambda x: np.sum(sim.simulate(x)), x0, bounds=bounds) - return result.x.tolist(), sim.simulate(result.x) + + # Ensure result.x is always a list, regardless of NumPy version + x_values = result.x if isinstance(result.x, list) else result.x.tolist() + + # Ensure simulation output is also properly converted + costs = sim.simulate(result.x) + costs = costs if isinstance(costs, list) else costs.tolist() + + return x_values, costs def process(self, element): mapping_identifier, greenhouse = element[0] crops, quantities = zip(*element[1]) sim = Simulator(quantities) optimum, costs = self._optimize_production_parameters(sim) + + # Ensure NumPy arrays are converted to lists before yielding + if isinstance(optimum, np.ndarray): + optimum = optimum.tolist() + if isinstance(costs, np.ndarray): + costs = costs.tolist() + solution = (mapping_identifier, (greenhouse, optimum)) yield pvalue.TaggedOutput('solution', solution) for crop, cost, quantity in zip(crops, costs, quantities): diff --git a/sdks/python/apache_beam/examples/complete/distribopt_test.py b/sdks/python/apache_beam/examples/complete/distribopt_test.py index b9d507410267..a7b02d6a25d2 100644 --- a/sdks/python/apache_beam/examples/complete/distribopt_test.py +++ 
b/sdks/python/apache_beam/examples/complete/distribopt_test.py @@ -77,8 +77,11 @@ def test_basics(self): # Only 1 result self.assertEqual(len(lines), 1) + # Handle NumPy string representation before parsing + cleaned_line = lines[0].replace("np.str_('", "'").replace("')", "'") + # parse result line and verify optimum - optimum = make_tuple(lines[0]) + optimum = make_tuple(cleaned_line) self.assertAlmostEqual(optimum['cost'], 454.39597, places=3) self.assertDictEqual(optimum['mapping'], EXPECTED_MAPPING) production = optimum['production'] diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 6209ca1ddae8..5196e5d29d8d 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -54,6 +54,7 @@ import re import shutil import tempfile +import threading import unicodedata import uuid from collections import defaultdict @@ -109,7 +110,7 @@ from apache_beam.runners.runner import PipelineResult from apache_beam.transforms import environments -__all__ = ['Pipeline', 'PTransformOverride'] +__all__ = ['Pipeline', 'transform_annotations'] class Pipeline(HasDisplayData): @@ -226,7 +227,9 @@ def __init__( self.runner = runner # Stack of transforms generated by nested apply() calls. The stack will # contain a root node as an enclosing (parent) node for top transforms. - self.transforms_stack = [AppliedPTransform(None, None, '', None)] + self.transforms_stack = [ + AppliedPTransform(None, None, '', None, None, None) + ] # Set of transform labels (full labels) applied to the pipeline. # If a transform is applied and the full label is already in the set # then the transform will have to be cloned with a new label. @@ -244,6 +247,7 @@ def __init__( self._display_data = display_data or {} self._error_handlers = [] + self._annotations_stack = [{}] def display_data(self): # type: () -> Dict[str, Any] @@ -268,6 +272,24 @@ def _current_transform(self): """Returns the transform currently on the top of the stack.""" return self.transforms_stack[-1] + @contextlib.contextmanager + def transform_annotations(self, **annotations): + """A context manager for attaching annotations to a set of transforms. + + All transforms applied while this context is active will have these + annotations attached. This includes sub-transforms applied within + composite transforms. + """ + self._annotations_stack.append({ + **self._annotations_stack[-1], **encode_annotations(annotations) + }) + yield + self._annotations_stack.pop() + + def _current_annotations(self): + """Returns the set of annotations that should be used on apply.""" + return {**_global_annotations_stack()[-1], **self._annotations_stack[-1]} + def _root_transform(self): # type: () -> AppliedPTransform @@ -316,7 +338,9 @@ def _replace_if_needed(self, original_transform_node): original_transform_node.parent, replacement_transform, original_transform_node.full_label, - original_transform_node.main_inputs) + original_transform_node.main_inputs, + None, + annotations=original_transform_node.annotations) # TODO(https://github.com/apache/beam/issues/21178): Merge rather # than override. 
@@ -741,7 +765,12 @@ def apply( 'returned %s from %s' % (transform, inputs, pvalueish)) current = AppliedPTransform( - self._current_transform(), transform, full_label, inputs) + self._current_transform(), + transform, + full_label, + inputs, + None, + annotations=self._current_annotations()) self._current_transform().add_part(current) try: @@ -1014,7 +1043,7 @@ def from_runner_api( root_transform_id, = proto.root_transform_ids p.transforms_stack = [context.transforms.get_by_id(root_transform_id)] else: - p.transforms_stack = [AppliedPTransform(None, None, '', None)] + p.transforms_stack = [AppliedPTransform(None, None, '', None, None, None)] # TODO(robertwb): These are only needed to continue construction. Omit? p.applied_labels = { t.unique_name @@ -1124,8 +1153,8 @@ def __init__( transform, # type: Optional[ptransform.PTransform] full_label, # type: str main_inputs, # type: Optional[Mapping[str, Union[pvalue.PBegin, pvalue.PCollection]]] - environment_id=None, # type: Optional[str] - annotations=None, # type: Optional[Dict[str, bytes]] + environment_id, # type: Optional[str] + annotations, # type: Optional[Dict[str, bytes]] ): # type: (...) -> None self.parent = parent @@ -1149,24 +1178,11 @@ def __init__( transform.get_resource_hints()) if transform else { } # type: Dict[str, bytes] - if annotations is None and transform: - - def annotation_to_bytes(key, a: Any) -> bytes: - if isinstance(a, bytes): - return a - elif isinstance(a, str): - return a.encode('ascii') - elif isinstance(a, message.Message): - return a.SerializeToString() - else: - raise TypeError( - 'Unknown annotation type %r (type %s) for %s' % (a, type(a), key)) - + if transform: annotations = { - key: annotation_to_bytes(key, a) - for key, - a in transform.annotations().items() + **(annotations or {}), **encode_annotations(transform.annotations()) } + self.annotations = annotations @property @@ -1478,6 +1494,50 @@ def _merge_outer_resource_hints(self): part._merge_outer_resource_hints() +def encode_annotations(annotations: Optional[Dict[str, Any]]): + """Encodes non-byte annotation values as bytes.""" + if not annotations: + return {} + + def annotation_to_bytes(key, a: Any) -> bytes: + if isinstance(a, bytes): + return a + elif isinstance(a, str): + return a.encode('ascii') + elif isinstance(a, message.Message): + return a.SerializeToString() + else: + raise TypeError( + 'Unknown annotation type %r (type %s) for %s' % (a, type(a), key)) + + return {key: annotation_to_bytes(key, a) for (key, a) in annotations.items()} + + +_global_annotations_stack_data = threading.local() + + +def _global_annotations_stack(): + try: + return _global_annotations_stack_data.stack + except AttributeError: + _global_annotations_stack_data.stack = [{}] + return _global_annotations_stack_data.stack + + +@contextlib.contextmanager +def transform_annotations(**annotations): + """A context manager for attaching annotations to a set of transforms. + + All transforms applied while this context is active will have these + annotations attached. This includes sub-transforms applied within + composite transforms. + """ + cur_stack = _global_annotations_stack() + cur_stack.append({**cur_stack[-1], **encode_annotations(annotations)}) + yield + cur_stack.pop() + + class PTransformOverride(metaclass=abc.ABCMeta): """For internal use only; no backwards-compatibility guarantees. 
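For reviewers, here is a minimal usage sketch of the two annotation context managers added above in `pipeline.py` (the module-level `beam.transform_annotations` and the pipeline-scoped `Pipeline.transform_annotations`). It is not part of the diff; the pipeline shape and the annotation keys/values are purely illustrative:

```python
import apache_beam as beam

# Module-level context manager: annotations apply to every transform
# constructed on this thread while the context is active (thread-local stack).
with beam.transform_annotations(team='data-eng'):
  p = beam.Pipeline()
  lines = p | 'Create' >> beam.Create(['a', 'b', 'c'])

# Pipeline-scoped context manager: annotations apply only to transforms
# applied to this pipeline; nested contexts merge and override key by key.
with p.transform_annotations(stage='transform'):
  upper = lines | 'Upper' >> beam.Map(str.upper)
  with p.transform_annotations(stage='debug', sampled='yes'):
    _ = upper | 'Print' >> beam.Map(print)

# String and proto values are encoded to bytes and surface on the transform
# protos produced by to_runner_api().
proto = p.to_runner_api()
```

The tests added to `pipeline_test.py` below exercise the same two entry points and assert that the encoded annotations appear on the transform protos.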
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 1c11f953c58d..6480b2db3c86 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -1016,6 +1016,51 @@ def annotations(self): transform.annotations['proto'], some_proto.SerializeToString()) self.assertEqual(seen, 2) + def assertHasAnnotation(self, pipeline_proto, transform, key, value): + for transform_proto in pipeline_proto.components.transforms.values(): + if transform_proto.unique_name == transform: + self.assertIn(key, transform_proto.annotations.keys()) + self.assertEqual(transform_proto.annotations[key], value) + break + else: + self.fail( + "Unknown transform: %r not in %s" % ( + transform, + sorted([ + t.unique_name + for t in pipeline_proto.components.transforms.values() + ]))) + + def test_pipeline_context_annotations(self): + p = beam.Pipeline() + with p.transform_annotations(foo='first'): + pcoll = p | beam.Create([1, 2, 3]) | 'First' >> beam.Map(lambda x: x + 1) + with p.transform_annotations(foo='second'): + _ = pcoll | 'Second' >> beam.Map(lambda x: x * 2) + with p.transform_annotations(foo='nested', another='more'): + _ = pcoll | 'Nested' >> beam.Map(lambda x: x * 3) + + proto = p.to_runner_api() + self.assertHasAnnotation(proto, 'First', 'foo', b'first') + self.assertHasAnnotation(proto, 'Second', 'foo', b'second') + self.assertHasAnnotation(proto, 'Nested', 'foo', b'nested') + self.assertHasAnnotation(proto, 'Nested', 'another', b'more') + + def test_beam_context_annotations(self): + p = beam.Pipeline() + with beam.transform_annotations(foo='first'): + pcoll = p | beam.Create([1, 2, 3]) | 'First' >> beam.Map(lambda x: x + 1) + with beam.transform_annotations(foo='second'): + _ = pcoll | 'Second' >> beam.Map(lambda x: x * 2) + with beam.transform_annotations(foo='nested', another='more'): + _ = pcoll | 'Nested' >> beam.Map(lambda x: x * 3) + + proto = p.to_runner_api() + self.assertHasAnnotation(proto, 'First', 'foo', b'first') + self.assertHasAnnotation(proto, 'Second', 'foo', b'second') + self.assertHasAnnotation(proto, 'Nested', 'foo', b'nested') + self.assertHasAnnotation(proto, 'Nested', 'another', b'more') + def test_transform_ids(self): class MyPTransform(beam.PTransform): def expand(self, p): diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index b5568305ce65..87d9a011f1d5 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -272,7 +272,7 @@ def test_group_by_key_input_visitor_with_valid_inputs(self): pcoll3.element_type = typehints.KV[typehints.Any, typehints.Any] for pcoll in [pcoll1, pcoll2, pcoll3]: applied = AppliedPTransform( - None, beam.GroupByKey(), "label", {'pcoll': pcoll}) + None, beam.GroupByKey(), "label", {'pcoll': pcoll}, None, None) applied.outputs[None] = PCollection(None) common.group_by_key_input_visitor().visit_transform(applied) self.assertEqual( @@ -291,7 +291,8 @@ def test_group_by_key_input_visitor_with_invalid_inputs(self): for pcoll in [pcoll1, pcoll2]: with self.assertRaisesRegex(ValueError, err_msg): common.group_by_key_input_visitor().visit_transform( - AppliedPTransform(None, beam.GroupByKey(), "label", {'in': pcoll})) + AppliedPTransform( + None, beam.GroupByKey(), "label", {'in': pcoll}, None, None)) def test_group_by_key_input_visitor_for_non_gbk_transforms(self): p = TestPipeline() @@ 
-299,7 +300,8 @@ def test_group_by_key_input_visitor_for_non_gbk_transforms(self): for transform in [beam.Flatten(), beam.Map(lambda x: x)]: pcoll.element_type = typehints.Any common.group_by_key_input_visitor().visit_transform( - AppliedPTransform(None, transform, "label", {'in': pcoll})) + AppliedPTransform( + None, transform, "label", {'in': pcoll}, None, None)) self.assertEqual(pcoll.element_type, typehints.Any) def test_flatten_input_with_visitor_with_single_input(self): @@ -319,7 +321,8 @@ def _test_flatten_input_visitor(self, input_type, output_type, num_inputs): output_pcoll = PCollection(p) output_pcoll.element_type = output_type - flatten = AppliedPTransform(None, beam.Flatten(), "label", inputs) + flatten = AppliedPTransform( + None, beam.Flatten(), "label", inputs, None, None) flatten.add_output(output_pcoll, None) DataflowRunner.flatten_input_visitor().visit_transform(flatten) for _ in range(num_inputs): @@ -357,7 +360,8 @@ def test_side_input_visitor(self): z: (x, y, z), beam.pvalue.AsSingleton(pc), beam.pvalue.AsMultiMap(pc)) - applied_transform = AppliedPTransform(None, transform, "label", {'pc': pc}) + applied_transform = AppliedPTransform( + None, transform, "label", {'pc': pc}, None, None) DataflowRunner.side_input_visitor().visit_transform(applied_transform) self.assertEqual(2, len(applied_transform.side_inputs)) self.assertEqual( diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py index 4db9107a7e44..3a5ffc852761 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/names.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py @@ -34,6 +34,6 @@ # Unreleased sdks use container image tag specified below. # Update this tag whenever there is a change that # requires changes to SDK harness container or SDK harness launcher. -BEAM_DEV_SDK_CONTAINER_TAG = 'beam-master-20250122' +BEAM_DEV_SDK_CONTAINER_TAG = 'beam-master-20250128' DATAFLOW_CONTAINER_IMAGE_REPOSITORY = 'gcr.io/cloud-dataflow/v1beta3' diff --git a/sdks/python/apache_beam/typehints/opcodes.py b/sdks/python/apache_beam/typehints/opcodes.py index 7bea621841f6..cd78e397f25c 100644 --- a/sdks/python/apache_beam/typehints/opcodes.py +++ b/sdks/python/apache_beam/typehints/opcodes.py @@ -218,7 +218,7 @@ def binary_slice(state, args): def store_slice(state, args): - """Clears elements off the stack like it was constructing a + """Clears elements off the stack like it was constructing a container, but leaves the container type back at stack[-1] since that's all that is relevant for type checking. 
""" @@ -344,6 +344,10 @@ def list_to_tuple(state, arg): state.stack.append(Tuple[element_type(base), ...]) +def build_string(state, arg): + state.stack[-arg:] = [str] + + def list_extend(state, arg): tail = state.stack.pop() base = state.stack[-arg] @@ -497,7 +501,7 @@ def load_closure(state, arg): # See https://docs.python.org/3/library/dis.html#opcode-LOAD_CLOSURE if (sys.version_info.major, sys.version_info.minor) >= (3, 11): arg -= len(state.co.co_varnames) - state.stack.append(state.get_closure(arg)) + state.stack.append(state.closure_type(arg)) def load_deref(state, arg): @@ -554,6 +558,21 @@ def build_slice(state, arg): state.stack[-arg:] = [slice] # a slice object +def format_value(state, arg): + if arg & 0x04: + state.stack.pop() + state.stack.pop() + state.stack.append(str) + + +def format_simple(state, arg): + state.stack[-1:][str] + + +def format_with_spec(state, arg): + state.stack[-2:][str] + + def _unpack_lists(state, arg): """Extract inner types of Lists and Tuples. diff --git a/sdks/python/apache_beam/typehints/row_type.py b/sdks/python/apache_beam/typehints/row_type.py index 880a897bbbe8..9c6050461e32 100644 --- a/sdks/python/apache_beam/typehints/row_type.py +++ b/sdks/python/apache_beam/typehints/row_type.py @@ -198,7 +198,10 @@ def __repr__(self): '%s=%s' % (name, repr(t)) for name, t in self._fields) def get_type_for(self, name): - return dict(self._fields)[name] + try: + return dict(self._fields)[name] + except KeyError: + return typehints.Any class GeneratedClassRowTypeConstraint(RowTypeConstraint): diff --git a/sdks/python/apache_beam/typehints/trivial_inference_test.py b/sdks/python/apache_beam/typehints/trivial_inference_test.py index 48ccc8a6a2ed..9cea6d5794d8 100644 --- a/sdks/python/apache_beam/typehints/trivial_inference_test.py +++ b/sdks/python/apache_beam/typehints/trivial_inference_test.py @@ -479,6 +479,15 @@ def testRowAttr(self): lambda row: (row.x, getattr(row, 'y')), [row_type.RowTypeConstraint.from_fields([('x', int), ('y', str)])]) + def testRowMissingAttr(self): + self.assertReturnType( + typehints.Any, + lambda row: getattr(row, '_asdict'), + [row_type.RowTypeConstraint.from_fields([('x', int), ('y', str)])]) + + def testFString(self): + self.assertReturnType(str, lambda x, y: f'{x}: {y:0.2}', [str, float]) + def testPyCallable(self): self.assertReturnType( typehints.Tuple[int, str],