Skip to content

Commit

Permalink
Merge branch 'master' into f-ui/alloc-fs
Browse files Browse the repository at this point in the history
  • Loading branch information
backspace committed Jun 1, 2020
2 parents d2ba531 + 3b04afe commit e634f9c
Show file tree
Hide file tree
Showing 71 changed files with 2,716 additions and 583 deletions.
81 changes: 81 additions & 0 deletions api/ioutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package api

import (
"crypto/md5"
"crypto/sha256"
"crypto/sha512"
"encoding/base64"
"fmt"
"hash"
"io"
"strings"
)

var errMismatchChecksum = fmt.Errorf("mismatch checksum")

// checksumValidatingReader is a wrapper reader that validates
// the checksum of the underlying reader.
type checksumValidatingReader struct {
r io.ReadCloser

// algo is the hash algorithm (e.g. `sha-256`)
algo string

// checksum is the base64 component of checksum
checksum string

// hash is the hashing function used to compute the checksum
hash hash.Hash
}

// newChecksumValidatingReader returns a checksum-validating wrapper reader, according
// to a digest received in HTTP header
//
// The digest must be in the format "<algo>=<base64 of hash>" (e.g. "sha-256=gPelGB7...").
//
// When the reader is fully consumed (i.e. EOT is encountered), if the checksum don't match,
// `Read` returns a checksum mismatch error.
func newChecksumValidatingReader(r io.ReadCloser, digest string) (io.ReadCloser, error) {
parts := strings.SplitN(digest, "=", 2)
if len(parts) != 2 {
return nil, fmt.Errorf("invalid digest format")
}

algo := parts[0]
var hash hash.Hash
switch algo {
case "sha-256":
hash = sha256.New()
case "sha-512":
hash = sha512.New()
case "md5":
hash = md5.New()
}

return &checksumValidatingReader{
r: r,
algo: algo,
checksum: parts[1],
hash: hash,
}, nil
}

func (r *checksumValidatingReader) Read(b []byte) (int, error) {
n, err := r.r.Read(b)
if n != 0 {
r.hash.Write(b[:n])
}

if err == io.EOF || err == io.ErrClosedPipe {
found := base64.StdEncoding.EncodeToString(r.hash.Sum(nil))
if found != r.checksum {
return n, errMismatchChecksum
}
}

return n, err
}

func (r *checksumValidatingReader) Close() error {
return r.r.Close()
}
87 changes: 87 additions & 0 deletions api/ioutil_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package api

import (
"bytes"
"crypto/sha256"
"crypto/sha512"
"encoding/base64"
"fmt"
"hash"
"io"
"io/ioutil"
"math/rand"
"testing"
"testing/iotest"

"github.com/stretchr/testify/require"
)

func TestChecksumValidatingReader(t *testing.T) {
data := make([]byte, 4096)
_, err := rand.Read(data)
require.NoError(t, err)

cases := []struct {
algo string
hash hash.Hash
}{
{"sha-256", sha256.New()},
{"sha-512", sha512.New()},
}

for _, c := range cases {
t.Run("valid: "+c.algo, func(t *testing.T) {
_, err := c.hash.Write(data)
require.NoError(t, err)

checksum := c.hash.Sum(nil)
digest := c.algo + "=" + base64.StdEncoding.EncodeToString(checksum)

r := iotest.HalfReader(bytes.NewReader(data))
cr, err := newChecksumValidatingReader(ioutil.NopCloser(r), digest)
require.NoError(t, err)

_, err = io.Copy(ioutil.Discard, cr)
require.NoError(t, err)
})

t.Run("invalid: "+c.algo, func(t *testing.T) {
_, err := c.hash.Write(data)
require.NoError(t, err)

checksum := c.hash.Sum(nil)
// mess up checksum
checksum[0]++
digest := c.algo + "=" + base64.StdEncoding.EncodeToString(checksum)

r := iotest.HalfReader(bytes.NewReader(data))
cr, err := newChecksumValidatingReader(ioutil.NopCloser(r), digest)
require.NoError(t, err)

_, err = io.Copy(ioutil.Discard, cr)
require.Error(t, err)
require.Equal(t, errMismatchChecksum, err)
})
}
}

func TestChecksumValidatingReader_PropagatesError(t *testing.T) {

pr, pw := io.Pipe()
defer pr.Close()
defer pw.Close()

expectedErr := fmt.Errorf("some error")

go func() {
pw.Write([]byte("some input"))
pw.CloseWithError(expectedErr)
}()

cr, err := newChecksumValidatingReader(pr, "sha-256=aaaa")
require.NoError(t, err)

_, err = io.Copy(ioutil.Discard, cr)
require.Error(t, err)
require.Equal(t, expectedErr, err)
}
8 changes: 8 additions & 0 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,13 @@ func (j *Jobs) PrefixList(prefix string) ([]*JobListStub, *QueryMeta, error) {
return j.List(&QueryOptions{Prefix: prefix})
}

