Skip to content

Commit

Permalink
Merge pull request #1533 from weaveworks/missing-return
Browse files Browse the repository at this point in the history
Various fix ups for multitenancy
  • Loading branch information
paulbellamy committed May 24, 2016
2 parents 42ad3aa + 861605a commit d137840
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 5 deletions.
21 changes: 21 additions & 0 deletions app/multitenant/dynamo_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/prometheus/client_golang/prometheus"
"github.com/ugorji/go/codec"
"golang.org/x/net/context"

Expand All @@ -25,6 +26,18 @@ const (
reportField = "report"
)

var (
dynamoRequestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "scope",
Name: "dynamo_request_duration_nanoseconds",
Help: "Time spent doing DynamoDB requests.",
}, []string{"method", "status_code"})
)

func init() {
prometheus.MustRegister(dynamoRequestDuration)
}

// DynamoDBCollector is a Collector which can also CreateTables
type DynamoDBCollector interface {
app.Collector
Expand Down Expand Up @@ -99,6 +112,7 @@ func (c *dynamoDBCollector) CreateTables() error {

func (c *dynamoDBCollector) getRows(userid string, row int64, start, end time.Time, input report.Report) (report.Report, error) {
rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10))
startTime := time.Now()
resp, err := c.db.Query(&dynamodb.QueryInput{
TableName: aws.String(tableName),
KeyConditions: map[string]*dynamodb.Condition{
Expand All @@ -117,9 +131,12 @@ func (c *dynamoDBCollector) getRows(userid string, row int64, start, end time.Ti
},
},
})
duration := time.Now().Sub(startTime)
if err != nil {
dynamoRequestDuration.WithLabelValues("Query", "500").Observe(float64(duration.Nanoseconds()))
return report.MakeReport(), err
}
dynamoRequestDuration.WithLabelValues("Query", "200").Observe(float64(duration.Nanoseconds()))
result := input
for _, item := range resp.Items {
b := item[reportField].B
Expand Down Expand Up @@ -182,6 +199,7 @@ func (c *dynamoDBCollector) Add(ctx context.Context, rep report.Report) error {

now := time.Now()
rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(now.UnixNano()/time.Hour.Nanoseconds(), 10))
startTime := time.Now()
_, err = c.db.PutItem(&dynamodb.PutItemInput{
TableName: aws.String(tableName),
Item: map[string]*dynamodb.AttributeValue{
Expand All @@ -196,9 +214,12 @@ func (c *dynamoDBCollector) Add(ctx context.Context, rep report.Report) error {
},
},
})
duration := time.Now().Sub(startTime)
if err != nil {
dynamoRequestDuration.WithLabelValues("PutItem", "500").Observe(float64(duration.Nanoseconds()))
return err
}
dynamoRequestDuration.WithLabelValues("PutItem", "200").Observe(float64(duration.Nanoseconds()))
return nil
}

Expand Down
47 changes: 45 additions & 2 deletions app/multitenant/sqs_control_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,27 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"

"github.com/weaveworks/scope/app"
"github.com/weaveworks/scope/common/xfer"
)

var (
longPollTime = aws.Int64(10)
rpcTimeout = time.Minute
longPollTime = aws.Int64(10)
rpcTimeout = time.Minute
sqsRequestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "scope",
Name: "sqs_request_duration_nanoseconds",
Help: "Time spent doing SQS requests.",
}, []string{"method", "status_code"})
)

func init() {
prometheus.MustRegister(sqsRequestDuration)
}

// sqsControlRouter:
// Creates a queue for every probe that connects to it, and a queue for
// responses back to it. When it receives a request, posts it to the
Expand Down Expand Up @@ -80,12 +90,16 @@ func (cr *sqsControlRouter) getResponseQueueURL() *string {

func (cr *sqsControlRouter) getOrCreateQueue(name string) (*string, error) {
// CreateQueue creates a queue or if it already exists, returns url of said queue
start := time.Now()
createQueueRes, err := cr.service.CreateQueue(&sqs.CreateQueueInput{
QueueName: aws.String(name),
})
duration := time.Now().Sub(start)
if err != nil {
sqsRequestDuration.WithLabelValues("CreateQueue", "500").Observe(float64(duration.Nanoseconds()))
return nil, err
}
sqsRequestDuration.WithLabelValues("CreateQueue", "200").Observe(float64(duration.Nanoseconds()))
return createQueueRes.QueueUrl, nil
}

Expand All @@ -108,14 +122,19 @@ func (cr *sqsControlRouter) loop() {
}

for {
start := time.Now()
res, err := cr.service.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: responseQueueURL,
WaitTimeSeconds: longPollTime,
})
duration := time.Now().Sub(start)
if err != nil {
sqsRequestDuration.WithLabelValues("ReceiveMessage", "500").Observe(float64(duration.Nanoseconds()))
log.Errorf("Error receiving message from %s: %v", *responseQueueURL, err)
continue
}
sqsRequestDuration.WithLabelValues("ReceiveMessage", "200").Observe(float64(duration.Nanoseconds()))

if len(res.Messages) == 0 {
continue
}
Expand All @@ -134,10 +153,17 @@ func (cr *sqsControlRouter) deleteMessages(queueURL *string, messages []*sqs.Mes
Id: message.MessageId,
})
}
start := time.Now()
_, err := cr.service.DeleteMessageBatch(&sqs.DeleteMessageBatchInput{
QueueUrl: queueURL,
Entries: entries,
})
duration := time.Now().Sub(start)
if err != nil {
sqsRequestDuration.WithLabelValues("DeleteMessageBatch", "500").Observe(float64(duration.Nanoseconds()))
} else {
sqsRequestDuration.WithLabelValues("DeleteMessageBatch", "200").Observe(float64(duration.Nanoseconds()))
}
return err
}

