diff --git a/CHANGES.md b/CHANGES.md index 65610a9a7fcd..42c0592ec975 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -65,6 +65,7 @@ * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). * Go SDK users can now use generic registration functions to optimize their DoFn execution. ([BEAM-14347](https://issues.apache.org/jira/browse/BEAM-14347)) * Go SDK users may now write self-checkpointing Splittable DoFns to read from streaming sources. ([BEAM-11104](https://issues.apache.org/jira/browse/BEAM-11104)) +* Go SDK textio Reads have been moved to Splittable DoFns exclusively. ([BEAM-14489](https://issues.apache.org/jira/browse/BEAM-14489)) ## Breaking Changes diff --git a/sdks/go/pkg/beam/io/textio/sdf.go b/sdks/go/pkg/beam/io/textio/sdf.go deleted file mode 100644 index 91149d05dd6a..000000000000 --- a/sdks/go/pkg/beam/io/textio/sdf.go +++ /dev/null @@ -1,203 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package textio - -import ( - "bufio" - "context" - "io" - "reflect" - "strings" - - "github.com/apache/beam/sdks/v2/go/pkg/beam" - "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" - "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" - "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" - "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" - "github.com/apache/beam/sdks/v2/go/pkg/beam/log" -) - -func init() { - beam.RegisterType(reflect.TypeOf((*readSdfFn)(nil)).Elem()) - beam.RegisterFunction(sizeFn) -} - -// ReadSdf is a variation of Read implemented via SplittableDoFn. This should -// result in increased performance with runners that support splitting. -func ReadSdf(s beam.Scope, glob string) beam.PCollection { - s = s.Scope("textio.ReadSdf") - - filesystem.ValidateScheme(glob) - return readSdf(s, beam.Create(s, glob)) -} - -// ReadAllSdf is a variation of ReadAll implemented via SplittableDoFn. This -// should result in increased performance with runners that support splitting. -func ReadAllSdf(s beam.Scope, col beam.PCollection) beam.PCollection { - s = s.Scope("textio.ReadAllSdf") - - return readSdf(s, col) -} - -// readSdf takes a PCollection of globs and returns a PCollection of lines from -// all files in those globs. Unlike textio.read, this version uses an SDF for -// reading files. -func readSdf(s beam.Scope, col beam.PCollection) beam.PCollection { - files := beam.ParDo(s, expandFn, col) - sized := beam.ParDo(s, sizeFn, files) - return beam.ParDo(s, &readSdfFn{}, sized) -} - -// sizeFn pairs a filename with the size of that file in bytes. -// TODO(BEAM-11109): Once CreateInitialRestriction supports Context params and -// error return values, this can be done in readSdfFn.CreateInitialRestriction. -func sizeFn(ctx context.Context, filename string) (string, int64, error) { - fs, err := filesystem.New(ctx, filename) - if err != nil { - return "", -1, err - } - defer fs.Close() - - size, err := fs.Size(ctx, filename) - if err != nil { - return "", -1, err - } - return filename, size, nil -} - -// readSdfFn reads individual lines from a text file, given a filename and a -// size in bytes for that file. -type readSdfFn struct { -} - -// CreateInitialRestriction creates an offset range restriction representing -// the file, using the paired size rather than fetching the file's size. -func (fn *readSdfFn) CreateInitialRestriction(_ string, size int64) offsetrange.Restriction { - return offsetrange.Restriction{ - Start: 0, - End: size, - } -} - -const ( - // blockSize is the desired size of each block for initial splits. - blockSize int64 = 64 * 1024 * 1024 // 64 MB - // tooSmall is the size limit for a block. If the last block is smaller than - // this, it gets merged with the previous block. - tooSmall = blockSize / 4 -) - -// SplitRestriction splits each file restriction into blocks of a predeterined -// size, with some checks to avoid having small remainders. -func (fn *readSdfFn) SplitRestriction(_ string, _ int64, rest offsetrange.Restriction) []offsetrange.Restriction { - splits := rest.SizedSplits(blockSize) - numSplits := len(splits) - if numSplits > 1 { - last := splits[numSplits-1] - if last.End-last.Start <= tooSmall { - // Last restriction is too small, so merge it with previous one. - splits[numSplits-2].End = last.End - splits = splits[:numSplits-1] - } - } - return splits -} - -// Size returns the size of each restriction as its range. -func (fn *readSdfFn) RestrictionSize(_ string, _ int64, rest offsetrange.Restriction) float64 { - return rest.Size() -} - -// CreateTracker creates sdf.LockRTrackers wrapping offsetRange.Trackers for -// each restriction. -func (fn *readSdfFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker { - return sdf.NewLockRTracker(offsetrange.NewTracker(rest)) -} - -// ProcessElement outputs all lines in the file that begin within the paired -// restriction. -// -// Note that restrictions may not align perfectly with lines. So lines can begin -// before the restriction and end within it (those are ignored), and lines can -// begin within the restriction and past the restriction (those are entirely -// output, including the portion outside the restriction). In some cases a -// valid restriction might not output any lines. -func (fn *readSdfFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, filename string, _ int64, emit func(string)) error { - log.Infof(ctx, "Reading from %v", filename) - - fs, err := filesystem.New(ctx, filename) - if err != nil { - return err - } - defer fs.Close() - - fd, err := fs.OpenRead(ctx, filename) - if err != nil { - return err - } - defer fd.Close() - - rd := bufio.NewReader(fd) - - i := rt.GetRestriction().(offsetrange.Restriction).Start - if i > 0 { - // If restriction's starts after 0, we cannot assume a new line starts - // at the beginning of the restriction, so we must search for the first - // line beginning at or after restriction.Start. This is done by - // scanning to the byte just before the restriction and then reading - // until the next newline, leaving the reader at the start of a new - // line past restriction.Start. - i -= 1 - n, err := rd.Discard(int(i)) // Scan to just before restriction. - if err == io.EOF { - return errors.Errorf("TextIO restriction lies outside the file being read. "+ - "Restriction begins at %v bytes, but file is only %v bytes.", i+1, n) - } - if err != nil { - return err - } - line, err := rd.ReadString('\n') // Read until the first line within the restriction. - if err == io.EOF { - // No lines start in the restriction but it's still valid, so - // finish claiming before returning to avoid errors. - rt.TryClaim(rt.GetRestriction().(offsetrange.Restriction).End) - return nil - } - if err != nil { - return err - } - i += int64(len(line)) - } - - // Claim each line until we claim a line outside the restriction. - for rt.TryClaim(i) { - line, err := rd.ReadString('\n') - if err == io.EOF { - if len(line) != 0 { - emit(strings.TrimSuffix(line, "\n")) - } - // Finish claiming restriction before breaking to avoid errors. - rt.TryClaim(rt.GetRestriction().(offsetrange.Restriction).End) - break - } - if err != nil { - return err - } - emit(strings.TrimSuffix(line, "\n")) - i += int64(len(line)) - } - return nil -} diff --git a/sdks/go/pkg/beam/io/textio/sdf_test.go b/sdks/go/pkg/beam/io/textio/sdf_test.go deleted file mode 100644 index 7b1b41663761..000000000000 --- a/sdks/go/pkg/beam/io/textio/sdf_test.go +++ /dev/null @@ -1,49 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package textio - -import ( - "context" - "testing" - - "github.com/apache/beam/sdks/v2/go/pkg/beam" - _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct" - "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" -) - -// TestReadSdf tests that readSdf successfully reads a test text file, and -// outputs the correct number of lines for it, even for an exceedingly long -// line. -func TestReadSdf(t *testing.T) { - p, s := beam.NewPipelineWithRoot() - lines := ReadSdf(s, testFilePath) - passert.Count(s, lines, "NumLines", 1) - - if _, err := beam.Run(context.Background(), "direct", p); err != nil { - t.Fatalf("Failed to execute job: %v", err) - } -} - -func TestReadAllSdf(t *testing.T) { - p, s := beam.NewPipelineWithRoot() - files := beam.Create(s, testFilePath) - lines := ReadAllSdf(s, files) - passert.Count(s, lines, "NumLines", 1) - - if _, err := beam.Run(context.Background(), "direct", p); err != nil { - t.Fatalf("Failed to execute job: %v", err) - } -} diff --git a/sdks/go/pkg/beam/io/textio/textio.go b/sdks/go/pkg/beam/io/textio/textio.go index 3b0abf58877f..315487840b56 100644 --- a/sdks/go/pkg/beam/io/textio/textio.go +++ b/sdks/go/pkg/beam/io/textio/textio.go @@ -25,18 +25,23 @@ import ( "strings" "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" + "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" ) func init() { + beam.RegisterType(reflect.TypeOf((*readFn)(nil)).Elem()) + beam.RegisterFunction(sizeFn) beam.RegisterType(reflect.TypeOf((*writeFileFn)(nil)).Elem()) - beam.RegisterFunction(readFn) beam.RegisterFunction(expandFn) } -// Read reads a set of file and returns the lines as a PCollection. The -// newlines are not part of the lines. +// Read reads a set of files indicated by the glob pattern and returns +// the lines as a PCollection. +// The newlines are not part of the lines. func Read(s beam.Scope, glob string) beam.PCollection { s = s.Scope("textio.Read") @@ -49,15 +54,39 @@ func Read(s beam.Scope, glob string) beam.PCollection { // PCollection. The newlines are not part of the lines. func ReadAll(s beam.Scope, col beam.PCollection) beam.PCollection { s = s.Scope("textio.ReadAll") + return read(s, col) +} + +// ReadSdf is a variation of Read implemented via SplittableDoFn. This should +// result in increased performance with runners that support splitting. +// +// Deprecated: Use Read instead, which has been migrated to use this SDF implementation. +func ReadSdf(s beam.Scope, glob string) beam.PCollection { + s = s.Scope("textio.ReadSdf") + + filesystem.ValidateScheme(glob) + return read(s, beam.Create(s, glob)) +} +// ReadAllSdf is a variation of ReadAll implemented via SplittableDoFn. This +// should result in increased performance with runners that support splitting. +// +// Deprecated: Use ReadAll instead, which has been migrated to use this SDF implementation. +func ReadAllSdf(s beam.Scope, col beam.PCollection) beam.PCollection { + s = s.Scope("textio.ReadAllSdf") return read(s, col) } +// read takes a PCollection of globs and returns a PCollection of lines from +// all files in those globs. Uses an SDF to allow splitting reads of files +// into separate bundles. func read(s beam.Scope, col beam.PCollection) beam.PCollection { files := beam.ParDo(s, expandFn, col) - return beam.ParDo(s, readFn, files) + sized := beam.ParDo(s, sizeFn, files) + return beam.ParDo(s, &readFn{}, sized) } +// expandFn expands a glob pattern into all matching file names. func expandFn(ctx context.Context, glob string, emit func(string)) error { if strings.TrimSpace(glob) == "" { return nil // ignore empty string elements here @@ -79,7 +108,82 @@ func expandFn(ctx context.Context, glob string, emit func(string)) error { return nil } -func readFn(ctx context.Context, filename string, emit func(string)) error { +// sizeFn pairs a filename with the size of that file in bytes. +// TODO(BEAM-11109): Once CreateInitialRestriction supports Context params and +// error return values, this can be done in readSdfFn.CreateInitialRestriction. +func sizeFn(ctx context.Context, filename string) (string, int64, error) { + fs, err := filesystem.New(ctx, filename) + if err != nil { + return "", -1, err + } + defer fs.Close() + + size, err := fs.Size(ctx, filename) + if err != nil { + return "", -1, err + } + return filename, size, nil +} + +// readFn reads individual lines from a text file, given a filename and a +// size in bytes for that file. Implemented as an SDF to allow splitting +// within a file. +type readFn struct { +} + +// CreateInitialRestriction creates an offset range restriction representing +// the file, using the paired size rather than fetching the file's size. +func (fn *readFn) CreateInitialRestriction(_ string, size int64) offsetrange.Restriction { + return offsetrange.Restriction{ + Start: 0, + End: size, + } +} + +const ( + // blockSize is the desired size of each block for initial splits. + blockSize int64 = 64 * 1024 * 1024 // 64 MB + // tooSmall is the size limit for a block. If the last block is smaller than + // this, it gets merged with the previous block. + tooSmall = blockSize / 4 +) + +// SplitRestriction splits each file restriction into blocks of a predeterined +// size, with some checks to avoid having small remainders. +func (fn *readFn) SplitRestriction(_ string, _ int64, rest offsetrange.Restriction) []offsetrange.Restriction { + splits := rest.SizedSplits(blockSize) + numSplits := len(splits) + if numSplits > 1 { + last := splits[numSplits-1] + if last.End-last.Start <= tooSmall { + // Last restriction is too small, so merge it with previous one. + splits[numSplits-2].End = last.End + splits = splits[:numSplits-1] + } + } + return splits +} + +// Size returns the size of each restriction as its range. +func (fn *readFn) RestrictionSize(_ string, _ int64, rest offsetrange.Restriction) float64 { + return rest.Size() +} + +// CreateTracker creates sdf.LockRTrackers wrapping offsetRange.Trackers for +// each restriction. +func (fn *readFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker { + return sdf.NewLockRTracker(offsetrange.NewTracker(rest)) +} + +// ProcessElement outputs all lines in the file that begin within the paired +// restriction. +// +// Note that restrictions may not align perfectly with lines. So lines can begin +// before the restriction and end within it (those are ignored), and lines can +// begin within the restriction and past the restriction (those are entirely +// output, including the portion outside the restriction). In some cases a +// valid restriction might not output any lines. +func (fn *readFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, filename string, _ int64, emit func(string)) error { log.Infof(ctx, "Reading from %v", filename) fs, err := filesystem.New(ctx, filename) @@ -95,18 +199,53 @@ func readFn(ctx context.Context, filename string, emit func(string)) error { defer fd.Close() rd := bufio.NewReader(fd) - for { + + i := rt.GetRestriction().(offsetrange.Restriction).Start + if i > 0 { + // If restriction's starts after 0, we cannot assume a new line starts + // at the beginning of the restriction, so we must search for the first + // line beginning at or after restriction.Start. This is done by + // scanning to the byte just before the restriction and then reading + // until the next newline, leaving the reader at the start of a new + // line past restriction.Start. + i -= 1 + n, err := rd.Discard(int(i)) // Scan to just before restriction. + if err == io.EOF { + return errors.Errorf("TextIO restriction lies outside the file being read. "+ + "Restriction begins at %v bytes, but file is only %v bytes.", i+1, n) + } + if err != nil { + return err + } + line, err := rd.ReadString('\n') // Read until the first line within the restriction. + if err == io.EOF { + // No lines start in the restriction but it's still valid, so + // finish claiming before returning to avoid errors. + rt.TryClaim(rt.GetRestriction().(offsetrange.Restriction).End) + return nil + } + if err != nil { + return err + } + i += int64(len(line)) + } + + // Claim each line until we claim a line outside the restriction. + for rt.TryClaim(i) { line, err := rd.ReadString('\n') if err == io.EOF { if len(line) != 0 { emit(strings.TrimSuffix(line, "\n")) } + // Finish claiming restriction before breaking to avoid errors. + rt.TryClaim(rt.GetRestriction().(offsetrange.Restriction).End) break } if err != nil { return err } emit(strings.TrimSuffix(line, "\n")) + i += int64(len(line)) } return nil } diff --git a/sdks/go/pkg/beam/io/textio/textio_test.go b/sdks/go/pkg/beam/io/textio/textio_test.go index e5c3c978eb68..3a80f44cd4c6 100644 --- a/sdks/go/pkg/beam/io/textio/textio_test.go +++ b/sdks/go/pkg/beam/io/textio/textio_test.go @@ -30,23 +30,6 @@ import ( const testFilePath = "../../../../data/textio_test.txt" -func TestReadFn(t *testing.T) { - receivedLines := []string{} - getLines := func(line string) { - receivedLines = append(receivedLines, line) - } - - err := readFn(context.Background(), testFilePath, getLines) - if err != nil { - t.Fatalf("failed with %v", err) - } - want := 1 - if len(receivedLines) != 1 { - t.Fatalf("received %v lines, want %v", len(receivedLines), want) - } - -} - func TestRead(t *testing.T) { p, s := beam.NewPipelineWithRoot() lines := Read(s, testFilePath) @@ -106,3 +89,27 @@ func TestImmediate(t *testing.T) { ptest.RunAndValidate(t, p) } + +// TestReadSdf tests that readSdf successfully reads a test text file, and +// outputs the correct number of lines for it, even for an exceedingly long +// line. +func TestReadSdf(t *testing.T) { + p, s := beam.NewPipelineWithRoot() + lines := ReadSdf(s, testFilePath) + passert.Count(s, lines, "NumLines", 1) + + if _, err := beam.Run(context.Background(), "direct", p); err != nil { + t.Fatalf("Failed to execute job: %v", err) + } +} + +func TestReadAllSdf(t *testing.T) { + p, s := beam.NewPipelineWithRoot() + files := beam.Create(s, testFilePath) + lines := ReadAllSdf(s, files) + passert.Count(s, lines, "NumLines", 1) + + if _, err := beam.Run(context.Background(), "direct", p); err != nil { + t.Fatalf("Failed to execute job: %v", err) + } +}