Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Msgpack perf: write psMap out directly #2466

Merged
merged 4 commits into from
Apr 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/api_topologies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func TestAPITopologyAddsKubernetes(t *testing.T) {
buf := &bytes.Buffer{}
encoder := codec.NewEncoder(buf, &codec.MsgpackHandle{})
if err := encoder.Encode(rpt); err != nil {
t.Fatalf("GOB encoding error: %s", err)
t.Fatalf("Msgpack encoding error: %s", err)
}
checkRequest(t, ts, "POST", "/api/report", buf.Bytes())

Expand Down
18 changes: 0 additions & 18 deletions common/xfer/plugin_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package xfer

import (
"bytes"
"encoding/gob"
"fmt"
"sort"

Expand Down Expand Up @@ -209,23 +208,6 @@ func (*PluginSpecs) UnmarshalJSON(b []byte) error {
panic("UnmarshalJSON shouldn't be used, use CodecDecodeSelf instead")
}

// GobEncode implements gob.Marshaller
func (n PluginSpecs) GobEncode() ([]byte, error) {
buf := bytes.Buffer{}
err := gob.NewEncoder(&buf).Encode(n.toIntermediate())
return buf.Bytes(), err
}

// GobDecode implements gob.Unmarshaller
func (n *PluginSpecs) GobDecode(input []byte) error {
in := []PluginSpec{}
if err := gob.NewDecoder(bytes.NewBuffer(input)).Decode(&in); err != nil {
return err
}
*n = PluginSpecs{}.fromIntermediate(in)
return nil
}

// PluginSpecsByID implements sort.Interface, so we can sort the specs by the
// ID field.
type PluginSpecsByID []PluginSpec
Expand Down
18 changes: 3 additions & 15 deletions extras/generate_latest_map
Original file line number Diff line number Diff line change
Expand Up @@ -155,23 +155,11 @@ function generate_latest_map() {
})
}

func (m ${latest_map_type}) toIntermediate() map[string]${entry_type} {
intermediate := make(map[string]${entry_type}, m.Size())
if m.Map != nil {
m.Map.ForEach(func(key string, val interface{}) {
intermediate[key] = *val.(*${entry_type})
})
}
return intermediate
}

// CodecEncodeSelf implements codec.Selfer.
func (m *${latest_map_type}) CodecEncodeSelf(encoder *codec.Encoder) {
if m.Map != nil {
encoder.Encode(m.toIntermediate())
} else {
encoder.Encode(nil)
}
mapWrite(m.Map, encoder, func(encoder *codec.Encoder, val interface{}) {
val.(*${entry_type}).CodecEncodeSelf(encoder)
})
}

// CodecDecodeSelf implements codec.Selfer.
Expand Down
31 changes: 4 additions & 27 deletions report/counters.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package report

