Merge remote-tracking branch 'beam/master' into microopt7
arunpandianp committed Jan 29, 2025
2 parents ba6d769 + b5884e8 commit 53eb11b
Showing 29 changed files with 761 additions and 451 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/cut_release_branch.yml
@@ -31,11 +31,11 @@ on:
RELEASE_VERSION:
description: Beam version of current release (branch being cut)
required: true
default: '2.XX.0'
default: '2.XX'
NEXT_VERSION:
description: Next release version
required: true
default: '2.XX.0'
default: '2.XX'
CREATE_RELEASE_BRANCH:
description: Whether to cut the release branch. You shouldn't skip this unless it has already been completed successfully (yes/no)
required: true
16 changes: 2 additions & 14 deletions contributor-docs/release-guide.md
@@ -196,7 +196,7 @@ The following must be manually done or confirmed:
- [ ] The `master` branch has the SNAPSHOT/dev version incremented.
- [ ] The release branch has the SNAPSHOT/dev version to be released.
- [ ] The Dataflow container image should be modified to the version to be released.
- [ ] Due to current limitation in the workflow, you must navigate to https://github.com/apache/beam/actions/workflows/beam_Release_NightlySnapshot.yml and click "Run workflow" and select the branch just created (release-2.xx.0) to build a snapshot.
- [ ] Due to current limitation in the workflow, you must navigate to https://github.com/apache/beam/actions/workflows/beam_Release_NightlySnapshot.yml and click "Run workflow" and select the branch just created (release-2.xx) to build a snapshot.
- [ ] Manually update `CHANGES.md` on `master` by adding a new section for the
next release
([example](https://github.com/apache/beam/commit/96ab1fb3fe07acf7f7dc9d8c829ae36890d1535c)).
@@ -1398,19 +1398,7 @@ To drive down time to resolution, this can happen in parallel to any discussion
The above document assumes a minor version bump cut off of the master branch. If you want to do a patch release cut off of a previous release branch, use the following steps instead:
- Create a new release branch:
```
git clone https://github.com/apache/beam
cd beam
git fetch origin release-2.XX.0
git checkout release-2.XX.0
git checkout -b release-2.XX.1
git push origin release-2.XX.1
```
- Add a PR to add the new release branch to the set of protected branches in .asf.yml - [example PR](https://github.com/apache/beam/pull/30832)
- Add a PR to bump the Dataflow containers versions - [example PR](https://github.com/apache/beam/pull/30827)
- Add a PR to bump the Dataflow containers versions on the release branch (`release-2.XX`) - [example PR](https://github.com/apache/beam/pull/30827)
- Create PRs to cherry-pick any desired commits to the release branch (an example of the typical commands follows this list)
- Follow the normal release steps listed above, starting with [release branch stabilization](#stabilize-the-release-branch), to build/vote/validate/finalize the release candidate. See Voting and Finalization of a Patch Release for differences in the voting process.
- Depending on the nature of the issue, certain post-release finalization steps may be skipped. For example, if an issue doesn’t dramatically impact the Beam Playground, consider skipping that step
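For illustration, a typical cherry-pick flow onto a patch release branch might look like the following; the issue number and commit SHA are placeholders, not values from this commit:
```
# start from the patch release branch
git checkout release-2.XX
git pull origin release-2.XX
# create a branch to hold the cherry-pick PR
git checkout -b cherry-pick-issue-1234-release-2.XX
# apply the desired commit from master
git cherry-pick <commit-sha>
git push origin cherry-pick-issue-1234-release-2.XX
# then open a PR targeting release-2.XX
```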
21 changes: 21 additions & 0 deletions examples/notebooks/beam-ml/run_inference_huggingface.ipynb
@@ -296,6 +296,27 @@
}
]
},
{
"cell_type": "markdown",
"source": [
"### Try with a different model\n",
"One of the best parts of using RunInference is how easy it is to swap in different models. For example, if we wanted to use a larger model like DeepSeek-R1-Distill-Llama-8B outside of Colab (which has very tight memory constraints and limited GPU access), all we need to change is our ModelHandler:\n",
"\n",
"```\n",
"model_handler = HuggingFacePipelineModelHandler(\n",
" task=PipelineTask.Translation_XX_to_YY,\n",
" model = \"deepseek-ai/DeepSeek-R1-Distill-Llama-8B\",\n",
" load_pipeline_args={'framework': 'pt'},\n",
" inference_args={'max_length': 400}\n",
")\n",
"```\n",
"\n",
"We can then run the exact same pipeline code and Beam will take care of the rest."
],
"metadata": {
"id": "LWNM81ivoZcF"
}
},
{
"cell_type": "markdown",
"source": [
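The notebook cell above refers to running "the exact same pipeline code"; as a rough sketch (not shown in this diff — the input prompt and output handling are illustrative), that pipeline could look like:

```
import apache_beam as beam
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.huggingface_inference import (
    HuggingFacePipelineModelHandler,
    PipelineTask,
)

# Handler mirroring the cell above; the model name and inference_args come from that snippet.
model_handler = HuggingFacePipelineModelHandler(
    task=PipelineTask.Translation_XX_to_YY,
    model="deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
    load_pipeline_args={'framework': 'pt'},
    inference_args={'max_length': 400},
)

with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | "CreateExamples" >> beam.Create(["translate English to Spanish: How are you?"])  # illustrative input
        | "RunInference" >> RunInference(model_handler)
        | "PrintResults" >> beam.Map(print)  # each element is a PredictionResult
    )
```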
2 changes: 1 addition & 1 deletion release/src/main/groovy/MobileGamingCommands.groovy
@@ -30,7 +30,7 @@ class MobileGamingCommands {
SparkRunner: "spark-runner",
FlinkRunner: "flink-runner"]

public static final EXECUTION_TIMEOUT_IN_MINUTES = 40
public static final EXECUTION_TIMEOUT_IN_MINUTES = 60

// Lists used to verify team names generated in the LeaderBoard example.
// This list should be kept in sync with COLORS in org.apache.beam.examples.complete.game.injector.Injector.
4 changes: 3 additions & 1 deletion release/src/main/scripts/choose_rc_commit.sh
@@ -106,9 +106,11 @@ SCRIPT_DIR=$(dirname $0)

RC_TAG="v${RELEASE}-RC${RC}"

RELEASE_BRANCH="release-$(cut -d '.' -f 1,2 <<< $RELEASE)"

if [[ "$CLONE" == yes ]] ; then
CLONE_DIR=`mktemp -d`
git clone "$GIT_REPO" "$CLONE_DIR" --single-branch --branch "release-$RELEASE" --shallow-exclude master
git clone "$GIT_REPO" "$CLONE_DIR" --single-branch --branch "$RELEASE_BRANCH" --shallow-exclude master
else
echo "Not cloning repo; assuming working dir is the desired repo. To run with a fresh clone, run with --clone."
CLONE_DIR=$PWD
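For reference, the new RELEASE_BRANCH line derives the branch name by dropping the patch component of the version; with an illustrative value:
```
RELEASE=2.63.0
RELEASE_BRANCH="release-$(cut -d '.' -f 1,2 <<< $RELEASE)"
echo "$RELEASE_BRANCH"   # prints: release-2.63
```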
2 changes: 0 additions & 2 deletions runners/core-java/build.gradle
@@ -42,7 +42,6 @@ dependencies {
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":model:job-management", configuration: "shadow")
implementation library.java.google_api_services_dataflow
implementation library.java.vendored_guava_32_1_2_jre
implementation library.java.joda_time
implementation library.java.vendored_grpc_1_69_0
@@ -53,6 +52,5 @@ dependencies {
testImplementation library.java.junit
testImplementation library.java.mockito_core
testImplementation library.java.slf4j_api
testImplementation(library.java.google_api_services_dataflow)
testRuntimeOnly library.java.slf4j_simple
}
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
@@ -17,34 +17,19 @@
*/
package org.apache.beam.runners.core.metrics;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.dataflow.model.Base2Exponent;
import com.google.api.services.dataflow.model.BucketOptions;
import com.google.api.services.dataflow.model.DataflowHistogramValue;
import com.google.api.services.dataflow.model.Linear;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.joda.time.Instant;

// TODO(#33093): Refactor out DataflowHistogramValue to be runner agnostic, and rename to
// remove Dataflow reference.

/** A set of functions used to encode and decode common monitoring info types. */
public class MonitoringInfoEncodings {
private static final Coder<Long> VARINT_CODER = VarLongCoder.of();
@@ -178,98 +163,4 @@ public static double decodeDoubleCounter(ByteString payload) {
throw new RuntimeException(e);
}
}

/** Encodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
try {
int numberOfBuckets = inputHistogram.getBucketType().getNumBuckets();

DataflowHistogramValue outputHistogram2 = new DataflowHistogramValue();

if (inputHistogram.getBucketType() instanceof HistogramData.LinearBuckets) {
HistogramData.LinearBuckets buckets =
(HistogramData.LinearBuckets) inputHistogram.getBucketType();
Linear linear = new Linear();
linear.setNumberOfBuckets(numberOfBuckets);
linear.setWidth(buckets.getWidth());
linear.setStart(buckets.getStart());
outputHistogram2.setBucketOptions(new BucketOptions().setLinear(linear));
} else if (inputHistogram.getBucketType() instanceof HistogramData.ExponentialBuckets) {
HistogramData.ExponentialBuckets buckets =
(HistogramData.ExponentialBuckets) inputHistogram.getBucketType();
Base2Exponent base2Exp = new Base2Exponent();
base2Exp.setNumberOfBuckets(numberOfBuckets);
base2Exp.setScale(buckets.getScale());
outputHistogram2.setBucketOptions(new BucketOptions().setExponential(base2Exp));
} else {
throw new HistogramParsingException(
"Unable to encode Int64 Histogram, bucket is not recognized");
}

outputHistogram2.setCount(inputHistogram.getTotalCount());

List<Long> bucketCounts = new ArrayList<>();

Arrays.stream(inputHistogram.getBucketCount())
.forEach(
val -> {
bucketCounts.add(val);
});

outputHistogram2.setBucketCounts(bucketCounts);

ObjectMapper objectMapper = new ObjectMapper();
String jsonString = objectMapper.writeValueAsString(outputHistogram2);

return ByteString.copyFromUtf8(jsonString);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

static class HistogramParsingException extends RuntimeException {
public HistogramParsingException(String message) {
super(message);
}
}

/** Decodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
public static HistogramData decodeInt64Histogram(ByteString payload) {
try {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
JsonNode jsonNode = objectMapper.readTree(payload.toStringUtf8()); // parse afterwards
DataflowHistogramValue newHist = new DataflowHistogramValue();
newHist.setCount(jsonNode.get("count").asLong());

List<Long> bucketCounts = new ArrayList<>();
Iterator<JsonNode> itr = jsonNode.get("bucketCounts").iterator();
while (itr.hasNext()) {
Long item = itr.next().asLong();
bucketCounts.add(item);
}
newHist.setBucketCounts(bucketCounts);

if (jsonNode.get("bucketOptions").has("linear")) {
Linear linear = new Linear();
JsonNode linearNode = jsonNode.get("bucketOptions").get("linear");
linear.setNumberOfBuckets(linearNode.get("numberOfBuckets").asInt());
linear.setWidth(linearNode.get("width").asDouble());
linear.setStart(linearNode.get("start").asDouble());
newHist.setBucketOptions(new BucketOptions().setLinear(linear));
} else if (jsonNode.get("bucketOptions").has("exponential")) {
Base2Exponent base2Exp = new Base2Exponent();
JsonNode expNode = jsonNode.get("bucketOptions").get("exponential");
base2Exp.setNumberOfBuckets(expNode.get("numberOfBuckets").asInt());
base2Exp.setScale(expNode.get("scale").asInt());
newHist.setBucketOptions(new BucketOptions().setExponential(base2Exp));
} else {
throw new HistogramParsingException(
"Unable to parse Int64 Histogram, bucket is not recognized");
}
return new HistogramData(newHist);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java
@@ -17,43 +17,30 @@
*/
package org.apache.beam.runners.core.metrics;

import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.HistogramParsingException;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeDoubleCounter;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Histogram;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeStringSet;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeDoubleCounter;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeDoubleDistribution;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Histogram;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet;
import static org.junit.Assert.assertEquals;

import java.util.Collections;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link MonitoringInfoEncodings}. */
@RunWith(JUnit4.class)
public class MonitoringInfoEncodingsTest {
@Rule
public ExpectedLogs monitoringInfoCodingsExpectedLogs =
ExpectedLogs.none(MonitoringInfoEncodings.class);

@Rule public ExpectedException thrown = ExpectedException.none();

@Test
public void testInt64DistributionEncoding() {
DistributionData data = DistributionData.create(1L, 2L, 3L, 4L);
@@ -118,36 +105,4 @@ public void testDoubleCounterEncoding() {
assertEquals(ByteString.copyFrom(new byte[] {0x3f, (byte) 0xf0, 0, 0, 0, 0, 0, 0}), payload);
assertEquals(1.0, decodeDoubleCounter(payload), 0.001);
}

@Test
public void testHistgramInt64EncodingLinearHist() {
HistogramData.BucketType buckets = HistogramData.LinearBuckets.of(0, 5, 5);

HistogramData inputHistogram = new HistogramData(buckets);
inputHistogram.record(5, 10, 15, 20);
ByteString payload = encodeInt64Histogram(inputHistogram);

assertEquals(inputHistogram, decodeInt64Histogram(payload));
}

@Test
public void testHistgramInt64EncodingExpHist() {
HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 10);
HistogramData inputHistogram = new HistogramData(buckets);
inputHistogram.record(2, 4, 8, 16, 32);
ByteString payload = encodeInt64Histogram(inputHistogram);
assertEquals(inputHistogram, decodeInt64Histogram(payload));
}

@Test
public void testHistgramInt64EncodingUnsupportedBucket() {
thrown.expect(HistogramParsingException.class);
thrown.expectMessage("Unable to encode Int64 Histogram, bucket is not recognized");

HistogramData.BucketType buckets = HistogramData.UnsupportedBuckets.of();

HistogramData inputHistogram = new HistogramData(buckets);
inputHistogram.record(2, 4, 8, 16, 32);
encodeInt64Histogram(inputHistogram);
}
}
10 changes: 6 additions & 4 deletions runners/google-cloud-dataflow-java/build.gradle
@@ -53,8 +53,8 @@ evaluationDependsOn(":sdks:java:container:java11")

ext.dataflowLegacyEnvironmentMajorVersion = '8'
ext.dataflowFnapiEnvironmentMajorVersion = '8'
ext.dataflowLegacyContainerVersion = 'beam-master-20240930'
ext.dataflowFnapiContainerVersion = 'beam-master-20240930'
ext.dataflowLegacyContainerVersion = 'beam-master-20250128'
ext.dataflowFnapiContainerVersion = 'beam-master-20250128'
ext.dataflowContainerBaseRepository = 'gcr.io/cloud-dataflow/v1beta3'

processResources {
@@ -639,11 +639,12 @@ task googleCloudPlatformLegacyWorkerIntegrationTest(type: Test, dependsOn: copyG
useJUnit {
excludeCategories "org.apache.beam.sdk.testing.UsesKms"
filter {
// Only needs to run on direct runner
// Error handling tests, only needs to run on direct runner
excludeTestsMatching 'org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteIT.testE2EBigtableWriteWithEmptyMutationFailures'
excludeTestsMatching 'org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteIT.testE2EBigtableWriteWithEmptyRowFailures'
excludeTestsMatching 'org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteIT.testE2EBigtableWriteWithInvalidTimestampFailures'
excludeTestsMatching 'org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteIT.testE2EBigtableWriteWithOversizedQualifierFailures'
excludeTestsMatching 'org.apache.beam.sdk.io.gcp.spanner.SpannerReadIT.testReadFailsBadSession'
}
}
}
@@ -697,11 +698,12 @@ task googleCloudPlatformRunnerV2IntegrationTest(type: Test) {
testClassesDirs = files(project(":sdks:java:io:google-cloud-platform").sourceSets.test.output.classesDirs)
useJUnit {
filter {
// Only needs to run on direct runner
// Error handling tests, only needs to run on direct runner
excludeTestsMatching 'org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteIT.testE2EBigtableWriteWithEmptyMutationFailures'
excludeTestsMatching 'org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteIT.testE2EBigtableWriteWithEmptyRowFailures'
excludeTestsMatching 'org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteIT.testE2EBigtableWriteWithInvalidTimestampFailures'
excludeTestsMatching 'org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteIT.testE2EBigtableWriteWithOversizedQualifierFailures'
excludeTestsMatching 'org.apache.beam.sdk.io.gcp.spanner.SpannerReadIT.testReadFailsBadSession'
}
}
}
