Merge branch 'master' into reorganize_iceberg_its
ahmedabu98 authored Dec 27, 2024
2 parents fdffb2c + 5944a30 commit 5c83a49
Showing 21 changed files with 1,221 additions and 225 deletions.
21 changes: 21 additions & 0 deletions .github/workflows/README.md
@@ -15,6 +15,27 @@
under the License.
-->

# How to fix Workflows for Committers

The following is practical guidance on making changes that fix workflows.

1) Create a branch in https://github.com/apache/beam, not in your fork.

Changes go to a branch of the main repo rather than your fork because the environment in which Beam GitHub workflows execute is hard to replicate; GitHub workflows let you run a workflow against any branch of the repository.

2) Make the changes in this branch that you anticipate will fix the failing workflow.

3) Run the workflow, designating your branch (see the screenshot below and the CLI sketch after this list).

In the GitHub workflow interface, you can designate any branch of the repository to run the workflow against. Selecting your branch allows you to test the changes you made. The following screenshot shows an example of this feature.
![image](https://github.com/user-attachments/assets/33ca43fb-b0f8-42c8-80e2-ac84a49e2490)

4) Create a PR, pasting the link to the successful workflow run on your branch.

The PR checks will not run against your branch, and your reviewer may not know this, so mention it in the PR description along with the link to your successful run.

5) After the PR merges, run the workflow manually to validate your merged changes.
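
As an alternative to the web UI in step 3, the GitHub CLI can start the same run against your branch. This is a minimal sketch, assuming `gh` is installed and authenticated; `python_tests.yml` and `my-fix-branch` are placeholders for the workflow you are fixing and the branch you created:

```bash
# Trigger the workflow from your branch of apache/beam (not a fork)
gh workflow run python_tests.yml --repo apache/beam --ref my-fix-branch

# List the run that was just started so you can grab its link for the PR description
gh run list --repo apache/beam --workflow python_tests.yml --branch my-fix-branch --limit 1
```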

# Running Workflows Manually

Most workflows are kicked off automatically when you open a PR, when you push code, or on a schedule. These triggers come from the `on:` block of each workflow file, sketched below.
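
The following `on:` block is illustrative, not copied verbatim from any single Beam workflow (the `paths` filter mirrors `python_tests.yml`; the branch names and cron schedule are placeholders). `workflow_dispatch:` is what makes a manual run possible from the Actions tab or with the `gh` CLI:

```yaml
on:
  push:
    branches: ['master', 'release-*']
    paths: ['sdks/python/**', 'model/**']
  pull_request:
    paths: ['sdks/python/**', 'model/**']
  schedule:
    - cron: '10 2 * * *'
  # Allows the workflow to be started manually from the Actions tab or with `gh workflow run`
  workflow_dispatch:
```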
56 changes: 2 additions & 54 deletions .github/workflows/python_tests.yml
@@ -30,10 +30,6 @@ on:
tags: 'v*'
paths: ['sdks/python/**', 'model/**']
workflow_dispatch:
inputs:
runDataflow:
description: 'Type "true" if you want to run Dataflow tests (GCP variables must be configured, check CI.md)'
default: false

# This allows a subsequently queued workflow run to interrupt previous runs
concurrency:
@@ -57,16 +53,15 @@ jobs:
GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }}
GCP_REGION: ${{ secrets.GCP_REGION }}
GCP_SA_EMAIL: ${{ secrets.GCP_SA_EMAIL }}
GCP_SA_KEY: ${{ secrets.GCP_SA_KEY }}
GCP_TESTING_BUCKET: ${{ secrets.GCP_TESTING_BUCKET }}
GCP_PYTHON_WHEELS_BUCKET: "not-needed-here"

build_python_sdk_source:
name: 'Build python source distribution'
if: |
needs.check_gcp_variables.outputs.gcp-variables-set == 'true' && (
(github.event_name == 'push' || github.event_name == 'schedule') ||
(github.event_name == 'workflow_dispatch' && github.event.inputs.runDataflow == 'true')
((github.event_name == 'push' || github.event_name == 'schedule') ||
github.event_name == 'workflow_dispatch')
)
needs:
- check_gcp_variables
@@ -153,50 +148,3 @@ jobs:
working-directory: ./sdks/python
shell: bash
run: python -m apache_beam.examples.wordcount --input MANIFEST.in --output counts

python_wordcount_dataflow:
name: 'Python Wordcount Dataflow'
# TODO(https://github.com/apache/beam/issues/31848) run on Dataflow after fixes credential on macOS/win GHA runner
if: (github.event_name == 'workflow_dispatch' && github.event.inputs.runDataflow == 'true')
needs:
- build_python_sdk_source
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [[self-hosted, ubuntu-20.04, main], macos-latest, windows-latest]
python: ["3.9", "3.10", "3.11", "3.12"]
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Setup environment
uses: ./.github/actions/setup-environment-action
with:
python-version: ${{ matrix.python }}
go-version: default
- name: Download source from artifacts
uses: actions/[email protected]
with:
name: python_sdk_source
path: apache-beam-source
- name: Authenticate on GCP
id: auth
uses: google-github-actions/auth@v1
with:
credentials_json: ${{ secrets.GCP_SA_KEY }}
project_id: ${{ secrets.GCP_PROJECT_ID }}
- name: Install requirements
working-directory: ./sdks/python
run: pip install setuptools --upgrade && pip install -e ".[gcp]"
- name: Run WordCount
working-directory: ./sdks/python
shell: bash
run: |
python -m apache_beam.examples.wordcount \
--input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://${{ secrets.GCP_TESTING_BUCKET }}/python_wordcount_dataflow/counts \
--runner DataflowRunner \
--project ${{ secrets.GCP_PROJECT_ID }} \
--region ${{ secrets.GCP_REGION }} \
--temp_location gs://${{ secrets.GCP_TESTING_BUCKET }}/tmp/python_wordcount_dataflow/ \
--sdk_location ../../apache-beam-source/apache-beam-source.tar.gz
@@ -79,11 +79,20 @@
*
* <p>Example trigger command for specific test running on Dataflow runner:
*
* <p><b>Maven</b>
*
* <pre>
* mvn test -pl it/google-cloud-platform -am -Dtest="BigQueryIOLT#testAvroFileLoadsWriteThenRead" \
* -Dconfiguration=medium -Dproject=[gcpProject] -DartifactBucket=[temp bucket] -DfailIfNoTests=false
* </pre>
*
* <p><b>Gradle</b>
*
* <pre>
* ./gradlew :it:google-cloud-platform:BigQueryPerformanceTest --tests='BigQueryIOLT.testAvroFileLoadsWriteThenRead' \
* -Dconfiguration=medium -Dproject=[gcpProject] -DartifactBucket=[temp bucket] -DfailIfNoTests=false
* </pre>
*
* <p>Example trigger command for specific test and custom data configuration:
*
* <pre>mvn test -pl it/google-cloud-platform -am \
@@ -172,11 +181,11 @@ public static void tearDownClass() {
Configuration.class), // 1 MB
"medium",
Configuration.fromJsonString(
"{\"numRecords\":10000000,\"valueSizeBytes\":1000,\"pipelineTimeout\":20,\"runner\":\"DataflowRunner\"}",
"{\"numRecords\":10000000,\"valueSizeBytes\":1000,\"pipelineTimeout\":20,\"runner\":\"DataflowRunner\",\"workerMachineType\":\"e2-standard-2\",\"experiments\":\"disable_runner_v2\",\"numWorkers\":\"1\",\"maxNumWorkers\":\"1\"}",
Configuration.class), // 10 GB
"large",
Configuration.fromJsonString(
"{\"numRecords\":100000000,\"valueSizeBytes\":1000,\"pipelineTimeout\":80,\"runner\":\"DataflowRunner\"}",
"{\"numRecords\":100000000,\"valueSizeBytes\":1000,\"pipelineTimeout\":80,\"runner\":\"DataflowRunner\",\"workerMachineType\":\"e2-standard-2\",\"experiments\":\"disable_runner_v2\",\"numWorkers\":\"1\",\"maxNumWorkers\":\"1\",\"numStorageWriteApiStreams\":4,\"storageWriteApiTriggeringFrequencySec\":20}",
Configuration.class) // 100 GB
);
} catch (IOException e) {
@@ -230,16 +239,19 @@ public void testWriteAndRead() throws IOException {
writeIO =
BigQueryIO.<byte[]>write()
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withNumStorageWriteApiStreams(
configuration.numStorageWriteApiStreams) // control the number of streams
.withAvroFormatFunction(
new AvroFormatFn(
configuration.numColumns,
!("STORAGE_WRITE_API".equalsIgnoreCase(configuration.writeMethod))));

break;
case JSON:
writeIO =
BigQueryIO.<byte[]>write()
.withSuccessfulInsertsPropagation(false)
.withNumStorageWriteApiStreams(
configuration.numStorageWriteApiStreams) // control the number of streams
.withFormatFunction(new JsonFormatFn(configuration.numColumns));
break;
}
@@ -268,6 +280,10 @@ private void testWrite(BigQueryIO.Write<byte[]> writeIO) throws IOException {
.setSdk(PipelineLauncher.Sdk.JAVA)
.setPipeline(writePipeline)
.addParameter("runner", configuration.runner)
.addParameter("workerMachineType", configuration.workerMachineType)
.addParameter("experiments", configuration.experiments)
.addParameter("numWorkers", configuration.numWorkers)
.addParameter("maxNumWorkers", configuration.maxNumWorkers)
.build();

PipelineLauncher.LaunchInfo launchInfo = pipelineLauncher.launch(project, region, options);
@@ -304,6 +320,10 @@ private void testRead() throws IOException {
.setSdk(PipelineLauncher.Sdk.JAVA)
.setPipeline(readPipeline)
.addParameter("runner", configuration.runner)
.addParameter("workerMachineType", configuration.workerMachineType)
.addParameter("experiments", configuration.experiments)
.addParameter("numWorkers", configuration.numWorkers)
.addParameter("maxNumWorkers", configuration.maxNumWorkers)
.build();

PipelineLauncher.LaunchInfo launchInfo = pipelineLauncher.launch(project, region, options);
@@ -445,12 +465,36 @@ static class Configuration extends SyntheticSourceOptions {
/** Runner specified to run the pipeline. */
@JsonProperty public String runner = "DirectRunner";

/** Worker machine type specified to run the pipeline with Dataflow Runner. */
@JsonProperty public String workerMachineType = "";

/** Experiments specified to run the pipeline. */
@JsonProperty public String experiments = "";

/** Number of workers to start the pipeline. Must be a positive value. */
@JsonProperty public String numWorkers = "1";

/** Maximum number of workers for the pipeline. Must be a positive value. */
@JsonProperty public String maxNumWorkers = "1";

/** BigQuery read method: DEFAULT/DIRECT_READ/EXPORT. */
@JsonProperty public String readMethod = "DEFAULT";

/** BigQuery write method: DEFAULT/FILE_LOADS/STREAMING_INSERTS/STORAGE_WRITE_API. */
@JsonProperty public String writeMethod = "DEFAULT";

/**
* BigQuery number of streams for write method STORAGE_WRITE_API. A value of 0 lets the runner
* determine the number of streams. Note: the maximum for open connections per hour is 10K
* streams.
*/
@JsonProperty public int numStorageWriteApiStreams = 0;

/**
* BigQuery triggering frequency, in seconds, used in combination with the number of streams
* for write method STORAGE_WRITE_API.
*/
@JsonProperty public int storageWriteApiTriggeringFrequencySec = 20;

/** BigQuery write format: AVRO/JSON. */
@JsonProperty public String writeFormat = "AVRO";
}
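
For reference, the "large" preset defined earlier in this file expands to the following once the one-line string passed to Configuration.fromJsonString is laid out as readable JSON; every key maps onto one of the @JsonProperty fields above (same values, no new configuration):

```json
{
  "numRecords": 100000000,
  "valueSizeBytes": 1000,
  "pipelineTimeout": 80,
  "runner": "DataflowRunner",
  "workerMachineType": "e2-standard-2",
  "experiments": "disable_runner_v2",
  "numWorkers": "1",
  "maxNumWorkers": "1",
  "numStorageWriteApiStreams": 4,
  "storageWriteApiTriggeringFrequencySec": 20
}
```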
@@ -3040,9 +3040,14 @@ public Write<T> withNumFileShards(int numFileShards) {
}

/**
* Control how many parallel streams are used when using Storage API writes. Applicable only for
* streaming pipelines, and when {@link #withTriggeringFrequency} is also set. To let runner
* determine the sharding at runtime, set this to zero, or {@link #withAutoSharding()} instead.
* Control how many parallel streams are used when using Storage API writes.
*
* <p>For streaming pipelines, this applies when {@link #withTriggeringFrequency} is also set.
* To let the runner determine the sharding at runtime, set this to zero or use
* {@link #withAutoSharding()} instead.
*
* <p>For batch pipelines, a non-zero value inserts a redistribute. To avoid reshuffling and
* keep the pipeline parallelism as is, set this to zero.
*/
public Write<T> withNumStorageWriteApiStreams(int numStorageWriteApiStreams) {
return toBuilder().setNumStorageWriteApiStreams(numStorageWriteApiStreams).build();
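
A minimal usage sketch of the two modes described in this Javadoc; it is a fragment under assumed values (the table name, triggering frequency, and stream count are placeholders, not taken from this change):

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.joda.time.Duration;

// Streaming: an explicit stream count applies together with a triggering frequency.
BigQueryIO.Write<TableRow> streamingWrite =
    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table") // placeholder table
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        .withTriggeringFrequency(Duration.standardSeconds(20))
        .withNumStorageWriteApiStreams(4); // 0 lets the runner decide

// Batch: a non-zero value inserts a redistribute before the write;
// 0 keeps the pipeline's existing parallelism.
BigQueryIO.Write<TableRow> batchWrite =
    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        .withNumStorageWriteApiStreams(4);
```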
@@ -36,6 +36,7 @@
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Redistribute;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
@@ -360,9 +361,19 @@ public WriteResult expandUntriggered(
rowUpdateFn,
badRecordRouter));

PCollection<KV<DestinationT, StorageApiWritePayload>> successfulConvertedRows =
convertMessagesResult.get(successfulConvertedRowsTag);

if (numShards > 0) {
successfulConvertedRows =
successfulConvertedRows.apply(
"ResdistibuteNumShards",
Redistribute.<KV<DestinationT, StorageApiWritePayload>>arbitrarily()
.withNumBuckets(numShards));
}

PCollectionTuple writeRecordsResult =
convertMessagesResult
.get(successfulConvertedRowsTag)
successfulConvertedRows
.apply(
"StorageApiWriteUnsharded",
new StorageApiWriteUnshardedRecords<>(
@@ -25,7 +25,6 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.Schema;
@@ -97,20 +96,22 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
return PCollectionRowTuple.empty(input.getPipeline());
}

BigQueryIO.Write<Row> toWrite(Schema schema, PipelineOptions options) {
@VisibleForTesting
public BigQueryIO.Write<Row> toWrite(Schema schema, PipelineOptions options) {
PortableBigQueryDestinations dynamicDestinations =
new PortableBigQueryDestinations(schema, configuration);
BigQueryIO.Write<Row> write =
BigQueryIO.<Row>write()
.to(dynamicDestinations)
.withMethod(BigQueryIO.Write.Method.FILE_LOADS)
.withFormatFunction(BigQueryUtils.toTableRow())
// TODO(https://github.com/apache/beam/issues/33074) BatchLoad's
// createTempFilePrefixView() doesn't pick up the pipeline option
.withCustomGcsTempLocation(
ValueProvider.StaticValueProvider.of(options.getTempLocation()))
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withFormatFunction(dynamicDestinations.getFilterFormatFunction(false));
// Use Avro format for better performance. Don't change this unless it's for a good
// reason.
.withAvroFormatFunction(dynamicDestinations.getAvroFilterFormatFunction(false));

if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
CreateDisposition createDisposition =
@@ -25,7 +25,10 @@
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.io.gcp.bigquery.AvroWriteRequest;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
@@ -102,4 +105,16 @@ public SerializableFunction<Row, TableRow> getFilterFormatFunction(boolean fetch
return BigQueryUtils.toTableRow(filtered);
};
}

public SerializableFunction<AvroWriteRequest<Row>, GenericRecord> getAvroFilterFormatFunction(
boolean fetchNestedRecord) {
return request -> {
Row row = request.getElement();
if (fetchNestedRecord) {
row = checkStateNotNull(row.getRow(RECORD));
}
Row filtered = rowFilter.filter(row);
return AvroUtils.toGenericRecord(filtered);
};
}
}
@@ -19,6 +19,7 @@

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
import static org.apache.beam.sdk.io.gcp.bigquery.WriteTables.ResultCoder.INSTANCE;
import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryFileLoadsSchemaTransformProvider.BigQueryFileLoadsSchemaTransform;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
@@ -32,6 +33,7 @@
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -117,11 +119,13 @@
import org.apache.beam.sdk.io.gcp.bigquery.WritePartition.ResultCoder;
import org.apache.beam.sdk.io.gcp.bigquery.WriteRename.TempTableCleanupFn;
import org.apache.beam.sdk.io.gcp.bigquery.WriteTables.Result;
import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryFileLoadsSchemaTransformProvider;
import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.Schema;
@@ -818,6 +822,25 @@ public void testStreamingFileLoadsWithAutoSharding() throws Exception {
assertEquals(2 * numTables, fakeDatasetService.getInsertCount());
}

@Test
public void testFileLoadSchemaTransformUsesAvroFormat() {
// Ensure we are writing with the more performant Avro format.
assumeTrue(!useStreaming);
assumeTrue(!useStorageApi);
BigQueryFileLoadsSchemaTransformProvider provider =
new BigQueryFileLoadsSchemaTransformProvider();
Row configuration =
Row.withSchema(provider.configurationSchema())
.withFieldValue("table", "some-table")
.build();
BigQueryFileLoadsSchemaTransform schemaTransform =
(BigQueryFileLoadsSchemaTransform) provider.from(configuration);
BigQueryIO.Write<Row> write =
schemaTransform.toWrite(Schema.of(), PipelineOptionsFactory.create());
assertNull(write.getFormatFunction());
assertNotNull(write.getAvroRowWriterFactory());
}

@Test
public void testBatchFileLoads() throws Exception {
assumeTrue(!useStreaming);
1 change: 1 addition & 0 deletions sdks/java/io/iceberg/build.gradle
@@ -46,6 +46,7 @@ dependencies {
implementation library.java.vendored_guava_32_1_2_jre
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation library.java.avro
implementation library.java.slf4j_api
implementation library.java.joda_time
implementation "org.apache.parquet:parquet-column:$parquet_version"