Skip to content

Commit

Permalink
Merge pull request #8131 from hashicorp/f-snapshot-restore
Browse files Browse the repository at this point in the history
Implement snapshot restore
  • Loading branch information
Mahmood Ali authored Jun 15, 2020
2 parents 723437f + 14cd3da commit c52a290
Show file tree
Hide file tree
Showing 19 changed files with 920 additions and 37 deletions.
9 changes: 8 additions & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,8 +943,15 @@ func decodeBody(resp *http.Response, out interface{}) error {
}
}

// encodeBody is used to encode a request body
// encodeBody prepares the reader to serve as the request body.
//
// Returns the `obj` input if it is a raw io.Reader object; otherwise
// returns a reader of the json format of the passed argument.
func encodeBody(obj interface{}) (io.Reader, error) {
if reader, ok := obj.(io.Reader); ok {
return reader, nil
}

buf := bytes.NewBuffer(nil)
enc := json.NewEncoder(buf)
if err := enc.Encode(obj); err != nil {
Expand Down
11 changes: 11 additions & 0 deletions api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,17 @@ func (op *Operator) Snapshot(q *QueryOptions) (io.ReadCloser, error) {
return cr, nil
}

// SnapshotRestore is used to restore a running nomad cluster to an original
// state.
func (op *Operator) SnapshotRestore(in io.Reader, q *WriteOptions) (*WriteMeta, error) {
wm, err := op.c.write("/v1/operator/snapshot", in, nil, q)
if err != nil {
return nil, err
}

return wm, nil
}

type License struct {
// The unique identifier of the license
LicenseID string
Expand Down
93 changes: 92 additions & 1 deletion command/agent/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/api"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
)
Expand Down Expand Up @@ -292,6 +293,8 @@ func (s *HTTPServer) SnapshotRequest(resp http.ResponseWriter, req *http.Request
switch req.Method {
case "GET":
return s.snapshotSaveRequest(resp, req)
case "PUT", "POST":
return s.snapshotRestoreRequest(resp, req)
default:
return nil, CodedError(405, ErrInvalidMethod)
}
Expand Down Expand Up @@ -331,7 +334,7 @@ func (s *HTTPServer) snapshotSaveRequest(resp http.ResponseWriter, req *http.Req
httpPipe.Close()
}()

errCh := make(chan HTTPCodedError, 1)
errCh := make(chan HTTPCodedError, 2)
go func() {
defer cancel()

Expand Down Expand Up @@ -372,3 +375,91 @@ func (s *HTTPServer) snapshotSaveRequest(resp http.ResponseWriter, req *http.Req

return nil, codedErr
}

func (s *HTTPServer) snapshotRestoreRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := &structs.SnapshotRestoreRequest{}
s.parseWriteRequest(req, &args.WriteRequest)

var handler structs.StreamingRpcHandler
var handlerErr error

if server := s.agent.Server(); server != nil {
handler, handlerErr = server.StreamingRpcHandler("Operator.SnapshotRestore")
} else if client := s.agent.Client(); client != nil {
handler, handlerErr = client.RemoteStreamingRpcHandler("Operator.SnapshotRestore")
} else {
handlerErr = fmt.Errorf("misconfigured connection")
}

if handlerErr != nil {
return nil, CodedError(500, handlerErr.Error())
}

httpPipe, handlerPipe := net.Pipe()
decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle)
encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle)

// Create a goroutine that closes the pipe if the connection closes.
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
go func() {
<-ctx.Done()
httpPipe.Close()
}()

errCh := make(chan HTTPCodedError, 2)
go func() {
defer cancel()

// Send the request
if err := encoder.Encode(args); err != nil {
errCh <- CodedError(500, err.Error())
return
}

go func() {
var wrapper cstructs.StreamErrWrapper
bytes := make([]byte, 1024)

for {
n, err := req.Body.Read(bytes)
if n > 0 {
wrapper.Payload = bytes[:n]
err := encoder.Encode(wrapper)
if err != nil {
errCh <- CodedError(500, err.Error())
return
}
}
if err != nil {
wrapper.Payload = nil
wrapper.Error = &cstructs.RpcError{Message: err.Error()}
err := encoder.Encode(wrapper)
if err != nil {
errCh <- CodedError(500, err.Error())
}
return
}
}
}()

var res structs.SnapshotRestoreResponse
if err := decoder.Decode(&res); err != nil {
errCh <- CodedError(500, err.Error())
return
}

if res.ErrorMsg != "" {
errCh <- CodedError(res.ErrorCode, res.ErrorMsg)
return
}

errCh <- nil
}()

handler(handlerPipe)
cancel()
codedErr := <-errCh

return nil, codedErr
}
70 changes: 66 additions & 4 deletions command/agent/operator_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ import (
"net/http/httptest"
"os"
"path"
"path/filepath"
"strings"
"testing"
"time"

"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -389,25 +391,44 @@ func TestOperator_SchedulerCASConfiguration(t *testing.T) {
})
}

