diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 7ab7bcd9a9c6..37dd25bf9029 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 2
+  "modification": 3
 }
diff --git a/.github/workflows/beam_PerformanceTests_Kafka_IO.yml b/.github/workflows/beam_PerformanceTests_Kafka_IO.yml
index 1e482f8e7825..16749f0febff 100644
--- a/.github/workflows/beam_PerformanceTests_Kafka_IO.yml
+++ b/.github/workflows/beam_PerformanceTests_Kafka_IO.yml
@@ -54,7 +54,7 @@ jobs:
       github.event_name == 'workflow_dispatch' ||
       (github.event_name == 'schedule' && github.repository == 'apache/beam') ||
       github.event.comment.body == 'Run Java KafkaIO Performance Test'
-    runs-on: [self-hosted, ubuntu-20.04, main]
+    runs-on: [self-hosted, ubuntu-20.04, highmem]
     timeout-minutes: 120
     name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
     strategy:
diff --git a/.github/workflows/finalize_release.yml b/.github/workflows/finalize_release.yml
index 497b32629ef6..c89dabc70c66 100644
--- a/.github/workflows/finalize_release.yml
+++ b/.github/workflows/finalize_release.yml
@@ -138,6 +138,7 @@ jobs:
         env:
           VERSION_TAG: "v${{ github.event.inputs.RELEASE }}"
           RC_TAG: "v${{ github.event.inputs.RELEASE }}-RC${{ github.event.inputs.RC }}"
+          POST_RELEASE_BRANCH: "release-${{ github.event.inputs.RELEASE }}-postrelease"
        run: |
          # Ensure local tags are in sync. If there's a mismatch, it will tell you.
          git fetch --all --tags --prune
@@ -152,3 +153,27 @@
          # Tag for repo root.
          git tag "$VERSION_TAG" "$RC_TAG"^{} -m "Tagging release" --local-user="${{steps.import_gpg.outputs.name}}"
          git push https://github.com/apache/beam "$VERSION_TAG"
+
+          git checkout -b "$POST_RELEASE_BRANCH" "$VERSION_TAG"
+          git push https://github.com/apache/beam "$POST_RELEASE_BRANCH"
+
+  update_master:
+    needs: push_git_tags
+    runs-on: ubuntu-latest
+    env:
+      POST_RELEASE_BRANCH: "release-${{ github.event.inputs.RELEASE }}-postrelease"
+    steps:
+      - name: Check out code
+        uses: actions/checkout@v4
+      - name: Set git config
+        run: |
+          git config user.name $GITHUB_ACTOR
+          git config user.email actions@"$RUNNER_NAME".local
+      - name: Update .asf.yaml to protect new postrelease branch from force push
+        run: |
+          sed -i -e "s/master: {}/master: {}\n    ${POST_RELEASE_BRANCH}: {}/g" .asf.yaml
+      - name: Commit and push .asf.yaml update to master branch
+        run: |
+          git add .asf.yaml
+          git commit -m "Add ${POST_RELEASE_BRANCH} to protected branches in .asf.yaml"
+          git push origin master
diff --git a/.github/workflows/republish_released_docker_containers.yml b/.github/workflows/republish_released_docker_containers.yml
index 7a44e219f34c..639aef7b629e 100644
--- a/.github/workflows/republish_released_docker_containers.yml
+++ b/.github/workflows/republish_released_docker_containers.yml
@@ -43,7 +43,7 @@ jobs:
       - name: Checkout
         uses: actions/checkout@v4
         with:
-          ref: "v${{ env.release }}-RC${{ env.rc }}"
+          ref: "release-${{ env.release }}-postrelease"
           repository: apache/beam
       - name: Free Disk Space (Ubuntu)
         uses: jlumbroso/free-disk-space@v1.3.0
diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle
index 5c4fd674950a..8472f969356c 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -239,7 +239,9 @@ def sickbayTests = [
     // Flink errors are not deterministic. Exception may just be
     // org.apache.flink.runtime.operators.coordination.TaskNotRunningException: Task is not running, but in state FAILED
     // instead of the actual cause. Real cause is visible in the logs.
-    'org.apache.beam.sdk.transforms.ParDoTest$LifecycleTests'
+    'org.apache.beam.sdk.transforms.ParDoTest$LifecycleTests',
+    // TODO(https://github.com/apache/beam/issues/18198)
+    'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
 ]
 
 def createValidatesRunnerTask(Map m) {
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java
deleted file mode 100644
index 26f1e60b6773..000000000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/LargeCommitTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.ValidatesRunner;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.hamcrest.Matchers;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-@RunWith(JUnit4.class)
-public class LargeCommitTest {
-
-  @Rule public transient TestPipeline p = TestPipeline.create();
-
-  @Test
-  @Category({ValidatesRunner.class})
-  public void testLargeCommit() {
-    // 5 50MB values shuffling to a single key
-    String value = bigString('a', 50 << 20);
-    KV<String, String> kv = KV.of("a", value);
-    PCollection<KV<String, Iterable<String>>> result =
-        p.apply(Create.of(kv, kv, kv, kv, kv)).apply(GroupByKey.create());
-
-    PAssert.that(result)
-        .satisfies(
-            kvs -> {
-              assertTrue(kvs.iterator().hasNext());
-              KV<String, Iterable<String>> outputKV = kvs.iterator().next();
-              assertFalse(kvs.iterator().hasNext());
-              assertEquals("a", outputKV.getKey());
-              assertThat(outputKV.getValue(), Matchers.contains(value, value, value, value, value));
-              return null;
-            });
-    p.run();
-  }
-
-  private static String bigString(char c, int size) {
-    char[] buf = new char[size];
-    for (int i = 0; i < size; i++) {
-      buf[i] = c;
-    }
-    return new String(buf);
-  }
-}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/OutputObjectAndByteCounter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/OutputObjectAndByteCounter.java
index 9aa28b393ece..6c3be8c6f9e9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/OutputObjectAndByteCounter.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/OutputObjectAndByteCounter.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.dataflow.worker.util.common.worker;
 
 import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.runners.core.ElementByteSizeObservable;
 import org.apache.beam.runners.dataflow.worker.counters.Counter;
 import org.apache.beam.runners.dataflow.worker.counters.CounterBackedElementByteSizeObserver;
@@ -25,6 +26,7 @@
 import org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterMean;
 import org.apache.beam.runners.dataflow.worker.counters.CounterName;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** An {@link ElementCounter} that counts output objects, bytes, and mean bytes. */
@@ -32,13 +34,12 @@
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
 public class OutputObjectAndByteCounter implements ElementCounter {
+
   // Might be null, e.g., undeclared outputs will not have an
   // elementByteSizeObservable.
   private final ElementByteSizeObservable elementByteSizeObservable;
   private final CounterFactory counterFactory;
 
-  private Random randomGenerator = new Random();
-
   // Lowest sampling probability: 0.001%.
   private static final int SAMPLING_TOKEN_UPPER_BOUND = 1000000;
   private static final int SAMPLING_CUTOFF = 10;
@@ -163,12 +164,12 @@ protected boolean sampleElement() {
     // samplingCutoff / samplingTokenUpperBound. This algorithm may be refined
     // later.
     samplingToken = Math.min(samplingToken + 1, samplingTokenUpperBound);
-    return randomGenerator.nextInt(samplingToken) < SAMPLING_CUTOFF;
+    return getRandom().nextInt(samplingToken) < SAMPLING_CUTOFF;
   }
 
-  public OutputObjectAndByteCounter setRandom(Random random) {
-    this.randomGenerator = random;
-    return this;
+  @VisibleForTesting
+  protected Random getRandom() {
+    return ThreadLocalRandom.current();
   }
 
   private CounterName getCounterName(String name) {
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/OutputObjectAndByteCounterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/OutputObjectAndByteCounterTest.java
index 67370bdea86b..a2add528ffdd 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/OutputObjectAndByteCounterTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/OutputObjectAndByteCounterTest.java
@@ -92,13 +92,19 @@ public void testAddingCountersIntoCounterSet() throws Exception {
   }
 
   private OutputObjectAndByteCounter makeCounter(String name, int samplingPeriod, int seed) {
-    return new OutputObjectAndByteCounter(
+    OutputObjectAndByteCounter outputObjectAndByteCounter =
+        new OutputObjectAndByteCounter(
             new ElementByteSizeObservableCoder<>(StringUtf8Coder.of()),
             counterSet,
-            NameContextsForTests.nameContextForTest())
-        .setSamplingPeriod(samplingPeriod)
-        .setRandom(new Random(seed))
-        .countBytes(name);
+            NameContextsForTests.nameContextForTest()) {
+          private final Random random = new Random(seed);
+
+          @Override
+          protected Random getRandom() {
+            return random;
+          }
+        };
+    return outputObjectAndByteCounter.setSamplingPeriod(samplingPeriod).countBytes(name);
   }
 
   @Test
diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle
index 41e12921e6f8..d84205791d48 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -55,6 +55,7 @@ dependencies {
   implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
   implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
   implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
+  implementation "org.apache.iceberg:iceberg-data:$iceberg_version"
   implementation library.java.hadoop_common
 
   runtimeOnly "org.apache.iceberg:iceberg-gcp:$iceberg_version"
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
index 78b01b15e62c..bbde6c9e864e 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
@@ -50,14 +50,11 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.InternalRecordWrapper;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.expressions.Literal;
-import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.transforms.Transforms;
-import org.apache.iceberg.types.Types;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,12 +103,14 @@ class DestinationState {
     @VisibleForTesting final Map writerCounts = Maps.newHashMap();
     private final Map<String, PartitionField> partitionFieldMap = Maps.newHashMap();
     private final List exceptions = Lists.newArrayList();
+    private final InternalRecordWrapper wrapper; // wrapper that facilitates partitioning
 
     DestinationState(IcebergDestination icebergDestination, Table table) {
       this.icebergDestination = icebergDestination;
       this.schema = table.schema();
       this.spec = table.spec();
       this.routingPartitionKey = new PartitionKey(spec, schema);
+      this.wrapper = new InternalRecordWrapper(schema.asStruct());
       this.table = table;
       for (PartitionField partitionField : spec.fields()) {
         partitionFieldMap.put(partitionField.name(), partitionField);
@@ -156,7 +155,7 @@ class DestinationState {
      * can't create a new writer, the {@link Record} is rejected and {@code false} is returned.
      */
    boolean write(Record record) {
-      routingPartitionKey.partition(getPartitionableRecord(record));
+      routingPartitionKey.partition(wrapper.wrap(record));
 
       @Nullable RecordWriter writer = writers.getIfPresent(routingPartitionKey);
       if (writer == null && openWriters >= maxNumWriters) {
@@ -207,30 +206,6 @@ private RecordWriter createWriter(PartitionKey partitionKey) {
             e);
       }
     }
-
-    /**
-     * Resolves an input {@link Record}'s partition values and returns another {@link Record} that
-     * can be applied to the destination's {@link PartitionSpec}.
-     */
-    private Record getPartitionableRecord(Record record) {
-      if (spec.isUnpartitioned()) {
-        return record;
-      }
-      Record output = GenericRecord.create(schema);
-      for (PartitionField partitionField : spec.fields()) {
-        Transform transform = partitionField.transform();
-        Types.NestedField field = schema.findField(partitionField.sourceId());
-        String name = field.name();
-        Object value = record.getField(name);
-        @Nullable Literal literal = Literal.of(value.toString()).to(field.type());
-        if (literal == null || transform.isVoid() || transform.isIdentity()) {
-          output.setField(name, value);
-        } else {
-          output.setField(name, literal.value());
-        }
-      }
-      return output;
-    }
   }
 
   /**
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
index 5168f71fef99..eb10722c263f 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.iceberg;
 
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
@@ -27,9 +28,12 @@
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -451,10 +455,27 @@ public void testIdentityPartitioning() throws IOException {
             .addFloatField("float")
             .addDoubleField("double")
             .addStringField("str")
+            .addLogicalTypeField("date", SqlTypes.DATE)
+            .addLogicalTypeField("time", SqlTypes.TIME)
+            .addLogicalTypeField("datetime", SqlTypes.DATETIME)
+            .addDateTimeField("datetime_tz")
             .build();
-
+    String timestamp = "2025-01-21T13:18:20.053";
+    LocalDateTime localDateTime = LocalDateTime.parse(timestamp);
     Row row =
-        Row.withSchema(primitiveTypeSchema).addValues(true, 1, 1L, 1.23f, 4.56, "str").build();
+        Row.withSchema(primitiveTypeSchema)
+            .addValues(
+                true,
+                1,
+                1L,
+                1.23f,
+                4.56,
+                "str",
+                localDateTime.toLocalDate(),
+                localDateTime.toLocalTime(),
+                localDateTime,
+                DateTime.parse(timestamp))
+            .build();
     org.apache.iceberg.Schema icebergSchema =
         IcebergUtils.beamSchemaToIcebergSchema(primitiveTypeSchema);
     PartitionSpec spec =
@@ -465,6 +486,10 @@
             .identity("float")
             .identity("double")
             .identity("str")
+            .identity("date")
+            .identity("time")
+            .identity("datetime")
+            .identity("datetime_tz")
             .build();
     WindowedValue<IcebergDestination> dest =
         getWindowedDestination("identity_partitioning", icebergSchema, spec);
@@ -479,8 +504,12 @@
     assertEquals(1, dataFile.getRecordCount());
     // build this string: bool=true/int=1/long=1/float=1.0/double=1.0/str=str
     List<String> expectedPartitions = new ArrayList<>();
+    List<String> dateTypes = Arrays.asList("date", "time", "datetime", "datetime_tz");
     for (Schema.Field field : primitiveTypeSchema.getFields()) {
-      Object val = row.getValue(field.getName());
+      Object val = checkStateNotNull(row.getValue(field.getName()));
+      if (dateTypes.contains(field.getName())) {
+        val = URLEncoder.encode(val.toString(), StandardCharsets.UTF_8.toString());
+      }
       expectedPartitions.add(field.getName() + "=" + val);
     }
     String expectedPartitionPath = String.join("/", expectedPartitions);
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index cef3bc80d613..adf31dc72b54 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -94,6 +94,7 @@
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -142,6 +143,16 @@ public class KafkaIOIT {
 
   private static final Logger LOG = LoggerFactory.getLogger(KafkaIOIT.class);
 
+  private static final int RETRIES_CONFIG = 10;
+
+  private static final int REQUEST_TIMEOUT_MS_CONFIG = 600000;
+
+  private static final int MAX_BLOCK_MS_CONFIG = 300000;
+
+  private static final int BUFFER_MEMORY_CONFIG = 100554432;
+
+  private static final int RETRY_BACKOFF_MS_CONFIG = 5000;
+
   private static SyntheticSourceOptions sourceOptions;
 
   private static Options options;
@@ -938,7 +949,14 @@ private KafkaIO.Write<byte[], byte[]> writeToKafka() {
     return KafkaIO.<byte[], byte[]>write()
         .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
         .withKeySerializer(ByteArraySerializer.class)
-        .withValueSerializer(ByteArraySerializer.class);
+        .withValueSerializer(ByteArraySerializer.class)
+        .withProducerConfigUpdates(
+            ImmutableMap.of(
+                ProducerConfig.RETRIES_CONFIG, RETRIES_CONFIG,
+                ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, REQUEST_TIMEOUT_MS_CONFIG,
+                ProducerConfig.MAX_BLOCK_MS_CONFIG, MAX_BLOCK_MS_CONFIG,
+                ProducerConfig.BUFFER_MEMORY_CONFIG, BUFFER_MEMORY_CONFIG,
+                ProducerConfig.RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS_CONFIG));
   }
 
   private KafkaIO.Read<byte[], byte[]> readFromBoundedKafka() {
diff --git a/sdks/python/test-suites/tox/py39/build.gradle b/sdks/python/test-suites/tox/py39/build.gradle
index e9624f8e810e..ea02e9d5b1e8 100644
--- a/sdks/python/test-suites/tox/py39/build.gradle
+++ b/sdks/python/test-suites/tox/py39/build.gradle
@@ -168,9 +168,7 @@ postCommitPyDep.dependsOn "testPy39transformers-430"
 
 toxTask "testPy39embeddingsMLTransform", "py39-embeddings", "${posargs}"
 test.dependsOn "testPy39embeddingsMLTransform"
-// TODO(https://github.com/apache/beam/issues/32965): re-enable this suite for the dep
-// postcommit once the sentence-transformers import error is debugged
-// postCommitPyDep.dependsOn "testPy39embeddingsMLTransform"
+postCommitPyDep.dependsOn "testPy39embeddingsMLTransform"
 
 // Part of MLTransform embeddings test suite but requires tensorflow hub, which we need to test on
 // mutliple versions so keeping this suite separate.
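For context on the KafkaIOIT change above, the following is a minimal sketch (not part of this diff) of how the added ProducerConfig overrides travel through KafkaIO.write(): each entry passed to withProducerConfigUpdates overrides the matching producer property before KafkaIO builds its producer. The topic name and bootstrap server below are hypothetical placeholders; the override values mirror the constants introduced in KafkaIOIT.

```java
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class ProducerOverridesSketch {

  // Sketch of a write transform carrying the same overrides KafkaIOIT now applies.
  public static KafkaIO.Write<byte[], byte[]> tunedWrite() {
    return KafkaIO.<byte[], byte[]>write()
        .withBootstrapServers("localhost:9092") // placeholder
        .withTopic("kafka-io-it-topic") // placeholder
        .withKeySerializer(ByteArraySerializer.class)
        .withValueSerializer(ByteArraySerializer.class)
        // These entries are merged into the producer configuration; any key not listed
        // here keeps its Kafka default.
        .withProducerConfigUpdates(
            ImmutableMap.of(
                ProducerConfig.RETRIES_CONFIG, 10,
                ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 600000,
                ProducerConfig.MAX_BLOCK_MS_CONFIG, 300000,
                ProducerConfig.BUFFER_MEMORY_CONFIG, 100554432,
                ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 5000));
  }
}
```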
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index ed1f723d6d4d..fe818324c194 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -481,7 +481,7 @@ commands =
 
 [testenv:py{39,310}-embeddings]
 deps =
-  sentence-transformers==2.2.2
+  sentence-transformers==3.3.1
 passenv = HF_INFERENCE_TOKEN
 extras = test,gcp
 commands =
diff --git a/settings.gradle.kts b/settings.gradle.kts
index e189e3ccb58b..63e36c655bf4 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -24,7 +24,7 @@ pluginManagement {
 }
 
 plugins {
-  id("com.gradle.develocity") version "3.18.2"
+  id("com.gradle.develocity") version "3.19"
   id("com.gradle.common-custom-user-data-gradle-plugin") version "2.0.2"
 }
 
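As a closing illustration of the RecordWriterManager change earlier in this diff (replacing the hand-rolled getPartitionableRecord with Iceberg's InternalRecordWrapper), here is a minimal sketch, not part of the diff, of how InternalRecordWrapper and PartitionKey are typically combined to derive a partition path for a Record. The schema, field names, and values are invented for the example.

```java
import java.time.LocalDateTime;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;

public class PartitionRoutingSketch {
  public static void main(String[] args) {
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "name", Types.StringType.get()),
            Types.NestedField.required(2, "ts", Types.TimestampType.withoutZone()));
    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("name").day("ts").build();

    // The wrapper converts generic Record values (e.g. LocalDateTime for a timestamp column)
    // into Iceberg's internal representations so partition transforms can be applied directly.
    InternalRecordWrapper wrapper = new InternalRecordWrapper(schema.asStruct());
    PartitionKey partitionKey = new PartitionKey(spec, schema);

    Record record = GenericRecord.create(schema);
    record.setField("name", "str");
    record.setField("ts", LocalDateTime.parse("2025-01-21T13:18:20.053"));

    partitionKey.partition(wrapper.wrap(record));
    // Prints something like: name=str/ts_day=2025-01-21
    System.out.println(spec.partitionToPath(partitionKey));
  }
}
```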