Skip to content

Commit

Permalink
sharpen eventhistory service
Browse files Browse the repository at this point in the history
Signed-off-by: jkoberg <[email protected]>
  • Loading branch information
kobergj committed Feb 20, 2023
1 parent 5aa1f6e commit 109ffc0
Show file tree
Hide file tree
Showing 20 changed files with 309 additions and 48 deletions.
1 change: 1 addition & 0 deletions .drone.star
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ config = {
"services/auth-basic",
"services/auth-bearer",
"services/auth-machine",
"services/eventhistory",
"services/frontend",
"services/gateway",
"services/graph",
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ OCIS_MODULES = \
services/auth-basic \
services/auth-bearer \
services/auth-machine \
services/eventhistory \
services/frontend \
services/gateway \
services/graph \
Expand Down
2 changes: 1 addition & 1 deletion changelog/unreleased/event-history.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Enhancement: Eventhistory service

Introduces the `eventhistory` service. It is a service that is storing events and providing a grpc API to retrieve them
Introduces the `eventhistory` service. It is a service that stores events and provides a grpc API to retrieve them.

https://github.com/owncloud/ocis/pull/5600
2 changes: 2 additions & 0 deletions ocis-pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
authbasic "github.com/owncloud/ocis/v2/services/auth-basic/pkg/config"
authbearer "github.com/owncloud/ocis/v2/services/auth-bearer/pkg/config"
authmachine "github.com/owncloud/ocis/v2/services/auth-machine/pkg/config"
eventhistory "github.com/owncloud/ocis/v2/services/eventhistory/pkg/config"
frontend "github.com/owncloud/ocis/v2/services/frontend/pkg/config"
gateway "github.com/owncloud/ocis/v2/services/gateway/pkg/config"
graph "github.com/owncloud/ocis/v2/services/graph/pkg/config"
Expand Down Expand Up @@ -84,6 +85,7 @@ type Config struct {
AuthBasic *authbasic.Config `yaml:"auth_basic"`
AuthBearer *authbearer.Config `yaml:"auth_bearer"`
AuthMachine *authmachine.Config `yaml:"auth_machine"`
EventHistory *eventhistory.Config `yaml:"eventhistory"`
Frontend *frontend.Config `yaml:"frontend"`
Gateway *gateway.Config `yaml:"gateway"`
Graph *graph.Config `yaml:"graph"`
Expand Down
2 changes: 2 additions & 0 deletions ocis-pkg/config/defaultconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
authbasic "github.com/owncloud/ocis/v2/services/auth-basic/pkg/config/defaults"
authbearer "github.com/owncloud/ocis/v2/services/auth-bearer/pkg/config/defaults"
authmachine "github.com/owncloud/ocis/v2/services/auth-machine/pkg/config/defaults"
eventhistory "github.com/owncloud/ocis/v2/services/eventhistory/pkg/config/defaults"
frontend "github.com/owncloud/ocis/v2/services/frontend/pkg/config/defaults"
gateway "github.com/owncloud/ocis/v2/services/gateway/pkg/config/defaults"
graph "github.com/owncloud/ocis/v2/services/graph/pkg/config/defaults"
Expand Down Expand Up @@ -48,6 +49,7 @@ func DefaultConfig() *Config {
AuthBasic: authbasic.DefaultConfig(),
AuthBearer: authbearer.DefaultConfig(),
AuthMachine: authmachine.DefaultConfig(),
EventHistory: eventhistory.DefaultConfig(),
Frontend: frontend.DefaultConfig(),
Gateway: gateway.DefaultConfig(),
Graph: graph.DefaultConfig(),
Expand Down
2 changes: 1 addition & 1 deletion ocis-pkg/roles/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Manager struct {
func NewManager(o ...Option) Manager {
opts := newOptions(o...)

nStore := ocisstore.GetStore(opts.storeOptions)
nStore := ocisstore.Create(opts.storeOptions...)
return Manager{
cache: nStore,
roleService: opts.roleService,
Expand Down
5 changes: 3 additions & 2 deletions ocis-pkg/roles/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

// Options are all the possible options.
type Options struct {
storeOptions ocisstore.OcisStoreOptions
storeOptions []ocisstore.Option
logger log.Logger
roleService settingssvc.RoleService
}
Expand All @@ -30,7 +30,8 @@ func RoleService(rs settingssvc.RoleService) Option {
}
}

func StoreOptions(storeOpts ocisstore.OcisStoreOptions) Option {
// StoreOptions are the options for the store
func StoreOptions(storeOpts []ocisstore.Option) Option {
return func(o *Options) {
o.storeOptions = storeOpts
}
Expand Down
50 changes: 50 additions & 0 deletions ocis-pkg/store/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package store

import "time"

// Option provides an option to configure the store
type Option func(*Options)

// Type defines the type of the store
func Type(typ string) Option {
return func(o *Options) {
o.Type = typ
}
}

// Addresses defines the addresses where the store can be reached
func Addresses(addrs ...string) Option {
return func(o *Options) {
o.Addresses = addrs
}
}

// Database defines the Database the store should use
func Database(db string) Option {
return func(o *Options) {
o.Database = db
}
}

// Table defines the table the store should use
func Table(t string) Option {
return func(o *Options) {
o.Table = t
}
}

// Size defines the maximum capacity of the store.
// Only applicable when using "ocmem" store
func Size(s int) Option {
return func(o *Options) {
o.Size = s
}
}

// TTL defines the time to life for elements in the store.
// Only applicable when using "natsjs" store
func TTL(t time.Duration) Option {
return func(o *Options) {
o.TTL = t
}
}
89 changes: 68 additions & 21 deletions ocis-pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ package store

import (
"context"
"strings"
"time"

natsjs "github.com/go-micro/plugins/v4/store/nats-js"
"github.com/go-micro/plugins/v4/store/redis"
"github.com/nats-io/nats.go"
"github.com/owncloud/ocis/v2/ocis-pkg/store/etcd"
"github.com/owncloud/ocis/v2/ocis-pkg/store/memory"
"go-micro.dev/v4/store"
)

var ocMemStore *store.Store

type OcisStoreOptions struct {
// Options are the options to configure the store
type Options struct {
// Type determines the implementation:
// * "noop", for a noop store (it does nothing)
// * "etcd", for etcd
Expand All @@ -20,44 +24,59 @@ type OcisStoreOptions struct {
// * "memory", for a in-memory implementation, which is the default if noone matches
Type string

// Address is a comma-separated list of nodes that the store
// will use. This is currently usable only with the etcd implementation. If it
// isn't provided, "127.0.0.1:2379" will be the only node used.
Address string
// Address is a list of nodes that the store will use.
Addresses []string

// Size configures the maximum capacity of the cache for
// the "ocmem" implementation, in number of items that the cache can hold per table.
// You can use 5000 to make the cache hold up to 5000 elements.
// The parameter only affects to the "ocmem" implementation, the rest will ignore it.
// If an invalid value is used, the default of 512 will be used instead.
Size int

// Database the store should use (optional)
Database string

// Table the store should use (optional)
Table string

// TTL is the time to life for documents stored in the store
TTL time.Duration
}

// GetStore returns a configured key-value store
// Create returns a configured key-value store
//
// Each microservice (or whatever piece is using the store) should use the
// options available in the interface's operations to choose the right database
// and table to prevent collisions with other microservices.
// Recommended approach is to use "services" or "ocis-pkg" for the database,
// and "services/<service-name>/" or "ocis-pkg/<pkg>/" for the package name.
func GetStore(ocisOpts OcisStoreOptions) store.Store {
var s store.Store

addresses := strings.Split(ocisOpts.Address, ",")
opts := []store.Option{
store.Nodes(addresses...),
func Create(opts ...Option) store.Store {
options := &Options{}
for _, o := range opts {
o(options)
}

switch ocisOpts.Type {
storeopts := storeOptions(options)

switch options.Type {
default:
// TODO: better to error in default case?
fallthrough
case "mem":
return store.NewMemoryStore(storeopts...)
case "noop":
s = store.NewNoopStore(opts...)
return store.NewNoopStore(storeopts...)
case "etcd":
s = etcd.NewEtcdStore(opts...)
return etcd.NewEtcdStore(storeopts...)
case "redis":
// FIXME redis plugin does not support redis cluster, sentinel or ring -> needs upstream patch or our implementation
return redis.NewStore(storeopts...)
case "ocmem":
if ocMemStore == nil {
var memStore store.Store

sizeNum := ocisOpts.Size
sizeNum := options.Size
if sizeNum <= 0 {
memStore = memory.NewMultiMemStore()
} else {
Expand All @@ -73,9 +92,37 @@ func GetStore(ocisOpts OcisStoreOptions) store.Store {
}
ocMemStore = &memStore
}
s = *ocMemStore
default:
s = store.NewMemoryStore(opts...)
return *ocMemStore
case "nats-js":
// TODO nats needs a DefaultTTL option as it does not support per Write TTL ...
// FIXME nats has restrictions on the key, we cannot use slashes AFAICT
// host, port, clusterid
return natsjs.NewStore(
append(storeopts,
natsjs.NatsOptions(nats.Options{Name: "TODO"}),
natsjs.DefaultTTL(options.TTL),
)...,
) // TODO test with ocis nats
}
return s
}

func storeOptions(o *Options) []store.Option {
var opts []store.Option

if o.Addresses != nil {
opts = append(opts, store.Nodes(o.Addresses...))
}

if o.Database != "" {
opts = append(opts, store.Database(o.Database))

}

if o.Table != "" {
opts = append(opts, store.Table(o.Table))

}

return opts

}
2 changes: 2 additions & 0 deletions ocis/pkg/runtime/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
appRegistry "github.com/owncloud/ocis/v2/services/app-registry/pkg/command"
authbasic "github.com/owncloud/ocis/v2/services/auth-basic/pkg/command"
authmachine "github.com/owncloud/ocis/v2/services/auth-machine/pkg/command"
eventhistory "github.com/owncloud/ocis/v2/services/eventhistory/pkg/command"
frontend "github.com/owncloud/ocis/v2/services/frontend/pkg/command"
gateway "github.com/owncloud/ocis/v2/services/gateway/pkg/command"
graph "github.com/owncloud/ocis/v2/services/graph/pkg/command"
Expand Down Expand Up @@ -129,6 +130,7 @@ func NewService(options ...Option) (*Service, error) {
s.ServicesRegistry[opts.Config.Notifications.Service.Name] = notifications.NewSutureService
s.ServicesRegistry[opts.Config.Search.Service.Name] = search.NewSutureService
s.ServicesRegistry[opts.Config.Postprocessing.Service.Name] = postprocessing.NewSutureService
s.ServicesRegistry[opts.Config.EventHistory.Service.Name] = eventhistory.NewSutureService

// populate delayed services
s.Delayed[opts.Config.Sharing.Service.Name] = sharing.NewSutureService
Expand Down
23 changes: 18 additions & 5 deletions services/eventhistory/README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
# Eventhistory service
# Eventhistory Service

The `eventhistory` consumes all events from the configured event systems, stores them and allows to retrieve them via an eventid
The `eventhistory` consumes all events from the configured event system like NATS, stores them and allows to retrieve them via an eventid.

## Prerequisites

Running the eventhistory service without an event sytem like NATS is not possible.

## Consuming

The `eventhistory` services consumes all events from the configured event sytem. Running it without an event sytem is not possible.
The `eventhistory` services consumes all events from the configured event sytem.

## Storing

The `eventhistory` stores each consumed event in the configured store. Possible stores are ? and ? but not ?.
The `eventhistory` service stores each consumed event via the configured store in `EVENTHISTORY_STORE_TYPE`. Possible stores are:
- `mem`: Basic in-memory store and the default.
- `ocmem`: Advanced in-memory store allowing max size.
- `redis`: Stores data in a configured redis cluster.
- `etcd`: Stores data in a configured etcd cluster.
- `nats-js`: Stores data using key-value-store feature of nats jetstream. https://docs.nats.io/nats-concepts/jetstream/key-value-store
- `noop`: Stores nothing. Useful for testing. Not recommended in productive enviroments.

1. Note that in-memory stores are by nature not reboot persistant.
2. Events stay in the store for 2 weeks by default. Use `EVENTHISTORY_RECORD_EXPIRY` to adjust this value.

## Retrieving

Other services can call the `eventhistory` service via a grpc call to retrieve events. The request must contain the eventid that should be retrieved
Other services can call the `eventhistory` service via a grpc call to retrieve events. The request must contain the eventid that should be retrieved.
2 changes: 1 addition & 1 deletion services/eventhistory/pkg/command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type SutureService struct {
func NewSutureService(cfg *ociscfg.Config) suture.Service {
cfg.Notifications.Commons = cfg.Commons
return SutureService{
//cfg: cfg.Notifications,
cfg: cfg.EventHistory,
}
}

Expand Down
16 changes: 11 additions & 5 deletions services/eventhistory/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,20 @@ package command
import (
"context"
"fmt"
"strings"

"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
ogrpc "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
"github.com/owncloud/ocis/v2/ocis-pkg/store"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/config"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/config/parser"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/logging"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/metrics"
"github.com/owncloud/ocis/v2/services/eventhistory/pkg/server/grpc"
"github.com/urfave/cli/v2"
"go-micro.dev/v4/store"
)

// Server is the entrypoint for the server command.
Expand Down Expand Up @@ -54,8 +55,13 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

// TODO: configure store
st := store.DefaultStore
st := store.Create(
store.Type(cfg.Store.Type),
store.Addresses(strings.Split(cfg.Store.Addresses, ",")...),
store.Database(cfg.Store.Database),
store.Table(cfg.Store.Table),
store.TTL(cfg.Store.RecordExpiry),
)

service := grpc.NewService(
grpc.Logger(logger),
Expand All @@ -69,11 +75,11 @@ func Server(cfg *config.Config) *cli.Command {
grpc.Store(st),
)

gr.Add(service.Run, func(_ error) {
gr.Add(service.Run, func(err error) {
logger.Error().
Err(err).
Str("server", "grpc").
Msg("Shutting down server")
Msg("Shutting Down server")

cancel()
})
Expand Down
7 changes: 6 additions & 1 deletion services/eventhistory/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ type GRPCConfig struct {

// Store configures the store to use
type Store struct {
RecordExpiry time.Duration `yaml:"record_expiry" env:"RECORD_EXPIRY" desc:"time to life for events in the store"`
Type string `yaml:"type" env:"EVENTHISTORY_STORE_TYPE" desc:"The type of the eventhistory store. Supported values are: 'mem', 'ocmem', 'etcd', 'redis', 'nats-js', 'noop'. See the text description for details."`
Addresses string `yaml:"addresses" env:"EVENTHISTORY_STORE_ADDRESSES" desc:"A comma separated list of addresses the store should use. No effect on 'inmemory' implementations."`
Database string `yaml:"database" env:"EVENTHISTORY_STORE_DATABASE" desc:"(optional) The database the store should use."`
Table string `yaml:"table" env:"EVENTHISTORY_STORE_TABLE" desc:"(optional) The table the store should use."`
RecordExpiry time.Duration `yaml:"record_expiry" env:"EVENTHISTORY_RECORD_EXPIRY" desc:"Time to life for events in the store. The duration can be set as number followed by a unit identifier like s, m or h. Defaults to '336h' (2 weeks)."`
Size int `yaml:"size" env:"EVENTHISTORY_STORE_SIZE" desc:"The maximum amount of items in the store. Only applies when using store type 'ocmem'. Defaults to 512."`
}

// Events combines the configuration options for the event bus.
Expand Down
Loading

0 comments on commit 109ffc0

Please sign in to comment.