From ae4399796edd46caf1943a6780ceda6bea9fb5e0 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Fri, 6 Jan 2023 16:57:46 +0800 Subject: [PATCH 1/3] add readBatch method --- .../sourcemanager/engine/mounted_iter.go | 76 ++++++++++--------- .../sourcemanager/engine/mounted_iter_test.go | 2 - 2 files changed, 40 insertions(+), 38 deletions(-) diff --git a/cdc/processor/sourcemanager/engine/mounted_iter.go b/cdc/processor/sourcemanager/engine/mounted_iter.go index 2612c2cf14d..e365bae452c 100644 --- a/cdc/processor/sourcemanager/engine/mounted_iter.go +++ b/cdc/processor/sourcemanager/engine/mounted_iter.go @@ -27,7 +27,6 @@ type MountedEventIter struct { maxBatchSize int rawEvents []rawEvent - nextToMount int nextToEmit int savedIterError error } @@ -47,54 +46,59 @@ func NewMountedEventIter( // Next returns the next mounted event. func (i *MountedEventIter) Next(ctx context.Context) (event *model.PolymorphicEvent, txnFinished Position, err error) { + // There are no events in mounting. Fetch more events and mounting them. + // The batch size is determined by `maxBatchSize`. + if i.nextToEmit >= len(i.rawEvents) { + err = i.readBatch(ctx) + if err != nil { + return + } + } + // Check whether there are events in mounting or not. - for idx := i.nextToEmit; idx < i.nextToMount; idx++ { + if i.nextToEmit < len(i.rawEvents) { + idx := i.nextToEmit if err = i.rawEvents[idx].event.WaitFinished(ctx); err == nil { event = i.rawEvents[idx].event txnFinished = i.rawEvents[idx].txnFinished i.nextToEmit += 1 } - return } + return +} - // There are no events in mounting. Fetch more events and mounting them. - // The batch size is determined by `maxBatchSize`. - if i.mg != nil && i.iter != nil { - i.nextToMount = 0 - i.nextToEmit = 0 - if cap(i.rawEvents) == 0 { - i.rawEvents = make([]rawEvent, 0, i.maxBatchSize) - } else { - i.rawEvents = i.rawEvents[:0] - } +func (i *MountedEventIter) readBatch(ctx context.Context) error { + if i.mg == nil || i.iter == nil { + return nil + } + + i.nextToEmit = 0 + if cap(i.rawEvents) == 0 { + i.rawEvents = make([]rawEvent, 0, i.maxBatchSize) + } else { + i.rawEvents = i.rawEvents[:0] + } - for len(i.rawEvents) < cap(i.rawEvents) { - event, txnFinished, err = i.iter.Next() - if err != nil { - return - } - if event == nil { - i.savedIterError = i.iter.Close() - i.iter = nil - break - } - i.rawEvents = append(i.rawEvents, rawEvent{event, txnFinished}) + for len(i.rawEvents) < cap(i.rawEvents) { + event, txnFinished, err := i.iter.Next() + if err != nil { + return err } - for idx := i.nextToMount; idx < len(i.rawEvents); idx++ { - i.rawEvents[idx].event.SetUpFinishedCh() - if err = i.mg.AddEvent(ctx, i.rawEvents[idx].event); err != nil { - i.mg = nil - return - } - i.nextToMount += 1 + if event == nil { + i.savedIterError = i.iter.Close() + i.iter = nil + break } - - // More events are fetched and in mounting. So re-call this function to wait them. - if i.nextToEmit < i.nextToMount { - return i.Next(ctx) + i.rawEvents = append(i.rawEvents, rawEvent{event, txnFinished}) + } + for idx := 0; idx < len(i.rawEvents); idx++ { + i.rawEvents[idx].event.SetUpFinishedCh() + if err := i.mg.AddEvent(ctx, i.rawEvents[idx].event); err != nil { + i.mg = nil + return err } } - return + return nil } // Close implements sorter.EventIterator. diff --git a/cdc/processor/sourcemanager/engine/mounted_iter_test.go b/cdc/processor/sourcemanager/engine/mounted_iter_test.go index 3811d095981..f1ae2a3f524 100644 --- a/cdc/processor/sourcemanager/engine/mounted_iter_test.go +++ b/cdc/processor/sourcemanager/engine/mounted_iter_test.go @@ -56,14 +56,12 @@ func TestMountedEventIter(t *testing.T) { require.NotNil(t, event) require.Nil(t, err) } - require.Equal(t, iter.nextToMount, 3) require.Equal(t, iter.nextToEmit, 3) rawIter.repeatItem = func() *model.PolymorphicEvent { return nil } event, _, err := iter.Next(context.Background()) require.Nil(t, event) require.Nil(t, err) - require.Equal(t, iter.nextToMount, 0) require.Equal(t, iter.nextToEmit, 0) require.Nil(t, iter.iter) } From 26080e81a68c86d44187d6a6f98d561c6739c36b Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Fri, 6 Jan 2023 17:35:47 +0800 Subject: [PATCH 2/3] address comment --- cdc/processor/sourcemanager/engine/mounted_iter.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/cdc/processor/sourcemanager/engine/mounted_iter.go b/cdc/processor/sourcemanager/engine/mounted_iter.go index e365bae452c..4e71d3e3adb 100644 --- a/cdc/processor/sourcemanager/engine/mounted_iter.go +++ b/cdc/processor/sourcemanager/engine/mounted_iter.go @@ -89,14 +89,12 @@ func (i *MountedEventIter) readBatch(ctx context.Context) error { i.iter = nil break } - i.rawEvents = append(i.rawEvents, rawEvent{event, txnFinished}) - } - for idx := 0; idx < len(i.rawEvents); idx++ { - i.rawEvents[idx].event.SetUpFinishedCh() - if err := i.mg.AddEvent(ctx, i.rawEvents[idx].event); err != nil { + event.SetUpFinishedCh() + if err := i.mg.AddEvent(ctx, event); err != nil { i.mg = nil return err } + i.rawEvents = append(i.rawEvents, rawEvent{event, txnFinished}) } return nil } From 1a4491c127fcb3f37a0e4fde9d4e9c4fd66d1e79 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Fri, 6 Jan 2023 17:38:19 +0800 Subject: [PATCH 3/3] check err inline --- cdc/processor/sourcemanager/engine/mounted_iter.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cdc/processor/sourcemanager/engine/mounted_iter.go b/cdc/processor/sourcemanager/engine/mounted_iter.go index 4e71d3e3adb..77e61ba5be2 100644 --- a/cdc/processor/sourcemanager/engine/mounted_iter.go +++ b/cdc/processor/sourcemanager/engine/mounted_iter.go @@ -49,8 +49,7 @@ func (i *MountedEventIter) Next(ctx context.Context) (event *model.PolymorphicEv // There are no events in mounting. Fetch more events and mounting them. // The batch size is determined by `maxBatchSize`. if i.nextToEmit >= len(i.rawEvents) { - err = i.readBatch(ctx) - if err != nil { + if err = i.readBatch(ctx); err != nil { return } } @@ -58,11 +57,12 @@ func (i *MountedEventIter) Next(ctx context.Context) (event *model.PolymorphicEv // Check whether there are events in mounting or not. if i.nextToEmit < len(i.rawEvents) { idx := i.nextToEmit - if err = i.rawEvents[idx].event.WaitFinished(ctx); err == nil { - event = i.rawEvents[idx].event - txnFinished = i.rawEvents[idx].txnFinished - i.nextToEmit += 1 + if err = i.rawEvents[idx].event.WaitFinished(ctx); err != nil { + return } + event = i.rawEvents[idx].event + txnFinished = i.rawEvents[idx].txnFinished + i.nextToEmit += 1 } return }