// ListAll is used to list all of the existing jobs in all namespaces.
func (j *Jobs) ListAll() ([]*JobListStub, *QueryMeta, error) {
return j.List(&QueryOptions{
Params: map[string]string{"all_namespaces": "true"},
})
}

// Info is used to retrieve information about a particular
// job given its unique ID.
func (j *Jobs) Info(jobID string, q *QueryOptions) (*Job, *QueryMeta, error) {
Expand Down Expand Up @@ -867,6 +874,7 @@ type JobListStub struct {
ID string
ParentID string
Name string
Namespace string `json:",omitempty"`
Datacenters []string
Type string
Priority int
Expand Down
28 changes: 28 additions & 0 deletions api/operator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package api

import (
"io"
"io/ioutil"
"strconv"
"time"
)
Expand Down Expand Up @@ -194,6 +196,32 @@ func (op *Operator) SchedulerCASConfiguration(conf *SchedulerConfiguration, q *W
return &out, wm, nil
}

// Snapshot is used to capture a snapshot state of a running cluster.
// The returned reader that must be consumed fully
func (op *Operator) Snapshot(q *QueryOptions) (io.ReadCloser, error) {
r, err := op.c.newRequest("GET", "/v1/operator/snapshot")
if err != nil {
return nil, err
}
r.setQueryOptions(q)
_, resp, err := requireOK(op.c.doRequest(r))
if err != nil {
return nil, err
}

digest := resp.Header.Get("Digest")

cr, err := newChecksumValidatingReader(resp.Body, digest)
if err != nil {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()

return nil, err
}

return cr, nil
}

type License struct {
// The unique identifier of the license
LicenseID string
Expand Down
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
invalidAllocs: make(map[string]struct{}),
serversContactedCh: make(chan struct{}),
serversContactedOnce: sync.Once{},
EnterpriseClient: newEnterpriseClient(),
EnterpriseClient: newEnterpriseClient(logger),
}

c.batchNodeUpdates = newBatchNodeUpdates(
Expand Down
4 changes: 3 additions & 1 deletion client/enterprise_client_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

package client

import hclog "github.com/hashicorp/go-hclog"

// EnterpriseClient holds information and methods for enterprise functionality
type EnterpriseClient struct{}

func newEnterpriseClient() *EnterpriseClient {
func newEnterpriseClient(logger hclog.Logger) *EnterpriseClient {
return &EnterpriseClient{}
}

Expand Down
2 changes: 1 addition & 1 deletion command/agent/config_oss.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// +build !pro,!ent
// +build !ent

package agent

Expand Down
1 change: 1 addition & 0 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/operator/raft/", s.wrap(s.OperatorRequest))
s.mux.HandleFunc("/v1/operator/autopilot/configuration", s.wrap(s.OperatorAutopilotConfiguration))
s.mux.HandleFunc("/v1/operator/autopilot/health", s.wrap(s.OperatorServerHealth))
s.mux.HandleFunc("/v1/operator/snapshot", s.wrap(s.SnapshotRequest))

s.mux.HandleFunc("/v1/system/gc", s.wrap(s.GarbageCollectRequest))
s.mux.HandleFunc("/v1/system/reconcile/summaries", s.wrap(s.ReconcileJobSummaries))
Expand Down
1 change: 1 addition & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func (s *HTTPServer) jobListRequest(resp http.ResponseWriter, req *http.Request)
return nil, nil
}

args.AllNamespaces, _ = strconv.ParseBool(req.URL.Query().Get("all_namespaces"))
var out structs.JobListResponse
if err := s.agent.RPC("Job.List", &args, &out); err != nil {
return nil, err
Expand Down
40 changes: 40 additions & 0 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,46 @@ func TestHTTP_PrefixJobsList(t *testing.T) {
})
}

func TestHTTP_JobsList_AllNamespaces_OSS(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
for i := 0; i < 3; i++ {
// Create the job
job := mock.Job()
args := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
var resp structs.JobRegisterResponse
err := s.Agent.RPC("Job.Register", &args, &resp)
require.NoError(t, err)
}

// Make the HTTP request
req, err := http.NewRequest("GET", "/v1/jobs?all_namespaces=true", nil)
require.NoError(t, err)
respW := httptest.NewRecorder()

// Make the request
obj, err := s.Server.JobsRequest(respW, req)
require.NoError(t, err)

// Check for the index
require.NotEmpty(t, respW.HeaderMap.Get("X-Nomad-Index"), "missing index")
require.Equal(t, "true", respW.HeaderMap.Get("X-Nomad-KnownLeader"), "missing known leader")
require.NotEmpty(t, respW.HeaderMap.Get("X-Nomad-LastContact"), "missing last contact")

// Check the job
j := obj.([]*structs.JobListStub)
require.Len(t, j, 3)

require.Equal(t, "default", j[0].Namespace)
})
}

func TestHTTP_JobsRegister(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
Expand Down
Loading

0 comments on commit e634f9c

Please sign in to comment.