diff --git a/.asf.yaml b/.asf.yaml index a6449ffb8b5f..e090635bbd01 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -36,11 +36,9 @@ github: # Give some users issue triage permissions collaborators: - - pcoet - - olehborysevych - - rshamunov - - andreydevyatkin - - liferoad + - Amar3tto + - mrshakirov + - akashorabek enabled_merge_buttons: squash: true diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 62ae7886c573..b73af5e61a43 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { - "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 4 + "comment": "Modify this file in a trivial way to cause this test suite to run.", + "modification": 1 } diff --git a/.test-infra/tools/stale_k8s_workload_cleaner.sh b/.test-infra/tools/stale_k8s_workload_cleaner.sh index 9ddaf17f2ce8..c167871c8b94 100755 --- a/.test-infra/tools/stale_k8s_workload_cleaner.sh +++ b/.test-infra/tools/stale_k8s_workload_cleaner.sh @@ -43,7 +43,10 @@ function should_teardown() { gcloud container clusters get-credentials io-datastores --zone us-central1-a --project apache-beam-testing while read NAME STATUS AGE; do - if [[ $NAME =~ ^beam-.+(test|-it) ]] && should_teardown $AGE; then + # The regex below has a temporary workaround to avoid trying to delete beam-performancetests-singlestoreio-* namespaces, which get stuck in a Terminating state. + # See https://github.com/apache/beam/pull/33545 for context. + # This may be safe to remove once https://cloud.google.com/knowledge/kb/deleted-namespace-remains-in-terminating-status-000004867 has been resolved; just try it before checking in :) + if [[ $NAME =~ ^beam-.+(test|-it)(?!s-singlestoreio) ]] && should_teardown $AGE; then kubectl delete namespace $NAME fi done < <( kubectl get namespaces --context=gke_${PROJECT}_${LOCATION}_${CLUSTER} ) diff --git a/CHANGES.md b/CHANGES.md index 2d9b6cba6361..44f5fe88c4dc 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -66,11 +66,14 @@ ## New Features / Improvements +* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Upgraded to protobuf 4 (Java) ([#33192](https://github.com/apache/beam/issues/33192)). * [GCSIO] Added retry logic to each batch method of the GCS IO (Python) ([#33539](https://github.com/apache/beam/pull/33539)) ## Breaking Changes * AWS V1 I/Os have been removed (Java). As part of this, x-lang Python Kinesis I/O has been updated to consume the V2 IO and it also no longer supports setting producer_properties ([#33430](https://github.com/apache/beam/issues/33430)). +* Upgraded to protobuf 4 (Java) ([#33192](https://github.com/apache/beam/issues/33192)), but forced Debezium IO to use protobuf 3 ([#33541](https://github.com/apache/beam/issues/33541)) because Debezium clients are not protobuf 4 compatible. This may cause conflicts when using clients which are only compatible with protobuf 4. ## Deprecations @@ -78,6 +81,7 @@ ## Bugfixes +* Fixed data loss issues when reading gzipped files with TextIO (Python) ([#18390](https://github.com/apache/beam/issues/18390), [#31040](https://github.com/apache/beam/issues/31040)). * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). 
* [BigQueryIO] Fixed an issue where Storage Write API sometimes doesn't pick up auto-schema updates ([#33231](https://github.com/apache/beam/pull/33231)) @@ -113,6 +117,7 @@ ## Bugfixes * Fixed EventTimeTimer ordering in Prism. ([#32222](https://github.com/apache/beam/issues/32222)). +* [Managed Iceberg] Fixed a bug where DataFile metadata was assigned incorrect partition values ([#33549](https://github.com/apache/beam/pull/33549)). ## Security Fixes @@ -155,6 +160,11 @@ * Adding flag to support conditionally disabling auto-commit in JdbcIO ReadFn ([#31111](https://github.com/apache/beam/issues/31111)) * (Python) Fixed BigQuery Enrichment bug that can lead to multiple conditions returning duplicate rows, batching returning incorrect results and conditions not scoped by row during batching ([#32780](https://github.com/apache/beam/pull/32780)). +## Known Issues + +* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)). + * Fixed in 2.62.0 + # [2.60.0] - 2024-10-17 ## Highlights @@ -209,6 +219,8 @@ when running on 3.8. ([#31192](https://github.com/apache/beam/issues/31192)) * Duplicate Rows: Multiple conditions may be applied incorrectly, leading to the duplication of rows in the output. * Incorrect Results with Batched Requests: Conditions may not be correctly scoped to individual rows within the batch, potentially causing inaccurate results. * Fixed in 2.61.0. +* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)). + * Fixed in 2.62.0 # [2.59.0] - 2024-09-11 @@ -257,6 +269,8 @@ when running on 3.8. ([#31192](https://github.com/apache/beam/issues/31192)) * Duplicate Rows: Multiple conditions may be applied incorrectly, leading to the duplication of rows in the output. * Incorrect Results with Batched Requests: Conditions may not be correctly scoped to individual rows within the batch, potentially causing inaccurate results. * Fixed in 2.61.0. +* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)). + * Fixed in 2.62.0 # [2.58.1] - 2024-08-15 diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 7b791ef9aa8e..76bc7f8a819f 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -618,6 +618,7 @@ class BeamModulePlugin implements Plugin { def influxdb_version = "2.19" def httpclient_version = "4.5.13" def httpcore_version = "4.4.14" + def iceberg_bqms_catalog_version = "1.5.2-0.1.0" def jackson_version = "2.15.4" def jaxb_api_version = "2.3.3" def jsr305_version = "3.0.2" @@ -650,6 +651,10 @@ class BeamModulePlugin implements Plugin { // Export Spark versions, so they are defined in a single place only project.ext.spark3_version = spark3_version + // version for BigQueryMetastore catalog (used by sdks:java:io:iceberg:bqms) + // TODO: remove this and download the jar normally when the catalog gets + // open-sourced (https://github.com/apache/iceberg/pull/11039) + project.ext.iceberg_bqms_catalog_version = iceberg_bqms_catalog_version // A map of maps containing common libraries used per language. 
To use: // dependencies { diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle index 791952c625f4..3aae54c4a8cc 100644 --- a/runners/prism/java/build.gradle +++ b/runners/prism/java/build.gradle @@ -96,9 +96,6 @@ def sickbayTests = [ 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerEarly', 'org.apache.beam.sdk.transforms.ParDoTest$BundleInvariantsTests.testWatermarkUpdateMidBundle', 'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton', - // Requires Allowed Lateness, among others. - 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerSetWithinAllowedLateness', - 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness', 'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate', 'org.apache.beam.sdk.testing.TestStreamTest.testDiscardingMode', 'org.apache.beam.sdk.testing.TestStreamTest.testEarlyPanesOfWindow', @@ -116,10 +113,6 @@ def sickbayTests = [ // Coding error somehow: short write: reached end of stream after reading 5 bytes; 98 bytes expected 'org.apache.beam.sdk.testing.TestStreamTest.testMultiStage', - // Prism not firing sessions correctly (seems to be merging inapppropriately) - 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombine', - 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombineWithContext', - // Java side dying during execution. // https://github.com/apache/beam/issues/32930 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenMultipleCoders', @@ -161,14 +154,6 @@ def sickbayTests = [ // TODO(https://github.com/apache/beam/issues/31231) 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata', - - // These tests fail once Late Data was being precisely dropped. - // They set a single element to be late data, and expect it (correctly) to be preserved. - // Since presently, these are treated as No-ops, the fix is to disable the - // dropping behavior when a stage's input is a Reshuffle/Redistribute transform. - 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming', - 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributeWithTimestampsStreaming', - // Prism isn't handling Java's side input views properly. // https://github.com/apache/beam/issues/32932 // java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view. diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index bb3c8ceceeb8..fb1ae47d301e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -261,8 +261,10 @@ func (em *ElementManager) AddStage(ID string, inputIDs, outputIDs []string, side // StageAggregates marks the given stage as an aggregation, which // means elements will only be processed based on windowing strategies. 
-func (em *ElementManager) StageAggregates(ID string) { - em.stages[ID].aggregate = true +func (em *ElementManager) StageAggregates(ID string, strat WinStrat) { + ss := em.stages[ID] + ss.aggregate = true + ss.strat = strat } // StageStateful marks the given stage as stateful, which means elements are @@ -1095,7 +1097,7 @@ type stageState struct { // Special handling bits stateful bool // whether this stage uses state or timers, and needs keyed processing. aggregate bool // whether this stage needs to block for aggregation. - strat winStrat // Windowing Strategy for aggregation fireings. + strat WinStrat // Windowing Strategy for aggregation fireings. processingTimeTimersFamilies map[string]bool // Indicates which timer families use the processing time domain. // onWindowExpiration management @@ -1154,7 +1156,6 @@ func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *st ID: ID, outputIDs: outputIDs, sides: sides, - strat: defaultStrat{}, state: map[LinkID]map[typex.Window]map[string]StateData{}, watermarkHolds: newHoldTracker(), @@ -1179,18 +1180,21 @@ func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *st func (ss *stageState) AddPending(newPending []element) int { ss.mu.Lock() defer ss.mu.Unlock() - // TODO(#https://github.com/apache/beam/issues/31438): - // Adjust with AllowedLateness - // Data that arrives after the *output* watermark is late. - threshold := ss.output - origPending := make([]element, 0, ss.pending.Len()) - for _, e := range newPending { - if e.window.MaxTimestamp() < threshold { - continue + if ss.aggregate { + // Late Data is data that has arrived after that window has expired. + // We only need to drop late data before aggregations. + // TODO - handle for side inputs too. + threshold := ss.output + origPending := make([]element, 0, ss.pending.Len()) + for _, e := range newPending { + if ss.strat.EarliestCompletion(e.window) < threshold { + // TODO: figure out Pane and trigger firings. + continue + } + origPending = append(origPending, e) } - origPending = append(origPending, e) + newPending = origPending } - newPending = origPending if ss.stateful { if ss.pendingByKeys == nil { ss.pendingByKeys = map[string]*dataAndTimers{} @@ -1626,10 +1630,8 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { // They'll never be read in again. for _, wins := range ss.sideInputs { for win := range wins { - // TODO(#https://github.com/apache/beam/issues/31438): - // Adjust with AllowedLateness // Clear out anything we've already used. - if win.MaxTimestamp() < newOut { + if ss.strat.EarliestCompletion(win) < newOut { // If the expiry is in progress, skip this window. if ss.inProgressExpiredWindows[win] > 0 { continue @@ -1640,9 +1642,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { } for _, wins := range ss.state { for win := range wins { - // TODO(#https://github.com/apache/beam/issues/31438): - // Adjust with AllowedLateness - if win.MaxTimestamp() < newOut { + if ss.strat.EarliestCompletion(win) < newOut { // If the expiry is in progress, skip collecting this window. if ss.inProgressExpiredWindows[win] > 0 { continue @@ -1685,9 +1685,7 @@ func (ss *stageState) createOnWindowExpirationBundles(newOut mtime.Time, em *Ele var preventDownstreamUpdate bool for win, keys := range ss.keysToExpireByWindow { // Check if the window has expired. 
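A note on the AddPending change above, with a runnable illustration: for aggregate stages, Prism now drops an element only when its window, extended by the stage's allowed lateness, has already closed relative to the output watermark. The sketch below is a minimal, self-contained Go model of that check; the millis/window/winStrat types are stand-ins for the engine's mtime.Time, typex.Window, and WinStrat, not the actual Prism API.

package main

import (
	"fmt"
	"time"
)

// millis stands in for Prism's mtime.Time (a millisecond timestamp).
type millis int64

// window stands in for typex.Window; only the max timestamp matters here.
type window struct{ maxTimestamp millis }

// winStrat mirrors the shape of the new engine.WinStrat: the only knob is
// the allowed lateness taken from the input PCollection's windowing strategy.
type winStrat struct{ allowedLateness time.Duration }

// earliestCompletion extends the window's max timestamp by the allowed
// lateness, matching what WinStrat.EarliestCompletion does in strategy.go.
func (ws winStrat) earliestCompletion(w window) millis {
	return w.maxTimestamp + millis(ws.allowedLateness.Milliseconds())
}

type element struct {
	win       window
	timestamp millis
}

// dropLate keeps only elements whose window can still complete at or after
// the stage's output watermark. This models the filter AddPending applies,
// and only when the stage is an aggregation.
func dropLate(ws winStrat, outputWatermark millis, pending []element) []element {
	kept := make([]element, 0, len(pending))
	for _, e := range pending {
		if ws.earliestCompletion(e.win) < outputWatermark {
			continue // late: the window expired before the watermark, even with lateness.
		}
		kept = append(kept, e)
	}
	return kept
}

func main() {
	ws := winStrat{allowedLateness: 5 * time.Second}
	elems := []element{
		{win: window{maxTimestamp: 1000}, timestamp: 900},  // expired even with lateness
		{win: window{maxTimestamp: 9000}, timestamp: 8500}, // still live
	}
	fmt.Println(dropLate(ws, 7000, elems)) // keeps only the second element
}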
- // TODO(#https://github.com/apache/beam/issues/31438): - // Adjust with AllowedLateness - if win.MaxTimestamp() >= newOut { + if ss.strat.EarliestCompletion(win) >= newOut { continue } // We can't advance the output watermark if there's garbage to collect. diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go index 44e6064958c0..b0af2a09a75d 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go @@ -23,28 +23,16 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" ) -type winStrat interface { - EarliestCompletion(typex.Window) mtime.Time +// WinStrat configures the windowing strategy for the stage, based on the +// stage's input PCollection. +type WinStrat struct { + AllowedLateness time.Duration // Used to extend duration } -type defaultStrat struct{} - -func (ws defaultStrat) EarliestCompletion(w typex.Window) mtime.Time { - return w.MaxTimestamp() -} - -func (defaultStrat) String() string { - return "default" -} - -type sessionStrat struct { - GapSize time.Duration -} - -func (ws sessionStrat) EarliestCompletion(w typex.Window) mtime.Time { - return w.MaxTimestamp().Add(ws.GapSize) +func (ws WinStrat) EarliestCompletion(w typex.Window) mtime.Time { + return w.MaxTimestamp().Add(ws.AllowedLateness) } -func (ws sessionStrat) String() string { - return fmt.Sprintf("session[GapSize:%v]", ws.GapSize) +func (ws WinStrat) String() string { + return fmt.Sprintf("WinStrat[AllowedLateness:%v]", ws.AllowedLateness) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go index 9d558396f806..845f748064c3 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go @@ -26,15 +26,16 @@ import ( func TestEarliestCompletion(t *testing.T) { tests := []struct { - strat winStrat + strat WinStrat input typex.Window want mtime.Time }{ - {defaultStrat{}, window.GlobalWindow{}, mtime.EndOfGlobalWindowTime}, - {defaultStrat{}, window.IntervalWindow{Start: 0, End: 4}, 3}, - {defaultStrat{}, window.IntervalWindow{Start: mtime.MinTimestamp, End: mtime.MaxTimestamp}, mtime.MaxTimestamp - 1}, - {sessionStrat{}, window.IntervalWindow{Start: 0, End: 4}, 3}, - {sessionStrat{GapSize: 3 * time.Millisecond}, window.IntervalWindow{Start: 0, End: 4}, 6}, + {WinStrat{}, window.GlobalWindow{}, mtime.EndOfGlobalWindowTime}, + {WinStrat{}, window.IntervalWindow{Start: 0, End: 4}, 3}, + {WinStrat{}, window.IntervalWindow{Start: mtime.MinTimestamp, End: mtime.MaxTimestamp}, mtime.MaxTimestamp - 1}, + {WinStrat{AllowedLateness: 5 * time.Second}, window.GlobalWindow{}, mtime.EndOfGlobalWindowTime.Add(5 * time.Second)}, + {WinStrat{AllowedLateness: 5 * time.Millisecond}, window.IntervalWindow{Start: 0, End: 4}, 8}, + {WinStrat{AllowedLateness: 5 * time.Second}, window.IntervalWindow{Start: mtime.MinTimestamp, End: mtime.MaxTimestamp}, mtime.MaxTimestamp.Add(5 * time.Second)}, } for _, test := range tests { diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index d41c3cd9c75c..ecc9cecdaa50 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -223,7 +223,10 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j 
*jobservic KeyDec: kd, } } - em.StageAggregates(stage.ID) + ws := windowingStrategy(comps, tid) + em.StageAggregates(stage.ID, engine.WinStrat{ + AllowedLateness: time.Duration(ws.GetAllowedLateness()) * time.Millisecond, + }) case urns.TransformImpulse: impulses = append(impulses, stage.ID) em.AddStage(stage.ID, nil, []string{getOnlyValue(t.GetOutputs())}, nil) @@ -266,11 +269,11 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic case *pipepb.TestStreamPayload_Event_ElementEvent: var elms []engine.TestStreamElement for _, e := range ev.ElementEvent.GetElements() { - elms = append(elms, engine.TestStreamElement{Encoded: mayLP(e.GetEncodedElement()), EventTime: mtime.Time(e.GetTimestamp())}) + elms = append(elms, engine.TestStreamElement{Encoded: mayLP(e.GetEncodedElement()), EventTime: mtime.FromMilliseconds(e.GetTimestamp())}) } tsb.AddElementEvent(ev.ElementEvent.GetTag(), elms) case *pipepb.TestStreamPayload_Event_WatermarkEvent: - tsb.AddWatermarkEvent(ev.WatermarkEvent.GetTag(), mtime.Time(ev.WatermarkEvent.GetNewWatermark())) + tsb.AddWatermarkEvent(ev.WatermarkEvent.GetTag(), mtime.FromMilliseconds(ev.WatermarkEvent.GetNewWatermark())) case *pipepb.TestStreamPayload_Event_ProcessingTimeEvent: if ev.ProcessingTimeEvent.GetAdvanceDuration() == int64(mtime.MaxTimestamp) { // TODO: Determine the SDK common formalism for setting processing time to infinity. diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go index be9d39ad02b7..340eb0b7eb5f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go +++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go @@ -19,7 +19,6 @@ import ( "bytes" "fmt" "io" - "log/slog" "reflect" "sort" @@ -33,7 +32,6 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker" "google.golang.org/protobuf/encoding/prototext" - "google.golang.org/protobuf/proto" ) // This file retains the logic for the pardo handler @@ -67,7 +65,12 @@ func (*runner) ConfigCharacteristic() reflect.Type { var _ transformPreparer = (*runner)(nil) func (*runner) PrepareUrns() []string { - return []string{urns.TransformReshuffle, urns.TransformFlatten} + return []string{ + urns.TransformReshuffle, + urns.TransformRedistributeArbitrarily, + urns.TransformRedistributeByKey, + urns.TransformFlatten, + } } // PrepareTransform handles special processing with respect runner transforms, like reshuffle. @@ -75,7 +78,7 @@ func (h *runner) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipep switch t.GetSpec().GetUrn() { case urns.TransformFlatten: return h.handleFlatten(tid, t, comps) - case urns.TransformReshuffle: + case urns.TransformReshuffle, urns.TransformRedistributeArbitrarily, urns.TransformRedistributeByKey: return h.handleReshuffle(tid, t, comps) default: panic("unknown urn to Prepare: " + t.GetSpec().GetUrn()) @@ -190,7 +193,13 @@ func (h *runner) handleReshuffle(tid string, t *pipepb.PTransform, comps *pipepb var _ transformExecuter = (*runner)(nil) func (*runner) ExecuteUrns() []string { - return []string{urns.TransformFlatten, urns.TransformGBK, urns.TransformReshuffle} + return []string{ + urns.TransformFlatten, + urns.TransformGBK, + urns.TransformReshuffle, + urns.TransformRedistributeArbitrarily, + urns.TransformRedistributeByKey, + } } // ExecuteWith returns what environment the transform should execute in. 
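One detail of the executePipeline change above worth calling out: WindowingStrategy.allowed_lateness travels in the pipeline proto as an int64 millisecond count, so building the WinStrat needs the explicit * time.Millisecond scaling; the mtime.FromMilliseconds switch for TestStream events in the same hunk addresses the same unit concern. A small self-contained sketch of the conversion, with illustrative names:

package main

import (
	"fmt"
	"time"
)

// allowedLatenessFromProto converts the proto's millisecond count into a
// time.Duration. Casting the raw int64 to time.Duration alone would be
// interpreted as nanoseconds, which is the pitfall the explicit scaling avoids.
func allowedLatenessFromProto(allowedLatenessMillis int64) time.Duration {
	return time.Duration(allowedLatenessMillis) * time.Millisecond
}

func main() {
	const fromProto = int64(5000) // 5 seconds of allowed lateness, in millis.

	right := allowedLatenessFromProto(fromProto)
	wrong := time.Duration(fromProto) // 5000ns: off by a factor of a million.

	fmt.Println(right, wrong) // 5s 5µs
}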
@@ -289,6 +298,17 @@ func windowingStrategy(comps *pipepb.Components, tid string) *pipepb.WindowingSt return comps.GetWindowingStrategies()[pcol.GetWindowingStrategyId()] } +// getOrMake is a generic helper function for extracting or initializing a sub map. +// Avoids an amount of boiler plate. +func getOrMake[K, VK comparable, VV any, V map[VK]VV, M map[K]V](m M, key K) V { + v, ok := m[key] + if !ok { + v = make(V) + m[key] = v + } + return v +} + // gbkBytes re-encodes gbk inputs in a gbk result. func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregate [][]byte, coders map[string]*pipepb.Coder) []byte { // Pick how the timestamp of the aggregated output is computed. @@ -325,15 +345,15 @@ func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregat time mtime.Time values [][]byte } - // Map windows to a map of keys to a map of keys to time. + // Map keys to windows to element batches. // We ultimately emit the window, the key, the time, and the iterable of elements, // all contained in the final value. - windows := map[typex.Window]map[string]keyTime{} + keys := map[string]map[typex.Window]keyTime{} kd := pullDecoder(kc, coders) vd := pullDecoder(vc, coders) - // Aggregate by windows and keys, using the window coder and KV coders. + // Aggregate by keys, and windows, using the window coder and KV coders. // We need to extract and split the key bytes from the element bytes. for _, data := range toAggregate { // Parse out each element's data, and repeat. @@ -351,12 +371,8 @@ func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregat key := string(keyByt) value := vd(buf) for _, w := range ws { - wk, ok := windows[w] - if !ok { - wk = make(map[string]keyTime) - windows[w] = wk - } - kt, ok := wk[key] + wins := getOrMake(keys, key) + kt, ok := wins[w] if !ok { // If the window+key map doesn't have a value, inititialize time with the element time. // This allows earliest or latest to work properly in the outputTime function's first use. @@ -366,69 +382,64 @@ func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregat kt.key = keyByt kt.w = w kt.values = append(kt.values, value) - wk[key] = kt + wins[w] = kt } } } // If the strategy is session windows, then we need to get all the windows, sort them // and see which ones need to be merged together. + // Each key has their windows merged separately. if ws.GetWindowFn().GetUrn() == urns.WindowFnSession { - slog.Debug("sorting by session window") - session := &pipepb.SessionWindowsPayload{} - if err := (proto.UnmarshalOptions{}).Unmarshal(ws.GetWindowFn().GetPayload(), session); err != nil { - panic("unable to decode SessionWindowsPayload") - } - gapSize := mtime.Time(session.GetGapSize().AsDuration()) - - ordered := make([]window.IntervalWindow, 0, len(windows)) - for k := range windows { - ordered = append(ordered, k.(window.IntervalWindow)) - } - // Use a decreasing sort (latest to earliest) so we can correct - // the output timestamp to the new end of window immeadiately. - sort.Slice(ordered, func(i, j int) bool { - return ordered[i].MaxTimestamp() > ordered[j].MaxTimestamp() - }) - - cur := ordered[0] - sessionData := windows[cur] - delete(windows, cur) - for _, iw := range ordered[1:] { - // Check if the gap between windows is less than the gapSize. - // If not, this window is done, and we start a next window. - if iw.End+gapSize < cur.Start { - // Store current data with the current window. 
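The getOrMake helper above is generic over the nesting, so it also reads well at call sites outside gbkBytes. A small standalone usage sketch; the counts map and its value types are invented for illustration:

package main

import "fmt"

// getOrMake is copied from the handlerunner.go change above: it returns the
// nested map stored under key, creating and registering an empty one first
// if the outer map has no entry yet.
func getOrMake[K, VK comparable, VV any, V map[VK]VV, M map[K]V](m M, key K) V {
	v, ok := m[key]
	if !ok {
		v = make(V)
		m[key] = v
	}
	return v
}

func main() {
	// keys -> windows -> batch sizes, mirroring the keys/windows/keyTime
	// nesting that gbkBytes now uses (values simplified to ints here).
	counts := map[string]map[int]int{}

	getOrMake(counts, "user-a")[10]++ // creates counts["user-a"] on first use
	getOrMake(counts, "user-a")[10]++ // reuses the existing inner map
	getOrMake(counts, "user-b")[20]++

	fmt.Println(counts) // map[user-a:map[10:2] user-b:map[20:1]]
}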
- windows[cur] = sessionData - // Use the incoming window instead, and clear it from the map. - cur = iw - sessionData = windows[iw] - delete(windows, cur) - // There's nothing to merge, since we've just started with this windowed data. - continue + for _, windows := range keys { + ordered := make([]window.IntervalWindow, 0, len(windows)) + for win := range windows { + ordered = append(ordered, win.(window.IntervalWindow)) } - // Extend the session with the incoming window, and merge the the incoming window's data. - cur.Start = iw.Start - toMerge := windows[iw] - delete(windows, iw) - for k, kt := range toMerge { - skt := sessionData[k] + // Use a decreasing sort (latest to earliest) so we can correct + // the output timestamp to the new end of window immediately. + sort.Slice(ordered, func(i, j int) bool { + return ordered[i].MaxTimestamp() > ordered[j].MaxTimestamp() + }) + + cur := ordered[0] + sessionData := windows[cur] + delete(windows, cur) + for _, iw := range ordered[1:] { + // GapSize is already incorporated into the windows, + // check for consecutive windows that don't overlap. + if cur.Start-iw.End > 0 { + // If so, this window is done, and we start a next window. + // Store current data with the current window. + windows[cur] = sessionData + // Use the incoming window instead, and clear it from the map. + cur = iw + sessionData = windows[iw] + delete(windows, cur) + // There's nothing to merge, since we've just started with this windowed data. + continue + } + // Extend the session with the incoming window, and merge the incoming window's data. + cur.Start = iw.Start + toMerge := windows[iw] + delete(windows, iw) + // Ensure the output time matches the given function. - skt.time = outputTime(cur, kt.time, skt.time) - skt.key = kt.key - skt.w = cur - skt.values = append(skt.values, kt.values...) - sessionData[k] = skt + sessionData.time = outputTime(cur, toMerge.time, sessionData.time) + sessionData.key = toMerge.key + sessionData.w = cur + // TODO: May need to adjust the ordering here. + sessionData.values = append(sessionData.values, toMerge.values...) } + windows[cur] = sessionData } - windows[cur] = sessionData } // Everything's aggregated! // Time to turn things into a windowed KV> var buf bytes.Buffer - for _, w := range windows { - for _, kt := range w { + for _, wins := range keys { + for _, kt := range wins { exec.EncodeWindowedValueHeader( wEnc, []typex.Window{kt.w}, diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index b9a28e4bc652..a4307b706fa3 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -23,7 +23,6 @@ import ( "sync" "sync/atomic" - "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" @@ -144,7 +143,7 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ * urns.TransformAssignWindows: // Very few expected transforms types for submitted pipelines. // Most URNs are for the runner to communicate back to the SDK for execution. 
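Since the per-key session merge above is the densest part of this change, here is a standalone Go sketch of just the window-merging rule it applies: sort a key's interval windows latest-to-earliest, then either extend the current session backwards or emit it once the next window no longer touches it. The interval type and example values are illustrative stand-ins for window.IntervalWindow, and the real loop also folds the per-key values and output timestamps together, which this sketch omits.

package main

import (
	"fmt"
	"sort"
)

// interval is a stand-in for window.IntervalWindow; Start/End are millis and,
// as in the session path above, the gap size is already folded into End.
type interval struct{ Start, End int64 }

// mergeSessions merges a single key's windows the same way the rewritten
// gbkBytes loop does: sort latest-to-earliest, then repeatedly either extend
// the current window backwards or emit it when the next window doesn't touch.
func mergeSessions(wins []interval) []interval {
	if len(wins) == 0 {
		return nil
	}
	sort.Slice(wins, func(i, j int) bool { return wins[i].End > wins[j].End })

	var merged []interval
	cur := wins[0]
	for _, iw := range wins[1:] {
		if cur.Start-iw.End > 0 {
			// Consecutive windows don't overlap: cur is a finished session.
			merged = append(merged, cur)
			cur = iw
			continue
		}
		// Overlapping (or touching) windows: extend the session backwards.
		cur.Start = iw.Start
	}
	return append(merged, cur)
}

func main() {
	// Two elements 2ms apart plus one far away, with a 10ms gap already added.
	wins := []interval{{1, 11}, {3, 13}, {100, 110}}
	fmt.Println(mergeSessions(wins)) // [{100 110} {1 13}]
}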
- case urns.TransformReshuffle: + case urns.TransformReshuffle, urns.TransformRedistributeArbitrarily, urns.TransformRedistributeByKey: // Reshuffles use features we don't yet support, but we would like to // support them by making them the no-op they are, and be precise about // what we're ignoring. @@ -227,8 +226,6 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ * // Inspect Windowing strategies for unsupported features. for wsID, ws := range job.Pipeline.GetComponents().GetWindowingStrategies() { - check("WindowingStrategy.AllowedLateness", ws.GetAllowedLateness(), int64(0), mtime.MaxTimestamp.Milliseconds()) - // Both Closing behaviors are identical without additional trigger firings. check("WindowingStrategy.ClosingBehaviour", ws.GetClosingBehavior(), pipepb.ClosingBehavior_EMIT_IF_NONEMPTY, pipepb.ClosingBehavior_EMIT_ALWAYS) check("WindowingStrategy.AccumulationMode", ws.GetAccumulationMode(), pipepb.AccumulationMode_DISCARDING) diff --git a/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go b/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go index 12e62ef84a81..170073b72419 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go +++ b/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go @@ -56,18 +56,20 @@ var ( var ( // SDK transforms. - TransformParDo = ptUrn(pipepb.StandardPTransforms_PAR_DO) - TransformCombinePerKey = ctUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY) - TransformCombineGlobally = ctUrn(pipepb.StandardPTransforms_COMBINE_GLOBALLY) - TransformReshuffle = ctUrn(pipepb.StandardPTransforms_RESHUFFLE) - TransformCombineGroupedValues = cmbtUrn(pipepb.StandardPTransforms_COMBINE_GROUPED_VALUES) - TransformPreCombine = cmbtUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY_PRECOMBINE) - TransformMerge = cmbtUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY_MERGE_ACCUMULATORS) - TransformExtract = cmbtUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY_EXTRACT_OUTPUTS) - TransformPairWithRestriction = sdfUrn(pipepb.StandardPTransforms_PAIR_WITH_RESTRICTION) - TransformSplitAndSize = sdfUrn(pipepb.StandardPTransforms_SPLIT_AND_SIZE_RESTRICTIONS) - TransformProcessSizedElements = sdfUrn(pipepb.StandardPTransforms_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS) - TransformTruncate = sdfUrn(pipepb.StandardPTransforms_TRUNCATE_SIZED_RESTRICTION) + TransformParDo = ptUrn(pipepb.StandardPTransforms_PAR_DO) + TransformCombinePerKey = ctUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY) + TransformCombineGlobally = ctUrn(pipepb.StandardPTransforms_COMBINE_GLOBALLY) + TransformReshuffle = ctUrn(pipepb.StandardPTransforms_RESHUFFLE) + TransformRedistributeArbitrarily = ctUrn(pipepb.StandardPTransforms_REDISTRIBUTE_ARBITRARILY) + TransformRedistributeByKey = ctUrn(pipepb.StandardPTransforms_REDISTRIBUTE_BY_KEY) + TransformCombineGroupedValues = cmbtUrn(pipepb.StandardPTransforms_COMBINE_GROUPED_VALUES) + TransformPreCombine = cmbtUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY_PRECOMBINE) + TransformMerge = cmbtUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY_MERGE_ACCUMULATORS) + TransformExtract = cmbtUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY_EXTRACT_OUTPUTS) + TransformPairWithRestriction = sdfUrn(pipepb.StandardPTransforms_PAIR_WITH_RESTRICTION) + TransformSplitAndSize = sdfUrn(pipepb.StandardPTransforms_SPLIT_AND_SIZE_RESTRICTIONS) + TransformProcessSizedElements = sdfUrn(pipepb.StandardPTransforms_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS) + TransformTruncate = sdfUrn(pipepb.StandardPTransforms_TRUNCATE_SIZED_RESTRICTION) // Window Manipulation 
TransformAssignWindows = ptUrn(pipepb.StandardPTransforms_ASSIGN_WINDOWS) diff --git a/sdks/java/io/debezium/build.gradle b/sdks/java/io/debezium/build.gradle index e3b88e22607d..d8ab954b4473 100644 --- a/sdks/java/io/debezium/build.gradle +++ b/sdks/java/io/debezium/build.gradle @@ -89,3 +89,14 @@ task integrationTest(type: Test, dependsOn: processTestResources) { useJUnit { } } + +configurations.all (Configuration it) -> { + resolutionStrategy { + // Force protobuf 3 because debezium is currently incompatible with protobuf 4. + // TODO - remove this and upgrade the version of debezium once a proto-4 compatible version is available + // https://github.com/apache/beam/pull/33526 does some of this, but was abandoned because it still doesn't + // work with protobuf 4. + force "com.google.protobuf:protobuf-java:3.25.5" + force "com.google.protobuf:protobuf-java-util:3.25.5" + } +} diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle index 38bee450e752..433000922b61 100644 --- a/sdks/java/io/expansion-service/build.gradle +++ b/sdks/java/io/expansion-service/build.gradle @@ -59,11 +59,13 @@ dependencies { // **** IcebergIO runtime dependencies **** runtimeOnly library.java.hadoop_auth runtimeOnly library.java.hadoop_client - // Needed when using GCS as the warehouse location. + // For writing to GCS runtimeOnly library.java.bigdataoss_gcs_connector - // Needed for HiveCatalog + // HiveCatalog runtimeOnly ("org.apache.iceberg:iceberg-hive-metastore:1.4.2") runtimeOnly project(path: ":sdks:java:io:iceberg:hive") + // BigQueryMetastoreCatalog (Java 11+) + runtimeOnly project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow") runtimeOnly library.java.kafka_clients runtimeOnly library.java.slf4j_jdk14 diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java index de6c14722898..7fb30a1099aa 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.gcp.pubsub; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import com.google.auth.Credentials; diff --git a/sdks/java/io/iceberg/bqms/build.gradle b/sdks/java/io/iceberg/bqms/build.gradle new file mode 100644 index 000000000000..e42aafc5f424 --- /dev/null +++ b/sdks/java/io/iceberg/bqms/build.gradle @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +plugins { + id 'org.apache.beam.module' +} + +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.io.iceberg.bqms', + shadowClosure: {}, + exportJavadoc: false, + publish: false, // it's an intermediate jar for io-expansion-service + validateShadowJar: false +) + +def libDir = "$buildDir/libs" +def bqmsFileName = "iceberg-bqms-catalog-${iceberg_bqms_catalog_version}.jar" +task downloadBqmsJar(type: Copy) { + // TODO: remove this workaround and download normally when the catalog gets open-sourced: + // (https://github.com/apache/iceberg/pull/11039) + def jarUrl = "https://storage.googleapis.com/spark-lib/bigquery/iceberg-bigquery-catalog-${iceberg_bqms_catalog_version}.jar" + def outputDir = file("$libDir") + outputDir.mkdirs() + def destFile = new File(outputDir, bqmsFileName) + + if (!destFile.exists()) { + try { + ant.get(src: jarUrl, dest: destFile) + println "Successfully downloaded BQMS catalog jar: $destFile" + } catch (Exception e) { + println "Could not download $jarUrl: ${e.message}" + } + } +} + +repositories { + flatDir { + dirs "$libDir" + } +} + +compileJava.dependsOn downloadBqmsJar + +dependencies { + implementation files("$libDir/$bqmsFileName") +} + +description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg :: BigQuery Metastore" +ext.summary = "A copy of the BQMS catalog." diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index 19fcbb7d1ea0..1775dfc5b77b 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -83,6 +83,9 @@ dependencies { exclude group: "org.apache.parquet", module: "parquet-hadoop-bundle" } + // BigQueryMetastore catalog dep + testImplementation project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow") + testRuntimeOnly library.java.slf4j_jdk14 testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") testRuntimeOnly project(path: ":runners:google-cloud-dataflow-java") @@ -136,6 +139,11 @@ task integrationTest(type: Test) { outputs.upToDateWhen { false } include '**/*IT.class' + // BQ metastore catalog doesn't support Java 8 + if (project.findProperty('testJavaVersion') == '8' || + JavaVersion.current().equals(JavaVersion.VERSION_1_8)) { + exclude '**/BigQueryMetastoreCatalogIT.class' + } maxParallelForks 4 classpath = sourceSets.test.runtimeClasspath diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java index deec779c6cc9..72220faf3004 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java @@ -48,7 +48,6 @@ import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.OutputFile; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -187,8 +186,10 @@ private void appendManifestFiles(Table table, Iterable fileWrit int specId = entry.getKey(); List files = entry.getValue(); PartitionSpec spec = Preconditions.checkStateNotNull(specs.get(specId)); - ManifestWriter writer = - createManifestWriter(table.location(), uuid, spec, table.io()); + ManifestWriter writer; + try (FileIO io = table.io()) { 
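The downloadBqmsJar task above fetches the catalog jar at build time only when it is not already in build/libs, and tolerates a failed download so cached or offline builds keep working. A rough Go sketch of that fetch-if-absent pattern; the URL comes from the diff, the destination path is illustrative, and this is not part of the actual Gradle build:

package main

import (
	"fmt"
	"io"
	"net/http"
	"os"
	"path/filepath"
)

// fetchIfAbsent mirrors the downloadBqmsJar task: do nothing if the artifact
// already exists locally, otherwise download it.
func fetchIfAbsent(url, dest string) error {
	if _, err := os.Stat(dest); err == nil {
		return nil // already downloaded on a previous build
	}
	if err := os.MkdirAll(filepath.Dir(dest), 0o755); err != nil {
		return err
	}
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status %s for %s", resp.Status, url)
	}
	out, err := os.Create(dest)
	if err != nil {
		return err
	}
	defer out.Close()
	_, err = io.Copy(out, resp.Body)
	return err
}

func main() {
	// Illustrative destination; the real task derives both values from
	// iceberg_bqms_catalog_version in BeamModulePlugin.groovy.
	url := "https://storage.googleapis.com/spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-0.1.0.jar"
	dest := filepath.Join("build", "libs", "iceberg-bqms-catalog-1.5.2-0.1.0.jar")
	if err := fetchIfAbsent(url, dest); err != nil {
		fmt.Println("could not download BQMS catalog jar:", err)
	}
}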
+ writer = createManifestWriter(table.location(), uuid, spec, io); + } for (DataFile file : files) { writer.add(file); committedDataFileByteSize.update(file.fileSizeInBytes()); @@ -207,8 +208,7 @@ private ManifestWriter createManifestWriter( String.format( "%s/metadata/%s-%s-%s.manifest", tableLocation, manifestFilePrefix, uuid, spec.specId())); - OutputFile outputFile = io.newOutputFile(location); - return ManifestFiles.write(spec, outputFile); + return ManifestFiles.write(spec, io.newOutputFile(location)); } } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java index 7941c13b0dfe..0d81d868c49f 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java @@ -29,6 +29,7 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; import org.slf4j.Logger; @@ -66,7 +67,10 @@ class RecordWriter { fileFormat.addExtension( table.locationProvider().newDataLocation(table.spec(), partitionKey, filename)); } - OutputFile outputFile = table.io().newOutputFile(absoluteFilename); + OutputFile outputFile; + try (FileIO io = table.io()) { + outputFile = io.newOutputFile(absoluteFilename); + } switch (fileFormat) { case AVRO: diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java index 4c21a0175ab0..63186f26fb5a 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -96,7 +96,9 @@ class DestinationState { private final IcebergDestination icebergDestination; private final PartitionSpec spec; private final org.apache.iceberg.Schema schema; - private final PartitionKey partitionKey; + // used to determine the partition to which a record belongs + // must not be directly used to create a writer + private final PartitionKey routingPartitionKey; private final Table table; private final String stateToken = UUID.randomUUID().toString(); final Cache writers; @@ -109,7 +111,7 @@ class DestinationState { this.icebergDestination = icebergDestination; this.schema = table.schema(); this.spec = table.spec(); - this.partitionKey = new PartitionKey(spec, schema); + this.routingPartitionKey = new PartitionKey(spec, schema); this.table = table; for (PartitionField partitionField : spec.fields()) { partitionFieldMap.put(partitionField.name(), partitionField); @@ -154,12 +156,12 @@ class DestinationState { * can't create a new writer, the {@link Record} is rejected and {@code false} is returned. 
*/ boolean write(Record record) { - partitionKey.partition(getPartitionableRecord(record)); + routingPartitionKey.partition(getPartitionableRecord(record)); - if (!writers.asMap().containsKey(partitionKey) && openWriters >= maxNumWriters) { + if (!writers.asMap().containsKey(routingPartitionKey) && openWriters >= maxNumWriters) { return false; } - RecordWriter writer = fetchWriterForPartition(partitionKey); + RecordWriter writer = fetchWriterForPartition(routingPartitionKey); writer.write(record); return true; } @@ -173,10 +175,12 @@ private RecordWriter fetchWriterForPartition(PartitionKey partitionKey) { RecordWriter recordWriter = writers.getIfPresent(partitionKey); if (recordWriter == null || recordWriter.bytesWritten() > maxFileSize) { + // each writer must have its own PartitionKey object + PartitionKey copy = partitionKey.copy(); // calling invalidate for a non-existent key is a safe operation - writers.invalidate(partitionKey); - recordWriter = createWriter(partitionKey); - writers.put(partitionKey, recordWriter); + writers.invalidate(copy); + recordWriter = createWriter(copy); + writers.put(copy, recordWriter); } return recordWriter; } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java new file mode 100644 index 000000000000..39920e66199b --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg.catalog; + +import java.io.IOException; +import java.util.Map; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; + +public class BigQueryMetastoreCatalogIT extends IcebergCatalogBaseIT { + static final String BQMS_CATALOG = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"; + static final String DATASET = "managed_iceberg_bqms_tests_no_delete"; + static final long SALT = System.nanoTime(); + + @Override + public String tableId() { + return DATASET + "." 
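The routingPartitionKey/copy() change above exists because a single mutable partition key is re-populated for every record; if that same object is also stored with the cached writers, every cached entry ends up observing the key's latest value, which is how DataFile metadata picked up wrong partition values. A self-contained Go sketch of that aliasing problem and the copy-before-caching fix; routingKey is an illustrative stand-in, not Iceberg's PartitionKey:

package main

import "fmt"

// routingKey plays the role of the manager's routing PartitionKey: a single
// mutable object that is re-populated for every incoming record.
type routingKey struct{ fields []string }

// partition overwrites the key in place, like PartitionKey.partition(record).
func (k *routingKey) partition(values ...string) {
	k.fields = append(k.fields[:0], values...)
}

// copyKey is the analogue of PartitionKey.copy(): a snapshot that later
// partition() calls can no longer change.
func (k *routingKey) copyKey() *routingKey {
	return &routingKey{fields: append([]string(nil), k.fields...)}
}

func main() {
	router := &routingKey{}
	writers := map[string]*routingKey{} // writer name -> the partition it was created for

	// BAD: storing the routing key itself. Both writers end up pointing at
	// whatever the key was most recently repartitioned to.
	router.partition("2024-01-01")
	writers["w1"] = router
	router.partition("2024-01-02")
	writers["w2"] = router
	fmt.Println(writers["w1"].fields, writers["w2"].fields) // [2024-01-02] [2024-01-02]

	// GOOD: copy before caching, as fetchWriterForPartition now does.
	router.partition("2024-01-01")
	writers["w1"] = router.copyKey()
	router.partition("2024-01-02")
	writers["w2"] = router.copyKey()
	fmt.Println(writers["w1"].fields, writers["w2"].fields) // [2024-01-01] [2024-01-02]
}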
+ testName.getMethodName() + "_" + SALT; + } + + @Override + public Catalog createCatalog() { + return CatalogUtil.loadCatalog( + BQMS_CATALOG, + "bqms_" + catalogName, + ImmutableMap.builder() + .put("gcp_project", options.getProject()) + .put("gcp_location", "us-central1") + .put("warehouse", warehouse) + .build(), + new Configuration()); + } + + @Override + public void catalogCleanup() throws IOException { + for (TableIdentifier tableIdentifier : catalog.listTables(Namespace.of(DATASET))) { + // only delete tables that were created in this test run + if (tableIdentifier.name().contains(String.valueOf(SALT))) { + catalog.dropTable(tableIdentifier); + } + } + } + + @Override + public Map managedIcebergConfig(String tableId) { + return ImmutableMap.builder() + .put("table", tableId) + .put( + "catalog_properties", + ImmutableMap.builder() + .put("gcp_project", options.getProject()) + .put("gcp_location", "us-central1") + .put("warehouse", warehouse) + .put("catalog-impl", BQMS_CATALOG) + .build()) + .build(); + } +} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java index f31eb19906ff..076d3f4f9db8 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java @@ -65,7 +65,6 @@ public Catalog createCatalog() { @Override public void catalogCleanup() throws Exception { - System.out.println("xxx CLEANING UP!"); if (hiveMetastoreExtension != null) { hiveMetastoreExtension.cleanup(); } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java index 8e4a74cd61d4..df2ca5adb7ac 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java @@ -28,10 +28,13 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; +import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.LongStream; import java.util.stream.Stream; @@ -65,6 +68,7 @@ import org.apache.iceberg.TableScan; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.IdentityPartitionConverters; import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.data.parquet.GenericParquetWriter; @@ -74,7 +78,10 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.PartitionUtil; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -85,6 +92,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; +import org.junit.rules.Timeout; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -145,7 +153,11 @@ public void setUp() throws Exception { @After public void cleanUp() throws Exception { - catalogCleanup(); + try { + catalogCleanup(); + } catch (Exception e) { + LOG.warn("Catalog cleanup failed.", e); + } try { GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil(); @@ -163,7 +175,7 @@ public void cleanUp() throws Exception { gcsUtil.remove(filesToDelete); } catch (Exception e) { - LOG.warn("Failed to clean up files.", e); + LOG.warn("Failed to clean up GCS files.", e); } } @@ -173,6 +185,7 @@ public void cleanUp() throws Exception { private static final String RANDOM = UUID.randomUUID().toString(); @Rule public TestPipeline pipeline = TestPipeline.create(); @Rule public TestName testName = new TestName(); + @Rule public transient Timeout globalTimeout = Timeout.seconds(300); private static final int NUM_SHARDS = 10; private static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogBaseIT.class); private static final Schema DOUBLY_NESTED_ROW_SCHEMA = @@ -289,6 +302,22 @@ private List populateTable(Table table) throws IOException { return expectedRows; } + private static Map constantsMap( + FileScanTask task, + BiFunction converter, + org.apache.iceberg.Schema schema) { + PartitionSpec spec = task.spec(); + Set idColumns = spec.identitySourceIds(); + org.apache.iceberg.Schema partitionSchema = TypeUtil.select(schema, idColumns); + boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty(); + + if (projectsIdentityPartitionColumns) { + return PartitionUtil.constantsMap(task, converter); + } else { + return Collections.emptyMap(); + } + } + private List readRecords(Table table) { org.apache.iceberg.Schema tableSchema = table.schema(); TableScan tableScan = table.newScan().project(tableSchema); @@ -297,13 +326,16 @@ private List readRecords(Table table) { InputFilesDecryptor descryptor = new InputFilesDecryptor(task, table.io(), table.encryption()); for (FileScanTask fileTask : task.files()) { + Map idToConstants = + constantsMap(fileTask, IdentityPartitionConverters::convertConstant, tableSchema); InputFile inputFile = descryptor.getInputFile(fileTask); CloseableIterable iterable = Parquet.read(inputFile) .split(fileTask.start(), fileTask.length()) .project(tableSchema) .createReaderFunc( - fileSchema -> GenericParquetReaders.buildReader(tableSchema, fileSchema)) + fileSchema -> + GenericParquetReaders.buildReader(tableSchema, fileSchema, idToConstants)) .filter(fileTask.residual()) .build(); diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index cbdb70b9ffec..3e2f5d4cf635 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -620,8 +620,29 @@ def __init__( blob, chunk_size=DEFAULT_READ_BUFFER_SIZE, enable_read_bucket_metric=False, - retry=DEFAULT_RETRY): - super().__init__(blob, chunk_size=chunk_size, retry=retry) + retry=DEFAULT_RETRY, + raw_download=True): + # By default, we always request to retrieve raw data from GCS even if the + # object meets the criteria of decompressive transcoding + # (https://cloud.google.com/storage/docs/transcoding). + super().__init__( + blob, chunk_size=chunk_size, retry=retry, raw_download=raw_download) + # TODO: Remove this after + # https://github.com/googleapis/python-storage/issues/1406 is fixed. + # As a workaround, we manually trigger a reload here. 
Otherwise, an internal + # call of reader.seek() will cause an exception if raw_download is set + # when initializing BlobReader(), + blob.reload() + + # TODO: Currently there is a bug in GCS server side when a client requests + # a file with "content-encoding=gzip" and "content-type=application/gzip" or + # "content-type=application/x-gzip", which will lead to infinite loop. + # We skip the support of this type of files until the GCS bug is fixed. + # Internal bug id: 203845981. + if (blob.content_encoding == "gzip" and + blob.content_type in ["application/gzip", "application/x-gzip"]): + raise NotImplementedError("Doubly compressed files not supported.") + self.enable_read_bucket_metric = enable_read_bucket_metric self.mode = "r" diff --git a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py index 07a5fb5df553..2ce355060bb4 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py @@ -31,14 +31,22 @@ import logging import unittest import uuid +import zlib import mock import pytest +from parameterized import parameterized from parameterized import parameterized_class +from apache_beam import Create +from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.filesystems import FileSystems +from apache_beam.io.textio import ReadAllFromText from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to +from apache_beam.transforms.combiners import Count try: from apache_beam.io.gcp import gcsio @@ -230,6 +238,69 @@ def test_create_default_bucket(self, mock_default_gcs_bucket_name): self.assertIsNone(self.gcsio.get_bucket(overridden_bucket_name)) +class GcsIOReadGzipTest(unittest.TestCase): + gcs_path_prefix = "gs://apache-beam-samples/textio/" + gzip_test_files = [ + "textio-test-data.content-type-gzip-content-encoding-gzip.1k.txt.gz", + "textio-test-data.content-type-gzip-content-encoding-none.1k.txt.gz", + "textio-test-data.content-type-none-content-encoding-gzip.1k.txt.gz", + "textio-test-data.content-type-none-content-encoding-none.1k.txt.gz", + "textio-test-data.content-type-text-content-encoding-gzip.1k.txt.gz", + "textio-test-data.content-type-text-content-encoding-none.1k.txt.gz", + "textio-test-data.default.1k.txt", + "textio-test-data.default.1k.txt.gz", + "textio-test-data.gzip-local.1k.txt.gz", + ] + + @parameterized.expand([ + (gzip_test_files[0], CompressionTypes.UNCOMPRESSED, NotImplementedError), + (gzip_test_files[0], CompressionTypes.GZIP, NotImplementedError), + (gzip_test_files[0], CompressionTypes.AUTO, NotImplementedError), + (gzip_test_files[1], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError), + (gzip_test_files[1], CompressionTypes.GZIP, None), + (gzip_test_files[1], CompressionTypes.AUTO, None), + (gzip_test_files[2], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError), + (gzip_test_files[2], CompressionTypes.GZIP, None), + (gzip_test_files[2], CompressionTypes.AUTO, None), + (gzip_test_files[3], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError), + (gzip_test_files[3], CompressionTypes.GZIP, None), + (gzip_test_files[3], CompressionTypes.AUTO, None), + (gzip_test_files[4], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError), + (gzip_test_files[4], CompressionTypes.GZIP, None), + (gzip_test_files[4], CompressionTypes.AUTO, None), + (gzip_test_files[5], 
CompressionTypes.UNCOMPRESSED, UnicodeDecodeError), + (gzip_test_files[5], CompressionTypes.GZIP, None), + (gzip_test_files[5], CompressionTypes.AUTO, None), + (gzip_test_files[6], CompressionTypes.UNCOMPRESSED, None), + (gzip_test_files[6], CompressionTypes.GZIP, zlib.error), + (gzip_test_files[6], CompressionTypes.AUTO, None), + (gzip_test_files[7], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError), + (gzip_test_files[7], CompressionTypes.GZIP, None), + (gzip_test_files[7], CompressionTypes.AUTO, None), + (gzip_test_files[8], CompressionTypes.UNCOMPRESSED, UnicodeDecodeError), + (gzip_test_files[8], CompressionTypes.GZIP, None), + (gzip_test_files[8], CompressionTypes.AUTO, None), + ]) + @unittest.skipIf(NotFound is None, 'GCP dependencies are not installed') + def test_read_gzip_file(self, file_name, compression_type, exception): + p = TestPipeline(runner="Direct", is_integration_test=True) + r = ( + p + | Create([f"{GcsIOReadGzipTest.gcs_path_prefix}{file_name}"]) + | "Read File from GCS" >> + ReadAllFromText(compression_type=compression_type) + | Count.Globally()) + assert_that(r, equal_to([1000])) + + if exception is None: + result = p.run() + result.wait_until_finish() + else: + with self.assertRaises(exception): + result = p.run() + result.wait_until_finish() + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py index 0818dda465b8..1faae2b2a8f1 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py @@ -178,6 +178,11 @@ def __init__( self._fail_when_getting_metadata = fail_when_getting_metadata self._fail_when_reading = fail_when_reading self.generation = random.randint(0, (1 << 63) - 1) + self.content_encoding = None + self.content_type = None + + def reload(self): + pass def delete(self): self.bucket.delete_blob(self.name) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index 337ac9919487..1e8880a60a35 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -34,9 +34,6 @@ from apache_beam.options.pipeline_options import PortableOptions from apache_beam.runners.portability import portable_runner_test from apache_beam.testing.util import assert_that -from apache_beam.testing.util import equal_to -from apache_beam.transforms import window -from apache_beam.utils import timestamp # Run as # @@ -174,26 +171,6 @@ def create_options(self): return options - # Slightly more robust session window test: - # Validates that an inner grouping doesn't duplicate data either. - # Copied also because the timestamp in fn_runner_test.py isn't being - # inferred correctly as seconds for some reason, but as micros. - # The belabored specification is validating the timestamp type works at least. 
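Circling back to the gcsio.py change earlier in this diff: the reader now always requests raw (non-transcoded) bytes, and it refuses objects whose metadata marks them as both gzip content-encoded and gzip content-typed, since the comment above describes a known GCS-side issue that makes such reads loop indefinitely. A minimal Go sketch of that metadata guard; objectMeta is an illustrative struct, not the google-cloud-storage client:

package main

import (
	"errors"
	"fmt"
)

// objectMeta carries the two GCS metadata fields the new reader logic
// inspects; in the Python change these come off the Blob object.
type objectMeta struct {
	ContentEncoding string
	ContentType     string
}

// checkReadable mirrors the guard added to BeamBlobReader: objects that are
// gzip content-encoded *and* carry a gzip content-type are rejected up front,
// matching the "doubly compressed files not supported" error in gcsio.py.
func checkReadable(m objectMeta) error {
	if m.ContentEncoding == "gzip" &&
		(m.ContentType == "application/gzip" || m.ContentType == "application/x-gzip") {
		return errors.New("doubly compressed files not supported")
	}
	return nil
}

func main() {
	fmt.Println(checkReadable(objectMeta{ContentEncoding: "gzip", ContentType: "text/plain"}))       // <nil>
	fmt.Println(checkReadable(objectMeta{ContentEncoding: "gzip", ContentType: "application/gzip"})) // error
}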
- # See https://github.com/apache/beam/issues/32085 - def test_windowing(self): - with self.create_pipeline() as p: - res = ( - p - | beam.Create([1, 2, 100, 101, 102, 123]) - | beam.Map( - lambda t: window.TimestampedValue( - ('k', t), timestamp.Timestamp.of(t).micros)) - | beam.WindowInto(beam.transforms.window.Sessions(10)) - | beam.GroupByKey() - | beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1])))) - assert_that( - res, equal_to([('k', [1, 2]), ('k', [100, 101, 102]), ('k', [123])])) - # Can't read host files from within docker, read a "local" file there. def test_read(self): print('name:', __name__) diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml index 633a8511a19a..8000c24f28aa 100644 --- a/sdks/python/pyproject.toml +++ b/sdks/python/pyproject.toml @@ -34,7 +34,7 @@ requires = [ 'pyyaml>=3.12,<7.0.0', # also update Jinja2 bounds in test-suites/xlang/build.gradle (look for xlangWrapperValidation task) "jinja2>=2.7.1,<4.0.0", - 'yapf==0.43.0' + 'yapf==0.29.0' ] diff --git a/settings.gradle.kts b/settings.gradle.kts index 67ba773d4070..69105a9c1cf2 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -350,3 +350,5 @@ include("sdks:java:extensions:combiners") findProject(":sdks:java:extensions:combiners")?.name = "combiners" include("sdks:java:io:iceberg:hive") findProject(":sdks:java:io:iceberg:hive")?.name = "hive" +include("sdks:java:io:iceberg:bqms") +findProject(":sdks:java:io:iceberg:bqms")?.name = "bqms"