Skip to content

Commit

Permalink
Add reload/leave http endpoints (#2516)
Browse files Browse the repository at this point in the history
  • Loading branch information
kyhavlov authored Nov 30, 2016
1 parent cbc180d commit bd69c6d
Show file tree
Hide file tree
Showing 11 changed files with 348 additions and 36 deletions.
22 changes: 22 additions & 0 deletions api/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,17 @@ func (a *Agent) Self() (map[string]map[string]interface{}, error) {
return out, nil
}

// Reload triggers a configuration reload for the agent we are connected to.
func (a *Agent) Reload() error {
r := a.c.newRequest("PUT", "/v1/agent/reload")
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}

// NodeName is used to get the node name of the agent
func (a *Agent) NodeName() (string, error) {
if a.nodeName != "" {
Expand Down Expand Up @@ -348,6 +359,17 @@ func (a *Agent) Join(addr string, wan bool) error {
return nil
}

// Leave is used to have the agent gracefully leave the cluster and shutdown
func (a *Agent) Leave() error {
r := a.c.newRequest("PUT", "/v1/agent/leave")
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return err
}
resp.Body.Close()
return nil
}

// ForceLeave is used to have the agent eject a failed node
func (a *Agent) ForceLeave(node string) error {
r := a.c.newRequest("PUT", "/v1/agent/force-leave/"+node)
Expand Down
83 changes: 83 additions & 0 deletions api/agent_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package api

import (
"io/ioutil"
"strings"
"testing"

"time"

"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/serf/serf"
)

func TestAgent_Self(t *testing.T) {
Expand All @@ -24,6 +29,51 @@ func TestAgent_Self(t *testing.T) {
}
}

func TestAgent_Reload(t *testing.T) {
t.Parallel()

// Create our initial empty config file, to be overwritten later
configFile, err := ioutil.TempFile("", "reload")
if err != nil {
t.Fatalf("err: %s", err)
}
if _, err := configFile.Write([]byte("{}")); err != nil {
t.Fatalf("err: %s", err)
}
configFile.Close()

c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
conf.Args = []string{"-config-file", configFile.Name()}
})
defer s.Stop()

agent := c.Agent()

// Update the config file with a service definition
config := `{"service":{"name":"redis", "port":1234}}`
err = ioutil.WriteFile(configFile.Name(), []byte(config), 0644)
if err != nil {
t.Fatalf("err: %v", err)
}

if err = agent.Reload(); err != nil {
t.Fatalf("err: %v", err)
}

services, err := agent.Services()
if err != nil {
t.Fatalf("err: %v", err)
}

service, ok := services["redis"]
if !ok {
t.Fatalf("bad: %v", ok)
}
if service.Port != 1234 {
t.Fatalf("bad: %v", service.Port)
}
}

func TestAgent_Members(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
Expand Down Expand Up @@ -545,6 +595,39 @@ func TestAgent_Join(t *testing.T) {
}
}

func TestAgent_Leave(t *testing.T) {
t.Parallel()
c1, s1 := makeClient(t)
defer s1.Stop()

c2, s2 := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
conf.Server = false
conf.Bootstrap = false
})
defer s2.Stop()

if err := c2.Agent().Join(s1.LANAddr, false); err != nil {
t.Fatalf("err: %v", err)
}

if err := c2.Agent().Leave(); err != nil {
t.Fatalf("err: %v", err)
}

// Make sure the second agent's status is 'Left'
members, err := c1.Agent().Members(false)
if err != nil {
t.Fatalf("err: %v", err)
}
member := members[0]
if member.Name == s1.Config.NodeName {
member = members[1]
}
if member.Status != int(serf.StatusLeft) {
t.Fatalf("bad: %v", *member)
}
}

func TestAgent_ForceLeave(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
Expand Down
6 changes: 5 additions & 1 deletion command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ type Agent struct {
eventLock sync.RWMutex
eventNotify state.NotifyGroup

reloadCh chan chan error

shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
Expand All @@ -121,7 +123,8 @@ type Agent struct {

// Create is used to create a new Agent. Returns
// the agent or potentially an error.
func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter) (*Agent, error) {
func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter,
reloadCh chan chan error) (*Agent, error) {
// Ensure we have a log sink
if logOutput == nil {
logOutput = os.Stderr
Expand Down Expand Up @@ -184,6 +187,7 @@ func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter) (*
checkDockers: make(map[types.CheckID]*CheckDocker),
eventCh: make(chan serf.UserEvent, 1024),
eventBuf: make([]*UserEvent, 256),
reloadCh: reloadCh,
shutdownCh: make(chan struct{}),
endpoints: make(map[string]string),
}
Expand Down
36 changes: 36 additions & 0 deletions command/agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,30 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int
}, nil
}

func (s *HTTPServer) AgentReload(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "PUT" {
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil
}

errCh := make(chan error, 0)

// Trigger the reload
select {
case <-s.agent.ShutdownCh():
return nil, fmt.Errorf("Agent was shutdown before reload could be completed")
case s.agent.reloadCh <- errCh:
}

// Wait for the result of the reload, or for the agent to shutdown
select {
case <-s.agent.ShutdownCh():
return nil, fmt.Errorf("Agent was shutdown before reload could be completed")
case err := <-errCh:
return nil, err
}
}

func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
services := s.agent.state.Services()
return services, nil
Expand Down Expand Up @@ -80,6 +104,18 @@ func (s *HTTPServer) AgentJoin(resp http.ResponseWriter, req *http.Request) (int
}
}

