Skip to content

Commit

Permalink
Merge pull request #4763 from hashicorp/f-devices-parse
Browse files Browse the repository at this point in the history
Parse devices in job spec
  • Loading branch information
dadgar authored Oct 9, 2018
2 parents 6e369c6 + 2be2650 commit 4568da6
Show file tree
Hide file tree
Showing 9 changed files with 512 additions and 38 deletions.
30 changes: 30 additions & 0 deletions api/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type Resources struct {
DiskMB *int `mapstructure:"disk"`
IOPS *int
Networks []*NetworkResource
Devices []*RequestedDevice
}

// Canonicalize will supply missing values in the cases
Expand All @@ -28,6 +29,9 @@ func (r *Resources) Canonicalize() {
for _, n := range r.Networks {
n.Canonicalize()
}
for _, d := range r.Devices {
d.Canonicalize()
}
}

// DefaultResources is a small resources object that contains the
Expand Down Expand Up @@ -75,6 +79,9 @@ func (r *Resources) Merge(other *Resources) {
if len(other.Networks) != 0 {
r.Networks = other.Networks
}
if len(other.Devices) != 0 {
r.Devices = other.Devices
}
}

type Port struct {
Expand All @@ -98,3 +105,26 @@ func (n *NetworkResource) Canonicalize() {
n.MBits = helper.IntToPtr(10)
}
}

// RequestedDevice is used to request a device for a task.
type RequestedDevice struct {
// Name is the request name. The possible values are as follows:
// * <type>: A single value only specifies the type of request.
// * <vendor>/<type>: A single slash delimiter assumes the vendor and type of device is specified.
// * <vendor>/<type>/<name>: Two slash delimiters assume vendor, type and specific model are specified.
//
// Examples are as follows:
// * "gpu"
// * "nvidia/gpu"
// * "nvidia/gpu/GTX2080Ti"
Name string

// Count is the number of requested devices
Count *uint64
}

func (d *RequestedDevice) Canonicalize() {
if d.Count == nil {
d.Count = helper.Uint64ToPtr(1)
}
}
92 changes: 56 additions & 36 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,42 +833,7 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) {
}
}

structsTask.Resources = &structs.Resources{
CPU: *apiTask.Resources.CPU,
MemoryMB: *apiTask.Resources.MemoryMB,
IOPS: *apiTask.Resources.IOPS,
}

if l := len(apiTask.Resources.Networks); l != 0 {
structsTask.Resources.Networks = make([]*structs.NetworkResource, l)
for i, nw := range apiTask.Resources.Networks {
structsTask.Resources.Networks[i] = &structs.NetworkResource{
CIDR: nw.CIDR,
IP: nw.IP,
MBits: *nw.MBits,
}

if l := len(nw.DynamicPorts); l != 0 {
structsTask.Resources.Networks[i].DynamicPorts = make([]structs.Port, l)
for j, dp := range nw.DynamicPorts {
structsTask.Resources.Networks[i].DynamicPorts[j] = structs.Port{
Label: dp.Label,
Value: dp.Value,
}
}
}

if l := len(nw.ReservedPorts); l != 0 {
structsTask.Resources.Networks[i].ReservedPorts = make([]structs.Port, l)
for j, rp := range nw.ReservedPorts {
structsTask.Resources.Networks[i].ReservedPorts[j] = structs.Port{
Label: rp.Label,
Value: rp.Value,
}
}
}
}
}
structsTask.Resources = ApiResourcesToStructs(apiTask.Resources)

structsTask.LogConfig = &structs.LogConfig{
MaxFiles: *apiTask.LogConfig.MaxFiles,
Expand Down Expand Up @@ -922,6 +887,61 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) {
}
}

