Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various fix ups for multitenancy #1533

Merged
merged 7 commits into from
May 24, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions app/multitenant/dynamo_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/prometheus/client_golang/prometheus"
"github.com/ugorji/go/codec"
"golang.org/x/net/context"

Expand All @@ -25,6 +26,18 @@ const (
reportField = "report"
)

// dynamoRequestDuration is a Prometheus summary of the time spent in DynamoDB
// requests, observed in nanoseconds and labelled by the API method name and a
// status code.
var dynamoRequestDuration = prometheus.NewSummaryVec(
	prometheus.SummaryOpts{
		Namespace: "scope",
		Name:      "dynamo_request_duration_nanoseconds",
		Help:      "Time spent doing DynamoDB requests.",
	},
	[]string{"method", "status_code"},
)

// init registers the DynamoDB request summary with Prometheus at package
// load; MustRegister panics if the collector cannot be registered.
func init() {
	prometheus.MustRegister(dynamoRequestDuration)
}

// DynamoDBCollector is a Collector which can also CreateTables
type DynamoDBCollector interface {
app.Collector
Expand Down Expand Up @@ -99,6 +112,7 @@ func (c *dynamoDBCollector) CreateTables() error {

func (c *dynamoDBCollector) getRows(userid string, row int64, start, end time.Time, input report.Report) (report.Report, error) {
rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10))
startTime := time.Now()
resp, err := c.db.Query(&dynamodb.QueryInput{
TableName: aws.String(tableName),
KeyConditions: map[string]*dynamodb.Condition{
Expand All @@ -117,9 +131,12 @@ func (c *dynamoDBCollector) getRows(userid string, row int64, start, end time.Ti
},
},
})
duration := time.Now().Sub(startTime)
if err != nil {
dynamoRequestDuration.WithLabelValues("Query", "500").Observe(float64(duration.Nanoseconds()))
return report.MakeReport(), err
}
dynamoRequestDuration.WithLabelValues("Query", "200").Observe(float64(duration.Nanoseconds()))
result := input
for _, item := range resp.Items {
b := item[reportField].B
Expand Down Expand Up @@ -182,6 +199,7 @@ func (c *dynamoDBCollector) Add(ctx context.Context, rep report.Report) error {

now := time.Now()
rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(now.UnixNano()/time.Hour.Nanoseconds(), 10))
startTime := time.Now()
_, err = c.db.PutItem(&dynamodb.PutItemInput{
TableName: aws.String(tableName),
Item: map[string]*dynamodb.AttributeValue{
Expand All @@ -196,9 +214,12 @@ func (c *dynamoDBCollector) Add(ctx context.Context, rep report.Report) error {
},
},
})
duration := time.Now().Sub(startTime)
if err != nil {
dynamoRequestDuration.WithLabelValues("PutItem", "500").Observe(float64(duration.Nanoseconds()))
return err
}
dynamoRequestDuration.WithLabelValues("PutItem", "200").Observe(float64(duration.Nanoseconds()))
return nil
}

Expand Down
47 changes: 45 additions & 2 deletions app/multitenant/sqs_control_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,27 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"

"github.com/weaveworks/scope/app"
"github.com/weaveworks/scope/common/xfer"
)

var (
longPollTime = aws.Int64(10)
rpcTimeout = time.Minute
longPollTime = aws.Int64(10)
rpcTimeout = time.Minute
sqsRequestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "scope",
Name: "sqs_request_duration_nanoseconds",
Help: "Time spent doing SQS requests.",
}, []string{"method", "status_code"})
)

func init() {
prometheus.MustRegister(sqsRequestDuration)
}

