Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sinkv2(ticdc): remove redundant nextToMount in MountedEventIter #8030

Merged
merged 3 commits into from
Jan 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 42 additions & 40 deletions cdc/processor/sourcemanager/engine/mounted_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type MountedEventIter struct {
maxBatchSize int

rawEvents []rawEvent
nextToMount int
nextToEmit int
savedIterError error
}
Expand All @@ -47,54 +46,57 @@ func NewMountedEventIter(

// Next returns the next mounted event.
func (i *MountedEventIter) Next(ctx context.Context) (event *model.PolymorphicEvent, txnFinished Position, err error) {
// Check whether there are events in mounting or not.
for idx := i.nextToEmit; idx < i.nextToMount; idx++ {
if err = i.rawEvents[idx].event.WaitFinished(ctx); err == nil {
event = i.rawEvents[idx].event
txnFinished = i.rawEvents[idx].txnFinished
i.nextToEmit += 1
// There are no events in mounting. Fetch more events and mounting them.
// The batch size is determined by `maxBatchSize`.
if i.nextToEmit >= len(i.rawEvents) {
if err = i.readBatch(ctx); err != nil {
return
}
return
}

// There are no events in mounting. Fetch more events and mounting them.
// The batch size is determined by `maxBatchSize`.
if i.mg != nil && i.iter != nil {
i.nextToMount = 0
i.nextToEmit = 0
if cap(i.rawEvents) == 0 {
i.rawEvents = make([]rawEvent, 0, i.maxBatchSize)
} else {
i.rawEvents = i.rawEvents[:0]
// Check whether there are events in mounting or not.
if i.nextToEmit < len(i.rawEvents) {
hicqu marked this conversation as resolved.
Show resolved Hide resolved
idx := i.nextToEmit
if err = i.rawEvents[idx].event.WaitFinished(ctx); err != nil {
return
}
event = i.rawEvents[idx].event
txnFinished = i.rawEvents[idx].txnFinished
i.nextToEmit += 1
}
return
}

func (i *MountedEventIter) readBatch(ctx context.Context) error {
if i.mg == nil || i.iter == nil {
return nil
}

i.nextToEmit = 0
if cap(i.rawEvents) == 0 {
i.rawEvents = make([]rawEvent, 0, i.maxBatchSize)
} else {
i.rawEvents = i.rawEvents[:0]
}

for len(i.rawEvents) < cap(i.rawEvents) {
event, txnFinished, err = i.iter.Next()
if err != nil {
return
}
if event == nil {
i.savedIterError = i.iter.Close()
i.iter = nil
break
}
i.rawEvents = append(i.rawEvents, rawEvent{event, txnFinished})
for len(i.rawEvents) < cap(i.rawEvents) {
event, txnFinished, err := i.iter.Next()
if err != nil {
return err
}
for idx := i.nextToMount; idx < len(i.rawEvents); idx++ {
i.rawEvents[idx].event.SetUpFinishedCh()
if err = i.mg.AddEvent(ctx, i.rawEvents[idx].event); err != nil {
i.mg = nil
return
}
i.nextToMount += 1
if event == nil {
i.savedIterError = i.iter.Close()
i.iter = nil
break
}

// More events are fetched and in mounting. So re-call this function to wait them.
if i.nextToEmit < i.nextToMount {
return i.Next(ctx)
event.SetUpFinishedCh()
if err := i.mg.AddEvent(ctx, event); err != nil {
i.mg = nil
return err
}
i.rawEvents = append(i.rawEvents, rawEvent{event, txnFinished})
}
return
return nil
}

// Close implements sorter.EventIterator.
Expand Down
2 changes: 0 additions & 2 deletions cdc/processor/sourcemanager/engine/mounted_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,12 @@ func TestMountedEventIter(t *testing.T) {
require.NotNil(t, event)
require.Nil(t, err)
}
require.Equal(t, iter.nextToMount, 3)
require.Equal(t, iter.nextToEmit, 3)

rawIter.repeatItem = func() *model.PolymorphicEvent { return nil }
event, _, err := iter.Next(context.Background())
require.Nil(t, event)
require.Nil(t, err)
require.Equal(t, iter.nextToMount, 0)
require.Equal(t, iter.nextToEmit, 0)
require.Nil(t, iter.iter)
}