Skip to content

Commit

Permalink
[cluster] Expose underlying staged placement version in ActiveStagedP…
Browse files Browse the repository at this point in the history
…lacement
  • Loading branch information
vdarulis committed Dec 1, 2020
1 parent e03804b commit 5738b8a
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 5 deletions.
14 changes: 14 additions & 0 deletions src/cluster/placement/placement_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 12 additions & 3 deletions src/cluster/placement/staged_placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type activeStagedPlacement struct {
sync.RWMutex

placements Placements
version int
nowFn clock.NowFn
onPlacementsAddedFn OnPlacementsAddedFn
onPlacementsRemovedFn OnPlacementsRemovedFn
Expand All @@ -50,13 +51,15 @@ type activeStagedPlacement struct {

func newActiveStagedPlacement(
placements []Placement,
version int,
opts ActiveStagedPlacementOptions,
) ActiveStagedPlacement {
) *activeStagedPlacement {
if opts == nil {
opts = NewActiveStagedPlacementOptions()
}
p := &activeStagedPlacement{
placements: placements,
version: version,
nowFn: opts.ClockOptions().NowFn(),
onPlacementsAddedFn: opts.OnPlacementsAddedFn(),
onPlacementsRemovedFn: opts.OnPlacementsRemovedFn(),
Expand Down Expand Up @@ -94,6 +97,12 @@ func (p *activeStagedPlacement) Close() error {
return nil
}

func (p *activeStagedPlacement) Version() int {
p.RLock()
defer p.RUnlock()
return p.version
}

func (p *activeStagedPlacement) onPlacementDone() { p.RUnlock() }

func (p *activeStagedPlacement) activePlacementWithLock(timeNanos int64) (Placement, error) {
Expand Down Expand Up @@ -174,9 +183,9 @@ func (sp *stagedPlacement) ActiveStagedPlacement(timeNanos int64) ActiveStagedPl
idx--
}
if idx < 0 {
return newActiveStagedPlacement(sp.placements, sp.opts)
return newActiveStagedPlacement(sp.placements, sp.version, sp.opts)
}
return newActiveStagedPlacement(sp.placements[idx:], sp.opts)
return newActiveStagedPlacement(sp.placements[idx:], sp.version, sp.opts)
}

func (sp *stagedPlacement) Version() int { return sp.version }
Expand Down
11 changes: 9 additions & 2 deletions src/cluster/placement/staged_placement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func TestNewActiveStagedPlacement(t *testing.T) {
}
},
)
ap := newActiveStagedPlacement(testActivePlacements, opts).(*activeStagedPlacement)
ap := newActiveStagedPlacement(testActivePlacements, 0, opts)
require.Equal(t, len(testActivePlacements), len(allInstances))
require.Equal(t, len(testActivePlacements), len(ap.placements))
for i := 0; i < len(testActivePlacements); i++ {
Expand Down Expand Up @@ -380,7 +380,7 @@ func TestActiveStagedPlacementCloseSuccess(t *testing.T) {
removedInstances = append(removedInstances, placement.Instances())
}
})
p := newActiveStagedPlacement(testActivePlacements, opts)
p := newActiveStagedPlacement(testActivePlacements, 0, opts)
require.NoError(t, p.Close())
require.Equal(t, 2, len(addedInstances))
require.Equal(t, 2, len(removedInstances))
Expand Down Expand Up @@ -452,9 +452,16 @@ func TestStagedPlacementNilProto(t *testing.T) {
func TestStagedPlacementValidProto(t *testing.T) {
sp, err := NewStagedPlacementFromProto(1, testStagedPlacementProto, NewActiveStagedPlacementOptions())
require.NoError(t, err)

pss := sp.(*stagedPlacement)
require.Equal(t, 1, pss.Version())
require.Equal(t, 1, pss.ActiveStagedPlacement(0).Version())

pss.SetVersion(42)
require.Equal(t, 42, pss.ActiveStagedPlacement(0).Version())

require.Equal(t, len(pss.placements), len(testStagedPlacementProto.Snapshots))

for i := 0; i < len(testStagedPlacementProto.Snapshots); i++ {
validateSnapshot(t, testActivePlacements[i], pss.placements[i])
}
Expand Down
2 changes: 2 additions & 0 deletions src/cluster/placement/staged_placement_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,5 @@ func (mp *mockPlacement) ActivePlacement() (Placement, DoneFn, error) {
}

func (mp *mockPlacement) Close() error { return mp.closeFn() }

func (mp *mockPlacement) Version() int { return 0 }
3 changes: 3 additions & 0 deletions src/cluster/placement/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,9 @@ type ActiveStagedPlacement interface {
// function when the caller is done using the placement, and any errors encountered.
ActivePlacement() (Placement, DoneFn, error)

// Version returns the version of the underlying staged placement.
Version() int

// Close closes the active staged placement.
Close() error
}
Expand Down

0 comments on commit 5738b8a

Please sign in to comment.