Skip to content

Commit

Permalink
Merge pull request #826 from hashicorp/f-compress-raft
Browse files Browse the repository at this point in the history
LZW compress raft entries
  • Loading branch information
dadgar committed Feb 21, 2016
2 parents 2ed5d34 + a687ee6 commit d9059c5
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 86 deletions.
11 changes: 11 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
ignoreUnknown = true
}

PARSE_TYPE:
switch msgType {
case structs.NodeRegisterRequestType:
return n.applyUpsertNode(buf[1:], log.Index)
Expand All @@ -133,6 +134,16 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyAllocUpdate(buf[1:], log.Index)
case structs.AllocClientUpdateRequestType:
return n.applyAllocClientUpdate(buf[1:], log.Index)
case structs.CompressedRequestType:
decomp, err := structs.Uncompress(buf[1:])
if err != nil {
panic(fmt.Errorf("failed to decompress request: %#v", buf))
}

// Store the inner message type and buffer and re-enter switch
msgType = structs.MessageType(decomp[0])
buf = decomp
goto PARSE_TYPE
default:
if ignoreUnknown {
n.logger.Printf("[WARN] nomad.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
Expand Down
36 changes: 18 additions & 18 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestFSM_UpsertNode(t *testing.T) {
req := structs.NodeRegisterRequest{
Node: mock.Node(),
}
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
buf, err := structs.EncodeCompressed(structs.NodeRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestFSM_DeregisterNode(t *testing.T) {
req := structs.NodeRegisterRequest{
Node: node,
}
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
buf, err := structs.EncodeCompressed(structs.NodeRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -122,7 +122,7 @@ func TestFSM_DeregisterNode(t *testing.T) {
req2 := structs.NodeDeregisterRequest{
NodeID: node.ID,
}
buf, err = structs.Encode(structs.NodeDeregisterRequestType, req2)
buf, err = structs.EncodeCompressed(structs.NodeDeregisterRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -150,7 +150,7 @@ func TestFSM_UpdateNodeStatus(t *testing.T) {
req := structs.NodeRegisterRequest{
Node: node,
}
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
buf, err := structs.EncodeCompressed(structs.NodeRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -169,7 +169,7 @@ func TestFSM_UpdateNodeStatus(t *testing.T) {
NodeID: node.ID,
Status: structs.NodeStatusReady,
}
buf, err = structs.Encode(structs.NodeUpdateStatusRequestType, req2)
buf, err = structs.EncodeCompressed(structs.NodeUpdateStatusRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestFSM_UpdateNodeDrain(t *testing.T) {
req := structs.NodeRegisterRequest{
Node: node,
}
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
buf, err := structs.EncodeCompressed(structs.NodeRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -221,7 +221,7 @@ func TestFSM_UpdateNodeDrain(t *testing.T) {
NodeID: node.ID,
Drain: true,
}
buf, err = structs.Encode(structs.NodeUpdateDrainRequestType, req2)
buf, err = structs.EncodeCompressed(structs.NodeUpdateDrainRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -248,7 +248,7 @@ func TestFSM_RegisterJob(t *testing.T) {
req := structs.JobRegisterRequest{
Job: job,
}
buf, err := structs.Encode(structs.JobRegisterRequestType, req)
buf, err := structs.EncodeCompressed(structs.JobRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -295,7 +295,7 @@ func TestFSM_DeregisterJob(t *testing.T) {
req := structs.JobRegisterRequest{
Job: job,
}
buf, err := structs.Encode(structs.JobRegisterRequestType, req)
buf, err := structs.EncodeCompressed(structs.JobRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -308,7 +308,7 @@ func TestFSM_DeregisterJob(t *testing.T) {
req2 := structs.JobDeregisterRequest{
JobID: job.ID,
}
buf, err = structs.Encode(structs.JobDeregisterRequestType, req2)
buf, err = structs.EncodeCompressed(structs.JobDeregisterRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -349,7 +349,7 @@ func TestFSM_UpdateEval(t *testing.T) {
req := structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{mock.Eval()},
}
buf, err := structs.Encode(structs.EvalUpdateRequestType, req)
buf, err := structs.EncodeCompressed(structs.EvalUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -390,7 +390,7 @@ func TestFSM_UpdateEval_Blocked(t *testing.T) {
req := structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
}
buf, err := structs.Encode(structs.EvalUpdateRequestType, req)
buf, err := structs.EncodeCompressed(structs.EvalUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -432,7 +432,7 @@ func TestFSM_DeleteEval(t *testing.T) {
req := structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
}
buf, err := structs.Encode(structs.EvalUpdateRequestType, req)
buf, err := structs.EncodeCompressed(structs.EvalUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -445,7 +445,7 @@ func TestFSM_DeleteEval(t *testing.T) {
req2 := structs.EvalDeleteRequest{
Evals: []string{eval.ID},
}
buf, err = structs.Encode(structs.EvalDeleteRequestType, req2)
buf, err = structs.EncodeCompressed(structs.EvalDeleteRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -472,7 +472,7 @@ func TestFSM_UpsertAllocs(t *testing.T) {
req := structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc},
}
buf, err := structs.Encode(structs.AllocUpdateRequestType, req)
buf, err := structs.EncodeCompressed(structs.AllocUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -500,7 +500,7 @@ func TestFSM_UpsertAllocs(t *testing.T) {
req2 := structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{evictAlloc},
}
buf, err = structs.Encode(structs.AllocUpdateRequestType, req2)
buf, err = structs.EncodeCompressed(structs.AllocUpdateRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -550,7 +550,7 @@ func TestFSM_UpdateAllocFromClient(t *testing.T) {
req := structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{clientAlloc},
}
buf, err := structs.Encode(structs.AllocClientUpdateRequestType, req)
buf, err := structs.EncodeCompressed(structs.AllocClientUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -597,7 +597,7 @@ func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) {
req := structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{clientAlloc},
}
buf, err := structs.Encode(structs.AllocClientUpdateRequestType, req)
buf, err := structs.EncodeCompressed(structs.AllocClientUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion nomad/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (s *Server) forwardRegion(region, method string, args interface{}, reply in

// raftApplyFuture is used to encode a message, run it through raft, and return the Raft future.
func (s *Server) raftApplyFuture(t structs.MessageType, msg interface{}) (raft.ApplyFuture, error) {
buf, err := structs.Encode(t, msg)
buf, err := structs.EncodeCompressed(t, msg)
if err != nil {
return nil, fmt.Errorf("Failed to encode request: %v", err)
}
Expand Down
49 changes: 49 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package structs

import (
"bytes"
"compress/lzw"
"crypto/sha1"
"errors"
"fmt"
Expand Down Expand Up @@ -38,6 +39,7 @@ const (
EvalDeleteRequestType
AllocUpdateRequestType
AllocClientUpdateRequestType
CompressedRequestType
)

const (
Expand Down Expand Up @@ -2500,6 +2502,53 @@ var MsgpackHandle = func() *codec.MsgpackHandle {
return h
}()

// EncodeCompressed encodes and compresses the passed payload. The compressed
// payload is prefixed with the CompressedRequestType header byte.
func EncodeCompressed(t MessageType, msg interface{}) ([]byte, error) {
// Create a buffer that will store in its first byte the compressed
// header type and in its following bytes the compressed payload.
var buf bytes.Buffer
buf.WriteByte(uint8(CompressedRequestType))

// Create the compressed writer to compress the user payload.
compWriter := lzw.NewWriter(&buf, lzw.LSB, 8)

// Encode the input.
encoded, err := Encode(t, msg)
if err != nil {
return nil, err
}

// Compress the encoded data.
if _, err := compWriter.Write(encoded); err != nil {
return nil, err
}

// Close the writer to ensure the data gets flushed.
if err := compWriter.Close(); err != nil {
return nil, err
}

return buf.Bytes(), nil
}

// Uncompress uncompresses the compressed payload returned by EncodeCompressed
// stripped of the header byte. The decompressed payload can be decoded by
// calling Decode with the original message type.
func Uncompress(buf []byte) ([]byte, error) {
uncomp := lzw.NewReader(bytes.NewReader(buf), lzw.LSB, 8)
defer uncomp.Close()

// Read all the data
var b bytes.Buffer
if _, err := io.Copy(&b, uncomp); err != nil {
return nil, err
}

// Return the uncompressed bytes
return b.Bytes(), nil
}

// Decode is used to decode a MsgPack encoded object
func Decode(buf []byte, out interface{}) error {
return codec.NewDecoder(bytes.NewReader(buf), MsgpackHandle).Decode(out)
Expand Down
Loading

0 comments on commit d9059c5

Please sign in to comment.