From a687ee6df316ffadac5305f4be9cf9a5642cd6af Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sat, 20 Feb 2016 13:21:38 -0800 Subject: [PATCH] Lzw compress raft entries --- nomad/fsm.go | 11 +++ nomad/fsm_test.go | 36 +++---- nomad/rpc.go | 2 +- nomad/structs/structs.go | 49 ++++++++++ nomad/structs/structs_test.go | 175 +++++++++++++++++++++------------- 5 files changed, 187 insertions(+), 86 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 7e9a57f3f2d..39618a8b9b9 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -112,6 +112,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { ignoreUnknown = true } +PARSE_TYPE: switch msgType { case structs.NodeRegisterRequestType: return n.applyUpsertNode(buf[1:], log.Index) @@ -133,6 +134,16 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyAllocUpdate(buf[1:], log.Index) case structs.AllocClientUpdateRequestType: return n.applyAllocClientUpdate(buf[1:], log.Index) + case structs.CompressedRequestType: + decomp, err := structs.Uncompress(buf[1:]) + if err != nil { + panic(fmt.Errorf("failed to decompress request: %#v", buf)) + } + + // Store the inner message type and buffer and re-enter switch + msgType = structs.MessageType(decomp[0]) + buf = decomp + goto PARSE_TYPE default: if ignoreUnknown { n.logger.Printf("[WARN] nomad.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType) diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 0961a708fec..f00526dea94 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -73,7 +73,7 @@ func TestFSM_UpsertNode(t *testing.T) { req := structs.NodeRegisterRequest{ Node: mock.Node(), } - buf, err := structs.Encode(structs.NodeRegisterRequestType, req) + buf, err := structs.EncodeCompressed(structs.NodeRegisterRequestType, req) if err != nil { t.Fatalf("err: %v", err) } @@ -109,7 +109,7 @@ func TestFSM_DeregisterNode(t *testing.T) { req := structs.NodeRegisterRequest{ Node: node, } - buf, err := structs.Encode(structs.NodeRegisterRequestType, req) + buf, err := structs.EncodeCompressed(structs.NodeRegisterRequestType, req) if err != nil { t.Fatalf("err: %v", err) } @@ -122,7 +122,7 @@ func TestFSM_DeregisterNode(t *testing.T) { req2 := structs.NodeDeregisterRequest{ NodeID: node.ID, } - buf, err = structs.Encode(structs.NodeDeregisterRequestType, req2) + buf, err = structs.EncodeCompressed(structs.NodeDeregisterRequestType, req2) if err != nil { t.Fatalf("err: %v", err) } @@ -150,7 +150,7 @@ func TestFSM_UpdateNodeStatus(t *testing.T) { req := structs.NodeRegisterRequest{ Node: node, } - buf, err := structs.Encode(structs.NodeRegisterRequestType, req) + buf, err := structs.EncodeCompressed(structs.NodeRegisterRequestType, req) if err != nil { t.Fatalf("err: %v", err) } @@ -169,7 +169,7 @@ func TestFSM_UpdateNodeStatus(t *testing.T) { NodeID: node.ID, Status: structs.NodeStatusReady, } - buf, err = structs.Encode(structs.NodeUpdateStatusRequestType, req2) + buf, err = structs.EncodeCompressed(structs.NodeUpdateStatusRequestType, req2) if err != nil { t.Fatalf("err: %v", err) } @@ -207,7 +207,7 @@ func TestFSM_UpdateNodeDrain(t *testing.T) { req := structs.NodeRegisterRequest{ Node: node, } - buf, err := structs.Encode(structs.NodeRegisterRequestType, req) + buf, err := structs.EncodeCompressed(structs.NodeRegisterRequestType, req) if err != nil { t.Fatalf("err: %v", err) } @@ -221,7 +221,7 @@ func TestFSM_UpdateNodeDrain(t *testing.T) { NodeID: node.ID, Drain: true, } - buf, err = structs.Encode(structs.NodeUpdateDrainRequestType, req2) + buf, err = structs.EncodeCompressed(structs.NodeUpdateDrainRequestType, req2) if err != nil { t.Fatalf("err: %v", err) } @@ -248,7 +248,7 @@ func TestFSM_RegisterJob(t *testing.T) { req := structs.JobRegisterRequest{ Job: job, } - buf, err := structs.Encode(structs.JobRegisterRequestType, req) + buf, err := structs.EncodeCompressed(structs.JobRegisterRequestType, req) if err != nil { t.Fatalf("err: %v", err) } @@ -295,7 +295,7 @@ func TestFSM_DeregisterJob(t *testing.T) { req := structs.JobRegisterRequest{ Job: job, } - buf, err := structs.Encode(structs.JobRegisterRequestType, req) + buf, err := structs.EncodeCompressed(structs.JobRegisterRequestType, req) if err != nil { t.Fatalf("err: %v", err) } @@ -308,7 +308,7 @@ func TestFSM_DeregisterJob(t *testing.T) { req2 := structs.JobDeregisterRequest{ JobID: job.ID, } - buf, err = structs.Encode(structs.JobDeregisterRequestType, req2) + buf, err = structs.EncodeCompressed(structs.JobDeregisterRequestType, req2) if err != nil { t.Fatalf("err: %v", err) } @@ -349,7 +349,7 @@ func TestFSM_UpdateEval(t *testing.T) { req := structs.EvalUpdateRequest{ Evals: []*structs.Evaluation{mock.Eval()}, } - buf, err := structs.Encode(structs.EvalUpdateRequestType, req) + buf, err := structs.EncodeCompressed(structs.EvalUpdateRequestType, req) if err != nil { t.Fatalf("err: %v", err) } @@ -390,7 +390,7 @@ func TestFSM_UpdateEval_Blocked(t *testing.T) { req := structs.EvalUpdateRequest{ Evals: []*structs.Evaluation{eval}, } - buf, err := structs.Encode(structs.EvalUpdateRequestType, req) + buf, err := structs.EncodeCompressed(structs.EvalUpdateRequestType, req) if err != nil { t.Fatalf("err: %v", err) } @@ -432,7 +432,7 @@ func TestFSM_DeleteEval(t *testing.T) { req := structs.EvalUpdateRequest{ Evals: []*structs.Evaluation{eval}, } - buf, err := structs.Encode(structs.EvalUpdateRequestType, req) + buf, err := structs.EncodeCompressed(structs.EvalUpdateRequestType, req) if err != nil { t.Fatalf("err: %v", err) } @@ -445,7 +445,7 @@ func TestFSM_DeleteEval(t *testing.T) { req2 := structs.EvalDeleteRequest{ Evals: []string{eval.ID}, } - buf, err = structs.Encode(structs.EvalDeleteRequestType, req2) + buf, err = structs.EncodeCompressed(structs.EvalDeleteRequestType, req2) if err != nil { t.Fatalf("err: %v", err) } @@ -472,7 +472,7 @@ func TestFSM_UpsertAllocs(t *testing.T) { req := structs.AllocUpdateRequest{ Alloc: []*structs.Allocation{alloc}, } - buf, err := structs.Encode(structs.AllocUpdateRequestType, req) + buf, err := structs.EncodeCompressed(structs.AllocUpdateRequestType, req) if err != nil { t.Fatalf("err: %v", err) } @@ -500,7 +500,7 @@ func TestFSM_UpsertAllocs(t *testing.T) { req2 := structs.AllocUpdateRequest{ Alloc: []*structs.Allocation{evictAlloc}, } - buf, err = structs.Encode(structs.AllocUpdateRequestType, req2) + buf, err = structs.EncodeCompressed(structs.AllocUpdateRequestType, req2) if err != nil { t.Fatalf("err: %v", err) } @@ -550,7 +550,7 @@ func TestFSM_UpdateAllocFromClient(t *testing.T) { req := structs.AllocUpdateRequest{ Alloc: []*structs.Allocation{clientAlloc}, } - buf, err := structs.Encode(structs.AllocClientUpdateRequestType, req) + buf, err := structs.EncodeCompressed(structs.AllocClientUpdateRequestType, req) if err != nil { t.Fatalf("err: %v", err) } @@ -597,7 +597,7 @@ func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) { req := structs.AllocUpdateRequest{ Alloc: []*structs.Allocation{clientAlloc}, } - buf, err := structs.Encode(structs.AllocClientUpdateRequestType, req) + buf, err := structs.EncodeCompressed(structs.AllocClientUpdateRequestType, req) if err != nil { t.Fatalf("err: %v", err) } diff --git a/nomad/rpc.go b/nomad/rpc.go index e52e258f0c5..03ff4089c4b 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -242,7 +242,7 @@ func (s *Server) forwardRegion(region, method string, args interface{}, reply in // raftApplyFuture is used to encode a message, run it through raft, and return the Raft future. func (s *Server) raftApplyFuture(t structs.MessageType, msg interface{}) (raft.ApplyFuture, error) { - buf, err := structs.Encode(t, msg) + buf, err := structs.EncodeCompressed(t, msg) if err != nil { return nil, fmt.Errorf("Failed to encode request: %v", err) } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 93f43f57a4a..e2cd9d0e593 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2,6 +2,7 @@ package structs import ( "bytes" + "compress/lzw" "crypto/sha1" "errors" "fmt" @@ -38,6 +39,7 @@ const ( EvalDeleteRequestType AllocUpdateRequestType AllocClientUpdateRequestType + CompressedRequestType ) const ( @@ -2500,6 +2502,53 @@ var MsgpackHandle = func() *codec.MsgpackHandle { return h }() +// EncodeCompressed encodes and compresses the passed payload. The compressed +// payload is prefixed with the CompressedRequestType header byte. +func EncodeCompressed(t MessageType, msg interface{}) ([]byte, error) { + // Create a buffer that will store in its first byte the compressed + // header type and in its following bytes the compressed payload. + var buf bytes.Buffer + buf.WriteByte(uint8(CompressedRequestType)) + + // Create the compressed writer to compress the user payload. + compWriter := lzw.NewWriter(&buf, lzw.LSB, 8) + + // Encode the input. + encoded, err := Encode(t, msg) + if err != nil { + return nil, err + } + + // Compress the encoded data. + if _, err := compWriter.Write(encoded); err != nil { + return nil, err + } + + // Close the writer to ensure the data gets flushed. + if err := compWriter.Close(); err != nil { + return nil, err + } + + return buf.Bytes(), nil +} + +// Uncompress uncompresses the compressed payload returned by EncodeCompressed +// stripped of the header byte. The decompressed payload can be decoded by +// calling Decode with the original message type. +func Uncompress(buf []byte) ([]byte, error) { + uncomp := lzw.NewReader(bytes.NewReader(buf), lzw.LSB, 8) + defer uncomp.Close() + + // Read all the data + var b bytes.Buffer + if _, err := io.Copy(&b, uncomp); err != nil { + return nil, err + } + + // Return the uncompressed bytes + return b.Bytes(), nil +} + // Decode is used to decode a MsgPack encoded object func Decode(buf []byte, out interface{}) error { return codec.NewDecoder(bytes.NewReader(buf), MsgpackHandle).Decode(out) diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 394a899116a..ab6d661a6f6 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -9,6 +9,76 @@ import ( "github.com/hashicorp/go-multierror" ) +func testJob() *Job { + return &Job{ + Region: "global", + ID: GenerateUUID(), + Name: "my-job", + Type: JobTypeService, + Priority: 50, + AllAtOnce: false, + Datacenters: []string{"dc1"}, + Constraints: []*Constraint{ + &Constraint{ + LTarget: "$attr.kernel.name", + RTarget: "linux", + Operand: "=", + }, + }, + Periodic: &PeriodicConfig{ + Enabled: false, + }, + TaskGroups: []*TaskGroup{ + &TaskGroup{ + Name: "web", + Count: 10, + RestartPolicy: &RestartPolicy{ + Attempts: 3, + Interval: 10 * time.Minute, + Delay: 1 * time.Minute, + }, + Tasks: []*Task{ + &Task{ + Name: "web", + Driver: "exec", + Config: map[string]interface{}{ + "command": "/bin/date", + }, + Env: map[string]string{ + "FOO": "bar", + }, + Services: []*Service{ + { + Name: "${TASK}-frontend", + PortLabel: "http", + }, + }, + Resources: &Resources{ + CPU: 500, + MemoryMB: 256, + Networks: []*NetworkResource{ + &NetworkResource{ + MBits: 50, + DynamicPorts: []Port{{Label: "http"}}, + }, + }, + }, + }, + }, + Meta: map[string]string{ + "elb_check_type": "http", + "elb_check_interval": "30s", + "elb_check_min": "3", + }, + }, + }, + Meta: map[string]string{ + "owner": "armon", + }, + } + +} + func TestJob_Validate(t *testing.T) { j := &Job{} err := j.Validate() @@ -94,73 +164,7 @@ func TestJob_Validate(t *testing.T) { } func TestJob_Copy(t *testing.T) { - j := &Job{ - Region: "global", - ID: GenerateUUID(), - Name: "my-job", - Type: JobTypeService, - Priority: 50, - AllAtOnce: false, - Datacenters: []string{"dc1"}, - Constraints: []*Constraint{ - &Constraint{ - LTarget: "$attr.kernel.name", - RTarget: "linux", - Operand: "=", - }, - }, - Periodic: &PeriodicConfig{ - Enabled: false, - }, - TaskGroups: []*TaskGroup{ - &TaskGroup{ - Name: "web", - Count: 10, - RestartPolicy: &RestartPolicy{ - Attempts: 3, - Interval: 10 * time.Minute, - Delay: 1 * time.Minute, - }, - Tasks: []*Task{ - &Task{ - Name: "web", - Driver: "exec", - Config: map[string]interface{}{ - "command": "/bin/date", - }, - Env: map[string]string{ - "FOO": "bar", - }, - Services: []*Service{ - { - Name: "${TASK}-frontend", - PortLabel: "http", - }, - }, - Resources: &Resources{ - CPU: 500, - MemoryMB: 256, - Networks: []*NetworkResource{ - &NetworkResource{ - MBits: 50, - DynamicPorts: []Port{{Label: "http"}}, - }, - }, - }, - }, - }, - Meta: map[string]string{ - "elb_check_type": "http", - "elb_check_interval": "30s", - "elb_check_min": "3", - }, - }, - }, - Meta: map[string]string{ - "owner": "armon", - }, - } - + j := testJob() c := j.Copy() if !reflect.DeepEqual(j, c) { t.Fatalf("Copy() returned an unequal Job; got %#v; want %#v", c, j) @@ -716,3 +720,40 @@ func TestRestartPolicy_Validate(t *testing.T) { t.Fatalf("expect restart interval error, got: %v", err) } } + +func TestEncodeCompressed(t *testing.T) { + // Create an input payload + msgType := JobRegisterRequestType + req := JobRegisterRequest{Job: testJob()} + + // Encode and compress it + buf, err := EncodeCompressed(msgType, req) + if err != nil { + t.Fatalf("EncodeCompressed(%v, %#v) failed: %v", msgType, req, err) + } + if len(buf) == 0 { + t.Fatalf("EncodeCompressed(%v, %#v) returned empty", msgType, req) + } + + // Uncompress and check data + decomp, err := Uncompress(buf[1:]) + if err != nil { + t.Fatalf("Uncompress(%#v) errored: %v", decomp, err) + } + if len(decomp) < 2 { + t.Fatalf("Uncompress(%#v) returned too little: %#v", buf, decomp) + } + if act := MessageType(decomp[0]); act != msgType { + t.Fatalf("bad: received incorrect MessageType: %v", act) + } + + // Decode payload + var decodedJob JobRegisterRequest + if err := Decode(decomp[1:], &decodedJob); err != nil { + t.Fatalf("Decode of uncompressed payload failed: %v", err) + } + + if !reflect.DeepEqual(decodedJob, req) { + t.Fatalf("Decode failed: got %#v; want %#v", decodedJob, req) + } +}