From 2397980fa857edeb507d9d6962031e61eeb655b7 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 18 May 2022 22:38:50 -0700 Subject: [PATCH 1/4] [BEAM-14489] Remove non-SDF version of TextIO. --- sdks/go/pkg/beam/io/textio/sdf.go | 8 +++- sdks/go/pkg/beam/io/textio/textio.go | 48 +---------------------- sdks/go/pkg/beam/io/textio/textio_test.go | 18 --------- 3 files changed, 8 insertions(+), 66 deletions(-) diff --git a/sdks/go/pkg/beam/io/textio/sdf.go b/sdks/go/pkg/beam/io/textio/sdf.go index 91149d05dd6a..c54235d57bb1 100644 --- a/sdks/go/pkg/beam/io/textio/sdf.go +++ b/sdks/go/pkg/beam/io/textio/sdf.go @@ -37,8 +37,10 @@ func init() { // ReadSdf is a variation of Read implemented via SplittableDoFn. This should // result in increased performance with runners that support splitting. +// +// Deprecated: Called directly by Read, use that instead. func ReadSdf(s beam.Scope, glob string) beam.PCollection { - s = s.Scope("textio.ReadSdf") + s = s.Scope("textio.Read") filesystem.ValidateScheme(glob) return readSdf(s, beam.Create(s, glob)) @@ -46,8 +48,10 @@ func ReadSdf(s beam.Scope, glob string) beam.PCollection { // ReadAllSdf is a variation of ReadAll implemented via SplittableDoFn. This // should result in increased performance with runners that support splitting. +// +// Deprecated: Called directly by ReadAll, use that instead. func ReadAllSdf(s beam.Scope, col beam.PCollection) beam.PCollection { - s = s.Scope("textio.ReadAllSdf") + s = s.Scope("textio.ReadAll") return readSdf(s, col) } diff --git a/sdks/go/pkg/beam/io/textio/textio.go b/sdks/go/pkg/beam/io/textio/textio.go index 3b0abf58877f..8af3071292a3 100644 --- a/sdks/go/pkg/beam/io/textio/textio.go +++ b/sdks/go/pkg/beam/io/textio/textio.go @@ -19,7 +19,6 @@ package textio import ( "bufio" "context" - "io" "os" "reflect" "strings" @@ -31,31 +30,20 @@ import ( func init() { beam.RegisterType(reflect.TypeOf((*writeFileFn)(nil)).Elem()) - beam.RegisterFunction(readFn) beam.RegisterFunction(expandFn) } // Read reads a set of file and returns the lines as a PCollection. The // newlines are not part of the lines. func Read(s beam.Scope, glob string) beam.PCollection { - s = s.Scope("textio.Read") - - filesystem.ValidateScheme(glob) - return read(s, beam.Create(s, glob)) + return ReadSdf(s, glob) } // ReadAll expands and reads the filename given as globs by the incoming // PCollection. It returns the lines of all files as a single // PCollection. The newlines are not part of the lines. func ReadAll(s beam.Scope, col beam.PCollection) beam.PCollection { - s = s.Scope("textio.ReadAll") - - return read(s, col) -} - -func read(s beam.Scope, col beam.PCollection) beam.PCollection { - files := beam.ParDo(s, expandFn, col) - return beam.ParDo(s, readFn, files) + return ReadAllSdf(s, col) } func expandFn(ctx context.Context, glob string, emit func(string)) error { @@ -79,38 +67,6 @@ func expandFn(ctx context.Context, glob string, emit func(string)) error { return nil } -func readFn(ctx context.Context, filename string, emit func(string)) error { - log.Infof(ctx, "Reading from %v", filename) - - fs, err := filesystem.New(ctx, filename) - if err != nil { - return err - } - defer fs.Close() - - fd, err := fs.OpenRead(ctx, filename) - if err != nil { - return err - } - defer fd.Close() - - rd := bufio.NewReader(fd) - for { - line, err := rd.ReadString('\n') - if err == io.EOF { - if len(line) != 0 { - emit(strings.TrimSuffix(line, "\n")) - } - break - } - if err != nil { - return err - } - emit(strings.TrimSuffix(line, "\n")) - } - return nil -} - // TODO(herohde) 7/12/2017: extend Write to write to a series of files // as well as allow sharding. diff --git a/sdks/go/pkg/beam/io/textio/textio_test.go b/sdks/go/pkg/beam/io/textio/textio_test.go index e5c3c978eb68..507583ac0ebd 100644 --- a/sdks/go/pkg/beam/io/textio/textio_test.go +++ b/sdks/go/pkg/beam/io/textio/textio_test.go @@ -17,7 +17,6 @@ package textio import ( - "context" "errors" "os" "testing" @@ -30,23 +29,6 @@ import ( const testFilePath = "../../../../data/textio_test.txt" -func TestReadFn(t *testing.T) { - receivedLines := []string{} - getLines := func(line string) { - receivedLines = append(receivedLines, line) - } - - err := readFn(context.Background(), testFilePath, getLines) - if err != nil { - t.Fatalf("failed with %v", err) - } - want := 1 - if len(receivedLines) != 1 { - t.Fatalf("received %v lines, want %v", len(receivedLines), want) - } - -} - func TestRead(t *testing.T) { p, s := beam.NewPipelineWithRoot() lines := Read(s, testFilePath) From 67d95123ec25807d8557e814067dfec2312ec1d6 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 23 May 2022 15:46:25 -0700 Subject: [PATCH 2/4] [BEAM-14489] Consolidate & keep scopes. --- sdks/go/pkg/beam/io/textio/sdf.go | 207 ---------------------- sdks/go/pkg/beam/io/textio/sdf_test.go | 49 ----- sdks/go/pkg/beam/io/textio/textio.go | 186 ++++++++++++++++++- sdks/go/pkg/beam/io/textio/textio_test.go | 25 +++ 4 files changed, 209 insertions(+), 258 deletions(-) delete mode 100644 sdks/go/pkg/beam/io/textio/sdf.go delete mode 100644 sdks/go/pkg/beam/io/textio/sdf_test.go diff --git a/sdks/go/pkg/beam/io/textio/sdf.go b/sdks/go/pkg/beam/io/textio/sdf.go deleted file mode 100644 index c54235d57bb1..000000000000 --- a/sdks/go/pkg/beam/io/textio/sdf.go +++ /dev/null @@ -1,207 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package textio - -import ( - "bufio" - "context" - "io" - "reflect" - "strings" - - "github.com/apache/beam/sdks/v2/go/pkg/beam" - "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" - "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" - "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" - "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" - "github.com/apache/beam/sdks/v2/go/pkg/beam/log" -) - -func init() { - beam.RegisterType(reflect.TypeOf((*readSdfFn)(nil)).Elem()) - beam.RegisterFunction(sizeFn) -} - -// ReadSdf is a variation of Read implemented via SplittableDoFn. This should -// result in increased performance with runners that support splitting. -// -// Deprecated: Called directly by Read, use that instead. -func ReadSdf(s beam.Scope, glob string) beam.PCollection { - s = s.Scope("textio.Read") - - filesystem.ValidateScheme(glob) - return readSdf(s, beam.Create(s, glob)) -} - -// ReadAllSdf is a variation of ReadAll implemented via SplittableDoFn. This -// should result in increased performance with runners that support splitting. -// -// Deprecated: Called directly by ReadAll, use that instead. -func ReadAllSdf(s beam.Scope, col beam.PCollection) beam.PCollection { - s = s.Scope("textio.ReadAll") - - return readSdf(s, col) -} - -// readSdf takes a PCollection of globs and returns a PCollection of lines from -// all files in those globs. Unlike textio.read, this version uses an SDF for -// reading files. -func readSdf(s beam.Scope, col beam.PCollection) beam.PCollection { - files := beam.ParDo(s, expandFn, col) - sized := beam.ParDo(s, sizeFn, files) - return beam.ParDo(s, &readSdfFn{}, sized) -} - -// sizeFn pairs a filename with the size of that file in bytes. -// TODO(BEAM-11109): Once CreateInitialRestriction supports Context params and -// error return values, this can be done in readSdfFn.CreateInitialRestriction. -func sizeFn(ctx context.Context, filename string) (string, int64, error) { - fs, err := filesystem.New(ctx, filename) - if err != nil { - return "", -1, err - } - defer fs.Close() - - size, err := fs.Size(ctx, filename) - if err != nil { - return "", -1, err - } - return filename, size, nil -} - -// readSdfFn reads individual lines from a text file, given a filename and a -// size in bytes for that file. -type readSdfFn struct { -} - -// CreateInitialRestriction creates an offset range restriction representing -// the file, using the paired size rather than fetching the file's size. -func (fn *readSdfFn) CreateInitialRestriction(_ string, size int64) offsetrange.Restriction { - return offsetrange.Restriction{ - Start: 0, - End: size, - } -} - -const ( - // blockSize is the desired size of each block for initial splits. - blockSize int64 = 64 * 1024 * 1024 // 64 MB - // tooSmall is the size limit for a block. If the last block is smaller than - // this, it gets merged with the previous block. - tooSmall = blockSize / 4 -) - -// SplitRestriction splits each file restriction into blocks of a predeterined -// size, with some checks to avoid having small remainders. -func (fn *readSdfFn) SplitRestriction(_ string, _ int64, rest offsetrange.Restriction) []offsetrange.Restriction { - splits := rest.SizedSplits(blockSize) - numSplits := len(splits) - if numSplits > 1 { - last := splits[numSplits-1] - if last.End-last.Start <= tooSmall { - // Last restriction is too small, so merge it with previous one. - splits[numSplits-2].End = last.End - splits = splits[:numSplits-1] - } - } - return splits -} - -// Size returns the size of each restriction as its range. -func (fn *readSdfFn) RestrictionSize(_ string, _ int64, rest offsetrange.Restriction) float64 { - return rest.Size() -} - -// CreateTracker creates sdf.LockRTrackers wrapping offsetRange.Trackers for -// each restriction. -func (fn *readSdfFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker { - return sdf.NewLockRTracker(offsetrange.NewTracker(rest)) -} - -// ProcessElement outputs all lines in the file that begin within the paired -// restriction. -// -// Note that restrictions may not align perfectly with lines. So lines can begin -// before the restriction and end within it (those are ignored), and lines can -// begin within the restriction and past the restriction (those are entirely -// output, including the portion outside the restriction). In some cases a -// valid restriction might not output any lines. -func (fn *readSdfFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, filename string, _ int64, emit func(string)) error { - log.Infof(ctx, "Reading from %v", filename) - - fs, err := filesystem.New(ctx, filename) - if err != nil { - return err - } - defer fs.Close() - - fd, err := fs.OpenRead(ctx, filename) - if err != nil { - return err - } - defer fd.Close() - - rd := bufio.NewReader(fd) - - i := rt.GetRestriction().(offsetrange.Restriction).Start - if i > 0 { - // If restriction's starts after 0, we cannot assume a new line starts - // at the beginning of the restriction, so we must search for the first - // line beginning at or after restriction.Start. This is done by - // scanning to the byte just before the restriction and then reading - // until the next newline, leaving the reader at the start of a new - // line past restriction.Start. - i -= 1 - n, err := rd.Discard(int(i)) // Scan to just before restriction. - if err == io.EOF { - return errors.Errorf("TextIO restriction lies outside the file being read. "+ - "Restriction begins at %v bytes, but file is only %v bytes.", i+1, n) - } - if err != nil { - return err - } - line, err := rd.ReadString('\n') // Read until the first line within the restriction. - if err == io.EOF { - // No lines start in the restriction but it's still valid, so - // finish claiming before returning to avoid errors. - rt.TryClaim(rt.GetRestriction().(offsetrange.Restriction).End) - return nil - } - if err != nil { - return err - } - i += int64(len(line)) - } - - // Claim each line until we claim a line outside the restriction. - for rt.TryClaim(i) { - line, err := rd.ReadString('\n') - if err == io.EOF { - if len(line) != 0 { - emit(strings.TrimSuffix(line, "\n")) - } - // Finish claiming restriction before breaking to avoid errors. - rt.TryClaim(rt.GetRestriction().(offsetrange.Restriction).End) - break - } - if err != nil { - return err - } - emit(strings.TrimSuffix(line, "\n")) - i += int64(len(line)) - } - return nil -} diff --git a/sdks/go/pkg/beam/io/textio/sdf_test.go b/sdks/go/pkg/beam/io/textio/sdf_test.go deleted file mode 100644 index 7b1b41663761..000000000000 --- a/sdks/go/pkg/beam/io/textio/sdf_test.go +++ /dev/null @@ -1,49 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package textio - -import ( - "context" - "testing" - - "github.com/apache/beam/sdks/v2/go/pkg/beam" - _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct" - "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" -) - -// TestReadSdf tests that readSdf successfully reads a test text file, and -// outputs the correct number of lines for it, even for an exceedingly long -// line. -func TestReadSdf(t *testing.T) { - p, s := beam.NewPipelineWithRoot() - lines := ReadSdf(s, testFilePath) - passert.Count(s, lines, "NumLines", 1) - - if _, err := beam.Run(context.Background(), "direct", p); err != nil { - t.Fatalf("Failed to execute job: %v", err) - } -} - -func TestReadAllSdf(t *testing.T) { - p, s := beam.NewPipelineWithRoot() - files := beam.Create(s, testFilePath) - lines := ReadAllSdf(s, files) - passert.Count(s, lines, "NumLines", 1) - - if _, err := beam.Run(context.Background(), "direct", p); err != nil { - t.Fatalf("Failed to execute job: %v", err) - } -} diff --git a/sdks/go/pkg/beam/io/textio/textio.go b/sdks/go/pkg/beam/io/textio/textio.go index 8af3071292a3..1c66d107b07c 100644 --- a/sdks/go/pkg/beam/io/textio/textio.go +++ b/sdks/go/pkg/beam/io/textio/textio.go @@ -19,16 +19,22 @@ package textio import ( "bufio" "context" + "io" "os" "reflect" "strings" "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" + "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" ) func init() { + beam.RegisterType(reflect.TypeOf((*readFn)(nil)).Elem()) + beam.RegisterFunction(sizeFn) beam.RegisterType(reflect.TypeOf((*writeFileFn)(nil)).Elem()) beam.RegisterFunction(expandFn) } @@ -36,16 +42,50 @@ func init() { // Read reads a set of file and returns the lines as a PCollection. The // newlines are not part of the lines. func Read(s beam.Scope, glob string) beam.PCollection { - return ReadSdf(s, glob) + s = s.Scope("textio.Read") + + filesystem.ValidateScheme(glob) + return read(s, beam.Create(s, glob)) } // ReadAll expands and reads the filename given as globs by the incoming // PCollection. It returns the lines of all files as a single // PCollection. The newlines are not part of the lines. func ReadAll(s beam.Scope, col beam.PCollection) beam.PCollection { - return ReadAllSdf(s, col) + s = s.Scope("textio.ReadAll") + return read(s, col) +} + +// ReadSdf is a variation of Read implemented via SplittableDoFn. This should +// result in increased performance with runners that support splitting. +// +// Deprecated: Use Read instead. +func ReadSdf(s beam.Scope, glob string) beam.PCollection { + s = s.Scope("textio.ReadSdf") + + filesystem.ValidateScheme(glob) + return read(s, beam.Create(s, glob)) +} + +// ReadAllSdf is a variation of ReadAll implemented via SplittableDoFn. This +// should result in increased performance with runners that support splitting. +// +// Deprecated: Use ReadAll instead. +func ReadAllSdf(s beam.Scope, col beam.PCollection) beam.PCollection { + s = s.Scope("textio.ReadAllSdf") + return read(s, col) } +// read takes a PCollection of globs and returns a PCollection of lines from +// all files in those globs. Uses an SDF to allow splitting reads of files +// into separate bundles. +func read(s beam.Scope, col beam.PCollection) beam.PCollection { + files := beam.ParDo(s, expandFn, col) + sized := beam.ParDo(s, sizeFn, files) + return beam.ParDo(s, &readFn{}, sized) +} + +// expandFn expands a glob pattern into all matching file names. func expandFn(ctx context.Context, glob string, emit func(string)) error { if strings.TrimSpace(glob) == "" { return nil // ignore empty string elements here @@ -67,6 +107,148 @@ func expandFn(ctx context.Context, glob string, emit func(string)) error { return nil } +// sizeFn pairs a filename with the size of that file in bytes. +// TODO(BEAM-11109): Once CreateInitialRestriction supports Context params and +// error return values, this can be done in readSdfFn.CreateInitialRestriction. +func sizeFn(ctx context.Context, filename string) (string, int64, error) { + fs, err := filesystem.New(ctx, filename) + if err != nil { + return "", -1, err + } + defer fs.Close() + + size, err := fs.Size(ctx, filename) + if err != nil { + return "", -1, err + } + return filename, size, nil +} + +// readFn reads individual lines from a text file, given a filename and a +// size in bytes for that file. Implemented as an SDF to allow splitting +// within a file. +type readFn struct { +} + +// CreateInitialRestriction creates an offset range restriction representing +// the file, using the paired size rather than fetching the file's size. +func (fn *readFn) CreateInitialRestriction(_ string, size int64) offsetrange.Restriction { + return offsetrange.Restriction{ + Start: 0, + End: size, + } +} + +const ( + // blockSize is the desired size of each block for initial splits. + blockSize int64 = 64 * 1024 * 1024 // 64 MB + // tooSmall is the size limit for a block. If the last block is smaller than + // this, it gets merged with the previous block. + tooSmall = blockSize / 4 +) + +// SplitRestriction splits each file restriction into blocks of a predeterined +// size, with some checks to avoid having small remainders. +func (fn *readFn) SplitRestriction(_ string, _ int64, rest offsetrange.Restriction) []offsetrange.Restriction { + splits := rest.SizedSplits(blockSize) + numSplits := len(splits) + if numSplits > 1 { + last := splits[numSplits-1] + if last.End-last.Start <= tooSmall { + // Last restriction is too small, so merge it with previous one. + splits[numSplits-2].End = last.End + splits = splits[:numSplits-1] + } + } + return splits +} + +// Size returns the size of each restriction as its range. +func (fn *readFn) RestrictionSize(_ string, _ int64, rest offsetrange.Restriction) float64 { + return rest.Size() +} + +// CreateTracker creates sdf.LockRTrackers wrapping offsetRange.Trackers for +// each restriction. +func (fn *readFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker { + return sdf.NewLockRTracker(offsetrange.NewTracker(rest)) +} + +// ProcessElement outputs all lines in the file that begin within the paired +// restriction. +// +// Note that restrictions may not align perfectly with lines. So lines can begin +// before the restriction and end within it (those are ignored), and lines can +// begin within the restriction and past the restriction (those are entirely +// output, including the portion outside the restriction). In some cases a +// valid restriction might not output any lines. +func (fn *readFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, filename string, _ int64, emit func(string)) error { + log.Infof(ctx, "Reading from %v", filename) + + fs, err := filesystem.New(ctx, filename) + if err != nil { + return err + } + defer fs.Close() + + fd, err := fs.OpenRead(ctx, filename) + if err != nil { + return err + } + defer fd.Close() + + rd := bufio.NewReader(fd) + + i := rt.GetRestriction().(offsetrange.Restriction).Start + if i > 0 { + // If restriction's starts after 0, we cannot assume a new line starts + // at the beginning of the restriction, so we must search for the first + // line beginning at or after restriction.Start. This is done by + // scanning to the byte just before the restriction and then reading + // until the next newline, leaving the reader at the start of a new + // line past restriction.Start. + i -= 1 + n, err := rd.Discard(int(i)) // Scan to just before restriction. + if err == io.EOF { + return errors.Errorf("TextIO restriction lies outside the file being read. "+ + "Restriction begins at %v bytes, but file is only %v bytes.", i+1, n) + } + if err != nil { + return err + } + line, err := rd.ReadString('\n') // Read until the first line within the restriction. + if err == io.EOF { + // No lines start in the restriction but it's still valid, so + // finish claiming before returning to avoid errors. + rt.TryClaim(rt.GetRestriction().(offsetrange.Restriction).End) + return nil + } + if err != nil { + return err + } + i += int64(len(line)) + } + + // Claim each line until we claim a line outside the restriction. + for rt.TryClaim(i) { + line, err := rd.ReadString('\n') + if err == io.EOF { + if len(line) != 0 { + emit(strings.TrimSuffix(line, "\n")) + } + // Finish claiming restriction before breaking to avoid errors. + rt.TryClaim(rt.GetRestriction().(offsetrange.Restriction).End) + break + } + if err != nil { + return err + } + emit(strings.TrimSuffix(line, "\n")) + i += int64(len(line)) + } + return nil +} + // TODO(herohde) 7/12/2017: extend Write to write to a series of files // as well as allow sharding. diff --git a/sdks/go/pkg/beam/io/textio/textio_test.go b/sdks/go/pkg/beam/io/textio/textio_test.go index 507583ac0ebd..3a80f44cd4c6 100644 --- a/sdks/go/pkg/beam/io/textio/textio_test.go +++ b/sdks/go/pkg/beam/io/textio/textio_test.go @@ -17,6 +17,7 @@ package textio import ( + "context" "errors" "os" "testing" @@ -88,3 +89,27 @@ func TestImmediate(t *testing.T) { ptest.RunAndValidate(t, p) } + +// TestReadSdf tests that readSdf successfully reads a test text file, and +// outputs the correct number of lines for it, even for an exceedingly long +// line. +func TestReadSdf(t *testing.T) { + p, s := beam.NewPipelineWithRoot() + lines := ReadSdf(s, testFilePath) + passert.Count(s, lines, "NumLines", 1) + + if _, err := beam.Run(context.Background(), "direct", p); err != nil { + t.Fatalf("Failed to execute job: %v", err) + } +} + +func TestReadAllSdf(t *testing.T) { + p, s := beam.NewPipelineWithRoot() + files := beam.Create(s, testFilePath) + lines := ReadAllSdf(s, files) + passert.Count(s, lines, "NumLines", 1) + + if _, err := beam.Run(context.Background(), "direct", p); err != nil { + t.Fatalf("Failed to execute job: %v", err) + } +} From bb8d0afaf9678b1bb2563b805b081e27e9274d1b Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 23 May 2022 16:02:50 -0700 Subject: [PATCH 3/4] wordsmith --- sdks/go/pkg/beam/io/textio/textio.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdks/go/pkg/beam/io/textio/textio.go b/sdks/go/pkg/beam/io/textio/textio.go index 1c66d107b07c..315487840b56 100644 --- a/sdks/go/pkg/beam/io/textio/textio.go +++ b/sdks/go/pkg/beam/io/textio/textio.go @@ -39,8 +39,9 @@ func init() { beam.RegisterFunction(expandFn) } -// Read reads a set of file and returns the lines as a PCollection. The -// newlines are not part of the lines. +// Read reads a set of files indicated by the glob pattern and returns +// the lines as a PCollection. +// The newlines are not part of the lines. func Read(s beam.Scope, glob string) beam.PCollection { s = s.Scope("textio.Read") @@ -59,7 +60,7 @@ func ReadAll(s beam.Scope, col beam.PCollection) beam.PCollection { // ReadSdf is a variation of Read implemented via SplittableDoFn. This should // result in increased performance with runners that support splitting. // -// Deprecated: Use Read instead. +// Deprecated: Use Read instead, which has been migrated to use this SDF implementation. func ReadSdf(s beam.Scope, glob string) beam.PCollection { s = s.Scope("textio.ReadSdf") @@ -70,7 +71,7 @@ func ReadSdf(s beam.Scope, glob string) beam.PCollection { // ReadAllSdf is a variation of ReadAll implemented via SplittableDoFn. This // should result in increased performance with runners that support splitting. // -// Deprecated: Use ReadAll instead. +// Deprecated: Use ReadAll instead, which has been migrated to use this SDF implementation. func ReadAllSdf(s beam.Scope, col beam.PCollection) beam.PCollection { s = s.Scope("textio.ReadAllSdf") return read(s, col) From b95bafc4d74b510cd352a6a626dabfb03d74faa0 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 23 May 2022 16:03:05 -0700 Subject: [PATCH 4/4] CHANGES.md callout --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index 65610a9a7fcd..42c0592ec975 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -65,6 +65,7 @@ * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). * Go SDK users can now use generic registration functions to optimize their DoFn execution. ([BEAM-14347](https://issues.apache.org/jira/browse/BEAM-14347)) * Go SDK users may now write self-checkpointing Splittable DoFns to read from streaming sources. ([BEAM-11104](https://issues.apache.org/jira/browse/BEAM-11104)) +* Go SDK textio Reads have been moved to Splittable DoFns exclusively. ([BEAM-14489](https://issues.apache.org/jira/browse/BEAM-14489)) ## Breaking Changes