func (s *HTTPServer) AgentLeave(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "PUT" {
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil
}

if err := s.agent.Leave(); err != nil {
return nil, err
}
return nil, s.agent.Shutdown()
}

func (s *HTTPServer) AgentForceLeave(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
addr := strings.TrimPrefix(req.URL.Path, "/v1/agent/force-leave/")
return nil, s.agent.ForceLeave(addr)
Expand Down
120 changes: 119 additions & 1 deletion command/agent/agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/serf/serf"
"github.com/mitchellh/cli"
)

func TestHTTPAgentServices(t *testing.T) {
Expand Down Expand Up @@ -119,6 +120,81 @@ func TestHTTPAgentSelf(t *testing.T) {
}
}

func TestHTTPAgentReload(t *testing.T) {
conf := nextConfig()
tmpDir, err := ioutil.TempDir("", "consul")
if err != nil {
t.Fatalf("err: %s", err)
}
defer os.RemoveAll(tmpDir)

// Write initial config, to be reloaded later
tmpFile, err := ioutil.TempFile(tmpDir, "config")
if err != nil {
t.Fatalf("err: %s", err)
}
_, err = tmpFile.WriteString(`{"service":{"name":"redis"}}`)
if err != nil {
t.Fatalf("err: %s", err)
}
tmpFile.Close()

doneCh := make(chan struct{})
shutdownCh := make(chan struct{})

defer func() {
close(shutdownCh)
<-doneCh
}()

cmd := &Command{
ShutdownCh: shutdownCh,
Ui: new(cli.MockUi),
}

args := []string{
"-server",
"-data-dir", tmpDir,
"-http-port", fmt.Sprintf("%d", conf.Ports.HTTP),
"-config-file", tmpFile.Name(),
}

go func() {
cmd.Run(args)
close(doneCh)
}()

testutil.WaitForResult(func() (bool, error) {
return len(cmd.httpServers) == 1, nil
}, func(err error) {
t.Fatalf("should have an http server")
})

if _, ok := cmd.agent.state.services["redis"]; !ok {
t.Fatalf("missing redis service")
}

err = ioutil.WriteFile(tmpFile.Name(), []byte(`{"service":{"name":"redis-reloaded"}}`), 0644)
if err != nil {
t.Fatalf("err: %v", err)
}

srv := cmd.httpServers[0]
req, err := http.NewRequest("PUT", "/v1/agent/reload", nil)
if err != nil {
t.Fatalf("err: %v", err)
}

_, err = srv.AgentReload(nil, req)
if err != nil {
t.Fatalf("Err: %v", err)
}

if _, ok := cmd.agent.state.services["redis-reloaded"]; !ok {
t.Fatalf("missing redis-reloaded service")
}
}

func TestHTTPAgentMembers(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
Expand Down Expand Up @@ -239,6 +315,49 @@ func TestHTTPAgentJoin_WAN(t *testing.T) {
})
}

func TestHTTPAgentLeave(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()

dir2, srv2 := makeHTTPServerWithConfig(t, func(c *Config) {
c.Server = false
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer srv2.Shutdown()

// Join first
addr := fmt.Sprintf("127.0.0.1:%d", srv2.agent.config.Ports.SerfLan)
_, err := srv.agent.JoinLAN([]string{addr})
if err != nil {
t.Fatalf("err: %v", err)
}

// Graceful leave now
req, err := http.NewRequest("PUT", "/v1/agent/leave", nil)
if err != nil {
t.Fatalf("err: %v", err)
}

obj, err := srv2.AgentLeave(nil, req)
if err != nil {
t.Fatalf("Err: %v", err)
}
if obj != nil {
t.Fatalf("Err: %v", obj)
}

testutil.WaitForResult(func() (bool, error) {
m := srv.agent.LANMembers()
success := m[1].Status == serf.StatusLeft
return success, errors.New(m[1].Status.String())
}, func(err error) {
t.Fatalf("member status is %v, should be left", err)
})
}

func TestHTTPAgentForceLeave(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
Expand Down Expand Up @@ -1030,7 +1149,6 @@ func TestHTTPAgent_Monitor(t *testing.T) {
logger := io.MultiWriter(os.Stdout, &expectedLogs, logWriter)

dir, srv := makeHTTPServerWithConfigLog(t, nil, logger, logWriter)
srv.agent.logWriter = logWriter
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
Expand Down
Loading

0 comments on commit bd69c6d

Please sign in to comment.