func TestOperator_SnapshotSaveRequest(t *testing.T) {
func TestOperator_SnapshotRequests(t *testing.T) {
t.Parallel()

////// Nomad clusters topology - not specific to test
dir, err := ioutil.TempDir("", "nomadtest-operator-")
require.NoError(t, err)
defer os.RemoveAll(dir)

snapshotPath := filepath.Join(dir, "snapshot.bin")
job := mock.Job()

// test snapshot generation
httpTest(t, func(c *Config) {
c.Server.BootstrapExpect = 1
c.DevMode = false
c.DataDir = path.Join(dir, "server")
c.AdvertiseAddrs.HTTP = "127.0.0.1"
c.AdvertiseAddrs.RPC = "127.0.0.1"
c.AdvertiseAddrs.Serf = "127.0.0.1"

// don't actually run the job
c.Client.Enabled = false
}, func(s *TestAgent) {
// make a simple update
jargs := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
var jresp structs.JobRegisterResponse
err := s.Agent.RPC("Job.Register", &jargs, &jresp)
require.NoError(t, err)

// now actually snapshot
req, _ := http.NewRequest("GET", "/v1/operator/snapshot", nil)
resp := httptest.NewRecorder()
_, err := s.Server.SnapshotRequest(resp, req)
_, err = s.Server.SnapshotRequest(resp, req)
require.NoError(t, err)
require.Equal(t, 200, resp.Code)

Expand All @@ -416,11 +437,52 @@ func TestOperator_SnapshotSaveRequest(t *testing.T) {
require.Contains(t, digest, "sha-256=")

hash := sha256.New()
_, err = io.Copy(hash, resp.Body)
f, err := os.Create(snapshotPath)
require.NoError(t, err)
defer f.Close()

_, err = io.Copy(io.MultiWriter(f, hash), resp.Body)
require.NoError(t, err)

expectedChecksum := "sha-256=" + base64.StdEncoding.EncodeToString(hash.Sum(nil))
require.Equal(t, digest, expectedChecksum)
})

// test snapshot restoration
httpTest(t, func(c *Config) {
c.Server.BootstrapExpect = 1
c.DevMode = false
c.DataDir = path.Join(dir, "server2")
c.AdvertiseAddrs.HTTP = "127.0.0.1"
c.AdvertiseAddrs.RPC = "127.0.0.1"
c.AdvertiseAddrs.Serf = "127.0.0.1"

// don't actually run the job
c.Client.Enabled = false
}, func(s *TestAgent) {
jobExists := func() bool {
// check job isn't present
req, _ := http.NewRequest("GET", "/v1/job/"+job.ID, nil)
resp := httptest.NewRecorder()
j, _ := s.Server.jobCRUD(resp, req, job.ID)
return j != nil
}

// job doesn't get initially
require.False(t, jobExists())

// restrore and check if job exists after
f, err := os.Open(snapshotPath)
require.NoError(t, err)
defer f.Close()

req, _ := http.NewRequest("PUT", "/v1/operator/snapshot", f)
resp := httptest.NewRecorder()
_, err = s.Server.SnapshotRequest(resp, req)
require.NoError(t, err)
require.Equal(t, 200, resp.Code)

require.True(t, jobExists())
})

}
3 changes: 2 additions & 1 deletion command/agent/testagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ RETRY:

func (a *TestAgent) start() (*Agent, error) {
if a.LogOutput == nil {
a.LogOutput = testlog.NewWriter(a.T)
prefix := fmt.Sprintf("%v:%v ", a.Config.BindAddr, a.Config.Ports.RPC)
a.LogOutput = testlog.NewPrefixWriter(a.T, prefix)
}

inm := metrics.NewInmemSink(10*time.Second, time.Minute)
Expand Down
5 changes: 5 additions & 0 deletions command/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"operator snapshot restore": func() (cli.Command, error) {
return &OperatorSnapshotRestoreCommand{
Meta: meta,
}, nil
},

"plan": func() (cli.Command, error) {
return &JobPlanCommand{
Expand Down
11 changes: 8 additions & 3 deletions command/operator_snapshot_inspect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"path/filepath"
"testing"

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/command/agent"
"github.com/mitchellh/cli"
"github.com/stretchr/testify/require"
Expand All @@ -14,7 +15,7 @@ import (
func TestOperatorSnapshotInspect_Works(t *testing.T) {
t.Parallel()

snapPath := generateSnapshotFile(t)
snapPath := generateSnapshotFile(t, nil)

ui := new(cli.MockUi)
cmd := &OperatorSnapshotInspectCommand{Meta: Meta{Ui: ui}}
Expand Down Expand Up @@ -67,14 +68,14 @@ func TestOperatorSnapshotInspect_HandlesFailure(t *testing.T) {

}

func generateSnapshotFile(t *testing.T) string {
func generateSnapshotFile(t *testing.T, prepare func(srv *agent.TestAgent, client *api.Client, url string)) string {

tmpDir, err := ioutil.TempDir("", "nomad-tempdir")
require.NoError(t, err)

t.Cleanup(func() { os.RemoveAll(tmpDir) })

srv, _, url := testServer(t, false, func(c *agent.Config) {
srv, api, url := testServer(t, false, func(c *agent.Config) {
c.DevMode = false
c.DataDir = filepath.Join(tmpDir, "server")

Expand All @@ -85,6 +86,10 @@ func generateSnapshotFile(t *testing.T) string {

defer srv.Shutdown()

if prepare != nil {
prepare(srv, api, url)
}

ui := new(cli.MockUi)
cmd := &OperatorSnapshotSaveCommand{Meta: Meta{Ui: ui}}

Expand Down
Loading

0 comments on commit c52a290

Please sign in to comment.