Skip to content

Commit

Permalink
Metrics for Log-Streaming (#111)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aayyush authored and msarvar committed Sep 27, 2021
1 parent 69c4dd4 commit 1d792df
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 7 deletions.
7 changes: 7 additions & 0 deletions server/controllers/logstreaming_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (

"github.com/gorilla/mux"
"github.com/gorilla/websocket"
stats "github.com/lyft/gostats"
"github.com/runatlantis/atlantis/server/controllers/templates"
"github.com/runatlantis/atlantis/server/core/db"
"github.com/runatlantis/atlantis/server/events/metrics"
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/handlers"
"github.com/runatlantis/atlantis/server/logging"
Expand All @@ -26,6 +28,7 @@ type JobsController struct {

WebsocketHandler handlers.WebsocketHandler
ProjectCommandOutputHandler handlers.ProjectCommandOutputHandler
StatsScope stats.Scope
}

type pullInfo struct {
Expand Down Expand Up @@ -111,14 +114,17 @@ func (j *JobsController) GetProjectJobs(w http.ResponseWriter, r *http.Request)
}

func (j *JobsController) GetProjectJobsWS(w http.ResponseWriter, r *http.Request) {
jobsMetric := j.StatsScope.Scope("get_project_jobs_ws")
projectInfo, err := newProjectInfo(r)
if err != nil {
jobsMetric.Scope("project_info").NewCounter(metrics.ExecutionErrorMetric).Inc()
j.respond(w, logging.Error, http.StatusInternalServerError, err.Error())
return
}

c, err := j.WebsocketHandler.Upgrade(w, r, nil)
if err != nil {
jobsMetric.Scope("ws_upgrade").NewCounter(metrics.ExecutionErrorMetric).Inc()
j.Logger.Warn("Failed to upgrade websocket: %s", err)
return
}
Expand All @@ -141,6 +147,7 @@ func (j *JobsController) GetProjectJobsWS(w http.ResponseWriter, r *http.Request
})

if err != nil {
jobsMetric.Scope("ws_write").NewCounter(metrics.ExecutionErrorMetric).Inc()
j.Logger.Warn("Failed to receive message: %s", err)
j.respond(w, logging.Error, http.StatusInternalServerError, err.Error())
return
Expand Down
2 changes: 2 additions & 0 deletions server/controllers/logstreaming_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/gorilla/mux"
stats "github.com/lyft/gostats"
"github.com/runatlantis/atlantis/server/controllers"
"github.com/runatlantis/atlantis/server/logging"

Expand Down Expand Up @@ -34,6 +35,7 @@ func TestGetProjectJobs_WebSockets(t *testing.T) {
Logger: logger,
WebsocketHandler: websocketMock,
ProjectCommandOutputHandler: projectOutputHandler,
StatsScope: stats.NewDefaultStore(),
}

When(websocketMock.Upgrade(matchers.AnyHttpResponseWriter(), matchers.AnyPtrToHttpRequest(), matchers.AnyHttpHeader())).ThenReturn(webSocketWrapper, nil)
Expand Down
44 changes: 44 additions & 0 deletions server/handlers/instrumented_project_command_output_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package handlers

import (
stats "github.com/lyft/gostats"
"github.com/runatlantis/atlantis/server/events/models"
)

type InstrumentedProjectCommandOutputHandler struct {
ProjectCommandOutputHandler
numChans stats.Gauge
}

func NewInstrumentedProjectCommandOutputHandler(prjCmdOutputHandler ProjectCommandOutputHandler, statsScope stats.Scope) ProjectCommandOutputHandler {
return &InstrumentedProjectCommandOutputHandler{
ProjectCommandOutputHandler: prjCmdOutputHandler,
numChans: statsScope.Scope("log_streaming").NewGauge("num_ws_chans"),
}
}

func (p *InstrumentedProjectCommandOutputHandler) Clear(ctx models.ProjectCommandContext) {
p.ProjectCommandOutputHandler.Clear(ctx)
}

func (p *InstrumentedProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext, msg string) {
p.ProjectCommandOutputHandler.Send(ctx, msg)
}

func (p *InstrumentedProjectCommandOutputHandler) Receive(projectInfo string, receiver chan string, callback func(msg string) error) error {
p.numChans.Inc()
defer p.numChans.Dec()
return p.ProjectCommandOutputHandler.Receive(projectInfo, receiver, callback)
}

func (p *InstrumentedProjectCommandOutputHandler) Handle() {
p.ProjectCommandOutputHandler.Handle()
}

func (p *InstrumentedProjectCommandOutputHandler) SetJobURLWithStatus(ctx models.ProjectCommandContext, cmdName models.CommandName, status models.CommitStatus) error {
return p.ProjectCommandOutputHandler.SetJobURLWithStatus(ctx, cmdName, status)
}

func (p *InstrumentedProjectCommandOutputHandler) CleanUp(pull string) {
p.ProjectCommandOutputHandler.CleanUp(pull)
}
17 changes: 10 additions & 7 deletions server/handlers/project_command_output_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handlers
import (
"sync"

stats "github.com/lyft/gostats"
"github.com/runatlantis/atlantis/server/events/models"
"github.com/runatlantis/atlantis/server/feature"
"github.com/runatlantis/atlantis/server/logging"
Expand Down Expand Up @@ -221,15 +222,17 @@ func NewFeatureAwareOutputHandler(
projectJobURLGenerator ProjectJobURLGenerator,
logger logging.SimpleLogging,
featureAllocator feature.Allocator,
scope stats.Scope,
) ProjectCommandOutputHandler {
prjCmdOutputHandler := NewAsyncProjectCommandOutputHandler(
projectCmdOutput,
projectStatusUpdater,
projectJobURLGenerator,
logger,
)
return &FeatureAwareOutputHandler{
FeatureAllocator: featureAllocator,
ProjectCommandOutputHandler: NewAsyncProjectCommandOutputHandler(
projectCmdOutput,
projectStatusUpdater,
projectJobURLGenerator,
logger,
),
FeatureAllocator: featureAllocator,
ProjectCommandOutputHandler: NewInstrumentedProjectCommandOutputHandler(prjCmdOutputHandler, scope),
}
}

Expand Down
2 changes: 2 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) {
router,
logger,
featureAllocator,
statsScope,
)

terraformClient, err := terraform.NewClient(
Expand Down Expand Up @@ -708,6 +709,7 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) {
Db: boltdb,
WebsocketHandler: handlers.NewWebsocketHandler(logger),
ProjectCommandOutputHandler: projectCmdOutputHandler,
StatsScope: statsScope.Scope("log_streaming"),
}

eventsController := &events_controllers.VCSEventsController{
Expand Down

0 comments on commit 1d792df

Please sign in to comment.