From 05fe5ebf58a9be69b43aeecda68cfaca3a299949 Mon Sep 17 00:00:00 2001 From: Fernando Crespo Gravalos Date: Tue, 12 May 2020 09:17:16 +0200 Subject: [PATCH] add unit tests, custom errors and makefile --- Makefile | 19 ++++ api/handler.go | 6 +- fswatcher/fswatcher.go | 25 +++-- fswatcher/fswatcher_test.go | 187 ++++++++++++++++++++++++++++++++++++ go.mod | 1 + go.sum | 12 +++ gonfig/gonfig.go | 14 +-- gonfig/gonfig_test.go | 110 +++++++++++++++++++++ kv/error.go | 55 +++++++++-- kv/in_memory.go | 3 +- kv/in_memory_test.go | 39 ++++++++ kv/kv.go | 19 ++-- kv/kv_test.go | 18 ++++ kv/value.go | 26 +++-- kv/value_test.go | 20 ++++ main.go | 24 ++++- pubsub/error.go | 56 +++++++++++ pubsub/event.go | 4 +- pubsub/event_test.go | 16 +++ pubsub/in_memory.go | 20 ++-- pubsub/in_memory_test.go | 47 +++++++++ pubsub/pubsub_test.go | 18 ++++ pubsub/pubsug.go | 48 +++++++-- 23 files changed, 716 insertions(+), 71 deletions(-) create mode 100644 Makefile create mode 100644 fswatcher/fswatcher_test.go create mode 100644 gonfig/gonfig_test.go create mode 100644 kv/in_memory_test.go create mode 100644 kv/kv_test.go create mode 100644 kv/value_test.go create mode 100644 pubsub/error.go create mode 100644 pubsub/event_test.go create mode 100644 pubsub/in_memory_test.go create mode 100644 pubsub/pubsub_test.go diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..4b56342 --- /dev/null +++ b/Makefile @@ -0,0 +1,19 @@ +GO111MODULE=on +GONFIGD_BINARY_NAME=gonfigd +GONFIGD_VERSION=1.0.0 +BUILD_FLAGS=-ldflags "-X main.version=v${GONFIGD_VERSION}" + +fmt: + go fmt ./... + +vet: + go vet ./... + +test: fmt vet + go test -v ./gonfig... ./fswatcher... ./kv/... ./pubsub/... -coverprofile cover.out + +tidy: + go mod tidy + +build: fmt vet test tidy + go build ${BUILD_FLAGS} -o bin/${GONFIGD_BINARY_NAME} main.go diff --git a/api/handler.go b/api/handler.go index 5c6ae89..f13f478 100644 --- a/api/handler.go +++ b/api/handler.go @@ -30,7 +30,9 @@ func (s *server) WatchConfig(req *WatchConfigRequest, stream Gonfig_WatchConfigS return err } } - sID, sCh := s.Subscribe(req.ConfigPath) + sub, _ := s.Subscribe(req.ConfigPath) + sID := sub.ID() + sCh := sub.Channel() defer s.UnSubscribe(req.ConfigPath, sID) ctx := stream.Context() @@ -42,7 +44,7 @@ func (s *server) WatchConfig(req *WatchConfigRequest, stream Gonfig_WatchConfigS Event: ev.String(), } if err := stream.Send(resp); err != nil { - s.Error().Msgf("failed to send response %v trhough stream: %v", resp, err) + s.Error().Msgf("failed to send response %v through stream: %v", resp, err) return err } s.Info().Msgf("event %s sent to subscription ID %s", resp.Event, resp.SubscriptionID) diff --git a/fswatcher/fswatcher.go b/fswatcher/fswatcher.go index cf9b7cf..954487c 100644 --- a/fswatcher/fswatcher.go +++ b/fswatcher/fswatcher.go @@ -3,7 +3,6 @@ package fswatcher import ( "context" "crypto/md5" - "errors" "fmt" "io/ioutil" "os" @@ -56,7 +55,7 @@ func (r *registry) unregister(path string) { r.Unlock() } -func (fsw *fsWatcher) isValidFileName(name string) bool { +func isValidFileName(name string) bool { // Discard hidden files if match, _ := filepath.Match("\\.*", name); match { return false @@ -67,12 +66,12 @@ func (fsw *fsWatcher) isValidFileName(name string) bool { return true } -func (fsw *fsWatcher) isValidFile(name string) bool { +func isValidFile(name string) bool { fi, err := os.Stat(name) if err != nil { return false } - return fi.Mode().IsRegular() && fsw.isValidFileName(filepath.Base(name)) + return fi.Mode().IsRegular() && isValidFileName(filepath.Base(name)) } func (fsw *fsWatcher) upsertFileOnDb(path string) (bool, error) { @@ -101,8 +100,8 @@ func (fsw *fsWatcher) walk(path string, fi os.FileInfo, err error) error { fsw.log.Info().Msgf("adding watcher for %s", abs) return fsw.watcher.Add(path) - // If it's a regular file, we don't have a have set a fsnotify watch but we check if it's stored in db - } else if fsw.isValidFile(path) { + // If it's a regular file, we don't have to set a fsnotify watch but we check if it's stored in db + } else if isValidFile(path) { if _, err := fsw.kv.Get(path); err != nil { fsw.log.Warn().Msgf("missing file %s in kv, inserting and creating event", path) return fsw.createEventHandler(path) @@ -112,7 +111,7 @@ func (fsw *fsWatcher) walk(path string, fi os.FileInfo, err error) error { } func (fsw *fsWatcher) publishEvent(config string, evType pubsub.EventType) error { - ev := pubsub.CreateEvent(evType, config) + ev := pubsub.NewEvent(evType, config) if !fsw.ps.TopicExists(config) { err := fsw.ps.CreateTopic(config) if err != nil { @@ -157,15 +156,15 @@ func (fsw *fsWatcher) removeEventHandler(name string) error { func (fsw *fsWatcher) routeEvent(ev fsnotify.Event) { evOp := ev.Op.String() - err := errors.New("") + var err error switch evOp { case "CREATE": - if fsw.isValidFile(ev.Name) { + if isValidFile(ev.Name) { err = fsw.createEventHandler(ev.Name) } break case "WRITE": - if fsw.isValidFile(ev.Name) { + if isValidFile(ev.Name) { err = fsw.writeEventHandler(ev.Name) } break @@ -178,13 +177,13 @@ func (fsw *fsWatcher) routeEvent(ev fsnotify.Event) { } if err != nil { - fsw.log.Error().Msgf("error while handling %s event for %s: %v\n", evOp, ev.Name, err) + fsw.log.Error().Msgf("error while handling %s event for %s: %v", evOp, ev.Name, err) } } // Start creates a new fsWatcher // It will return an error if it's not able to create a *fsnotify.Watcer -func Start(ctx context.Context, root string, kv kv.KV, ps pubsub.PubSub, logger zerolog.Logger) error { +func Start(ctx context.Context, root string, fwalkInterval time.Duration, kv kv.KV, ps pubsub.PubSub, logger zerolog.Logger) error { watcher, err := fsnotify.NewWatcher() if err != nil { logger.Error().Msgf("failed to create new fsnotify watcher: %v", err) @@ -200,7 +199,7 @@ func Start(ctx context.Context, root string, kv kv.KV, ps pubsub.PubSub, logger go func(ctx context.Context, root string, fsw *fsWatcher, stopCh chan struct{}) { for { select { - case <-time.After(5 * time.Second): + case <-time.After(fwalkInterval): fsw.log.Debug().Msgf("walking %s directory", root) filepath.Walk(root, fsw.walk) break diff --git a/fswatcher/fswatcher_test.go b/fswatcher/fswatcher_test.go new file mode 100644 index 0000000..53c40fe --- /dev/null +++ b/fswatcher/fswatcher_test.go @@ -0,0 +1,187 @@ +package fswatcher + +import ( + "context" + "fmt" + "io/ioutil" + "log" + "os" + "testing" + "time" + + "github.com/fcgravalos/gonfigd/kv" + "github.com/fcgravalos/gonfigd/pubsub" + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" +) + +type testConfig struct { + kv kv.KV + ps pubsub.PubSub + log zerolog.Logger + root string +} + +var testCfg *testConfig + +func TestUpsertFileOnDb(t *testing.T) { + fullPath := fmt.Sprintf("%s/test.yaml", testCfg.root) + err := ioutil.WriteFile(fullPath, []byte("foo: bar"), 0644) + if err != nil { + panic(err) + } + fsw := &fsWatcher{kv: testCfg.kv, ps: testCfg.ps} + changed, e1 := fsw.upsertFileOnDb(fullPath) + assert.Nil(t, e1) + assert.True(t, changed) + + v1, e2 := testCfg.kv.Get(fmt.Sprintf("%s/test.yaml", testCfg.root)) + assert.Nil(t, e2) + assert.Equal(t, "foo: bar", v1.Text()) + + changed2, e3 := fsw.upsertFileOnDb(fullPath) + assert.Nil(t, e3) + assert.False(t, changed2) + + f, err := os.OpenFile(fullPath, os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + panic(err) + } + defer f.Close() + if _, err := f.WriteString("\nbar: baz"); err != nil { + panic(err) + } + changed3, e4 := fsw.upsertFileOnDb(fullPath) + assert.Nil(t, e4) + assert.True(t, changed3) + + v2, e5 := testCfg.kv.Get(fmt.Sprintf("%s/test.yaml", testCfg.root)) + assert.Nil(t, e5) + assert.Equal(t, "foo: bar\nbar: baz", v2.Text()) +} + +func TestIsValidFileName(t *testing.T) { + assert.True(t, isValidFileName("foo.yaml")) + assert.False(t, isValidFileName("foo.swp")) + assert.False(t, isValidFileName("foo.swx")) + assert.False(t, isValidFileName("foo.~")) + assert.False(t, isValidFileName("foo.tmp")) +} + +func TestIsValidFile(t *testing.T) { + assert.False(t, isValidFile("foo.yaml")) + + fp1 := fmt.Sprintf("%s/foo.yaml", testCfg.root) + e1 := ioutil.WriteFile(fp1, []byte("foo: bar"), 0644) + if e1 != nil { + panic(e1) + } + assert.True(t, isValidFile(fmt.Sprintf("%s/foo.yaml", testCfg.root))) + + fp2 := fmt.Sprintf("%s/foo.swp", testCfg.root) + e2 := ioutil.WriteFile(fp2, []byte("foo: bar"), 0644) + if e2 != nil { + panic(e2) + } + assert.False(t, isValidFile(fmt.Sprintf("%s/foo.swp", testCfg.root))) +} + +func TestPublishEvent(t *testing.T) { + fsw := &fsWatcher{ps: testCfg.ps} + e1 := fsw.ps.CreateTopic("foo/config.yaml") + assert.Nil(t, e1) + + sub, e2 := fsw.ps.Subscribe("foo/config.yaml") + assert.Nil(t, e2) + + var ev *pubsub.Event + + done := make(chan struct{}) + + go func(done chan struct{}) { + scH := sub.Channel() + ev = <-scH + done <- struct{}{} + }(done) + + e3 := fsw.publishEvent("foo/config.yaml", pubsub.ConfigCreated) + assert.Nil(t, e3) + <-done + assert.Equal(t, "foo/config.yaml", ev.ConfigPath()) + assert.Equal(t, pubsub.ConfigCreated, ev.Kind()) +} + +func TestStart(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fp := fmt.Sprintf("%s/test-start.yaml", testCfg.root) + testCfg.ps.CreateTopic(fp) + sub, _ := testCfg.ps.Subscribe(fp) + sCh := sub.Channel() + + go Start(ctx, testCfg.root, 5*time.Second, testCfg.kv, testCfg.ps, testCfg.log) + + err := ioutil.WriteFile(fp, []byte("foo: bar"), 0644) + if err != nil { + panic(err) + } + + ev1 := <-sCh + assert.Equal(t, pubsub.ConfigCreated, ev1.Kind()) + + v1, e1 := testCfg.kv.Get(fp) + assert.Nil(t, e1) + assert.Equal(t, "foo: bar", v1.Text()) + + f, err := os.OpenFile(fp, os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + panic(err) + } + defer f.Close() + if _, err := f.WriteString("\nbar: baz"); err != nil { + panic(err) + } + + ev2 := <-sCh + assert.Equal(t, pubsub.ConfigUpdated, ev2.Kind()) + + v2, e2 := testCfg.kv.Get(fp) + assert.Nil(t, e2) + assert.Equal(t, "foo: bar\nbar: baz", v2.Text()) + + os.Remove(fp) + + ev3 := <-sCh + assert.Equal(t, pubsub.ConfigDeleted, ev3.Kind()) + + v3, e3 := testCfg.kv.Get(fp) + assert.Nil(t, v3) + assert.True(t, kv.IsKeyNotFoundError(e3)) + assert.EqualError(t, e3, fmt.Sprintf("[%s] Key %s not found in KV", kv.KeyNotFound, fp)) +} + +func TestMain(m *testing.M) { + dir, err := ioutil.TempDir("", fmt.Sprintf("fswatcher-tests-%s", time.Now())) + if err != nil { + log.Fatal(err) + } + defer os.Remove(dir) + + kv, _ := kv.NewKV(kv.INMEMORY) + ps, _ := pubsub.NewPubSub(pubsub.INMEMORY) + + logger := zerolog.New(os.Stderr). + With(). + Timestamp(). + Caller(). + Logger() + + testCfg = &testConfig{ + kv: kv, + ps: ps, + log: logger, + root: dir, + } + os.Exit(m.Run()) +} diff --git a/go.mod b/go.mod index e15af69..00db43c 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/golang/protobuf v1.4.1 github.com/google/uuid v1.1.1 github.com/rs/zerolog v1.18.0 + github.com/stretchr/testify v1.5.1 google.golang.org/grpc v1.29.1 google.golang.org/protobuf v1.22.0 ) diff --git a/go.sum b/go.sum index eafda95..52da0c8 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= @@ -29,10 +31,16 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.18.0 h1:CbAm3kP2Tptby1i9sYy2MGRg0uxIN9cyDb59Ys7W8z8= github.com/rs/zerolog v1.18.0/go.mod h1:9nvC1axdVrAHcu/s9taAVfBuIdTZLVQmKQyvrUjF5+I= +github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -80,5 +88,9 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.22.0 h1:cJv5/xdbk1NnMPR1VP9+HU6gupuG9MLBoH1r6RHZ2MY= google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/gonfig/gonfig.go b/gonfig/gonfig.go index 290a186..4e1f6e8 100644 --- a/gonfig/gonfig.go +++ b/gonfig/gonfig.go @@ -6,6 +6,7 @@ import ( "log" "net" "sync" + "time" "github.com/fcgravalos/gonfigd/api" "github.com/fcgravalos/gonfigd/fswatcher" @@ -16,11 +17,12 @@ import ( ) type Config struct { - GrpcAddr string - KvKind kv.Kind - PsKind pubsub.Kind - RootFolder string - Logger zerolog.Logger + GrpcAddr string + KvKind kv.Kind + PsKind pubsub.Kind + RootFolder string + FsWalkInterval time.Duration + Logger zerolog.Logger } func Start(ctx context.Context, waitChan chan struct{}, cfg Config) error { @@ -45,7 +47,7 @@ func Start(ctx context.Context, waitChan chan struct{}, cfg Config) error { defer wg.Done() cfg.Logger.Info(). Msg("starting fswatcher") - if err := fswatcher.Start(ctx, cfg.RootFolder, kv, ps, cfg.Logger); err != nil { + if err := fswatcher.Start(ctx, cfg.RootFolder, cfg.FsWalkInterval, kv, ps, cfg.Logger); err != nil { cfg.Logger.Fatal().Msgf("fswatcher returned with error: %v", err) } }(ctx) diff --git a/gonfig/gonfig_test.go b/gonfig/gonfig_test.go new file mode 100644 index 0000000..ef2935a --- /dev/null +++ b/gonfig/gonfig_test.go @@ -0,0 +1,110 @@ +package gonfig + +import ( + "context" + "fmt" + "io/ioutil" + "log" + "net" + "os" + "strings" + "testing" + "time" + + "github.com/fcgravalos/gonfigd/api" + "github.com/fcgravalos/gonfigd/kv" + "github.com/fcgravalos/gonfigd/pubsub" + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" +) + +var cfg *Config + +// PickRandomTCPPort picks free TCP Port from localhost +func pickRandomTCPPort() int { + addr, err := net.ResolveTCPAddr("tcp", "localhost:0") + if err != nil { + log.Fatalf("Could not resolve address: %v", err) + } + l, err := net.ListenTCP("tcp", addr) + if err != nil { + log.Fatalf("Could not setup port %v", err) + } + defer l.Close() + return l.Addr().(*net.TCPAddr).Port +} + +func TestGetConfig(t *testing.T) { + var conn *grpc.ClientConn + conn, e1 := grpc.Dial(cfg.GrpcAddr, grpc.WithInsecure()) + assert.Nil(t, e1) + assert.NotNil(t, conn) + defer conn.Close() + + c := api.NewGonfigClient(conn) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fp := fmt.Sprintf("%s/test.yaml", cfg.RootFolder) + + client, e2 := c.WatchConfig(ctx, &api.WatchConfigRequest{ConfigPath: fp}) + assert.Nil(t, e2) + assert.NotNil(t, client) + outCh := make(chan string) + defer close(outCh) + go func(outCh chan string) { + for { + select { + case <-time.After(1 * time.Second): + resp, _ := client.Recv() + if resp != nil { + outCh <- resp.GetEvent() + } + } + } + }(outCh) + + err := ioutil.WriteFile(fp, []byte("foo: bar"), 0644) + if err != nil { + panic(err) + } + ev := <-outCh + assert.True(t, strings.Contains(ev, pubsub.ConfigCreated.String())) + response, err := c.GetConfig(ctx, &api.GetConfigRequest{ConfigPath: fp}) + assert.Nil(t, err) + assert.Equal(t, response.GetConfig(), "foo: bar") +} + +func TestMain(m *testing.M) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + waitChan := make(chan struct{}) + + logger := zerolog.New(os.Stderr). + With(). + Timestamp(). + Caller(). + Logger() + + dir, err := ioutil.TempDir("", fmt.Sprintf("gonfig-tests-%s", time.Now())) + if err != nil { + log.Fatal(err) + } + defer os.Remove(dir) + cfg = &Config{ + GrpcAddr: fmt.Sprintf(":%d", pickRandomTCPPort()), + KvKind: kv.INMEMORY, + PsKind: pubsub.INMEMORY, + RootFolder: dir, + FsWalkInterval: 5 * time.Second, + Logger: logger, + } + + go Start(ctx, waitChan, *cfg) + + res := m.Run() + cancel() + <-waitChan + os.Exit(res) +} diff --git a/kv/error.go b/kv/error.go index f2f23dc..07da2a6 100644 --- a/kv/error.go +++ b/kv/error.go @@ -3,8 +3,10 @@ package kv import "fmt" const ( - KeyNotFound ErrType = "KEY_NOT_FOUND_ERROR" - Unknown ErrType = "UNKNOWN_ERROR" + KeyNotFound ErrType = "KEY_NOT_FOUND_ERROR" + Compression ErrType = "COMPRESSION_ERROR" + NotImplemented ErrType = "NOT_IMPLEMENTED_ERROR" + Unknown ErrType = "UNKNOWN_ERROR" ) type ErrType string @@ -14,23 +16,62 @@ type KeyNotFoundError struct { key string } -func (e KeyNotFoundError) Error() string { - return fmt.Sprintf("[%s] Key %s not found in KV", e.errType, e.key) +type CompressionError struct { + errType ErrType + data string + err error } -func NewKeyNotFoundError(key string) KeyNotFoundError { - return KeyNotFoundError{errType: KeyNotFound, key: key} +type NotImplementedError struct { + errType ErrType + kvImpl string } func getErrorType(e error) ErrType { switch e.(type) { case KeyNotFoundError: return KeyNotFound + case CompressionError: + return Compression + case NotImplementedError: + return NotImplemented default: return Unknown } } -func isKeyNotFoundError(e error) bool { +func IsKeyNotFoundError(e error) bool { return getErrorType(e) == KeyNotFound } + +func IsCompressionError(e error) bool { + return getErrorType(e) == Compression +} + +func IsNotImplementedError(e error) bool { + return getErrorType(e) == NotImplemented +} + +func (e KeyNotFoundError) Error() string { + return fmt.Sprintf("[%s] Key %s not found in KV", e.errType, e.key) +} + +func (e CompressionError) Error() string { + return fmt.Sprintf("[%s] gzip operation failed for data %s", e.errType, e.data) +} + +func (e NotImplementedError) Error() string { + return fmt.Sprintf("[%s] %s is not a supported implementation of KV interface", e.errType, e.kvImpl) +} + +func NewKeyNotFoundError(key string) KeyNotFoundError { + return KeyNotFoundError{errType: KeyNotFound, key: key} +} + +func NewCompressionError(data []byte, err error) CompressionError { + return CompressionError{errType: Compression, data: string(data), err: err} +} + +func NewNotImplementedError(impl string) NotImplementedError { + return NotImplementedError{errType: NotImplemented, kvImpl: impl} +} diff --git a/kv/in_memory.go b/kv/in_memory.go index 10f0d7f..659fa76 100644 --- a/kv/in_memory.go +++ b/kv/in_memory.go @@ -1,7 +1,6 @@ package kv import ( - "fmt" "sync" ) @@ -24,7 +23,7 @@ func (im *InMemory) Put(key string, value *Value) error { func (im *InMemory) Get(key string) (*Value, error) { v, ok := im.Db[key] if !ok { - return nil, fmt.Errorf("key %s not found in gonfig db", key) + return nil, NewKeyNotFoundError(key) } return v, nil } diff --git a/kv/in_memory_test.go b/kv/in_memory_test.go new file mode 100644 index 0000000..82c9f1f --- /dev/null +++ b/kv/in_memory_test.go @@ -0,0 +1,39 @@ +package kv + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestInMemoryOperations(t *testing.T) { + db, err1 := NewKV(INMEMORY) + assert.NotNil(t, db) + + _, ok := db.(*InMemory) + assert.True(t, ok) + + assert.Nil(t, err1) + + v1, err2 := db.Get("foo") + assert.Nil(t, v1) + assert.EqualError(t, err2, fmt.Sprintf("[%s] Key foo not found in KV", KeyNotFound)) + + v2, err3 := NewValue([]byte("bar")) + assert.Nil(t, err3) + + err4 := db.Put("foo", v2) + assert.Nil(t, err4) + + v3, err5 := db.Get("foo") + assert.Nil(t, err5) + assert.Equal(t, v3, v2) + + err6 := db.Delete("foo") + assert.Nil(t, err6) + + v4, err7 := db.Get("foo") + assert.Nil(t, v4) + assert.EqualError(t, err7, fmt.Sprintf("[%s] Key foo not found in KV", KeyNotFound)) +} diff --git a/kv/kv.go b/kv/kv.go index 1b0b3ba..28c2e14 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -1,14 +1,12 @@ package kv -import ( - "fmt" -) - const ( INMEMORY Kind = "in-memory" ) -var supportedKVs map[Kind]struct{} +var supportedKVs map[string]Kind = map[string]Kind{ + "in-memory": INMEMORY, +} type KV interface { Put(k string, v *Value) error @@ -18,15 +16,16 @@ type KV interface { type Kind string -func init() { - supportedKVs = map[Kind]struct{}{INMEMORY: struct{}{}} +func KVFromName(name string) (Kind, error) { + kind, ok := supportedKVs[name] + if !ok { + return kind, NewNotImplementedError(name) + } + return kind, nil } func NewKV(kind Kind) (KV, error) { var kv KV - if _, ok := supportedKVs[kind]; !ok { - return nil, fmt.Errorf("KV %v not supported", kind) - } switch kind { case INMEMORY: kv = &InMemory{Db: make(map[string]*Value)} diff --git a/kv/kv_test.go b/kv/kv_test.go new file mode 100644 index 0000000..8558146 --- /dev/null +++ b/kv/kv_test.go @@ -0,0 +1,18 @@ +package kv + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestKVFromName(t *testing.T) { + kv, err := KVFromName("in-memory") + assert.Nil(t, err) + assert.Equal(t, INMEMORY, kv) + + kv2, err2 := KVFromName("foo") + assert.Equal(t, Kind(""), kv2) + assert.EqualError(t, err2, fmt.Sprintf("[%s] foo is not a supported implementation of KV interface", NotImplemented)) +} diff --git a/kv/value.go b/kv/value.go index d7c94ff..91b8512 100644 --- a/kv/value.go +++ b/kv/value.go @@ -19,17 +19,21 @@ type Value struct { func compressAndEncode(data []byte) (string, error) { var b bytes.Buffer - gz, _ := gzip.NewWriterLevel(&b, flate.BestCompression) - if _, err := gz.Write(data); err != nil { - return "", err + gz, err := gzip.NewWriterLevel(&b, flate.BestCompression) + if err != nil { + return "", NewCompressionError(data, err) + } + + if _, err = gz.Write(data); err != nil { + return "", NewCompressionError(data, err) } - if err := gz.Flush(); err != nil { - return "", err + if err = gz.Flush(); err != nil { + return "", NewCompressionError(data, err) } - if err := gz.Close(); err != nil { - return "", err + if err = gz.Close(); err != nil { + return "", NewCompressionError(data, err) } return base64.StdEncoding.EncodeToString(b.Bytes()), nil } @@ -47,10 +51,18 @@ func NewValue(data []byte) (*Value, error) { }, nil } +func (v *Value) LastModified() time.Time { + return v.lastModified +} + func (v *Value) MD5() string { return v.md5 } +func (v *Value) Data() string { + return v.data +} + func (v *Value) Text() string { gzipped, _ := base64.StdEncoding.DecodeString(v.data) r, _ := gzip.NewReader(bytes.NewReader(gzipped)) diff --git a/kv/value_test.go b/kv/value_test.go new file mode 100644 index 0000000..9435d18 --- /dev/null +++ b/kv/value_test.go @@ -0,0 +1,20 @@ +package kv + +import ( + "crypto/md5" + "encoding/base64" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetValue(t *testing.T) { + v, err := NewValue([]byte("foo")) + assert.Nil(t, err) + assert.Equal(t, "foo", v.Text()) + assert.Equal(t, fmt.Sprintf("%x", md5.Sum([]byte("foo"))), v.MD5()) + assert.NotNil(t, v.LastModified()) + _, err2 := base64.StdEncoding.DecodeString(v.Data()) + assert.Nil(t, err2) +} diff --git a/main.go b/main.go index eec69c9..db5915b 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "context" "flag" + "fmt" "os" "os/signal" "syscall" @@ -15,18 +16,29 @@ import ( "github.com/rs/zerolog" ) +// Will be initialized at build time +var version string + func main() { cfg := &gonfig.Config{} - var fsWatchInterval time.Duration var enableDebugLog bool + var kvImpl string + var versionFlag bool + flag.BoolVar(&versionFlag, "version", false, "Show gonfigd version") flag.StringVar(&cfg.GrpcAddr, "server-addr", ":8080", "gRPC server address.") flag.StringVar(&cfg.RootFolder, "root-folder", "./", "Root folder of the configuration tree") - flag.DurationVar(&fsWatchInterval, "fswatch-interval", 5*time.Second, "How often the fswatcher will inspect the configuration tree to setup") + flag.StringVar(&kvImpl, "kv", "in-memory", "Key-Value implementation. Only 'in-memory' supported") + flag.DurationVar(&cfg.FsWalkInterval, "fswalk-interval", 5*time.Second, "How often the fswatcher will inspect the configuration tree for new folders. Example: 10s") flag.BoolVar(&enableDebugLog, "debug", false, "Enable debug logging") flag.Parse() + if versionFlag { + fmt.Printf("gonfigd version %s\n", version) + os.Exit(0) + } + // Setting logs logger := zerolog.New(os.Stderr). With(). @@ -43,7 +55,11 @@ func main() { cfg.Logger = logger // Add a better way of selecting these, when we actually support more kvs and pubsubs. - cfg.KvKind = kv.INMEMORY + kvkind, err := kv.KVFromName(kvImpl) + if err != nil { + logger.Fatal().Msgf("%v", err) + } + cfg.KvKind = kvkind cfg.PsKind = pubsub.INMEMORY ctx, cancel := context.WithCancel(context.Background()) @@ -66,8 +82,6 @@ func main() { logger.Info(). Msg("gonfigd stopped, goodbye!") return - case <-time.After(1 * time.Second): - break } } } diff --git a/pubsub/error.go b/pubsub/error.go new file mode 100644 index 0000000..8d36d55 --- /dev/null +++ b/pubsub/error.go @@ -0,0 +1,56 @@ +package pubsub + +import "fmt" + +const ( + NoSuchTopic ErrType = "NO_SUCH_TOPIC_ERROR" + NotImplemented ErrType = "NOT_IMPLEMENTED_ERROR" + Unknown ErrType = "UNKNOWN_ERROR" +) + +type ErrType string + +type NoSuchTopicError struct { + errType ErrType + topic string +} + +type NotImplementedError struct { + errType ErrType + psImpl string +} + +func getErrorType(e error) ErrType { + switch e.(type) { + case NoSuchTopicError: + return NoSuchTopic + case NotImplementedError: + return NotImplemented + default: + return Unknown + } +} + +func IsNoSuchTopicError(e error) bool { + return getErrorType(e) == NoSuchTopic +} + +func IsNotImplementedError(e error) bool { + return getErrorType(e) == NotImplemented +} + +func (e NoSuchTopicError) Error() string { + return fmt.Sprintf("[%s] Topic %s does not exist", e.errType, e.topic) +} + +func (e NotImplementedError) Error() string { + return fmt.Sprintf("[%s] %s is not a supported implementation of PubSub interface", e.errType, e.psImpl) +} + +func NewNoSuchTopicError(topic string) NoSuchTopicError { + return NoSuchTopicError{errType: NoSuchTopic, topic: topic} +} + +func NewNotImplementedError(impl string) NotImplementedError { + return NotImplementedError{errType: NotImplemented, psImpl: impl} +} diff --git a/pubsub/event.go b/pubsub/event.go index f351b12..b6b8456 100644 --- a/pubsub/event.go +++ b/pubsub/event.go @@ -40,10 +40,10 @@ type Event struct { createdAt time.Time } -// CreateEvent returns a a pointer to Event type +// NewEvent returns a pointer to Event type // kind EventType, the kind of Event // configPath string, the config path -func CreateEvent(kind EventType, configPath string) *Event { +func NewEvent(kind EventType, configPath string) *Event { return &Event{kind: kind, configPath: configPath, createdAt: time.Now()} } diff --git a/pubsub/event_test.go b/pubsub/event_test.go new file mode 100644 index 0000000..de46f61 --- /dev/null +++ b/pubsub/event_test.go @@ -0,0 +1,16 @@ +package pubsub + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestEvent(t *testing.T) { + for _, e := range []EventType{ConfigCreated, ConfigUpdated, ConfigDeleted} { + ev := NewEvent(e, "foo/config.yaml") + assert.NotNil(t, ev) + assert.Equal(t, fmt.Sprintf("[%s] - %s: %s", ev.CreatedAt(), ev.Kind(), ev.ConfigPath()), ev.String()) + } +} diff --git a/pubsub/in_memory.go b/pubsub/in_memory.go index 43ffcb3..caae71e 100644 --- a/pubsub/in_memory.go +++ b/pubsub/in_memory.go @@ -3,8 +3,6 @@ package pubsub import ( "sync" - - "github.com/google/uuid" ) type subscriptions map[string]chan *Event @@ -49,18 +47,24 @@ func (im *InMemory) Publish(topic string, ev *Event) error { } // Subscribe adds a new subscription to a topic, -// returns the subscription ID and a subscription channel -func (im *InMemory) Subscribe(topic string) (string, chan *Event) { - sID := uuid.New().String() - sCh := make(chan *Event) +// returns the newly created Subscription object, +// or NoSuchTopicError if the topic is not created yet +func (im *InMemory) Subscribe(topic string) (*Subscription, error) { + if !im.TopicExists(topic) { + return nil, NewNoSuchTopicError(topic) + } + s := NewSubscription() im.Lock() - im.pubsub[topic][sID] = sCh + im.pubsub[topic][s.ID()] = s.Channel() im.Unlock() - return sID, sCh + return s, nil } // UnSubscribe removes a subscription from a topic func (im *InMemory) UnSubscribe(topic string, sID string) error { + if !im.TopicExists(topic) { + return NewNoSuchTopicError(topic) + } im.Lock() close(im.pubsub[topic][sID]) delete(im.pubsub[topic], sID) diff --git a/pubsub/in_memory_test.go b/pubsub/in_memory_test.go new file mode 100644 index 0000000..e29ea7a --- /dev/null +++ b/pubsub/in_memory_test.go @@ -0,0 +1,47 @@ +package pubsub + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestInMemoryOperations(t *testing.T) { + ps, err1 := NewPubSub(INMEMORY) + assert.NotNil(t, ps) + + _, ok := ps.(*InMemory) + assert.True(t, ok) + + assert.Nil(t, err1) + + err2 := ps.CreateTopic("foo") + assert.Nil(t, err2) + assert.True(t, ps.TopicExists("foo")) + + s, err3 := ps.Subscribe("bar") + assert.Nil(t, s) + assert.EqualError(t, err3, fmt.Sprintf("[%s] Topic bar does not exist", NoSuchTopic)) + + s2, err4 := ps.Subscribe("foo") + assert.NotNil(t, s2) + assert.Nil(t, err4) + + go func(s *Subscription) { + ch := s.Channel() + <-ch + }(s2) + + err5 := ps.Publish("foo", NewEvent(ConfigCreated, "foo")) + assert.Nil(t, err5) + + err6 := ps.UnSubscribe("foo", s2.ID()) + assert.Nil(t, err6) + + err7 := ps.DeleteTopic("foo") + assert.Nil(t, err7) + + err8 := ps.UnSubscribe("foo", s2.ID()) + assert.EqualError(t, err8, fmt.Sprintf("[%s] Topic foo does not exist", NoSuchTopic)) +} diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go new file mode 100644 index 0000000..df56ec8 --- /dev/null +++ b/pubsub/pubsub_test.go @@ -0,0 +1,18 @@ +package pubsub + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestKVFromName(t *testing.T) { + ps, err := PubSubFromName("in-memory") + assert.Nil(t, err) + assert.Equal(t, INMEMORY, ps) + + ps2, err2 := PubSubFromName("foo") + assert.Equal(t, Kind(""), ps2) + assert.EqualError(t, err2, fmt.Sprintf("[%s] foo is not a supported implementation of PubSub interface", NotImplemented)) +} diff --git a/pubsub/pubsug.go b/pubsub/pubsug.go index 547d66f..a976557 100644 --- a/pubsub/pubsug.go +++ b/pubsub/pubsug.go @@ -2,37 +2,67 @@ package pubsub import ( - "fmt" + "github.com/google/uuid" ) //INMEMORY is an in-memory PubSub implementation const INMEMORY Kind = "in-memory" -var supportedPubSubs map[Kind]struct{} - -func init() { - supportedPubSubs = map[Kind]struct{}{INMEMORY: struct{}{}} +var supportedPubSubs map[string]Kind = map[string]Kind{ + "in-memory": INMEMORY, } // Kind is the KV kind type Kind string +// Subscription type +type Subscription struct { + id string + ch chan *Event +} + +// ID returns the subscription id +func (s *Subscription) ID() string { + return s.id +} + +// Channel returns the subscription channel +func (s *Subscription) Channel() chan *Event { + return s.ch +} + +// NewSubscription creates a new Subscription +func NewSubscription() *Subscription { + return &Subscription{ + id: uuid.New().String(), + ch: make(chan *Event), + } +} + // PubSub provides an API for a Publish-Subscribe system type PubSub interface { CreateTopic(topic string) error DeleteTopic(topic string) error TopicExists(topic string) bool Publish(topic string, ev *Event) error - Subscribe(topic string) (string, chan *Event) + Subscribe(topic string) (*Subscription, error) UnSubscribe(topic string, sID string) error } +// PubSubFromName returns the PubSub Kind from the provided name +// It will return a NotImplementedError otherwise +func PubSubFromName(name string) (Kind, error) { + kind, ok := supportedPubSubs[name] + if !ok { + return kind, NewNotImplementedError(name) + } + return kind, nil +} + // NewPubSub returns an implementation of the PubSub interface func NewPubSub(kind Kind) (PubSub, error) { var ps PubSub - if _, ok := supportedPubSubs[kind]; !ok { - return nil, fmt.Errorf("PubSub %v not supported", kind) - } + switch kind { case INMEMORY: ps = &InMemory{pubsub: make(map[string]subscriptions)}