func ApiResourcesToStructs(in *api.Resources) *structs.Resources {
if in == nil {
return nil
}

out := &structs.Resources{
CPU: *in.CPU,
MemoryMB: *in.MemoryMB,
IOPS: *in.IOPS,
}

if l := len(in.Networks); l != 0 {
out.Networks = make([]*structs.NetworkResource, l)
for i, nw := range in.Networks {
out.Networks[i] = &structs.NetworkResource{
CIDR: nw.CIDR,
IP: nw.IP,
MBits: *nw.MBits,
}

if l := len(nw.DynamicPorts); l != 0 {
out.Networks[i].DynamicPorts = make([]structs.Port, l)
for j, dp := range nw.DynamicPorts {
out.Networks[i].DynamicPorts[j] = structs.Port{
Label: dp.Label,
Value: dp.Value,
}
}
}

if l := len(nw.ReservedPorts); l != 0 {
out.Networks[i].ReservedPorts = make([]structs.Port, l)
for j, rp := range nw.ReservedPorts {
out.Networks[i].ReservedPorts[j] = structs.Port{
Label: rp.Label,
Value: rp.Value,
}
}
}
}
}

if l := len(in.Devices); l != 0 {
out.Devices = make([]*structs.RequestedDevice, l)
for i, d := range in.Devices {
out.Devices[i] = &structs.RequestedDevice{
Name: d.Name,
Count: *d.Count,
}
}
}

return out
}

