Skip to content

Commit

Permalink
Merge pull request #4 from spencerkimball/spencerkimball/diff-filter
Browse files Browse the repository at this point in the history
Added diff of an InfoStore with a supplied Filter
  • Loading branch information
spencerkimball committed Feb 16, 2014
2 parents 0e93cb0 + dafd4bd commit 495da5f
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 157 deletions.
3 changes: 3 additions & 0 deletions gossip/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ func computeOptimalValues(N uint32, maxFP float64) (uint32, uint32) {
// insertions N, Number of bits per slot B, and expected value of a false
// positive < maxFP.
func NewFilter(N uint32, B uint32, maxFP float64) (*Filter, error) {
if N == 0 {
return nil, fmt.Errorf("number of insertions (N) must be > 0")
}
// TODO(spencer): we probably would be well-served using a 3-bit
// filter, so we should relax the following constraint and get a
// little bit fancier with the bit arithmetic to handle cross-byte
Expand Down
3 changes: 3 additions & 0 deletions gossip/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ func TestOptimalValues(t *testing.T) {

// TestNewFilter verifies bad inputs, optimal values, size of slots data.
func TestNewFilter(t *testing.T) {
if _, err := NewFilter(0, 3, 0.10); err == nil {
t.Error("NewFilter should not accept N == 0")
}
if _, err := NewFilter(1, 3, 0.10); err == nil {
t.Error("NewFilter should not accept bits B which are non-divisor of 8")
}
Expand Down
30 changes: 7 additions & 23 deletions gossip/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,18 @@ import (
type GroupType int

const (
minGroup GroupType = iota
maxGroup
// MinGroup maintains minimum values for keys matching group prefix.
MinGroup GroupType = iota
// MaxGroup maintains maximum values for keys matching group prefix.
MaxGroup
)

// Group organizes a collection of Info objects sharing a common key
// prefix (prefix is defined up to the last period character '.').
// Groups maintain a limited-size set of Info objects with set
// inclusion determined by group type. Two types are implemented here:
//
// minGroup, maxGroup: maintain only minimum/maximum values added
// MinGroup, MaxGroup: maintain only minimum/maximum values added
// to group respectively.
type Group struct {
Prefix string // Key prefix for Info items in group
Expand All @@ -55,9 +57,9 @@ func (g *Group) shouldInclude(info *Info) bool {
return true
}
switch g.TypeOf {
case minGroup:
case MinGroup:
return info.Val.Less(g.Gatekeeper.Val)
case maxGroup:
case MaxGroup:
return !info.Val.Less(g.Gatekeeper.Val)
default:
panic(fmt.Errorf("unknown group type %d", g.TypeOf))
Expand Down Expand Up @@ -205,21 +207,3 @@ func (g *Group) AddInfo(info *Info) bool {

return false
}

// Delta returns a delta of the group since the specified sequence number.
// Returns an error if the delta is empty.
func (g *Group) Delta(seq int64) (*Group, error) {
delta, err := NewGroup(g.Prefix, g.Limit, g.TypeOf)
if err != nil {
return nil, err
}
for _, info := range g.Infos {
if info.Seq > seq {
delta.addInternal(info)
}
}
if len(delta.Infos) == 0 {
return nil, fmt.Errorf("no deltas to group since sequence number %d", seq)
}
return delta, nil
}
75 changes: 20 additions & 55 deletions gossip/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ func newGroup(prefix string, limit int, typeOf GroupType, t *testing.T) *Group {

// TestNewGroup verifies NewGroup behavior.
func TestNewGroup(t *testing.T) {
if _, err := NewGroup("a", 0, minGroup); err == nil {
if _, err := NewGroup("a", 0, MinGroup); err == nil {
t.Error("new group with limit=0 should be illegal")
}
if _, err := NewGroup("a", -1, minGroup); err == nil {
if _, err := NewGroup("a", -1, MinGroup); err == nil {
t.Error("new group with limit=-1 should be illegal")
}
}

// TestMinGroupShouldInclude tests minGroup type groups
// TestMinGroupShouldInclude tests MinGroup type groups
// and group.shouldInclude() behavior.
func TestMinGroupShouldInclude(t *testing.T) {
group := newGroup("a", 2, minGroup, t)
group := newGroup("a", 2, MinGroup, t)

// First two inserts work fine.
info1 := newInfo("a.a", 1)
Expand All @@ -80,10 +80,10 @@ func TestMinGroupShouldInclude(t *testing.T) {
}
}

// TestMaxGroupShouldInclude tests maxGroup type groups and
// TestMaxGroupShouldInclude tests MaxGroup type groups and
// group.shouldInclude() behavior.
func TestMaxGroupShouldInclude(t *testing.T) {
group := newGroup("a", 2, maxGroup, t)
group := newGroup("a", 2, MaxGroup, t)

// First two inserts work fine.
info1 := newInfo("a.a", 1)
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestMaxGroupShouldInclude(t *testing.T) {
// TestSameKeyInserts inserts the same key into group and verifies
// earlier timestamps are ignored and later timestamps always replace it.
func TestSameKeyInserts(t *testing.T) {
group := newGroup("a", 1, minGroup, t)
group := newGroup("a", 1, MinGroup, t)
info1 := newInfo("a.a", 1)
if !group.AddInfo(info1) {
t.Error("could not insert")
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestSameKeyInserts(t *testing.T) {
// TestGroupCompactAfterTTL verifies group compaction after TTL by
// waiting and verifying a full group can be inserted into again.
func TestGroupCompactAfterTTL(t *testing.T) {
group := newGroup("a", 2, minGroup, t)
group := newGroup("a", 2, MinGroup, t)

// First two inserts work fine.
info1 := newInfo("a.a", 1)
Expand Down Expand Up @@ -186,24 +186,24 @@ func insertRandomInfos(group *Group, count int) InfoArray {
return infos
}

// TestGroups100Keys verifies behavior of minGroup and maxGroup with a
// TestGroups100Keys verifies behavior of MinGroup and MaxGroup with a
// limit of 100 keys after inserting 1000.
func TestGroups100Keys(t *testing.T) {
// Start by adding random infos to min group.
minGroup := newGroup("a", 100, minGroup, t)
infos := insertRandomInfos(minGroup, 1000)
MinGroup := newGroup("a", 100, MinGroup, t)
infos := insertRandomInfos(MinGroup, 1000)

// Insert same infos into the max group.
maxGroup := newGroup("a", 100, maxGroup, t)
MaxGroup := newGroup("a", 100, MaxGroup, t)
for _, info := range infos {
maxGroup.AddInfo(info)
MaxGroup.AddInfo(info)
}
sort.Sort(infos)

minInfos := minGroup.InfosAsArray()
minInfos := MinGroup.InfosAsArray()
sort.Sort(minInfos)

maxInfos := maxGroup.InfosAsArray()
maxInfos := MaxGroup.InfosAsArray()
sort.Sort(maxInfos)

for i := 0; i < 100; i++ {
Expand All @@ -222,7 +222,7 @@ func TestGroups100Keys(t *testing.T) {
// information. We don't want each new update with overlap to generate
// unnecessary delta info.
func TestSameKeySameTimestamp(t *testing.T) {
group := newGroup("a", 2, minGroup, t)
group := newGroup("a", 2, MinGroup, t)
info1 := newInfo("a.a", 1.0)
info2 := newInfo("a.a", 1.0)
info2.Timestamp = info1.Timestamp
Expand All @@ -243,7 +243,7 @@ func TestSameKeyDifferentHops(t *testing.T) {
info2.Hops = 2

// Add info1 first, then info2.
group1 := newGroup("a", 1, minGroup, t)
group1 := newGroup("a", 1, MinGroup, t)
if !group1.AddInfo(info1) || !group1.AddInfo(info2) {
t.Error("failed insertions", info1, info2)
}
Expand All @@ -252,7 +252,7 @@ func TestSameKeyDifferentHops(t *testing.T) {
}

// Add info1 first, then info2.
group2 := newGroup("a", 1, minGroup, t)
group2 := newGroup("a", 1, MinGroup, t)
if !group2.AddInfo(info1) || !group2.AddInfo(info2) {
t.Error("failed insertions")
}
Expand All @@ -263,7 +263,7 @@ func TestSameKeyDifferentHops(t *testing.T) {

// TestGroupGetInfo verifies info selection by key.
func TestGroupGetInfo(t *testing.T) {
group := newGroup("a", 10, minGroup, t)
group := newGroup("a", 10, MinGroup, t)
infos := insertRandomInfos(group, 10)
for _, info := range infos {
if info != group.GetInfo(info.Key) {
Expand All @@ -279,7 +279,7 @@ func TestGroupGetInfo(t *testing.T) {

// TestGroupGetInfoTTL verifies GetInfo with a short TTL.
func TestGroupGetInfoTTL(t *testing.T) {
group := newGroup("a", 10, minGroup, t)
group := newGroup("a", 10, MinGroup, t)
info := newInfo("a.a", 1)
info.TTLStamp = info.Timestamp + int64(time.Nanosecond)
group.AddInfo(info)
Expand All @@ -302,38 +302,3 @@ func TestGroupGetInfoTTL(t *testing.T) {
t.Error("only one info should be returned", infos)
}
}

// TestGroupDelta checks delta groups based on info sequence numbers.
func TestGroupDelta(t *testing.T) {
group := newGroup("a", 10, minGroup, t)

// Insert keys with sequence numbers 1..10.
for i := 0; i < 10; i++ {
info := newInfo(fmt.Sprintf("a.%d", i), float64(i))
info.Seq = int64(i + 1)
group.AddInfo(info)
}

// Verify deltas with successive sequence numbers.
for i := 0; i < 10; i++ {
delta, err := group.Delta(int64(i))
if err != nil {
t.Error("delta failed at sequence number", i)
}
infos := delta.InfosAsArray()
if len(infos) != 10-i {
t.Errorf("expected %d infos, not %d", 10-i, len(infos))
}
sort.Sort(infos)
for j := 0; j < 10-i; j++ {
expKey := fmt.Sprintf("a.%d", j+i)
if infos[j].Key != expKey {
t.Errorf("run %d: key mismatch at index %d: %s != %s", i, j, infos[j].Key, expKey)
}
}
}

if _, err := group.Delta(int64(10)); err == nil {
t.Error("fetching delta of group at maximum sequence number should return error")
}
}
Loading

0 comments on commit 495da5f

Please sign in to comment.