Skip to content

Commit

Permalink
QED-Publisher and publisher-service in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Jose Luis Lucas committed Nov 23, 2018
1 parent 7ec96fc commit 214fa8c
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 1 deletion.
63 changes: 63 additions & 0 deletions cmd/agent_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
Copyright 2018 Banco Bilbao Vizcaya Argentaria, n.A.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cmd

import (
"github.com/bbva/qed/gossip"
"github.com/bbva/qed/gossip/publisher"
"github.com/bbva/qed/log"
"github.com/bbva/qed/util"
"github.com/spf13/cobra"
"github.com/valyala/fasthttp"
)

func newAgentPublisherCommand(ctx *agentContext) *cobra.Command {

var endpoints []string

cmd := &cobra.Command{
Use: "monitor",
Short: "Start a QED publisher",
Long: `Start a QED publisher that reacts to snapshot batches
propagated by QED servers and periodically publishes them to
a certain log storage.`,
Run: func(cmd *cobra.Command, args []string) {

log.SetLogger("QedPublisher", logLevel)

config := ctx.config
publisherConfig := publisher.NewConfig(&fasthttp.Client{}, endpoints)

agent, err := gossip.NewAgent(config, []gossip.Processor{publisher.NewPublisher(publisherConfig)})
if err != nil {
log.Fatalf("Failed to start the QED publisher: %v", err)
}

contacted, err := agent.Join(config.StartJoin)
if err != nil {
log.Fatalf("Failed to join the cluster: %v", err)
}
log.Debugf("Number of nodes contacted: %d", contacted)

agent.Start()
defer agent.Shutdown()
util.AwaitTermSignal(agent.Leave)
},
}

cmd.Flags().StringSliceVarP(&endpoints, "endpoints", "", []string{}, "Comma-delimited list of end-publishers ([host]:port), through which an publisher can send requests")
cmd.MarkPersistentFlagRequired("endpoints")

return cmd
}
77 changes: 77 additions & 0 deletions gossip/publisher/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package publisher

import (
"encoding/json"

"github.com/bbva/qed/gossip"
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
"github.com/valyala/fasthttp"
)

type Config struct {
Client *fasthttp.Client
SendTo []string
}

func DefaultConfig() *Config {
return &Config{}
}

func NewConfig(c *fasthttp.Client, to []string) *Config {
return &Config{
Client: c,
SendTo: to,
}
}

type Publisher struct {
Agent *gossip.Agent
Config *Config
quit chan bool
}

func NewPublisher(conf *Config) *Publisher {
return &Publisher{
Config: conf,
}
}

func (p Publisher) Process(b *protocol.BatchSnapshots) {
body, err := json.Marshal(&b)
if err != nil {
log.Debug("\nPublisher: Error marshalling: %s", err.Error())
return
}

req := fasthttp.AcquireRequest()
// TODO: Implement send to different endpoints
req.SetRequestURI(p.Config.SendTo[0])
req.Header.SetMethodBytes([]byte("POST"))
req.Header.Add("Content-Type", "application/json")
req.SetBody(body)

res := fasthttp.AcquireResponse()

err = p.Config.Client.Do(req, res)
if err != nil {
log.Debug("\nPublisher: Error sending request to publishers: %s", err.Error())
return
}

fasthttp.ReleaseRequest(req)
fasthttp.ReleaseResponse(res)
}
50 changes: 50 additions & 0 deletions tests/gossip/store_clients.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"fmt"

"github.com/go-redis/redis"
)

type StoreClient interface {
Put(key, value string)
}

type RedisCli struct {
rcli *redis.Client
}

func NewRedisClient() *RedisCli {
c := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "", // no password set
DB: 0, // use default DB
})

pong, err := c.Ping().Result()
fmt.Println(pong, err)
// Output: PONG <nil>
return &RedisCli{rcli: c}
}

func (c *RedisCli) Put(key, value string) {
err := c.rcli.Set(key, value, 0).Err()
if err != nil {
panic(err)
}
}

// TODO: SeMaaS
48 changes: 47 additions & 1 deletion tests/gossip/test_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package main

import (
"crypto/sha256"
"encoding/json"
"fmt"
"log"
Expand All @@ -22,13 +23,35 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/bbva/qed/gossip/member"
)

type stats struct {
sync.Mutex
batch map[string][]int
}

type Digest []byte

type Snapshot struct {
HistoryDigest Digest
HyperDigest Digest
Version uint64
EventDigest Digest
}

type SignedSnapshot struct {
Snapshot *Snapshot
Signature []byte
}

type BatchSnapshots struct {
Snapshots []*SignedSnapshot
TTL int
From *member.Peer
}

func (s *stats) Add(nodeType string, id, v int) {
s.Lock()
defer s.Unlock()
Expand Down Expand Up @@ -67,9 +90,30 @@ func handler(w http.ResponseWriter, r *http.Request) {
atomic.AddUint64(&count, 1)
}

func main() {
func PublishHandler(w http.ResponseWriter, r *http.Request, client StoreClient) {
if r.Method == "POST" {
var b BatchSnapshots
err := json.NewDecoder(r.Body).Decode(&b)
if err != nil {
fmt.Println("Error unmarshalling: ", err)
}

// TODO: Insert the whole batch. Not snapshot by snapshot.
for _, s := range b.Snapshots {
key := strconv.FormatUint(s.Snapshot.Version, 10)
v := sha256.Sum256(s.Snapshot.HistoryDigest)
val := string(v[:])
go client.Put(key, val)
}

} else {
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
}
}

func main() {
s.batch = make(map[string][]int, 0)
client := NewRedisClient()

go func() {
ticker := time.NewTicker(2 * time.Second)
Expand All @@ -83,6 +127,8 @@ func main() {
}
}
}()

http.HandleFunc("/", handler)
http.HandleFunc("/publish", func(w http.ResponseWriter, r *http.Request) { PublishHandler(w, r, client) })
log.Fatal(http.ListenAndServe("127.0.0.1:8888", nil))
}

0 comments on commit 214fa8c

Please sign in to comment.