Skip to content

Commit

Permalink
[aggregator] Simplify (Active)StagedPlacement API (#3199)
Browse files Browse the repository at this point in the history
  • Loading branch information
vdarulis authored Feb 9, 2021
1 parent 7bf4a4e commit 0526c0f
Show file tree
Hide file tree
Showing 9 changed files with 240 additions and 222 deletions.
9 changes: 2 additions & 7 deletions src/aggregator/aggregator/placement_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,21 +221,16 @@ func (mgr *placementManager) placementWithLock() (placement.ActiveStagedPlacemen
return nil, nil, errPlacementManagerNotOpenOrClosed
}

// NB(xichen): avoid using defer here because this is called on the write path
// for every incoming metric and defered return and func execution is expensive.
stagedPlacement, onStagedPlacementDoneFn, err := mgr.placementWatcher.ActiveStagedPlacement()
stagedPlacement, err := mgr.placementWatcher.ActiveStagedPlacement()
if err != nil {
mgr.metrics.activeStagedPlacementErrors.Inc(1)
return nil, nil, err
}
placement, onPlacementDoneFn, err := stagedPlacement.ActivePlacement()
placement, err := stagedPlacement.ActivePlacement()
if err != nil {
onStagedPlacementDoneFn()
mgr.metrics.activePlacementErrors.Inc(1)
return nil, nil, err
}
onPlacementDoneFn()
onStagedPlacementDoneFn()
return stagedPlacement, placement, nil
}