// sqsControlRouter:
// Creates a queue for every probe that connects to it, and a queue for
// responses back to it. When it receives a request, posts it to the
Expand Down Expand Up @@ -80,12 +90,16 @@ func (cr *sqsControlRouter) getResponseQueueURL() *string {

// getOrCreateQueue returns the URL of the SQS queue with the given name.
// It relies on CreateQueue, which creates the queue or, if it already exists,
// returns the URL of the existing one. The call's duration is recorded in
// sqsRequestDuration under the "CreateQueue" method, with status "500" when
// the call returned an error and "200" otherwise.
func (cr *sqsControlRouter) getOrCreateQueue(name string) (*string, error) {
	begin := time.Now()
	out, err := cr.service.CreateQueue(&sqs.CreateQueueInput{
		QueueName: aws.String(name),
	})
	elapsed := float64(time.Since(begin).Nanoseconds())

	status := "200"
	if err != nil {
		status = "500"
	}
	sqsRequestDuration.WithLabelValues("CreateQueue", status).Observe(elapsed)

	if err != nil {
		return nil, err
	}
	return out.QueueUrl, nil
}

Expand All @@ -108,14 +122,19 @@ func (cr *sqsControlRouter) loop() {
}

for {
start := time.Now()
res, err := cr.service.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: responseQueueURL,
WaitTimeSeconds: longPollTime,
})
duration := time.Now().Sub(start)
if err != nil {
sqsRequestDuration.WithLabelValues("ReceiveMessage", "500").Observe(float64(duration.Nanoseconds()))
log.Errorf("Error receiving message from %s: %v", *responseQueueURL, err)
continue
}
sqsRequestDuration.WithLabelValues("ReceiveMessage", "200").Observe(float64(duration.Nanoseconds()))

if len(res.Messages) == 0 {
continue
}
Expand All @@ -134,10 +153,17 @@ func (cr *sqsControlRouter) deleteMessages(queueURL *string, messages []*sqs.Mes
Id: message.MessageId,
})
}
start := time.Now()
_, err := cr.service.DeleteMessageBatch(&sqs.DeleteMessageBatchInput{
QueueUrl: queueURL,
Entries: entries,
})
duration := time.Now().Sub(start)
if err != nil {
sqsRequestDuration.WithLabelValues("DeleteMessageBatch", "500").Observe(float64(duration.Nanoseconds()))
} else {
sqsRequestDuration.WithLabelValues("DeleteMessageBatch", "200").Observe(float64(duration.Nanoseconds()))
}
return err
}

Expand Down Expand Up @@ -167,10 +193,17 @@ func (cr *sqsControlRouter) sendMessage(queueURL *string, message interface{}) e
return err
}
log.Infof("sendMessage to %s: %s", *queueURL, buf.String())
start := time.Now()
_, err := cr.service.SendMessage(&sqs.SendMessageInput{
QueueUrl: queueURL,
MessageBody: aws.String(buf.String()),
})
duration := time.Now().Sub(start)
if err != nil {
sqsRequestDuration.WithLabelValues("SendMessage", "500").Observe(float64(duration.Nanoseconds()))
} else {
sqsRequestDuration.WithLabelValues("SendMessage", "200").Observe(float64(duration.Nanoseconds()))
}
return err
}

Expand All @@ -188,12 +221,16 @@ func (cr *sqsControlRouter) Handle(ctx context.Context, probeID string, req xfer
}

probeQueueName := fmt.Sprintf("probe-%s-%s", userID, probeID)
start := time.Now()
probeQueueURL, err := cr.service.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String(probeQueueName),
})
duration := time.Now().Sub(start)
if err != nil {
sqsRequestDuration.WithLabelValues("GetQueueUrl", "500").Observe(float64(duration.Nanoseconds()))
return xfer.Response{}, err
}
sqsRequestDuration.WithLabelValues("GetQueueUrl", "200").Observe(float64(duration.Nanoseconds()))

// Add a response channel before we send the request, to prevent races
id := fmt.Sprintf("request-%s-%d", userID, rand.Int63())
Expand All @@ -213,6 +250,7 @@ func (cr *sqsControlRouter) Handle(ctx context.Context, probeID string, req xfer
Request: req,
ResponseQueueURL: *responseQueueURL,
}); err != nil {
sqsRequestDuration.WithLabelValues("GetQueueUrl", "500").Observe(float64(duration.Nanoseconds()))
return xfer.Response{}, err
}

Expand Down Expand Up @@ -287,14 +325,19 @@ func (pw *probeWorker) loop() {
default:
}

start := time.Now()
res, err := pw.router.service.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: pw.requestQueueURL,
WaitTimeSeconds: longPollTime,
})
duration := time.Now().Sub(start)
if err != nil {
sqsRequestDuration.WithLabelValues("ReceiveMessage", "500").Observe(float64(duration.Nanoseconds()))
log.Errorf("Error recieving message: %v", err)
continue
}
sqsRequestDuration.WithLabelValues("ReceiveMessage", "200").Observe(float64(duration.Nanoseconds()))

