From c07fa0ac1e504e6837979a06a321fb9875005852 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Tue, 20 Aug 2024 00:43:57 -0700 Subject: [PATCH] Adds an ORDERED_LIST_STATE capability to the Java SDK. (#32067) --- CHANGES.md | 1 + .../org/apache/beam/model/pipeline/v1/beam_runner_api.proto | 4 ++++ .../org/apache/beam/sdk/util/construction/Environments.java | 1 + .../apache/beam/sdk/util/construction/EnvironmentsTest.java | 3 +++ 4 files changed, 9 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 1c0957b1fbf5..78f243a2055a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -73,6 +73,7 @@ * Go SDK Minimum Go Version updated to 1.21 ([#32092](https://github.com/apache/beam/pull/32092)). * [BigQueryIO] Added support for withFormatRecordOnFailureFunction() for STORAGE_WRITE_API and STORAGE_API_AT_LEAST_ONCE methods (Java) ([#31354](https://github.com/apache/beam/issues/31354)). * Updated Go protobuf package to new version (Go) ([#21515](https://github.com/apache/beam/issues/21515)). +* Adds OrderedListState support for Java SDK via FnApi. ## Breaking Changes diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto index 422c2e1a5f7c..10434e514eae 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto @@ -1681,6 +1681,10 @@ message StandardProtocols { // the ProcessBundleProgressResponse messages. SDK_CONSUMING_RECEIVED_DATA = 9 [(beam_urn) = "beam:protocol:sdk_consuming_received_data:v1"]; + + // Indicates whether the SDK supports ordered list state. + ORDERED_LIST_STATE = 10 + [(beam_urn) = "beam:protocol:ordered_list_state:v1"]; } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Environments.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Environments.java index 9bdb45fe32eb..52120b396e1f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Environments.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Environments.java @@ -488,6 +488,7 @@ public static Set getJavaCapabilities() { capabilities.add(BeamUrns.getUrn(Primitives.TO_STRING)); capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.DATA_SAMPLING)); capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.SDK_CONSUMING_RECEIVED_DATA)); + capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.ORDERED_LIST_STATE)); return capabilities.build(); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/EnvironmentsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/EnvironmentsTest.java index 93ec4f7e8f7f..3c6862bd3b3b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/EnvironmentsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/EnvironmentsTest.java @@ -216,6 +216,9 @@ public void testCapabilities() { assertThat( Environments.getJavaCapabilities(), hasItem(BeamUrns.getUrn(RunnerApi.StandardProtocols.Enum.DATA_SAMPLING))); + assertThat( + Environments.getJavaCapabilities(), + hasItem(BeamUrns.getUrn(RunnerApi.StandardProtocols.Enum.ORDERED_LIST_STATE))); // Check that SDF truncation is supported assertThat( Environments.getJavaCapabilities(),