Skip to content

Commit

Permalink
[Elastic Agent] Improve version, restart, enroll CLI commands (elasti…
Browse files Browse the repository at this point in the history
…c#20359)

* Add improve version CLI cmd.

* Add new restart cmd. Perform restart at end of enroll.

* Fix yaml annotations on version struct.

* Fix control.Address on Windows.

* Fix control.Address on Windows.

* Fix windows dialer.

* Fix control.Address on Windows.

* Add to CHANGELOG.

* Review cleanups.

* Fix go vet.

* Update talking to communicating.

(cherry picked from commit 77b3b07)
  • Loading branch information
blakerouse committed Aug 4, 2020
1 parent 5a81194 commit e83e9f4
Show file tree
Hide file tree
Showing 18 changed files with 389 additions and 135 deletions.
3 changes: 3 additions & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,6 @@
- Will retry to enroll if the server return a 429. {pull}19918[19811]
- Add --staging option to enroll command {pull}20026[20026]
- Add `event.dataset` to all events {pull}20076[20076]
- Improved version CLI {pull}20359[20359]
- Enroll CLI now restarts running daemon {pull}20359[20359]
- Add restart CLI cmd {pull}20359[20359]
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/control.proto
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ message StatusResponse {
repeated ApplicationStatus applications = 3;
}

service ElasticAgent {
service ElasticAgentControl {
// Fetches the currently running version of the Elastic Agent.
rpc Version(Empty) returns (VersionResponse);

Expand Down
18 changes: 2 additions & 16 deletions x-pack/elastic-agent/pkg/agent/application/reexec/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,9 @@
package reexec

import (
"sync"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

var (
execSingleton ExecManager
execSingletonOnce sync.Once
)

// ExecManager is the interface that the global reexec manager implements.
type ExecManager interface {
// ReExec asynchronously re-executes command in the same PID and memory address
Expand All @@ -30,14 +23,6 @@ type ExecManager interface {
ShutdownComplete()
}

// Manager returns the global reexec manager.
func Manager(log *logger.Logger, exec string) ExecManager {
execSingletonOnce.Do(func() {
execSingleton = newManager(log, exec)
})
return execSingleton
}

type manager struct {
logger *logger.Logger
exec string
Expand All @@ -46,7 +31,8 @@ type manager struct {
complete chan bool
}

func newManager(log *logger.Logger, exec string) *manager {
// NewManager returns the reexec manager.
func NewManager(log *logger.Logger, exec string) ExecManager {
return &manager{
logger: log,
exec: exec,
Expand Down
24 changes: 23 additions & 1 deletion x-pack/elastic-agent/pkg/agent/cmd/enroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
package cmd

import (
"context"
"fmt"
"math/rand"
"os"
"time"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client"

"github.com/spf13/cobra"

"github.com/elastic/beats/v7/libbeat/common/backoff"
Expand Down Expand Up @@ -45,6 +48,7 @@ func newEnrollCommandWithArgs(flags *globalFlags, _ []string, streams *cli.IOStr
cmd.Flags().BoolP("force", "f", false, "Force overwrite the current and do not prompt for confirmation")
cmd.Flags().BoolP("insecure", "i", false, "Allow insecure connection to Kibana")
cmd.Flags().StringP("staging", "", "", "Configures agent to download artifacts from a staging build")
cmd.Flags().Bool("no-restart", false, "Skip restarting the currently running daemon")

return cmd
}
Expand Down Expand Up @@ -144,7 +148,25 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, flags *globalFlags, args
return errors.New(err, "fail to enroll")
}

fmt.Fprintln(streams.Out, "Successfully enrolled the Agent.")
fmt.Fprintln(streams.Out, "Successfully enrolled the Elastic Agent.")

// skip restarting
noRestart, _ := cmd.Flags().GetBool("no-restart")
if noRestart {
return nil
}

daemon := client.New()
err = daemon.Connect(context.Background())
if err == nil {
defer daemon.Disconnect()
err = daemon.Restart(context.Background())
if err == nil {
fmt.Fprintln(streams.Out, "Successfully triggered restart on running Elastic Agent.")
return nil
}
}
fmt.Fprintln(streams.Out, "Elastic Agent might not be running; unable to trigger restart")
return nil
}

Expand Down
10 changes: 9 additions & 1 deletion x-pack/elastic-agent/pkg/agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/reexec"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/server"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
Expand Down Expand Up @@ -80,7 +81,14 @@ func run(flags *globalFlags, streams *cli.IOStreams) error {
return err
}
rexLogger := logger.Named("reexec")
rex := reexec.Manager(rexLogger, execPath)
rex := reexec.NewManager(rexLogger, execPath)

// start the control listener
control := server.New(logger.Named("control"), rex)
if err := control.Start(); err != nil {
return err
}
defer control.Stop()

app, err := application.New(logger, pathConfigFile)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions x-pack/elastic-agent/pkg/agent/control/addr_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (

// Address returns the address to connect to Elastic Agent daemon.
func Address() string {
data = paths.Data()
data := paths.Data()
// entire string cannot be longer than 256 characters, this forces the
// length to always be 87 characters (but unique per data path)
return fmt.Sprintf(`\\.\pipe\elastic-agent-%s`, sha256.Sum256(data))
return fmt.Sprintf(`\\.\pipe\elastic-agent-%x`, sha256.Sum256([]byte(data)))
}
22 changes: 11 additions & 11 deletions x-pack/elastic-agent/pkg/agent/control/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"context"
"encoding/json"
"fmt"

"sync"
"time"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/proto"
)

Expand Down Expand Up @@ -62,10 +62,10 @@ type AgentStatus struct {

// Client communicates to Elastic Agent through the control protocol.
type Client interface {
// Start starts the client.
Start(ctx context.Context) error
// Stop stops the client.
Stop()
// Connect connects to the running Elastic Agent.
Connect(ctx context.Context) error
// Disconnect disconnects from the running Elastic Agent.
Disconnect()
// Version returns the current version of the running agent.
Version(ctx context.Context) (Version, error)
// Status returns the current status of the running agent.
Expand All @@ -81,7 +81,7 @@ type client struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
client proto.ElasticAgentClient
client proto.ElasticAgentControlClient
cfgLock sync.RWMutex
obsLock sync.RWMutex
}
Expand All @@ -91,19 +91,19 @@ func New() Client {
return &client{}
}

// Start starts the connection to Elastic Agent.
func (c *client) Start(ctx context.Context) error {
// Connect connects to the running Elastic Agent.
func (c *client) Connect(ctx context.Context) error {
c.ctx, c.cancel = context.WithCancel(ctx)
conn, err := dialContext(ctx)
if err != nil {
return err
}
c.client = proto.NewElasticAgentClient(conn)
c.client = proto.NewElasticAgentControlClient(conn)
return nil
}

// Stop stops the connection to Elastic Agent.
func (c *client) Stop() {
// Disconnect disconnects from the running Elastic Agent.
func (c *client) Disconnect() {
if c.cancel != nil {
c.cancel()
c.wg.Wait()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ func dialContext(ctx context.Context) (*grpc.ClientConn, error) {
}

func dialer(ctx context.Context, addr string) (net.Conn, error) {
return npipe.DialContext(arr)(ctx, "", "")
return npipe.DialContext(addr)(ctx, "", "")
}
6 changes: 3 additions & 3 deletions x-pack/elastic-agent/pkg/agent/control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ import (
)

func TestServerClient_Version(t *testing.T) {
srv := server.New(newErrorLogger(t))
srv := server.New(newErrorLogger(t), nil)
err := srv.Start()
require.NoError(t, err)
defer srv.Stop()

c := client.New()
err = c.Start(context.Background())
err = c.Connect(context.Background())
require.NoError(t, err)
defer c.Stop()
defer c.Disconnect()

ver, err := c.Version(context.Background())
require.NoError(t, err)
Expand Down
Loading

0 comments on commit e83e9f4

Please sign in to comment.