import (
"bytes"
"encoding/gob"
"fmt"
"reflect"
"sort"
Expand Down Expand Up @@ -137,14 +136,6 @@ func (c Counters) DeepEqual(d Counters) bool {
return equal
}

func (c Counters) toIntermediate() map[string]int {
intermediate := map[string]int{}
c.ForEach(func(key string, val int) {
intermediate[key] = val
})
return intermediate
}

func (c Counters) fromIntermediate(in map[string]int) Counters {
out := ps.NewMap()
for k, v := range in {
Expand All @@ -155,7 +146,10 @@ func (c Counters) fromIntermediate(in map[string]int) Counters {

// CodecEncodeSelf implements codec.Selfer
func (c *Counters) CodecEncodeSelf(encoder *codec.Encoder) {
encoder.Encode(c.toIntermediate())
mapWrite(c.psMap, encoder, func(encoder *codec.Encoder, val interface{}) {
i := val.(int)
encoder.Encode(i)
})
}

// CodecDecodeSelf implements codec.Selfer
Expand All @@ -179,20 +173,3 @@ func (Counters) MarshalJSON() ([]byte, error) {
func (*Counters) UnmarshalJSON(b []byte) error {
panic("UnmarshalJSON shouldn't be used, use CodecDecodeSelf instead")
}

// GobEncode implements gob.Marshaller
func (c Counters) GobEncode() ([]byte, error) {
buf := bytes.Buffer{}
err := gob.NewEncoder(&buf).Encode(c.toIntermediate())
return buf.Bytes(), err
}

// GobDecode implements gob.Unmarshaller
func (c *Counters) GobDecode(input []byte) error {
in := map[string]int{}
if err := gob.NewDecoder(bytes.NewBuffer(input)).Decode(&in); err != nil {
return err
}
*c = Counters{}.fromIntermediate(in)
return nil
}
12 changes: 0 additions & 12 deletions report/counters_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,6 @@ func TestCountersEncoding(t *testing.T) {
Add("foo", 1).
Add("bar", 2)

{
gobs, err := want.GobEncode()
if err != nil {
t.Fatal(err)
}
have := EmptyCounters
have.GobDecode(gobs)
if !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}
}

{

for _, h := range []codec.Handle{
Expand Down
45 changes: 4 additions & 41 deletions report/edge_metadatas.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package report

import (
"bytes"
"encoding/gob"
"fmt"
"reflect"
"sort"
Expand Down Expand Up @@ -148,31 +147,12 @@ func (c EdgeMetadatas) DeepEqual(d EdgeMetadatas) bool {
return equal
}

func (c EdgeMetadatas) toIntermediate() map[string]EdgeMetadata {
intermediate := map[string]EdgeMetadata{}
if c.psMap != nil {
c.psMap.ForEach(func(key string, val interface{}) {
intermediate[key] = val.(EdgeMetadata)
})
}
return intermediate
}

func (c EdgeMetadatas) fromIntermediate(in map[string]EdgeMetadata) EdgeMetadatas {
out := ps.NewMap()
for k, v := range in {
out = out.Set(k, v)
}
return EdgeMetadatas{out}
}

// CodecEncodeSelf implements codec.Selfer
func (c *EdgeMetadatas) CodecEncodeSelf(encoder *codec.Encoder) {
if c.psMap != nil {
encoder.Encode(c.toIntermediate())
} else {
encoder.Encode(nil)
}
mapWrite(c.psMap, encoder, func(encoder *codec.Encoder, val interface{}) {
e := val.(EdgeMetadata)
(&e).CodecEncodeSelf(encoder)
})
}

// CodecDecodeSelf implements codec.Selfer
Expand All @@ -197,23 +177,6 @@ func (*EdgeMetadatas) UnmarshalJSON(b []byte) error {
panic("UnmarshalJSON shouldn't be used, use CodecDecodeSelf instead")
}

// GobEncode implements gob.Marshaller
func (c EdgeMetadatas) GobEncode() ([]byte, error) {
buf := bytes.Buffer{}
err := gob.NewEncoder(&buf).Encode(c.toIntermediate())
return buf.Bytes(), err
}

// GobDecode implements gob.Unmarshaller
func (c *EdgeMetadatas) GobDecode(input []byte) error {
in := map[string]EdgeMetadata{}
if err := gob.NewDecoder(bytes.NewBuffer(input)).Decode(&in); err != nil {
return err
}
*c = EdgeMetadatas{}.fromIntermediate(in)
return nil
}

// EdgeMetadata describes a superset of the metadata that probes can possibly
// collect about a directed edge between two nodes in any topology.
type EdgeMetadata struct {
Expand Down
24 changes: 0 additions & 24 deletions report/edge_metadatas_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,18 +214,6 @@ func TestEdgeMetadatasEncoding(t *testing.T) {
EgressPacketCount: newu64(3),
})

{
gobs, err := want.GobEncode()
if err != nil {
t.Fatal(err)
}
have := EmptyEdgeMetadatas
have.GobDecode(gobs)
if !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}
}

{
for _, h := range []codec.Handle{
codec.Handle(&codec.MsgpackHandle{}),
Expand All @@ -247,18 +235,6 @@ func TestEdgeMetadatasEncoding(t *testing.T) {
func TestEdgeMetadatasEncodingNil(t *testing.T) {
want := EdgeMetadatas{}

{
gobs, err := want.GobEncode()
if err != nil {
t.Fatal(err)
}
have := EmptyEdgeMetadatas
have.GobDecode(gobs)
if have.psMap == nil {
t.Error("needed to get back a non-nil psMap for EdgeMetadata")
}
}

{

for _, h := range []codec.Handle{
Expand Down
36 changes: 6 additions & 30 deletions report/latest_map_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,23 +122,11 @@ func (m StringLatestMap) DeepEqual(n StringLatestMap) bool {
})
}

func (m StringLatestMap) toIntermediate() map[string]stringLatestEntry {
intermediate := make(map[string]stringLatestEntry, m.Size())
if m.Map != nil {
m.Map.ForEach(func(key string, val interface{}) {
intermediate[key] = *val.(*stringLatestEntry)
})
}
return intermediate
}

// CodecEncodeSelf implements codec.Selfer.
func (m *StringLatestMap) CodecEncodeSelf(encoder *codec.Encoder) {
if m.Map != nil {
encoder.Encode(m.toIntermediate())
} else {
encoder.Encode(nil)
}
mapWrite(m.Map, encoder, func(encoder *codec.Encoder, val interface{}) {
val.(*stringLatestEntry).CodecEncodeSelf(encoder)
})
}

// CodecDecodeSelf implements codec.Selfer.
Expand Down Expand Up @@ -274,23 +262,11 @@ func (m NodeControlDataLatestMap) DeepEqual(n NodeControlDataLatestMap) bool {
})
}

func (m NodeControlDataLatestMap) toIntermediate() map[string]nodeControlDataLatestEntry {
intermediate := make(map[string]nodeControlDataLatestEntry, m.Size())
if m.Map != nil {
m.Map.ForEach(func(key string, val interface{}) {
intermediate[key] = *val.(*nodeControlDataLatestEntry)
})
}
return intermediate
}

// CodecEncodeSelf implements codec.Selfer.
func (m *NodeControlDataLatestMap) CodecEncodeSelf(encoder *codec.Encoder) {
if m.Map != nil {
encoder.Encode(m.toIntermediate())
} else {
encoder.Encode(nil)
}
mapWrite(m.Map, encoder, func(encoder *codec.Encoder, val interface{}) {
val.(*nodeControlDataLatestEntry).CodecEncodeSelf(encoder)
})
}

// CodecDecodeSelf implements codec.Selfer.
Expand Down
21 changes: 20 additions & 1 deletion report/map_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,10 @@ const (
containerMapKey = 2
containerMapValue = 3
containerMapEnd = 4
// from https://github.com/ugorji/go/blob/master/codec/helper.go#L152
cUTF8 = 2
)

// CodecDecodeSelf implements codec.Selfer.
// This implementation does not use the intermediate form as that was a
// performance issue; skipping it saved almost 10% CPU. Note this means
// we are using undocumented, internal APIs, which could break in the future.
Expand Down Expand Up @@ -118,3 +119,21 @@ func mapRead(decoder *codec.Decoder, decodeValue func(isNil bool) interface{}) p
z.DecSendContainerState(containerMapEnd)
return out
}

// Inverse of mapRead, done for performance. Same comments about
// undocumented internal APIs apply.
func mapWrite(m ps.Map, encoder *codec.Encoder, encodeValue func(*codec.Encoder, interface{})) {
z, r := codec.GenHelperEncoder(encoder)
if m == nil {
r.EncodeNil()
return
}
r.EncodeMapStart(m.Size())
m.ForEach(func(key string, val interface{}) {
z.EncSendContainerState(containerMapKey)
r.EncodeString(cUTF8, key)
z.EncSendContainerState(containerMapValue)
encodeValue(encoder, val)
})
z.EncSendContainerState(containerMapEnd)
}
4 changes: 4 additions & 0 deletions report/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ func (s *dummySelfer) CodecDecodeSelf(decoder *codec.Decoder) {
panic("This shouldn't happen: perhaps something has gone wrong in code generation?")
}

func (s *dummySelfer) CodecEncodeSelf(encoder *codec.Encoder) {
panic("This shouldn't happen: perhaps something has gone wrong in code generation?")
}

// WriteBinary writes a Report as a gzipped msgpack.
func (rep Report) WriteBinary(w io.Writer, compressionLevel int) error {
gzwriter, err := gzip.NewWriterLevel(w, compressionLevel)
Expand Down
18 changes: 0 additions & 18 deletions report/node_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package report

import (
"bytes"
"encoding/gob"
"fmt"
"sort"

Expand Down Expand Up @@ -206,20 +205,3 @@ func (NodeSet) MarshalJSON() ([]byte, error) {
func (*NodeSet) UnmarshalJSON(b []byte) error {
panic("UnmarshalJSON shouldn't be used, use CodecDecodeSelf instead")
}

// GobEncode implements gob.Marshaller
func (n NodeSet) GobEncode() ([]byte, error) {
buf := bytes.Buffer{}
err := gob.NewEncoder(&buf).Encode(n.toIntermediate())
return buf.Bytes(), err
}

// GobDecode implements gob.Unmarshaller
func (n *NodeSet) GobDecode(input []byte) error {
in := []Node{}
if err := gob.NewDecoder(bytes.NewBuffer(input)).Decode(&in); err != nil {
return err
}
*n = NodeSet{}.fromIntermediate(in)
return nil
}
Loading