Skip to content

Commit

Permalink
add readBatch method
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Jan 6, 2023
1 parent 857e7cf commit ae43997
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 38 deletions.
76 changes: 40 additions & 36 deletions cdc/processor/sourcemanager/engine/mounted_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type MountedEventIter struct {
maxBatchSize int

rawEvents []rawEvent
nextToMount int
nextToEmit int
savedIterError error
}
Expand All @@ -47,54 +46,59 @@ func NewMountedEventIter(

// Next returns the next mounted event.
func (i *MountedEventIter) Next(ctx context.Context) (event *model.PolymorphicEvent, txnFinished Position, err error) {
// There are no events in mounting. Fetch more events and mounting them.
// The batch size is determined by `maxBatchSize`.
if i.nextToEmit >= len(i.rawEvents) {
err = i.readBatch(ctx)
if err != nil {
return
}
}

// Check whether there are events in mounting or not.
for idx := i.nextToEmit; idx < i.nextToMount; idx++ {
if i.nextToEmit < len(i.rawEvents) {
idx := i.nextToEmit
if err = i.rawEvents[idx].event.WaitFinished(ctx); err == nil {
event = i.rawEvents[idx].event
txnFinished = i.rawEvents[idx].txnFinished
i.nextToEmit += 1
}
return
}
return
}

// There are no events in mounting. Fetch more events and mounting them.
// The batch size is determined by `maxBatchSize`.
if i.mg != nil && i.iter != nil {
i.nextToMount = 0
i.nextToEmit = 0
if cap(i.rawEvents) == 0 {
i.rawEvents = make([]rawEvent, 0, i.maxBatchSize)
} else {
i.rawEvents = i.rawEvents[:0]
}
func (i *MountedEventIter) readBatch(ctx context.Context) error {
if i.mg == nil || i.iter == nil {
return nil
}

i.nextToEmit = 0
if cap(i.rawEvents) == 0 {
i.rawEvents = make([]rawEvent, 0, i.maxBatchSize)
} else {
i.rawEvents = i.rawEvents[:0]
}

for len(i.rawEvents) < cap(i.rawEvents) {
event, txnFinished, err = i.iter.Next()
if err != nil {
return
}
if event == nil {
i.savedIterError = i.iter.Close()
i.iter = nil
break
}
i.rawEvents = append(i.rawEvents, rawEvent{event, txnFinished})
for len(i.rawEvents) < cap(i.rawEvents) {
event, txnFinished, err := i.iter.Next()
if err != nil {
return err
}
for idx := i.nextToMount; idx < len(i.rawEvents); idx++ {
i.rawEvents[idx].event.SetUpFinishedCh()
if err = i.mg.AddEvent(ctx, i.rawEvents[idx].event); err != nil {
i.mg = nil
return
}
i.nextToMount += 1
if event == nil {
i.savedIterError = i.iter.Close()
i.iter = nil
break
}

// More events are fetched and in mounting. So re-call this function to wait them.
if i.nextToEmit < i.nextToMount {
return i.Next(ctx)
i.rawEvents = append(i.rawEvents, rawEvent{event, txnFinished})
}
for idx := 0; idx < len(i.rawEvents); idx++ {
i.rawEvents[idx].event.SetUpFinishedCh()
if err := i.mg.AddEvent(ctx, i.rawEvents[idx].event); err != nil {
i.mg = nil
return err
}
}
return
return nil
}

// Close implements sorter.EventIterator.
Expand Down
2 changes: 0 additions & 2 deletions cdc/processor/sourcemanager/engine/mounted_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,12 @@ func TestMountedEventIter(t *testing.T) {
require.NotNil(t, event)
require.Nil(t, err)
}
require.Equal(t, iter.nextToMount, 3)
require.Equal(t, iter.nextToEmit, 3)

rawIter.repeatItem = func() *model.PolymorphicEvent { return nil }
event, _, err := iter.Next(context.Background())
require.Nil(t, event)
require.Nil(t, err)
require.Equal(t, iter.nextToMount, 0)
require.Equal(t, iter.nextToEmit, 0)
require.Nil(t, iter.iter)
}

0 comments on commit ae43997

Please sign in to comment.