Skip to content

Commit

Permalink
Clean test service
Browse files Browse the repository at this point in the history
  • Loading branch information
aalda committed Mar 5, 2019
1 parent 7386665 commit 32c2e62
Showing 1 changed file with 85 additions and 145 deletions.
230 changes: 85 additions & 145 deletions tests/e2e/test_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"net/http"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"

Expand Down Expand Up @@ -89,6 +87,10 @@ type alertStore struct {
d []string
}

func newAlertStore() *alertStore {
return &alertStore{d: make([]string, 0)}
}

func (a *alertStore) Append(msg string) {
a.Lock()
defer a.Unlock()
Expand All @@ -107,71 +109,111 @@ type snapStore struct {
d *sync.Map
}

func newSnapStore() *snapStore {
return &snapStore{d: &sync.Map{}}
}

func (s *snapStore) Put(b *protocol.BatchSnapshots) {
for _, snap := range b.Snapshots {
s.d.Store(snap.Snapshot.Version, snap)
}
}

func (s *snapStore) Get(version uint64) (v *protocol.SignedSnapshot, ok bool) {
var tmpV interface{}
tmpV, ok = s.d.Load(version)
v = tmpV.(*protocol.SignedSnapshot)
return v, ok
tmpV, ok := s.d.Load(version)
if !ok {
return nil, ok
}
return tmpV.(*protocol.SignedSnapshot), ok
}

const (
STAT int = iota
SNAP
ALERT
RPS
)
type Service struct {
snaps *snapStore
alerts *alertStore

type statStore struct {
sync.Mutex
count [4]uint64
batch map[string][]int
metricsServer *http.Server
prometheusRegistry *prometheus.Registry
httpServer *http.Server

quitCh chan bool
}

func (s *statStore) Add(key string, index, v int) {
s.Lock()
defer s.Unlock()
if s.batch[key] == nil {
s.batch[key] = make([]int, 10)
func NewService() *Service {
return &Service{
snaps: newSnapStore(),
alerts: newAlertStore(),
quitCh: make(chan bool),
}
s.batch[key][index] += v
}

func (s *statStore) Get(key string, index int) int {
s.Lock()
defer s.Unlock()
return s.batch[key][index]
}
func (s *Service) Start(foreground bool) {

// Metrics server.
r := prometheus.NewRegistry()
Register(r)
s.prometheusRegistry = r
metricsMux := metricshttp.NewMetricsHTTP(r)
s.metricsServer = &http.Server{Addr: ":18888", Handler: metricsMux}

QedStoreInstancesCount.Inc()

go func() {
log.Debugf(" * Starting metrics HTTP server ")
if err := s.metricsServer.ListenAndServe(); err != http.ErrServerClosed {
log.Errorf("Can't start metrics HTTP server: %s", err)
}
}()

// Snapshot/alert store server.
router := http.NewServeMux()
router.HandleFunc("/batch", s.postBatchHandler())
router.HandleFunc("/snapshot", s.getSnapshotHandler())
router.HandleFunc("/alert", s.alertHandler())

s.httpServer = &http.Server{Addr: ":8888", Handler: router}
fmt.Println("Starting test service...")

go func() {
for {
select {
case <-s.quitCh:
log.Debugf("\nShutting down the server...")
_ = s.httpServer.Shutdown(context.Background())
return
}
}
}()

func (s *statStore) Print() {
s.Lock()
defer s.Unlock()
b, err := json.Marshal(s.batch)
//b, err := json.MarshalIndent(s.batch, "", " ")
if err == nil {
fmt.Println(string(b))
if foreground {
if err := s.httpServer.ListenAndServe(); err != http.ErrServerClosed {
log.Fatal(err)
}
} else {
go (func() {
if err := s.httpServer.ListenAndServe(); err != http.ErrServerClosed {
log.Fatal(err)
}
})()
}
}

func (s *Service) statHandler() func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
key := q.Get("key")
index, _ := strconv.Atoi(q.Get("index"))
s.stats.Add(key, index, 1)
atomic.AddUint64(&s.stats.count[STAT], 1)
func (s *Service) Shutdown() {

// Metrics
QedStoreInstancesCount.Dec()

log.Debugf("Metrics enabled: stopping server...")
if err := s.metricsServer.Shutdown(context.Background()); err != nil { // TODO include timeout instead nil
log.Error(err)
}
log.Debugf("Done.\n")

s.quitCh <- true
close(s.quitCh)
}

func (s *Service) postBatchHandler() func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
atomic.AddUint64(&s.stats.count[RPS], 1)
atomic.AddUint64(&s.stats.count[SNAP], 1)
if r.Method == "POST" {
QedStoreBatchesStoredTotal.Inc()
// Decode batch to get signed snapshots and batch version.
Expand All @@ -196,8 +238,6 @@ func (s *Service) postBatchHandler() func(http.ResponseWriter, *http.Request) {

func (s *Service) getSnapshotHandler() func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
atomic.AddUint64(&s.stats.count[RPS], 1)
atomic.AddUint64(&s.stats.count[SNAP], 1)
if r.Method == "GET" {
QedStoreSnapshotsRetrievedTotal.Inc()
q := r.URL.Query()
Expand Down Expand Up @@ -228,8 +268,6 @@ func (s *Service) getSnapshotHandler() func(http.ResponseWriter, *http.Request)

func (s *Service) alertHandler() func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
atomic.AddUint64(&s.stats.count[RPS], 1)
atomic.AddUint64(&s.stats.count[ALERT], 1)
if r.Method == "GET" {
b, err := json.Marshal(s.alerts.GetAll())
if err != nil {
Expand All @@ -255,101 +293,3 @@ func (s *Service) alertHandler() func(http.ResponseWriter, *http.Request) {
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
}
}

type Service struct {
snaps *snapStore
alerts *alertStore
stats *statStore

metricsServer *http.Server
prometheusRegistry *prometheus.Registry
httpServer *http.Server

quitCh chan bool
}

func NewService() *Service {
var snaps snapStore
var alerts alertStore
var stats statStore
snaps.d = &sync.Map{}
stats.batch = make(map[string][]int)
alerts.d = make([]string, 0)

return &Service{
snaps: &snaps,
alerts: &alerts,
stats: &stats,
quitCh: make(chan bool),
}
}

func (s *Service) Start(foreground bool) {
// Metrics server.
r := prometheus.NewRegistry()
Register(r)
s.prometheusRegistry = r
metricsMux := metricshttp.NewMetricsHTTP(r)
s.metricsServer = &http.Server{Addr: ":18888", Handler: metricsMux}

QedStoreInstancesCount.Inc()

go func() {
log.Debugf(" * Starting metrics HTTP server ")
if err := s.metricsServer.ListenAndServe(); err != http.ErrServerClosed {
log.Errorf("Can't start metrics HTTP server: %s", err)
}
}()

// Snapshot/alert store server.
router := http.NewServeMux()
router.HandleFunc("/stat", s.statHandler())
router.HandleFunc("/batch", s.postBatchHandler())
router.HandleFunc("/snapshot", s.getSnapshotHandler())
router.HandleFunc("/alert", s.alertHandler())

s.httpServer = &http.Server{Addr: ":8888", Handler: router}
fmt.Println("Starting test service...")
go func() {
ticker := time.NewTicker(1 * time.Second)
for {
select {
case <-ticker.C:
c := atomic.LoadUint64(&s.stats.count[RPS])
log.Debugf("Request per second: ", c)
log.Debugf("Counters ", s.stats.count)
atomic.StoreUint64(&s.stats.count[RPS], 0)
case <-s.quitCh:
log.Debugf("\nShutting down the server...")
_ = s.httpServer.Shutdown(context.Background())
return
}
}
}()

if foreground {
if err := s.httpServer.ListenAndServe(); err != http.ErrServerClosed {
log.Fatal(err)
}
} else {
go (func() {
if err := s.httpServer.ListenAndServe(); err != http.ErrServerClosed {
log.Fatal(err)
}
})()
}
}

func (s *Service) Shutdown() {
// Metrics
QedStoreInstancesCount.Dec()

log.Debugf("Metrics enabled: stopping server...")
if err := s.metricsServer.Shutdown(context.Background()); err != nil { // TODO include timeout instead nil
log.Error(err)
}
log.Debugf("Done.\n")

s.quitCh <- true
close(s.quitCh)
}

0 comments on commit 32c2e62

Please sign in to comment.