diff --git a/app/multitenant/dynamo_collector.go b/app/multitenant/dynamo_collector.go index c8cec91688..0d5181d14f 100644 --- a/app/multitenant/dynamo_collector.go +++ b/app/multitenant/dynamo_collector.go @@ -2,7 +2,6 @@ package multitenant import ( "bytes" - "compress/gzip" "crypto/md5" "fmt" "io" @@ -18,7 +17,6 @@ import ( "github.com/bluele/gcache" "github.com/nats-io/nats" "github.com/prometheus/client_golang/prometheus" - "github.com/ugorji/go/codec" "golang.org/x/net/context" "github.com/weaveworks/scope/app" @@ -312,15 +310,7 @@ func (c *dynamoDBCollector) getNonCachedReport(reportKey string) (*report.Report if err != nil { return nil, err } - reader, err := gzip.NewReader(resp.Body) - if err != nil { - return nil, err - } - rep := report.MakeReport() - if err := codec.NewDecoder(reader, &codec.MsgpackHandle{}).Decode(&rep); err != nil { - return nil, err - } - return &rep, nil + return report.MakeFromBinary(resp.Body) } func (c *dynamoDBCollector) getReports(userid string, row int64, start, end time.Time) ([]report.Report, error) { @@ -387,14 +377,7 @@ func (c *dynamoDBCollector) Add(ctx context.Context, rep report.Report) error { // first, encode the report into a buffer and record its size var buf bytes.Buffer - writer, err := gzip.NewWriterLevel(&buf, gzip.BestCompression) - if err != nil { - return err - } - if err := codec.NewEncoder(writer, &codec.MsgpackHandle{}).Encode(&rep); err != nil { - return err - } - writer.Close() + rep.WriteBinary(&buf) reportSize.Add(float64(buf.Len())) // second, put the report on s3 diff --git a/app/router.go b/app/router.go index fce7c7311d..27e71ce482 100644 --- a/app/router.go +++ b/app/router.go @@ -1,9 +1,7 @@ package app import ( - "compress/gzip" - "encoding/gob" - "io" + "fmt" "net/http" "net/url" "strings" @@ -102,63 +100,32 @@ func RegisterTopologyRoutes(router *mux.Router, r Reporter) { gzipHandler(requestContextDecorator(makeProbeHandler(r)))) } -type byteCounter struct { - next io.ReadCloser - count *uint64 -} - -func (c byteCounter) Read(p []byte) (n int, err error) { - n, err = c.next.Read(p) - *c.count += uint64(n) - return n, err -} - -func (c byteCounter) Close() error { - return c.next.Close() -} - // RegisterReportPostHandler registers the handler for report submission func RegisterReportPostHandler(a Adder, router *mux.Router) { post := router.Methods("POST").Subrouter() post.HandleFunc("/api/report", requestContextDecorator(func(ctx context.Context, w http.ResponseWriter, r *http.Request) { var ( - rpt report.Report - reader = r.Body - err error - compressedSize, uncompressedSize uint64 + rpt report.Report + reader = r.Body ) - if log.GetLevel() == log.DebugLevel { - reader = byteCounter{next: reader, count: &compressedSize} - } - if strings.Contains(r.Header.Get("Content-Encoding"), "gzip") { - reader, err = gzip.NewReader(reader) - if err != nil { - respondWith(w, http.StatusBadRequest, err) - return - } - } - - if log.GetLevel() == log.DebugLevel { - reader = byteCounter{next: reader, count: &uncompressedSize} - } - decoder := gob.NewDecoder(reader).Decode - if strings.HasPrefix(r.Header.Get("Content-Type"), "application/json") { - decoder = codec.NewDecoder(reader, &codec.JsonHandle{}).Decode - } else if strings.HasPrefix(r.Header.Get("Content-Type"), "application/msgpack") { - decoder = codec.NewDecoder(reader, &codec.MsgpackHandle{}).Decode + gzipped := strings.Contains(r.Header.Get("Content-Encoding"), "gzip") + contentType := r.Header.Get("Content-Type") + var handle codec.Handle + switch { + case strings.HasPrefix(contentType, "application/json"): + handle = &codec.JsonHandle{} + case strings.HasPrefix(contentType, "application/msgpack"): + handle = &codec.MsgpackHandle{} + default: + respondWith(w, http.StatusBadRequest, fmt.Errorf("Unsupported Content-Type: %v", contentType)) + return } - if err := decoder(&rpt); err != nil { + if err := rpt.ReadBinary(reader, gzipped, handle); err != nil { respondWith(w, http.StatusBadRequest, err) return } - log.Debugf( - "Received report sizes: compressed %d bytes, uncompressed %d bytes (%.2f%%)", - compressedSize, - uncompressedSize, - float32(compressedSize)/float32(uncompressedSize)*100, - ) if err := a.Add(ctx, rpt); err != nil { log.Errorf("Error Adding report: %v", err) diff --git a/app/router_test.go b/app/router_test.go index 563320ca47..37980f37be 100644 --- a/app/router_test.go +++ b/app/router_test.go @@ -2,7 +2,6 @@ package app_test import ( "bytes" - "encoding/gob" "io/ioutil" "net/http" "net/http/httptest" @@ -84,11 +83,6 @@ func TestReportPostHandler(t *testing.T) { } } - test("", func(v interface{}) ([]byte, error) { - buf := &bytes.Buffer{} - err := gob.NewEncoder(buf).Encode(v) - return buf.Bytes(), err - }) test("application/json", func(v interface{}) ([]byte, error) { buf := &bytes.Buffer{} err := codec.NewEncoder(buf, &codec.JsonHandle{}).Encode(v) diff --git a/probe/appclient/report_publisher.go b/probe/appclient/report_publisher.go index 5e713ebb8d..2e21341473 100644 --- a/probe/appclient/report_publisher.go +++ b/probe/appclient/report_publisher.go @@ -2,9 +2,6 @@ package appclient import ( "bytes" - "compress/gzip" - "github.com/ugorji/go/codec" - "github.com/weaveworks/scope/report" ) @@ -24,11 +21,6 @@ func NewReportPublisher(publisher Publisher) *ReportPublisher { // Publish serialises and compresses a report, then passes it to a publisher func (p *ReportPublisher) Publish(r report.Report) error { buf := &bytes.Buffer{} - gzwriter := gzip.NewWriter(buf) - if err := codec.NewEncoder(gzwriter, &codec.MsgpackHandle{}).Encode(r); err != nil { - return err - } - gzwriter.Close() // otherwise the content won't get flushed to the output stream - + r.WriteBinary(buf) return p.publisher.Publish(buf) } diff --git a/report/marshal.go b/report/marshal.go new file mode 100644 index 0000000000..2cafd0568a --- /dev/null +++ b/report/marshal.go @@ -0,0 +1,77 @@ +package report + +import ( + "compress/gzip" + "io" + + log "github.com/Sirupsen/logrus" + "github.com/ugorji/go/codec" +) + +// WriteBinary writes a Report as a gzipped msgpack. +func (rep Report) WriteBinary(w io.Writer) error { + gzwriter, err := gzip.NewWriterLevel(w, gzip.BestCompression) + if err != nil { + return err + } + if err = codec.NewEncoder(gzwriter, &codec.MsgpackHandle{}).Encode(&rep); err != nil { + return err + } + gzwriter.Close() // otherwise the content won't get flushed to the output stream + return nil +} + +type byteCounter struct { + next io.Reader + count *uint64 +} + +func (c byteCounter) Read(p []byte) (n int, err error) { + n, err = c.next.Read(p) + *c.count += uint64(n) + return n, err +} + +// ReadBinary reads bytes into a Report. +// +// Will decompress the binary if gzipped is true, and will use the given +// codecHandle to decode it. +func (rep *Report) ReadBinary(r io.Reader, gzipped bool, codecHandle codec.Handle) error { + var err error + var compressedSize, uncompressedSize uint64 + + // We have historically had trouble with reports being too large. We are + // keeping this instrumentation around to help us implement + // weaveworks/scope#985. + if log.GetLevel() == log.DebugLevel { + r = byteCounter{next: r, count: &compressedSize} + } + if gzipped { + r, err = gzip.NewReader(r) + if err != nil { + return err + } + } + if log.GetLevel() == log.DebugLevel { + r = byteCounter{next: r, count: &uncompressedSize} + } + if err := codec.NewDecoder(r, codecHandle).Decode(&rep); err != nil { + return err + } + log.Debugf( + "Received report sizes: compressed %d bytes, uncompressed %d bytes (%.2f%%)", + compressedSize, + uncompressedSize, + float32(compressedSize)/float32(uncompressedSize)*100, + ) + return nil +} + +// MakeFromBinary constructs a Report from a gzipped msgpack. +func MakeFromBinary(r io.Reader) (*Report, error) { + rep := MakeReport() + if err := rep.ReadBinary(r, true, &codec.MsgpackHandle{}); err != nil { + return nil, err + } + return &rep, nil +}