if len(res.Messages) == 0 {
continue
}
Expand Down
1 change: 1 addition & 0 deletions app/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func apiHandler(rep Reporter) CtxHandlerFunc {
report, err := rep.Report(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
newVersion.Lock()
defer newVersion.Unlock()
Expand Down
9 changes: 8 additions & 1 deletion common/middleware/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package middleware
import (
"net/http"
"strconv"
"strings"
"time"

"github.com/gorilla/mux"
Expand All @@ -17,18 +18,24 @@ type Instrument struct {
Duration *prometheus.SummaryVec
}

// isWSHandshakeRequest reports whether req looks like a WebSocket opening
// handshake: its Upgrade header is "websocket" and its Connection header
// carries the "upgrade" token. Both comparisons are case-insensitive.
//
// Connection is a comma-separated token list that may also be split across
// several header lines (for example "keep-alive, Upgrade"), so every token of
// every Connection line is inspected instead of comparing the whole value.
func isWSHandshakeRequest(req *http.Request) bool {
	if !strings.EqualFold(req.Header.Get("Upgrade"), "websocket") {
		return false
	}
	for _, line := range req.Header["Connection"] {
		for _, token := range strings.Split(line, ",") {
			if strings.EqualFold(strings.TrimSpace(token), "upgrade") {
				return true
			}
		}
	}
	return false
}

// Wrap implements middleware.Interface.
//
// The returned handler times each request served by next and records the
// elapsed nanoseconds in i.Duration, labelled with the request method, the
// route name, the response status code captured by the interceptor (200 if
// the handler never calls WriteHeader) and whether the request was a
// WebSocket handshake.
func (i Instrument) Wrap(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		begin := time.Now()
		// Evaluated before next runs, from the request headers as received.
		isWS := strconv.FormatBool(isWSHandshakeRequest(r))
		interceptor := &interceptor{ResponseWriter: w, statusCode: http.StatusOK}
		next.ServeHTTP(interceptor, r)
		var (
			route  = i.getRouteName(r)
			status = strconv.Itoa(interceptor.statusCode)
			took   = time.Since(begin)
		)
		// Exactly one observation per request. The number of label values
		// must match the labels the summary was declared with, otherwise
		// WithLabelValues panics.
		i.Duration.WithLabelValues(r.Method, route, status, isWS).Observe(float64(took.Nanoseconds()))
	})
}

Expand Down
6 changes: 5 additions & 1 deletion common/middleware/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,14 @@ var Logging = Func(func(next http.Handler) http.Handler {
// interceptor wraps an http.ResponseWriter and remembers the status code of
// the response so middleware can report it after the handler returns.
type interceptor struct {
	http.ResponseWriter
	statusCode int  // status code of the first WriteHeader call, or the initial value if none
	recorded   bool // true once statusCode has been captured
}

// WriteHeader records the status code of the first call only, then forwards
// every call to the wrapped ResponseWriter. Later calls must not overwrite
// the recorded value, so statusCode keeps the first status passed in.
func (i *interceptor) WriteHeader(code int) {
	if !i.recorded {
		i.statusCode = code
		i.recorded = true
	}
	i.ResponseWriter.WriteHeader(code)
}

Expand Down
1 change: 1 addition & 0 deletions common/middleware/path_rewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type pathRewrite struct {
// Wrap implements middleware.Interface.
//
// The returned handler applies p.regexp / p.replacement to the request before
// passing it to next. RequestURI and URL.Path are each rewritten from their
// own original value: RequestURI still carries the query string, so deriving
// URL.Path from it would both leak the query into the path and apply the
// pattern a second time to already-rewritten text.
func (p pathRewrite) Wrap(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		r.RequestURI = p.regexp.ReplaceAllString(r.RequestURI, p.replacement)
		r.URL.Path = p.regexp.ReplaceAllString(r.URL.Path, p.replacement)
		next.ServeHTTP(w, r)
	})
}
Expand Down
2 changes: 1 addition & 1 deletion prog/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (
Namespace: "scope",
Name: "request_duration_nanoseconds",
Help: "Time spent serving HTTP requests.",
}, []string{"method", "route", "status_code"})
}, []string{"method", "route", "status_code", "ws"})
)

func init() {
Expand Down