-
Notifications
You must be signed in to change notification settings - Fork 28
/
manifest.go
170 lines (147 loc) · 4.2 KB
/
manifest.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
// Copyright 2020-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.
package cbft
import (
"encoding/json"
"fmt"
"io"
"net/url"
"sync"
"time"
"github.com/couchbase/cbgt"
log "github.com/couchbase/clog"
)
// manifestsCache is the package-level manifest cache used by
// GetBucketManifest and removeBucketFromManifestCache.
var manifestsCache *manifestCache

// init allocates the package-level cache with an empty bucket map (initial
// capacity 10) and an unbuffered stop channel.
func init() {
	cache := new(manifestCache)
	cache.mCache = make(map[string]*Manifest, 10)
	cache.stopCh = make(chan struct{})
	manifestsCache = cache
}
// Collection describes a single collection entry of a bucket manifest.
// Uid and Name are decoded from the "uid" and "name" JSON keys; the
// unexported fields carry no JSON tag and are ignored by encoding/json.
type Collection struct {
Uid string `json:"uid"`
Name string `json:"name"`
// NOTE(review): presumably the type mapping associated with this
// collection; nothing in the visible code sets it — confirm against callers.
typeMapping string
// is true if the collection is a synonym collection
isSynonym bool
}
// Scope describes one scope of a bucket manifest together with the
// collections it contains, as decoded from the manifest JSON.
type Scope struct {
Name string `json:"name"`
Uid string `json:"uid"`
Collections []Collection `json:"collections"`
// NOTE(review): decoded from the "limits" key; the meaning of the outer and
// inner map keys is not visible here — confirm against the ns_server API.
Limits map[string]map[string]int `json:"limits"`
}
// Manifest is the collections manifest of a bucket: a manifest Uid plus the
// list of scopes. obtainManifest decodes it from the response of the
// /pools/default/buckets/<bucket>/scopes endpoint, and monitor compares Uid
// values to decide whether a cached manifest has to be replaced.
type Manifest struct {
Uid string `json:"uid"`
Scopes []Scope `json:"scopes"`
}
// manifestCache caches the most recently fetched Manifest per bucket and
// owns the background monitor goroutine that periodically refreshes them.
//
// TODO manifestCache needs to be refreshed from the future
// streaming APIs from the ns_server in real time
type manifestCache struct {
// once makes getBucketManifest start the monitor goroutine only one time;
// removeBucketFromManifestCache resets it when it stops the monitor.
once sync.Once
// m is the lock taken when reading or updating mCache and monitorRunning.
m sync.RWMutex
// stopCh carries the request for the monitor goroutine to exit.
stopCh chan struct{}
mCache map[string]*Manifest // bucketName<=>Manifest
// monitorRunning is set to true by monitor when it starts and back to false
// by removeBucketFromManifestCache when it requests a stop.
monitorRunning bool
}
// GetBucketManifest returns the collections manifest for bucketName by
// delegating to the package-level manifest cache, which fetches the manifest
// on a cache miss. The error of a failed fetch is returned unchanged.
func GetBucketManifest(bucketName string) (*Manifest, error) {
	cache := manifestsCache
	return cache.getBucketManifest(bucketName)
}
// getBucketManifest returns the cached manifest for bucketName when there is
// one. On a cache miss it fetches the manifest, stores it in the cache and
// makes sure the monitor goroutine has been started. A fetch error is
// returned as is and leaves the cache untouched.
func (c *manifestCache) getBucketManifest(bucketName string) (*Manifest, error) {
	// Fast path: look the bucket up under the read lock only.
	c.m.RLock()
	cached, found := c.mCache[bucketName]
	c.m.RUnlock()
	if found {
		return cached, nil
	}

	// Cache miss: the fetch runs without holding any lock.
	fetched, err := c.fetchCollectionManifest(bucketName)
	if err != nil {
		return nil, err
	}

	c.m.Lock()
	defer c.m.Unlock()
	c.mCache[bucketName] = fetched
	// The first successful fill starts the background refresher.
	c.once.Do(func() { go c.monitor() })
	return fetched, nil
}
// fetchCollectionManifest obtains the manifest of bucket from the server
// reported by the current node definitions fetcher's manager. It fails with
// "invalid input" when no fetcher is registered or bucket is empty.
func (c *manifestCache) fetchCollectionManifest(bucket string) (*Manifest, error) {
	fetcher := CurrentNodeDefsFetcher
	if fetcher == nil || len(bucket) == 0 {
		return nil, fmt.Errorf("invalid input")
	}

	serverURL := fetcher.GetManager().Server()
	return obtainManifest(serverURL, bucket)
}
// removeBucketFromManifestCache drops bucket from the package-level manifest
// cache. When that leaves the cache empty while the monitor is marked as
// running, it re-arms the start-once guard, clears the running flag and
// signals the monitor goroutine to stop.
//
// The stop signal is sent only after the cache lock has been released.
// stopCh is unbuffered and monitor takes the same lock during every refresh
// pass, so sending while still holding the lock can leave this function
// blocked on the send and monitor blocked on the lock forever.
func removeBucketFromManifestCache(bucket string) {
	c := manifestsCache

	c.m.Lock()
	delete(c.mCache, bucket)
	stopMonitor := len(c.mCache) == 0 && c.monitorRunning
	if stopMonitor {
		// Re-arm once so that the next cache fill starts a fresh monitor.
		c.once = sync.Once{}
		c.monitorRunning = false
	}
	c.m.Unlock()

	if stopMonitor {
		// Blocks until a monitor goroutine is back at its select and takes
		// the signal; no lock is held here, so the monitor can get there.
		c.stopCh <- struct{}{}
	}
}
// monitor re-fetches the manifest of every cached bucket once per second and
// replaces the cached entry when the manifest Uid has changed. It marks the
// cache as monitored when it starts and returns when a value is received on
// c.stopCh (or if the ticker channel is ever reported closed).
func (c *manifestCache) monitor() {
	// TODO - poll on a ticker until the streaming endpoints from ns_server
	// are available.
	mTicker := time.NewTicker(1 * time.Second)
	defer mTicker.Stop()

	c.m.Lock()
	c.monitorRunning = true
	c.m.Unlock()

	for {
		select {
		case <-c.stopCh:
			return

		case _, ok := <-mTicker.C:
			if !ok {
				return
			}

			// Copy the entries while holding the read lock. Assigning the map
			// to a local only copies the reference, so ranging over it after
			// unlocking would race with concurrent inserts and deletes.
			c.m.RLock()
			snapshot := make(map[string]*Manifest, len(c.mCache))
			for bucket, m := range c.mCache {
				snapshot[bucket] = m
			}
			c.m.RUnlock()

			for bucket, old := range snapshot {
				curr, err := c.fetchCollectionManifest(bucket)
				if err != nil {
					log.Debugf("manifest: manifest refresh failed for bucket %s, err: %v",
						bucket, err)
					continue
				}
				if old.Uid == curr.Uid {
					continue
				}

				c.m.Lock()
				// Only refresh buckets that are still cached, so a bucket
				// removed during the fetch is not silently re-inserted.
				if _, present := c.mCache[bucket]; present {
					c.mCache[bucket] = curr
				}
				c.m.Unlock()
			}
		}
	}
}
// -----------------------------------------------------------------------------
// obtainManifest fetches the collections manifest of bucket from the
// /pools/default/buckets/<bucket>/scopes endpoint of the server at serverURL
// and decodes the JSON response body into a Manifest.
//
// It returns an error when either argument is empty, when the URL cannot be
// built, when the request fails, when the response body cannot be read or is
// empty, or when the body cannot be decoded. Underlying errors are wrapped so
// callers can inspect them with errors.Is / errors.As.
func obtainManifest(serverURL, bucket string) (*Manifest, error) {
	if len(serverURL) == 0 || len(bucket) == 0 {
		return nil, fmt.Errorf("manifest: empty arguments")
	}

	// The bucket name is a path segment, so it is escaped with PathEscape;
	// QueryEscape applies form encoding (e.g. space becomes "+"), which is
	// only correct inside a query string.
	path := fmt.Sprintf("/pools/default/buckets/%s/scopes", url.PathEscape(bucket))
	u, err := cbgt.CBAuthURL(serverURL + path)
	if err != nil {
		return nil, fmt.Errorf("manifest: error building URL, err: %w", err)
	}

	resp, err := HttpGet(cbgt.HttpClient(), u)
	if err != nil {
		return nil, fmt.Errorf("manifest: request, err: %w", err)
	}
	defer resp.Body.Close()

	respBuf, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("manifest: error reading resp.Body, err: %w", err)
	}
	// An empty body is reported on its own; folding it into the read-error
	// branch produced a message ending in "err: <nil>".
	if len(respBuf) == 0 {
		return nil, fmt.Errorf("manifest: empty resp.Body")
	}

	rv := &Manifest{}
	if err = json.Unmarshal(respBuf, rv); err != nil {
		return nil, fmt.Errorf("manifest: error parsing respBuf, err: %w", err)
	}

	return rv, nil
}