Skip to content

Commit

Permalink
Setup Storage interface for log persistence (#186)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aayyush committed Mar 22, 2022
1 parent 4a6bf94 commit e28284f
Show file tree
Hide file tree
Showing 24 changed files with 1,262 additions and 327 deletions.
12 changes: 3 additions & 9 deletions server/controllers/websocket/mux.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package websocket

import (
"fmt"
"net/http"

"github.com/gorilla/websocket"
Expand All @@ -18,8 +17,6 @@ type PartitionKeyGenerator interface {
// and is responsible for registering/deregistering new buffers
type PartitionRegistry interface {
Register(key string, buffer chan string)
Deregister(key string, buffer chan string)
IsKeyExists(key string) bool
}

// Multiplexor is responsible for handling the data transfer between the storage layer
Expand Down Expand Up @@ -53,18 +50,15 @@ func (m *Multiplexor) Handle(w http.ResponseWriter, r *http.Request) error {
return errors.Wrapf(err, "generating partition key")
}

// check if the job ID exists before registering receiver
if !m.registry.IsKeyExists(key) {
return fmt.Errorf("invalid key: %s", key)
}

// Buffer size set to 1000 to ensure messages get queued.
// TODO: make buffer size configurable
buffer := make(chan string, 1000)

// Note: Here we register the key without checking if the job exists because
// if the job DNE, the job is marked complete and we close the ws conn immediately

// spinning up a goroutine for this since we are attempting to block on the read side.
go m.registry.Register(key, buffer)
defer m.registry.Deregister(key, buffer)

return errors.Wrapf(m.writer.Write(w, r, buffer), "writing to ws %s", key)
}
4 changes: 2 additions & 2 deletions server/core/terraform/terraform_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ func (c *DefaultClient) RunCommandAsync(ctx command.ProjectContext, path string,
for s.Scan() {
message := s.Text()
outCh <- Line{Line: message}
c.projectCmdOutputHandler.Send(ctx, message, false)
c.projectCmdOutputHandler.Send(ctx, message)
}
wg.Done()
}()
Expand All @@ -447,7 +447,7 @@ func (c *DefaultClient) RunCommandAsync(ctx command.ProjectContext, path string,
for s.Scan() {
message := s.Text()
outCh <- Line{Line: message}
c.projectCmdOutputHandler.Send(ctx, message, false)
c.projectCmdOutputHandler.Send(ctx, message)
}
wg.Done()
}()
Expand Down
97 changes: 97 additions & 0 deletions server/events/mocks/mock_job_closer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 8 additions & 7 deletions server/events/project_command_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,29 +125,30 @@ type JobURLSetter interface {
SetJobURLWithStatus(ctx command.ProjectContext, cmdName command.Name, status models.CommitStatus) error
}

//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_job_message_sender.go JobMessageSender
//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_job_closer.go JobCloser

type JobMessageSender interface {
Send(ctx command.ProjectContext, msg string, operationComplete bool)
// Job Closer closes a job by marking op complete and clearing up buffers if logs are successfully persisted
type JobCloser interface {
CloseJob(jobID string)
}

// ProjectOutputWrapper is a decorator that creates a new PR status check per project.
// The status contains a url that outputs current progress of the terraform plan/apply command.
type ProjectOutputWrapper struct {
ProjectCommandRunner
JobMessageSender JobMessageSender
JobURLSetter JobURLSetter
JobURLSetter JobURLSetter
JobCloser JobCloser
}

func (p *ProjectOutputWrapper) Plan(ctx command.ProjectContext) command.ProjectResult {
result := p.updateProjectPRStatus(command.Plan, ctx, p.ProjectCommandRunner.Plan)
p.JobMessageSender.Send(ctx, "", OperationComplete)
p.JobCloser.CloseJob(ctx.JobID)
return result
}

func (p *ProjectOutputWrapper) Apply(ctx command.ProjectContext) command.ProjectResult {
result := p.updateProjectPRStatus(command.Apply, ctx, p.ProjectCommandRunner.Apply)
p.JobMessageSender.Send(ctx, "", OperationComplete)
p.JobCloser.CloseJob(ctx.JobID)
return result
}

Expand Down
4 changes: 2 additions & 2 deletions server/events/project_command_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,12 @@ func TestProjectOutputWrapper(t *testing.T) {
var expCommitStatus models.CommitStatus

mockJobURLSetter := eventmocks.NewMockJobURLSetter()
mockJobMessageSender := eventmocks.NewMockJobMessageSender()
mockJobCloser := eventmocks.NewMockJobCloser()
mockProjectCommandRunner := mocks.NewMockProjectCommandRunner()

runner := &events.ProjectOutputWrapper{
JobURLSetter: mockJobURLSetter,
JobMessageSender: mockJobMessageSender,
JobCloser: mockJobCloser,
ProjectCommandRunner: mockProjectCommandRunner,
}

Expand Down
8 changes: 5 additions & 3 deletions server/events/pull_closed_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/events/models/fixtures"
vcsmocks "github.com/runatlantis/atlantis/server/events/vcs/mocks"
jobmocks "github.com/runatlantis/atlantis/server/jobs/mocks"
loggermocks "github.com/runatlantis/atlantis/server/logging/mocks"
. "github.com/runatlantis/atlantis/testing"
)
Expand Down Expand Up @@ -197,7 +198,8 @@ func TestCleanUpLogStreaming(t *testing.T) {

// Create Log streaming resources
prjCmdOutput := make(chan *jobs.ProjectCmdOutputLine)
prjCmdOutHandler := jobs.NewAsyncProjectCommandOutputHandler(prjCmdOutput, logger)
storageBackend := jobmocks.NewMockStorageBackend()
prjCmdOutHandler := jobs.NewAsyncProjectCommandOutputHandler(prjCmdOutput, logger, jobs.NewJobStore(storageBackend))
ctx := command.ProjectContext{
BaseRepo: fixtures.GithubRepo,
Pull: fixtures.Pull,
Expand All @@ -206,7 +208,7 @@ func TestCleanUpLogStreaming(t *testing.T) {
}

go prjCmdOutHandler.Handle()
prjCmdOutHandler.Send(ctx, "Test Message", false)
prjCmdOutHandler.Send(ctx, "Test Message")

// Create boltdb and add pull request.
var lockBucket = "bucket"
Expand Down Expand Up @@ -280,7 +282,7 @@ func TestCleanUpLogStreaming(t *testing.T) {

// Assert log streaming resources are cleaned up.
dfPrjCmdOutputHandler := prjCmdOutHandler.(*jobs.AsyncProjectCommandOutputHandler)
assert.Empty(t, dfPrjCmdOutputHandler.GetProjectOutputBuffer(ctx.PullInfo()))
assert.Empty(t, dfPrjCmdOutputHandler.GetJob(ctx.PullInfo()).Output)
assert.Empty(t, dfPrjCmdOutputHandler.GetReceiverBufferForPull(ctx.PullInfo()))
})
}
2 changes: 1 addition & 1 deletion server/events/working_dir_locker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestTryLock(t *testing.T) {

// Now another lock for the same repo, workspace, and pull should fail
_, err = locker.TryLock(repo, 1, workspace, "")
ErrEquals(t, "The default workspace is currently locked by another"+
ErrEquals(t, "The default workspace at path is currently locked by another"+
" command that is running for this pull request.\n"+
"Wait until the previous command is complete and try again.", err)

Expand Down
Loading

0 comments on commit e28284f

Please sign in to comment.