Skip to content

Commit

Permalink
add scaling policy type
Browse files Browse the repository at this point in the history
  • Loading branch information
lgfa29 committed Sep 29, 2020
1 parent 29db7be commit 5a48a8d
Show file tree
Hide file tree
Showing 14 changed files with 737 additions and 94 deletions.
10 changes: 10 additions & 0 deletions api/scaling.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package api

const (
// ScalingPolicyTypeHorizontal indicates a policy that does horizontal scaling.
ScalingPolicyTypeHorizontal = "horizontal"
)

// Scaling is used to query scaling-related API endpoints
type Scaling struct {
client *Client
Expand Down Expand Up @@ -36,6 +41,9 @@ func (p *ScalingPolicy) Canonicalize(taskGroupCount int) {
var m int64 = int64(taskGroupCount)
p.Min = &m
}
if p.Type == "" {
p.Type = ScalingPolicyTypeHorizontal
}
}

// ScalingRequest is the payload for a generic scaling action
Expand All @@ -54,6 +62,7 @@ type ScalingRequest struct {
type ScalingPolicy struct {
ID string
Namespace string
Type string
Target map[string]string
Min *int64
Max *int64
Expand All @@ -68,6 +77,7 @@ type ScalingPolicy struct {
type ScalingPolicyListStub struct {
ID string
Enabled bool
Type string
Target map[string]string
CreateIndex uint64
ModifyIndex uint64
Expand Down
4 changes: 4 additions & 0 deletions api/scaling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ func TestScalingPolicies_ListPolicies(t *testing.T) {

// Check that the scaling policy references the right group
require.Equal(policy.Target["Group"], *job.TaskGroups[0].Name)

// Check that the scaling policy has the right type
require.Equal(ScalingPolicyTypeHorizontal, policy.Type)
}

func TestScalingPolicies_GetPolicy(t *testing.T) {
Expand Down Expand Up @@ -117,4 +120,5 @@ func TestScalingPolicies_GetPolicy(t *testing.T) {
require.Equal(policy.Enabled, resp.Enabled)
require.Equal(*policy.Min, *resp.Min)
require.Equal(policy.Max, resp.Max)
require.Equal(ScalingPolicyTypeHorizontal, resp.Type)
}
4 changes: 4 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1474,6 +1474,10 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
return err
}

// Handle upgrade path:
// - Set policy type if empty
scalingPolicy.Canonicalize()

if err := restore.ScalingPolicyRestore(scalingPolicy); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5589,7 +5589,7 @@ func TestJobEndpoint_Plan_Scaling(t *testing.T) {
job := mock.Job()
tg := job.TaskGroups[0]
tg.Tasks[0].Resources.MemoryMB = 999999999
scaling := &structs.ScalingPolicy{Min: 1, Max: 100}
scaling := &structs.ScalingPolicy{Min: 1, Max: 100, Type: structs.ScalingPolicyTypeHorizontal}
tg.Scaling = scaling.TargetTaskGroup(job, tg)
planReq := &structs.JobPlanRequest{
Job: job,
Expand Down
9 changes: 6 additions & 3 deletions nomad/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -1350,13 +1350,15 @@ func ACLManagementToken() *structs.ACLToken {

func ScalingPolicy() *structs.ScalingPolicy {
return &structs.ScalingPolicy{
ID: uuid.Generate(),
Min: 1,
Max: 100,
ID: uuid.Generate(),
Min: 1,
Max: 100,
Type: structs.ScalingPolicyTypeHorizontal,
Target: map[string]string{
structs.ScalingTargetNamespace: structs.DefaultNamespace,
structs.ScalingTargetJob: uuid.Generate(),
structs.ScalingTargetGroup: uuid.Generate(),
structs.ScalingTargetTask: uuid.Generate(),
},
Policy: map[string]interface{}{
"a": "b",
Expand All @@ -1373,6 +1375,7 @@ func JobWithScalingPolicy() (*structs.Job, *structs.ScalingPolicy) {
ID: uuid.Generate(),
Min: int64(job.TaskGroups[0].Count),
Max: int64(job.TaskGroups[0].Count),
Type: structs.ScalingPolicyTypeHorizontal,
Policy: map[string]interface{}{},
Enabled: true,
}
Expand Down
38 changes: 29 additions & 9 deletions nomad/state/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,10 @@ func csiPluginTableSchema() *memdb.TableSchema {
// using reflection and builds an index on that field.
type ScalingPolicyTargetFieldIndex struct {
Field string

// AllowMissing controls if the field should be ignored if the field is
// not provided.
AllowMissing bool
}

// FromObject is used to extract an index value from an
Expand All @@ -751,7 +755,7 @@ func (s *ScalingPolicyTargetFieldIndex) FromObject(obj interface{}) (bool, []byt
}

val, ok := policy.Target[s.Field]
if !ok {
if !ok && !s.AllowMissing {
return false, nil, nil
}

Expand Down Expand Up @@ -808,28 +812,44 @@ func scalingPolicyTableSchema() *memdb.TableSchema {
// Target index is used for listing by namespace or job, or looking up a specific target.
// A given task group can have only a single scaling policies, so this is guaranteed to be unique.
"target": {
Name: "target",
AllowMissing: false,
Unique: true,
Name: "target",
Unique: false,

// Use a compound index so the tuple of (Namespace, Job, Group) is
// uniquely identifying
// Use a compound index so the tuple of (Namespace, Job, Group, Task) is
// used when looking for a policy
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&ScalingPolicyTargetFieldIndex{
Field: "Namespace",
Field: "Namespace",
AllowMissing: true,
},

&ScalingPolicyTargetFieldIndex{
Field: "Job",
AllowMissing: true,
},

&ScalingPolicyTargetFieldIndex{
Field: "Job",
Field: "Group",
AllowMissing: true,
},

&ScalingPolicyTargetFieldIndex{
Field: "Group",
Field: "Task",
AllowMissing: true,
},
},
},
},
// Type index is used for listing by policy type
"type": {
Name: "type",
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "Type",
},
},
// Used to filter by enabled
"enabled": {
Name: "enabled",
Expand Down
60 changes: 60 additions & 0 deletions nomad/state/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -84,3 +85,62 @@ func TestState_singleRecord(t *testing.T) {
require.Equal(1, numRecordsInTable())
require.Equal("three", first())
}

func TestState_ScalingPolicyTargetFieldIndex_FromObject(t *testing.T) {
require := require.New(t)

policy := mock.ScalingPolicy()
policy.Target["TestField"] = "test"

// Create test indexers
indexersAllowMissingTrue := &ScalingPolicyTargetFieldIndex{Field: "TestField", AllowMissing: true}
indexersAllowMissingFalse := &ScalingPolicyTargetFieldIndex{Field: "TestField", AllowMissing: false}

// Check if box indexers can find the test field
ok, val, err := indexersAllowMissingTrue.FromObject(policy)
require.True(ok)
require.NoError(err)
require.Equal("test\x00", string(val))

ok, val, err = indexersAllowMissingFalse.FromObject(policy)
require.True(ok)
require.NoError(err)
require.Equal("test\x00", string(val))

// Check for empty field
policy.Target["TestField"] = ""

ok, val, err = indexersAllowMissingTrue.FromObject(policy)
require.True(ok)
require.NoError(err)
require.Equal("\x00", string(val))

ok, val, err = indexersAllowMissingFalse.FromObject(policy)
require.True(ok)
require.NoError(err)
require.Equal("\x00", string(val))

// Check for missing field
delete(policy.Target, "TestField")

ok, val, err = indexersAllowMissingTrue.FromObject(policy)
require.True(ok)
require.NoError(err)
require.Equal("\x00", string(val))

ok, val, err = indexersAllowMissingFalse.FromObject(policy)
require.False(ok)
require.NoError(err)
require.Equal("", string(val))

// Check for invalid input
ok, val, err = indexersAllowMissingTrue.FromObject("not-a-scaling-policy")
require.False(ok)
require.Error(err)
require.Equal("", string(val))

ok, val, err = indexersAllowMissingFalse.FromObject("not-a-scaling-policy")
require.False(ok)
require.Error(err)
require.Equal("", string(val))
}
70 changes: 59 additions & 11 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5387,22 +5387,34 @@ func (s *StateStore) UpsertScalingPoliciesTxn(index uint64, scalingPolicies []*s

for _, policy := range scalingPolicies {
// Check if the scaling policy already exists
existing, err := txn.First("scaling_policy", "target",
// Policy uniqueness is based on target and type
it, err := txn.Get("scaling_policy", "target",
policy.Target[structs.ScalingTargetNamespace],
policy.Target[structs.ScalingTargetJob],
policy.Target[structs.ScalingTargetGroup])
policy.Target[structs.ScalingTargetGroup],
policy.Target[structs.ScalingTargetTask],
)
if err != nil {
return fmt.Errorf("scaling policy lookup failed: %v", err)
}

// Check if type matches
var existing *structs.ScalingPolicy
for raw := it.Next(); raw != nil; raw = it.Next() {
p := raw.(*structs.ScalingPolicy)
if p.Type == policy.Type {
existing = p
break
}
}

// Setup the indexes correctly
if existing != nil {
p := existing.(*structs.ScalingPolicy)
if !p.Diff(policy) {
if !existing.Diff(policy) {
continue
}
policy.ID = p.ID
policy.CreateIndex = p.CreateIndex
policy.ID = existing.ID
policy.CreateIndex = existing.CreateIndex
policy.ModifyIndex = index
} else {
// policy.ID must have been set already in Job.Register before log apply
Expand Down Expand Up @@ -5483,6 +5495,32 @@ func (s *StateStore) ScalingPolicies(ws memdb.WatchSet) (memdb.ResultIterator, e
return iter, nil
}

// ScalingPoliciesByType returns an iterator over scaling policies of a certain type.
func (s *StateStore) ScalingPoliciesByType(ws memdb.WatchSet, t string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

iter, err := txn.Get("scaling_policy", "type", t)
if err != nil {
return nil, err
}

ws.Add(iter.WatchCh())
return iter, nil
}

// ScalingPoliciesByTypePrefix returns an iterator over scaling policies with a certain type prefix.
func (s *StateStore) ScalingPoliciesByTypePrefix(ws memdb.WatchSet, t string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

iter, err := txn.Get("scaling_policy", "type_prefix", t)
if err != nil {
return nil, err
}

ws.Add(iter.WatchCh())
return iter, nil
}

func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

Expand Down Expand Up @@ -5551,23 +5589,33 @@ func (s *StateStore) ScalingPolicyByID(ws memdb.WatchSet, id string) (*structs.S
return nil, nil
}

func (s *StateStore) ScalingPolicyByTarget(ws memdb.WatchSet, target map[string]string) (*structs.ScalingPolicy,
func (s *StateStore) ScalingPolicyByTargetAndType(ws memdb.WatchSet, target map[string]string, typ string) (*structs.ScalingPolicy,
error) {
txn := s.db.ReadTxn()

// currently, only scaling policy type is against a task group
namespace := target[structs.ScalingTargetNamespace]
job := target[structs.ScalingTargetJob]
group := target[structs.ScalingTargetGroup]
task := target[structs.ScalingTargetTask]

watchCh, existing, err := txn.FirstWatch("scaling_policy", "target", namespace, job, group)
it, err := txn.Get("scaling_policy", "target", namespace, job, group, task)
if err != nil {
return nil, fmt.Errorf("scaling_policy lookup failed: %v", err)
}
ws.Add(watchCh)
ws.Add(it.WatchCh())

// Check for type
var existing *structs.ScalingPolicy
for raw := it.Next(); raw != nil; raw = it.Next() {
p := raw.(*structs.ScalingPolicy)
if p.Type == typ {
existing = p
break
}
}

if existing != nil {
return existing.(*structs.ScalingPolicy), nil
return existing, nil
}

return nil, nil
Expand Down
Loading

0 comments on commit 5a48a8d

Please sign in to comment.