diff --git a/.drone.star b/.drone.star index bae39f81046..c10f0ddcc0e 100644 --- a/.drone.star +++ b/.drone.star @@ -58,6 +58,7 @@ config = { "services/auth-basic", "services/auth-bearer", "services/auth-machine", + "services/eventhistory", "services/frontend", "services/gateway", "services/graph", diff --git a/Makefile b/Makefile index 767d353aac4..b32ec49a085 100644 --- a/Makefile +++ b/Makefile @@ -22,6 +22,7 @@ OCIS_MODULES = \ services/auth-basic \ services/auth-bearer \ services/auth-machine \ + services/eventhistory \ services/frontend \ services/gateway \ services/graph \ diff --git a/changelog/unreleased/event-history.md b/changelog/unreleased/event-history.md index 7aed8f1202d..e2a44501214 100644 --- a/changelog/unreleased/event-history.md +++ b/changelog/unreleased/event-history.md @@ -1,5 +1,5 @@ Enhancement: Eventhistory service -Introduces the `eventhistory` service. It is a service that is storing events and providing a grpc API to retrieve them +Introduces the `eventhistory` service. It is a service that stores events and provides a grpc API to retrieve them. https://github.com/owncloud/ocis/pull/5600 diff --git a/ocis-pkg/config/config.go b/ocis-pkg/config/config.go index aa3af173757..76ba265f073 100644 --- a/ocis-pkg/config/config.go +++ b/ocis-pkg/config/config.go @@ -9,6 +9,7 @@ import ( authbasic "github.com/owncloud/ocis/v2/services/auth-basic/pkg/config" authbearer "github.com/owncloud/ocis/v2/services/auth-bearer/pkg/config" authmachine "github.com/owncloud/ocis/v2/services/auth-machine/pkg/config" + eventhistory "github.com/owncloud/ocis/v2/services/eventhistory/pkg/config" frontend "github.com/owncloud/ocis/v2/services/frontend/pkg/config" gateway "github.com/owncloud/ocis/v2/services/gateway/pkg/config" graph "github.com/owncloud/ocis/v2/services/graph/pkg/config" @@ -84,6 +85,7 @@ type Config struct { AuthBasic *authbasic.Config `yaml:"auth_basic"` AuthBearer *authbearer.Config `yaml:"auth_bearer"` AuthMachine *authmachine.Config `yaml:"auth_machine"` + EventHistory *eventhistory.Config `yaml:"eventhistory"` Frontend *frontend.Config `yaml:"frontend"` Gateway *gateway.Config `yaml:"gateway"` Graph *graph.Config `yaml:"graph"` diff --git a/ocis-pkg/config/defaultconfig.go b/ocis-pkg/config/defaultconfig.go index fd527e95109..67d056eaf59 100644 --- a/ocis-pkg/config/defaultconfig.go +++ b/ocis-pkg/config/defaultconfig.go @@ -7,6 +7,7 @@ import ( authbasic "github.com/owncloud/ocis/v2/services/auth-basic/pkg/config/defaults" authbearer "github.com/owncloud/ocis/v2/services/auth-bearer/pkg/config/defaults" authmachine "github.com/owncloud/ocis/v2/services/auth-machine/pkg/config/defaults" + eventhistory "github.com/owncloud/ocis/v2/services/eventhistory/pkg/config/defaults" frontend "github.com/owncloud/ocis/v2/services/frontend/pkg/config/defaults" gateway "github.com/owncloud/ocis/v2/services/gateway/pkg/config/defaults" graph "github.com/owncloud/ocis/v2/services/graph/pkg/config/defaults" @@ -48,6 +49,7 @@ func DefaultConfig() *Config { AuthBasic: authbasic.DefaultConfig(), AuthBearer: authbearer.DefaultConfig(), AuthMachine: authmachine.DefaultConfig(), + EventHistory: eventhistory.DefaultConfig(), Frontend: frontend.DefaultConfig(), Gateway: gateway.DefaultConfig(), Graph: graph.DefaultConfig(), diff --git a/ocis-pkg/roles/manager.go b/ocis-pkg/roles/manager.go index 651970a9252..b92132e9214 100644 --- a/ocis-pkg/roles/manager.go +++ b/ocis-pkg/roles/manager.go @@ -29,7 +29,7 @@ type Manager struct { func NewManager(o ...Option) Manager { opts := newOptions(o...) - nStore := ocisstore.GetStore(opts.storeOptions) + nStore := ocisstore.Create(opts.storeOptions...) return Manager{ cache: nStore, roleService: opts.roleService, diff --git a/ocis-pkg/roles/option.go b/ocis-pkg/roles/option.go index 772f0ef6477..ede1d403271 100644 --- a/ocis-pkg/roles/option.go +++ b/ocis-pkg/roles/option.go @@ -8,7 +8,7 @@ import ( // Options are all the possible options. type Options struct { - storeOptions ocisstore.OcisStoreOptions + storeOptions []ocisstore.Option logger log.Logger roleService settingssvc.RoleService } @@ -30,7 +30,8 @@ func RoleService(rs settingssvc.RoleService) Option { } } -func StoreOptions(storeOpts ocisstore.OcisStoreOptions) Option { +// StoreOptions are the options for the store +func StoreOptions(storeOpts []ocisstore.Option) Option { return func(o *Options) { o.storeOptions = storeOpts } diff --git a/ocis-pkg/store/options.go b/ocis-pkg/store/options.go new file mode 100644 index 00000000000..f2671211613 --- /dev/null +++ b/ocis-pkg/store/options.go @@ -0,0 +1,50 @@ +package store + +import "time" + +// Option provides an option to configure the store +type Option func(*Options) + +// Type defines the type of the store +func Type(typ string) Option { + return func(o *Options) { + o.Type = typ + } +} + +// Addresses defines the addresses where the store can be reached +func Addresses(addrs ...string) Option { + return func(o *Options) { + o.Addresses = addrs + } +} + +// Database defines the Database the store should use +func Database(db string) Option { + return func(o *Options) { + o.Database = db + } +} + +// Table defines the table the store should use +func Table(t string) Option { + return func(o *Options) { + o.Table = t + } +} + +// Size defines the maximum capacity of the store. +// Only applicable when using "ocmem" store +func Size(s int) Option { + return func(o *Options) { + o.Size = s + } +} + +// TTL defines the time to life for elements in the store. +// Only applicable when using "natsjs" store +func TTL(t time.Duration) Option { + return func(o *Options) { + o.TTL = t + } +} diff --git a/ocis-pkg/store/store.go b/ocis-pkg/store/store.go index 14d91d28793..380e0952832 100644 --- a/ocis-pkg/store/store.go +++ b/ocis-pkg/store/store.go @@ -2,8 +2,11 @@ package store import ( "context" - "strings" + "time" + natsjs "github.com/go-micro/plugins/v4/store/nats-js" + "github.com/go-micro/plugins/v4/store/redis" + "github.com/nats-io/nats.go" "github.com/owncloud/ocis/v2/ocis-pkg/store/etcd" "github.com/owncloud/ocis/v2/ocis-pkg/store/memory" "go-micro.dev/v4/store" @@ -11,7 +14,8 @@ import ( var ocMemStore *store.Store -type OcisStoreOptions struct { +// Options are the options to configure the store +type Options struct { // Type determines the implementation: // * "noop", for a noop store (it does nothing) // * "etcd", for etcd @@ -20,10 +24,8 @@ type OcisStoreOptions struct { // * "memory", for a in-memory implementation, which is the default if noone matches Type string - // Address is a comma-separated list of nodes that the store - // will use. This is currently usable only with the etcd implementation. If it - // isn't provided, "127.0.0.1:2379" will be the only node used. - Address string + // Address is a list of nodes that the store will use. + Addresses []string // Size configures the maximum capacity of the cache for // the "ocmem" implementation, in number of items that the cache can hold per table. @@ -31,33 +33,50 @@ type OcisStoreOptions struct { // The parameter only affects to the "ocmem" implementation, the rest will ignore it. // If an invalid value is used, the default of 512 will be used instead. Size int + + // Database the store should use (optional) + Database string + + // Table the store should use (optional) + Table string + + // TTL is the time to life for documents stored in the store + TTL time.Duration } -// GetStore returns a configured key-value store +// Create returns a configured key-value store // // Each microservice (or whatever piece is using the store) should use the // options available in the interface's operations to choose the right database // and table to prevent collisions with other microservices. // Recommended approach is to use "services" or "ocis-pkg" for the database, // and "services//" or "ocis-pkg//" for the package name. -func GetStore(ocisOpts OcisStoreOptions) store.Store { - var s store.Store - - addresses := strings.Split(ocisOpts.Address, ",") - opts := []store.Option{ - store.Nodes(addresses...), +func Create(opts ...Option) store.Store { + options := &Options{} + for _, o := range opts { + o(options) } - switch ocisOpts.Type { + storeopts := storeOptions(options) + + switch options.Type { + default: + // TODO: better to error in default case? + fallthrough + case "mem": + return store.NewMemoryStore(storeopts...) case "noop": - s = store.NewNoopStore(opts...) + return store.NewNoopStore(storeopts...) case "etcd": - s = etcd.NewEtcdStore(opts...) + return etcd.NewEtcdStore(storeopts...) + case "redis": + // FIXME redis plugin does not support redis cluster, sentinel or ring -> needs upstream patch or our implementation + return redis.NewStore(storeopts...) case "ocmem": if ocMemStore == nil { var memStore store.Store - sizeNum := ocisOpts.Size + sizeNum := options.Size if sizeNum <= 0 { memStore = memory.NewMultiMemStore() } else { @@ -73,9 +92,37 @@ func GetStore(ocisOpts OcisStoreOptions) store.Store { } ocMemStore = &memStore } - s = *ocMemStore - default: - s = store.NewMemoryStore(opts...) + return *ocMemStore + case "nats-js": + // TODO nats needs a DefaultTTL option as it does not support per Write TTL ... + // FIXME nats has restrictions on the key, we cannot use slashes AFAICT + // host, port, clusterid + return natsjs.NewStore( + append(storeopts, + natsjs.NatsOptions(nats.Options{Name: "TODO"}), + natsjs.DefaultTTL(options.TTL), + )..., + ) // TODO test with ocis nats } - return s +} + +func storeOptions(o *Options) []store.Option { + var opts []store.Option + + if o.Addresses != nil { + opts = append(opts, store.Nodes(o.Addresses...)) + } + + if o.Database != "" { + opts = append(opts, store.Database(o.Database)) + + } + + if o.Table != "" { + opts = append(opts, store.Table(o.Table)) + + } + + return opts + } diff --git a/ocis/pkg/runtime/service/service.go b/ocis/pkg/runtime/service/service.go index 84a25679f34..517d6418d0b 100644 --- a/ocis/pkg/runtime/service/service.go +++ b/ocis/pkg/runtime/service/service.go @@ -24,6 +24,7 @@ import ( appRegistry "github.com/owncloud/ocis/v2/services/app-registry/pkg/command" authbasic "github.com/owncloud/ocis/v2/services/auth-basic/pkg/command" authmachine "github.com/owncloud/ocis/v2/services/auth-machine/pkg/command" + eventhistory "github.com/owncloud/ocis/v2/services/eventhistory/pkg/command" frontend "github.com/owncloud/ocis/v2/services/frontend/pkg/command" gateway "github.com/owncloud/ocis/v2/services/gateway/pkg/command" graph "github.com/owncloud/ocis/v2/services/graph/pkg/command" @@ -129,6 +130,7 @@ func NewService(options ...Option) (*Service, error) { s.ServicesRegistry[opts.Config.Notifications.Service.Name] = notifications.NewSutureService s.ServicesRegistry[opts.Config.Search.Service.Name] = search.NewSutureService s.ServicesRegistry[opts.Config.Postprocessing.Service.Name] = postprocessing.NewSutureService + s.ServicesRegistry[opts.Config.EventHistory.Service.Name] = eventhistory.NewSutureService // populate delayed services s.Delayed[opts.Config.Sharing.Service.Name] = sharing.NewSutureService diff --git a/services/eventhistory/README.md b/services/eventhistory/README.md index 44d3d1dbffa..c369eb58583 100644 --- a/services/eventhistory/README.md +++ b/services/eventhistory/README.md @@ -1,15 +1,28 @@ -# Eventhistory service +# Eventhistory Service -The `eventhistory` consumes all events from the configured event systems, stores them and allows to retrieve them via an eventid +The `eventhistory` consumes all events from the configured event system like NATS, stores them and allows to retrieve them via an eventid. + +## Prerequisites + +Running the eventhistory service without an event sytem like NATS is not possible. ## Consuming -The `eventhistory` services consumes all events from the configured event sytem. Running it without an event sytem is not possible. +The `eventhistory` services consumes all events from the configured event sytem. ## Storing -The `eventhistory` stores each consumed event in the configured store. Possible stores are ? and ? but not ?. +The `eventhistory` service stores each consumed event via the configured store in `EVENTHISTORY_STORE_TYPE`. Possible stores are: + - `mem`: Basic in-memory store and the default. + - `ocmem`: Advanced in-memory store allowing max size. + - `redis`: Stores data in a configured redis cluster. + - `etcd`: Stores data in a configured etcd cluster. + - `nats-js`: Stores data using key-value-store feature of nats jetstream. https://docs.nats.io/nats-concepts/jetstream/key-value-store + - `noop`: Stores nothing. Useful for testing. Not recommended in productive enviroments. + +1. Note that in-memory stores are by nature not reboot persistant. +2. Events stay in the store for 2 weeks by default. Use `EVENTHISTORY_RECORD_EXPIRY` to adjust this value. ## Retrieving -Other services can call the `eventhistory` service via a grpc call to retrieve events. The request must contain the eventid that should be retrieved +Other services can call the `eventhistory` service via a grpc call to retrieve events. The request must contain the eventid that should be retrieved. diff --git a/services/eventhistory/pkg/command/root.go b/services/eventhistory/pkg/command/root.go index 04ff01f6bcb..73107f10856 100644 --- a/services/eventhistory/pkg/command/root.go +++ b/services/eventhistory/pkg/command/root.go @@ -45,7 +45,7 @@ type SutureService struct { func NewSutureService(cfg *ociscfg.Config) suture.Service { cfg.Notifications.Commons = cfg.Commons return SutureService{ - //cfg: cfg.Notifications, + cfg: cfg.EventHistory, } } diff --git a/services/eventhistory/pkg/command/server.go b/services/eventhistory/pkg/command/server.go index 228649c6381..cbf1678c903 100644 --- a/services/eventhistory/pkg/command/server.go +++ b/services/eventhistory/pkg/command/server.go @@ -3,11 +3,13 @@ package command import ( "context" "fmt" + "strings" "github.com/cs3org/reva/v2/pkg/events/stream" "github.com/oklog/run" "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" ogrpc "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc" + "github.com/owncloud/ocis/v2/ocis-pkg/store" "github.com/owncloud/ocis/v2/ocis-pkg/version" "github.com/owncloud/ocis/v2/services/eventhistory/pkg/config" "github.com/owncloud/ocis/v2/services/eventhistory/pkg/config/parser" @@ -15,7 +17,6 @@ import ( "github.com/owncloud/ocis/v2/services/eventhistory/pkg/metrics" "github.com/owncloud/ocis/v2/services/eventhistory/pkg/server/grpc" "github.com/urfave/cli/v2" - "go-micro.dev/v4/store" ) // Server is the entrypoint for the server command. @@ -54,8 +55,13 @@ func Server(cfg *config.Config) *cli.Command { return err } - // TODO: configure store - st := store.DefaultStore + st := store.Create( + store.Type(cfg.Store.Type), + store.Addresses(strings.Split(cfg.Store.Addresses, ",")...), + store.Database(cfg.Store.Database), + store.Table(cfg.Store.Table), + store.TTL(cfg.Store.RecordExpiry), + ) service := grpc.NewService( grpc.Logger(logger), @@ -69,11 +75,11 @@ func Server(cfg *config.Config) *cli.Command { grpc.Store(st), ) - gr.Add(service.Run, func(_ error) { + gr.Add(service.Run, func(err error) { logger.Error(). Err(err). Str("server", "grpc"). - Msg("Shutting down server") + Msg("Shutting Down server") cancel() }) diff --git a/services/eventhistory/pkg/config/config.go b/services/eventhistory/pkg/config/config.go index 8e9dad27ad3..0ab9bae1686 100644 --- a/services/eventhistory/pkg/config/config.go +++ b/services/eventhistory/pkg/config/config.go @@ -34,7 +34,12 @@ type GRPCConfig struct { // Store configures the store to use type Store struct { - RecordExpiry time.Duration `yaml:"record_expiry" env:"RECORD_EXPIRY" desc:"time to life for events in the store"` + Type string `yaml:"type" env:"EVENTHISTORY_STORE_TYPE" desc:"The type of the eventhistory store. Supported values are: 'mem', 'ocmem', 'etcd', 'redis', 'nats-js', 'noop'. See the text description for details."` + Addresses string `yaml:"addresses" env:"EVENTHISTORY_STORE_ADDRESSES" desc:"A comma separated list of addresses the store should use. No effect on 'inmemory' implementations."` + Database string `yaml:"database" env:"EVENTHISTORY_STORE_DATABASE" desc:"(optional) The database the store should use."` + Table string `yaml:"table" env:"EVENTHISTORY_STORE_TABLE" desc:"(optional) The table the store should use."` + RecordExpiry time.Duration `yaml:"record_expiry" env:"EVENTHISTORY_RECORD_EXPIRY" desc:"Time to life for events in the store. The duration can be set as number followed by a unit identifier like s, m or h. Defaults to '336h' (2 weeks)."` + Size int `yaml:"size" env:"EVENTHISTORY_STORE_SIZE" desc:"The maximum amount of items in the store. Only applies when using store type 'ocmem'. Defaults to 512."` } // Events combines the configuration options for the event bus. diff --git a/services/eventhistory/pkg/config/defaults/defaultconfig.go b/services/eventhistory/pkg/config/defaults/defaultconfig.go index 0c7db160df1..a61d411c4b9 100644 --- a/services/eventhistory/pkg/config/defaults/defaultconfig.go +++ b/services/eventhistory/pkg/config/defaults/defaultconfig.go @@ -1,6 +1,8 @@ package defaults import ( + "time" + "github.com/owncloud/ocis/v2/ocis-pkg/shared" "github.com/owncloud/ocis/v2/services/eventhistory/pkg/config" ) @@ -19,6 +21,19 @@ func DefaultConfig() *config.Config { Service: config.Service{ Name: "eventhistory", }, + Events: config.Events{ + Endpoint: "127.0.0.1:9233", + Cluster: "ocis-cluster", + EnableTLS: false, + }, + Store: config.Store{ + Type: "mem", + RecordExpiry: 336 * time.Hour, + }, + GRPC: config.GRPCConfig{ + Addr: "127.0.0.1:0", + Namespace: "com.owncloud.api", + }, } } @@ -42,6 +57,15 @@ func EnsureDefaults(cfg *config.Config) { cfg.GRPCClientTLS = cfg.Commons.GRPCClientTLS } } + + if cfg.GRPC.TLS == nil { + cfg.GRPC.TLS = &shared.GRPCServiceTLS{} + if cfg.Commons != nil && cfg.Commons.GRPCServiceTLS != nil { + cfg.GRPC.TLS.Enabled = cfg.Commons.GRPCServiceTLS.Enabled + cfg.GRPC.TLS.Cert = cfg.Commons.GRPCServiceTLS.Cert + cfg.GRPC.TLS.Key = cfg.Commons.GRPCServiceTLS.Key + } + } } // Sanitize sanitizes the config diff --git a/services/eventhistory/pkg/server/grpc/server.go b/services/eventhistory/pkg/server/grpc/server.go index 3be2f6d8f1a..38b06713038 100644 --- a/services/eventhistory/pkg/server/grpc/server.go +++ b/services/eventhistory/pkg/server/grpc/server.go @@ -31,7 +31,7 @@ func NewService(opts ...Option) grpc.Service { return grpc.Service{} } - eh, err := svc.NewEventHistoryService(options.Config, options.Consumer, options.Store) + eh, err := svc.NewEventHistoryService(options.Config, options.Consumer, options.Store, options.Logger) if err != nil { options.Logger.Fatal().Err(err).Msg("Error creating event history service") return grpc.Service{} diff --git a/services/eventhistory/pkg/service/service.go b/services/eventhistory/pkg/service/service.go index ad1e2ab3966..9a34f1d7491 100644 --- a/services/eventhistory/pkg/service/service.go +++ b/services/eventhistory/pkg/service/service.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/cs3org/reva/v2/pkg/events" + "github.com/owncloud/ocis/v2/ocis-pkg/log" ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0" ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" "github.com/owncloud/ocis/v2/services/eventhistory/pkg/config" @@ -16,10 +17,11 @@ type EventHistoryService struct { ch <-chan events.Event store store.Store cfg *config.Config + log log.Logger } // NewEventHistoryService returns an EventHistory service -func NewEventHistoryService(cfg *config.Config, consumer events.Consumer, store store.Store) (*EventHistoryService, error) { +func NewEventHistoryService(cfg *config.Config, consumer events.Consumer, store store.Store, log log.Logger) (*EventHistoryService, error) { if consumer == nil || store == nil { return nil, fmt.Errorf("Need non nil consumer (%v) and store (%v) to work properly", consumer, store) } @@ -29,7 +31,7 @@ func NewEventHistoryService(cfg *config.Config, consumer events.Consumer, store return nil, err } - eh := &EventHistoryService{ch: ch, store: store, cfg: cfg} + eh := &EventHistoryService{ch: ch, store: store, cfg: cfg, log: log} go eh.StoreEvents() return eh, nil @@ -42,8 +44,12 @@ func (eh *EventHistoryService) StoreEvents() { Key: event.ID, Value: event.Event.([]byte), Expiry: eh.cfg.Store.RecordExpiry, + Metadata: map[string]interface{}{ + "type": event.Type, + }, }); err != nil { // we can't store. That's it for us. + eh.log.Error().Err(err).Str("eventid", event.ID).Msg("could not store event") return } } @@ -54,15 +60,16 @@ func (eh *EventHistoryService) GetEvents(ctx context.Context, req *ehsvc.GetEven for _, id := range req.Ids { evs, err := eh.store.Read(id) if err != nil { - // TODO: Handle! - // return? - // gather errors and add to response? + if err != store.ErrNotFound { + eh.log.Error().Err(err).Str("eventid", id).Msg("could not read event") + } continue } resp.Events = append(resp.Events, &ehmsg.Event{ Id: id, Event: evs[0].Value, + Type: evs[0].Metadata["type"].(string), }) } diff --git a/services/eventhistory/pkg/service/service_suit_test.go b/services/eventhistory/pkg/service/service_suit_test.go new file mode 100644 index 00000000000..5ad83d80f4e --- /dev/null +++ b/services/eventhistory/pkg/service/service_suit_test.go @@ -0,0 +1,13 @@ +package service_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestSearch(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Service Suite") +} diff --git a/services/eventhistory/pkg/service/service_test.go b/services/eventhistory/pkg/service/service_test.go index 944a1f69f35..da13eae9dc7 100644 --- a/services/eventhistory/pkg/service/service_test.go +++ b/services/eventhistory/pkg/service/service_test.go @@ -1,3 +1,89 @@ package service_test -// tests here +import ( + "context" + "encoding/json" + "reflect" + "time" + + "github.com/cs3org/reva/v2/pkg/events" + "github.com/google/uuid" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/owncloud/ocis/v2/ocis-pkg/log" + "github.com/owncloud/ocis/v2/ocis-pkg/store" + ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" + "github.com/owncloud/ocis/v2/services/eventhistory/pkg/config" + "github.com/owncloud/ocis/v2/services/eventhistory/pkg/service" + microevents "go-micro.dev/v4/events" + microstore "go-micro.dev/v4/store" +) + +var _ = Describe("EventHistoryService", func() { + var ( + cfg = &config.Config{} + + eh *service.EventHistoryService + bus testBus + sto microstore.Store + ) + + BeforeEach(func() { + var err error + sto = store.Create() + bus = testBus(make(chan events.Event)) + eh, err = service.NewEventHistoryService(cfg, bus, sto, log.Logger{}) + Expect(err).ToNot(HaveOccurred()) + + }) + + AfterEach(func() { + close(bus) + }) + + It("Records events, stores them and allows to retrieve them", func() { + id := bus.Publish(events.UploadReady{}) + + // service will store eventually + time.Sleep(500 * time.Millisecond) + + resp := &ehsvc.GetEventsResponse{} + err := eh.GetEvents(context.Background(), &ehsvc.GetEventsRequest{Ids: []string{id}}, resp) + Expect(err).ToNot(HaveOccurred()) + Expect(resp).ToNot(BeNil()) + + Expect(len(resp.Events)).To(Equal(1)) + Expect(resp.Events[0].Id).To(Equal(id)) + + }) +}) + +type testBus chan events.Event + +func (tb testBus) Consume(_ string, _ ...microevents.ConsumeOption) (<-chan microevents.Event, error) { + ch := make(chan microevents.Event) + go func() { + for ev := range tb { + b, _ := json.Marshal(ev.Event) + ch <- microevents.Event{ + Payload: b, + Metadata: map[string]string{ + events.MetadatakeyEventID: ev.ID, + events.MetadatakeyEventType: ev.Type, + }, + } + } + }() + return ch, nil +} + +func (tb testBus) Publish(e interface{}) string { + ev := events.Event{ + ID: uuid.New().String(), + Type: reflect.TypeOf(e).String(), + Event: e, + } + + tb <- ev + return ev.ID +} diff --git a/services/graph/pkg/service/v0/service.go b/services/graph/pkg/service/v0/service.go index ad6705e78a9..9cc30c0ba90 100644 --- a/services/graph/pkg/service/v0/service.go +++ b/services/graph/pkg/service/v0/service.go @@ -7,6 +7,7 @@ import ( "net/http" "os" "strconv" + "strings" "time" "github.com/go-chi/chi/v5" @@ -156,10 +157,10 @@ func NewService(opts ...Option) (Graph, error) { roleManager := options.RoleManager if roleManager == nil { - storeOptions := store.OcisStoreOptions{ - Type: options.Config.CacheStore.Type, - Address: options.Config.CacheStore.Address, - Size: options.Config.CacheStore.Size, + storeOptions := []store.Option{ + store.Type(options.Config.CacheStore.Type), + store.Addresses(strings.Split(options.Config.CacheStore.Address, ",")...), + store.Size(options.Config.CacheStore.Size), } m := roles.NewManager( roles.StoreOptions(storeOptions),