Skip to content

Commit

Permalink
Merge pull request #28 from bgalehouse/fastpoll
Browse files Browse the repository at this point in the history
Fastpoll Optimization
  • Loading branch information
bgalehouse authored Jan 11, 2018
2 parents 5c0bf6a + 051a172 commit 23c8fa3
Show file tree
Hide file tree
Showing 13 changed files with 327 additions and 17 deletions.
4 changes: 4 additions & 0 deletions fleetspeak/src/inttesting/integrationtest/clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/google/fleetspeak/fleetspeak/src/server/comms"
"github.com/google/fleetspeak/fleetspeak/src/server/db"
"github.com/google/fleetspeak/fleetspeak/src/server/https"
"github.com/google/fleetspeak/fleetspeak/src/server/sertesting"
"github.com/google/fleetspeak/fleetspeak/src/server/service"

dpb "github.com/golang/protobuf/ptypes/duration"
Expand All @@ -50,6 +51,9 @@ import (
// CloneHandlingTest runs an integration test using ds in which cloned clients
// are dealt with.
func CloneHandlingTest(t *testing.T, ds db.Store, tmpDir string) {
fin := sertesting.SetClientCacheMaxAge(time.Second)
defer fin()

// Create FS server certs and server communicator.
cert, key, err := comtesting.ServerCert()
if err != nil {
Expand Down
37 changes: 27 additions & 10 deletions fleetspeak/src/server/comms.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,23 @@ type commsContext struct {
// GetClientInfo loads basic information about a client. Returns nil if the client does
// not exist in the datastore.
func (c commsContext) GetClientInfo(ctx context.Context, id common.ClientID) (*comms.ClientInfo, error) {
cld, err := c.s.dataStore.GetClientData(ctx, id)
if err != nil {
if c.s.dataStore.IsNotFound(err) {
return nil, nil
var cld *db.ClientData
var err error
var cacheHit bool

if cld = c.s.clientCache.Get(id); cld != nil {
cacheHit = true
} else {
cld, err = c.s.dataStore.GetClientData(ctx, id)
if err != nil {
if c.s.dataStore.IsNotFound(err) {
return nil, nil
}
return nil, err
}
return nil, err
c.s.clientCache.Update(id, cld)
}

k, err := x509.ParsePKIXPublicKey(cld.Key)
if err != nil {
return nil, err
Expand All @@ -64,7 +74,8 @@ func (c commsContext) GetClientInfo(ctx context.Context, id common.ClientID) (*c
ID: id,
Key: k,
Labels: cld.Labels,
Blacklisted: cld.Blacklisted}, nil
Blacklisted: cld.Blacklisted,
Cached: cacheHit}, nil
}

// AddClient adds a new client to the system.
Expand Down Expand Up @@ -139,11 +150,17 @@ func (c commsContext) FindMessagesForClient(ctx context.Context, info *comms.Cli
log.Warning("Got %v messages along with error, continuing: %v", len(msgs), err)
}

bms, err := c.s.broadcastManager.MakeBroadcastMessagesForClient(ctx, info.ID, info.Labels)
if err != nil {
return nil, err
// If the client recently contacted us, the broadcast situation is unlikely to
// have changed, so we skip checking for broadcasts. To keep this from delaying
// broadcast distribution, the broadcast manager clears the client cache when it
// finds more broadcasts.
if !info.Cached {
bms, err := c.s.broadcastManager.MakeBroadcastMessagesForClient(ctx, info.ID, info.Labels)
if err != nil {
return nil, err
}
msgs = append(msgs, bms...)
}
msgs = append(msgs, bms...)

if len(msgs) == 0 {
return msgs, nil
Expand Down
1 change: 1 addition & 0 deletions fleetspeak/src/server/comms/comms.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type ClientInfo struct {
Key crypto.PublicKey
Labels []*fspb.Label
Blacklisted bool
Cached bool // Whether the data was retrieved from a cache.
}

// A Context defines the view of the Fleetspeak server provided to a Communicator.
Expand Down
1 change: 1 addition & 0 deletions fleetspeak/src/server/https/https.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ func (s messageServer) ServeHTTP(res http.ResponseWriter, req *http.Request) {
clientInfo.New = true
} else {
clientInfo.Labels = info.Labels
pi.CacheHit = info.Cached
}
if !s.fs.Authorizer().Allow3(addr, contactInfo, clientInfo) {
pi.Status = http.StatusServiceUnavailable
Expand Down
11 changes: 10 additions & 1 deletion fleetspeak/src/server/internal/broadcasts/broadcasts.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/google/fleetspeak/fleetspeak/src/common"
"github.com/google/fleetspeak/fleetspeak/src/server/db"
"github.com/google/fleetspeak/fleetspeak/src/server/ids"
"github.com/google/fleetspeak/fleetspeak/src/server/internal/cache"
"github.com/google/fleetspeak/fleetspeak/src/server/internal/ftime"

fspb "github.com/google/fleetspeak/fleetspeak/src/common/proto/fleetspeak"
Expand All @@ -51,17 +52,19 @@ type Manager struct {
l sync.RWMutex // Protects the structure of i.
done chan bool // Closes to indicate it is time to shut down.
basePollWait time.Duration
clientCache *cache.Clients
}

// MakeManager creates a Manager, populates it with the
// current set of broadcasts, and begins updating the broadcasts in the
// background, the time between updates is always between pw and 2*pw.
func MakeManager(ctx context.Context, bs db.BroadcastStore, pw time.Duration) (*Manager, error) {
func MakeManager(ctx context.Context, bs db.BroadcastStore, pw time.Duration, clientCache *cache.Clients) (*Manager, error) {
r := &Manager{
bs: bs,
infos: make(map[ids.BroadcastID]*bInfo),
done: make(chan bool),
basePollWait: pw,
clientCache: clientCache,
}
if err := r.refreshInfo(ctx); err != nil {
return nil, err
Expand Down Expand Up @@ -276,6 +279,12 @@ func (m *Manager) refreshInfo(ctx context.Context) error {
// Swap/insert the new allocations.
c := m.updateAllocs(curr, newAllocs)

// If we added any new allocations, then we should recompute broadcasts for
// any cached clients.
if len(newAllocs) > 0 {
m.clientCache.Clear()
}

var errMsgs []string
// Cleanup the dead allocations. They've been removed from m.infos, so
// once useCount reaches 0, no new actions with them will start. We
Expand Down
3 changes: 2 additions & 1 deletion fleetspeak/src/server/internal/broadcasts/broadcasts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/google/fleetspeak/fleetspeak/src/comtesting"
"github.com/google/fleetspeak/fleetspeak/src/server/db"
"github.com/google/fleetspeak/fleetspeak/src/server/ids"
"github.com/google/fleetspeak/fleetspeak/src/server/internal/cache"
"github.com/google/fleetspeak/fleetspeak/src/server/sqlite"

apb "github.com/golang/protobuf/ptypes/any"
Expand Down Expand Up @@ -80,7 +81,7 @@ func TestManager(t *testing.T) {
}
}

bm, err := MakeManager(ctx, ds, 100*time.Millisecond)
bm, err := MakeManager(ctx, ds, 100*time.Millisecond, cache.NewClients())
if err != nil {
t.Fatal(err)
}
Expand Down
119 changes: 119 additions & 0 deletions fleetspeak/src/server/internal/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package cache

import (
"sync"
"time"

"github.com/google/fleetspeak/fleetspeak/src/common"
"github.com/google/fleetspeak/fleetspeak/src/server/db"
)

var (
// How long client data should be considered valid for. Variable to support
// unit testing.
MaxAge = 30 * time.Second

// We occasionally expunge old client data records, to be tidy
// with RAM and prevent what would effectively be a slow memory leak as
// clients come and go. Variable to support unit testing.
expireInterval = 5 * time.Minute
)

// Clients is a cache of recently connected clients.
type Clients struct {
m map[common.ClientID]*clientEntry
l sync.RWMutex
stop chan struct{}
}

type clientEntry struct {
u time.Time
d *db.ClientData
}

// NewClients returns a new cache of client data.
func NewClients() *Clients {
ret := &Clients{
m: make(map[common.ClientID]*clientEntry),
stop: make(chan struct{}),
}
go ret.expireLoop()
return ret
}

// Get returns the cached client data, if there is sufficiently fresh data in
// the cache, otherwise nil.
func (c *Clients) Get(id common.ClientID) *db.ClientData {
c.l.RLock()
defer c.l.RUnlock()

e := c.m[id]
if e == nil || db.Now().Sub(e.u) > MaxAge {
return nil
}
return e.d
}

// Update updates or sets the cached data for a particular client. If data is
// nil, it clears the data for the client.
func (c *Clients) Update(id common.ClientID, data *db.ClientData) {
c.l.Lock()
defer c.l.Unlock()

if data == nil {
delete(c.m, id)
} else {
c.m[id] = &clientEntry{
u: db.Now(),
d: data,
}
}
}

// Clear empties the cache, removing all entries.
func (c *Clients) Clear() {
c.l.Lock()
defer c.l.Unlock()

c.m = make(map[common.ClientID]*clientEntry)
}

// Stop releases the resources required for background cache maintenence. The
// cache should not be used once Stop has been called.
func (c *Clients) Stop() {
close(c.stop)
}

// Size returns the current size taken up by the cache, this is a count of
// client records, some of which may no longer be up to date.
func (c *Clients) Size() int {
c.l.RLock()
defer c.l.RUnlock()
return len(c.m)
}

func (c *Clients) expireLoop() {
t := time.NewTicker(expireInterval)
defer t.Stop()

for {
select {
case <-t.C:
c.expire()
case <-c.stop:
return
}
}
}

// expire prunes the cache to clean out clients that are no longer up to date.
func (c *Clients) expire() {
c.l.Lock()
defer c.l.Unlock()

for k, e := range c.m {
if db.Now().Sub(e.u) > MaxAge {
delete(c.m, k)
}
}
}
107 changes: 107 additions & 0 deletions fleetspeak/src/server/internal/cache/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cache

import (
"bytes"
"sync/atomic"
"testing"
"time"

"github.com/google/fleetspeak/fleetspeak/src/common"
"github.com/google/fleetspeak/fleetspeak/src/server/db"
"github.com/google/fleetspeak/fleetspeak/src/server/internal/ftime"
)

func TestClients(t *testing.T) {
oi := expireInterval
expireInterval = time.Second
defer func() {
expireInterval = oi
}()

// We'd use sertesting.FakeNow, but that creates a dependency loop. So we
// do it directly.
otime := ftime.Now
fakeTime := int64(20000)
ftime.Now = func() time.Time {
return time.Unix(atomic.LoadInt64(&fakeTime), 0).UTC()
}
defer func() {
ftime.Now = otime
}()

c := NewClients()
defer c.Stop()

id1, _ := common.StringToClientID("0000000000000001")
id2, _ := common.StringToClientID("0000000000000002")

for _, id := range []common.ClientID{id1, id2} {
got := c.Get(id)
if got != nil {
t.Errorf("Get(%v) = %v, expected nil", id, got)
}
}

c.Update(id1, &db.ClientData{
Key: []byte("key 1"),
})

atomic.StoreInt64(&fakeTime, 20029)

got := c.Get(id1)
if got == nil || !bytes.Equal(got.Key, []byte("key 1")) {
t.Errorf("Get(%v) = %v, expected {Key: \"key 1\"}", id1, got)
}
got = c.Get(id2)
if got != nil {
t.Errorf("Get(%v) = %v, expected nil", id2, got)
}

// Set key 2, make sure that clearing it works.
c.Update(id2, &db.ClientData{
Key: []byte("key 2"),
})
s := c.Size()
if s != 2 {
t.Errorf("Expected cache size of 2, got Size: %d", s)
}
c.Update(id2, nil)
got = c.Get(id2)
if got != nil {
t.Errorf("Get(%v) = %v, expected nil", id2, got)
}
// A second clear should not panic.
c.Update(id2, nil)

// Advance the clock enough to expire id1, wait for expire to run.
atomic.StoreInt64(&fakeTime, 20031)
time.Sleep(2 * time.Second)

// Everything should be nil.
for _, id := range []common.ClientID{id1, id2} {
got := c.Get(id)
if got != nil {
t.Errorf("Get(%v) = %v, expected nil", id, got)
}
}

// We shouldn't be using any extra ram.
s = c.Size()
if s != 0 {
t.Errorf("Expected empty cache, got Size: %d", s)
}
}
Loading

0 comments on commit 23c8fa3

Please sign in to comment.