Skip to content

Commit

Permalink
[aggregator] Fix change ActivePlacement semantics on close (#3201)
Browse files Browse the repository at this point in the history
  • Loading branch information
vdarulis authored Feb 10, 2021
1 parent 0526c0f commit 82cecd5
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 9 deletions.
14 changes: 6 additions & 8 deletions src/cluster/placement/staged_placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ var (
errNoApplicablePlacement = errors.New("no applicable placement found")
errActiveStagedPlacementClosed = errors.New("active staged placement is closed")
errPlacementInvalidType = errors.New("corrupt placement")

_noPlacements Placements
)

type activeStagedPlacement struct {
Expand Down Expand Up @@ -81,7 +79,6 @@ func (p *activeStagedPlacement) Close() error {
p.onPlacementsRemovedFn(pl)
}
}
p.placements.Store(_noPlacements)

return nil
}
Expand All @@ -91,11 +88,12 @@ func (p *activeStagedPlacement) Version() int {
}

func (p *activeStagedPlacement) ActivePlacement() (Placement, error) {
if p.closed.Load() {
return nil, errActiveStagedPlacementClosed
}

// placements themselves are subject to mutability races, but even in historical design, there was no actual
// NB: we explicitly don't check if placement is closed, as closing is done by placement watcher and it might
// be referenced by a client.
// the only effect of closing is execution of callback to let subscribers know that placement has been
// removed/closed - it does not invalidate the placement itself.
//
// also, placements themselves are subject to mutability races, but even in historical design, there was no actual
// need to enforce it, as long as we ensure callback ordering, and that is still the case.
placements, ok := p.placements.Load().(Placements)
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/placement/staged_placement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func TestActiveStagedPlacementActivePlacementClosed(t *testing.T) {
p.placements.Store(pl)
p.closed.Store(true)

_, err := p.ActivePlacement()
err := p.Close()
require.Equal(t, errActiveStagedPlacementClosed, err)
}

Expand Down

0 comments on commit 82cecd5

Please sign in to comment.