Expand Down Expand Up @@ -167,10 +193,17 @@ func (cr *sqsControlRouter) sendMessage(queueURL *string, message interface{}) e
return err
}
log.Infof("sendMessage to %s: %s", *queueURL, buf.String())
start := time.Now()
_, err := cr.service.SendMessage(&sqs.SendMessageInput{
QueueUrl: queueURL,
MessageBody: aws.String(buf.String()),
})
duration := time.Now().Sub(start)
if err != nil {
sqsRequestDuration.WithLabelValues("SendMessage", "500").Observe(float64(duration.Nanoseconds()))
} else {
sqsRequestDuration.WithLabelValues("SendMessage", "200").Observe(float64(duration.Nanoseconds()))
}
return err
}

Expand All @@ -188,12 +221,16 @@ func (cr *sqsControlRouter) Handle(ctx context.Context, probeID string, req xfer
}

probeQueueName := fmt.Sprintf("probe-%s-%s", userID, probeID)
start := time.Now()
probeQueueURL, err := cr.service.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String(probeQueueName),
})
duration := time.Now().Sub(start)
if err != nil {
sqsRequestDuration.WithLabelValues("GetQueueUrl", "500").Observe(float64(duration.Nanoseconds()))
return xfer.Response{}, err
}
sqsRequestDuration.WithLabelValues("GetQueueUrl", "200").Observe(float64(duration.Nanoseconds()))

// Add a response channel before we send the request, to prevent races
id := fmt.Sprintf("request-%s-%d", userID, rand.Int63())
Expand All @@ -213,6 +250,7 @@ func (cr *sqsControlRouter) Handle(ctx context.Context, probeID string, req xfer
Request: req,
ResponseQueueURL: *responseQueueURL,
}); err != nil {
sqsRequestDuration.WithLabelValues("GetQueueUrl", "500").Observe(float64(duration.Nanoseconds()))
return xfer.Response{}, err
}

Expand Down Expand Up @@ -287,14 +325,19 @@ func (pw *probeWorker) loop() {
default:
}

start := time.Now()
res, err := pw.router.service.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: pw.requestQueueURL,
WaitTimeSeconds: longPollTime,
})
duration := time.Now().Sub(start)
if err != nil {
sqsRequestDuration.WithLabelValues("ReceiveMessage", "500").Observe(float64(duration.Nanoseconds()))
log.Errorf("Error recieving message: %v", err)
continue
}
sqsRequestDuration.WithLabelValues("ReceiveMessage", "200").Observe(float64(duration.Nanoseconds()))

if len(res.Messages) == 0 {
continue
}
Expand Down
1 change: 1 addition & 0 deletions app/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func apiHandler(rep Reporter) CtxHandlerFunc {
report, err := rep.Report(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
newVersion.Lock()
defer newVersion.Unlock()
Expand Down
9 changes: 8 additions & 1 deletion common/middleware/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package middleware
import (
"net/http"
"strconv"
"strings"
"time"

"github.com/gorilla/mux"
Expand All @@ -17,18 +18,24 @@ type Instrument struct {
Duration *prometheus.SummaryVec
}

func isWSHandshakeRequest(req *http.Request) bool {
return strings.ToLower(req.Header.Get("Upgrade")) == "websocket" &&
strings.ToLower(req.Header.Get("Connection")) == "upgrade"
}

// Wrap implements middleware.Interface
func (i Instrument) Wrap(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
begin := time.Now()
isWS := strconv.FormatBool(isWSHandshakeRequest(r))
interceptor := &interceptor{ResponseWriter: w, statusCode: http.StatusOK}
next.ServeHTTP(interceptor, r)
var (
route = i.getRouteName(r)
status = strconv.Itoa(interceptor.statusCode)
took = time.Since(begin)
)
i.Duration.WithLabelValues(r.Method, route, status).Observe(float64(took.Nanoseconds()))
i.Duration.WithLabelValues(r.Method, route, status, isWS).Observe(float64(took.Nanoseconds()))
})
}

Expand Down
6 changes: 5 additions & 1 deletion common/middleware/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,14 @@ var Logging = Func(func(next http.Handler) http.Handler {
type interceptor struct {
http.ResponseWriter
statusCode int
recorded bool
}

func (i *interceptor) WriteHeader(code int) {
i.statusCode = code
if !i.recorded {
i.statusCode = code
i.recorded = true
}
i.ResponseWriter.WriteHeader(code)
}

Expand Down
1 change: 1 addition & 0 deletions common/middleware/path_rewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type pathRewrite struct {
func (p pathRewrite) Wrap(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.RequestURI = p.regexp.ReplaceAllString(r.RequestURI, p.replacement)
r.URL.Path = p.regexp.ReplaceAllString(r.RequestURI, p.replacement)
next.ServeHTTP(w, r)
})
}
Expand Down
2 changes: 1 addition & 1 deletion prog/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (
Namespace: "scope",
Name: "request_duration_nanoseconds",
Help: "Time spent serving HTTP requests.",
}, []string{"method", "route", "status_code"})
}, []string{"method", "route", "status_code", "ws"})
)

func init() {
Expand Down

0 comments on commit d137840

Please sign in to comment.