Merge branch 'master' into gcsio-add-batch-retry
sadovnychyi authored Jan 9, 2025
2 parents 8c453dd + 2b72e12 commit 5e95357
Showing 30 changed files with 478 additions and 199 deletions.
8 changes: 3 additions & 5 deletions .asf.yaml
@@ -36,11 +36,9 @@ github:

# Give some users issue triage permissions
collaborators:
- pcoet
- olehborysevych
- rshamunov
- andreydevyatkin
- liferoad
- Amar3tto
- mrshakirov
- akashorabek

enabled_merge_buttons:
squash: true
4 changes: 2 additions & 2 deletions .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 4
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 1
}
5 changes: 4 additions & 1 deletion .test-infra/tools/stale_k8s_workload_cleaner.sh
@@ -43,7 +43,10 @@ function should_teardown() {
gcloud container clusters get-credentials io-datastores --zone us-central1-a --project apache-beam-testing

while read NAME STATUS AGE; do
if [[ $NAME =~ ^beam-.+(test|-it) ]] && should_teardown $AGE; then
# Regex has temporary workaround to avoid trying to delete beam-performancetests-singlestoreio-* to avoid getting stuck in a terminal state
# See https://github.com/apache/beam/pull/33545 for context.
# This may be safe to remove if https://cloud.google.com/knowledge/kb/deleted-namespace-remains-in-terminating-status-000004867 has been resolved, just try it before checking in :)
if [[ $NAME =~ ^beam-.+(test|-it)(?!s-singlestoreio) ]] && should_teardown $AGE; then
kubectl delete namespace $NAME
fi
done < <( kubectl get namespaces --context=gke_${PROJECT}_${LOCATION}_${CLUSTER} )
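For illustration only (not part of this commit): bash's `[[ =~ ]]` uses POSIX extended regular expressions, and Go's RE2 behaves similarly in that neither supports lookahead, so a lookahead-free way to express the intent described in the workaround comment above — clean up Beam test namespaces while skipping the singlestoreio ones — is a positive match plus an explicit exclusion. The namespace names below are invented; this Go sketch shows only the selection logic, not the cleanup script.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// Positive match for Beam test namespaces, mirroring the script's pattern.
var testNamespace = regexp.MustCompile(`^beam-.+(test|-it)`)

// shouldDelete pairs the match with an explicit exclusion instead of a
// lookahead, skipping the singlestoreio namespaces called out in the
// workaround comment above.
func shouldDelete(name string) bool {
	return testNamespace.MatchString(name) &&
		!strings.Contains(name, "singlestoreio")
}

func main() {
	for _, ns := range []string{
		"beam-postgresql-it-1234",               // candidate for deletion
		"beam-performancetests-singlestoreio-1", // skipped by the workaround
		"default",                               // not a Beam test namespace
	} {
		fmt.Printf("%s -> %v\n", ns, shouldDelete(ns))
	}
}
```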
14 changes: 14 additions & 0 deletions CHANGES.md
@@ -66,18 +66,22 @@

## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Upgraded to protobuf 4 (Java) ([#33192](https://github.com/apache/beam/issues/33192)).
* [GCSIO] Added retry logic to each batch method of the GCS IO (Python) ([#33539](https://github.com/apache/beam/pull/33539))

## Breaking Changes

* AWS V1 I/Os have been removed (Java). As part of this, x-lang Python Kinesis I/O has been updated to consume the V2 IO and it also no longer supports setting producer_properties ([#33430](https://github.com/apache/beam/issues/33430)).
* Upgraded to protobuf 4 (Java) ([#33192](https://github.com/apache/beam/issues/33192)), but forced Debezium IO to use protobuf 3 ([#33541](https://github.com/apache/beam/issues/33541)) because Debezium clients are not protobuf 4 compatible. This may cause conflicts when using clients which are only compatible with protobuf 4.

## Deprecations

* X behavior is deprecated and will be removed in X versions ([#X](https://github.com/apache/beam/issues/X)).

## Bugfixes

* Fix data loss issues when reading gzipped files with TextIO (Python) ([#18390](https://github.com/apache/beam/issues/18390), [#31040](https://github.com/apache/beam/issues/31040)).
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [BigQueryIO] Fixed an issue where Storage Write API sometimes doesn't pick up auto-schema updates ([#33231](https://github.com/apache/beam/pull/33231))

@@ -113,6 +117,7 @@
## Bugfixes

* Fixed EventTimeTimer ordering in Prism. ([#32222](https://github.com/apache/beam/issues/32222)).
* [Managed Iceberg] Fixed a bug where DataFile metadata was assigned incorrect partition values ([#33549](https://github.com/apache/beam/pull/33549)).

## Security Fixes

@@ -155,6 +160,11 @@
* Adding flag to support conditionally disabling auto-commit in JdbcIO ReadFn ([#31111](https://github.com/apache/beam/issues/31111))
* (Python) Fixed BigQuery Enrichment bug that can lead to multiple conditions returning duplicate rows, batching returning incorrect results and conditions not scoped by row during batching ([#32780](https://github.com/apache/beam/pull/32780)).

## Known Issues

* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)).
* Fixed in 2.62.0

# [2.60.0] - 2024-10-17

## Highlights
@@ -209,6 +219,8 @@ when running on 3.8. ([#31192](https://github.com/apache/beam/issues/31192))
* Duplicate Rows: Multiple conditions may be applied incorrectly, leading to the duplication of rows in the output.
* Incorrect Results with Batched Requests: Conditions may not be correctly scoped to individual rows within the batch, potentially causing inaccurate results.
* Fixed in 2.61.0.
* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)).
* Fixed in 2.62.0

# [2.59.0] - 2024-09-11

@@ -257,6 +269,8 @@ when running on 3.8. ([#31192](https://github.com/apache/beam/issues/31192))
* Duplicate Rows: Multiple conditions may be applied incorrectly, leading to the duplication of rows in the output.
* Incorrect Results with Batched Requests: Conditions may not be correctly scoped to individual rows within the batch, potentially causing inaccurate results.
* Fixed in 2.61.0.
* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)).
* Fixed in 2.62.0

# [2.58.1] - 2024-08-15

BeamModulePlugin.groovy
@@ -618,6 +618,7 @@ class BeamModulePlugin implements Plugin<Project> {
def influxdb_version = "2.19"
def httpclient_version = "4.5.13"
def httpcore_version = "4.4.14"
def iceberg_bqms_catalog_version = "1.5.2-0.1.0"
def jackson_version = "2.15.4"
def jaxb_api_version = "2.3.3"
def jsr305_version = "3.0.2"
@@ -650,6 +651,10 @@ class BeamModulePlugin implements Plugin<Project> {

// Export Spark versions, so they are defined in a single place only
project.ext.spark3_version = spark3_version
// version for BigQueryMetastore catalog (used by sdks:java:io:iceberg:bqms)
// TODO: remove this and download the jar normally when the catalog gets
// open-sourced (https://github.com/apache/iceberg/pull/11039)
project.ext.iceberg_bqms_catalog_version = iceberg_bqms_catalog_version

// A map of maps containing common libraries used per language. To use:
// dependencies {
15 changes: 0 additions & 15 deletions runners/prism/java/build.gradle
@@ -96,9 +96,6 @@ def sickbayTests = [
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerEarly',
'org.apache.beam.sdk.transforms.ParDoTest$BundleInvariantsTests.testWatermarkUpdateMidBundle',
'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton',
// Requires Allowed Lateness, among others.
'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerSetWithinAllowedLateness',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness',
'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate',
'org.apache.beam.sdk.testing.TestStreamTest.testDiscardingMode',
'org.apache.beam.sdk.testing.TestStreamTest.testEarlyPanesOfWindow',
@@ -116,10 +113,6 @@ def sickbayTests = [
// Coding error somehow: short write: reached end of stream after reading 5 bytes; 98 bytes expected
'org.apache.beam.sdk.testing.TestStreamTest.testMultiStage',

// Prism not firing sessions correctly (seems to be merging inapppropriately)
'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombine',
'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombineWithContext',

// Java side dying during execution.
// https://github.com/apache/beam/issues/32930
'org.apache.beam.sdk.transforms.FlattenTest.testFlattenMultipleCoders',
@@ -161,14 +154,6 @@ def sickbayTests = [
// TODO(https://github.com/apache/beam/issues/31231)
'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata',


// These tests fail once Late Data was being precisely dropped.
// They set a single element to be late data, and expect it (correctly) to be preserved.
// Since presently, these are treated as No-ops, the fix is to disable the
// dropping behavior when a stage's input is a Reshuffle/Redistribute transform.
'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming',
'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributeWithTimestampsStreaming',

// Prism isn't handling Java's side input views properly.
// https://github.com/apache/beam/issues/32932
// java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.
44 changes: 21 additions & 23 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -261,8 +261,10 @@ func (em *ElementManager) AddStage(ID string, inputIDs, outputIDs []string, side

// StageAggregates marks the given stage as an aggregation, which
// means elements will only be processed based on windowing strategies.
func (em *ElementManager) StageAggregates(ID string) {
em.stages[ID].aggregate = true
func (em *ElementManager) StageAggregates(ID string, strat WinStrat) {
ss := em.stages[ID]
ss.aggregate = true
ss.strat = strat
}

// StageStateful marks the given stage as stateful, which means elements are
@@ -1095,7 +1097,7 @@ type stageState struct {
// Special handling bits
stateful bool // whether this stage uses state or timers, and needs keyed processing.
aggregate bool // whether this stage needs to block for aggregation.
strat winStrat // Windowing Strategy for aggregation fireings.
strat WinStrat // Windowing Strategy for aggregation fireings.
processingTimeTimersFamilies map[string]bool // Indicates which timer families use the processing time domain.

// onWindowExpiration management
@@ -1154,7 +1156,6 @@ func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *st
ID: ID,
outputIDs: outputIDs,
sides: sides,
strat: defaultStrat{},
state: map[LinkID]map[typex.Window]map[string]StateData{},
watermarkHolds: newHoldTracker(),

@@ -1179,18 +1180,21 @@ func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *st
func (ss *stageState) AddPending(newPending []element) int {
ss.mu.Lock()
defer ss.mu.Unlock()
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
// Data that arrives after the *output* watermark is late.
threshold := ss.output
origPending := make([]element, 0, ss.pending.Len())
for _, e := range newPending {
if e.window.MaxTimestamp() < threshold {
continue
if ss.aggregate {
// Late Data is data that has arrived after that window has expired.
// We only need to drop late data before aggregations.
// TODO - handle for side inputs too.
threshold := ss.output
origPending := make([]element, 0, ss.pending.Len())
for _, e := range newPending {
if ss.strat.EarliestCompletion(e.window) < threshold {
// TODO: figure out Pane and trigger firings.
continue
}
origPending = append(origPending, e)
}
origPending = append(origPending, e)
newPending = origPending
}
newPending = origPending
if ss.stateful {
if ss.pendingByKeys == nil {
ss.pendingByKeys = map[string]*dataAndTimers{}
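To make the new behaviour above concrete, here is a minimal, self-contained Go sketch of the lateness check that the reworked AddPending performs. The types are local stand-ins (the real engine uses mtime.Time and typex.Window), but the rule is the one in the hunk above: only aggregating stages drop data, and an element is dropped only once its window's earliest completion — max timestamp plus allowed lateness — is behind the stage's output watermark.

```go
package main

import (
	"fmt"
	"time"
)

// Local stand-ins for illustration; not the engine's real types.
type Time int64 // event time in milliseconds

type Window struct{ End Time }

// MaxTimestamp mimics interval-window semantics: the last valid timestamp
// is one millisecond before the window's end.
func (w Window) MaxTimestamp() Time { return w.End - 1 }

type WinStrat struct{ AllowedLateness time.Duration }

// EarliestCompletion mirrors the new strategy.go: the window stays open
// for late data until its max timestamp plus the allowed lateness.
func (ws WinStrat) EarliestCompletion(w Window) Time {
	return w.MaxTimestamp() + Time(ws.AllowedLateness.Milliseconds())
}

// droppablyLate mirrors the AddPending check: only aggregating stages
// drop data, and only when the window can no longer complete before the
// stage's output watermark.
func droppablyLate(aggregate bool, ws WinStrat, w Window, outputWatermark Time) bool {
	return aggregate && ws.EarliestCompletion(w) < outputWatermark
}

func main() {
	w := Window{End: 4000}
	strat := WinStrat{AllowedLateness: 2 * time.Second}
	fmt.Println(droppablyLate(true, strat, w, 5000))  // false: 3999+2000 is still ahead of the watermark
	fmt.Println(droppablyLate(true, strat, w, 7000))  // true: the window has expired
	fmt.Println(droppablyLate(false, strat, w, 7000)) // false: non-aggregating stages keep late data
}
```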
@@ -1626,10 +1630,8 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
// They'll never be read in again.
for _, wins := range ss.sideInputs {
for win := range wins {
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
// Clear out anything we've already used.
if win.MaxTimestamp() < newOut {
if ss.strat.EarliestCompletion(win) < newOut {
// If the expiry is in progress, skip this window.
if ss.inProgressExpiredWindows[win] > 0 {
continue
Expand All @@ -1640,9 +1642,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
}
for _, wins := range ss.state {
for win := range wins {
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
if win.MaxTimestamp() < newOut {
if ss.strat.EarliestCompletion(win) < newOut {
// If the expiry is in progress, skip collecting this window.
if ss.inProgressExpiredWindows[win] > 0 {
continue
@@ -1685,9 +1685,7 @@ func (ss *stageState) createOnWindowExpirationBundles(newOut mtime.Time, em *Ele
var preventDownstreamUpdate bool
for win, keys := range ss.keysToExpireByWindow {
// Check if the window has expired.
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
if win.MaxTimestamp() >= newOut {
if ss.strat.EarliestCompletion(win) >= newOut {
continue
}
// We can't advance the output watermark if there's garbage to collect.
28 changes: 8 additions & 20 deletions sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
@@ -23,28 +23,16 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
)

type winStrat interface {
EarliestCompletion(typex.Window) mtime.Time
// WinStrat configures the windowing strategy for the stage, based on the
// stage's input PCollection.
type WinStrat struct {
AllowedLateness time.Duration // Used to extend duration
}

type defaultStrat struct{}

func (ws defaultStrat) EarliestCompletion(w typex.Window) mtime.Time {
return w.MaxTimestamp()
}

func (defaultStrat) String() string {
return "default"
}

type sessionStrat struct {
GapSize time.Duration
}

func (ws sessionStrat) EarliestCompletion(w typex.Window) mtime.Time {
return w.MaxTimestamp().Add(ws.GapSize)
func (ws WinStrat) EarliestCompletion(w typex.Window) mtime.Time {
return w.MaxTimestamp().Add(ws.AllowedLateness)
}

func (ws sessionStrat) String() string {
return fmt.Sprintf("session[GapSize:%v]", ws.GapSize)
func (ws WinStrat) String() string {
return fmt.Sprintf("WinStrat[AllowedLateness:%v]", ws.AllowedLateness)
}
13 changes: 7 additions & 6 deletions sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go
@@ -26,15 +26,16 @@ import (

func TestEarliestCompletion(t *testing.T) {
tests := []struct {
strat winStrat
strat WinStrat
input typex.Window
want mtime.Time
}{
{defaultStrat{}, window.GlobalWindow{}, mtime.EndOfGlobalWindowTime},
{defaultStrat{}, window.IntervalWindow{Start: 0, End: 4}, 3},
{defaultStrat{}, window.IntervalWindow{Start: mtime.MinTimestamp, End: mtime.MaxTimestamp}, mtime.MaxTimestamp - 1},
{sessionStrat{}, window.IntervalWindow{Start: 0, End: 4}, 3},
{sessionStrat{GapSize: 3 * time.Millisecond}, window.IntervalWindow{Start: 0, End: 4}, 6},
{WinStrat{}, window.GlobalWindow{}, mtime.EndOfGlobalWindowTime},
{WinStrat{}, window.IntervalWindow{Start: 0, End: 4}, 3},
{WinStrat{}, window.IntervalWindow{Start: mtime.MinTimestamp, End: mtime.MaxTimestamp}, mtime.MaxTimestamp - 1},
{WinStrat{AllowedLateness: 5 * time.Second}, window.GlobalWindow{}, mtime.EndOfGlobalWindowTime.Add(5 * time.Second)},
{WinStrat{AllowedLateness: 5 * time.Millisecond}, window.IntervalWindow{Start: 0, End: 4}, 8},
{WinStrat{AllowedLateness: 5 * time.Second}, window.IntervalWindow{Start: mtime.MinTimestamp, End: mtime.MaxTimestamp}, mtime.MaxTimestamp.Add(5 * time.Second)},
}

for _, test := range tests {
9 changes: 6 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -223,7 +223,10 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
KeyDec: kd,
}
}
em.StageAggregates(stage.ID)
ws := windowingStrategy(comps, tid)
em.StageAggregates(stage.ID, engine.WinStrat{
AllowedLateness: time.Duration(ws.GetAllowedLateness()) * time.Millisecond,
})
case urns.TransformImpulse:
impulses = append(impulses, stage.ID)
em.AddStage(stage.ID, nil, []string{getOnlyValue(t.GetOutputs())}, nil)
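One small note on the StageAggregates call site above: the pipeline proto carries allowed lateness as an integer count of milliseconds, so it is scaled into a time.Duration before being handed to the engine. A tiny illustrative sketch of that conversion (the helper name is invented):

```go
package main

import (
	"fmt"
	"time"
)

// allowedLateness is a hypothetical helper mirroring the scaling done at
// the StageAggregates call site: milliseconds from the proto become a
// time.Duration for engine.WinStrat.
func allowedLateness(ms int64) time.Duration {
	return time.Duration(ms) * time.Millisecond
}

func main() {
	fmt.Println(allowedLateness(0))     // 0s
	fmt.Println(allowedLateness(500))   // 500ms
	fmt.Println(allowedLateness(60000)) // 1m0s
}
```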
@@ -266,11 +269,11 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
case *pipepb.TestStreamPayload_Event_ElementEvent:
var elms []engine.TestStreamElement
for _, e := range ev.ElementEvent.GetElements() {
elms = append(elms, engine.TestStreamElement{Encoded: mayLP(e.GetEncodedElement()), EventTime: mtime.Time(e.GetTimestamp())})
elms = append(elms, engine.TestStreamElement{Encoded: mayLP(e.GetEncodedElement()), EventTime: mtime.FromMilliseconds(e.GetTimestamp())})
}
tsb.AddElementEvent(ev.ElementEvent.GetTag(), elms)
case *pipepb.TestStreamPayload_Event_WatermarkEvent:
tsb.AddWatermarkEvent(ev.WatermarkEvent.GetTag(), mtime.Time(ev.WatermarkEvent.GetNewWatermark()))
tsb.AddWatermarkEvent(ev.WatermarkEvent.GetTag(), mtime.FromMilliseconds(ev.WatermarkEvent.GetNewWatermark()))
case *pipepb.TestStreamPayload_Event_ProcessingTimeEvent:
if ev.ProcessingTimeEvent.GetAdvanceDuration() == int64(mtime.MaxTimestamp) {
// TODO: Determine the SDK common formalism for setting processing time to infinity.