Skip to content

Commit

Permalink
sse stream per user
Browse files Browse the repository at this point in the history
Signed-off-by: jkoberg <[email protected]>
  • Loading branch information
kobergj committed Apr 3, 2023
1 parent cff9845 commit 6bcdd34
Show file tree
Hide file tree
Showing 6 changed files with 386 additions and 24 deletions.
2 changes: 1 addition & 1 deletion ocis-pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type Config struct {
Policies *policies.Config `yaml:"policies"`
Proxy *proxy.Config `yaml:"proxy"`
Settings *settings.Config `yaml:"settings"`
Hub *hub.Config `yaml:"settings"`
Hub *hub.Config `yaml:"hub"`
Sharing *sharing.Config `yaml:"sharing"`
StorageSystem *storagesystem.Config `yaml:"storage_system"`
StoragePublicLink *storagepublic.Config `yaml:"storage_public"`
Expand Down
25 changes: 24 additions & 1 deletion services/hub/pkg/config/defaults/defaultconfig.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package defaults

import (
"github.com/owncloud/ocis/v2/services/hub/pkg/config"
"strings"

"github.com/owncloud/ocis/v2/ocis-pkg/shared"
"github.com/owncloud/ocis/v2/services/hub/pkg/config"
)

// FullDefaultConfig used by doc generation
Expand All @@ -24,9 +26,16 @@ func DefaultConfig() *config.Config {
Namespace: "com.owncloud.web",
Root: "/",
},
Reva: shared.DefaultRevaConfig(),
Events: config.Events{
Endpoint: "127.0.0.1:9233",
Cluster: "ocis-cluster",
EnableTLS: false,
},
}
}

// EnsureDefaults ensures default values
func EnsureDefaults(cfg *config.Config) {
if cfg.TokenManager == nil && cfg.Commons != nil && cfg.Commons.TokenManager != nil {
cfg.TokenManager = &config.TokenManager{
Expand All @@ -35,8 +44,22 @@ func EnsureDefaults(cfg *config.Config) {
} else if cfg.TokenManager == nil {
cfg.TokenManager = &config.TokenManager{}
}

if cfg.MachineAuthAPIKey == "" && cfg.Commons != nil && cfg.Commons.MachineAuthAPIKey != "" {
cfg.MachineAuthAPIKey = cfg.Commons.MachineAuthAPIKey
}

if cfg.Reva == nil && cfg.Commons != nil && cfg.Commons.Reva != nil {
cfg.Reva = &shared.Reva{
Address: cfg.Commons.Reva.Address,
TLS: cfg.Commons.Reva.TLS,
}
} else if cfg.Reva == nil {
cfg.Reva = &shared.Reva{}
}
}

// Sanitize saniztizes the config
func Sanitize(cfg *config.Config) {
// sanitize config
if cfg.HTTP.Root != "/" {
Expand Down
3 changes: 3 additions & 0 deletions services/hub/pkg/config/parser/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,8 @@ func Validate(cfg *config.Config) error {
return shared.MissingJWTTokenError(cfg.Service.Name)
}

if cfg.MachineAuthAPIKey == "" {
return shared.MissingMachineAuthApiKeyError(cfg.Service.Name)
}
return nil
}
11 changes: 11 additions & 0 deletions services/hub/pkg/service/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package service

// UploadReady informs an client that an upload is ready to work with
type UploadReady struct {
FileID string
SpaceID string
Filename string
Timestamp string

Message string
}
68 changes: 65 additions & 3 deletions services/hub/pkg/service/service.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
package service

import (
"crypto/tls"
"crypto/x509"
"log"
"net/http"
"os"

"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/go-chi/chi/v5"
"github.com/go-micro/plugins/v4/events/natsjs"
"github.com/owncloud/ocis/v2/ocis-pkg/account"
"github.com/owncloud/ocis/v2/ocis-pkg/crypto"
opkgm "github.com/owncloud/ocis/v2/ocis-pkg/middleware"
"github.com/owncloud/ocis/v2/services/hub/pkg/config"
"net/http"
)

// Service defines the service handlers.

type Service struct {
m *chi.Mux
}
Expand All @@ -23,8 +31,22 @@ func New(cfg *config.Config) Service {
),
)

s, err := NewSSE(cfg)
if err != nil {
log.Fatal("cant initiate sse", err)
}

ch, err := eventsConsumer(cfg.Events)
if err != nil {
log.Fatal("cant consume events", err)
}

go s.ListenForEvents(ch)

m.Route("/hub", func(r chi.Router) {
r.Route("/sse", ServeSSE)
r.Route("/sse", func(r chi.Router) {
r.Get("/", s.ServeHTTP)
})
})

svc := Service{
Expand All @@ -34,6 +56,46 @@ func New(cfg *config.Config) Service {
return svc
}

func eventsConsumer(evtsCfg config.Events) (<-chan events.Event, error) {
var tlsConf *tls.Config
if evtsCfg.EnableTLS {
var rootCAPool *x509.CertPool
if evtsCfg.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate)
if err != nil {
return nil, err
}

rootCAPool, err = crypto.NewCertPoolFromPEM(rootCrtFile)
if err != nil {
return nil, err
}
evtsCfg.TLSInsecure = false
}

tlsConf = &tls.Config{
MinVersion: tls.VersionTLS12,
InsecureSkipVerify: evtsCfg.TLSInsecure, //nolint:gosec
RootCAs: rootCAPool,
}
}
client, err := stream.Nats(
natsjs.TLSConfig(tlsConf),
natsjs.Address(evtsCfg.Endpoint),
natsjs.ClusterID(evtsCfg.Cluster),
)
if err != nil {
return nil, err
}

evts, err := events.Consume(client, "hub", events.UploadReady{})
if err != nil {
return nil, err
}

return evts, nil
}

// ServeHTTP implements the Service interface.
func (s Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.m.ServeHTTP(w, r)
Expand Down
Loading

0 comments on commit 6bcdd34

Please sign in to comment.