Skip to content

Commit

Permalink
use golinstor caching instead of custom implementation
Browse files Browse the repository at this point in the history
Reuse the caching implementation from golinstor, it is probably
more complete and less error prone.

While at it, also make the timeout configurable.

Signed-off-by: Moritz Wanzenböck <[email protected]>
  • Loading branch information
WanzenBug authored and rck committed Apr 26, 2024
1 parent e48ad92 commit 6d06c85
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 154 deletions.
10 changes: 9 additions & 1 deletion cmd/linstor-csi/linstor-csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
"net/http"
"net/url"
"os"
"time"

linstor "github.com/LINBIT/golinstor"
lapicache "github.com/LINBIT/golinstor/cache"
lapi "github.com/LINBIT/golinstor/client"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
Expand All @@ -48,6 +50,8 @@ func main() {
bearerTokenFile = flag.String("bearer-token", "", "Read the bearer token from the given file and use it for authentication.")
propNs = flag.String("property-namespace", linstor.NamespcAuxiliary, "Limit the reported topology keys to properties from the given namespace.")
labelBySP = flag.Bool("label-by-storage-pool", true, "Set to false to disable labeling of nodes based on their configured storage pools.")
nodeCacheTimeout = flag.Duration("node-cache-timeout", 1*time.Minute, "Duration for which the results of node and storage pool related API responses should be cached.")
resourceCacheTimeout = flag.Duration("resource-cache-timeout", 1*time.Minute, "Duration for which the results of resource related API responses should be cached.")
)

flag.Var(&volume.DefaultRemoteAccessPolicy, "default-remote-access-policy", "")
Expand Down Expand Up @@ -78,9 +82,13 @@ func main() {
}

linstorOpts := []lapi.Option{
lapi.Limit(r, *burst),
lapi.Limiter(rate.NewLimiter(r, *burst)),
lapi.UserAgent("linstor-csi/" + driver.Version),
lapi.Log(logger),
lapicache.WithCaches(
&lapicache.NodeCache{Timeout: *nodeCacheTimeout},
&lapicache.ResourceCache{Timeout: *resourceCacheTimeout},
),
}

