From 41b053f915f5921e04da28d2b256daf8b844a318 Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Fri, 25 Oct 2019 10:51:18 -0400 Subject: [PATCH] rpc acl tests for both monitor endpoints --- client/monitor_endpoint_test.go | 108 +++++++++++++++++++++++++ nomad/client_monitor_endpoint_test.go | 110 +++++++++++++++++++++++++- 2 files changed, 217 insertions(+), 1 deletion(-) diff --git a/client/monitor_endpoint_test.go b/client/monitor_endpoint_test.go index ca95cf73126..4b71d674e7c 100644 --- a/client/monitor_endpoint_test.go +++ b/client/monitor_endpoint_test.go @@ -8,9 +8,11 @@ import ( "testing" "time" + "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/client/config" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad" + "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" @@ -93,3 +95,109 @@ OUTER: } } } + +func TestMonitor_Monitor_ACL(t *testing.T) { + t.Parallel() + require := require.New(t) + + // start server + s, root := nomad.TestACLServer(t, nil) + defer s.Shutdown() + testutil.WaitForLeader(t, s.RPC) + + policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS}) + tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad) + + policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "", + []string{acl.NamespaceCapabilityReadLogs, acl.NamespaceCapabilityReadFS}) + tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid", policyGood) + + cases := []struct { + Name string + Token string + ExpectedErr string + }{ + { + Name: "bad token", + Token: tokenBad.SecretID, + ExpectedErr: structs.ErrPermissionDenied.Error(), + }, + { + Name: "good token", + Token: tokenGood.SecretID, + ExpectedErr: "Unknown log level", + }, + { + Name: "root token", + Token: root.SecretID, + ExpectedErr: "Unknown log level", + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + req := &cstructs.MonitorRequest{ + LogLevel: "unknown", + QueryOptions: structs.QueryOptions{ + Namespace: structs.DefaultNamespace, + Region: "global", + AuthToken: tc.Token, + }, + } + + handler, err := s.StreamingRpcHandler("Agent.Monitor") + require.Nil(err) + + // create pipe + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) + + go handler(p2) + + // Start decoder + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // send request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + require.Nil(encoder.Encode(req)) + + timeout := time.After(5 * time.Second) + OUTER: + for { + select { + case <-timeout: + t.Fatal("timeout") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + if msg.Error == nil { + continue + } + + if strings.Contains(msg.Error.Error(), tc.ExpectedErr) { + break OUTER + } else { + t.Fatalf("Bad error: %v", msg.Error) + } + } + } + }) + } +} diff --git a/nomad/client_monitor_endpoint_test.go b/nomad/client_monitor_endpoint_test.go index 6045c9caa4b..27496f7390c 100644 --- a/nomad/client_monitor_endpoint_test.go +++ b/nomad/client_monitor_endpoint_test.go @@ -8,9 +8,11 @@ import ( "testing" "time" + "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/client" "github.com/hashicorp/nomad/client/config" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" @@ -112,7 +114,7 @@ func TestMonitor_MonitorServer(t *testing.T) { t.Parallel() require := require.New(t) - // start server and client + // start server s := TestServer(t, nil) defer s.Shutdown() testutil.WaitForLeader(t, s.RPC) @@ -187,3 +189,109 @@ OUTER: } } } + +func TestMonitor_Monitor_ACL(t *testing.T) { + t.Parallel() + require := require.New(t) + + // start server + s, root := TestACLServer(t, nil) + defer s.Shutdown() + testutil.WaitForLeader(t, s.RPC) + + policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS}) + tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad) + + policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "", + []string{acl.NamespaceCapabilityReadLogs, acl.NamespaceCapabilityReadFS}) + tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid", policyGood) + + cases := []struct { + Name string + Token string + ExpectedErr string + }{ + { + Name: "bad token", + Token: tokenBad.SecretID, + ExpectedErr: structs.ErrPermissionDenied.Error(), + }, + { + Name: "good token", + Token: tokenGood.SecretID, + ExpectedErr: "Unknown log level", + }, + { + Name: "root token", + Token: root.SecretID, + ExpectedErr: "Unknown log level", + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + req := &cstructs.MonitorRequest{ + LogLevel: "unknown", + QueryOptions: structs.QueryOptions{ + Namespace: structs.DefaultNamespace, + Region: "global", + AuthToken: tc.Token, + }, + } + + handler, err := s.StreamingRpcHandler("Agent.Monitor") + require.Nil(err) + + // create pipe + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) + + go handler(p2) + + // Start decoder + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // send request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + require.Nil(encoder.Encode(req)) + + timeout := time.After(5 * time.Second) + OUTER: + for { + select { + case <-timeout: + t.Fatal("timeout") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + if msg.Error == nil { + continue + } + + if strings.Contains(msg.Error.Error(), tc.ExpectedErr) { + break OUTER + } else { + t.Fatalf("Bad error: %v", msg.Error) + } + } + } + }) + } +}