Skip to content

Commit

Permalink
Public share json persistence (#3199)
Browse files Browse the repository at this point in the history
* extract file persistence

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

* add json memory 'persistence'

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

* add json cs3 persistence

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

* Improve naming, make code testable

* Return errtypes.NotFound if statting a non-existent file

* Make sure to not overwrite data on the storage. Pass on context.

* Register file, disk and cs3 flavors of the json public share manager

* Implement Load() for the json public share manager

* Add changelog

* Fix hound issues

* Fix linter issues

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
Co-authored-by: André Duffeck <[email protected]>
  • Loading branch information
butonic and aduffeck authored Sep 9, 2022
1 parent 62a0ae0 commit 057b14c
Show file tree
Hide file tree
Showing 8 changed files with 605 additions and 114 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/json-publicshare-manager.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Add support for cs3 storage backends to the json publicshare manager

We enhanced the json publicshare manager to support a cs3 storage backend alongside the file and memory backends.

https://github.com/cs3org/reva/pull/3199
193 changes: 120 additions & 73 deletions pkg/publicshare/manager/json/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"sync"
Expand All @@ -43,72 +41,121 @@ import (
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/publicshare"
"github.com/cs3org/reva/v2/pkg/publicshare/manager/json/persistence"
"github.com/cs3org/reva/v2/pkg/publicshare/manager/json/persistence/cs3"
"github.com/cs3org/reva/v2/pkg/publicshare/manager/json/persistence/file"
"github.com/cs3org/reva/v2/pkg/publicshare/manager/json/persistence/memory"
"github.com/cs3org/reva/v2/pkg/publicshare/manager/registry"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/storage/utils/metadata"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)

func init() {
registry.Register("json", New)
registry.Register("json", NewFile)
registry.Register("jsoncs3", NewCS3)
registry.Register("jsonmemory", NewMemory)
}

// New returns a new filesystem public shares manager.
func New(c map[string]interface{}) (publicshare.Manager, error) {
conf := &config{}
// NewFile returns a new filesystem public shares manager.
func NewFile(c map[string]interface{}) (publicshare.Manager, error) {
conf := &fileConfig{}
if err := mapstructure.Decode(c, conf); err != nil {
return nil, err
}

conf.init()
if conf.File == "" {
conf.File = "/var/tmp/reva/publicshares"
}

m := manager{
gatewayAddr: conf.GatewayAddr,
mutex: &sync.Mutex{},
file: conf.File,
passwordHashCost: conf.SharePasswordHashCost,
janitorRunInterval: conf.JanitorRunInterval,
enableExpiredSharesCleanup: conf.EnableExpiredSharesCleanup,
}

// attempt to create the db file
var fi os.FileInfo
var err error
if fi, err = os.Stat(m.file); os.IsNotExist(err) {
folder := filepath.Dir(m.file)
if err := os.MkdirAll(folder, 0755); err != nil {
return nil, err
}
if _, err := os.Create(m.file); err != nil {
return nil, err
}
p := file.New(conf.File)
if err := p.Init(context.Background()); err != nil {
return nil, err
}

if fi == nil || fi.Size() == 0 {
err := ioutil.WriteFile(m.file, []byte("{}"), 0644)
if err != nil {
return nil, err
}
return New(conf.GatewayAddr, conf.SharePasswordHashCost, conf.JanitorRunInterval, conf.EnableExpiredSharesCleanup, p)
}

// NewMemory returns a new in-memory public shares manager.
func NewMemory(c map[string]interface{}) (publicshare.Manager, error) {
conf := &commonConfig{}
if err := mapstructure.Decode(c, conf); err != nil {
return nil, err
}

go m.startJanitorRun()
conf.init()
p := memory.New()

if err := p.Init(context.Background()); err != nil {
return nil, err
}

return New(conf.GatewayAddr, conf.SharePasswordHashCost, conf.JanitorRunInterval, conf.EnableExpiredSharesCleanup, p)
}

// NewCS3 returns a new cs3 public shares manager.
func NewCS3(c map[string]interface{}) (publicshare.Manager, error) {
conf := &cs3Config{}
if err := mapstructure.Decode(c, conf); err != nil {
return nil, err
}

conf.init()

s, err := metadata.NewCS3Storage(conf.ProviderAddr, conf.ProviderAddr, conf.ServiceUserID, conf.ServiceUserIdp, conf.MachineAuthAPIKey)
if err != nil {
return nil, err
}
p := cs3.New(s)

if err := p.Init(context.Background()); err != nil {
return nil, err
}

return New(conf.GatewayAddr, conf.SharePasswordHashCost, conf.JanitorRunInterval, conf.EnableExpiredSharesCleanup, p)
}

// New returns a new public share manager instance
func New(gwAddr string, pwHashCost, janitorRunInterval int, enableCleanup bool, p persistence.Persistence) (publicshare.Manager, error) {
m := &manager{
gatewayAddr: gwAddr,
mutex: &sync.Mutex{},
passwordHashCost: pwHashCost,
janitorRunInterval: janitorRunInterval,
enableExpiredSharesCleanup: enableCleanup,
persistence: p,
}

return &m, nil
go m.startJanitorRun()
return m, nil
}

type config struct {
type commonConfig struct {
GatewayAddr string `mapstructure:"gateway_addr"`
File string `mapstructure:"file"`
SharePasswordHashCost int `mapstructure:"password_hash_cost"`
JanitorRunInterval int `mapstructure:"janitor_run_interval"`
EnableExpiredSharesCleanup bool `mapstructure:"enable_expired_shares_cleanup"`
}

func (c *config) init() {
if c.File == "" {
c.File = "/var/tmp/reva/publicshares"
}
type fileConfig struct {
commonConfig `mapstructure:",squash"`

File string `mapstructure:"file"`
}

type cs3Config struct {
commonConfig `mapstructure:",squash"`

ProviderAddr string `mapstructure:"provider_addr"`
ServiceUserID string `mapstructure:"service_user_id"`
ServiceUserIdp string `mapstructure:"service_user_idp"`
MachineAuthAPIKey string `mapstructure:"machine_auth_apikey"`
}

func (c *commonConfig) init() {
if c.SharePasswordHashCost == 0 {
c.SharePasswordHashCost = 11
}
Expand All @@ -120,7 +167,7 @@ func (c *config) init() {
type manager struct {
gatewayAddr string
mutex *sync.Mutex
file string
persistence persistence.Persistence

passwordHashCost int
janitorRunInterval int
Expand Down Expand Up @@ -153,7 +200,7 @@ func (m *manager) Dump(ctx context.Context, shareChan chan<- *publicshare.WithPa
m.mutex.Lock()
defer m.mutex.Unlock()

db, err := m.readDb()
db, err := m.persistence.Read(ctx)
if err != nil {
return err
}
Expand All @@ -170,6 +217,27 @@ func (m *manager) Dump(ctx context.Context, shareChan chan<- *publicshare.WithPa
return nil
}

// Load imports public shares and received shares from channels (e.g. during migration)
func (m *manager) Load(ctx context.Context, shareChan <-chan *publicshare.WithPassword) error {
db, err := m.persistence.Read(ctx)
if err != nil {
return err
}

for ps := range shareChan {
encShare, err := utils.MarshalProtoV1ToJSON(&ps.PublicShare)
if err != nil {
return err
}

db[ps.PublicShare.Id.GetOpaqueId()] = map[string]interface{}{
"share": string(encShare),
"password": ps.Password,
}
}
return m.persistence.Write(ctx, db)
}

// CreatePublicShare adds a new entry to manager.shares
func (m *manager) CreatePublicShare(ctx context.Context, u *user.User, rInfo *provider.ResourceInfo, g *link.Grant) (*link.PublicShare, error) {
id := &link.PublicShareId{
Expand Down Expand Up @@ -230,7 +298,7 @@ func (m *manager) CreatePublicShare(ctx context.Context, u *user.User, rInfo *pr
return nil, err
}

db, err := m.readDb()
db, err := m.persistence.Read(ctx)
if err != nil {
return nil, err
}
Expand All @@ -244,7 +312,7 @@ func (m *manager) CreatePublicShare(ctx context.Context, u *user.User, rInfo *pr
return nil, errors.New("key already exists")
}

err = m.writeDb(db)
err = m.persistence.Write(ctx, db)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -303,7 +371,7 @@ func (m *manager) UpdatePublicShare(ctx context.Context, u *user.User, req *link
m.mutex.Lock()
defer m.mutex.Unlock()

db, err := m.readDb()
db, err := m.persistence.Read(ctx)
if err != nil {
return nil, err
}
Expand All @@ -325,7 +393,7 @@ func (m *manager) UpdatePublicShare(ctx context.Context, u *user.User, req *link

db[share.Id.OpaqueId] = data

err = m.writeDb(db)
err = m.persistence.Write(ctx, db)
if err != nil {
return nil, err
}
Expand All @@ -352,7 +420,7 @@ func (m *manager) GetPublicShare(ctx context.Context, u *user.User, ref *link.Pu
m.mutex.Lock()
defer m.mutex.Unlock()

db, err := m.readDb()
db, err := m.persistence.Read(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -393,7 +461,7 @@ func (m *manager) ListPublicShares(ctx context.Context, u *user.User, filters []

log := appctx.GetLogger(ctx)

db, err := m.readDb()
db, err := m.persistence.Read(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -458,7 +526,7 @@ func (m *manager) cleanupExpiredShares() {
m.mutex.Lock()
defer m.mutex.Unlock()

db, _ := m.readDb()
db, _ := m.persistence.Read(context.Background())

for _, v := range db {
d := v.(map[string]interface{})["share"]
Expand Down Expand Up @@ -498,7 +566,7 @@ func (m *manager) revokeExpiredPublicShare(ctx context.Context, s *link.PublicSh
// RevokePublicShare undocumented.
func (m *manager) RevokePublicShare(ctx context.Context, u *user.User, ref *link.PublicShareReference) error {
m.mutex.Lock()
db, err := m.readDb()
db, err := m.persistence.Read(ctx)
if err != nil {
return err
}
Expand All @@ -523,11 +591,11 @@ func (m *manager) RevokePublicShare(ctx context.Context, u *user.User, ref *link

m.mutex.Lock()
defer m.mutex.Unlock()
return m.writeDb(db)
return m.persistence.Write(ctx, db)
}

func (m *manager) getByToken(ctx context.Context, token string) (*link.PublicShare, string, error) {
db, err := m.readDb()
db, err := m.persistence.Read(ctx)
if err != nil {
return nil, "", err
}
Expand All @@ -552,7 +620,7 @@ func (m *manager) getByToken(ctx context.Context, token string) (*link.PublicSha

// GetPublicShareByToken gets a public share by its opaque token.
func (m *manager) GetPublicShareByToken(ctx context.Context, token string, auth *link.PublicShareAuthentication, sign bool) (*link.PublicShare, error) {
db, err := m.readDb()
db, err := m.persistence.Read(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -596,27 +664,6 @@ func (m *manager) GetPublicShareByToken(ctx context.Context, token string, auth
return nil, errtypes.NotFound(fmt.Sprintf("share with token: `%v` not found", token))
}

func (m *manager) readDb() (map[string]interface{}, error) {
db := map[string]interface{}{}
readBytes, err := ioutil.ReadFile(m.file)
if err != nil {
return nil, err
}
if err := json.Unmarshal(readBytes, &db); err != nil {
return nil, err
}
return db, nil
}

func (m *manager) writeDb(db map[string]interface{}) error {
dbAsJSON, err := json.Marshal(db)
if err != nil {
return err
}

return ioutil.WriteFile(m.file, dbAsJSON, 0644)
}

type publicShare struct {
link.PublicShare
Password string `json:"password"`
Expand Down
Loading

0 comments on commit 057b14c

Please sign in to comment.