if *lsEndpoint != "" {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.22.0
toolchain go1.22.2

require (
github.com/LINBIT/golinstor v0.51.0
github.com/LINBIT/golinstor v0.52.0
github.com/container-storage-interface/spec v1.9.0
github.com/haySwim/data v0.2.0
github.com/kubernetes-csi/csi-test/v5 v5.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/LINBIT/golinstor v0.51.0 h1:XvfcQN3jg7b/s79wrpTe/jzfFDGSNZy2pjGG8SDtFRM=
github.com/LINBIT/golinstor v0.51.0/go.mod h1:MCkHNdHxoGw4mnt8DGsSqWNF5ZGhYFy6Lr4tQLyVBs0=
github.com/LINBIT/golinstor v0.52.0 h1:b+l26DJvLxGIYuvopLFJUoE7aywa6oCmCeL3jav3kKM=
github.com/LINBIT/golinstor v0.52.0/go.mod h1:D811Eyjhoy6t1Tl36HTW4by0s6O5g0cVgV/t58oN+0g=
github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI=
github.com/Masterminds/goutils v1.1.1/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU=
github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
Expand Down
150 changes: 0 additions & 150 deletions pkg/linstor/highlevelclient/high_level_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"

lc "github.com/LINBIT/golinstor"
lapi "github.com/LINBIT/golinstor/client"
"github.com/container-storage-interface/spec/lib/go/csi"

Expand All @@ -48,16 +45,6 @@ func NewHighLevelClient(options ...lapi.Option) (*HighLevelClient, error) {
return nil, err
}

c.Nodes = &NodeCacheProvider{
NodeProvider: c.Nodes,
timeout: 1 * time.Minute,
}

c.Resources = &ResourceCacheProvider{
ResourceProvider: c.Resources,
timeout: 1 * time.Minute,
}

return &HighLevelClient{Client: c}, nil
}

Expand Down Expand Up @@ -201,140 +188,3 @@ func (c *HighLevelClient) ReservedCapacity(ctx context.Context, node, pool strin

return reserved, nil
}

type NodeCacheProvider struct {
lapi.NodeProvider
timeout time.Duration
nodesMu sync.Mutex
nodesUpdated time.Time
nodes []lapi.Node
poolsMu sync.Mutex
poolsUpdated time.Time
pools []lapi.StoragePool
}

func (n *NodeCacheProvider) GetAll(ctx context.Context, opts ...*lapi.ListOpts) ([]lapi.Node, error) {
n.nodesMu.Lock()
defer n.nodesMu.Unlock()

now := time.Now()

if n.nodesUpdated.Add(n.timeout).After(now) {
return filter(n.nodes, func(node lapi.Node) string {
return node.Name
}, nil, opts...), nil
}

nodes, err := n.NodeProvider.GetAll(ctx)
if err != nil {
return nil, err
}

n.nodes = nodes
n.nodesUpdated = now

return filter(n.nodes, func(node lapi.Node) string {
return node.Name
}, nil, opts...), nil
}

func (n *NodeCacheProvider) GetStoragePoolView(ctx context.Context, opts ...*lapi.ListOpts) ([]lapi.StoragePool, error) {
n.poolsMu.Lock()
defer n.poolsMu.Unlock()

now := time.Now()

if n.poolsUpdated.Add(n.timeout).After(now) {
return filter(n.pools,
func(pool lapi.StoragePool) string { return pool.NodeName },
func(pool lapi.StoragePool) string { return pool.StoragePoolName },
opts...,
), nil
}

pools, err := n.NodeProvider.GetStoragePoolView(ctx)
if err != nil {
return nil, err
}

n.pools = pools
n.poolsUpdated = now

return filter(n.pools,
func(pool lapi.StoragePool) string { return pool.NodeName },
func(pool lapi.StoragePool) string { return pool.StoragePoolName },
opts...,
), nil
}

type ResourceCacheProvider struct {
lapi.ResourceProvider
timeout time.Duration
resourceViewMu sync.Mutex
resourceViewUpdated time.Time
resourceView []lapi.ResourceWithVolumes
}

func (r *ResourceCacheProvider) GetResourceView(ctx context.Context, opts ...*lapi.ListOpts) ([]lapi.ResourceWithVolumes, error) {
r.resourceViewMu.Lock()
defer r.resourceViewMu.Unlock()

now := time.Now()

if r.resourceViewUpdated.Add(r.timeout).After(now) {
return filter(r.resourceView,
func(res lapi.ResourceWithVolumes) string { return res.NodeName },
func(res lapi.ResourceWithVolumes) string { return res.Props[lc.KeyStorPoolName] },
opts...,
), nil
}

view, err := r.ResourceProvider.GetResourceView(ctx)
if err != nil {
return nil, err
}

r.resourceView = view
r.resourceViewUpdated = now

return filter(r.resourceView,
func(res lapi.ResourceWithVolumes) string { return res.NodeName },
func(res lapi.ResourceWithVolumes) string { return res.Props[lc.KeyStorPoolName] },
opts...,
), nil
}

func filter[T any](items []T, getNodeName, getPoolName func(T) string, opts ...*lapi.ListOpts) []T {
filterNames := make(map[string]struct{})
filterPools := make(map[string]struct{})

for _, o := range opts {
for _, n := range o.Node {
filterNames[n] = struct{}{}
}

for _, sp := range o.StoragePool {
filterPools[sp] = struct{}{}
}
}

var result []T

for _, item := range items {
if len(filterNames) > 0 {
if _, ok := filterNames[getNodeName(item)]; !ok {
continue
}
}

if len(filterPools) > 0 {
if _, ok := filterPools[getPoolName(item)]; !ok {
continue
}
}

result = append(result, item)
}

return result
}

0 comments on commit 6d06c85

Please sign in to comment.