func ApiConstraintToStructs(c1 *api.Constraint, c2 *structs.Constraint) {
c2.LTarget = c1.LTarget
c2.RTarget = c1.RTarget
Expand Down
20 changes: 20 additions & 0 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1416,6 +1416,16 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
},
},
},
Devices: []*api.RequestedDevice{
{
Name: "nvidia/gpu",
Count: helper.Uint64ToPtr(4),
},
{
Name: "gpu",
Count: nil,
},
},
},
Meta: map[string]string{
"lol": "code",
Expand Down Expand Up @@ -1690,6 +1700,16 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
},
},
},
Devices: []*structs.RequestedDevice{
{
Name: "nvidia/gpu",
Count: 4,
},
{
Name: "gpu",
Count: 1,
},
},
},
Meta: map[string]string{
"lol": "code",
Expand Down
44 changes: 42 additions & 2 deletions jobspec/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ func parseSpread(result *[]*api.Spread, list *ast.ObjectList) error {
// Parse spread target
if o := listVal.Filter("target"); len(o.Items) > 0 {
if err := parseSpreadTarget(&s.SpreadTarget, o); err != nil {
return multierror.Prefix(err, fmt.Sprintf("error parsing spread target"))
return multierror.Prefix(err, fmt.Sprintf("target ->"))
}
}

Expand All @@ -766,9 +766,11 @@ func parseSpread(result *[]*api.Spread, list *ast.ObjectList) error {
}

func parseSpreadTarget(result *[]*api.SpreadTarget, list *ast.ObjectList) error {

seen := make(map[string]struct{})
for _, item := range list.Items {
if len(item.Keys) != 1 {
return fmt.Errorf("missing spread target")
}
n := item.Keys[0].Token.Value().(string)

// Make sure we haven't already found this
Expand Down Expand Up @@ -1413,6 +1415,7 @@ func parseResources(result *api.Resources, list *ast.ObjectList) error {
"disk",
"memory",
"network",
"device",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
return multierror.Prefix(err, "resources ->")
Expand All @@ -1423,6 +1426,7 @@ func parseResources(result *api.Resources, list *ast.ObjectList) error {
return err
}
delete(m, "network")
delete(m, "device")

if err := mapstructure.WeakDecode(m, result); err != nil {
return err
Expand Down Expand Up @@ -1465,6 +1469,42 @@ func parseResources(result *api.Resources, list *ast.ObjectList) error {
result.Networks = []*api.NetworkResource{&r}
}

// Parse the device resources
if o := listVal.Filter("device"); len(o.Items) > 0 {
result.Devices = make([]*api.RequestedDevice, len(o.Items))
for idx, do := range o.Items {
if l := len(do.Keys); l == 0 {
return multierror.Prefix(fmt.Errorf("missing device name"), fmt.Sprintf("resources, device[%d]->", idx))
} else if l > 1 {
return multierror.Prefix(fmt.Errorf("only one name may be specified"), fmt.Sprintf("resources, device[%d]->", idx))
}
name := do.Keys[0].Token.Value().(string)

// Check for invalid keys
valid := []string{
"name",
"count",
}
if err := helper.CheckHCLKeys(do.Val, valid); err != nil {
return multierror.Prefix(err, fmt.Sprintf("resources, device[%d]->", idx))
}

// Set the name
var r api.RequestedDevice
r.Name = name

var m map[string]interface{}
if err := hcl.DecodeObject(&m, do.Val); err != nil {
return err
}
if err := mapstructure.WeakDecode(m, &r); err != nil {
return err
}

result.Devices[idx] = &r
}
}

return nil
}

Expand Down
10 changes: 10 additions & 0 deletions jobspec/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,16 @@ func TestParse(t *testing.T) {
DynamicPorts: []api.Port{{Label: "http", Value: 0}, {Label: "https", Value: 0}, {Label: "admin", Value: 0}},
},
},
Devices: []*api.RequestedDevice{
{
Name: "nvidia/gpu",
Count: helper.Uint64ToPtr(10),
},
{
Name: "intel/gpu",
Count: nil,
},
},
},
KillTimeout: helper.TimeToPtr(22 * time.Second),
ShutdownDelay: 11 * time.Second,
Expand Down
6 changes: 6 additions & 0 deletions jobspec/test-fixtures/basic.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ job "binstore-storagelocker" {
port "admin" {
}
}

device "nvidia/gpu" {
count = 10
}

device "intel/gpu" {}
}

kill_timeout = "22s"
Expand Down
66 changes: 66 additions & 0 deletions nomad/structs/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,11 @@ func (r *Resources) Diff(other *Resources, contextual bool) *ObjectDiff {
diff.Objects = append(diff.Objects, nDiffs...)
}

// Requested Devices diff
if nDiffs := requestedDevicesDiffs(r.Devices, other.Devices, contextual); nDiffs != nil {
diff.Objects = append(diff.Objects, nDiffs...)
}

return diff
}

Expand Down Expand Up @@ -975,6 +980,67 @@ func portDiffs(old, new []Port, dynamic bool, contextual bool) []*ObjectDiff {

}

// Diff returns a diff of two requested devices. If contextual diff is enabled,
// non-changed fields will still be returned.
func (r *RequestedDevice) Diff(other *RequestedDevice, contextual bool) *ObjectDiff {
diff := &ObjectDiff{Type: DiffTypeNone, Name: "Device"}
var oldPrimitiveFlat, newPrimitiveFlat map[string]string

if reflect.DeepEqual(r, other) {
return nil
} else if r == nil {
diff.Type = DiffTypeAdded
newPrimitiveFlat = flatmap.Flatten(other, nil, true)
} else if other == nil {
diff.Type = DiffTypeDeleted
oldPrimitiveFlat = flatmap.Flatten(r, nil, true)
} else {
diff.Type = DiffTypeEdited
oldPrimitiveFlat = flatmap.Flatten(r, nil, true)
newPrimitiveFlat = flatmap.Flatten(other, nil, true)
}

// Diff the primitive fields.
diff.Fields = fieldDiffs(oldPrimitiveFlat, newPrimitiveFlat, contextual)

return diff
}

// requestedDevicesDiffs diffs a set of RequestedDevices. If contextual diff is enabled,
// non-changed fields will still be returned.
func requestedDevicesDiffs(old, new []*RequestedDevice, contextual bool) []*ObjectDiff {
makeSet := func(devices []*RequestedDevice) map[string]*RequestedDevice {
deviceMap := make(map[string]*RequestedDevice, len(devices))
for _, d := range devices {
deviceMap[d.Name] = d
}

return deviceMap
}

oldSet := makeSet(old)
newSet := makeSet(new)

var diffs []*ObjectDiff
for k, oldV := range oldSet {
newV := newSet[k]
if diff := oldV.Diff(newV, contextual); diff != nil {
diffs = append(diffs, diff)
}
}
for k, newV := range newSet {
if oldV, ok := oldSet[k]; !ok {
if diff := oldV.Diff(newV, contextual); diff != nil {
diffs = append(diffs, diff)
}
}
}

sort.Sort(ObjectDiffs(diffs))
return diffs

}

// configDiff returns the diff of two Task Config objects. If contextual diff is
// enabled, all fields will be returned, even if no diff occurred.
func configDiff(old, new map[string]interface{}, contextual bool) *ObjectDiff {
Expand Down
Loading

0 comments on commit 4568da6

Please sign in to comment.