From 97fb054c9da15998572b9d7ace5208d6ac934215 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Thu, 4 Jun 2020 17:14:43 -0400 Subject: [PATCH 01/11] basic snapshot restore --- command/agent/operator_endpoint.go | 93 ++++++++- command/agent/operator_endpoint_test.go | 70 ++++++- helper/pool/pool.go | 21 +- nomad/operator_endpoint.go | 123 +++++++++++- nomad/operator_endpoint_test.go | 252 ++++++++++++++++++++++++ nomad/structs/operator.go | 11 ++ 6 files changed, 560 insertions(+), 10 deletions(-) diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go index 07be2b1673e..ed4a3c4cb73 100644 --- a/command/agent/operator_endpoint.go +++ b/command/agent/operator_endpoint.go @@ -14,6 +14,7 @@ import ( "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/nomad/api" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/raft" ) @@ -292,6 +293,8 @@ func (s *HTTPServer) SnapshotRequest(resp http.ResponseWriter, req *http.Request switch req.Method { case "GET": return s.snapshotSaveRequest(resp, req) + case "PUT", "POST": + return s.snapshotRestoreRequest(resp, req) default: return nil, CodedError(405, ErrInvalidMethod) } @@ -331,7 +334,7 @@ func (s *HTTPServer) snapshotSaveRequest(resp http.ResponseWriter, req *http.Req httpPipe.Close() }() - errCh := make(chan HTTPCodedError, 1) + errCh := make(chan HTTPCodedError, 2) go func() { defer cancel() @@ -372,3 +375,91 @@ func (s *HTTPServer) snapshotSaveRequest(resp http.ResponseWriter, req *http.Req return nil, codedErr } + +func (s *HTTPServer) snapshotRestoreRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + args := &structs.SnapshotRestoreRequest{} + s.parseWriteRequest(req, &args.WriteRequest) + + var handler structs.StreamingRpcHandler + var handlerErr error + + if server := s.agent.Server(); server != nil { + handler, handlerErr = server.StreamingRpcHandler("Operator.SnapshotRestore") + } else if client := s.agent.Client(); client != nil { + handler, handlerErr = client.RemoteStreamingRpcHandler("Operator.SnapshotRestore") + } else { + handlerErr = fmt.Errorf("misconfigured connection") + } + + if handlerErr != nil { + return nil, CodedError(500, handlerErr.Error()) + } + + httpPipe, handlerPipe := net.Pipe() + decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle) + encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle) + + // Create a goroutine that closes the pipe if the connection closes. + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + go func() { + <-ctx.Done() + httpPipe.Close() + }() + + errCh := make(chan HTTPCodedError, 2) + go func() { + defer cancel() + + // Send the request + if err := encoder.Encode(args); err != nil { + errCh <- CodedError(500, err.Error()) + return + } + + go func() { + var wrapper cstructs.StreamErrWrapper + bytes := make([]byte, 1024) + + for { + n, err := req.Body.Read(bytes) + if n > 0 { + wrapper.Payload = bytes[:n] + err := encoder.Encode(wrapper) + if err != nil { + errCh <- CodedError(500, err.Error()) + return + } + } + if err != nil { + wrapper.Payload = nil + wrapper.Error = &cstructs.RpcError{Message: err.Error()} + err := encoder.Encode(wrapper) + if err != nil { + errCh <- CodedError(500, err.Error()) + } + return + } + } + }() + + var res structs.SnapshotRestoreResponse + if err := decoder.Decode(&res); err != nil { + errCh <- CodedError(500, err.Error()) + return + } + + if res.ErrorMsg != "" { + errCh <- CodedError(res.ErrorCode, res.ErrorMsg) + return + } + + errCh <- nil + }() + + handler(handlerPipe) + cancel() + codedErr := <-errCh + + return nil, codedErr +} diff --git a/command/agent/operator_endpoint_test.go b/command/agent/operator_endpoint_test.go index f096d049fb0..8814fad4fa2 100644 --- a/command/agent/operator_endpoint_test.go +++ b/command/agent/operator_endpoint_test.go @@ -11,12 +11,14 @@ import ( "net/http/httptest" "os" "path" + "path/filepath" "strings" "testing" "time" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -389,14 +391,17 @@ func TestOperator_SchedulerCASConfiguration(t *testing.T) { }) } -func TestOperator_SnapshotSaveRequest(t *testing.T) { +func TestOperator_SnapshotRequests(t *testing.T) { t.Parallel() - ////// Nomad clusters topology - not specific to test dir, err := ioutil.TempDir("", "nomadtest-operator-") require.NoError(t, err) defer os.RemoveAll(dir) + snapshotPath := filepath.Join(dir, "snapshot.bin") + job := mock.Job() + + // test snapshot generation httpTest(t, func(c *Config) { c.Server.BootstrapExpect = 1 c.DevMode = false @@ -404,10 +409,26 @@ func TestOperator_SnapshotSaveRequest(t *testing.T) { c.AdvertiseAddrs.HTTP = "127.0.0.1" c.AdvertiseAddrs.RPC = "127.0.0.1" c.AdvertiseAddrs.Serf = "127.0.0.1" + + // don't actually run the job + c.Client.Enabled = false }, func(s *TestAgent) { + // make a simple update + jargs := structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: structs.DefaultNamespace, + }, + } + var jresp structs.JobRegisterResponse + err := s.Agent.RPC("Job.Register", &jargs, &jresp) + require.NoError(t, err) + + // now actually snapshot req, _ := http.NewRequest("GET", "/v1/operator/snapshot", nil) resp := httptest.NewRecorder() - _, err := s.Server.SnapshotRequest(resp, req) + _, err = s.Server.SnapshotRequest(resp, req) require.NoError(t, err) require.Equal(t, 200, resp.Code) @@ -416,11 +437,52 @@ func TestOperator_SnapshotSaveRequest(t *testing.T) { require.Contains(t, digest, "sha-256=") hash := sha256.New() - _, err = io.Copy(hash, resp.Body) + f, err := os.Create(snapshotPath) + require.NoError(t, err) + defer f.Close() + + _, err = io.Copy(io.MultiWriter(f, hash), resp.Body) require.NoError(t, err) expectedChecksum := "sha-256=" + base64.StdEncoding.EncodeToString(hash.Sum(nil)) require.Equal(t, digest, expectedChecksum) }) + // test snapshot restoration + httpTest(t, func(c *Config) { + c.Server.BootstrapExpect = 1 + c.DevMode = false + c.DataDir = path.Join(dir, "server2") + c.AdvertiseAddrs.HTTP = "127.0.0.1" + c.AdvertiseAddrs.RPC = "127.0.0.1" + c.AdvertiseAddrs.Serf = "127.0.0.1" + + // don't actually run the job + c.Client.Enabled = false + }, func(s *TestAgent) { + jobExists := func() bool { + // check job isn't present + req, _ := http.NewRequest("GET", "/v1/job/"+job.ID, nil) + resp := httptest.NewRecorder() + j, _ := s.Server.jobCRUD(resp, req, job.ID) + return j != nil + } + + // job doesn't get initially + require.False(t, jobExists()) + + // restrore and check if job exists after + f, err := os.Open(snapshotPath) + require.NoError(t, err) + defer f.Close() + + req, _ := http.NewRequest("PUT", "/v1/operator/snapshot", f) + resp := httptest.NewRecorder() + _, err = s.Server.SnapshotRequest(resp, req) + require.NoError(t, err) + require.Equal(t, 200, resp.Code) + + require.True(t, jobExists()) + }) + } diff --git a/helper/pool/pool.go b/helper/pool/pool.go index bc2d40a214e..a774a2cb26b 100644 --- a/helper/pool/pool.go +++ b/helper/pool/pool.go @@ -305,8 +305,12 @@ func (p *ConnPool) acquire(region string, addr net.Addr, version int) (*Conn, er return nil, fmt.Errorf("rpc error: lead thread didn't get connection") } -// getNewConn is used to return a new connection -func (p *ConnPool) getNewConn(region string, addr net.Addr, version int) (*Conn, error) { +type HalfCloserConn interface { + net.Conn + CloseWrite() error +} + +func (p *ConnPool) DialTimeout(region string, addr net.Addr, version int, mode RPCType) (net.Conn, error) { // Try to dial the conn conn, err := net.DialTimeout("tcp", addr.String(), 10*time.Second) if err != nil { @@ -337,11 +341,22 @@ func (p *ConnPool) getNewConn(region string, addr net.Addr, version int) (*Conn, } // Write the multiplex byte to set the mode - if _, err := conn.Write([]byte{byte(RpcMultiplexV2)}); err != nil { + if _, err := conn.Write([]byte{byte(mode)}); err != nil { conn.Close() return nil, err } + return conn, nil +} + +// getNewConn is used to return a new connection +func (p *ConnPool) getNewConn(region string, addr net.Addr, version int) (*Conn, error) { + + conn, err := p.DialTimeout(region, addr, version, RpcMultiplexV2) + if err != nil { + return nil, err + } + // Setup the logger conf := yamux.DefaultConfig() conf.LogOutput = nil diff --git a/nomad/operator_endpoint.go b/nomad/operator_endpoint.go index fb3e4965563..5fb4f70c65a 100644 --- a/nomad/operator_endpoint.go +++ b/nomad/operator_endpoint.go @@ -1,6 +1,7 @@ package nomad import ( + "errors" "fmt" "io" "net" @@ -9,6 +10,7 @@ import ( "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/consul/agent/consul/autopilot" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper/snapshot" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/raft" @@ -23,6 +25,7 @@ type Operator struct { func (op *Operator) register() { op.srv.streamingRpcs.Register("Operator.SnapshotSave", op.snapshotSave) + op.srv.streamingRpcs.Register("Operator.SnapshotRestore", op.snapshotRestore) } // RaftGetConfiguration is used to retrieve the current Raft configuration. @@ -459,8 +462,7 @@ func (op *Operator) snapshotSave(conn io.ReadWriteCloser) { } defer snap.Close() - enc := codec.NewEncoder(conn, structs.MsgpackHandle) - if err := enc.Encode(&reply); err != nil { + if err := encoder.Encode(&reply); err != nil { handleFailure(500, fmt.Errorf("failed to encode response: %v", err)) return } @@ -470,3 +472,120 @@ func (op *Operator) snapshotSave(conn io.ReadWriteCloser) { } } } + +func (op *Operator) snapshotRestore(conn io.ReadWriteCloser) { + defer conn.Close() + + var args structs.SnapshotRestoreRequest + var reply structs.SnapshotRestoreResponse + decoder := codec.NewDecoder(conn, structs.MsgpackHandle) + encoder := codec.NewEncoder(conn, structs.MsgpackHandle) + + handleFailure := func(code int, err error) { + encoder.Encode(&structs.SnapshotRestoreResponse{ + ErrorCode: code, + ErrorMsg: err.Error(), + }) + } + + if err := decoder.Decode(&args); err != nil { + handleFailure(500, err) + return + } + + // Forward to appropriate region + if args.Region != op.srv.Region() { + err := op.forwardStreamingRPC(args.Region, "Operator.SnapshotRestore", args, conn) + if err != nil { + handleFailure(500, err) + } + return + } + + // forward to leader + remoteServer, err := op.srv.getLeaderForRPC() + if err != nil { + handleFailure(500, err) + return + } + if remoteServer != nil { + err := op.forwardStreamingRPCToServer(remoteServer, "Operator.SnapshotRestore", args, conn) + if err != nil { + handleFailure(500, err) + } + return + + } + + // Check agent permissions + if aclObj, err := op.srv.ResolveToken(args.AuthToken); err != nil { + code := 500 + if err == structs.ErrTokenNotFound { + code = 400 + } + handleFailure(code, err) + return + } else if aclObj != nil && !aclObj.IsManagement() { + handleFailure(403, structs.ErrPermissionDenied) + return + } + + op.srv.setQueryMeta(&reply.QueryMeta) + + reader, errCh := decodeStreamOutput(decoder) + + err = snapshot.Restore(op.logger.Named("snapshot"), reader, op.srv.raft) + if err != nil { + handleFailure(500, fmt.Errorf("failed to restore from snapshot: %v", err)) + return + } + + err = <-errCh + if err != nil { + handleFailure(400, fmt.Errorf("failed to read stream: %v", err)) + return + } + + reply.Index, _ = op.srv.State().LatestIndex() + op.srv.setQueryMeta(&reply.QueryMeta) + encoder.Encode(reply) +} + +func decodeStreamOutput(decoder *codec.Decoder) (io.Reader, <-chan error) { + pr, pw := io.Pipe() + errCh := make(chan error, 1) + + go func() { + defer close(errCh) + + for { + var wrapper cstructs.StreamErrWrapper + + err := decoder.Decode(&wrapper) + if err != nil { + pw.CloseWithError(fmt.Errorf("failed to decode input: %v", err)) + errCh <- err + return + } + + if len(wrapper.Payload) != 0 { + _, err = pw.Write(wrapper.Payload) + if err != nil { + pw.CloseWithError(err) + errCh <- err + } + } + + if errW := wrapper.Error; errW != nil { + if errW.Message == io.EOF.Error() { + pw.CloseWithError(io.EOF) + } else { + pw.CloseWithError(errors.New(errW.Message)) + } + return + } + } + }() + + return pr, errCh +} diff --git a/nomad/operator_endpoint_test.go b/nomad/operator_endpoint_test.go index 831f0076a3c..1ae6fb33c3e 100644 --- a/nomad/operator_endpoint_test.go +++ b/nomad/operator_endpoint_test.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/go-msgpack/codec" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/acl" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper/freeport" "github.com/hashicorp/nomad/helper/snapshot" "github.com/hashicorp/nomad/helper/uuid" @@ -714,3 +715,254 @@ func TestOperator_SnapshotSave_ACL(t *testing.T) { }) } } + +func TestOperator_SnapshotRestore(t *testing.T) { + targets := []string{"leader", "non_leader", "remote_region"} + + for _, c := range targets { + t.Run(c, func(t *testing.T) { + snap, job := generateSnapshot(t) + + checkFn := func(t *testing.T, s *Server) { + found, err := s.State().JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.Equal(t, job.ID, found.ID) + } + + var req structs.SnapshotRestoreRequest + req.Region = "global" + testRestoreSnapshot(t, &req, snap, c, checkFn) + }) + } +} + +func generateSnapshot(t *testing.T) (*snapshot.Snapshot, *structs.Job) { + dir, err := ioutil.TempDir("", "nomadtest-operator-") + require.NoError(t, err) + t.Cleanup(func() { os.RemoveAll(dir) }) + + s, cleanup := TestServer(t, func(c *Config) { + c.BootstrapExpect = 1 + c.DevMode = false + c.DataDir = path.Join(dir, "server1") + }) + defer cleanup() + + job := mock.Job() + jobReq := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var jobResp structs.JobRegisterResponse + codec := rpcClient(t, s) + err = msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq, &jobResp) + require.NoError(t, err) + + err = s.State().UpsertJob(1000, job) + require.NoError(t, err) + + snapshot, err := snapshot.New(s.logger, s.raft) + require.NoError(t, err) + + t.Cleanup(func() { snapshot.Close() }) + + return snapshot, job +} + +func testRestoreSnapshot(t *testing.T, req *structs.SnapshotRestoreRequest, snapshot io.Reader, target string, + assertionFn func(t *testing.T, server *Server)) { + + ////// Nomad clusters topology - not specific to test + dir, err := ioutil.TempDir("", "nomadtest-operator-") + require.NoError(t, err) + defer os.RemoveAll(dir) + + server1, cleanupLS := TestServer(t, func(c *Config) { + c.BootstrapExpect = 2 + c.DevMode = false + c.DataDir = path.Join(dir, "server1") + }) + defer cleanupLS() + + server2, cleanupRS := TestServer(t, func(c *Config) { + c.BootstrapExpect = 2 + c.DevMode = false + c.DataDir = path.Join(dir, "server2") + }) + defer cleanupRS() + + remoteRegionServer, cleanupRRS := TestServer(t, func(c *Config) { + c.Region = "two" + c.DevMode = false + c.DataDir = path.Join(dir, "remote_region_server") + }) + defer cleanupRRS() + + TestJoin(t, server1, server2) + TestJoin(t, server1, remoteRegionServer) + testutil.WaitForLeader(t, server1.RPC) + testutil.WaitForLeader(t, server2.RPC) + testutil.WaitForLeader(t, remoteRegionServer.RPC) + + leader, nonLeader := server1, server2 + if server2.IsLeader() { + leader, nonLeader = server2, server1 + } + + ///////// Actually run query now + mapping := map[string]*Server{ + "leader": leader, + "non_leader": nonLeader, + "remote_region": remoteRegionServer, + } + + server := mapping[target] + require.NotNil(t, server, "target not found") + + handler, err := server.StreamingRpcHandler("Operator.SnapshotRestore") + require.NoError(t, err) + + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + // start handler + go handler(p2) + + var resp structs.SnapshotRestoreResponse + + // send request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + err = encoder.Encode(req) + require.NoError(t, err) + + buf := make([]byte, 1024) + for { + n, err := snapshot.Read(buf) + if n > 0 { + require.NoError(t, encoder.Encode(&cstructs.StreamErrWrapper{Payload: buf[:n]})) + } + if err != nil { + require.NoError(t, encoder.Encode(&cstructs.StreamErrWrapper{Error: &cstructs.RpcError{Message: err.Error()}})) + break + } + } + + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + err = decoder.Decode(&resp) + require.NoError(t, err) + require.Empty(t, resp.ErrorMsg) + + t.Run("checking leader state", func(t *testing.T) { + assertionFn(t, leader) + }) + + t.Run("checking nonleader state", func(t *testing.T) { + assertionFn(t, leader) + }) +} + +func TestOperator_SnapshotRestore_ACL(t *testing.T) { + t.Parallel() + + dir, err := ioutil.TempDir("", "nomadtest-operator-") + require.NoError(t, err) + defer os.RemoveAll(dir) + + ///////// Actually run query now + cases := []struct { + name string + errCode int + err error + }{ + {"root", 0, nil}, + {"no_permission_token", 403, structs.ErrPermissionDenied}, + {"invalid token", 400, structs.ErrTokenNotFound}, + {"unauthenticated", 403, structs.ErrPermissionDenied}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + snapshot, _ := generateSnapshot(t) + + s, root, cleanupLS := TestACLServer(t, func(cfg *Config) { + cfg.BootstrapExpect = 1 + cfg.DevMode = false + cfg.DataDir = path.Join(dir, "server_"+c.name) + }) + defer cleanupLS() + + testutil.WaitForLeader(t, s.RPC) + + deniedToken := mock.CreatePolicyAndToken(t, s.fsm.State(), 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite)) + + token := "" + switch c.name { + case "root": + token = root.SecretID + case "no_permission_token": + token = deniedToken.SecretID + case "invalid token": + token = uuid.Generate() + case "unauthenticated": + token = "" + default: + t.Fatalf("unexpected case: %v", c.name) + } + + handler, err := s.StreamingRpcHandler("Operator.SnapshotRestore") + require.NoError(t, err) + + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + // start handler + go handler(p2) + + var req structs.SnapshotRestoreRequest + var resp structs.SnapshotRestoreResponse + + req.Region = "global" + req.AuthToken = token + + // send request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + err = encoder.Encode(&req) + require.NoError(t, err) + + if c.err == nil { + buf := make([]byte, 1024) + for { + n, err := snapshot.Read(buf) + if n > 0 { + require.NoError(t, encoder.Encode(&cstructs.StreamErrWrapper{Payload: buf[:n]})) + } + if err != nil { + require.NoError(t, encoder.Encode(&cstructs.StreamErrWrapper{Error: &cstructs.RpcError{Message: err.Error()}})) + break + } + } + } + + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + err = decoder.Decode(&resp) + require.NoError(t, err) + + // streaming errors appear as a response rather than a returned error + if c.err != nil { + require.Equal(t, c.err.Error(), resp.ErrorMsg) + require.Equal(t, c.errCode, resp.ErrorCode) + return + + } + + require.NotZero(t, resp.Index) + + io.Copy(ioutil.Discard, p1) + }) + } +} diff --git a/nomad/structs/operator.go b/nomad/structs/operator.go index c654457a14f..8a3afef9f15 100644 --- a/nomad/structs/operator.go +++ b/nomad/structs/operator.go @@ -246,3 +246,14 @@ type SnapshotSaveResponse struct { QueryMeta } + +type SnapshotRestoreRequest struct { + WriteRequest +} + +type SnapshotRestoreResponse struct { + ErrorCode int `codec:",omitempty"` + ErrorMsg string `codec:",omitempty"` + + QueryMeta +} From b73e7b026546201f9656ccba3beda1fc7383ffb6 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Thu, 4 Jun 2020 18:11:17 -0400 Subject: [PATCH 02/11] reassert leadership --- nomad/leader.go | 17 +++++++++++++++++ nomad/operator_endpoint.go | 36 ++++++++++++++++++++++++++++++++++++ nomad/server.go | 36 ++++++++++++++++++++---------------- 3 files changed, 73 insertions(+), 16 deletions(-) diff --git a/nomad/leader.go b/nomad/leader.go index 5d5c417b0ee..11ca5f42fcc 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -189,6 +189,23 @@ WAIT: goto RECONCILE case member := <-reconcileCh: s.reconcileMember(member) + case errCh := <-s.reassertLeaderCh: + // we can get into this state when the initial + // establishLeadership has failed as well as the follow + // up leadershipTransfer. Afterwards we will be waiting + // for the interval to trigger a reconciliation and can + // potentially end up here. There is no point to + // reassert because this agent was never leader in the + // first place. + if !establishedLeader { + errCh <- fmt.Errorf("leadership has not been established") + continue + } + + // refresh leadership state + s.revokeLeadership() + err := s.establishLeadership(stopCh) + errCh <- err } } } diff --git a/nomad/operator_endpoint.go b/nomad/operator_endpoint.go index 5fb4f70c65a..809b669c629 100644 --- a/nomad/operator_endpoint.go +++ b/nomad/operator_endpoint.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "net" + "time" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-msgpack/codec" @@ -546,6 +547,41 @@ func (op *Operator) snapshotRestore(conn io.ReadWriteCloser) { return } + // This'll be used for feedback from the leader loop. + timeoutCh := time.After(time.Minute) + + lerrCh := make(chan error, 1) + + select { + // Tell the leader loop to reassert leader actions since we just + // replaced the state store contents. + case op.srv.reassertLeaderCh <- lerrCh: + + // We might have lost leadership while waiting to kick the loop. + case <-timeoutCh: + handleFailure(500, fmt.Errorf("timed out waiting to re-run leader actions")) + + // Make sure we don't get stuck during shutdown + case <-op.srv.shutdownCh: + } + + select { + // Wait for the leader loop to finish up. + case err := <-lerrCh: + if err != nil { + handleFailure(500, err) + return + } + + // We might have lost leadership while the loop was doing its + // thing. + case <-timeoutCh: + handleFailure(500, fmt.Errorf("timed out waiting for re-run of leader actions")) + + // Make sure we don't get stuck during shutdown + case <-op.srv.shutdownCh: + } + reply.Index, _ = op.srv.State().LatestIndex() op.srv.setQueryMeta(&reply.QueryMeta) encoder.Encode(reply) diff --git a/nomad/server.go b/nomad/server.go index 3acb9a3f579..556dd19fbd1 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -108,6 +108,9 @@ type Server struct { raftInmem *raft.InmemStore raftTransport *raft.NetworkTransport + // reassertLeaderCh is used to signal the leader loop should re-run + reassertLeaderCh chan chan error + // autopilot is the Autopilot instance for this server. autopilot *autopilot.Autopilot @@ -312,22 +315,23 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulACLs consu // Create the server s := &Server{ - config: config, - consulCatalog: consulCatalog, - connPool: pool.NewPool(logger, serverRPCCache, serverMaxStreams, tlsWrap), - logger: logger, - tlsWrap: tlsWrap, - rpcServer: rpc.NewServer(), - streamingRpcs: structs.NewStreamingRpcRegistry(), - nodeConns: make(map[string][]*nodeConnState), - peers: make(map[string][]*serverParts), - localPeers: make(map[raft.ServerAddress]*serverParts), - reconcileCh: make(chan serf.Member, 32), - eventCh: make(chan serf.Event, 256), - evalBroker: evalBroker, - blockedEvals: NewBlockedEvals(evalBroker, logger), - rpcTLS: incomingTLS, - aclCache: aclCache, + config: config, + consulCatalog: consulCatalog, + connPool: pool.NewPool(logger, serverRPCCache, serverMaxStreams, tlsWrap), + logger: logger, + tlsWrap: tlsWrap, + rpcServer: rpc.NewServer(), + streamingRpcs: structs.NewStreamingRpcRegistry(), + nodeConns: make(map[string][]*nodeConnState), + peers: make(map[string][]*serverParts), + localPeers: make(map[raft.ServerAddress]*serverParts), + reassertLeaderCh: make(chan chan error), + reconcileCh: make(chan serf.Member, 32), + eventCh: make(chan serf.Event, 256), + evalBroker: evalBroker, + blockedEvals: NewBlockedEvals(evalBroker, logger), + rpcTLS: incomingTLS, + aclCache: aclCache, } s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background()) From 867147c90efb036827706ea368cd7581b60cdba1 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Sat, 6 Jun 2020 08:19:10 -0400 Subject: [PATCH 03/11] api: add snapshot restore --- api/api.go | 4 ++++ api/operator.go | 11 +++++++++++ vendor/github.com/hashicorp/nomad/api/api.go | 4 ++++ vendor/github.com/hashicorp/nomad/api/operator.go | 11 +++++++++++ 4 files changed, 30 insertions(+) diff --git a/api/api.go b/api/api.go index 95fcd734425..83ccf6bd278 100644 --- a/api/api.go +++ b/api/api.go @@ -945,6 +945,10 @@ func decodeBody(resp *http.Response, out interface{}) error { // encodeBody is used to encode a request body func encodeBody(obj interface{}) (io.Reader, error) { + if reader, ok := obj.(io.Reader); ok { + return reader, nil + } + buf := bytes.NewBuffer(nil) enc := json.NewEncoder(buf) if err := enc.Encode(obj); err != nil { diff --git a/api/operator.go b/api/operator.go index ddcc531e55a..3fad32aa39f 100644 --- a/api/operator.go +++ b/api/operator.go @@ -222,6 +222,17 @@ func (op *Operator) Snapshot(q *QueryOptions) (io.ReadCloser, error) { return cr, nil } +// SnapshotRestore is used to restore a running nomad cluster to an original +// state. +func (op *Operator) SnapshotRestore(in io.Reader, q *WriteOptions) (*WriteMeta, error) { + wm, err := op.c.write("/v1/operator/snapshot", in, nil, q) + if err != nil { + return nil, err + } + + return wm, nil +} + type License struct { // The unique identifier of the license LicenseID string diff --git a/vendor/github.com/hashicorp/nomad/api/api.go b/vendor/github.com/hashicorp/nomad/api/api.go index 95fcd734425..83ccf6bd278 100644 --- a/vendor/github.com/hashicorp/nomad/api/api.go +++ b/vendor/github.com/hashicorp/nomad/api/api.go @@ -945,6 +945,10 @@ func decodeBody(resp *http.Response, out interface{}) error { // encodeBody is used to encode a request body func encodeBody(obj interface{}) (io.Reader, error) { + if reader, ok := obj.(io.Reader); ok { + return reader, nil + } + buf := bytes.NewBuffer(nil) enc := json.NewEncoder(buf) if err := enc.Encode(obj); err != nil { diff --git a/vendor/github.com/hashicorp/nomad/api/operator.go b/vendor/github.com/hashicorp/nomad/api/operator.go index ddcc531e55a..3fad32aa39f 100644 --- a/vendor/github.com/hashicorp/nomad/api/operator.go +++ b/vendor/github.com/hashicorp/nomad/api/operator.go @@ -222,6 +222,17 @@ func (op *Operator) Snapshot(q *QueryOptions) (io.ReadCloser, error) { return cr, nil } +// SnapshotRestore is used to restore a running nomad cluster to an original +// state. +func (op *Operator) SnapshotRestore(in io.Reader, q *WriteOptions) (*WriteMeta, error) { + wm, err := op.c.write("/v1/operator/snapshot", in, nil, q) + if err != nil { + return nil, err + } + + return wm, nil +} + type License struct { // The unique identifier of the license LicenseID string From 669b75d621f58c6388aa0bd318455fefede2adfb Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Sat, 6 Jun 2020 09:16:58 -0400 Subject: [PATCH 04/11] implement snapshot restore CLI --- command/commands.go | 5 ++ command/operator_snapshot_inspect_test.go | 12 ++- command/operator_snapshot_restore.go | 95 ++++++++++++++++++++++ command/operator_snapshot_restore_test.go | 96 +++++++++++++++++++++++ command/operator_snapshot_save_test.go | 18 +++++ 5 files changed, 223 insertions(+), 3 deletions(-) create mode 100644 command/operator_snapshot_restore.go create mode 100644 command/operator_snapshot_restore_test.go diff --git a/command/commands.go b/command/commands.go index 1df3ce78839..7aabefe7fea 100644 --- a/command/commands.go +++ b/command/commands.go @@ -517,6 +517,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory { Meta: meta, }, nil }, + "operator snapshot restore": func() (cli.Command, error) { + return &OperatorSnapshotRestoreCommand{ + Meta: meta, + }, nil + }, "plan": func() (cli.Command, error) { return &JobPlanCommand{ diff --git a/command/operator_snapshot_inspect_test.go b/command/operator_snapshot_inspect_test.go index 4997342a072..ac4e462a74c 100644 --- a/command/operator_snapshot_inspect_test.go +++ b/command/operator_snapshot_inspect_test.go @@ -6,6 +6,7 @@ import ( "path/filepath" "testing" + "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/command/agent" "github.com/mitchellh/cli" "github.com/stretchr/testify/require" @@ -14,7 +15,7 @@ import ( func TestOperatorSnapshotInspect_Works(t *testing.T) { t.Parallel() - snapPath := generateSnapshotFile(t) + snapPath := generateSnapshotFile(t, nil) ui := new(cli.MockUi) cmd := &OperatorSnapshotInspectCommand{Meta: Meta{Ui: ui}} @@ -67,17 +68,18 @@ func TestOperatorSnapshotInspect_HandlesFailure(t *testing.T) { } -func generateSnapshotFile(t *testing.T) string { +func generateSnapshotFile(t *testing.T, prepare func(srv *agent.TestAgent, client *api.Client, url string)) string { tmpDir, err := ioutil.TempDir("", "nomad-tempdir") require.NoError(t, err) t.Cleanup(func() { os.RemoveAll(tmpDir) }) - srv, _, url := testServer(t, false, func(c *agent.Config) { + srv, api, url := testServer(t, false, func(c *agent.Config) { c.DevMode = false c.DataDir = filepath.Join(tmpDir, "server") + c.Client.Enabled = false c.AdvertiseAddrs.HTTP = "127.0.0.1" c.AdvertiseAddrs.RPC = "127.0.0.1" c.AdvertiseAddrs.Serf = "127.0.0.1" @@ -85,6 +87,10 @@ func generateSnapshotFile(t *testing.T) string { defer srv.Shutdown() + if prepare != nil { + prepare(srv, api, url) + } + ui := new(cli.MockUi) cmd := &OperatorSnapshotSaveCommand{Meta: Meta{Ui: ui}} diff --git a/command/operator_snapshot_restore.go b/command/operator_snapshot_restore.go new file mode 100644 index 00000000000..efaaf3b70ce --- /dev/null +++ b/command/operator_snapshot_restore.go @@ -0,0 +1,95 @@ +package command + +import ( + "fmt" + "os" + "strings" + + "github.com/hashicorp/nomad/api" + "github.com/posener/complete" +) + +type OperatorSnapshotRestoreCommand struct { + Meta +} + +func (c *OperatorSnapshotRestoreCommand) Help() string { + helpText := ` +Usage: nomad snapshot restore [options] FILE + + Restores an atomic, point-in-time snapshot of the state of the Nomad servers + which includes jobs, nodes, allocations, periodic jobs, and ACLs. + + Restores involve a potentially dangerous low-level Raft operation that is not + designed to handle server failures during a restore. This command is primarily + intended to be used when recovering from a disaster, restoring into a fresh + cluster of Nomad servers. + + If ACLs are enabled, a management token must be supplied in order to perform + snapshot operations. + + To restore a snapshot from the file "backup.snap": + + $ nomad snapshot restore backup.snap + +General Options: + + ` + generalOptionsUsage() + return strings.TrimSpace(helpText) +} + +func (c *OperatorSnapshotRestoreCommand) AutocompleteFlags() complete.Flags { + return c.Meta.AutocompleteFlags(FlagSetClient) +} + +func (c *OperatorSnapshotRestoreCommand) AutocompleteArgs() complete.Predictor { + return complete.PredictNothing +} + +func (c *OperatorSnapshotRestoreCommand) Synopsis() string { + return "Restore snapshot of Nomad server state" +} + +func (c *OperatorSnapshotRestoreCommand) Name() string { return "operator snapshot restore" } + +func (c *OperatorSnapshotRestoreCommand) Run(args []string) int { + flags := c.Meta.FlagSet(c.Name(), FlagSetClient) + flags.Usage = func() { c.Ui.Output(c.Help()) } + + if err := flags.Parse(args); err != nil { + c.Ui.Error(fmt.Sprintf("Failed to parse args: %v", err)) + return 1 + } + + // Check for misuse + args = flags.Args() + if len(args) != 1 { + c.Ui.Error("This command takes one: ") + c.Ui.Error(commandErrorText(c)) + return 1 + } + + snap, err := os.Open(args[0]) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error opening snapshot file: %q", err)) + return 1 + } + defer snap.Close() + + // Set up a client. + client, err := c.Meta.Client() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err)) + return 1 + } + + // Fetch the current configuration. + _, err = client.Operator().SnapshotRestore(snap, &api.WriteOptions{}) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to get restore snapshot: %v", err)) + return 1 + } + + c.Ui.Output("Snapshot Restored") + return 0 +} diff --git a/command/operator_snapshot_restore_test.go b/command/operator_snapshot_restore_test.go new file mode 100644 index 00000000000..66038ef4979 --- /dev/null +++ b/command/operator_snapshot_restore_test.go @@ -0,0 +1,96 @@ +package command + +import ( + "io/ioutil" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/command/agent" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/mitchellh/cli" + "github.com/stretchr/testify/require" +) + +func TestOperatorSnapshotRestore_Works(t *testing.T) { + t.Parallel() + + tmpDir, err := ioutil.TempDir("", "nomad-tempdir") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + + snapshotPath := generateSnapshotFile(t, func(srv *agent.TestAgent, client *api.Client, url string) { + sampleJob := ` +job "snapshot-test-job" { + type = "service" + datacenters = [ "dc1" ] + group "group1" { + count = 1 + task "task1" { + driver = "exec" + resources = { + cpu = 1000 + memory = 512 + } + } + } + +}` + + ui := new(cli.MockUi) + cmd := &JobRunCommand{Meta: Meta{Ui: ui}} + cmd.JobGetter.testStdin = strings.NewReader(sampleJob) + + code := cmd.Run([]string{"--address=" + url, "-detach", "-"}) + require.Zero(t, code) + }) + + srv, _, url := testServer(t, false, func(c *agent.Config) { + c.DevMode = false + c.DataDir = filepath.Join(tmpDir, "server1") + + c.Client.Enabled = false + c.AdvertiseAddrs.HTTP = "127.0.0.1" + c.AdvertiseAddrs.RPC = "127.0.0.1" + c.AdvertiseAddrs.Serf = "127.0.0.1" + }) + + defer srv.Shutdown() + + // job is not found before restore + j, err := srv.Agent.Server().State().JobByID(nil, structs.DefaultNamespace, "snapshot-test-job") + require.NoError(t, err) + require.Nil(t, j) + + ui := new(cli.MockUi) + cmd := &OperatorSnapshotRestoreCommand{Meta: Meta{Ui: ui}} + + code := cmd.Run([]string{"--address=" + url, snapshotPath}) + require.Empty(t, ui.ErrorWriter.String()) + require.Zero(t, code) + require.Contains(t, ui.OutputWriter.String(), "Snapshot Restored") + + foundJob, err := srv.Agent.Server().State().JobByID(nil, structs.DefaultNamespace, "snapshot-test-job") + require.NoError(t, err) + require.Equal(t, "snapshot-test-job", foundJob.ID) +} + +func TestOperatorSnapshotRestore_Fails(t *testing.T) { + t.Parallel() + + ui := new(cli.MockUi) + cmd := &OperatorSnapshotRestoreCommand{Meta: Meta{Ui: ui}} + + // Fails on misuse + code := cmd.Run([]string{"some", "bad", "args"}) + require.Equal(t, 1, code) + require.Contains(t, ui.ErrorWriter.String(), commandErrorText(cmd)) + ui.ErrorWriter.Reset() + + // Fails when specified file does not exist + code = cmd.Run([]string{"/unicorns/leprechauns"}) + require.Equal(t, 1, code) + require.Contains(t, ui.ErrorWriter.String(), "no such file") +} diff --git a/command/operator_snapshot_save_test.go b/command/operator_snapshot_save_test.go index 4723e6d02e8..5a7ae490cd8 100644 --- a/command/operator_snapshot_save_test.go +++ b/command/operator_snapshot_save_test.go @@ -49,3 +49,21 @@ func TestOperatorSnapshotSave_Works(t *testing.T) { require.NoError(t, err) require.NotZero(t, meta.Index) } + +func TestOperatorSnapshotSave_Fails(t *testing.T) { + t.Parallel() + + ui := new(cli.MockUi) + cmd := &OperatorSnapshotSaveCommand{Meta: Meta{Ui: ui}} + + // Fails on misuse + code := cmd.Run([]string{"some", "bad", "args"}) + require.Equal(t, 1, code) + require.Contains(t, ui.ErrorWriter.String(), commandErrorText(cmd)) + ui.ErrorWriter.Reset() + + // Fails when specified file does not exist + code = cmd.Run([]string{"/unicorns/leprechauns"}) + require.Equal(t, 1, code) + require.Contains(t, ui.ErrorWriter.String(), "no such file") +} From 519447d1c00bf2c13a711d5b2892ff887583cc49 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Sun, 7 Jun 2020 09:16:11 -0400 Subject: [PATCH 05/11] tests: prefix agent logs to identify agent sources --- command/agent/testagent.go | 3 ++- helper/testlog/testlog.go | 15 +++++++++------ nomad/testing.go | 18 +++++++++++++++--- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/command/agent/testagent.go b/command/agent/testagent.go index 32b35ffabe9..6504c8cbec0 100644 --- a/command/agent/testagent.go +++ b/command/agent/testagent.go @@ -223,7 +223,8 @@ RETRY: func (a *TestAgent) start() (*Agent, error) { if a.LogOutput == nil { - a.LogOutput = testlog.NewWriter(a.T) + prefix := fmt.Sprintf("%v:%v ", a.Config.BindAddr, a.Config.Ports.RPC) + a.LogOutput = testlog.NewPrefixWriter(a.T, prefix) } inm := metrics.NewInmemSink(10*time.Second, time.Minute) diff --git a/helper/testlog/testlog.go b/helper/testlog/testlog.go index e3834189ba4..28977001ff5 100644 --- a/helper/testlog/testlog.go +++ b/helper/testlog/testlog.go @@ -71,12 +71,15 @@ func (w *prefixStderr) Write(p []byte) (int, error) { } // Skip prefix if only writing whitespace - if len(bytes.TrimSpace(p)) > 0 { - _, err := os.Stderr.Write(w.prefix) - if err != nil { - return 0, err - } + if len(bytes.TrimSpace(p)) == 0 { + return os.Stderr.Write(p) } - return os.Stderr.Write(p) + // decrease likely hood of partial line writes that may mess up test + // indicator success detection + buf := make([]byte, 0, len(w.prefix)+len(p)) + buf = append(buf, w.prefix...) + buf = append(buf, p...) + + return os.Stderr.Write(buf) } diff --git a/nomad/testing.go b/nomad/testing.go index 3beeeb3702c..a93035e6019 100644 --- a/nomad/testing.go +++ b/nomad/testing.go @@ -4,12 +4,14 @@ import ( "fmt" "math/rand" "net" + "os" "sync/atomic" "time" testing "github.com/mitchellh/go-testing-interface" "github.com/pkg/errors" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/freeport" "github.com/hashicorp/nomad/helper/pluginutils/catalog" @@ -49,6 +51,19 @@ func TestServer(t testing.T, cb func(*Config)) (*Server, func()) { nodeNum := atomic.AddUint32(&nodeNumber, 1) config.NodeName = fmt.Sprintf("nomad-%03d", nodeNum) + // configer logger + level := hclog.Trace + if envLogLevel := os.Getenv("NOMAD_TEST_LOG_LEVEL"); envLogLevel != "" { + level = hclog.LevelFromString(envLogLevel) + } + opts := &hclog.LoggerOptions{ + Level: level, + Output: testlog.NewPrefixWriter(t, config.NodeName+" "), + IncludeLocation: true, + } + config.Logger = hclog.NewInterceptLogger(opts) + config.LogOutput = opts.Output + // Tighten the Serf timing config.SerfConfig.MemberlistConfig.BindAddr = "127.0.0.1" config.SerfConfig.MemberlistConfig.SuspicionMult = 2 @@ -67,9 +82,6 @@ func TestServer(t testing.T, cb func(*Config)) (*Server, func()) { f := false config.VaultConfig.Enabled = &f - // Squelch output when -v isn't specified - config.LogOutput = testlog.NewWriter(t) - // Tighten the autopilot timing config.AutopilotConfig.ServerStabilizationTime = 100 * time.Millisecond config.ServerHealthInterval = 50 * time.Millisecond From fad0fafa9fa38a3278bedc244588fdf06c47a503 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Sun, 7 Jun 2020 09:17:37 -0400 Subject: [PATCH 06/11] tests: client already disabled --- command/operator_snapshot_inspect_test.go | 1 - command/operator_snapshot_restore_test.go | 1 - 2 files changed, 2 deletions(-) diff --git a/command/operator_snapshot_inspect_test.go b/command/operator_snapshot_inspect_test.go index ac4e462a74c..a394e657c81 100644 --- a/command/operator_snapshot_inspect_test.go +++ b/command/operator_snapshot_inspect_test.go @@ -79,7 +79,6 @@ func generateSnapshotFile(t *testing.T, prepare func(srv *agent.TestAgent, clien c.DevMode = false c.DataDir = filepath.Join(tmpDir, "server") - c.Client.Enabled = false c.AdvertiseAddrs.HTTP = "127.0.0.1" c.AdvertiseAddrs.RPC = "127.0.0.1" c.AdvertiseAddrs.Serf = "127.0.0.1" diff --git a/command/operator_snapshot_restore_test.go b/command/operator_snapshot_restore_test.go index 66038ef4979..1b448565a2b 100644 --- a/command/operator_snapshot_restore_test.go +++ b/command/operator_snapshot_restore_test.go @@ -51,7 +51,6 @@ job "snapshot-test-job" { c.DevMode = false c.DataDir = filepath.Join(tmpDir, "server1") - c.Client.Enabled = false c.AdvertiseAddrs.HTTP = "127.0.0.1" c.AdvertiseAddrs.RPC = "127.0.0.1" c.AdvertiseAddrs.Serf = "127.0.0.1" From d13a63c8bbf186de9d18e1c5118cbe3d34af05a6 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Sun, 7 Jun 2020 09:18:09 -0400 Subject: [PATCH 07/11] loosen raft timeout --- nomad/operator_endpoint_test.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/nomad/operator_endpoint_test.go b/nomad/operator_endpoint_test.go index 1ae6fb33c3e..b78e075b6fd 100644 --- a/nomad/operator_endpoint_test.go +++ b/nomad/operator_endpoint_test.go @@ -12,6 +12,7 @@ import ( "reflect" "strings" "testing" + "time" "github.com/hashicorp/go-msgpack/codec" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" @@ -784,6 +785,13 @@ func testRestoreSnapshot(t *testing.T, req *structs.SnapshotRestoreRequest, snap c.BootstrapExpect = 2 c.DevMode = false c.DataDir = path.Join(dir, "server1") + + // increase times outs to account for I/O operations that + // snapshot restore performs - some of which require sync calls + c.RaftConfig.LeaderLeaseTimeout = 1 * time.Second + c.RaftConfig.HeartbeatTimeout = 1 * time.Second + c.RaftConfig.ElectionTimeout = 1 * time.Second + c.RaftTimeout = 5 * time.Second }) defer cleanupLS() @@ -791,6 +799,13 @@ func testRestoreSnapshot(t *testing.T, req *structs.SnapshotRestoreRequest, snap c.BootstrapExpect = 2 c.DevMode = false c.DataDir = path.Join(dir, "server2") + + // increase times outs to account for I/O operations that + // snapshot restore performs - some of which require sync calls + c.RaftConfig.LeaderLeaseTimeout = 1 * time.Second + c.RaftConfig.HeartbeatTimeout = 1 * time.Second + c.RaftConfig.ElectionTimeout = 1 * time.Second + c.RaftTimeout = 5 * time.Second }) defer cleanupRS() From bd48db20e2d62869ebdf6567cf6ec0feb43d9d4f Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 9 Jun 2020 11:26:52 -0400 Subject: [PATCH 08/11] clarify error message Co-authored-by: Tim Gross --- command/operator_snapshot_restore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/command/operator_snapshot_restore.go b/command/operator_snapshot_restore.go index efaaf3b70ce..8fe8f59a5c0 100644 --- a/command/operator_snapshot_restore.go +++ b/command/operator_snapshot_restore.go @@ -64,7 +64,7 @@ func (c *OperatorSnapshotRestoreCommand) Run(args []string) int { // Check for misuse args = flags.Args() if len(args) != 1 { - c.Ui.Error("This command takes one: ") + c.Ui.Error("This command takes one argument: ") c.Ui.Error(commandErrorText(c)) return 1 } From 1f6cb154b9fe4acb96b0375b7dd127b039a61f4f Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 9 Jun 2020 12:00:09 -0400 Subject: [PATCH 09/11] clarify ccomments, esp related to leadership code --- api/api.go | 5 ++++- nomad/leader.go | 17 ++++++++++------- nomad/operator_endpoint.go | 4 ++-- nomad/server.go | 7 ++++++- 4 files changed, 22 insertions(+), 11 deletions(-) diff --git a/api/api.go b/api/api.go index 83ccf6bd278..9633efb0ea3 100644 --- a/api/api.go +++ b/api/api.go @@ -943,7 +943,10 @@ func decodeBody(resp *http.Response, out interface{}) error { } } -// encodeBody is used to encode a request body +// encodeBody prepares the reader to serve as the request body. +// +// Returns the `obj` input if it is a raw io.Reader object; otherwise +// returns a reader of the json format of the passed argument. func encodeBody(obj interface{}) (io.Reader, error) { if reader, ok := obj.(io.Reader); ok { return reader, nil diff --git a/nomad/leader.go b/nomad/leader.go index 11ca5f42fcc..ca071d2c832 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -190,13 +190,16 @@ WAIT: case member := <-reconcileCh: s.reconcileMember(member) case errCh := <-s.reassertLeaderCh: - // we can get into this state when the initial - // establishLeadership has failed as well as the follow - // up leadershipTransfer. Afterwards we will be waiting - // for the interval to trigger a reconciliation and can - // potentially end up here. There is no point to - // reassert because this agent was never leader in the - // first place. + // Recompute leader state, by asserting leadership and + // repopulating leader states. + + // Check first if we are indeed the leaders first. We + // can get into this state when the initial + // establishLeadership has failed. + // Afterwards we will be waiting for the interval to + // trigger a reconciliation and can potentially end up + // here. There is no point to reassert because this + // agent was never leader in the first place. if !establishedLeader { errCh <- fmt.Errorf("leadership has not been established") continue diff --git a/nomad/operator_endpoint.go b/nomad/operator_endpoint.go index 809b669c629..965c4bc65b9 100644 --- a/nomad/operator_endpoint.go +++ b/nomad/operator_endpoint.go @@ -553,8 +553,8 @@ func (op *Operator) snapshotRestore(conn io.ReadWriteCloser) { lerrCh := make(chan error, 1) select { - // Tell the leader loop to reassert leader actions since we just - // replaced the state store contents. + // Reassert leader actions and update all leader related state + // with new state store content. case op.srv.reassertLeaderCh <- lerrCh: // We might have lost leadership while waiting to kick the loop. diff --git a/nomad/server.go b/nomad/server.go index 556dd19fbd1..5bb25e6b33e 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -108,7 +108,12 @@ type Server struct { raftInmem *raft.InmemStore raftTransport *raft.NetworkTransport - // reassertLeaderCh is used to signal the leader loop should re-run + // reassertLeaderCh is used to signal that the leader loop must + // re-establish leadership. + // + // This might be relevant in snapshot restores, where leader in-memory + // state changed significantly such that leader state (e.g. periodic + // jobs, eval brokers) need to be recomputed. reassertLeaderCh chan chan error // autopilot is the Autopilot instance for this server. From fbb109201933428f67c4b93bd29fa856522c3cda Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Fri, 12 Jun 2020 14:02:33 -0400 Subject: [PATCH 10/11] revert changes from earlier change --- helper/pool/pool.go | 21 +++------------------ 1 file changed, 3 insertions(+), 18 deletions(-) diff --git a/helper/pool/pool.go b/helper/pool/pool.go index a774a2cb26b..bc2d40a214e 100644 --- a/helper/pool/pool.go +++ b/helper/pool/pool.go @@ -305,12 +305,8 @@ func (p *ConnPool) acquire(region string, addr net.Addr, version int) (*Conn, er return nil, fmt.Errorf("rpc error: lead thread didn't get connection") } -type HalfCloserConn interface { - net.Conn - CloseWrite() error -} - -func (p *ConnPool) DialTimeout(region string, addr net.Addr, version int, mode RPCType) (net.Conn, error) { +// getNewConn is used to return a new connection +func (p *ConnPool) getNewConn(region string, addr net.Addr, version int) (*Conn, error) { // Try to dial the conn conn, err := net.DialTimeout("tcp", addr.String(), 10*time.Second) if err != nil { @@ -341,22 +337,11 @@ func (p *ConnPool) DialTimeout(region string, addr net.Addr, version int, mode R } // Write the multiplex byte to set the mode - if _, err := conn.Write([]byte{byte(mode)}); err != nil { + if _, err := conn.Write([]byte{byte(RpcMultiplexV2)}); err != nil { conn.Close() return nil, err } - return conn, nil -} - -// getNewConn is used to return a new connection -func (p *ConnPool) getNewConn(region string, addr net.Addr, version int) (*Conn, error) { - - conn, err := p.DialTimeout(region, addr, version, RpcMultiplexV2) - if err != nil { - return nil, err - } - // Setup the logger conf := yamux.DefaultConfig() conf.LogOutput = nil From 14cd3da2529b788a8815b5450bf6544fb021b922 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Mon, 15 Jun 2020 08:32:16 -0400 Subject: [PATCH 11/11] Apply suggestions from code review Co-authored-by: Michael Schurter --- command/operator_snapshot_restore.go | 6 +++--- nomad/operator_endpoint.go | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/command/operator_snapshot_restore.go b/command/operator_snapshot_restore.go index 8fe8f59a5c0..08761b72376 100644 --- a/command/operator_snapshot_restore.go +++ b/command/operator_snapshot_restore.go @@ -15,7 +15,7 @@ type OperatorSnapshotRestoreCommand struct { func (c *OperatorSnapshotRestoreCommand) Help() string { helpText := ` -Usage: nomad snapshot restore [options] FILE +Usage: nomad operator snapshot restore [options] FILE Restores an atomic, point-in-time snapshot of the state of the Nomad servers which includes jobs, nodes, allocations, periodic jobs, and ACLs. @@ -30,7 +30,7 @@ Usage: nomad snapshot restore [options] FILE To restore a snapshot from the file "backup.snap": - $ nomad snapshot restore backup.snap + $ nomad operator snapshot restore backup.snap General Options: @@ -83,7 +83,7 @@ func (c *OperatorSnapshotRestoreCommand) Run(args []string) int { return 1 } - // Fetch the current configuration. + // Call snapshot restore API with backup file. _, err = client.Operator().SnapshotRestore(snap, &api.WriteOptions{}) if err != nil { c.Ui.Error(fmt.Sprintf("Failed to get restore snapshot: %v", err)) diff --git a/nomad/operator_endpoint.go b/nomad/operator_endpoint.go index 965c4bc65b9..89faacf8e54 100644 --- a/nomad/operator_endpoint.go +++ b/nomad/operator_endpoint.go @@ -609,6 +609,7 @@ func decodeStreamOutput(decoder *codec.Decoder) (io.Reader, <-chan error) { if err != nil { pw.CloseWithError(err) errCh <- err + return } }