Skip to content

Commit

Permalink
WIP client
Browse files Browse the repository at this point in the history
  • Loading branch information
iknite committed Jan 10, 2019
1 parent 99761c7 commit 4655319
Show file tree
Hide file tree
Showing 22 changed files with 231 additions and 187 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ coverage.txt
*terraform.tfstate*
*.tfstate*
*terraform.tfvars*
.golangci.yml
62 changes: 40 additions & 22 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package client

import (
"bytes"
"crypto/tls"
"encoding/binary"
"encoding/json"
"fmt"
Expand All @@ -32,26 +33,30 @@ import (
"github.com/bbva/qed/protocol"
)

// HttpClient ist the stuct that has the required information for the cli.
type HttpClient struct {
endpoint string
apiKey string
// HTTPClient ist the stuct that has the required information for the cli.
type HTTPClient struct {
conf *Config

http.Client
}

// NewHttpClient will return a new instance of HttpClient.
func NewHttpClient(endpoint, apiKey string) *HttpClient {

return &HttpClient{
endpoint,
apiKey,
*http.DefaultClient,
// NewHTTPClient will return a new instance of HTTPClient.
func NewHTTPClient(conf *Config) *HTTPClient {
var c http.Client
if conf.EnableTLS {
c = http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
}
} else {
c = http.Client{}
}
return &HTTPClient{conf, c}

}

func (c HttpClient) exponentialBackoff(req *http.Request) (*http.Response, error) {
func (c HTTPClient) exponentialBackoff(req *http.Request) (*http.Response, error) {

var retries uint

Expand All @@ -71,15 +76,15 @@ func (c HttpClient) exponentialBackoff(req *http.Request) (*http.Response, error

}

func (c HttpClient) doReq(method, path string, data []byte) ([]byte, error) {
func (c HTTPClient) doReq(method, path string, data []byte) ([]byte, error) {

req, err := http.NewRequest(method, c.endpoint+path, bytes.NewBuffer(data))
req, err := http.NewRequest(method, c.conf.Endpoint+path, bytes.NewBuffer(data))
if err != nil {
panic(err)
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Api-Key", c.apiKey)
req.Header.Set("Api-Key", c.conf.APIKey)

resp, err := c.exponentialBackoff(req)
if err != nil {
Expand All @@ -102,7 +107,7 @@ func (c HttpClient) doReq(method, path string, data []byte) ([]byte, error) {
}

// Add will do a request to the server with a post data to store a new event.
func (c HttpClient) Add(event string) (*protocol.Snapshot, error) {
func (c HTTPClient) Add(event string) (*protocol.Snapshot, error) {

data, _ := json.Marshal(&protocol.Event{[]byte(event)})

Expand All @@ -119,7 +124,7 @@ func (c HttpClient) Add(event string) (*protocol.Snapshot, error) {
}

// Membership will ask for a Proof to the server.
func (c HttpClient) Membership(key []byte, version uint64) (*protocol.MembershipResult, error) {
func (c HTTPClient) Membership(key []byte, version uint64) (*protocol.MembershipResult, error) {

query, _ := json.Marshal(&protocol.MembershipQuery{
key,
Expand All @@ -139,7 +144,7 @@ func (c HttpClient) Membership(key []byte, version uint64) (*protocol.Membership
}

// Membership will ask for a Proof to the server.
func (c HttpClient) MembershipDigest(keyDigest hashing.Digest, version uint64) (*protocol.MembershipResult, error) {
func (c HTTPClient) MembershipDigest(keyDigest hashing.Digest, version uint64) (*protocol.MembershipResult, error) {

query, _ := json.Marshal(&protocol.MembershipDigest{
keyDigest,
Expand All @@ -159,7 +164,7 @@ func (c HttpClient) MembershipDigest(keyDigest hashing.Digest, version uint64) (
}

// Incremental will ask for an IncrementalProof to the server.
func (c HttpClient) Incremental(start, end uint64) (*protocol.IncrementalResponse, error) {
func (c HTTPClient) Incremental(start, end uint64) (*protocol.IncrementalResponse, error) {

query, _ := json.Marshal(&protocol.IncrementalRequest{
start,
Expand All @@ -185,7 +190,11 @@ func uint2bytes(i uint64) []byte {

// Verify will compute the Proof given in Membership and the snapshot from the
// add and returns a proof of existence.
func (c HttpClient) Verify(result *protocol.MembershipResult, snap *protocol.Snapshot, hasherF func() hashing.Hasher) bool {
func (c HTTPClient) Verify(
result *protocol.MembershipResult,
snap *protocol.Snapshot,
hasherF func() hashing.Hasher,
) bool {

proof := protocol.ToBalloonProof(result, hasherF)

Expand All @@ -200,7 +209,11 @@ func (c HttpClient) Verify(result *protocol.MembershipResult, snap *protocol.Sna

// Verify will compute the Proof given in Membership and the snapshot from the
// add and returns a proof of existence.
func (c HttpClient) DigestVerify(result *protocol.MembershipResult, snap *protocol.Snapshot, hasherF func() hashing.Hasher) bool {
func (c HTTPClient) DigestVerify(
result *protocol.MembershipResult,
snap *protocol.Snapshot,
hasherF func() hashing.Hasher,
) bool {

proof := protocol.ToBalloonProof(result, hasherF)

Expand All @@ -212,7 +225,12 @@ func (c HttpClient) DigestVerify(result *protocol.MembershipResult, snap *protoc
})

}
func (c HttpClient) VerifyIncremental(result *protocol.IncrementalResponse, startSnapshot, endSnapshot *protocol.Snapshot, hasher hashing.Hasher) bool {

func (c HTTPClient) VerifyIncremental(
result *protocol.IncrementalResponse,
startSnapshot, endSnapshot *protocol.Snapshot,
hasher hashing.Hasher,
) bool {

proof := protocol.ToIncrementalProof(result, hasher)

Expand Down
8 changes: 6 additions & 2 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

var (
client *HttpClient
client *HTTPClient
mux *http.ServeMux
server *httptest.Server
)
Expand All @@ -42,7 +42,11 @@ func init() {
func setup() func() {
mux = http.NewServeMux()
server = httptest.NewServer(mux)
client = NewHttpClient(server.URL, "my-awesome-api-key")
client = NewHTTPClient(&Config{
Endpoint: server.URL,
APIKey: "my-awesome-api-key",
EnableTLS: false,
})
return func() {
server.Close()
}
Expand Down
36 changes: 36 additions & 0 deletions client/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package client

type Config struct {
// Server host:port to consult
Endpoint string

// ApiKey to query the server endpoint
APIKey string

// Enable TLS service
EnableTLS bool
}

func DefaultConfig() *Config {
return &Config{
Endpoint: "localhost:8080",
APIKey: "my-key",
EnableTLS: true,
}
}
35 changes: 10 additions & 25 deletions cmd/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,49 +21,34 @@ import (
"github.com/spf13/cobra"
)

type agentContext struct {
config *gossip.Config
}

func newAgentCommand() *cobra.Command {
var (
nodeName, bindAddr, advertiseAddr string
startJoin, alertsUrls []string
)
func newAgentCommand(cmdCtx *cmdContext) *cobra.Command {

ctx := &agentContext{}
config := gossip.DefaultConfig()

cmd := &cobra.Command{
Use: "agent",
Short: "Start a gossip agent for the verifiable log QED",
Long: ``,
PersistentPreRun: func(cmd *cobra.Command, args []string) {
config := gossip.DefaultConfig()
config.NodeName = nodeName
config.BindAddr = bindAddr
config.AdvertiseAddr = advertiseAddr
config.StartJoin = startJoin
config.EnableCompression = true
config.AlertsUrls = alertsUrls
ctx.config = config
},
TraverseChildren: true,
}

cmd.PersistentFlags().StringVarP(&nodeName, "node", "", "", "Unique name for node. If not set, fallback to hostname")
cmd.PersistentFlags().StringVarP(&bindAddr, "bind", "", "", "Bind address for TCP/UDP gossip on (host:port)")
cmd.PersistentFlags().StringVarP(&advertiseAddr, "advertise", "", "", "Address to advertise to cluster")
cmd.PersistentFlags().StringSliceVarP(&startJoin, "join", "", []string{}, "Comma-delimited list of nodes ([host]:port), through which a cluster can be joined")
cmd.Flags().StringSliceVarP(&alertsUrls, "alertsUrls", "", []string{}, "Comma-delimited list of Alert servers ([host]:port), through which an agent can post alerts")
cmd.PersistentFlags().StringVar(&config.NodeName, "node", "", "Unique name for node. If not set, fallback to hostname")
cmd.PersistentFlags().StringVar(&config.BindAddr, "bind", "", "Bind address for TCP/UDP gossip on (host:port)")
cmd.PersistentFlags().StringVar(&config.AdvertiseAddr, "advertise", "", "Address to advertise to cluster")
cmd.PersistentFlags().StringSliceVar(&config.StartJoin, "join", []string{}, "Comma-delimited list of nodes ([host]:port), through which a cluster can be joined")
cmd.Flags().StringSliceVar(&config.AlertsUrls, "alertsUrls", []string{}, "Comma-delimited list of Alert servers ([host]:port), through which an agent can post alerts")

cmd.MarkPersistentFlagRequired("node")
cmd.MarkPersistentFlagRequired("bind")
cmd.MarkPersistentFlagRequired("join")
cmd.MarkFlagRequired("alertUrls")

cmd.AddCommand(newAgentMonitorCommand(ctx))
cmd.AddCommand(newAgentAuditorCommand(ctx))
cmd.AddCommand(newAgentPublisherCommand(ctx))
cmd.AddCommand(newAgentMonitorCommand(cmdCtx, config))
cmd.AddCommand(newAgentAuditorCommand(cmdCtx, config))
cmd.AddCommand(newAgentPublisherCommand(cmdCtx, config))

return cmd

Expand Down
28 changes: 11 additions & 17 deletions cmd/agent_auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,50 +22,44 @@ import (
"github.com/spf13/cobra"
)

func newAgentAuditorCommand(ctx *agentContext) *cobra.Command {
func newAgentAuditorCommand(ctx *cmdContext, config *gossip.Config) *cobra.Command {

var qedUrls, pubUrls []string
auditorConfig := auditor.DefaultConfig()

cmd := &cobra.Command{
Use: "auditor",
Short: "Start a QED auditor",
Long: `Start a QED auditor that reacts to snapshot batches
propagated by QED servers and periodically executes membership
queries to verify the inclusion of events`,
Long: `Start a QED auditor that reacts to snapshot batches propagated by QED servers and periodically executes membership queries to verify the inclusion of events`,
Run: func(cmd *cobra.Command, args []string) {

log.SetLogger("QedAuditor", logLevel)
log.SetLogger("QedAuditor", ctx.logLevel)

agentConfig := ctx.config
agentConfig.Role = member.Auditor
auditorConfig := auditor.DefaultConfig()
auditorConfig.APIKey = apiKey
auditorConfig.QEDUrls = qedUrls
auditorConfig.PubUrls = pubUrls
config.Role = member.Auditor
auditorConfig.APIKey = ctx.apiKey

auditor, err := auditor.NewAuditor(*auditorConfig)
if err != nil {
log.Fatalf("Failed to start the QED monitor: %v", err)
}

agent, err := gossip.NewAgent(agentConfig, []gossip.Processor{auditor})
agent, err := gossip.NewAgent(config, []gossip.Processor{auditor})
if err != nil {
log.Fatalf("Failed to start the QED auditor: %v", err)
}

contacted, err := agent.Join(agentConfig.StartJoin)
contacted, err := agent.Join(config.StartJoin)
if err != nil {
log.Fatalf("Failed to join the cluster: %v", err)
}
log.Debugf("Number of nodes contacted: %d (%v)", contacted, agentConfig.StartJoin)
log.Debugf("Number of nodes contacted: %d (%v)", contacted, config.StartJoin)

defer agent.Shutdown()
util.AwaitTermSignal(agent.Leave)
},
}

cmd.Flags().StringSliceVarP(&qedUrls, "qedUrls", "", []string{}, "Comma-delimited list of QED servers ([host]:port), through which an auditor can make queries")
cmd.Flags().StringSliceVarP(&pubUrls, "pubUrls", "", []string{}, "Comma-delimited list of QED servers ([host]:port), through which an auditor can make queries")
cmd.Flags().StringSliceVarP(&auditorConfig.QEDUrls, "qedUrls", "", []string{}, "Comma-delimited list of QED servers ([host]:port), through which an auditor can make queries")
cmd.Flags().StringSliceVarP(&auditorConfig.PubUrls, "pubUrls", "", []string{}, "Comma-delimited list of QED servers ([host]:port), through which an auditor can make queries")
cmd.MarkFlagRequired("qedUrls")
cmd.MarkFlagRequired("pubUrls")

Expand Down
26 changes: 11 additions & 15 deletions cmd/agent_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import (
"github.com/spf13/cobra"
)

func newAgentMonitorCommand(ctx *agentContext) *cobra.Command {
func newAgentMonitorCommand(ctx *cmdContext, config *gossip.Config) *cobra.Command {

var qedUrls, pubUrls []string
monitorConfig := monitor.DefaultConfig()

cmd := &cobra.Command{
Use: "monitor",
Expand All @@ -33,39 +33,35 @@ func newAgentMonitorCommand(ctx *agentContext) *cobra.Command {
propagated by QED servers and periodically executes incremental
queries to verify the consistency between snaphots`,
Run: func(cmd *cobra.Command, args []string) {
log.SetLogger("QedMonitor", ctx.logLevel)

log.SetLogger("QedMonitor", logLevel)

agentConfig := ctx.config
agentConfig.Role = member.Monitor
monitorConfig := monitor.DefaultConfig()
monitorConfig.APIKey = apiKey
monitorConfig.QedUrls = qedUrls
monitorConfig.PubUrls = pubUrls
config.Role = member.Monitor
monitorConfig.APIKey = ctx.apiKey

monitor, err := monitor.NewMonitor(*monitorConfig)
if err != nil {
log.Fatalf("Failed to start the QED monitor: %v", err)
}

agent, err := gossip.NewAgent(agentConfig, []gossip.Processor{monitor})
agent, err := gossip.NewAgent(config, []gossip.Processor{monitor})
if err != nil {
log.Fatalf("Failed to start the QED monitor: %v", err)
}
defer agent.Shutdown()

contacted, err := agent.Join(agentConfig.StartJoin)
contacted, err := agent.Join(config.StartJoin)
if err != nil {
log.Fatalf("Failed to join the cluster: %v", err)
}

log.Debugf("Number of nodes contacted: %d", contacted)

defer agent.Shutdown()
util.AwaitTermSignal(agent.Leave)
},
}

cmd.Flags().StringSliceVarP(&qedUrls, "qedUrls", "", []string{}, "Comma-delimited list of QED servers ([host]:port), through which a monitor can make queries")
cmd.Flags().StringSliceVarP(&pubUrls, "pubUrls", "", []string{}, "Comma-delimited list of QED servers ([host]:port), through which an auditor can make queries")
cmd.Flags().StringSliceVarP(&monitorConfig.QedUrls, "qedUrls", "", []string{}, "Comma-delimited list of QED servers ([host]:port), through which a monitor can make queries")
cmd.Flags().StringSliceVarP(&monitorConfig.PubUrls, "pubUrls", "", []string{}, "Comma-delimited list of QED servers ([host]:port), through which an monitor can publish alerts")
cmd.MarkFlagRequired("qedUrls")
cmd.MarkFlagRequired("pubUrls")

Expand Down
Loading

0 comments on commit 4655319

Please sign in to comment.