Skip to content
This repository has been archived by the owner on Oct 17, 2024. It is now read-only.

New watch api #122

Closed
wants to merge 34 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
c584f46
Changed Watch signature
asdine May 21, 2019
767805a
Change etcd watch implementation
asdine May 21, 2019
8018d3a
Return the latest revision when a timeout occurs
asdine May 21, 2019
feb7276
Test watch api
asdine May 21, 2019
be77fd4
Fix mock
asdine May 22, 2019
5fd73c6
Simplify server handler
asdine May 22, 2019
8569a17
Fix list tests
asdine May 22, 2019
9b1adf5
Decode paths from the request body
asdine May 22, 2019
42405cd
Support empty body
asdine May 22, 2019
067f696
Improve docstring
asdine May 22, 2019
2ef744a
Improve etcd watch docstring
asdine May 23, 2019
acc0cca
Change revision type from string to int64
asdine May 23, 2019
64597bd
Change mock watch signature
asdine May 23, 2019
1f7f01e
Return an error when receiving a malformed revision
asdine May 23, 2019
3705b6b
Change RulesetEvents revision from string to int64
asdine May 23, 2019
41d1abd
Support revision as int64
asdine May 23, 2019
1607250
Fix etcd watch tests
asdine May 23, 2019
5180456
Fix http watch tests
asdine May 23, 2019
8f2b579
Simplify etcd watch key selection
asdine May 23, 2019
7b1bc5d
Fix slice out of range bug
asdine May 23, 2019
f717e73
Ignoring malformed payload
asdine May 23, 2019
2acc01d
Mark test helpers as test helpers
asdine May 23, 2019
952546b
Explain the time.Sleep in the watch test
asdine May 23, 2019
8633e8c
Apply suggestions from code review
asdine May 23, 2019
db1cb26
Fix regression
asdine May 23, 2019
07d9e3d
Change Rulesets revision type from string to int64
asdine May 23, 2019
04f26fc
Use r.Form instead of r.URL.Query
asdine May 23, 2019
8b0e35c
Fix missing ParseForm in ServeHTTP
asdine May 23, 2019
83a4367
Add WatchOptions and changed Watch signature
asdine Jun 5, 2019
d0c1b57
Implement WatchOptions support for etcd
asdine Jun 5, 2019
e271428
Use WatchOptions for revision and paths in HTTP api
asdine Jun 5, 2019
38b88a1
Improve happy path
asdine Jun 5, 2019
f0e5eb9
Simplify timeout handling
asdine Jun 5, 2019
0f2381f
Apply suggestions from code review
asdine Jun 5, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions api/etcd/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package etcd
import (
"context"
"encoding/base64"
"strconv"

"github.com/coreos/etcd/clientv3"
"github.com/heetch/regula/api"
Expand Down Expand Up @@ -45,7 +44,7 @@ func (s *RulesetService) List(ctx context.Context, opt api.ListOptions) (*api.Ru
}

rulesets := api.Rulesets{
Revision: strconv.FormatInt(resp.Header.Revision, 10),
Revision: resp.Header.Revision,
}

rulesets.Paths = make([]string, 0, len(resp.Kvs))
Expand Down
15 changes: 9 additions & 6 deletions api/etcd/rulesets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/heetch/regula"
"github.com/heetch/regula/api"
"github.com/heetch/regula/rule"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand All @@ -24,13 +25,11 @@ var (
endpoints = []string{"localhost:2379", "etcd:2379"}
)

func Init() {
rand.Seed(time.Now().UnixNano())
}

func newEtcdRulesetService(t *testing.T) (*RulesetService, func()) {
t.Helper()

rand.Seed(time.Now().UnixNano())

cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
Expand All @@ -49,18 +48,22 @@ func newEtcdRulesetService(t *testing.T) (*RulesetService, func()) {
}

func createRuleset(t *testing.T, s *RulesetService, path string, rules ...*rule.Rule) *regula.Ruleset {
t.Helper()

_, err := s.Put(context.Background(), path, rules)
if err != nil && err != api.ErrRulesetNotModified {
require.NoError(t, err)
}

rs, err := s.Get(context.Background(), path, "")
require.NoError(t, err)
assert.NoError(t, err)
return rs
}

func createBoolRuleset(t *testing.T, s *RulesetService, path string, rules ...*rule.Rule) *regula.Ruleset {
t.Helper()

err := s.Create(context.Background(), path, &regula.Signature{ReturnType: "bool"})
require.False(t, err != nil && err != api.ErrAlreadyExists)
assert.False(t, err != nil && err != api.ErrAlreadyExists)
asdine marked this conversation as resolved.
Show resolved Hide resolved
return createRuleset(t, s, path, rules...)
}
86 changes: 64 additions & 22 deletions api/etcd/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package etcd

import (
"context"
"strconv"
"strings"
asdine marked this conversation as resolved.
Show resolved Hide resolved

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
Expand All @@ -12,61 +12,103 @@ import (
"github.com/pkg/errors"
)

// Watch the given prefix for anything new.
func (s *RulesetService) Watch(ctx context.Context, prefix string, revision string) (*api.RulesetEvents, error) {
// Watch a list of paths for changes and return a list of events. If paths is empty or nil,
// watch all paths. If the revision is negative, watch from the latest revision.
// This method blocks until there is a change in one of the paths or until the context is canceled.
// The given context can be used to limit the watch period or to cancel any running one.
func (s *RulesetService) Watch(ctx context.Context, opt api.WatchOptions) (*api.RulesetEvents, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

revision := opt.Revision

opts := []clientv3.OpOption{clientv3.WithPrefix()}
if i, _ := strconv.ParseInt(revision, 10, 64); i > 0 {
if revision > 0 {
// watch from the next revision
opts = append(opts, clientv3.WithRev(i+1))
opts = append(opts, clientv3.WithRev(revision+1))
}

events := api.RulesetEvents{
Revision: revision,
}
var events api.RulesetEvents

wc := s.Client.Watch(ctx, s.rulesPath(prefix, ""), opts...)
wc := s.Client.Watch(ctx, s.rulesPath("", ""), opts...)
asdine marked this conversation as resolved.
Show resolved Hide resolved
for {
select {
case wresp := <-wc:
if err := wresp.Err(); err != nil {
return nil, errors.Wrapf(err, "failed to watch prefix: '%s'", prefix)
return nil, errors.Wrapf(err, "failed to watch paths: '%#v'", opt.Paths)
}

revision = wresp.Header.Revision

if len(wresp.Events) == 0 {
continue
}

list := make([]api.RulesetEvent, len(wresp.Events))
for i, ev := range wresp.Events {
switch ev.Type {
case mvccpb.PUT:
list[i].Type = api.RulesetPutEvent
default:
var list []api.RulesetEvent
for _, ev := range wresp.Events {
// filter keys that haven't been selected
if !s.shouldIncludeEvent(ev, opt.Paths) {
s.Logger.Debug().Str("type", string(ev.Type)).Str("key", string(ev.Kv.Key)).Msg("watch: ignoring event key")
continue
}

// filter event types, keep only PUT events
if ev.Type != mvccpb.PUT {
s.Logger.Debug().Str("type", string(ev.Type)).Msg("watch: ignoring event type")
continue
}

var pbrs pb.Rules
err := proto.Unmarshal(ev.Kv.Value, &pbrs)
if err != nil {
s.Logger.Debug().Bytes("entry", ev.Kv.Value).Msg("watch: unmarshalling failed")
return nil, errors.Wrap(err, "failed to unmarshal entry")
s.Logger.Error().Bytes("entry", ev.Kv.Value).Msg("watch: unmarshalling failed, ignoring the event")
continue
}

path, version := s.pathVersionFromKey(string(ev.Kv.Key))
list[i].Path = path
list[i].Rules = rulesFromProtobuf(&pbrs)
list[i].Version = version

list = append(list, api.RulesetEvent{
Type: api.RulesetPutEvent,
Path: path,
Rules: rulesFromProtobuf(&pbrs),
Version: version,
})
}

// None of the events matched the user selection, so continue
// waiting for more.
if len(list) == 0 {
continue
}

events.Events = list
events.Revision = strconv.FormatInt(wresp.Header.Revision, 10)
events.Revision = revision
return &events, nil
case <-ctx.Done():
events.Timeout = true
// if we received events but ignored them
// this function will go on until the context is canceled.
// we need to return the latest received revision so the
// caller can start after the filtered events.
events.Revision = revision
return &events, ctx.Err()
}
}
}

// shouldIncludeEvent reports whether the given event should be included
// in the Watch data for the given paths.
func (s *RulesetService) shouldIncludeEvent(ev *clientv3.Event, paths []string) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be a little bit simpler, I think:

if len(paths) == 0 {
	return true
}
key := string(ev.Kv.Key)
key = key[:strings.Index(key, versionSeparator)]
for _, path := range paths {
	if s.rulesPath(path) == key {
		return true
	}
}
return false

Also, I'm a bit concerned that this code will panic if the key doesn't contain versionSeparator. How sure are we of that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put is the only function responsible for writing in that path, I'd say that as long as Put is tested it should be fine, but I'm open to suggestions

// detect if the event key is found in the paths list
// or that the paths list is empty
key := string(ev.Kv.Key)
key = key[:strings.Index(key, versionSeparator)]
ok := len(paths) == 0
for i := 0; i < len(paths) && !ok; i++ {
if key == s.rulesPath(paths[i], "") {
ok = true
}
}

return ok
}
98 changes: 68 additions & 30 deletions api/etcd/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,55 +8,93 @@ import (

"github.com/heetch/regula/api"
"github.com/heetch/regula/rule"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestWatch(t *testing.T) {
t.Parallel()

s, cleanup := newEtcdRulesetService(t)
defer cleanup()
tests := []struct {
name string
paths []string
expected []string
}{
{"no paths", nil, []string{"a", "b", "c"}},
{"existing paths", []string{"a", "c"}, []string{"a", "c"}},
}

var wg sync.WaitGroup
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
s, cleanup := newEtcdRulesetService(t)
defer cleanup()

wg.Add(1)
go func() {
defer wg.Done()
var wg sync.WaitGroup

time.Sleep(time.Second)
wg.Add(1)
go func() {
defer wg.Done()

r := rule.New(rule.True(), rule.BoolValue(true))
// wait enought time so that the other goroutine had the time to run the watch method
// before writing data to the database.
time.Sleep(time.Second)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sleep statements in tests are usually a bit of a smell :)

Why is this one necessary? If it really is necessary, then I'd add a comment.

FWIW my usual approach these days is to poll until some expected condition becomes true, with a timeout in case it doesn't.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like it either, the issue here is that I want to make sure I had the time to run the watch method in the other goroutine before writing data to the database.
I'll add a comment but I'm open to any suggestion

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you actually need the watch statement to execute before this code? How about adding some rulesets, wait for an event to arrive, then adding some more, so you check both the initial case and the later case? (I'm not entirely sure of the intended semantics when there are rulesets already there though - what's meant to happen?)


createBoolRuleset(t, s, "aa", r)
createBoolRuleset(t, s, "ab", r)
createBoolRuleset(t, s, "a/1", r)
}()
r := rule.New(rule.True(), rule.BoolValue(true))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
createBoolRuleset(t, s, "a", r)
createBoolRuleset(t, s, "b", r)
createBoolRuleset(t, s, "c", r)
}()

events, err := s.Watch(ctx, "a", "")
require.NoError(t, err)
require.Len(t, events.Events, 1)
require.NotEmpty(t, events.Revision)
require.Equal(t, "aa", events.Events[0].Path)
require.Equal(t, api.RulesetPutEvent, events.Events[0].Type)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

wg.Wait()
var events api.RulesetEvents
var rev int64
var watchCount int
for len(events.Events) != len(test.expected) && watchCount < 4 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ISTM that this won't fail if the watcher produces more events than you want. Perhaps the loop should wait for a while to check that no more arrive?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can't produce more events because each call to createBoolRuleset in the goroutine above will produce exactly one event

evs, err := s.Watch(ctx, api.WatchOptions{Paths: test.paths, Revision: rev})
if err != nil {
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition looks redundant to me.

if err == context.DeadlineExceeded {
t.Errorf("timed out waiting for expected events")
} else {
t.Errorf("unexpected error from watcher: %v", err)
}
break
}
break
}
assert.True(t, len(evs.Events) > 0)
assert.NotEmpty(t, evs.Revision)
rev = evs.Revision
events.Events = append(events.Events, evs.Events...)
watchCount++
}

events, err = s.Watch(ctx, "a", events.Revision)
require.NoError(t, err)
require.Len(t, events.Events, 2)
require.NotEmpty(t, events.Revision)
require.Equal(t, api.RulesetPutEvent, events.Events[0].Type)
require.Equal(t, "ab", events.Events[0].Path)
require.Equal(t, api.RulesetPutEvent, events.Events[1].Type)
require.Equal(t, "a/1", events.Events[1].Path)
wg.Wait()

var foundCount int
for _, ev := range events.Events {
for _, p := range test.expected {
if ev.Path == p {
foundCount++
break
}
}
}
require.Equal(t, len(test.expected), foundCount)
})

}

t.Run("timeout", func(t *testing.T) {
s, cleanup := newEtcdRulesetService(t)
defer cleanup()

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
defer cancel()
events, err := s.Watch(ctx, "", "")
events, err := s.Watch(ctx, api.WatchOptions{})
require.Equal(t, context.DeadlineExceeded, err)
require.True(t, events.Timeout)
})
Expand Down
24 changes: 18 additions & 6 deletions api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ type RulesetService interface {
// List returns the list of all rulesets paths.
// The listing is paginated and can be customised using the ListOptions type.
List(ctx context.Context, opt ListOptions) (*Rulesets, error)
// Watch a prefix for changes and return a list of events.
Watch(ctx context.Context, prefix string, revision string) (*RulesetEvents, error)
// Watch a list of paths for changes and return a list of events.
// The watcher can be customized using the WatchOption type.
Watch(ctx context.Context, opt WatchOptions) (*RulesetEvents, error)
// Eval evaluates a ruleset given a path and a set of parameters. It implements the regula.Evaluator interface.
Eval(ctx context.Context, path, version string, params rule.Params) (*regula.EvalResult, error)
}
Expand All @@ -55,10 +56,21 @@ func (l *ListOptions) GetLimit() int {
return l.Limit
}

// WatchOptions gives indications on what rulesets to watch.
type WatchOptions struct {
// List of paths to watch for changes.
// If the slice is empty, watch all paths.
Paths []string
// Indicates from which revision start watching.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Indicates from which revision start watching.
// Indicates from which revision to start watching.

// Any event happened after that revision is returned.
// If the revision is zero or negative, watch from the latest revision.
Revision int64
}

// Rulesets holds a list of rulesets.
type Rulesets struct {
Paths []string `json:"paths"`
Revision string `json:"revision"` // revision when the request was applied
Revision int64 `json:"revision"` // revision when the request was applied
Cursor string `json:"cursor,omitempty"` // cursor of the next page, if any
}

Expand All @@ -77,7 +89,7 @@ type RulesetEvent struct {

// RulesetEvents holds a list of events occured on a group of rulesets.
type RulesetEvents struct {
Events []RulesetEvent
Revision string
Timeout bool // indicates if the watch did timeout
Events []RulesetEvent `json:"events"`
Revision int64 `json:"revision"`
Timeout bool `json:"timeout"` // indicates if the watch did timeout
}
Loading