Skip to content

Commit

Permalink
Change codec to json enconding
Browse files Browse the repository at this point in the history
  • Loading branch information
gdiazlo committed Dec 5, 2018
1 parent ee813c6 commit 87fdb8d
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 76 deletions.
17 changes: 10 additions & 7 deletions gossip/auditor/auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package auditor

import (
"bytes"
"encoding/base64"
"fmt"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -78,12 +77,8 @@ func (t *MembershipTask) getSnapshot(version uint64) (*protocol.SignedSnapshot,
}
defer resp.Body.Close()
buf, err := ioutil.ReadAll(resp.Body)
sDec, err := base64.StdEncoding.DecodeString(string(buf))
if err != nil {
return nil, fmt.Errorf("Error decoding signed snapshot %d base64", t.s.Snapshot.Version)
}
var s protocol.SignedSnapshot
err = s.Decode(sDec)
err = s.Decode(buf)
if err != nil {
return nil, fmt.Errorf("Error decoding signed snapshot %d codec", t.s.Snapshot.Version)
}
Expand Down Expand Up @@ -127,7 +122,15 @@ func (t *MembershipTask) Do() {
func (t *MembershipTask) sendAlert(msg string) {

go func() {
http.Post(fmt.Sprintf("%s/alert", t.pubUrl), "application/octet-stream", bytes.NewBufferString(msg))
resp, err := http.Post(fmt.Sprintf("%s/alert", t.pubUrl), "application/json", bytes.NewBufferString(msg))
if err != nil {
log.Infof("Error saving batch in alertStore: %v", err)
}
defer resp.Body.Close()
_, err = ioutil.ReadAll(resp.Body)
if err != nil {
log.Infof("Error getting response from alertStore saving a batch: %v", err)
}
}()

}
Expand Down
27 changes: 11 additions & 16 deletions gossip/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
package publisher

import (
"encoding/base64"
"bytes"
"fmt"
"io/ioutil"
"net/http"

"github.com/bbva/qed/gossip"
"github.com/bbva/qed/log"
Expand Down Expand Up @@ -59,22 +62,14 @@ func (p *Publisher) Process(b *protocol.BatchSnapshots) {
log.Debug("\nPublisher: Error marshalling: %s", err.Error())
return
}
body := []byte(base64.StdEncoding.EncodeToString(buf))

req := fasthttp.AcquireRequest()
// TODO: Implement send to different endpoints
req.SetRequestURI(p.Config.SendTo[0] + "/batch")
req.Header.SetMethodBytes([]byte("POST"))
req.Header.Add("Content-Type", "application/json")
req.SetBody(body)
res := fasthttp.AcquireResponse()

err = p.Config.Client.Do(req, res)
resp, err := http.Post(fmt.Sprintf("%s/batch", p.Config.SendTo[0]), "application/json", bytes.NewBuffer(buf))
if err != nil {
log.Info("\nPublisher: Error sending request to publishers: %s", err.Error())
return
log.Infof("Error saving batch in snapStore: %v", err)
}
defer resp.Body.Close()
_, err = ioutil.ReadAll(resp.Body)
if err != nil {
log.Infof("Error getting response from snapStore saving a batch: %v", err)
}

fasthttp.ReleaseRequest(req)
fasthttp.ReleaseResponse(res)
}
55 changes: 10 additions & 45 deletions protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package protocol

import (
"bytes"
"encoding/json"
"net"

"github.com/bbva/qed/balloon"
Expand All @@ -26,9 +26,7 @@ import (
"github.com/bbva/qed/balloon/visitor"
"github.com/bbva/qed/gossip/member"
"github.com/bbva/qed/hashing"
"github.com/bbva/qed/log"
"github.com/bbva/qed/util"
"github.com/hashicorp/go-msgpack/codec"
)

// Event is the public struct that Add handler function uses to
Expand Down Expand Up @@ -65,23 +63,12 @@ type SignedSnapshot struct {
}

func (b *SignedSnapshot) Encode() ([]byte, error) {
var buf bytes.Buffer
encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
if err := encoder.Encode(b); err != nil {
log.Errorf("Failed to encode signed snapshot into message: %v", err)
return nil, err
}
return buf.Bytes(), nil
return json.Marshal(b)
}

func (b *SignedSnapshot) Decode(msg []byte) error {
reader := bytes.NewReader(msg)
decoder := codec.NewDecoder(reader, &codec.MsgpackHandle{})
if err := decoder.Decode(b); err != nil {
log.Errorf("Failed to decode signed snapshot: %v", err)
return err
}
return nil
err := json.Unmarshal(msg, b)
return err
}

type BatchSnapshots struct {
Expand All @@ -97,43 +84,21 @@ type Source struct {
}

func (b *Snapshot) Encode() ([]byte, error) {
var buf bytes.Buffer
encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
if err := encoder.Encode(b); err != nil {
log.Errorf("Failed to encode message: %v", err)
return nil, err
}
return buf.Bytes(), nil
return json.Marshal(b)
}

func (b *Snapshot) Decode(msg []byte) error {
reader := bytes.NewReader(msg)
decoder := codec.NewDecoder(reader, &codec.MsgpackHandle{})
if err := decoder.Decode(b); err != nil {
log.Errorf("Failed to decode snapshot batch: %v", err)
return err
}
return nil
err := json.Unmarshal(msg, b)
return err
}

func (b *BatchSnapshots) Encode() ([]byte, error) {
var buf bytes.Buffer
encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
if err := encoder.Encode(b); err != nil {
log.Errorf("Failed to encode message: %v", err)
return nil, err
}
return buf.Bytes(), nil
return json.Marshal(b)
}

func (b *BatchSnapshots) Decode(msg []byte) error {
reader := bytes.NewReader(msg)
decoder := codec.NewDecoder(reader, &codec.MsgpackHandle{})
if err := decoder.Decode(b); err != nil {
log.Errorf("Failed to decode snapshots batch: %v", err)
return err
}
return nil
err := json.Unmarshal(msg, b)
return err
}

type MembershipResult struct {
Expand Down
10 changes: 2 additions & 8 deletions tests/e2e/test_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package e2e

import (
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -129,12 +128,7 @@ func (s *Service) postBatchHandler() func(http.ResponseWriter, *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
sDec, err := base64.StdEncoding.DecodeString(string(buf))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
err = b.Decode(sDec)
err = b.Decode(buf)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down Expand Up @@ -164,7 +158,7 @@ func (s *Service) getSnapshotHandler() func(http.ResponseWriter, *http.Request)
return
}
buf, err := b.Encode()
_, err = w.Write([]byte(base64.StdEncoding.EncodeToString(buf)))
_, err = w.Write(buf)
if err != nil {
fmt.Printf("ERROR: %v", err)
}
Expand Down

0 comments on commit 87fdb8d

Please sign in to comment.