Skip to content

Commit

Permalink
Reuse the buffer in encoder and decoder (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
cw9 authored and Chao Wang committed Jun 7, 2018
1 parent bac368f commit 768ac30
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 33 deletions.
2 changes: 1 addition & 1 deletion src/msg/protocol/proto/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

const (
sizeBufferSize = 4
sizeEncodingLength = 4
)

var sizeEncodeDecoder = binary.BigEndian
Expand Down
15 changes: 7 additions & 8 deletions src/msg/protocol/proto/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ import (

type decoder struct {
r io.Reader
sizeBuffer []byte
dataBuffer []byte
buffer []byte
bytesPool pool.BytesPool
maxMessageSize int
}
Expand All @@ -47,7 +46,7 @@ func newDecoder(r io.Reader, opts BaseOptions) *decoder {
pool := opts.BytesPool()
return &decoder{
r: r,
sizeBuffer: getByteSliceWithLength(sizeBufferSize, pool),
buffer: getByteSliceWithLength(sizeEncodingLength, pool),
bytesPool: pool,
maxMessageSize: opts.MaxMessageSize(),
}
Expand All @@ -65,19 +64,19 @@ func (d *decoder) Decode(m Unmarshaler) error {
}

func (d *decoder) decodeSize() (int, error) {
if _, err := io.ReadFull(d.r, d.sizeBuffer); err != nil {
if _, err := io.ReadFull(d.r, d.buffer[:sizeEncodingLength]); err != nil {
return 0, err
}
size := sizeEncodeDecoder.Uint32(d.sizeBuffer)
size := sizeEncodeDecoder.Uint32(d.buffer[:sizeEncodingLength])
return int(size), nil
}

func (d *decoder) decodeData(m Unmarshaler, size int) error {
d.dataBuffer = growDataBufferIfNeeded(d.dataBuffer, size, d.bytesPool)
if _, err := io.ReadFull(d.r, d.dataBuffer[:size]); err != nil {
d.buffer = growDataBufferIfNeeded(d.buffer, size, d.bytesPool)
if _, err := io.ReadFull(d.r, d.buffer[:size]); err != nil {
return err
}
return m.Unmarshal(d.dataBuffer[:size])
return m.Unmarshal(d.buffer[:size])
}

func (d *decoder) resetReader(r io.Reader) {
Expand Down
15 changes: 7 additions & 8 deletions src/msg/protocol/proto/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ import (

type encoder struct {
w io.Writer
sizeBuffer []byte
dataBuffer []byte
buffer []byte
bytesPool pool.BytesPool
maxMessageSize int
}
Expand All @@ -47,7 +46,7 @@ func newEncoder(w io.Writer, opts BaseOptions) *encoder {
pool := opts.BytesPool()
return &encoder{
w: w,
sizeBuffer: getByteSliceWithLength(sizeBufferSize, pool),
buffer: getByteSliceWithLength(sizeEncodingLength, pool),
bytesPool: pool,
maxMessageSize: opts.MaxMessageSize(),
}
Expand All @@ -65,18 +64,18 @@ func (e *encoder) Encode(m Marshaler) error {
}

func (e *encoder) encodeSize(size int) error {
sizeEncodeDecoder.PutUint32(e.sizeBuffer, uint32(size))
_, err := e.w.Write(e.sizeBuffer)
sizeEncodeDecoder.PutUint32(e.buffer, uint32(size))
_, err := e.w.Write(e.buffer[:sizeEncodingLength])
return err
}

func (e *encoder) encodeData(m Marshaler, size int) error {
e.dataBuffer = growDataBufferIfNeeded(e.dataBuffer, size, e.bytesPool)
size, err := m.MarshalTo(e.dataBuffer)
e.buffer = growDataBufferIfNeeded(e.buffer, size, e.bytesPool)
size, err := m.MarshalTo(e.buffer)
if err != nil {
return err
}
_, err = e.w.Write(e.dataBuffer[:size])
_, err = e.w.Write(e.buffer[:size])
return err
}

Expand Down
32 changes: 16 additions & 16 deletions src/msg/protocol/proto/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ import (
func TestBaseEncodeDecodeRoundTripWithoutPool(t *testing.T) {
mimicTCP := bytes.NewBuffer(nil)
enc := newEncoder(mimicTCP, NewBaseOptions().SetBytesPool(nil))
require.Equal(t, 4, len(enc.sizeBuffer))
require.Equal(t, 4, cap(enc.sizeBuffer))
require.Equal(t, 4, len(enc.buffer))
require.Equal(t, 4, cap(enc.buffer))
dec := newDecoder(mimicTCP, NewBaseOptions().SetBytesPool(nil))
require.Equal(t, 4, len(dec.sizeBuffer))
require.Equal(t, 4, cap(dec.sizeBuffer))
require.Equal(t, 4, len(dec.buffer))
require.Equal(t, 4, cap(dec.buffer))
encodeMsg := msgpb.Message{
Metadata: msgpb.Metadata{
Shard: 1,
Expand All @@ -48,23 +48,23 @@ func TestBaseEncodeDecodeRoundTripWithoutPool(t *testing.T) {
decodeMsg := msgpb.Message{}

require.NoError(t, enc.Encode(&encodeMsg))
require.Equal(t, encodeMsg.Size(), len(enc.dataBuffer))
require.Equal(t, encodeMsg.Size(), cap(enc.dataBuffer))
require.Equal(t, encodeMsg.Size(), len(enc.buffer))
require.Equal(t, encodeMsg.Size(), cap(enc.buffer))
require.NoError(t, dec.Decode(&decodeMsg))
require.Equal(t, decodeMsg.Size(), len(dec.dataBuffer))
require.Equal(t, encodeMsg.Size(), cap(dec.dataBuffer))
require.Equal(t, decodeMsg.Size(), len(dec.buffer))
require.Equal(t, encodeMsg.Size(), cap(dec.buffer))
}

func TestBaseEncodeDecodeRoundTripWithPool(t *testing.T) {
p := getBytesPool(2, []int{2, 8, 100})
p.Init()
mimicTCP := bytes.NewBuffer(nil)
enc := newEncoder(mimicTCP, NewBaseOptions().SetBytesPool(p))
require.Equal(t, 8, len(enc.sizeBuffer))
require.Equal(t, 8, cap(enc.sizeBuffer))
require.Equal(t, 8, len(enc.buffer))
require.Equal(t, 8, cap(enc.buffer))
dec := newDecoder(mimicTCP, NewBaseOptions().SetBytesPool(p))
require.Equal(t, 8, len(dec.sizeBuffer))
require.Equal(t, 8, cap(dec.sizeBuffer))
require.Equal(t, 8, len(dec.buffer))
require.Equal(t, 8, cap(dec.buffer))
encodeMsg := msgpb.Message{
Metadata: msgpb.Metadata{
Shard: 1,
Expand All @@ -75,11 +75,11 @@ func TestBaseEncodeDecodeRoundTripWithPool(t *testing.T) {
decodeMsg := msgpb.Message{}

require.NoError(t, enc.Encode(&encodeMsg))
require.Equal(t, 100, len(enc.dataBuffer))
require.Equal(t, 100, cap(enc.dataBuffer))
require.Equal(t, 100, len(enc.buffer))
require.Equal(t, 100, cap(enc.buffer))
require.NoError(t, dec.Decode(&decodeMsg))
require.Equal(t, 100, len(dec.dataBuffer))
require.Equal(t, 100, cap(dec.dataBuffer))
require.Equal(t, 100, len(dec.buffer))
require.Equal(t, 100, cap(dec.buffer))
}

func TestResetReader(t *testing.T) {
Expand Down

0 comments on commit 768ac30

Please sign in to comment.