Expand Down
17 changes: 5 additions & 12 deletions src/aggregator/client/tcp_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,32 +229,29 @@ func (c *TCPClient) WriteForwarded(

// ActivePlacement returns a copy of the currently active placement and its version.
func (c *TCPClient) ActivePlacement() (placement.Placement, int, error) {
stagedPlacement, onStagedPlacementDoneFn, err := c.placementWatcher.ActiveStagedPlacement()
stagedPlacement, err := c.placementWatcher.ActiveStagedPlacement()
if err != nil {
return nil, 0, err
}
defer onStagedPlacementDoneFn()
if stagedPlacement == nil {
return nil, 0, errNilPlacement
}

placement, onPlacementDoneFn, err := stagedPlacement.ActivePlacement()
placement, err := stagedPlacement.ActivePlacement()
if err != nil {
return nil, 0, err
}
defer onPlacementDoneFn()

return placement.Clone(), stagedPlacement.Version(), nil
}

// ActivePlacementVersion returns a copy of the currently active placement version. It is a far less expensive call
// than ActivePlacement, as it does not clone the placement.
func (c *TCPClient) ActivePlacementVersion() (int, error) {
stagedPlacement, onStagedPlacementDoneFn, err := c.placementWatcher.ActiveStagedPlacement()
stagedPlacement, err := c.placementWatcher.ActiveStagedPlacement()
if err != nil {
return 0, err
}
defer onStagedPlacementDoneFn()
if stagedPlacement == nil {
return 0, errNilPlacement
}
Expand All @@ -281,17 +278,15 @@ func (c *TCPClient) write(
timeNanos int64,
payload payloadUnion,
) error {
stagedPlacement, onStagedPlacementDoneFn, err := c.placementWatcher.ActiveStagedPlacement()
stagedPlacement, err := c.placementWatcher.ActiveStagedPlacement()
if err != nil {
return err
}
if stagedPlacement == nil {
onStagedPlacementDoneFn()
return errNilPlacement
}
placement, onPlacementDoneFn, err := stagedPlacement.ActivePlacement()
placement, err := stagedPlacement.ActivePlacement()
if err != nil {
onStagedPlacementDoneFn()
return err
}
var (
Expand Down Expand Up @@ -327,8 +322,6 @@ func (c *TCPClient) write(
c.metrics.dropped.Inc(1)
}

onPlacementDoneFn()
onStagedPlacementDoneFn()
return multiErr.FinalError()
}

Expand Down
55 changes: 26 additions & 29 deletions src/aggregator/client/tcp_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func TestTCPClientWriteUntimedMetricActiveStagedPlacementError(t *testing.T) {
errActiveStagedPlacementError := errors.New("error active staged placement")
watcher := placement.NewMockStagedPlacementWatcher(ctrl)
watcher.EXPECT().ActiveStagedPlacement().
Return(nil, nil, errActiveStagedPlacementError).
Return(nil, errActiveStagedPlacementError).
MinTimes(1)
c := mustNewTestTCPClient(t, testOptions())
c.placementWatcher = watcher
Expand All @@ -247,7 +247,7 @@ func TestTCPClientWriteUntimedMetricActiveStagedPlacementNil(t *testing.T) {

watcher := placement.NewMockStagedPlacementWatcher(ctrl)
watcher.EXPECT().ActiveStagedPlacement().
Return(nil, func() {}, nil).
Return(nil, nil).
MinTimes(1)
c := mustNewTestTCPClient(t, testOptions())
c.placementWatcher = watcher
Expand All @@ -272,9 +272,9 @@ func TestTCPClientWriteUntimedMetricActivePlacementError(t *testing.T) {

errActivePlacementError := errors.New("error active placement")
stagedPlacement := placement.NewMockActiveStagedPlacement(ctrl)
stagedPlacement.EXPECT().ActivePlacement().Return(nil, nil, errActivePlacementError).MinTimes(1)
stagedPlacement.EXPECT().ActivePlacement().Return(nil, errActivePlacementError).MinTimes(1)
watcher := placement.NewMockStagedPlacementWatcher(ctrl)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, nil).MinTimes(1)
c := mustNewTestTCPClient(t, testOptions())
c.placementWatcher = watcher

Expand Down Expand Up @@ -316,9 +316,9 @@ func TestTCPClientWriteUntimedMetricSuccess(t *testing.T) {
}).
MinTimes(1)
stagedPlacement := placement.NewMockActiveStagedPlacement(ctrl)
stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, func() {}, nil).MinTimes(1)
stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, nil).MinTimes(1)
watcher := placement.NewMockStagedPlacementWatcher(ctrl)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, nil).MinTimes(1)
c := mustNewTestTCPClient(t, testOptions())
c.nowFn = func() time.Time { return time.Unix(0, testNowNanos) }
c.writerMgr = writerMgr
Expand Down Expand Up @@ -381,9 +381,9 @@ func TestTCPClientWriteUntimedMetricPartialError(t *testing.T) {
}).
MinTimes(1)
stagedPlacement := placement.NewMockActiveStagedPlacement(ctrl)
stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, func() {}, nil).MinTimes(1)
stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, nil).MinTimes(1)
watcher := placement.NewMockStagedPlacementWatcher(ctrl)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, nil).MinTimes(1)
c := mustNewTestTCPClient(t, testOptions())
c.nowFn = func() time.Time { return time.Unix(0, testNowNanos) }
c.writerMgr = writerMgr
Expand All @@ -408,9 +408,9 @@ func TestTCPClientWriteUntimedMetricBeforeShardCutover(t *testing.T) {

var instancesRes []placement.Instance
stagedPlacement := placement.NewMockActiveStagedPlacement(ctrl)
stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, func() {}, nil).MinTimes(1)
stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, nil).MinTimes(1)
watcher := placement.NewMockStagedPlacementWatcher(ctrl)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, nil).MinTimes(1)
c := mustNewTestTCPClient(t, testOptions())
c.shardCutoverWarmupDuration = time.Second
c.nowFn = func() time.Time { return time.Unix(0, testCutoverNanos-1).Add(-time.Second) }
Expand All @@ -428,9 +428,9 @@ func TestTCPClientWriteUntimedMetricAfterShardCutoff(t *testing.T) {

var instancesRes []placement.Instance
stagedPlacement := placement.NewMockActiveStagedPlacement(ctrl)
stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, func() {}, nil).MinTimes(1)
stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, nil).MinTimes(1)
watcher := placement.NewMockStagedPlacementWatcher(ctrl)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, nil).MinTimes(1)
c := mustNewTestTCPClient(t, testOptions())
c.shardCutoffLingerDuration = time.Second
c.nowFn = func() time.Time { return time.Unix(0, testCutoffNanos+1).Add(time.Second) }
Expand Down Expand Up @@ -466,9 +466,9 @@ func TestTCPClientWriteTimedMetricSuccess(t *testing.T) {
}).
MinTimes(1)
stagedPlacement := placement.NewMockActiveStagedPlacement(ctrl)
stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, func() {}, nil).MinTimes(1)
stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, nil).MinTimes(1)
watcher := placement.NewMockStagedPlacementWatcher(ctrl)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, nil).MinTimes(1)
c := mustNewTestTCPClient(t, testOptions())
c.nowFn = func() time.Time { return time.Unix(0, testNowNanos) }
c.writerMgr = writerMgr
Expand Down Expand Up @@ -517,9 +517,9 @@ func TestTCPClientWriteTimedMetricPartialError(t *testing.T) {
}).
MinTimes(1)
stagedPlacement := placement.NewMockActiveStagedPlacement(ctrl)
stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, func() {}, nil).MinTimes(1)
stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, nil).MinTimes(1)
watcher := placement.NewMockStagedPlacementWatcher(ctrl)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, nil).MinTimes(1)
c := mustNewTestTCPClient(t, testOptions())
c.nowFn = func() time.Time { return time.Unix(0, testNowNanos) }
c.writerMgr = writerMgr
Expand Down Expand Up @@ -564,9 +564,9 @@ func TestTCPClientWriteForwardedMetricSuccess(t *testing.T) {
}).
MinTimes(1)
stagedPlacement := placement.NewMockActiveStagedPlacement(ctrl)
stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, func() {}, nil).MinTimes(1)
stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, nil).MinTimes(1)
watcher := placement.NewMockStagedPlacementWatcher(ctrl)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, nil).MinTimes(1)
c := mustNewTestTCPClient(t, testOptions())
c.nowFn = func() time.Time { return time.Unix(0, testNowNanos) }
c.writerMgr = writerMgr
Expand Down Expand Up @@ -615,9 +615,9 @@ func TestTCPClientWriteForwardedMetricPartialError(t *testing.T) {
}).
MinTimes(1)
stagedPlacement := placement.NewMockActiveStagedPlacement(ctrl)
stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, func() {}, nil).MinTimes(1)
stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, nil).MinTimes(1)
watcher := placement.NewMockStagedPlacementWatcher(ctrl)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, nil).MinTimes(1)
c := mustNewTestTCPClient(t, testOptions())
c.nowFn = func() time.Time { return time.Unix(0, testNowNanos) }
c.writerMgr = writerMgr
Expand Down Expand Up @@ -662,9 +662,9 @@ func TestTCPClientWritePassthroughMetricSuccess(t *testing.T) {
}).
MinTimes(1)
stagedPlacement := placement.NewMockActiveStagedPlacement(ctrl)
stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, func() {}, nil).MinTimes(1)
stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, nil).MinTimes(1)
watcher := placement.NewMockStagedPlacementWatcher(ctrl)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, nil).MinTimes(1)
c := mustNewTestTCPClient(t, testOptions())
c.nowFn = func() time.Time { return time.Unix(0, testNowNanos) }
c.writerMgr = writerMgr
Expand Down Expand Up @@ -713,9 +713,9 @@ func TestTCPClientWritePassthroughMetricPartialError(t *testing.T) {
}).
MinTimes(1)
stagedPlacement := placement.NewMockActiveStagedPlacement(ctrl)
stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, func() {}, nil).MinTimes(1)
stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, nil).MinTimes(1)
watcher := placement.NewMockStagedPlacementWatcher(ctrl)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, nil).MinTimes(1)
c := mustNewTestTCPClient(t, testOptions())
c.nowFn = func() time.Time { return time.Unix(0, testNowNanos) }
c.writerMgr = writerMgr
Expand Down Expand Up @@ -820,25 +820,22 @@ func TestTCPClientActivePlacement(t *testing.T) {
mockPl = placement.NewMockPlacement(ctrl)
stagedPlacement = placement.NewMockActiveStagedPlacement(ctrl)
watcher = placement.NewMockStagedPlacementWatcher(ctrl)
doneCalls int
)

c.placementWatcher = watcher
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() { doneCalls++ }, nil).Times(2)
watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, nil).Times(2)
stagedPlacement.EXPECT().Version().Return(42).Times(2)
stagedPlacement.EXPECT().ActivePlacement().Return(mockPl, func() { doneCalls++ }, nil)
stagedPlacement.EXPECT().ActivePlacement().Return(mockPl, nil)
mockPl.EXPECT().Clone().Return(emptyPl)

pl, v, err := c.ActivePlacement()
assert.NoError(t, err)
assert.Equal(t, 42, v)
assert.Equal(t, 2, doneCalls)
assert.Equal(t, emptyPl, pl)

v, err = c.ActivePlacementVersion()
assert.NoError(t, err)
assert.Equal(t, 42, v)
assert.Equal(t, 3, doneCalls)
}

func TestTCPClientInitAndClose(t *testing.T) {
Expand Down
16 changes: 7 additions & 9 deletions src/cluster/placement/placement_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 0526c0f

Please sign in to comment.