[#22737] Re-write Go SDK data plane to support timers. #25982

Merged · 6 commits · Mar 29, 2023
Changes from 5 commits
16 changes: 12 additions & 4 deletions sdks/go/pkg/beam/core/runtime/exec/data.go
@@ -57,10 +57,12 @@ type SideCache interface {
// DataManager manages external data byte streams. Each data stream can be
// opened by one consumer only.
type DataManager interface {
// OpenRead opens a closable byte stream for reading.
OpenRead(ctx context.Context, id StreamID) (io.ReadCloser, error)
// OpenWrite opens a closable byte stream for writing.
// OpenElementChan opens a channel for data and timers.
OpenElementChan(ctx context.Context, id StreamID, expectedTimerTransforms []string) (<-chan Elements, error)
// OpenWrite opens a closable byte stream for data writing.
OpenWrite(ctx context.Context, id StreamID) (io.WriteCloser, error)
// OpenTimerWrite opens a byte stream for writing timers.
OpenTimerWrite(ctx context.Context, id StreamID, family string) (io.WriteCloser, error)
}

// StateReader is the interface for reading side input data.
@@ -91,4 +93,10 @@ type StateReader interface {
GetSideInputCache() SideCache
}

// TODO(herohde) 7/20/2018: user state management
// Elements holds data or timers sent across the data channel.
// If TimerFamilyID is populated, the bytes are timers; otherwise
// they are data elements.
type Elements struct {
Data, Timers []byte
TimerFamilyID, PtransformID string
}
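
Before moving to datasource.go, a short, self-contained sketch of how a consumer drains the channel returned by OpenElementChan, using the same non-empty-payload checks that process applies below. The channel contents and the printing are illustrative only, not SDK code:

package main

import "fmt"

// Elements mirrors the struct introduced in data.go above.
type Elements struct {
	Data, Timers                []byte
	TimerFamilyID, PtransformID string
}

func main() {
	elms := make(chan Elements, 2)
	elms <- Elements{Data: []byte("element bytes"), PtransformID: "pt0"}
	elms <- Elements{Timers: []byte("timer bytes"), TimerFamilyID: "fam", PtransformID: "pt0"}
	close(elms)

	// Drain the channel the way process does: a non-empty Data or
	// Timers payload decides how each Elements value is handled.
	for e := range elms {
		if len(e.Data) > 0 {
			fmt.Printf("data for %s: %d bytes\n", e.PtransformID, len(e.Data))
		}
		if len(e.Timers) > 0 {
			fmt.Printf("timers for %s family %s: %d bytes\n", e.PtransformID, e.TimerFamilyID, len(e.Timers))
		}
	}
}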
182 changes: 122 additions & 60 deletions sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -30,6 +30,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"golang.org/x/exp/maps"
)

// DataSource is a Root execution unit.
@@ -40,9 +41,12 @@ type DataSource struct {
Coder *coder.Coder
Out Node
PCol PCollection // Handles size metrics. Value instead of pointer so it's initialized by default in tests.
// OnTimerTransforms maps PtransformIDs to their execution nodes that handle OnTimer callbacks.
OnTimerTransforms map[string]*ParDo

source DataManager
state StateReader
source DataManager
state StateReader
curInst string

index int64
splitIdx int64
@@ -94,20 +98,79 @@ func (n *DataSource) Up(ctx context.Context) error {
// StartBundle initializes this datasource for the bundle.
func (n *DataSource) StartBundle(ctx context.Context, id string, data DataContext) error {
n.mu.Lock()
n.curInst = id
n.source = data.Data
n.state = data.State
n.start = time.Now()
n.index = -1
n.index = 0
n.splitIdx = math.MaxInt64
n.mu.Unlock()
return n.Out.StartBundle(ctx, id, data)
}

// splitSuccess is a marker error to indicate we've reached the split index.
// Akin to io.EOF.
var splitSuccess = errors.New("split index reached")

// process pulls Elements off the data channel and routes data and timer
// payloads to the supplied callbacks.
//
// The data and timer callbacks must return io.EOF once their reader is drained, signaling that
// process should wait for the next buffer. On a successful split, [splitSuccess] must be returned to indicate that the
// PTransform is done processing data for this instruction.
func (n *DataSource) process(ctx context.Context, data func(bcr *byteCountReader, ptransformID string) error, timer func(bcr *byteCountReader, ptransformID, timerFamilyID string) error) error {
// The SID identifies this instruction's expected data processing transform (this DataSource).
elms, err := n.source.OpenElementChan(ctx, n.SID, maps.Keys(n.OnTimerTransforms))
if err != nil {
return err
}

n.PCol.resetSize() // initialize the size distribution for this bundle.
var r bytes.Reader

var byteCount int
bcr := byteCountReader{reader: &r, count: &byteCount}

splitPrimaryComplete := map[string]bool{}
for {
var err error
select {
case e, ok := <-elms:
// Channel closed, so time to exit
if !ok {
return nil
}
if splitPrimaryComplete[e.PtransformID] {
continue
}
if len(e.Data) > 0 {
r.Reset(e.Data)
err = data(&bcr, e.PtransformID)
}
if len(e.Timers) > 0 {
r.Reset(e.Timers)
err = timer(&bcr, e.PtransformID, e.TimerFamilyID)
}

if err == splitSuccess {
// Returning splitSuccess means we've split, and aren't consuming the remaining buffer.
// We mark the PTransform done to ignore further data.
splitPrimaryComplete[e.PtransformID] = true
} else if err != nil && err != io.EOF {
return errors.Wrap(err, "source failed")
}
// io.EOF means the reader successfully drained.
// We're ready for a new buffer.
case <-ctx.Done():
return nil
}
}
}
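
The callback contract in the doc comment is easiest to see in a standalone example. Below is a sketch of a data callback that obeys it, using a toy uint32-length-prefixed encoding as a stand-in for the real beam coders (the encoding and element handling are assumptions for illustration):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// dataCallback follows the process contract: decode elements until the
// buffer is drained, then return io.EOF so process waits for the next
// Elements value on the channel.
func dataCallback(r io.Reader, ptransformID string) error {
	for {
		var n uint32
		if err := binary.Read(r, binary.BigEndian, &n); err != nil {
			return err // io.EOF here means this buffer is fully consumed.
		}
		payload := make([]byte, n)
		if _, err := io.ReadFull(r, payload); err != nil {
			return err
		}
		fmt.Printf("%s decoded %q\n", ptransformID, payload)
		// A real callback returns splitSuccess instead once a requested
		// split point is reached, as Process does below.
	}
}

func main() {
	var buf bytes.Buffer
	binary.Write(&buf, binary.BigEndian, uint32(2))
	buf.WriteString("hi")
	fmt.Println(dataCallback(&buf, "pt0")) // prints the element, then EOF
}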

// byteCountReader is a passthrough reader that counts all the bytes read through it.
// It trusts the nested reader to return accurate byte information.
type byteCountReader struct {
count *int
reader io.ReadCloser
reader io.Reader
}

func (r *byteCountReader) Read(p []byte) (int, error) {
@@ -117,7 +180,10 @@ func (r *byteCountReader) Read(p []byte) (int, error) {
}

func (r *byteCountReader) Close() error {
return r.reader.Close()
if c, ok := r.reader.(io.Closer); ok {
c.Close()
}
return nil
}
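
For reference, the counting pattern byteCountReader implements, reduced to a standalone sketch; countReader is a toy reimplementation for illustration, not the SDK type:

package main

import (
	"fmt"
	"io"
	"strings"
)

// countReader forwards Reads to the wrapped reader and tallies the bytes,
// the same passthrough trick byteCountReader uses for size metrics.
type countReader struct {
	count  *int
	reader io.Reader
}

func (r *countReader) Read(p []byte) (int, error) {
	n, err := r.reader.Read(p)
	*r.count += n
	return n, err
}

func main() {
	var c int
	r := &countReader{count: &c, reader: strings.NewReader("hello")}
	io.Copy(io.Discard, r)
	fmt.Println(c) // 5
}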

func (r *byteCountReader) reset() int {
@@ -128,15 +194,6 @@

// Process opens the data source, reads and decodes data, kicking off element processing.
func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) {
r, err := n.source.OpenRead(ctx, n.SID)
if err != nil {
return nil, err
}
defer r.Close()
n.PCol.resetSize() // initialize the size distribution for this bundle.
var byteCount int
bcr := byteCountReader{reader: r, count: &byteCount}

c := coder.SkipW(n.Coder)
wc := MakeWindowDecoder(n.Coder.Window)

@@ -155,58 +212,63 @@ func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) {
}

var checkpoints []*Checkpoint
for {
if n.incrementIndexAndCheckSplit() {
break
}
// TODO(lostluck) 2020/02/22: Should we include window headers or just count the element sizes?
ws, t, pn, err := DecodeWindowedValueHeader(wc, r)
if err != nil {
if err == io.EOF {
break
err := n.process(ctx, func(bcr *byteCountReader, ptransformID string) error {
for {
// TODO(lostluck) 2020/02/22: Should we include window headers or just count the element sizes?
ws, t, pn, err := DecodeWindowedValueHeader(wc, bcr.reader)
if err != nil {
return err
}
return nil, errors.Wrap(err, "source failed")
}

// Decode key or parallel element.
pe, err := cp.Decode(&bcr)
if err != nil {
return nil, errors.Wrap(err, "source decode failed")
}
pe.Timestamp = t
pe.Windows = ws
pe.Pane = pn

var valReStreams []ReStream
for _, cv := range cvs {
values, err := n.makeReStream(ctx, cv, &bcr, len(cvs) == 1 && n.singleIterate)
// Decode key or parallel element.
pe, err := cp.Decode(bcr)
if err != nil {
return nil, err
return errors.Wrap(err, "source decode failed")
}
valReStreams = append(valReStreams, values)
}
pe.Timestamp = t
pe.Windows = ws
pe.Pane = pn

if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err != nil {
return nil, err
}
// Collect the actual size of the element, and reset the bytecounter reader.
n.PCol.addSize(int64(bcr.reset()))
bcr.reader = r

// Check if there's a continuation and return residuals
// Needs to be done immeadiately after processing to not lose the element.
if c := n.getProcessContinuation(); c != nil {
cp, err := n.checkpointThis(ctx, c)
if err != nil {
// Errors during checkpointing should fail a bundle.
return nil, err
var valReStreams []ReStream
for _, cv := range cvs {
values, err := n.makeReStream(ctx, cv, bcr, len(cvs) == 1 && n.singleIterate)
if err != nil {
return err
}
valReStreams = append(valReStreams, values)
}
if cp != nil {
checkpoints = append(checkpoints, cp)

if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err != nil {
return err
}
// Collect the actual size of the element, and reset the bytecounter reader.
n.PCol.addSize(int64(bcr.reset()))

// Check if there's a continuation and return residuals
// Needs to be done immediately after processing to not lose the element.
if c := n.getProcessContinuation(); c != nil {
cp, err := n.checkpointThis(ctx, c)
if err != nil {
// Errors during checkpointing should fail a bundle.
return err
}
if cp != nil {
checkpoints = append(checkpoints, cp)
}
}
// We've finished processing an element, check if we have finished a split.
if n.incrementIndexAndCheckSplit() {
return splitSuccess
}
}
}
return checkpoints, nil
},
func(bcr *byteCountReader, ptransformID, timerFamilyID string) error {
tmap, err := decodeTimer(cp, wc, bcr)
log.Infof(ctx, "DEBUGLOG: timer received for: %v and %v - %+v err: %v", ptransformID, timerFamilyID, tmap, err)
return nil
})

return checkpoints, err
}
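
On the write side, OpenTimerWrite pairs with the timer branch above. A toy in-memory stand-in shows the shape of that API; StreamID here, the timerWriter interface, and the payload are simplified assumptions, and the real timer wire encoding is elided:

package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
)

// Toy stand-ins for exec.StreamID and the timer-write half of DataManager.
type StreamID struct{ InstID, PtransformID string }

type timerWriter interface {
	OpenTimerWrite(ctx context.Context, id StreamID, family string) (io.WriteCloser, error)
}

type nopCloser struct{ io.Writer }

func (nopCloser) Close() error { return nil }

// memWriter collects written timer bytes per family, as a fake data plane might.
type memWriter struct{ bufs map[string]*bytes.Buffer }

func (m *memWriter) OpenTimerWrite(ctx context.Context, id StreamID, family string) (io.WriteCloser, error) {
	b := &bytes.Buffer{}
	m.bufs[family] = b
	return nopCloser{b}, nil
}

func main() {
	var d timerWriter = &memWriter{bufs: map[string]*bytes.Buffer{}}
	w, _ := d.OpenTimerWrite(context.Background(), StreamID{"inst1", "pt0"}, "processingTime")
	w.Write([]byte("encoded timer")) // real encoding elided
	w.Close()
	fmt.Println(d.(*memWriter).bufs["processingTime"].String())
}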

func (n *DataSource) makeReStream(ctx context.Context, cv ElementDecoder, bcr *byteCountReader, onlyStream bool) (ReStream, error) {
@@ -313,7 +375,7 @@ func (n *DataSource) makeReStream(ctx context.Context, cv ElementDecoder, bcr *b
}
}

func readStreamToBuffer(cv ElementDecoder, r io.ReadCloser, size int64, buf []FullValue) ([]FullValue, error) {
func readStreamToBuffer(cv ElementDecoder, r io.Reader, size int64, buf []FullValue) ([]FullValue, error) {
for i := int64(0); i < size; i++ {
value, err := cv.Decode(r)
if err != nil {
@@ -472,7 +534,7 @@ func (n *DataSource) checkpointThis(ctx context.Context, pc sdf.ProcessContinuat
// The bufSize param specifies the estimated number of elements that will be
// sent to this DataSource, and is used to be able to perform accurate splits
// even if the DataSource has not yet received all its elements. A bufSize of
// 0 or less indicates that its unknown, and so uses the current known size.
// 0 or less indicates that it's unknown, and so uses the current known size.
func (n *DataSource) Split(ctx context.Context, splits []int64, frac float64, bufSize int64) (SplitResult, error) {
if n == nil {
return SplitResult{}, fmt.Errorf("failed to split at requested splits: {%v}, DataSource not initialized", splits)