From 22d494f812ea90f1210e763e4bacb7f8d0651496 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 23 Nov 2016 14:56:50 -0800 Subject: [PATCH 01/27] Dispatch structs --- api/jobs.go | 9 ++++ api/tasks.go | 35 ++++++++----- nomad/structs/funcs.go | 24 +++++++++ nomad/structs/structs.go | 98 +++++++++++++++++++++++++++++++++++ nomad/structs/structs_test.go | 26 ++++++++++ 5 files changed, 178 insertions(+), 14 deletions(-) diff --git a/api/jobs.go b/api/jobs.go index fcd103b5262..2551a15317d 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -187,6 +187,14 @@ type PeriodicConfig struct { ProhibitOverlap bool } +// DispatchConfig is used to configure the dispatch template +type DispatchConfig struct { + Paused bool + InputData string + MetaRequired []string + MetaOptional []string +} + // Job is used to serialize a job. type Job struct { Region string @@ -201,6 +209,7 @@ type Job struct { TaskGroups []*TaskGroup Update *UpdateStrategy Periodic *PeriodicConfig + Dispatch *DispatchConfig Meta map[string]string VaultToken string Status string diff --git a/api/tasks.go b/api/tasks.go index 2ca804e6267..a79d1f37ce5 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -141,22 +141,29 @@ type LogConfig struct { MaxFileSizeMB int } +// DispatchInputConfig configures how a task gets its input from a job dispatch +type DispatchInputConfig struct { + Stdin bool + File string +} + // Task is a single process in a task group. type Task struct { - Name string - Driver string - User string - Config map[string]interface{} - Constraints []*Constraint - Env map[string]string - Services []Service - Resources *Resources - Meta map[string]string - KillTimeout time.Duration - LogConfig *LogConfig - Artifacts []*TaskArtifact - Vault *Vault - Templates []*Template + Name string + Driver string + User string + Config map[string]interface{} + Constraints []*Constraint + Env map[string]string + Services []Service + Resources *Resources + Meta map[string]string + KillTimeout time.Duration + LogConfig *LogConfig + Artifacts []*TaskArtifact + Vault *Vault + Templates []*Template + DispatchInput *DispatchInputConfig } // TaskArtifact is used to download artifacts before running a task. diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index 104bb58b477..178da715131 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -269,6 +269,30 @@ func SliceStringIsSubset(larger, smaller []string) (bool, []string) { return subset, offending } +func SliceSetDisjoint(first, second []string) (bool, []string) { + contained := make(map[string]struct{}, len(first)) + for _, k := range first { + contained[k] = struct{}{} + } + + offending := make(map[string]struct{}) + for _, k := range second { + if _, ok := contained[k]; ok { + offending[k] = struct{}{} + } + } + + if len(offending) == 0 { + return true, nil + } + + flattened := make([]string, 0, len(offending)) + for k := range offending { + flattened = append(flattened, k) + } + return false, flattened +} + // VaultPoliciesSet takes the structure returned by VaultPolicies and returns // the set of required policies func VaultPoliciesSet(policies map[string]map[string]*Vault) []string { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 9f94bcacfbf..cdd1578b52e 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1146,6 +1146,9 @@ type Job struct { // Periodic is used to define the interval the job is run at. Periodic *PeriodicConfig + // Dispatch is used to specify the job as a template job for dispatching. + Dispatch *DispatchConfig + // Meta is used to associate arbitrary metadata with this // job. This is opaque to Nomad. Meta map[string]string @@ -1179,6 +1182,10 @@ func (j *Job) Canonicalize() { for _, tg := range j.TaskGroups { tg.Canonicalize(j) } + + if j.Dispatch != nil { + j.Dispatch.Canonicalize() + } } // Copy returns a deep copy of the Job. It is expected that callers use recover. @@ -1202,6 +1209,7 @@ func (j *Job) Copy() *Job { nj.Periodic = nj.Periodic.Copy() nj.Meta = CopyMapStringString(nj.Meta) + nj.Dispatch = nj.Dispatch.Copy() return nj } @@ -1276,6 +1284,12 @@ func (j *Job) Validate() error { } } + if j.IsDispatchTemplate() { + if err := j.Dispatch.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + } + return mErr.ErrorOrNil() } @@ -1311,6 +1325,11 @@ func (j *Job) IsPeriodic() bool { return j.Periodic != nil } +// IsDispatchTemplate returns whether a job is dispatch template. +func (j *Job) IsDispatchTemplate() bool { + return j.Dispatch != nil +} + // VaultPolicies returns the set of Vault policies per task group, per task func (j *Job) VaultPolicies() map[string]map[string]*Vault { policies := make(map[string]map[string]*Vault, len(j.TaskGroups)) @@ -1525,6 +1544,80 @@ type PeriodicLaunch struct { ModifyIndex uint64 } +const ( + DispatchInputDataForbidden = "forbidden" + DispatchInputDataOptional = "optional" + DispatchInputDataRequired = "required" +) + +// DispatchConfig is used to configure the dispatch template +type DispatchConfig struct { + // Paused specifies whether jobs can be dispatched based on the template or + // if the job is paused. + Paused bool + + // InputData configure the input data requirements + InputData string + + // MetaRequired is metadata keys that must be specified by the dispatcher + MetaRequired []string + + // MetaOptional is metadata keys that may be specified by the dispatcher + MetaOptional []string +} + +func (d *DispatchConfig) Validate() error { + var mErr multierror.Error + switch d.InputData { + case DispatchInputDataOptional, DispatchInputDataRequired, DispatchInputDataForbidden: + default: + multierror.Append(&mErr, fmt.Errorf("Unknown input data requirement: %q", d.InputData)) + } + + // Check that the meta configurations are disjoint sets + disjoint, offending := SliceSetDisjoint(d.MetaRequired, d.MetaOptional) + if !disjoint { + multierror.Append(&mErr, fmt.Errorf("Required and optional meta keys should be disjoint. Following keys exist in both: %v", offending)) + } + + return mErr.ErrorOrNil() +} + +func (d *DispatchConfig) Canonicalize() { + if d.InputData == "" { + d.InputData = DispatchInputDataOptional + } +} + +func (d *DispatchConfig) Copy() *DispatchConfig { + if d == nil { + return nil + } + nd := new(DispatchConfig) + *nd = *d + nd.MetaOptional = CopySliceString(nd.MetaOptional) + nd.MetaRequired = CopySliceString(nd.MetaRequired) + return nd +} + +// DispatchInputConfig configures how a task gets its input from a job dispatch +type DispatchInputConfig struct { + // Stdin specifies whether the input should be written to the task's stdin + Stdin bool + + // File specifies a relative path to where the input data should be written + File string +} + +func (d *DispatchInputConfig) Copy() *DispatchInputConfig { + if d == nil { + return nil + } + nd := new(DispatchInputConfig) + *nd = *d + return nd +} + var ( defaultServiceJobRestartPolicy = RestartPolicy{ Delay: 15 * time.Second, @@ -2076,6 +2169,10 @@ type Task struct { // Resources is the resources needed by this task Resources *Resources + // DispatchInput configures how the task retrieves its input from a dispatch + // template + DispatchInput *DispatchInputConfig + // Meta is used to associate arbitrary metadata with this // task. This is opaque to Nomad. Meta map[string]string @@ -2113,6 +2210,7 @@ func (t *Task) Copy() *Task { nt.Vault = nt.Vault.Copy() nt.Resources = nt.Resources.Copy() nt.Meta = CopyMapStringString(nt.Meta) + nt.DispatchInput = nt.DispatchInput.Copy() if t.Artifacts != nil { artifacts := make([]*TaskArtifact, 0, len(t.Artifacts)) diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index f8c8ee4b4d2..7a8aa6f8bbf 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -1444,3 +1444,29 @@ func TestVault_Validate(t *testing.T) { t.Fatalf("Expected signal empty error") } } + +func TestDispatchConfig_Validate(t *testing.T) { + d := &DispatchConfig{ + InputData: "foo", + } + + if err := d.Validate(); err == nil || !strings.Contains(err.Error(), "input data") { + t.Fatalf("Expected unknown input data requirement: %v", err) + } + + d.InputData = DispatchInputDataOptional + d.MetaOptional = []string{"foo", "bar"} + d.MetaRequired = []string{"bar", "baz"} + + if err := d.Validate(); err == nil || !strings.Contains(err.Error(), "disjoint") { + t.Fatalf("Expected meta not being disjoint error: %v", err) + } +} + +func TestDispatchConfig_Canonicalize(t *testing.T) { + d := &DispatchConfig{} + d.Canonicalize() + if d.InputData != DispatchInputDataOptional { + t.Fatalf("Canonicalize failed") + } +} From 35d274df1f59fbaa146eaf0bbdf1d8725eef46c0 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 23 Nov 2016 15:48:36 -0800 Subject: [PATCH 02/27] Parse --- jobspec/parse.go | 121 +++++++++++++++++++++++++++-- jobspec/parse_test.go | 46 +++++++++++ jobspec/test-fixtures/dispatch.hcl | 21 +++++ nomad/structs/structs.go | 6 +- 4 files changed, 183 insertions(+), 11 deletions(-) create mode 100644 jobspec/test-fixtures/dispatch.hcl diff --git a/jobspec/parse.go b/jobspec/parse.go index f9defa7cf43..bf9eaaad2ef 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -102,6 +102,7 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error { delete(m, "update") delete(m, "periodic") delete(m, "vault") + delete(m, "dispatch") // Set the ID and name to the object key result.ID = obj.Keys[0].Token.Value().(string) @@ -127,19 +128,20 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error { // Check for invalid keys valid := []string{ + "all_at_once", + "constraint", + "datacenters", + "dispatch", + "group", "id", + "meta", "name", + "periodic", + "priority", "region", - "all_at_once", + "task", "type", - "priority", - "datacenters", - "constraint", "update", - "periodic", - "meta", - "task", - "group", "vault", "vault_token", } @@ -168,6 +170,13 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error { } } + // If we have a dispatch definition, then parse that + if o := listVal.Filter("dispatch"); len(o.Items) > 0 { + if err := parseDispatch(&result.Dispatch, o); err != nil { + return multierror.Prefix(err, "dispatch ->") + } + } + // Parse out meta fields. These are in HCL as a list so we need // to iterate over them and merge them. if metaO := listVal.Filter("meta"); len(metaO.Items) > 0 { @@ -552,6 +561,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l "artifact", "config", "constraint", + "dispatch_input", "driver", "env", "kill_timeout", @@ -574,6 +584,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l delete(m, "artifact") delete(m, "config") delete(m, "constraint") + delete(m, "dispatch_input") delete(m, "env") delete(m, "logs") delete(m, "meta") @@ -733,6 +744,33 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l t.Vault = v } + // If we have a dispatch_input block parse that + if o := listVal.Filter("dispatch_input"); len(o.Items) > 0 { + if len(o.Items) > 1 { + return fmt.Errorf("only one dispatch_input block is allowed in a task. Number of logs block found: %d", len(o.Items)) + } + var m map[string]interface{} + dispatchBlock := o.Items[0] + + // Check for invalid keys + valid := []string{ + "stdin", + "file", + } + if err := checkHCLKeys(dispatchBlock.Val, valid); err != nil { + return multierror.Prefix(err, fmt.Sprintf("'%s', dispatch_input ->", n)) + } + + if err := hcl.DecodeObject(&m, dispatchBlock.Val); err != nil { + return err + } + + t.DispatchInput = &structs.DispatchInputConfig{} + if err := mapstructure.WeakDecode(m, t.DispatchInput); err != nil { + return err + } + } + *result = append(*result, &t) } @@ -1205,6 +1243,73 @@ func parseVault(result *structs.Vault, list *ast.ObjectList) error { return nil } +func parseDispatch(result **structs.DispatchConfig, list *ast.ObjectList) error { + list = list.Elem() + if len(list.Items) > 1 { + return fmt.Errorf("only one 'dispatch' block allowed per job") + } + + // Get our resource object + o := list.Items[0] + + var m map[string]interface{} + if err := hcl.DecodeObject(&m, o.Val); err != nil { + return err + } + + delete(m, "meta") + + // Check for invalid keys + valid := []string{ + "input_data", + "meta_keys", + "paused", + } + if err := checkHCLKeys(o.Val, valid); err != nil { + return err + } + + // Build the dispatch block + var d structs.DispatchConfig + if err := mapstructure.WeakDecode(m, &d); err != nil { + return err + } + + var listVal *ast.ObjectList + if ot, ok := o.Val.(*ast.ObjectType); ok { + listVal = ot.List + } else { + return fmt.Errorf("dispatch block should be an object") + } + + // Parse the meta block + if metaList := listVal.Filter("meta_keys"); len(metaList.Items) > 0 { + // Get our resource object + o := metaList.Items[0] + + var m map[string]interface{} + if err := hcl.DecodeObject(&m, o.Val); err != nil { + return err + } + + // Check for invalid keys + valid := []string{ + "optional", + "required", + } + if err := checkHCLKeys(o.Val, valid); err != nil { + return err + } + + if err := mapstructure.WeakDecode(m, &d); err != nil { + return err + } + } + + *result = &d + return nil +} + func checkHCLKeys(node ast.Node, valid []string) error { var list *ast.ObjectList switch n := node.(type) { diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 1e2a282a0f9..53d43a07b9b 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -537,6 +537,52 @@ func TestParse(t *testing.T) { }, false, }, + + { + "dispatch.hcl", + &structs.Job{ + ID: "dispatch", + Name: "dispatch", + Type: "service", + Priority: 50, + Region: "global", + + Dispatch: &structs.DispatchConfig{ + Paused: true, + InputData: "required", + MetaRequired: []string{"foo", "bar"}, + MetaOptional: []string{"baz", "bam"}, + }, + + TaskGroups: []*structs.TaskGroup{ + &structs.TaskGroup{ + Name: "foo", + Count: 1, + EphemeralDisk: structs.DefaultEphemeralDisk(), + Tasks: []*structs.Task{ + &structs.Task{ + Name: "bar", + Driver: "docker", + Resources: &structs.Resources{ + CPU: 100, + MemoryMB: 10, + IOPS: 0, + }, + LogConfig: &structs.LogConfig{ + MaxFiles: 10, + MaxFileSizeMB: 10, + }, + DispatchInput: &structs.DispatchInputConfig{ + Stdin: true, + File: "foo/bar", + }, + }, + }, + }, + }, + }, + false, + }, } for _, tc := range cases { diff --git a/jobspec/test-fixtures/dispatch.hcl b/jobspec/test-fixtures/dispatch.hcl new file mode 100644 index 00000000000..c3ed001c12c --- /dev/null +++ b/jobspec/test-fixtures/dispatch.hcl @@ -0,0 +1,21 @@ +job "dispatch" { + dispatch { + paused = true + input_data = "required" + meta_keys { + required = ["foo", "bar"] + optional = ["baz", "bam"] + } + } + group "foo" { + task "bar" { + driver = "docker" + resources {} + + dispatch_input { + stdin = true + file = "foo/bar" + } + } + } +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index cdd1578b52e..1e2c62ecd64 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1557,13 +1557,13 @@ type DispatchConfig struct { Paused bool // InputData configure the input data requirements - InputData string + InputData string `mapstructure:"input_data"` // MetaRequired is metadata keys that must be specified by the dispatcher - MetaRequired []string + MetaRequired []string `mapstructure:"required"` // MetaOptional is metadata keys that may be specified by the dispatcher - MetaOptional []string + MetaOptional []string `mapstructure:"optional"` } func (d *DispatchConfig) Validate() error { From 14de95f97f4d6a21c8dad9f9d31be8e12f806537 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 25 Nov 2016 18:04:55 -0800 Subject: [PATCH 03/27] dispatch beginning --- api/jobs.go | 1 + command/agent/job_endpoint.go | 23 +++++ nomad/job_endpoint.go | 159 +++++++++++++++++++++++++++++++++- nomad/structs/funcs.go | 8 ++ nomad/structs/structs.go | 29 +++++++ 5 files changed, 218 insertions(+), 2 deletions(-) diff --git a/api/jobs.go b/api/jobs.go index 2551a15317d..1b701a8190e 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -210,6 +210,7 @@ type Job struct { Update *UpdateStrategy Periodic *PeriodicConfig Dispatch *DispatchConfig + InputData []byte Meta map[string]string VaultToken string Status string diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 6fd0814a577..460a91c1d4f 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -57,6 +57,9 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ case strings.HasSuffix(path, "/summary"): jobName := strings.TrimSuffix(path, "/summary") return s.jobSummaryRequest(resp, req, jobName) + case strings.HasSuffix(path, "/dispatch"): + jobName := strings.TrimSuffix(path, "/dispatch") + return s.jobDispatchRequest(resp, req, jobName) default: return s.jobCRUD(resp, req, path) } @@ -265,3 +268,23 @@ func (s *HTTPServer) jobSummaryRequest(resp http.ResponseWriter, req *http.Reque setIndex(resp, out.Index) return out.JobSummary, nil } + +func (s *HTTPServer) jobDispatchRequest(resp http.ResponseWriter, req *http.Request, name string) (interface{}, error) { + if req.Method != "PUT" && req.Method != "POST" { + return nil, CodedError(405, ErrInvalidMethod) + } + args := structs.JobDispatchRequest{ + JobID: name, + } + if err := decodeBody(req, &args); err != nil { + return nil, CodedError(400, err.Error()) + } + s.parseRegion(req, &args.Region) + + var out structs.JobDispatchResponse + if err := s.agent.RPC("Job.Dispatch", &args, &out); err != nil { + return nil, err + } + setIndex(resp, out.Index) + return out, nil +} diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 2e28341d697..e56c311b3e7 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -20,6 +20,10 @@ const ( // RegisterEnforceIndexErrPrefix is the prefix to use in errors caused by // enforcing the job modify index during registers. RegisterEnforceIndexErrPrefix = "Enforcing job modify index" + + // DispatchInputDataSizeLimit is the maximum size of the uncompressed input + // data payload. + DispatchInputDataSizeLimit = 16 * 1024 ) var ( @@ -133,8 +137,8 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis // Populate the reply with job information reply.JobModifyIndex = index - // If the job is periodic, we don't create an eval. - if args.Job.IsPeriodic() { + // If the job is periodic or a dispatch template, we don't create an eval. + if args.Job.IsPeriodic() || args.Job.IsDispatchTemplate() { return nil } @@ -768,3 +772,154 @@ func validateJob(job *structs.Job) error { return validationErrors.ErrorOrNil() } + +// Dispatch is used to dispatch a job based on a dispatch job template. +func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispatchResponse) error { + if done, err := j.srv.forward("Job.Dispatch", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "job", "dispatch"}, time.Now()) + + // Lookup the job + if args.JobID == "" { + return fmt.Errorf("missing dispatch template job ID") + } + + snap, err := j.srv.fsm.State().Snapshot() + if err != nil { + return err + } + tmpl, err := snap.JobByID(args.JobID) + if err != nil { + return err + } + if tmpl == nil { + return fmt.Errorf("dispatch template job not found") + } + + if !tmpl.IsDispatchTemplate() { + return fmt.Errorf("Specified job %q is not a dispatch template", args.JobID) + } + + // Validate the arguments + if err := validateDispatchRequest(args, tmpl); err != nil { + return err + } + + // XXX compress the input + + // Derive the child job and commit it via Raft + dispatchJob := tmpl.Copy() + dispatchJob.Dispatch = nil + dispatchJob.InputData = args.InputData + dispatchJob.ID = structs.DispatchedID(tmpl.ID, time.Now()) + dispatchJob.Name = dispatchJob.ID + + regReq := &structs.JobRegisterRequest{ + Job: dispatchJob, + WriteRequest: args.WriteRequest, + } + + // Commit this update via Raft + _, jobCreateIndex, err := j.srv.raftApply(structs.JobRegisterRequestType, regReq) + if err != nil { + j.srv.logger.Printf("[ERR] nomad.job: Dispatched job register failed: %v", err) + return err + } + + // Create a new evaluation + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: dispatchJob.Priority, + Type: dispatchJob.Type, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: dispatchJob.ID, + JobModifyIndex: jobCreateIndex, + Status: structs.EvalStatusPending, + } + update := &structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval}, + WriteRequest: structs.WriteRequest{Region: args.Region}, + } + + // Commit this evaluation via Raft + _, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) + if err != nil { + j.srv.logger.Printf("[ERR] nomad.job: Eval create failed: %v", err) + return err + } + + // Setup the reply + reply.EvalID = eval.ID + reply.EvalCreateIndex = evalIndex + reply.JobCreateIndex = jobCreateIndex + reply.DispatchedJobID = dispatchJob.ID + reply.Index = evalIndex + return nil +} + +// validateDispatchRequest returns whether the request is valid given the +// dispatch configuration of the template job +func validateDispatchRequest(req *structs.JobDispatchRequest, tmpl *structs.Job) error { + // Check the input data constraint is met + hasInputData := len(req.InputData) != 0 + if tmpl.Dispatch.InputData == structs.DispatchInputDataRequired && !hasInputData { + return fmt.Errorf("Input data is not provided but required by dispatch template") + } else if tmpl.Dispatch.InputData == structs.DispatchInputDataForbidden && hasInputData { + return fmt.Errorf("Input data provided but forbidden by dispatch template") + } + + // Check the input data doesn't exceed the size limit + if l := len(req.InputData); l > DispatchInputDataSizeLimit { + return fmt.Errorf("Input data exceeds maximum size; %d > %d", l, DispatchInputDataSizeLimit) + } + + // Check if the metadata is a set + keys := make(map[string]struct{}, len(req.Meta)) + for k := range keys { + if _, ok := keys[k]; ok { + return fmt.Errorf("Duplicate key %q in passed metadata", k) + } + keys[k] = struct{}{} + } + + required := structs.SliceStringToSet(tmpl.Dispatch.MetaRequired) + optional := structs.SliceStringToSet(tmpl.Dispatch.MetaOptional) + + // Check the metadata key constraints are met + unpermitted := make(map[string]struct{}) + for k := range req.Meta { + _, req := required[k] + _, opt := optional[k] + if !req && !opt { + unpermitted[k] = struct{}{} + } + } + + if len(unpermitted) != 0 { + flat := make([]string, 0, len(unpermitted)) + for k := range unpermitted { + flat = append(flat, k) + } + + return fmt.Errorf("Dispatch request included unpermitted metadata keys: %v", flat) + } + + missing := make(map[string]struct{}) + for _, k := range tmpl.Dispatch.MetaRequired { + if _, ok := req.Meta[k]; !ok { + missing[k] = struct{}{} + } + } + + if len(missing) != 0 { + flat := make([]string, 0, len(missing)) + for k := range missing { + flat = append(flat, k) + } + + return fmt.Errorf("Dispatch did not provided required meta keys: %v", flat) + } + + return nil +} diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index 178da715131..8d7bbac4fc1 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -328,3 +328,11 @@ func MapStringStringSliceValueSet(m map[string][]string) []string { } return flat } + +func SliceStringToSet(s []string) map[string]struct{} { + m := make(map[string]struct{}, (len(s)+1)/2) + for _, k := range s { + m[k] = struct{}{} + } + return m +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 1e2c62ecd64..9553617ba60 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -272,6 +272,14 @@ type JobSummaryRequest struct { QueryOptions } +// JobDispatchRequest is used to dispatch a job based on a job dispatch template +type JobDispatchRequest struct { + JobID string + InputData []byte + Meta map[string]string + WriteRequest +} + // NodeListRequest is used to parameterize a list request type NodeListRequest struct { QueryOptions @@ -525,6 +533,14 @@ type JobSummaryResponse struct { QueryMeta } +type JobDispatchResponse struct { + DispatchedJobID string + EvalID string + EvalCreateIndex uint64 + JobCreateIndex uint64 + QueryMeta +} + // JobListResponse is used for a list request type JobListResponse struct { Jobs []*JobListStub @@ -1149,6 +1165,9 @@ type Job struct { // Dispatch is used to specify the job as a template job for dispatching. Dispatch *DispatchConfig + // InputData is the input data supplied when the job was dispatched. + InputData []byte + // Meta is used to associate arbitrary metadata with this // job. This is opaque to Nomad. Meta map[string]string @@ -1548,6 +1567,10 @@ const ( DispatchInputDataForbidden = "forbidden" DispatchInputDataOptional = "optional" DispatchInputDataRequired = "required" + + // DispatchLaunchSuffic is the string appended to the dispatch job + // templates's ID when dispatching instances of it. + DispatchLaunchSuffic = "/dispatch-" ) // DispatchConfig is used to configure the dispatch template @@ -1600,6 +1623,12 @@ func (d *DispatchConfig) Copy() *DispatchConfig { return nd } +// DispatchedID returns an ID appropriate for a job dispatched against a +// particular template +func DispatchedID(templateID string) string { + return fmt.Sprintf("%s%s%s", templateID, DispatchLaunchSuffic, GenerateUUID()) +} + // DispatchInputConfig configures how a task gets its input from a job dispatch type DispatchInputConfig struct { // Stdin specifies whether the input should be written to the task's stdin From 4d3af38ad4703fbe1b85ea4adb5e4e9e7d20c6b7 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 25 Nov 2016 20:02:18 -0800 Subject: [PATCH 04/27] Snappy + Dispatch name change Change the generated name to timestamp + 8 characters of UUID Add Snappy compression to the input --- nomad/job_endpoint.go | 10 +- nomad/structs/structs.go | 5 +- vendor/github.com/golang/snappy/AUTHORS | 15 + vendor/github.com/golang/snappy/CONTRIBUTORS | 37 + vendor/github.com/golang/snappy/LICENSE | 27 + vendor/github.com/golang/snappy/README | 107 +++ vendor/github.com/golang/snappy/decode.go | 237 ++++++ .../github.com/golang/snappy/decode_amd64.go | 14 + .../github.com/golang/snappy/decode_amd64.s | 490 ++++++++++++ .../github.com/golang/snappy/decode_other.go | 101 +++ vendor/github.com/golang/snappy/encode.go | 285 +++++++ .../github.com/golang/snappy/encode_amd64.go | 29 + .../github.com/golang/snappy/encode_amd64.s | 730 ++++++++++++++++++ .../github.com/golang/snappy/encode_other.go | 238 ++++++ vendor/github.com/golang/snappy/snappy.go | 87 +++ vendor/vendor.json | 22 +- 16 files changed, 2422 insertions(+), 12 deletions(-) create mode 100644 vendor/github.com/golang/snappy/AUTHORS create mode 100644 vendor/github.com/golang/snappy/CONTRIBUTORS create mode 100644 vendor/github.com/golang/snappy/LICENSE create mode 100644 vendor/github.com/golang/snappy/README create mode 100644 vendor/github.com/golang/snappy/decode.go create mode 100644 vendor/github.com/golang/snappy/decode_amd64.go create mode 100644 vendor/github.com/golang/snappy/decode_amd64.s create mode 100644 vendor/github.com/golang/snappy/decode_other.go create mode 100644 vendor/github.com/golang/snappy/encode.go create mode 100644 vendor/github.com/golang/snappy/encode_amd64.go create mode 100644 vendor/github.com/golang/snappy/encode_amd64.s create mode 100644 vendor/github.com/golang/snappy/encode_other.go create mode 100644 vendor/github.com/golang/snappy/snappy.go diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index e56c311b3e7..5490526ac3f 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -7,6 +7,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/golang/snappy" "github.com/hashicorp/consul/lib" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" @@ -806,15 +807,20 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa return err } - // XXX compress the input + // XXX disable job/evaluate on periodic jobs + + // XXX merge in the meta data // Derive the child job and commit it via Raft dispatchJob := tmpl.Copy() dispatchJob.Dispatch = nil - dispatchJob.InputData = args.InputData dispatchJob.ID = structs.DispatchedID(tmpl.ID, time.Now()) dispatchJob.Name = dispatchJob.ID + // Compress the input + // XXX Decompress on the HTTP endpoint + dispatchJob.InputData = snappy.Encode(dispatchJob.InputData, args.InputData) + regReq := &structs.JobRegisterRequest{ Job: dispatchJob, WriteRequest: args.WriteRequest, diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 9553617ba60..a90ce2b9531 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1625,8 +1625,9 @@ func (d *DispatchConfig) Copy() *DispatchConfig { // DispatchedID returns an ID appropriate for a job dispatched against a // particular template -func DispatchedID(templateID string) string { - return fmt.Sprintf("%s%s%s", templateID, DispatchLaunchSuffic, GenerateUUID()) +func DispatchedID(templateID string, t time.Time) string { + u := GenerateUUID()[:8] + return fmt.Sprintf("%s%s%d-%s", templateID, DispatchLaunchSuffic, t.Unix(), u) } // DispatchInputConfig configures how a task gets its input from a job dispatch diff --git a/vendor/github.com/golang/snappy/AUTHORS b/vendor/github.com/golang/snappy/AUTHORS new file mode 100644 index 00000000000..bcfa19520af --- /dev/null +++ b/vendor/github.com/golang/snappy/AUTHORS @@ -0,0 +1,15 @@ +# This is the official list of Snappy-Go authors for copyright purposes. +# This file is distinct from the CONTRIBUTORS files. +# See the latter for an explanation. + +# Names should be added to this file as +# Name or Organization +# The email address is not required for organizations. + +# Please keep the list sorted. + +Damian Gryski +Google Inc. +Jan Mercl <0xjnml@gmail.com> +Rodolfo Carvalho +Sebastien Binet diff --git a/vendor/github.com/golang/snappy/CONTRIBUTORS b/vendor/github.com/golang/snappy/CONTRIBUTORS new file mode 100644 index 00000000000..931ae31606f --- /dev/null +++ b/vendor/github.com/golang/snappy/CONTRIBUTORS @@ -0,0 +1,37 @@ +# This is the official list of people who can contribute +# (and typically have contributed) code to the Snappy-Go repository. +# The AUTHORS file lists the copyright holders; this file +# lists people. For example, Google employees are listed here +# but not in AUTHORS, because Google holds the copyright. +# +# The submission process automatically checks to make sure +# that people submitting code are listed in this file (by email address). +# +# Names should be added to this file only after verifying that +# the individual or the individual's organization has agreed to +# the appropriate Contributor License Agreement, found here: +# +# http://code.google.com/legal/individual-cla-v1.0.html +# http://code.google.com/legal/corporate-cla-v1.0.html +# +# The agreement for individuals can be filled out on the web. +# +# When adding J Random Contributor's name to this file, +# either J's name or J's organization's name should be +# added to the AUTHORS file, depending on whether the +# individual or corporate CLA was used. + +# Names should be added to this file like so: +# Name + +# Please keep the list sorted. + +Damian Gryski +Jan Mercl <0xjnml@gmail.com> +Kai Backman +Marc-Antoine Ruel +Nigel Tao +Rob Pike +Rodolfo Carvalho +Russ Cox +Sebastien Binet diff --git a/vendor/github.com/golang/snappy/LICENSE b/vendor/github.com/golang/snappy/LICENSE new file mode 100644 index 00000000000..6050c10f4c8 --- /dev/null +++ b/vendor/github.com/golang/snappy/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2011 The Snappy-Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/golang/snappy/README b/vendor/github.com/golang/snappy/README new file mode 100644 index 00000000000..cea12879a0e --- /dev/null +++ b/vendor/github.com/golang/snappy/README @@ -0,0 +1,107 @@ +The Snappy compression format in the Go programming language. + +To download and install from source: +$ go get github.com/golang/snappy + +Unless otherwise noted, the Snappy-Go source files are distributed +under the BSD-style license found in the LICENSE file. + + + +Benchmarks. + +The golang/snappy benchmarks include compressing (Z) and decompressing (U) ten +or so files, the same set used by the C++ Snappy code (github.com/google/snappy +and note the "google", not "golang"). On an "Intel(R) Core(TM) i7-3770 CPU @ +3.40GHz", Go's GOARCH=amd64 numbers as of 2016-05-29: + +"go test -test.bench=." + +_UFlat0-8 2.19GB/s ± 0% html +_UFlat1-8 1.41GB/s ± 0% urls +_UFlat2-8 23.5GB/s ± 2% jpg +_UFlat3-8 1.91GB/s ± 0% jpg_200 +_UFlat4-8 14.0GB/s ± 1% pdf +_UFlat5-8 1.97GB/s ± 0% html4 +_UFlat6-8 814MB/s ± 0% txt1 +_UFlat7-8 785MB/s ± 0% txt2 +_UFlat8-8 857MB/s ± 0% txt3 +_UFlat9-8 719MB/s ± 1% txt4 +_UFlat10-8 2.84GB/s ± 0% pb +_UFlat11-8 1.05GB/s ± 0% gaviota + +_ZFlat0-8 1.04GB/s ± 0% html +_ZFlat1-8 534MB/s ± 0% urls +_ZFlat2-8 15.7GB/s ± 1% jpg +_ZFlat3-8 740MB/s ± 3% jpg_200 +_ZFlat4-8 9.20GB/s ± 1% pdf +_ZFlat5-8 991MB/s ± 0% html4 +_ZFlat6-8 379MB/s ± 0% txt1 +_ZFlat7-8 352MB/s ± 0% txt2 +_ZFlat8-8 396MB/s ± 1% txt3 +_ZFlat9-8 327MB/s ± 1% txt4 +_ZFlat10-8 1.33GB/s ± 1% pb +_ZFlat11-8 605MB/s ± 1% gaviota + + + +"go test -test.bench=. -tags=noasm" + +_UFlat0-8 621MB/s ± 2% html +_UFlat1-8 494MB/s ± 1% urls +_UFlat2-8 23.2GB/s ± 1% jpg +_UFlat3-8 1.12GB/s ± 1% jpg_200 +_UFlat4-8 4.35GB/s ± 1% pdf +_UFlat5-8 609MB/s ± 0% html4 +_UFlat6-8 296MB/s ± 0% txt1 +_UFlat7-8 288MB/s ± 0% txt2 +_UFlat8-8 309MB/s ± 1% txt3 +_UFlat9-8 280MB/s ± 1% txt4 +_UFlat10-8 753MB/s ± 0% pb +_UFlat11-8 400MB/s ± 0% gaviota + +_ZFlat0-8 409MB/s ± 1% html +_ZFlat1-8 250MB/s ± 1% urls +_ZFlat2-8 12.3GB/s ± 1% jpg +_ZFlat3-8 132MB/s ± 0% jpg_200 +_ZFlat4-8 2.92GB/s ± 0% pdf +_ZFlat5-8 405MB/s ± 1% html4 +_ZFlat6-8 179MB/s ± 1% txt1 +_ZFlat7-8 170MB/s ± 1% txt2 +_ZFlat8-8 189MB/s ± 1% txt3 +_ZFlat9-8 164MB/s ± 1% txt4 +_ZFlat10-8 479MB/s ± 1% pb +_ZFlat11-8 270MB/s ± 1% gaviota + + + +For comparison (Go's encoded output is byte-for-byte identical to C++'s), here +are the numbers from C++ Snappy's + +make CXXFLAGS="-O2 -DNDEBUG -g" clean snappy_unittest.log && cat snappy_unittest.log + +BM_UFlat/0 2.4GB/s html +BM_UFlat/1 1.4GB/s urls +BM_UFlat/2 21.8GB/s jpg +BM_UFlat/3 1.5GB/s jpg_200 +BM_UFlat/4 13.3GB/s pdf +BM_UFlat/5 2.1GB/s html4 +BM_UFlat/6 1.0GB/s txt1 +BM_UFlat/7 959.4MB/s txt2 +BM_UFlat/8 1.0GB/s txt3 +BM_UFlat/9 864.5MB/s txt4 +BM_UFlat/10 2.9GB/s pb +BM_UFlat/11 1.2GB/s gaviota + +BM_ZFlat/0 944.3MB/s html (22.31 %) +BM_ZFlat/1 501.6MB/s urls (47.78 %) +BM_ZFlat/2 14.3GB/s jpg (99.95 %) +BM_ZFlat/3 538.3MB/s jpg_200 (73.00 %) +BM_ZFlat/4 8.3GB/s pdf (83.30 %) +BM_ZFlat/5 903.5MB/s html4 (22.52 %) +BM_ZFlat/6 336.0MB/s txt1 (57.88 %) +BM_ZFlat/7 312.3MB/s txt2 (61.91 %) +BM_ZFlat/8 353.1MB/s txt3 (54.99 %) +BM_ZFlat/9 289.9MB/s txt4 (66.26 %) +BM_ZFlat/10 1.2GB/s pb (19.68 %) +BM_ZFlat/11 527.4MB/s gaviota (37.72 %) diff --git a/vendor/github.com/golang/snappy/decode.go b/vendor/github.com/golang/snappy/decode.go new file mode 100644 index 00000000000..72efb0353dd --- /dev/null +++ b/vendor/github.com/golang/snappy/decode.go @@ -0,0 +1,237 @@ +// Copyright 2011 The Snappy-Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package snappy + +import ( + "encoding/binary" + "errors" + "io" +) + +var ( + // ErrCorrupt reports that the input is invalid. + ErrCorrupt = errors.New("snappy: corrupt input") + // ErrTooLarge reports that the uncompressed length is too large. + ErrTooLarge = errors.New("snappy: decoded block is too large") + // ErrUnsupported reports that the input isn't supported. + ErrUnsupported = errors.New("snappy: unsupported input") + + errUnsupportedLiteralLength = errors.New("snappy: unsupported literal length") +) + +// DecodedLen returns the length of the decoded block. +func DecodedLen(src []byte) (int, error) { + v, _, err := decodedLen(src) + return v, err +} + +// decodedLen returns the length of the decoded block and the number of bytes +// that the length header occupied. +func decodedLen(src []byte) (blockLen, headerLen int, err error) { + v, n := binary.Uvarint(src) + if n <= 0 || v > 0xffffffff { + return 0, 0, ErrCorrupt + } + + const wordSize = 32 << (^uint(0) >> 32 & 1) + if wordSize == 32 && v > 0x7fffffff { + return 0, 0, ErrTooLarge + } + return int(v), n, nil +} + +const ( + decodeErrCodeCorrupt = 1 + decodeErrCodeUnsupportedLiteralLength = 2 +) + +// Decode returns the decoded form of src. The returned slice may be a sub- +// slice of dst if dst was large enough to hold the entire decoded block. +// Otherwise, a newly allocated slice will be returned. +// +// The dst and src must not overlap. It is valid to pass a nil dst. +func Decode(dst, src []byte) ([]byte, error) { + dLen, s, err := decodedLen(src) + if err != nil { + return nil, err + } + if dLen <= len(dst) { + dst = dst[:dLen] + } else { + dst = make([]byte, dLen) + } + switch decode(dst, src[s:]) { + case 0: + return dst, nil + case decodeErrCodeUnsupportedLiteralLength: + return nil, errUnsupportedLiteralLength + } + return nil, ErrCorrupt +} + +// NewReader returns a new Reader that decompresses from r, using the framing +// format described at +// https://github.com/google/snappy/blob/master/framing_format.txt +func NewReader(r io.Reader) *Reader { + return &Reader{ + r: r, + decoded: make([]byte, maxBlockSize), + buf: make([]byte, maxEncodedLenOfMaxBlockSize+checksumSize), + } +} + +// Reader is an io.Reader that can read Snappy-compressed bytes. +type Reader struct { + r io.Reader + err error + decoded []byte + buf []byte + // decoded[i:j] contains decoded bytes that have not yet been passed on. + i, j int + readHeader bool +} + +// Reset discards any buffered data, resets all state, and switches the Snappy +// reader to read from r. This permits reusing a Reader rather than allocating +// a new one. +func (r *Reader) Reset(reader io.Reader) { + r.r = reader + r.err = nil + r.i = 0 + r.j = 0 + r.readHeader = false +} + +func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) { + if _, r.err = io.ReadFull(r.r, p); r.err != nil { + if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) { + r.err = ErrCorrupt + } + return false + } + return true +} + +// Read satisfies the io.Reader interface. +func (r *Reader) Read(p []byte) (int, error) { + if r.err != nil { + return 0, r.err + } + for { + if r.i < r.j { + n := copy(p, r.decoded[r.i:r.j]) + r.i += n + return n, nil + } + if !r.readFull(r.buf[:4], true) { + return 0, r.err + } + chunkType := r.buf[0] + if !r.readHeader { + if chunkType != chunkTypeStreamIdentifier { + r.err = ErrCorrupt + return 0, r.err + } + r.readHeader = true + } + chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16 + if chunkLen > len(r.buf) { + r.err = ErrUnsupported + return 0, r.err + } + + // The chunk types are specified at + // https://github.com/google/snappy/blob/master/framing_format.txt + switch chunkType { + case chunkTypeCompressedData: + // Section 4.2. Compressed data (chunk type 0x00). + if chunkLen < checksumSize { + r.err = ErrCorrupt + return 0, r.err + } + buf := r.buf[:chunkLen] + if !r.readFull(buf, false) { + return 0, r.err + } + checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24 + buf = buf[checksumSize:] + + n, err := DecodedLen(buf) + if err != nil { + r.err = err + return 0, r.err + } + if n > len(r.decoded) { + r.err = ErrCorrupt + return 0, r.err + } + if _, err := Decode(r.decoded, buf); err != nil { + r.err = err + return 0, r.err + } + if crc(r.decoded[:n]) != checksum { + r.err = ErrCorrupt + return 0, r.err + } + r.i, r.j = 0, n + continue + + case chunkTypeUncompressedData: + // Section 4.3. Uncompressed data (chunk type 0x01). + if chunkLen < checksumSize { + r.err = ErrCorrupt + return 0, r.err + } + buf := r.buf[:checksumSize] + if !r.readFull(buf, false) { + return 0, r.err + } + checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24 + // Read directly into r.decoded instead of via r.buf. + n := chunkLen - checksumSize + if n > len(r.decoded) { + r.err = ErrCorrupt + return 0, r.err + } + if !r.readFull(r.decoded[:n], false) { + return 0, r.err + } + if crc(r.decoded[:n]) != checksum { + r.err = ErrCorrupt + return 0, r.err + } + r.i, r.j = 0, n + continue + + case chunkTypeStreamIdentifier: + // Section 4.1. Stream identifier (chunk type 0xff). + if chunkLen != len(magicBody) { + r.err = ErrCorrupt + return 0, r.err + } + if !r.readFull(r.buf[:len(magicBody)], false) { + return 0, r.err + } + for i := 0; i < len(magicBody); i++ { + if r.buf[i] != magicBody[i] { + r.err = ErrCorrupt + return 0, r.err + } + } + continue + } + + if chunkType <= 0x7f { + // Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f). + r.err = ErrUnsupported + return 0, r.err + } + // Section 4.4 Padding (chunk type 0xfe). + // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd). + if !r.readFull(r.buf[:chunkLen], false) { + return 0, r.err + } + } +} diff --git a/vendor/github.com/golang/snappy/decode_amd64.go b/vendor/github.com/golang/snappy/decode_amd64.go new file mode 100644 index 00000000000..fcd192b849e --- /dev/null +++ b/vendor/github.com/golang/snappy/decode_amd64.go @@ -0,0 +1,14 @@ +// Copyright 2016 The Snappy-Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !appengine +// +build gc +// +build !noasm + +package snappy + +// decode has the same semantics as in decode_other.go. +// +//go:noescape +func decode(dst, src []byte) int diff --git a/vendor/github.com/golang/snappy/decode_amd64.s b/vendor/github.com/golang/snappy/decode_amd64.s new file mode 100644 index 00000000000..e6179f65e35 --- /dev/null +++ b/vendor/github.com/golang/snappy/decode_amd64.s @@ -0,0 +1,490 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !appengine +// +build gc +// +build !noasm + +#include "textflag.h" + +// The asm code generally follows the pure Go code in decode_other.go, except +// where marked with a "!!!". + +// func decode(dst, src []byte) int +// +// All local variables fit into registers. The non-zero stack size is only to +// spill registers and push args when issuing a CALL. The register allocation: +// - AX scratch +// - BX scratch +// - CX length or x +// - DX offset +// - SI &src[s] +// - DI &dst[d] +// + R8 dst_base +// + R9 dst_len +// + R10 dst_base + dst_len +// + R11 src_base +// + R12 src_len +// + R13 src_base + src_len +// - R14 used by doCopy +// - R15 used by doCopy +// +// The registers R8-R13 (marked with a "+") are set at the start of the +// function, and after a CALL returns, and are not otherwise modified. +// +// The d variable is implicitly DI - R8, and len(dst)-d is R10 - DI. +// The s variable is implicitly SI - R11, and len(src)-s is R13 - SI. +TEXT ·decode(SB), NOSPLIT, $48-56 + // Initialize SI, DI and R8-R13. + MOVQ dst_base+0(FP), R8 + MOVQ dst_len+8(FP), R9 + MOVQ R8, DI + MOVQ R8, R10 + ADDQ R9, R10 + MOVQ src_base+24(FP), R11 + MOVQ src_len+32(FP), R12 + MOVQ R11, SI + MOVQ R11, R13 + ADDQ R12, R13 + +loop: + // for s < len(src) + CMPQ SI, R13 + JEQ end + + // CX = uint32(src[s]) + // + // switch src[s] & 0x03 + MOVBLZX (SI), CX + MOVL CX, BX + ANDL $3, BX + CMPL BX, $1 + JAE tagCopy + + // ---------------------------------------- + // The code below handles literal tags. + + // case tagLiteral: + // x := uint32(src[s] >> 2) + // switch + SHRL $2, CX + CMPL CX, $60 + JAE tagLit60Plus + + // case x < 60: + // s++ + INCQ SI + +doLit: + // This is the end of the inner "switch", when we have a literal tag. + // + // We assume that CX == x and x fits in a uint32, where x is the variable + // used in the pure Go decode_other.go code. + + // length = int(x) + 1 + // + // Unlike the pure Go code, we don't need to check if length <= 0 because + // CX can hold 64 bits, so the increment cannot overflow. + INCQ CX + + // Prepare to check if copying length bytes will run past the end of dst or + // src. + // + // AX = len(dst) - d + // BX = len(src) - s + MOVQ R10, AX + SUBQ DI, AX + MOVQ R13, BX + SUBQ SI, BX + + // !!! Try a faster technique for short (16 or fewer bytes) copies. + // + // if length > 16 || len(dst)-d < 16 || len(src)-s < 16 { + // goto callMemmove // Fall back on calling runtime·memmove. + // } + // + // The C++ snappy code calls this TryFastAppend. It also checks len(src)-s + // against 21 instead of 16, because it cannot assume that all of its input + // is contiguous in memory and so it needs to leave enough source bytes to + // read the next tag without refilling buffers, but Go's Decode assumes + // contiguousness (the src argument is a []byte). + CMPQ CX, $16 + JGT callMemmove + CMPQ AX, $16 + JLT callMemmove + CMPQ BX, $16 + JLT callMemmove + + // !!! Implement the copy from src to dst as a 16-byte load and store. + // (Decode's documentation says that dst and src must not overlap.) + // + // This always copies 16 bytes, instead of only length bytes, but that's + // OK. If the input is a valid Snappy encoding then subsequent iterations + // will fix up the overrun. Otherwise, Decode returns a nil []byte (and a + // non-nil error), so the overrun will be ignored. + // + // Note that on amd64, it is legal and cheap to issue unaligned 8-byte or + // 16-byte loads and stores. This technique probably wouldn't be as + // effective on architectures that are fussier about alignment. + MOVOU 0(SI), X0 + MOVOU X0, 0(DI) + + // d += length + // s += length + ADDQ CX, DI + ADDQ CX, SI + JMP loop + +callMemmove: + // if length > len(dst)-d || length > len(src)-s { etc } + CMPQ CX, AX + JGT errCorrupt + CMPQ CX, BX + JGT errCorrupt + + // copy(dst[d:], src[s:s+length]) + // + // This means calling runtime·memmove(&dst[d], &src[s], length), so we push + // DI, SI and CX as arguments. Coincidentally, we also need to spill those + // three registers to the stack, to save local variables across the CALL. + MOVQ DI, 0(SP) + MOVQ SI, 8(SP) + MOVQ CX, 16(SP) + MOVQ DI, 24(SP) + MOVQ SI, 32(SP) + MOVQ CX, 40(SP) + CALL runtime·memmove(SB) + + // Restore local variables: unspill registers from the stack and + // re-calculate R8-R13. + MOVQ 24(SP), DI + MOVQ 32(SP), SI + MOVQ 40(SP), CX + MOVQ dst_base+0(FP), R8 + MOVQ dst_len+8(FP), R9 + MOVQ R8, R10 + ADDQ R9, R10 + MOVQ src_base+24(FP), R11 + MOVQ src_len+32(FP), R12 + MOVQ R11, R13 + ADDQ R12, R13 + + // d += length + // s += length + ADDQ CX, DI + ADDQ CX, SI + JMP loop + +tagLit60Plus: + // !!! This fragment does the + // + // s += x - 58; if uint(s) > uint(len(src)) { etc } + // + // checks. In the asm version, we code it once instead of once per switch case. + ADDQ CX, SI + SUBQ $58, SI + MOVQ SI, BX + SUBQ R11, BX + CMPQ BX, R12 + JA errCorrupt + + // case x == 60: + CMPL CX, $61 + JEQ tagLit61 + JA tagLit62Plus + + // x = uint32(src[s-1]) + MOVBLZX -1(SI), CX + JMP doLit + +tagLit61: + // case x == 61: + // x = uint32(src[s-2]) | uint32(src[s-1])<<8 + MOVWLZX -2(SI), CX + JMP doLit + +tagLit62Plus: + CMPL CX, $62 + JA tagLit63 + + // case x == 62: + // x = uint32(src[s-3]) | uint32(src[s-2])<<8 | uint32(src[s-1])<<16 + MOVWLZX -3(SI), CX + MOVBLZX -1(SI), BX + SHLL $16, BX + ORL BX, CX + JMP doLit + +tagLit63: + // case x == 63: + // x = uint32(src[s-4]) | uint32(src[s-3])<<8 | uint32(src[s-2])<<16 | uint32(src[s-1])<<24 + MOVL -4(SI), CX + JMP doLit + +// The code above handles literal tags. +// ---------------------------------------- +// The code below handles copy tags. + +tagCopy4: + // case tagCopy4: + // s += 5 + ADDQ $5, SI + + // if uint(s) > uint(len(src)) { etc } + MOVQ SI, BX + SUBQ R11, BX + CMPQ BX, R12 + JA errCorrupt + + // length = 1 + int(src[s-5])>>2 + SHRQ $2, CX + INCQ CX + + // offset = int(uint32(src[s-4]) | uint32(src[s-3])<<8 | uint32(src[s-2])<<16 | uint32(src[s-1])<<24) + MOVLQZX -4(SI), DX + JMP doCopy + +tagCopy2: + // case tagCopy2: + // s += 3 + ADDQ $3, SI + + // if uint(s) > uint(len(src)) { etc } + MOVQ SI, BX + SUBQ R11, BX + CMPQ BX, R12 + JA errCorrupt + + // length = 1 + int(src[s-3])>>2 + SHRQ $2, CX + INCQ CX + + // offset = int(uint32(src[s-2]) | uint32(src[s-1])<<8) + MOVWQZX -2(SI), DX + JMP doCopy + +tagCopy: + // We have a copy tag. We assume that: + // - BX == src[s] & 0x03 + // - CX == src[s] + CMPQ BX, $2 + JEQ tagCopy2 + JA tagCopy4 + + // case tagCopy1: + // s += 2 + ADDQ $2, SI + + // if uint(s) > uint(len(src)) { etc } + MOVQ SI, BX + SUBQ R11, BX + CMPQ BX, R12 + JA errCorrupt + + // offset = int(uint32(src[s-2])&0xe0<<3 | uint32(src[s-1])) + MOVQ CX, DX + ANDQ $0xe0, DX + SHLQ $3, DX + MOVBQZX -1(SI), BX + ORQ BX, DX + + // length = 4 + int(src[s-2])>>2&0x7 + SHRQ $2, CX + ANDQ $7, CX + ADDQ $4, CX + +doCopy: + // This is the end of the outer "switch", when we have a copy tag. + // + // We assume that: + // - CX == length && CX > 0 + // - DX == offset + + // if offset <= 0 { etc } + CMPQ DX, $0 + JLE errCorrupt + + // if d < offset { etc } + MOVQ DI, BX + SUBQ R8, BX + CMPQ BX, DX + JLT errCorrupt + + // if length > len(dst)-d { etc } + MOVQ R10, BX + SUBQ DI, BX + CMPQ CX, BX + JGT errCorrupt + + // forwardCopy(dst[d:d+length], dst[d-offset:]); d += length + // + // Set: + // - R14 = len(dst)-d + // - R15 = &dst[d-offset] + MOVQ R10, R14 + SUBQ DI, R14 + MOVQ DI, R15 + SUBQ DX, R15 + + // !!! Try a faster technique for short (16 or fewer bytes) forward copies. + // + // First, try using two 8-byte load/stores, similar to the doLit technique + // above. Even if dst[d:d+length] and dst[d-offset:] can overlap, this is + // still OK if offset >= 8. Note that this has to be two 8-byte load/stores + // and not one 16-byte load/store, and the first store has to be before the + // second load, due to the overlap if offset is in the range [8, 16). + // + // if length > 16 || offset < 8 || len(dst)-d < 16 { + // goto slowForwardCopy + // } + // copy 16 bytes + // d += length + CMPQ CX, $16 + JGT slowForwardCopy + CMPQ DX, $8 + JLT slowForwardCopy + CMPQ R14, $16 + JLT slowForwardCopy + MOVQ 0(R15), AX + MOVQ AX, 0(DI) + MOVQ 8(R15), BX + MOVQ BX, 8(DI) + ADDQ CX, DI + JMP loop + +slowForwardCopy: + // !!! If the forward copy is longer than 16 bytes, or if offset < 8, we + // can still try 8-byte load stores, provided we can overrun up to 10 extra + // bytes. As above, the overrun will be fixed up by subsequent iterations + // of the outermost loop. + // + // The C++ snappy code calls this technique IncrementalCopyFastPath. Its + // commentary says: + // + // ---- + // + // The main part of this loop is a simple copy of eight bytes at a time + // until we've copied (at least) the requested amount of bytes. However, + // if d and d-offset are less than eight bytes apart (indicating a + // repeating pattern of length < 8), we first need to expand the pattern in + // order to get the correct results. For instance, if the buffer looks like + // this, with the eight-byte and patterns marked as + // intervals: + // + // abxxxxxxxxxxxx + // [------] d-offset + // [------] d + // + // a single eight-byte copy from to will repeat the pattern + // once, after which we can move two bytes without moving : + // + // ababxxxxxxxxxx + // [------] d-offset + // [------] d + // + // and repeat the exercise until the two no longer overlap. + // + // This allows us to do very well in the special case of one single byte + // repeated many times, without taking a big hit for more general cases. + // + // The worst case of extra writing past the end of the match occurs when + // offset == 1 and length == 1; the last copy will read from byte positions + // [0..7] and write to [4..11], whereas it was only supposed to write to + // position 1. Thus, ten excess bytes. + // + // ---- + // + // That "10 byte overrun" worst case is confirmed by Go's + // TestSlowForwardCopyOverrun, which also tests the fixUpSlowForwardCopy + // and finishSlowForwardCopy algorithm. + // + // if length > len(dst)-d-10 { + // goto verySlowForwardCopy + // } + SUBQ $10, R14 + CMPQ CX, R14 + JGT verySlowForwardCopy + +makeOffsetAtLeast8: + // !!! As above, expand the pattern so that offset >= 8 and we can use + // 8-byte load/stores. + // + // for offset < 8 { + // copy 8 bytes from dst[d-offset:] to dst[d:] + // length -= offset + // d += offset + // offset += offset + // // The two previous lines together means that d-offset, and therefore + // // R15, is unchanged. + // } + CMPQ DX, $8 + JGE fixUpSlowForwardCopy + MOVQ (R15), BX + MOVQ BX, (DI) + SUBQ DX, CX + ADDQ DX, DI + ADDQ DX, DX + JMP makeOffsetAtLeast8 + +fixUpSlowForwardCopy: + // !!! Add length (which might be negative now) to d (implied by DI being + // &dst[d]) so that d ends up at the right place when we jump back to the + // top of the loop. Before we do that, though, we save DI to AX so that, if + // length is positive, copying the remaining length bytes will write to the + // right place. + MOVQ DI, AX + ADDQ CX, DI + +finishSlowForwardCopy: + // !!! Repeat 8-byte load/stores until length <= 0. Ending with a negative + // length means that we overrun, but as above, that will be fixed up by + // subsequent iterations of the outermost loop. + CMPQ CX, $0 + JLE loop + MOVQ (R15), BX + MOVQ BX, (AX) + ADDQ $8, R15 + ADDQ $8, AX + SUBQ $8, CX + JMP finishSlowForwardCopy + +verySlowForwardCopy: + // verySlowForwardCopy is a simple implementation of forward copy. In C + // parlance, this is a do/while loop instead of a while loop, since we know + // that length > 0. In Go syntax: + // + // for { + // dst[d] = dst[d - offset] + // d++ + // length-- + // if length == 0 { + // break + // } + // } + MOVB (R15), BX + MOVB BX, (DI) + INCQ R15 + INCQ DI + DECQ CX + JNZ verySlowForwardCopy + JMP loop + +// The code above handles copy tags. +// ---------------------------------------- + +end: + // This is the end of the "for s < len(src)". + // + // if d != len(dst) { etc } + CMPQ DI, R10 + JNE errCorrupt + + // return 0 + MOVQ $0, ret+48(FP) + RET + +errCorrupt: + // return decodeErrCodeCorrupt + MOVQ $1, ret+48(FP) + RET diff --git a/vendor/github.com/golang/snappy/decode_other.go b/vendor/github.com/golang/snappy/decode_other.go new file mode 100644 index 00000000000..8c9f2049bc7 --- /dev/null +++ b/vendor/github.com/golang/snappy/decode_other.go @@ -0,0 +1,101 @@ +// Copyright 2016 The Snappy-Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !amd64 appengine !gc noasm + +package snappy + +// decode writes the decoding of src to dst. It assumes that the varint-encoded +// length of the decompressed bytes has already been read, and that len(dst) +// equals that length. +// +// It returns 0 on success or a decodeErrCodeXxx error code on failure. +func decode(dst, src []byte) int { + var d, s, offset, length int + for s < len(src) { + switch src[s] & 0x03 { + case tagLiteral: + x := uint32(src[s] >> 2) + switch { + case x < 60: + s++ + case x == 60: + s += 2 + if uint(s) > uint(len(src)) { // The uint conversions catch overflow from the previous line. + return decodeErrCodeCorrupt + } + x = uint32(src[s-1]) + case x == 61: + s += 3 + if uint(s) > uint(len(src)) { // The uint conversions catch overflow from the previous line. + return decodeErrCodeCorrupt + } + x = uint32(src[s-2]) | uint32(src[s-1])<<8 + case x == 62: + s += 4 + if uint(s) > uint(len(src)) { // The uint conversions catch overflow from the previous line. + return decodeErrCodeCorrupt + } + x = uint32(src[s-3]) | uint32(src[s-2])<<8 | uint32(src[s-1])<<16 + case x == 63: + s += 5 + if uint(s) > uint(len(src)) { // The uint conversions catch overflow from the previous line. + return decodeErrCodeCorrupt + } + x = uint32(src[s-4]) | uint32(src[s-3])<<8 | uint32(src[s-2])<<16 | uint32(src[s-1])<<24 + } + length = int(x) + 1 + if length <= 0 { + return decodeErrCodeUnsupportedLiteralLength + } + if length > len(dst)-d || length > len(src)-s { + return decodeErrCodeCorrupt + } + copy(dst[d:], src[s:s+length]) + d += length + s += length + continue + + case tagCopy1: + s += 2 + if uint(s) > uint(len(src)) { // The uint conversions catch overflow from the previous line. + return decodeErrCodeCorrupt + } + length = 4 + int(src[s-2])>>2&0x7 + offset = int(uint32(src[s-2])&0xe0<<3 | uint32(src[s-1])) + + case tagCopy2: + s += 3 + if uint(s) > uint(len(src)) { // The uint conversions catch overflow from the previous line. + return decodeErrCodeCorrupt + } + length = 1 + int(src[s-3])>>2 + offset = int(uint32(src[s-2]) | uint32(src[s-1])<<8) + + case tagCopy4: + s += 5 + if uint(s) > uint(len(src)) { // The uint conversions catch overflow from the previous line. + return decodeErrCodeCorrupt + } + length = 1 + int(src[s-5])>>2 + offset = int(uint32(src[s-4]) | uint32(src[s-3])<<8 | uint32(src[s-2])<<16 | uint32(src[s-1])<<24) + } + + if offset <= 0 || d < offset || length > len(dst)-d { + return decodeErrCodeCorrupt + } + // Copy from an earlier sub-slice of dst to a later sub-slice. Unlike + // the built-in copy function, this byte-by-byte copy always runs + // forwards, even if the slices overlap. Conceptually, this is: + // + // d += forwardCopy(dst[d:d+length], dst[d-offset:]) + for end := d + length; d != end; d++ { + dst[d] = dst[d-offset] + } + } + if d != len(dst) { + return decodeErrCodeCorrupt + } + return 0 +} diff --git a/vendor/github.com/golang/snappy/encode.go b/vendor/github.com/golang/snappy/encode.go new file mode 100644 index 00000000000..87496890609 --- /dev/null +++ b/vendor/github.com/golang/snappy/encode.go @@ -0,0 +1,285 @@ +// Copyright 2011 The Snappy-Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package snappy + +import ( + "encoding/binary" + "errors" + "io" +) + +// Encode returns the encoded form of src. The returned slice may be a sub- +// slice of dst if dst was large enough to hold the entire encoded block. +// Otherwise, a newly allocated slice will be returned. +// +// The dst and src must not overlap. It is valid to pass a nil dst. +func Encode(dst, src []byte) []byte { + if n := MaxEncodedLen(len(src)); n < 0 { + panic(ErrTooLarge) + } else if len(dst) < n { + dst = make([]byte, n) + } + + // The block starts with the varint-encoded length of the decompressed bytes. + d := binary.PutUvarint(dst, uint64(len(src))) + + for len(src) > 0 { + p := src + src = nil + if len(p) > maxBlockSize { + p, src = p[:maxBlockSize], p[maxBlockSize:] + } + if len(p) < minNonLiteralBlockSize { + d += emitLiteral(dst[d:], p) + } else { + d += encodeBlock(dst[d:], p) + } + } + return dst[:d] +} + +// inputMargin is the minimum number of extra input bytes to keep, inside +// encodeBlock's inner loop. On some architectures, this margin lets us +// implement a fast path for emitLiteral, where the copy of short (<= 16 byte) +// literals can be implemented as a single load to and store from a 16-byte +// register. That literal's actual length can be as short as 1 byte, so this +// can copy up to 15 bytes too much, but that's OK as subsequent iterations of +// the encoding loop will fix up the copy overrun, and this inputMargin ensures +// that we don't overrun the dst and src buffers. +const inputMargin = 16 - 1 + +// minNonLiteralBlockSize is the minimum size of the input to encodeBlock that +// could be encoded with a copy tag. This is the minimum with respect to the +// algorithm used by encodeBlock, not a minimum enforced by the file format. +// +// The encoded output must start with at least a 1 byte literal, as there are +// no previous bytes to copy. A minimal (1 byte) copy after that, generated +// from an emitCopy call in encodeBlock's main loop, would require at least +// another inputMargin bytes, for the reason above: we want any emitLiteral +// calls inside encodeBlock's main loop to use the fast path if possible, which +// requires being able to overrun by inputMargin bytes. Thus, +// minNonLiteralBlockSize equals 1 + 1 + inputMargin. +// +// The C++ code doesn't use this exact threshold, but it could, as discussed at +// https://groups.google.com/d/topic/snappy-compression/oGbhsdIJSJ8/discussion +// The difference between Go (2+inputMargin) and C++ (inputMargin) is purely an +// optimization. It should not affect the encoded form. This is tested by +// TestSameEncodingAsCppShortCopies. +const minNonLiteralBlockSize = 1 + 1 + inputMargin + +// MaxEncodedLen returns the maximum length of a snappy block, given its +// uncompressed length. +// +// It will return a negative value if srcLen is too large to encode. +func MaxEncodedLen(srcLen int) int { + n := uint64(srcLen) + if n > 0xffffffff { + return -1 + } + // Compressed data can be defined as: + // compressed := item* literal* + // item := literal* copy + // + // The trailing literal sequence has a space blowup of at most 62/60 + // since a literal of length 60 needs one tag byte + one extra byte + // for length information. + // + // Item blowup is trickier to measure. Suppose the "copy" op copies + // 4 bytes of data. Because of a special check in the encoding code, + // we produce a 4-byte copy only if the offset is < 65536. Therefore + // the copy op takes 3 bytes to encode, and this type of item leads + // to at most the 62/60 blowup for representing literals. + // + // Suppose the "copy" op copies 5 bytes of data. If the offset is big + // enough, it will take 5 bytes to encode the copy op. Therefore the + // worst case here is a one-byte literal followed by a five-byte copy. + // That is, 6 bytes of input turn into 7 bytes of "compressed" data. + // + // This last factor dominates the blowup, so the final estimate is: + n = 32 + n + n/6 + if n > 0xffffffff { + return -1 + } + return int(n) +} + +var errClosed = errors.New("snappy: Writer is closed") + +// NewWriter returns a new Writer that compresses to w. +// +// The Writer returned does not buffer writes. There is no need to Flush or +// Close such a Writer. +// +// Deprecated: the Writer returned is not suitable for many small writes, only +// for few large writes. Use NewBufferedWriter instead, which is efficient +// regardless of the frequency and shape of the writes, and remember to Close +// that Writer when done. +func NewWriter(w io.Writer) *Writer { + return &Writer{ + w: w, + obuf: make([]byte, obufLen), + } +} + +// NewBufferedWriter returns a new Writer that compresses to w, using the +// framing format described at +// https://github.com/google/snappy/blob/master/framing_format.txt +// +// The Writer returned buffers writes. Users must call Close to guarantee all +// data has been forwarded to the underlying io.Writer. They may also call +// Flush zero or more times before calling Close. +func NewBufferedWriter(w io.Writer) *Writer { + return &Writer{ + w: w, + ibuf: make([]byte, 0, maxBlockSize), + obuf: make([]byte, obufLen), + } +} + +// Writer is an io.Writer than can write Snappy-compressed bytes. +type Writer struct { + w io.Writer + err error + + // ibuf is a buffer for the incoming (uncompressed) bytes. + // + // Its use is optional. For backwards compatibility, Writers created by the + // NewWriter function have ibuf == nil, do not buffer incoming bytes, and + // therefore do not need to be Flush'ed or Close'd. + ibuf []byte + + // obuf is a buffer for the outgoing (compressed) bytes. + obuf []byte + + // wroteStreamHeader is whether we have written the stream header. + wroteStreamHeader bool +} + +// Reset discards the writer's state and switches the Snappy writer to write to +// w. This permits reusing a Writer rather than allocating a new one. +func (w *Writer) Reset(writer io.Writer) { + w.w = writer + w.err = nil + if w.ibuf != nil { + w.ibuf = w.ibuf[:0] + } + w.wroteStreamHeader = false +} + +// Write satisfies the io.Writer interface. +func (w *Writer) Write(p []byte) (nRet int, errRet error) { + if w.ibuf == nil { + // Do not buffer incoming bytes. This does not perform or compress well + // if the caller of Writer.Write writes many small slices. This + // behavior is therefore deprecated, but still supported for backwards + // compatibility with code that doesn't explicitly Flush or Close. + return w.write(p) + } + + // The remainder of this method is based on bufio.Writer.Write from the + // standard library. + + for len(p) > (cap(w.ibuf)-len(w.ibuf)) && w.err == nil { + var n int + if len(w.ibuf) == 0 { + // Large write, empty buffer. + // Write directly from p to avoid copy. + n, _ = w.write(p) + } else { + n = copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p) + w.ibuf = w.ibuf[:len(w.ibuf)+n] + w.Flush() + } + nRet += n + p = p[n:] + } + if w.err != nil { + return nRet, w.err + } + n := copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p) + w.ibuf = w.ibuf[:len(w.ibuf)+n] + nRet += n + return nRet, nil +} + +func (w *Writer) write(p []byte) (nRet int, errRet error) { + if w.err != nil { + return 0, w.err + } + for len(p) > 0 { + obufStart := len(magicChunk) + if !w.wroteStreamHeader { + w.wroteStreamHeader = true + copy(w.obuf, magicChunk) + obufStart = 0 + } + + var uncompressed []byte + if len(p) > maxBlockSize { + uncompressed, p = p[:maxBlockSize], p[maxBlockSize:] + } else { + uncompressed, p = p, nil + } + checksum := crc(uncompressed) + + // Compress the buffer, discarding the result if the improvement + // isn't at least 12.5%. + compressed := Encode(w.obuf[obufHeaderLen:], uncompressed) + chunkType := uint8(chunkTypeCompressedData) + chunkLen := 4 + len(compressed) + obufEnd := obufHeaderLen + len(compressed) + if len(compressed) >= len(uncompressed)-len(uncompressed)/8 { + chunkType = chunkTypeUncompressedData + chunkLen = 4 + len(uncompressed) + obufEnd = obufHeaderLen + } + + // Fill in the per-chunk header that comes before the body. + w.obuf[len(magicChunk)+0] = chunkType + w.obuf[len(magicChunk)+1] = uint8(chunkLen >> 0) + w.obuf[len(magicChunk)+2] = uint8(chunkLen >> 8) + w.obuf[len(magicChunk)+3] = uint8(chunkLen >> 16) + w.obuf[len(magicChunk)+4] = uint8(checksum >> 0) + w.obuf[len(magicChunk)+5] = uint8(checksum >> 8) + w.obuf[len(magicChunk)+6] = uint8(checksum >> 16) + w.obuf[len(magicChunk)+7] = uint8(checksum >> 24) + + if _, err := w.w.Write(w.obuf[obufStart:obufEnd]); err != nil { + w.err = err + return nRet, err + } + if chunkType == chunkTypeUncompressedData { + if _, err := w.w.Write(uncompressed); err != nil { + w.err = err + return nRet, err + } + } + nRet += len(uncompressed) + } + return nRet, nil +} + +// Flush flushes the Writer to its underlying io.Writer. +func (w *Writer) Flush() error { + if w.err != nil { + return w.err + } + if len(w.ibuf) == 0 { + return nil + } + w.write(w.ibuf) + w.ibuf = w.ibuf[:0] + return w.err +} + +// Close calls Flush and then closes the Writer. +func (w *Writer) Close() error { + w.Flush() + ret := w.err + if w.err == nil { + w.err = errClosed + } + return ret +} diff --git a/vendor/github.com/golang/snappy/encode_amd64.go b/vendor/github.com/golang/snappy/encode_amd64.go new file mode 100644 index 00000000000..2a56fb504c7 --- /dev/null +++ b/vendor/github.com/golang/snappy/encode_amd64.go @@ -0,0 +1,29 @@ +// Copyright 2016 The Snappy-Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !appengine +// +build gc +// +build !noasm + +package snappy + +// emitLiteral has the same semantics as in encode_other.go. +// +//go:noescape +func emitLiteral(dst, lit []byte) int + +// emitCopy has the same semantics as in encode_other.go. +// +//go:noescape +func emitCopy(dst []byte, offset, length int) int + +// extendMatch has the same semantics as in encode_other.go. +// +//go:noescape +func extendMatch(src []byte, i, j int) int + +// encodeBlock has the same semantics as in encode_other.go. +// +//go:noescape +func encodeBlock(dst, src []byte) (d int) \ No newline at end of file diff --git a/vendor/github.com/golang/snappy/encode_amd64.s b/vendor/github.com/golang/snappy/encode_amd64.s new file mode 100644 index 00000000000..adfd979fe27 --- /dev/null +++ b/vendor/github.com/golang/snappy/encode_amd64.s @@ -0,0 +1,730 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !appengine +// +build gc +// +build !noasm + +#include "textflag.h" + +// The XXX lines assemble on Go 1.4, 1.5 and 1.7, but not 1.6, due to a +// Go toolchain regression. See https://github.com/golang/go/issues/15426 and +// https://github.com/golang/snappy/issues/29 +// +// As a workaround, the package was built with a known good assembler, and +// those instructions were disassembled by "objdump -d" to yield the +// 4e 0f b7 7c 5c 78 movzwq 0x78(%rsp,%r11,2),%r15 +// style comments, in AT&T asm syntax. Note that rsp here is a physical +// register, not Go/asm's SP pseudo-register (see https://golang.org/doc/asm). +// The instructions were then encoded as "BYTE $0x.." sequences, which assemble +// fine on Go 1.6. + +// The asm code generally follows the pure Go code in encode_other.go, except +// where marked with a "!!!". + +// ---------------------------------------------------------------------------- + +// func emitLiteral(dst, lit []byte) int +// +// All local variables fit into registers. The register allocation: +// - AX len(lit) +// - BX n +// - DX return value +// - DI &dst[i] +// - R10 &lit[0] +// +// The 24 bytes of stack space is to call runtime·memmove. +// +// The unusual register allocation of local variables, such as R10 for the +// source pointer, matches the allocation used at the call site in encodeBlock, +// which makes it easier to manually inline this function. +TEXT ·emitLiteral(SB), NOSPLIT, $24-56 + MOVQ dst_base+0(FP), DI + MOVQ lit_base+24(FP), R10 + MOVQ lit_len+32(FP), AX + MOVQ AX, DX + MOVL AX, BX + SUBL $1, BX + + CMPL BX, $60 + JLT oneByte + CMPL BX, $256 + JLT twoBytes + +threeBytes: + MOVB $0xf4, 0(DI) + MOVW BX, 1(DI) + ADDQ $3, DI + ADDQ $3, DX + JMP memmove + +twoBytes: + MOVB $0xf0, 0(DI) + MOVB BX, 1(DI) + ADDQ $2, DI + ADDQ $2, DX + JMP memmove + +oneByte: + SHLB $2, BX + MOVB BX, 0(DI) + ADDQ $1, DI + ADDQ $1, DX + +memmove: + MOVQ DX, ret+48(FP) + + // copy(dst[i:], lit) + // + // This means calling runtime·memmove(&dst[i], &lit[0], len(lit)), so we push + // DI, R10 and AX as arguments. + MOVQ DI, 0(SP) + MOVQ R10, 8(SP) + MOVQ AX, 16(SP) + CALL runtime·memmove(SB) + RET + +// ---------------------------------------------------------------------------- + +// func emitCopy(dst []byte, offset, length int) int +// +// All local variables fit into registers. The register allocation: +// - AX length +// - SI &dst[0] +// - DI &dst[i] +// - R11 offset +// +// The unusual register allocation of local variables, such as R11 for the +// offset, matches the allocation used at the call site in encodeBlock, which +// makes it easier to manually inline this function. +TEXT ·emitCopy(SB), NOSPLIT, $0-48 + MOVQ dst_base+0(FP), DI + MOVQ DI, SI + MOVQ offset+24(FP), R11 + MOVQ length+32(FP), AX + +loop0: + // for length >= 68 { etc } + CMPL AX, $68 + JLT step1 + + // Emit a length 64 copy, encoded as 3 bytes. + MOVB $0xfe, 0(DI) + MOVW R11, 1(DI) + ADDQ $3, DI + SUBL $64, AX + JMP loop0 + +step1: + // if length > 64 { etc } + CMPL AX, $64 + JLE step2 + + // Emit a length 60 copy, encoded as 3 bytes. + MOVB $0xee, 0(DI) + MOVW R11, 1(DI) + ADDQ $3, DI + SUBL $60, AX + +step2: + // if length >= 12 || offset >= 2048 { goto step3 } + CMPL AX, $12 + JGE step3 + CMPL R11, $2048 + JGE step3 + + // Emit the remaining copy, encoded as 2 bytes. + MOVB R11, 1(DI) + SHRL $8, R11 + SHLB $5, R11 + SUBB $4, AX + SHLB $2, AX + ORB AX, R11 + ORB $1, R11 + MOVB R11, 0(DI) + ADDQ $2, DI + + // Return the number of bytes written. + SUBQ SI, DI + MOVQ DI, ret+40(FP) + RET + +step3: + // Emit the remaining copy, encoded as 3 bytes. + SUBL $1, AX + SHLB $2, AX + ORB $2, AX + MOVB AX, 0(DI) + MOVW R11, 1(DI) + ADDQ $3, DI + + // Return the number of bytes written. + SUBQ SI, DI + MOVQ DI, ret+40(FP) + RET + +// ---------------------------------------------------------------------------- + +// func extendMatch(src []byte, i, j int) int +// +// All local variables fit into registers. The register allocation: +// - DX &src[0] +// - SI &src[j] +// - R13 &src[len(src) - 8] +// - R14 &src[len(src)] +// - R15 &src[i] +// +// The unusual register allocation of local variables, such as R15 for a source +// pointer, matches the allocation used at the call site in encodeBlock, which +// makes it easier to manually inline this function. +TEXT ·extendMatch(SB), NOSPLIT, $0-48 + MOVQ src_base+0(FP), DX + MOVQ src_len+8(FP), R14 + MOVQ i+24(FP), R15 + MOVQ j+32(FP), SI + ADDQ DX, R14 + ADDQ DX, R15 + ADDQ DX, SI + MOVQ R14, R13 + SUBQ $8, R13 + +cmp8: + // As long as we are 8 or more bytes before the end of src, we can load and + // compare 8 bytes at a time. If those 8 bytes are equal, repeat. + CMPQ SI, R13 + JA cmp1 + MOVQ (R15), AX + MOVQ (SI), BX + CMPQ AX, BX + JNE bsf + ADDQ $8, R15 + ADDQ $8, SI + JMP cmp8 + +bsf: + // If those 8 bytes were not equal, XOR the two 8 byte values, and return + // the index of the first byte that differs. The BSF instruction finds the + // least significant 1 bit, the amd64 architecture is little-endian, and + // the shift by 3 converts a bit index to a byte index. + XORQ AX, BX + BSFQ BX, BX + SHRQ $3, BX + ADDQ BX, SI + + // Convert from &src[ret] to ret. + SUBQ DX, SI + MOVQ SI, ret+40(FP) + RET + +cmp1: + // In src's tail, compare 1 byte at a time. + CMPQ SI, R14 + JAE extendMatchEnd + MOVB (R15), AX + MOVB (SI), BX + CMPB AX, BX + JNE extendMatchEnd + ADDQ $1, R15 + ADDQ $1, SI + JMP cmp1 + +extendMatchEnd: + // Convert from &src[ret] to ret. + SUBQ DX, SI + MOVQ SI, ret+40(FP) + RET + +// ---------------------------------------------------------------------------- + +// func encodeBlock(dst, src []byte) (d int) +// +// All local variables fit into registers, other than "var table". The register +// allocation: +// - AX . . +// - BX . . +// - CX 56 shift (note that amd64 shifts by non-immediates must use CX). +// - DX 64 &src[0], tableSize +// - SI 72 &src[s] +// - DI 80 &dst[d] +// - R9 88 sLimit +// - R10 . &src[nextEmit] +// - R11 96 prevHash, currHash, nextHash, offset +// - R12 104 &src[base], skip +// - R13 . &src[nextS], &src[len(src) - 8] +// - R14 . len(src), bytesBetweenHashLookups, &src[len(src)], x +// - R15 112 candidate +// +// The second column (56, 64, etc) is the stack offset to spill the registers +// when calling other functions. We could pack this slightly tighter, but it's +// simpler to have a dedicated spill map independent of the function called. +// +// "var table [maxTableSize]uint16" takes up 32768 bytes of stack space. An +// extra 56 bytes, to call other functions, and an extra 64 bytes, to spill +// local variables (registers) during calls gives 32768 + 56 + 64 = 32888. +TEXT ·encodeBlock(SB), 0, $32888-56 + MOVQ dst_base+0(FP), DI + MOVQ src_base+24(FP), SI + MOVQ src_len+32(FP), R14 + + // shift, tableSize := uint32(32-8), 1<<8 + MOVQ $24, CX + MOVQ $256, DX + +calcShift: + // for ; tableSize < maxTableSize && tableSize < len(src); tableSize *= 2 { + // shift-- + // } + CMPQ DX, $16384 + JGE varTable + CMPQ DX, R14 + JGE varTable + SUBQ $1, CX + SHLQ $1, DX + JMP calcShift + +varTable: + // var table [maxTableSize]uint16 + // + // In the asm code, unlike the Go code, we can zero-initialize only the + // first tableSize elements. Each uint16 element is 2 bytes and each MOVOU + // writes 16 bytes, so we can do only tableSize/8 writes instead of the + // 2048 writes that would zero-initialize all of table's 32768 bytes. + SHRQ $3, DX + LEAQ table-32768(SP), BX + PXOR X0, X0 + +memclr: + MOVOU X0, 0(BX) + ADDQ $16, BX + SUBQ $1, DX + JNZ memclr + + // !!! DX = &src[0] + MOVQ SI, DX + + // sLimit := len(src) - inputMargin + MOVQ R14, R9 + SUBQ $15, R9 + + // !!! Pre-emptively spill CX, DX and R9 to the stack. Their values don't + // change for the rest of the function. + MOVQ CX, 56(SP) + MOVQ DX, 64(SP) + MOVQ R9, 88(SP) + + // nextEmit := 0 + MOVQ DX, R10 + + // s := 1 + ADDQ $1, SI + + // nextHash := hash(load32(src, s), shift) + MOVL 0(SI), R11 + IMULL $0x1e35a7bd, R11 + SHRL CX, R11 + +outer: + // for { etc } + + // skip := 32 + MOVQ $32, R12 + + // nextS := s + MOVQ SI, R13 + + // candidate := 0 + MOVQ $0, R15 + +inner0: + // for { etc } + + // s := nextS + MOVQ R13, SI + + // bytesBetweenHashLookups := skip >> 5 + MOVQ R12, R14 + SHRQ $5, R14 + + // nextS = s + bytesBetweenHashLookups + ADDQ R14, R13 + + // skip += bytesBetweenHashLookups + ADDQ R14, R12 + + // if nextS > sLimit { goto emitRemainder } + MOVQ R13, AX + SUBQ DX, AX + CMPQ AX, R9 + JA emitRemainder + + // candidate = int(table[nextHash]) + // XXX: MOVWQZX table-32768(SP)(R11*2), R15 + // XXX: 4e 0f b7 7c 5c 78 movzwq 0x78(%rsp,%r11,2),%r15 + BYTE $0x4e + BYTE $0x0f + BYTE $0xb7 + BYTE $0x7c + BYTE $0x5c + BYTE $0x78 + + // table[nextHash] = uint16(s) + MOVQ SI, AX + SUBQ DX, AX + + // XXX: MOVW AX, table-32768(SP)(R11*2) + // XXX: 66 42 89 44 5c 78 mov %ax,0x78(%rsp,%r11,2) + BYTE $0x66 + BYTE $0x42 + BYTE $0x89 + BYTE $0x44 + BYTE $0x5c + BYTE $0x78 + + // nextHash = hash(load32(src, nextS), shift) + MOVL 0(R13), R11 + IMULL $0x1e35a7bd, R11 + SHRL CX, R11 + + // if load32(src, s) != load32(src, candidate) { continue } break + MOVL 0(SI), AX + MOVL (DX)(R15*1), BX + CMPL AX, BX + JNE inner0 + +fourByteMatch: + // As per the encode_other.go code: + // + // A 4-byte match has been found. We'll later see etc. + + // !!! Jump to a fast path for short (<= 16 byte) literals. See the comment + // on inputMargin in encode.go. + MOVQ SI, AX + SUBQ R10, AX + CMPQ AX, $16 + JLE emitLiteralFastPath + + // ---------------------------------------- + // Begin inline of the emitLiteral call. + // + // d += emitLiteral(dst[d:], src[nextEmit:s]) + + MOVL AX, BX + SUBL $1, BX + + CMPL BX, $60 + JLT inlineEmitLiteralOneByte + CMPL BX, $256 + JLT inlineEmitLiteralTwoBytes + +inlineEmitLiteralThreeBytes: + MOVB $0xf4, 0(DI) + MOVW BX, 1(DI) + ADDQ $3, DI + JMP inlineEmitLiteralMemmove + +inlineEmitLiteralTwoBytes: + MOVB $0xf0, 0(DI) + MOVB BX, 1(DI) + ADDQ $2, DI + JMP inlineEmitLiteralMemmove + +inlineEmitLiteralOneByte: + SHLB $2, BX + MOVB BX, 0(DI) + ADDQ $1, DI + +inlineEmitLiteralMemmove: + // Spill local variables (registers) onto the stack; call; unspill. + // + // copy(dst[i:], lit) + // + // This means calling runtime·memmove(&dst[i], &lit[0], len(lit)), so we push + // DI, R10 and AX as arguments. + MOVQ DI, 0(SP) + MOVQ R10, 8(SP) + MOVQ AX, 16(SP) + ADDQ AX, DI // Finish the "d +=" part of "d += emitLiteral(etc)". + MOVQ SI, 72(SP) + MOVQ DI, 80(SP) + MOVQ R15, 112(SP) + CALL runtime·memmove(SB) + MOVQ 56(SP), CX + MOVQ 64(SP), DX + MOVQ 72(SP), SI + MOVQ 80(SP), DI + MOVQ 88(SP), R9 + MOVQ 112(SP), R15 + JMP inner1 + +inlineEmitLiteralEnd: + // End inline of the emitLiteral call. + // ---------------------------------------- + +emitLiteralFastPath: + // !!! Emit the 1-byte encoding "uint8(len(lit)-1)<<2". + MOVB AX, BX + SUBB $1, BX + SHLB $2, BX + MOVB BX, (DI) + ADDQ $1, DI + + // !!! Implement the copy from lit to dst as a 16-byte load and store. + // (Encode's documentation says that dst and src must not overlap.) + // + // This always copies 16 bytes, instead of only len(lit) bytes, but that's + // OK. Subsequent iterations will fix up the overrun. + // + // Note that on amd64, it is legal and cheap to issue unaligned 8-byte or + // 16-byte loads and stores. This technique probably wouldn't be as + // effective on architectures that are fussier about alignment. + MOVOU 0(R10), X0 + MOVOU X0, 0(DI) + ADDQ AX, DI + +inner1: + // for { etc } + + // base := s + MOVQ SI, R12 + + // !!! offset := base - candidate + MOVQ R12, R11 + SUBQ R15, R11 + SUBQ DX, R11 + + // ---------------------------------------- + // Begin inline of the extendMatch call. + // + // s = extendMatch(src, candidate+4, s+4) + + // !!! R14 = &src[len(src)] + MOVQ src_len+32(FP), R14 + ADDQ DX, R14 + + // !!! R13 = &src[len(src) - 8] + MOVQ R14, R13 + SUBQ $8, R13 + + // !!! R15 = &src[candidate + 4] + ADDQ $4, R15 + ADDQ DX, R15 + + // !!! s += 4 + ADDQ $4, SI + +inlineExtendMatchCmp8: + // As long as we are 8 or more bytes before the end of src, we can load and + // compare 8 bytes at a time. If those 8 bytes are equal, repeat. + CMPQ SI, R13 + JA inlineExtendMatchCmp1 + MOVQ (R15), AX + MOVQ (SI), BX + CMPQ AX, BX + JNE inlineExtendMatchBSF + ADDQ $8, R15 + ADDQ $8, SI + JMP inlineExtendMatchCmp8 + +inlineExtendMatchBSF: + // If those 8 bytes were not equal, XOR the two 8 byte values, and return + // the index of the first byte that differs. The BSF instruction finds the + // least significant 1 bit, the amd64 architecture is little-endian, and + // the shift by 3 converts a bit index to a byte index. + XORQ AX, BX + BSFQ BX, BX + SHRQ $3, BX + ADDQ BX, SI + JMP inlineExtendMatchEnd + +inlineExtendMatchCmp1: + // In src's tail, compare 1 byte at a time. + CMPQ SI, R14 + JAE inlineExtendMatchEnd + MOVB (R15), AX + MOVB (SI), BX + CMPB AX, BX + JNE inlineExtendMatchEnd + ADDQ $1, R15 + ADDQ $1, SI + JMP inlineExtendMatchCmp1 + +inlineExtendMatchEnd: + // End inline of the extendMatch call. + // ---------------------------------------- + + // ---------------------------------------- + // Begin inline of the emitCopy call. + // + // d += emitCopy(dst[d:], base-candidate, s-base) + + // !!! length := s - base + MOVQ SI, AX + SUBQ R12, AX + +inlineEmitCopyLoop0: + // for length >= 68 { etc } + CMPL AX, $68 + JLT inlineEmitCopyStep1 + + // Emit a length 64 copy, encoded as 3 bytes. + MOVB $0xfe, 0(DI) + MOVW R11, 1(DI) + ADDQ $3, DI + SUBL $64, AX + JMP inlineEmitCopyLoop0 + +inlineEmitCopyStep1: + // if length > 64 { etc } + CMPL AX, $64 + JLE inlineEmitCopyStep2 + + // Emit a length 60 copy, encoded as 3 bytes. + MOVB $0xee, 0(DI) + MOVW R11, 1(DI) + ADDQ $3, DI + SUBL $60, AX + +inlineEmitCopyStep2: + // if length >= 12 || offset >= 2048 { goto inlineEmitCopyStep3 } + CMPL AX, $12 + JGE inlineEmitCopyStep3 + CMPL R11, $2048 + JGE inlineEmitCopyStep3 + + // Emit the remaining copy, encoded as 2 bytes. + MOVB R11, 1(DI) + SHRL $8, R11 + SHLB $5, R11 + SUBB $4, AX + SHLB $2, AX + ORB AX, R11 + ORB $1, R11 + MOVB R11, 0(DI) + ADDQ $2, DI + JMP inlineEmitCopyEnd + +inlineEmitCopyStep3: + // Emit the remaining copy, encoded as 3 bytes. + SUBL $1, AX + SHLB $2, AX + ORB $2, AX + MOVB AX, 0(DI) + MOVW R11, 1(DI) + ADDQ $3, DI + +inlineEmitCopyEnd: + // End inline of the emitCopy call. + // ---------------------------------------- + + // nextEmit = s + MOVQ SI, R10 + + // if s >= sLimit { goto emitRemainder } + MOVQ SI, AX + SUBQ DX, AX + CMPQ AX, R9 + JAE emitRemainder + + // As per the encode_other.go code: + // + // We could immediately etc. + + // x := load64(src, s-1) + MOVQ -1(SI), R14 + + // prevHash := hash(uint32(x>>0), shift) + MOVL R14, R11 + IMULL $0x1e35a7bd, R11 + SHRL CX, R11 + + // table[prevHash] = uint16(s-1) + MOVQ SI, AX + SUBQ DX, AX + SUBQ $1, AX + + // XXX: MOVW AX, table-32768(SP)(R11*2) + // XXX: 66 42 89 44 5c 78 mov %ax,0x78(%rsp,%r11,2) + BYTE $0x66 + BYTE $0x42 + BYTE $0x89 + BYTE $0x44 + BYTE $0x5c + BYTE $0x78 + + // currHash := hash(uint32(x>>8), shift) + SHRQ $8, R14 + MOVL R14, R11 + IMULL $0x1e35a7bd, R11 + SHRL CX, R11 + + // candidate = int(table[currHash]) + // XXX: MOVWQZX table-32768(SP)(R11*2), R15 + // XXX: 4e 0f b7 7c 5c 78 movzwq 0x78(%rsp,%r11,2),%r15 + BYTE $0x4e + BYTE $0x0f + BYTE $0xb7 + BYTE $0x7c + BYTE $0x5c + BYTE $0x78 + + // table[currHash] = uint16(s) + ADDQ $1, AX + + // XXX: MOVW AX, table-32768(SP)(R11*2) + // XXX: 66 42 89 44 5c 78 mov %ax,0x78(%rsp,%r11,2) + BYTE $0x66 + BYTE $0x42 + BYTE $0x89 + BYTE $0x44 + BYTE $0x5c + BYTE $0x78 + + // if uint32(x>>8) == load32(src, candidate) { continue } + MOVL (DX)(R15*1), BX + CMPL R14, BX + JEQ inner1 + + // nextHash = hash(uint32(x>>16), shift) + SHRQ $8, R14 + MOVL R14, R11 + IMULL $0x1e35a7bd, R11 + SHRL CX, R11 + + // s++ + ADDQ $1, SI + + // break out of the inner1 for loop, i.e. continue the outer loop. + JMP outer + +emitRemainder: + // if nextEmit < len(src) { etc } + MOVQ src_len+32(FP), AX + ADDQ DX, AX + CMPQ R10, AX + JEQ encodeBlockEnd + + // d += emitLiteral(dst[d:], src[nextEmit:]) + // + // Push args. + MOVQ DI, 0(SP) + MOVQ $0, 8(SP) // Unnecessary, as the callee ignores it, but conservative. + MOVQ $0, 16(SP) // Unnecessary, as the callee ignores it, but conservative. + MOVQ R10, 24(SP) + SUBQ R10, AX + MOVQ AX, 32(SP) + MOVQ AX, 40(SP) // Unnecessary, as the callee ignores it, but conservative. + + // Spill local variables (registers) onto the stack; call; unspill. + MOVQ DI, 80(SP) + CALL ·emitLiteral(SB) + MOVQ 80(SP), DI + + // Finish the "d +=" part of "d += emitLiteral(etc)". + ADDQ 48(SP), DI + +encodeBlockEnd: + MOVQ dst_base+0(FP), AX + SUBQ AX, DI + MOVQ DI, d+48(FP) + RET diff --git a/vendor/github.com/golang/snappy/encode_other.go b/vendor/github.com/golang/snappy/encode_other.go new file mode 100644 index 00000000000..dbcae905e6e --- /dev/null +++ b/vendor/github.com/golang/snappy/encode_other.go @@ -0,0 +1,238 @@ +// Copyright 2016 The Snappy-Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !amd64 appengine !gc noasm + +package snappy + +func load32(b []byte, i int) uint32 { + b = b[i : i+4 : len(b)] // Help the compiler eliminate bounds checks on the next line. + return uint32(b[0]) | uint32(b[1])<<8 | uint32(b[2])<<16 | uint32(b[3])<<24 +} + +func load64(b []byte, i int) uint64 { + b = b[i : i+8 : len(b)] // Help the compiler eliminate bounds checks on the next line. + return uint64(b[0]) | uint64(b[1])<<8 | uint64(b[2])<<16 | uint64(b[3])<<24 | + uint64(b[4])<<32 | uint64(b[5])<<40 | uint64(b[6])<<48 | uint64(b[7])<<56 +} + +// emitLiteral writes a literal chunk and returns the number of bytes written. +// +// It assumes that: +// dst is long enough to hold the encoded bytes +// 1 <= len(lit) && len(lit) <= 65536 +func emitLiteral(dst, lit []byte) int { + i, n := 0, uint(len(lit)-1) + switch { + case n < 60: + dst[0] = uint8(n)<<2 | tagLiteral + i = 1 + case n < 1<<8: + dst[0] = 60<<2 | tagLiteral + dst[1] = uint8(n) + i = 2 + default: + dst[0] = 61<<2 | tagLiteral + dst[1] = uint8(n) + dst[2] = uint8(n >> 8) + i = 3 + } + return i + copy(dst[i:], lit) +} + +// emitCopy writes a copy chunk and returns the number of bytes written. +// +// It assumes that: +// dst is long enough to hold the encoded bytes +// 1 <= offset && offset <= 65535 +// 4 <= length && length <= 65535 +func emitCopy(dst []byte, offset, length int) int { + i := 0 + // The maximum length for a single tagCopy1 or tagCopy2 op is 64 bytes. The + // threshold for this loop is a little higher (at 68 = 64 + 4), and the + // length emitted down below is is a little lower (at 60 = 64 - 4), because + // it's shorter to encode a length 67 copy as a length 60 tagCopy2 followed + // by a length 7 tagCopy1 (which encodes as 3+2 bytes) than to encode it as + // a length 64 tagCopy2 followed by a length 3 tagCopy2 (which encodes as + // 3+3 bytes). The magic 4 in the 64±4 is because the minimum length for a + // tagCopy1 op is 4 bytes, which is why a length 3 copy has to be an + // encodes-as-3-bytes tagCopy2 instead of an encodes-as-2-bytes tagCopy1. + for length >= 68 { + // Emit a length 64 copy, encoded as 3 bytes. + dst[i+0] = 63<<2 | tagCopy2 + dst[i+1] = uint8(offset) + dst[i+2] = uint8(offset >> 8) + i += 3 + length -= 64 + } + if length > 64 { + // Emit a length 60 copy, encoded as 3 bytes. + dst[i+0] = 59<<2 | tagCopy2 + dst[i+1] = uint8(offset) + dst[i+2] = uint8(offset >> 8) + i += 3 + length -= 60 + } + if length >= 12 || offset >= 2048 { + // Emit the remaining copy, encoded as 3 bytes. + dst[i+0] = uint8(length-1)<<2 | tagCopy2 + dst[i+1] = uint8(offset) + dst[i+2] = uint8(offset >> 8) + return i + 3 + } + // Emit the remaining copy, encoded as 2 bytes. + dst[i+0] = uint8(offset>>8)<<5 | uint8(length-4)<<2 | tagCopy1 + dst[i+1] = uint8(offset) + return i + 2 +} + +// extendMatch returns the largest k such that k <= len(src) and that +// src[i:i+k-j] and src[j:k] have the same contents. +// +// It assumes that: +// 0 <= i && i < j && j <= len(src) +func extendMatch(src []byte, i, j int) int { + for ; j < len(src) && src[i] == src[j]; i, j = i+1, j+1 { + } + return j +} + +func hash(u, shift uint32) uint32 { + return (u * 0x1e35a7bd) >> shift +} + +// encodeBlock encodes a non-empty src to a guaranteed-large-enough dst. It +// assumes that the varint-encoded length of the decompressed bytes has already +// been written. +// +// It also assumes that: +// len(dst) >= MaxEncodedLen(len(src)) && +// minNonLiteralBlockSize <= len(src) && len(src) <= maxBlockSize +func encodeBlock(dst, src []byte) (d int) { + // Initialize the hash table. Its size ranges from 1<<8 to 1<<14 inclusive. + // The table element type is uint16, as s < sLimit and sLimit < len(src) + // and len(src) <= maxBlockSize and maxBlockSize == 65536. + const ( + maxTableSize = 1 << 14 + // tableMask is redundant, but helps the compiler eliminate bounds + // checks. + tableMask = maxTableSize - 1 + ) + shift := uint32(32 - 8) + for tableSize := 1 << 8; tableSize < maxTableSize && tableSize < len(src); tableSize *= 2 { + shift-- + } + // In Go, all array elements are zero-initialized, so there is no advantage + // to a smaller tableSize per se. However, it matches the C++ algorithm, + // and in the asm versions of this code, we can get away with zeroing only + // the first tableSize elements. + var table [maxTableSize]uint16 + + // sLimit is when to stop looking for offset/length copies. The inputMargin + // lets us use a fast path for emitLiteral in the main loop, while we are + // looking for copies. + sLimit := len(src) - inputMargin + + // nextEmit is where in src the next emitLiteral should start from. + nextEmit := 0 + + // The encoded form must start with a literal, as there are no previous + // bytes to copy, so we start looking for hash matches at s == 1. + s := 1 + nextHash := hash(load32(src, s), shift) + + for { + // Copied from the C++ snappy implementation: + // + // Heuristic match skipping: If 32 bytes are scanned with no matches + // found, start looking only at every other byte. If 32 more bytes are + // scanned (or skipped), look at every third byte, etc.. When a match + // is found, immediately go back to looking at every byte. This is a + // small loss (~5% performance, ~0.1% density) for compressible data + // due to more bookkeeping, but for non-compressible data (such as + // JPEG) it's a huge win since the compressor quickly "realizes" the + // data is incompressible and doesn't bother looking for matches + // everywhere. + // + // The "skip" variable keeps track of how many bytes there are since + // the last match; dividing it by 32 (ie. right-shifting by five) gives + // the number of bytes to move ahead for each iteration. + skip := 32 + + nextS := s + candidate := 0 + for { + s = nextS + bytesBetweenHashLookups := skip >> 5 + nextS = s + bytesBetweenHashLookups + skip += bytesBetweenHashLookups + if nextS > sLimit { + goto emitRemainder + } + candidate = int(table[nextHash&tableMask]) + table[nextHash&tableMask] = uint16(s) + nextHash = hash(load32(src, nextS), shift) + if load32(src, s) == load32(src, candidate) { + break + } + } + + // A 4-byte match has been found. We'll later see if more than 4 bytes + // match. But, prior to the match, src[nextEmit:s] are unmatched. Emit + // them as literal bytes. + d += emitLiteral(dst[d:], src[nextEmit:s]) + + // Call emitCopy, and then see if another emitCopy could be our next + // move. Repeat until we find no match for the input immediately after + // what was consumed by the last emitCopy call. + // + // If we exit this loop normally then we need to call emitLiteral next, + // though we don't yet know how big the literal will be. We handle that + // by proceeding to the next iteration of the main loop. We also can + // exit this loop via goto if we get close to exhausting the input. + for { + // Invariant: we have a 4-byte match at s, and no need to emit any + // literal bytes prior to s. + base := s + + // Extend the 4-byte match as long as possible. + // + // This is an inlined version of: + // s = extendMatch(src, candidate+4, s+4) + s += 4 + for i := candidate + 4; s < len(src) && src[i] == src[s]; i, s = i+1, s+1 { + } + + d += emitCopy(dst[d:], base-candidate, s-base) + nextEmit = s + if s >= sLimit { + goto emitRemainder + } + + // We could immediately start working at s now, but to improve + // compression we first update the hash table at s-1 and at s. If + // another emitCopy is not our next move, also calculate nextHash + // at s+1. At least on GOARCH=amd64, these three hash calculations + // are faster as one load64 call (with some shifts) instead of + // three load32 calls. + x := load64(src, s-1) + prevHash := hash(uint32(x>>0), shift) + table[prevHash&tableMask] = uint16(s - 1) + currHash := hash(uint32(x>>8), shift) + candidate = int(table[currHash&tableMask]) + table[currHash&tableMask] = uint16(s) + if uint32(x>>8) != load32(src, candidate) { + nextHash = hash(uint32(x>>16), shift) + s++ + break + } + } + } + +emitRemainder: + if nextEmit < len(src) { + d += emitLiteral(dst[d:], src[nextEmit:]) + } + return d +} diff --git a/vendor/github.com/golang/snappy/snappy.go b/vendor/github.com/golang/snappy/snappy.go new file mode 100644 index 00000000000..0cf5e379c47 --- /dev/null +++ b/vendor/github.com/golang/snappy/snappy.go @@ -0,0 +1,87 @@ +// Copyright 2011 The Snappy-Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package snappy implements the snappy block-based compression format. +// It aims for very high speeds and reasonable compression. +// +// The C++ snappy implementation is at https://github.com/google/snappy +package snappy // import "github.com/golang/snappy" + +import ( + "hash/crc32" +) + +/* +Each encoded block begins with the varint-encoded length of the decoded data, +followed by a sequence of chunks. Chunks begin and end on byte boundaries. The +first byte of each chunk is broken into its 2 least and 6 most significant bits +called l and m: l ranges in [0, 4) and m ranges in [0, 64). l is the chunk tag. +Zero means a literal tag. All other values mean a copy tag. + +For literal tags: + - If m < 60, the next 1 + m bytes are literal bytes. + - Otherwise, let n be the little-endian unsigned integer denoted by the next + m - 59 bytes. The next 1 + n bytes after that are literal bytes. + +For copy tags, length bytes are copied from offset bytes ago, in the style of +Lempel-Ziv compression algorithms. In particular: + - For l == 1, the offset ranges in [0, 1<<11) and the length in [4, 12). + The length is 4 + the low 3 bits of m. The high 3 bits of m form bits 8-10 + of the offset. The next byte is bits 0-7 of the offset. + - For l == 2, the offset ranges in [0, 1<<16) and the length in [1, 65). + The length is 1 + m. The offset is the little-endian unsigned integer + denoted by the next 2 bytes. + - For l == 3, this tag is a legacy format that is no longer issued by most + encoders. Nonetheless, the offset ranges in [0, 1<<32) and the length in + [1, 65). The length is 1 + m. The offset is the little-endian unsigned + integer denoted by the next 4 bytes. +*/ +const ( + tagLiteral = 0x00 + tagCopy1 = 0x01 + tagCopy2 = 0x02 + tagCopy4 = 0x03 +) + +const ( + checksumSize = 4 + chunkHeaderSize = 4 + magicChunk = "\xff\x06\x00\x00" + magicBody + magicBody = "sNaPpY" + + // maxBlockSize is the maximum size of the input to encodeBlock. It is not + // part of the wire format per se, but some parts of the encoder assume + // that an offset fits into a uint16. + // + // Also, for the framing format (Writer type instead of Encode function), + // https://github.com/google/snappy/blob/master/framing_format.txt says + // that "the uncompressed data in a chunk must be no longer than 65536 + // bytes". + maxBlockSize = 65536 + + // maxEncodedLenOfMaxBlockSize equals MaxEncodedLen(maxBlockSize), but is + // hard coded to be a const instead of a variable, so that obufLen can also + // be a const. Their equivalence is confirmed by + // TestMaxEncodedLenOfMaxBlockSize. + maxEncodedLenOfMaxBlockSize = 76490 + + obufHeaderLen = len(magicChunk) + checksumSize + chunkHeaderSize + obufLen = obufHeaderLen + maxEncodedLenOfMaxBlockSize +) + +const ( + chunkTypeCompressedData = 0x00 + chunkTypeUncompressedData = 0x01 + chunkTypePadding = 0xfe + chunkTypeStreamIdentifier = 0xff +) + +var crcTable = crc32.MakeTable(crc32.Castagnoli) + +// crc implements the checksum specified in section 3 of +// https://github.com/google/snappy/blob/master/framing_format.txt +func crc(b []byte) uint32 { + c := crc32.Update(0, crcTable, b) + return uint32(c>>15|c<<17) + 0xa282ead8 +} diff --git a/vendor/vendor.json b/vendor/vendor.json index e5a66f62c48..7667e51ec4e 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -255,12 +255,6 @@ "revision": "da39e9a4f920a15683dd0f23923c302d4db6eed5", "revisionTime": "2016-05-28T08:11:04Z" }, - { - "checksumSHA1": "iP5slJJPRZUm0rfdII8OiATAACA=", - "path": "github.com/docker/docker/pkg/idtools", - "revision": "52debcd58ac91bf68503ce60561536911b74ff05", - "revisionTime": "2016-05-20T15:17:10Z" - }, { "checksumSHA1": "iP5slJJPRZUm0rfdII8OiATAACA=", "path": "github.com/docker/docker/pkg/idtools", @@ -268,8 +262,8 @@ "revisionTime": "2016-05-28T10:48:36Z" }, { - "checksumSHA1": "tdhmIGUaoOMEDymMC23qTS7bt0g=", - "path": "github.com/docker/docker/pkg/ioutils", + "checksumSHA1": "iP5slJJPRZUm0rfdII8OiATAACA=", + "path": "github.com/docker/docker/pkg/idtools", "revision": "52debcd58ac91bf68503ce60561536911b74ff05", "revisionTime": "2016-05-20T15:17:10Z" }, @@ -279,6 +273,12 @@ "revision": "da39e9a4f920a15683dd0f23923c302d4db6eed5", "revisionTime": "2016-05-28T08:11:04Z" }, + { + "checksumSHA1": "tdhmIGUaoOMEDymMC23qTS7bt0g=", + "path": "github.com/docker/docker/pkg/ioutils", + "revision": "52debcd58ac91bf68503ce60561536911b74ff05", + "revisionTime": "2016-05-20T15:17:10Z" + }, { "checksumSHA1": "ndnAFCfsGC3upNQ6jAEwzxcurww=", "path": "github.com/docker/docker/pkg/longpath", @@ -484,6 +484,12 @@ "path": "github.com/golang/protobuf/proto/testdata", "revision": "0dfe8f37844c14cb32c7247925270e0f7ba90973" }, + { + "checksumSHA1": "W+E/2xXcE1GmJ0Qb784ald0Fn6I=", + "path": "github.com/golang/snappy", + "revision": "d9eb7a3d35ec988b8585d4a0068e462c27d28380", + "revisionTime": "2016-05-29T05:00:41Z" + }, { "comment": "1.0.0", "path": "github.com/gorhill/cronexpr", From d978c008d311ed6bf6cc445245216ef6586458b7 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 28 Nov 2016 16:05:56 -0800 Subject: [PATCH 05/27] Decompress --- command/agent/alloc_endpoint.go | 15 ++++++++++++++- command/agent/job_endpoint.go | 15 ++++++++++++++- nomad/job_endpoint.go | 14 ++++++++------ 3 files changed, 36 insertions(+), 8 deletions(-) diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index 19e2f67ed54..591ca9d7e4d 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -5,6 +5,7 @@ import ( "net/http" "strings" + "github.com/golang/snappy" "github.com/hashicorp/nomad/nomad/structs" ) @@ -57,7 +58,19 @@ func (s *HTTPServer) AllocSpecificRequest(resp http.ResponseWriter, req *http.Re if out.Alloc == nil { return nil, CodedError(404, "alloc not found") } - return out.Alloc, nil + + // Decode the input data if there is any + alloc := out.Alloc + if alloc.Job != nil && len(alloc.Job.InputData) != 0 { + decoded, err := snappy.Decode(nil, alloc.Job.InputData) + if err != nil { + return nil, err + } + alloc = alloc.Copy() + alloc.Job.InputData = decoded + } + + return alloc, nil } func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 460a91c1d4f..30504436db4 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -4,6 +4,7 @@ import ( "net/http" "strings" + "github.com/golang/snappy" "github.com/hashicorp/nomad/nomad/structs" ) @@ -208,7 +209,19 @@ func (s *HTTPServer) jobQuery(resp http.ResponseWriter, req *http.Request, if out.Job == nil { return nil, CodedError(404, "job not found") } - return out.Job, nil + + // Decode the input data if there is any + job := out.Job + if len(job.InputData) != 0 { + decoded, err := snappy.Decode(nil, out.Job.InputData) + if err != nil { + return nil, err + } + job = job.Copy() + job.InputData = decoded + } + + return job, nil } func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request, diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 5490526ac3f..53d3d6a871c 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -316,6 +316,8 @@ func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegis if job.IsPeriodic() { return fmt.Errorf("can't evaluate periodic job") + } else if job.IsDispatchTemplate() { + return fmt.Errorf("can't evaluate dispatch template job") } // Create a new evaluation @@ -807,19 +809,19 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa return err } - // XXX disable job/evaluate on periodic jobs - - // XXX merge in the meta data - // Derive the child job and commit it via Raft dispatchJob := tmpl.Copy() dispatchJob.Dispatch = nil dispatchJob.ID = structs.DispatchedID(tmpl.ID, time.Now()) dispatchJob.Name = dispatchJob.ID + // Merge in the meta data + for k, v := range args.Meta { + dispatchJob.Meta[k] = v + } + // Compress the input - // XXX Decompress on the HTTP endpoint - dispatchJob.InputData = snappy.Encode(dispatchJob.InputData, args.InputData) + dispatchJob.InputData = snappy.Encode(nil, args.InputData) regReq := &structs.JobRegisterRequest{ Job: dispatchJob, From b05b72956240b11b5c799474c7f551d4551000ac Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 1 Dec 2016 13:17:34 -0800 Subject: [PATCH 06/27] Remove paused --- api/jobs.go | 1 - jobspec/parse_test.go | 1 - jobspec/test-fixtures/dispatch.hcl | 1 - nomad/structs/structs.go | 4 ---- 4 files changed, 7 deletions(-) diff --git a/api/jobs.go b/api/jobs.go index 1b701a8190e..0458bd02504 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -189,7 +189,6 @@ type PeriodicConfig struct { // DispatchConfig is used to configure the dispatch template type DispatchConfig struct { - Paused bool InputData string MetaRequired []string MetaOptional []string diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 53d43a07b9b..6e71b6e8d9e 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -548,7 +548,6 @@ func TestParse(t *testing.T) { Region: "global", Dispatch: &structs.DispatchConfig{ - Paused: true, InputData: "required", MetaRequired: []string{"foo", "bar"}, MetaOptional: []string{"baz", "bam"}, diff --git a/jobspec/test-fixtures/dispatch.hcl b/jobspec/test-fixtures/dispatch.hcl index c3ed001c12c..9040b7b264d 100644 --- a/jobspec/test-fixtures/dispatch.hcl +++ b/jobspec/test-fixtures/dispatch.hcl @@ -1,6 +1,5 @@ job "dispatch" { dispatch { - paused = true input_data = "required" meta_keys { required = ["foo", "bar"] diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index a90ce2b9531..b698ada1f9f 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1575,10 +1575,6 @@ const ( // DispatchConfig is used to configure the dispatch template type DispatchConfig struct { - // Paused specifies whether jobs can be dispatched based on the template or - // if the job is paused. - Paused bool - // InputData configure the input data requirements InputData string `mapstructure:"input_data"` From 9dc2f63240baa6432351899c1399479d03e1b631 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 1 Dec 2016 16:27:22 -0800 Subject: [PATCH 07/27] agent tests --- command/agent/agent_test.go | 2 +- command/agent/alloc_endpoint_test.go | 58 +++++++++++++++ command/agent/job_endpoint.go | 11 ++- command/agent/job_endpoint_test.go | 105 +++++++++++++++++++++++++++ 4 files changed, 172 insertions(+), 4 deletions(-) diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 735dfca97e9..26d61e17a89 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -14,7 +14,7 @@ import ( ) func getPort() int { - addr, err := net.ResolveTCPAddr("tcp", "localhost:0") + addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0") if err != nil { panic(err) } diff --git a/command/agent/alloc_endpoint_test.go b/command/agent/alloc_endpoint_test.go index f996799b7b7..b0e4e5195cd 100644 --- a/command/agent/alloc_endpoint_test.go +++ b/command/agent/alloc_endpoint_test.go @@ -3,9 +3,11 @@ package agent import ( "net/http" "net/http/httptest" + "reflect" "strings" "testing" + "github.com/golang/snappy" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" ) @@ -161,6 +163,62 @@ func TestHTTP_AllocQuery(t *testing.T) { }) } +func TestHTTP_AllocQuery_InputData(t *testing.T) { + httpTest(t, nil, func(s *TestServer) { + // Directly manipulate the state + state := s.Agent.server.State() + alloc := mock.Alloc() + if err := state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)); err != nil { + t.Fatal(err) + } + + // Insert InputData compressed + expected := []byte("hello world") + compressed := snappy.Encode(nil, expected) + alloc.Job.InputData = compressed + + err := state.UpsertAllocs(1000, []*structs.Allocation{alloc}) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Make the HTTP request + req, err := http.NewRequest("GET", "/v1/allocation/"+alloc.ID, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.AllocSpecificRequest(respW, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Check for the index + if respW.HeaderMap.Get("X-Nomad-Index") == "" { + t.Fatalf("missing index") + } + if respW.HeaderMap.Get("X-Nomad-KnownLeader") != "true" { + t.Fatalf("missing known leader") + } + if respW.HeaderMap.Get("X-Nomad-LastContact") == "" { + t.Fatalf("missing last contact") + } + + // Check the job + a := obj.(*structs.Allocation) + if a.ID != alloc.ID { + t.Fatalf("bad: %#v", a) + } + + // Check the input data is decompressed + if !reflect.DeepEqual(a.Job.InputData, expected) { + t.Fatalf("InputData not decompressed properly; got %#v; want %#v", a.Job.InputData, expected) + } + }) +} + func TestHTTP_AllocStats(t *testing.T) { httpTest(t, nil, func(s *TestServer) { // Make the HTTP request diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 30504436db4..5db4b795712 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -286,12 +286,17 @@ func (s *HTTPServer) jobDispatchRequest(resp http.ResponseWriter, req *http.Requ if req.Method != "PUT" && req.Method != "POST" { return nil, CodedError(405, ErrInvalidMethod) } - args := structs.JobDispatchRequest{ - JobID: name, - } + args := structs.JobDispatchRequest{} if err := decodeBody(req, &args); err != nil { return nil, CodedError(400, err.Error()) } + if args.JobID != "" && args.JobID != name { + return nil, CodedError(400, "Job ID does not match") + } + if args.JobID == "" { + args.JobID = name + } + s.parseRegion(req, &args.Region) var out structs.JobDispatchResponse diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index 1f718cbc3d8..3d963b9ae63 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -3,8 +3,10 @@ package agent import ( "net/http" "net/http/httptest" + "reflect" "testing" + "github.com/golang/snappy" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" ) @@ -205,6 +207,62 @@ func TestHTTP_JobQuery(t *testing.T) { }) } +func TestHTTP_JobQuery_InputData(t *testing.T) { + httpTest(t, nil, func(s *TestServer) { + // Create the job + job := mock.Job() + + // Insert InputData compressed + expected := []byte("hello world") + compressed := snappy.Encode(nil, expected) + job.InputData = compressed + + args := structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.JobRegisterResponse + if err := s.Agent.RPC("Job.Register", &args, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + // Make the HTTP request + req, err := http.NewRequest("GET", "/v1/job/"+job.ID, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.JobSpecificRequest(respW, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Check for the index + if respW.HeaderMap.Get("X-Nomad-Index") == "" { + t.Fatalf("missing index") + } + if respW.HeaderMap.Get("X-Nomad-KnownLeader") != "true" { + t.Fatalf("missing known leader") + } + if respW.HeaderMap.Get("X-Nomad-LastContact") == "" { + t.Fatalf("missing last contact") + } + + // Check the job + j := obj.(*structs.Job) + if j.ID != job.ID { + t.Fatalf("bad: %#v", j) + } + + // Check the input data is decompressed + if !reflect.DeepEqual(j.InputData, expected) { + t.Fatalf("InputData not decompressed properly; got %#v; want %#v", j.InputData, expected) + } + }) +} + func TestHTTP_JobUpdate(t *testing.T) { httpTest(t, nil, func(s *TestServer) { // Create the job @@ -521,3 +579,50 @@ func TestHTTP_JobPlan(t *testing.T) { } }) } + +func TestHTTP_JobDispatch(t *testing.T) { + httpTest(t, nil, func(s *TestServer) { + // Create the dispatch template job + job := mock.Job() + job.Dispatch = &structs.DispatchConfig{} + + args := structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.JobRegisterResponse + if err := s.Agent.RPC("Job.Register", &args, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + // Make the request + respW := httptest.NewRecorder() + args2 := structs.JobDispatchRequest{ + WriteRequest: structs.WriteRequest{Region: "global"}, + } + buf := encodeReq(args2) + + // Make the HTTP request + req2, err := http.NewRequest("PUT", "/v1/job/"+job.ID+"/dispatch", buf) + if err != nil { + t.Fatalf("err: %v", err) + } + respW.Flush() + + // Make the request + obj, err := s.Server.JobSpecificRequest(respW, req2) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Check the response + dispatch := obj.(structs.JobDispatchResponse) + if dispatch.EvalID == "" { + t.Fatalf("bad: %v", dispatch) + } + + if dispatch.DispatchedJobID == "" { + t.Fatalf("bad: %v", dispatch) + } + }) +} From 990f3f868eb1e5f6816eb1a56661bd88885cf03f Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 2 Dec 2016 15:37:26 -0800 Subject: [PATCH 08/27] Dispatch tests --- nomad/job_endpoint.go | 6 +- nomad/job_endpoint_test.go | 347 +++++++++++++++++++++++++++++++++++++ 2 files changed, 350 insertions(+), 3 deletions(-) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 53d3d6a871c..e652e1fdff8 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -382,8 +382,8 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD // Populate the reply with job information reply.JobModifyIndex = index - // If the job is periodic, we don't create an eval. - if job != nil && job.IsPeriodic() { + // If the job is periodic or a dispatch template, we don't create an eval. + if job != nil && (job.IsPeriodic() || job.IsDispatchTemplate()) { return nil } @@ -926,7 +926,7 @@ func validateDispatchRequest(req *structs.JobDispatchRequest, tmpl *structs.Job) flat = append(flat, k) } - return fmt.Errorf("Dispatch did not provided required meta keys: %v", flat) + return fmt.Errorf("Dispatch did not provide required meta keys: %v", flat) } return nil diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 3405fa10100..869eb6d8f35 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -249,6 +249,48 @@ func TestJobEndpoint_Register_Periodic(t *testing.T) { } } +func TestJobEndpoint_Register_Dispatch_Template(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request for a dispatch template job. + job := mock.Job() + job.Dispatch = &structs.DispatchConfig{} + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.JobRegisterResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if resp.JobModifyIndex == 0 { + t.Fatalf("bad index: %d", resp.Index) + } + + // Check for the node in the FSM + state := s1.fsm.State() + out, err := state.JobByID(job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("expected job") + } + if out.CreateIndex != resp.JobModifyIndex { + t.Fatalf("index mis-match") + } + if resp.EvalID != "" { + t.Fatalf("Register created an eval for a dispatch template job") + } +} + func TestJobEndpoint_Register_EnforceIndex(t *testing.T) { s1 := testServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue @@ -714,6 +756,43 @@ func TestJobEndpoint_Evaluate_Periodic(t *testing.T) { } } +func TestJobEndpoint_Evaluate_Dispatch_Template(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + job := mock.Job() + job.Dispatch = &structs.DispatchConfig{} + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.JobRegisterResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if resp.JobModifyIndex == 0 { + t.Fatalf("bad index: %d", resp.Index) + } + + // Force a re-evaluation + reEval := &structs.JobEvaluateRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + if err := msgpackrpc.CallWithCodec(codec, "Job.Evaluate", reEval, &resp); err == nil { + t.Fatal("expect an err") + } +} + func TestJobEndpoint_Deregister(t *testing.T) { s1 := testServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue @@ -894,6 +973,56 @@ func TestJobEndpoint_Deregister_Periodic(t *testing.T) { } } +func TestJobEndpoint_Deregister_Dispatch_Template(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + job := mock.Job() + job.Dispatch = &structs.DispatchConfig{} + reg := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.JobRegisterResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Register", reg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + // Deregister + dereg := &structs.JobDeregisterRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp2 structs.JobDeregisterResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2); err != nil { + t.Fatalf("err: %v", err) + } + if resp2.JobModifyIndex == 0 { + t.Fatalf("bad index: %d", resp2.Index) + } + + // Check for the node in the FSM + state := s1.fsm.State() + out, err := state.JobByID(job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out != nil { + t.Fatalf("unexpected job") + } + + if resp.EvalID != "" { + t.Fatalf("Deregister created an eval for a dispatch template job") + } +} + func TestJobEndpoint_GetJob(t *testing.T) { s1 := testServer(t, nil) defer s1.Shutdown() @@ -1725,3 +1854,221 @@ func TestJobEndpoint_ValidateJob_InvalidSignals(t *testing.T) { t.Fatalf("Expected signal feasibility error; got %v", err) } } + +func TestJobEndpoint_Dispatch(t *testing.T) { + + // No requirements + d1 := mock.Job() + d1.Dispatch = &structs.DispatchConfig{} + + // Require input data + d2 := mock.Job() + d2.Dispatch = &structs.DispatchConfig{ + InputData: structs.DispatchInputDataRequired, + } + + // Disallow input data + d3 := mock.Job() + d3.Dispatch = &structs.DispatchConfig{ + InputData: structs.DispatchInputDataForbidden, + } + + // Require meta + d4 := mock.Job() + d4.Dispatch = &structs.DispatchConfig{ + MetaRequired: []string{"foo", "bar"}, + } + + // Optional meta + d5 := mock.Job() + d5.Dispatch = &structs.DispatchConfig{ + MetaOptional: []string{"foo", "bar"}, + } + + reqNoInputNoMeta := &structs.JobDispatchRequest{} + reqInputDataNoMeta := &structs.JobDispatchRequest{ + InputData: []byte("hello world"), + } + reqNoInputDataMeta := &structs.JobDispatchRequest{ + Meta: map[string]string{ + "foo": "f1", + "bar": "f2", + }, + } + reqInputDataMeta := &structs.JobDispatchRequest{ + InputData: []byte("hello world"), + Meta: map[string]string{ + "foo": "f1", + "bar": "f2", + }, + } + reqBadMeta := &structs.JobDispatchRequest{ + InputData: []byte("hello world"), + Meta: map[string]string{ + "foo": "f1", + "bar": "f2", + "baz": "f3", + }, + } + reqInputDataTooLarge := &structs.JobDispatchRequest{ + InputData: make([]byte, DispatchInputDataSizeLimit+100), + } + + type testCase struct { + name string + dispatchTemplate *structs.Job + dispatchReq *structs.JobDispatchRequest + err bool + errStr string + } + cases := []testCase{ + { + name: "optional input data w/ data", + dispatchTemplate: d1, + dispatchReq: reqInputDataNoMeta, + err: false, + }, + { + name: "optional input data w/o data", + dispatchTemplate: d1, + dispatchReq: reqNoInputNoMeta, + err: false, + }, + { + name: "require input data w/ data", + dispatchTemplate: d2, + dispatchReq: reqInputDataNoMeta, + err: false, + }, + { + name: "require input data w/o data", + dispatchTemplate: d2, + dispatchReq: reqNoInputNoMeta, + err: true, + errStr: "not provided but required", + }, + { + name: "disallow input data w/o data", + dispatchTemplate: d3, + dispatchReq: reqNoInputNoMeta, + err: false, + }, + { + name: "disallow input data w/ data", + dispatchTemplate: d3, + dispatchReq: reqInputDataNoMeta, + err: true, + errStr: "provided but forbidden", + }, + { + name: "require meta w/ meta", + dispatchTemplate: d4, + dispatchReq: reqInputDataMeta, + err: false, + }, + { + name: "require meta w/o meta", + dispatchTemplate: d4, + dispatchReq: reqNoInputNoMeta, + err: true, + errStr: "did not provide required meta keys", + }, + { + name: "optional meta w/ meta", + dispatchTemplate: d5, + dispatchReq: reqNoInputDataMeta, + err: false, + }, + { + name: "optional meta w/o meta", + dispatchTemplate: d5, + dispatchReq: reqNoInputNoMeta, + err: false, + }, + { + name: "optional meta w/ bad meta", + dispatchTemplate: d5, + dispatchReq: reqBadMeta, + err: true, + errStr: "unpermitted metadata keys", + }, + { + name: "optional input w/ too big of input", + dispatchTemplate: d1, + dispatchReq: reqInputDataTooLarge, + err: true, + errStr: "data exceeds maximum size", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + regReq := &structs.JobRegisterRequest{ + Job: tc.dispatchTemplate, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var regResp structs.JobRegisterResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Register", regReq, ®Resp); err != nil { + t.Fatalf("err: %v", err) + } + + // Now try to dispatch + tc.dispatchReq.JobID = tc.dispatchTemplate.ID + tc.dispatchReq.WriteRequest = structs.WriteRequest{Region: "global"} + + var dispatchResp structs.JobDispatchResponse + dispatchErr := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", tc.dispatchReq, &dispatchResp) + + if dispatchErr == nil { + if tc.err { + t.Fatalf("Expected error: %v", dispatchErr) + } + + // Check that we got an eval and job id back + if dispatchResp.EvalID == "" || dispatchResp.DispatchedJobID == "" { + t.Fatalf("Bad response") + } + + state := s1.fsm.State() + out, err := state.JobByID(dispatchResp.DispatchedJobID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("expected job") + } + if out.CreateIndex != dispatchResp.JobCreateIndex { + t.Fatalf("index mis-match") + } + + // Lookup the evaluation + eval, err := state.EvalByID(dispatchResp.EvalID) + if err != nil { + t.Fatalf("err: %v", err) + } + if eval == nil { + t.Fatalf("expected eval") + } + if eval.CreateIndex != dispatchResp.EvalCreateIndex { + t.Fatalf("index mis-match") + } + } else { + if !tc.err { + t.Fatalf("Got unexpected error: %v", dispatchErr) + } else if !strings.Contains(dispatchErr.Error(), tc.errStr) { + t.Fatalf("Expected err to include %q; got %v", tc.errStr, dispatchErr) + } + } + }) + } +} From 2868a77d745cd8aa831b4247039f99d307407b70 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sun, 4 Dec 2016 21:22:13 -0800 Subject: [PATCH 09/27] start of the cli command --- api/jobs.go | 29 +++++++++++ command/job.go | 19 +++++++ command/job_dispatch.go | 112 ++++++++++++++++++++++++++++++++++++++++ commands.go | 10 ++++ 4 files changed, 170 insertions(+) create mode 100644 command/job.go create mode 100644 command/job_dispatch.go diff --git a/api/jobs.go b/api/jobs.go index 0458bd02504..81b4983f002 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -168,6 +168,21 @@ func (j *Jobs) Summary(jobID string, q *QueryOptions) (*JobSummary, *QueryMeta, return &resp, qm, nil } +func (j *Jobs) Dispatch(jobID string, meta map[string]string, + inputData []byte, q *WriteOptions) (*JobDispatchResponse, *WriteMeta, error) { + var resp JobDispatchResponse + req := &JobDispatchRequest{ + JobID: jobID, + Meta: meta, + InputData: inputData, + } + wm, err := j.client.write("/v1/job/"+jobID+"/dispatch", req, &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, wm, nil +} + // periodicForceResponse is used to deserialize a force response type periodicForceResponse struct { EvalID string @@ -411,3 +426,17 @@ type DesiredUpdates struct { InPlaceUpdate uint64 DestructiveUpdate uint64 } + +type JobDispatchRequest struct { + JobID string + InputData []byte + Meta map[string]string +} + +type JobDispatchResponse struct { + DispatchedJobID string + EvalID string + EvalCreateIndex uint64 + JobCreateIndex uint64 + QueryMeta +} diff --git a/command/job.go b/command/job.go new file mode 100644 index 00000000000..35477146fa6 --- /dev/null +++ b/command/job.go @@ -0,0 +1,19 @@ +package command + +import "github.com/mitchellh/cli" + +type JobCommand struct { + Meta +} + +func (f *JobCommand) Help() string { + return "This command is accessed by using one of the subcommands below." +} + +func (f *JobCommand) Synopsis() string { + return "Interact with jobs" +} + +func (f *JobCommand) Run(args []string) int { + return cli.RunResultHelp +} diff --git a/command/job_dispatch.go b/command/job_dispatch.go new file mode 100644 index 00000000000..6a2f27ce7f1 --- /dev/null +++ b/command/job_dispatch.go @@ -0,0 +1,112 @@ +package command + +import ( + "fmt" + "io/ioutil" + "os" + "strings" + + flaghelper "github.com/hashicorp/nomad/helper/flag-helpers" +) + +type JobDispatchCommand struct { + Meta +} + +func (c *JobDispatchCommand) Help() string { + helpText := ` +Usage: nomad job dispatch [options] + + +General Options: + + ` + generalOptionsUsage() + ` + +Dispatch Options: + +` + return strings.TrimSpace(helpText) +} + +func (c *JobDispatchCommand) Synopsis() string { + return "Dispatch an instance of a dispatch template" +} + +func (c *JobDispatchCommand) Run(args []string) int { + var detach, verbose bool + var meta []string + var inputFile string + + flags := c.Meta.FlagSet("job dispatch", FlagSetClient) + flags.Usage = func() { c.Ui.Output(c.Help()) } + flags.BoolVar(&detach, "detach", false, "") + flags.BoolVar(&verbose, "verbose", false, "") + flags.StringVar(&inputFile, "input-file", "", "") + flags.Var((*flaghelper.StringFlag)(&meta), "meta", "") + + if err := flags.Parse(args); err != nil { + return 1 + } + + // Truncate the id unless full length is requested + //length := shortId + //if verbose { + //length = fullId + //} + + // Check that we got exactly one node + args = flags.Args() + if len(args) != 1 { + c.Ui.Error(c.Help()) + return 1 + } + + templateJob := args[0] + var inputData []byte + var readErr error + + // If the input data is specified try to read from the file + if inputFile != "" { + inputData, readErr = ioutil.ReadFile(inputFile) + } else { + // Read from stdin + inputData, readErr = ioutil.ReadAll(os.Stdin) + } + if readErr != nil { + c.Ui.Error(fmt.Sprintf("Error reading input data: %v", readErr)) + return 1 + } + + // Build the meta + metaMap := make(map[string]string, len(meta)) + for _, m := range meta { + split := strings.SplitN(m, "=", 2) + if len(split) != 2 { + c.Ui.Error(fmt.Sprintf("Error parsing meta value: %v", m)) + return 1 + } + + metaMap[split[0]] = split[1] + } + + // Get the HTTP client + client, err := c.Meta.Client() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err)) + return 1 + } + + // Dispatch the job + resp, _, err := client.Jobs().Dispatch(templateJob, metaMap, inputData, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to dispatch job: %s", err)) + return 1 + } + + basic := []string{ + fmt.Sprintf("Dispatched Job ID|%s", resp.DispatchedJobID), + fmt.Sprintf("Evaluation ID|%s", resp.EvalID), + } + c.Ui.Output(formatKV(basic)) + return 0 +} diff --git a/commands.go b/commands.go index 36f8e92c5f7..08bad16eaee 100644 --- a/commands.go +++ b/commands.go @@ -89,6 +89,16 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory { Meta: meta, }, nil }, + "job": func() (cli.Command, error) { + return &command.JobCommand{ + Meta: meta, + }, nil + }, + "job dispatch": func() (cli.Command, error) { + return &command.JobDispatchCommand{ + Meta: meta, + }, nil + }, "logs": func() (cli.Command, error) { return &command.LogsCommand{ Meta: meta, From ef778d7c92644282e71548b96169313b046a475a Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 5 Dec 2016 15:10:23 -0800 Subject: [PATCH 10/27] Dispatch command --- command/job_dispatch.go | 62 ++++++++++++++++++++++++------------ command/job_dispatch_test.go | 43 +++++++++++++++++++++++++ command/run.go | 7 ++-- 3 files changed, 89 insertions(+), 23 deletions(-) create mode 100644 command/job_dispatch_test.go diff --git a/command/job_dispatch.go b/command/job_dispatch.go index 6a2f27ce7f1..c084f9c059a 100644 --- a/command/job_dispatch.go +++ b/command/job_dispatch.go @@ -15,8 +15,16 @@ type JobDispatchCommand struct { func (c *JobDispatchCommand) Help() string { helpText := ` -Usage: nomad job dispatch [options] +Usage: nomad job dispatch [options]