-
Notifications
You must be signed in to change notification settings - Fork 4.8k
/
controller.go
328 lines (299 loc) · 10.8 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"context"
"fmt"
"io"
"strings"
"sync"
"time"
"github.com/docker/distribution"
"github.com/opencontainers/go-digest"
"github.com/goharbor/harbor/src/controller/artifact"
"github.com/goharbor/harbor/src/controller/blob"
"github.com/goharbor/harbor/src/controller/event/operator"
"github.com/goharbor/harbor/src/controller/tag"
"github.com/goharbor/harbor/src/lib"
"github.com/goharbor/harbor/src/lib/cache"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/orm"
proModels "github.com/goharbor/harbor/src/pkg/project/models"
model_tag "github.com/goharbor/harbor/src/pkg/tag/model/tag"
)
const (
// wait more time than manifest (maxManifestWait) because manifest list depends on manifest ready
maxManifestListWait = 20
maxManifestWait = 10
sleepIntervalSec = 20
// keep manifest list in cache for one week
manifestListCacheInterval = 7 * 24 * 60 * 60 * time.Second
)
var (
// Ctl is a global proxy controller instance
ctl Controller
once sync.Once
)
// Controller defines the operations related with pull through proxy
type Controller interface {
// UseLocalBlob check if the blob should use local copy
UseLocalBlob(ctx context.Context, art lib.ArtifactInfo) bool
// UseLocalManifest check manifest should use local copy
UseLocalManifest(ctx context.Context, art lib.ArtifactInfo, remote RemoteInterface) (bool, *ManifestList, error)
// ProxyBlob proxy the blob request to the remote server, p is the proxy project
// art is the ArtifactInfo which includes the digest of the blob
ProxyBlob(ctx context.Context, p *proModels.Project, art lib.ArtifactInfo) (int64, io.ReadCloser, error)
// ProxyManifest proxy the manifest request to the remote server, p is the proxy project,
// art is the ArtifactInfo which includes the tag or digest of the manifest
ProxyManifest(ctx context.Context, art lib.ArtifactInfo, remote RemoteInterface) (distribution.Manifest, error)
// HeadManifest send manifest head request to the remote server
HeadManifest(ctx context.Context, art lib.ArtifactInfo, remote RemoteInterface) (bool, *distribution.Descriptor, error)
// EnsureTag ensure tag for digest
EnsureTag(ctx context.Context, art lib.ArtifactInfo, tagName string) error
}
type controller struct {
blobCtl blob.Controller
artifactCtl artifact.Controller
local localInterface
cache cache.Cache
handlerRegistry map[string]ManifestCacheHandler
}
// ControllerInstance -- Get the proxy controller instance
func ControllerInstance() Controller {
// Lazy load the controller
// Because LocalHelper is not ready unless core startup completely
once.Do(func() {
l := newLocalHelper()
ctl = &controller{
blobCtl: blob.Ctl,
artifactCtl: artifact.Ctl,
local: newLocalHelper(),
cache: cache.Default(),
handlerRegistry: NewCacheHandlerRegistry(l),
}
})
return ctl
}
func (c *controller) EnsureTag(ctx context.Context, art lib.ArtifactInfo, tagName string) error {
// search the digest in cache and query with trimmed digest
var trimmedDigest string
err := c.cache.Fetch(ctx, TrimmedManifestlist+art.Digest, &trimmedDigest)
if errors.Is(err, cache.ErrNotFound) { // nolint:revive
// skip to update digest, continue
} else if err != nil {
// for other error, return
return err
} else {
// found in redis, update the digest
art.Digest = trimmedDigest
log.Debugf("Found trimmed digest: %v", trimmedDigest)
}
a, err := c.local.GetManifest(ctx, art)
if err != nil {
return err
}
if a == nil {
return fmt.Errorf("the artifact is not ready yet, failed to tag it to %v", tagName)
}
tagID, err := tag.Ctl.Ensure(ctx, a.RepositoryID, a.Artifact.ID, tagName)
if err != nil {
return err
}
// update the pull time of tag for the first time cache
return tag.Ctl.Update(ctx, &tag.Tag{
Tag: model_tag.Tag{
ID: tagID,
PullTime: time.Now(),
},
}, "PullTime")
}
func (c *controller) UseLocalBlob(ctx context.Context, art lib.ArtifactInfo) bool {
if len(art.Digest) == 0 {
return false
}
exist, err := c.local.BlobExist(ctx, art)
if err != nil {
return false
}
return exist
}
// ManifestList ...
type ManifestList struct {
Content []byte
Digest string
ContentType string
}
// UseLocalManifest check if these manifest could be found in local registry,
// the return error should be nil when it is not found in local and need to delegate to remote registry
// the return error should be NotFoundError when it is not found in remote registry
// the error will be captured by framework and return 404 to client
func (c *controller) UseLocalManifest(ctx context.Context, art lib.ArtifactInfo, remote RemoteInterface) (bool, *ManifestList, error) {
a, err := c.local.GetManifest(ctx, art)
if err != nil {
return false, nil, err
}
// Pull by digest when artifact exist in local
if a != nil && len(art.Digest) > 0 {
return true, nil, nil
}
remoteRepo := getRemoteRepo(art)
exist, desc, err := remote.ManifestExist(remoteRepo, getReference(art)) // HEAD
if err != nil {
if errors.IsRateLimitError(err) && a != nil { // if rate limit, use local if it exists, otherwise return error
return true, nil, nil
}
return false, nil, err
}
if !exist || desc == nil {
go func() {
c.local.DeleteManifest(remoteRepo, art.Tag)
}()
return false, nil, errors.NotFoundError(fmt.Errorf("repo %v, tag %v not found", art.Repository, art.Tag))
}
var content []byte
var contentType string
if c.cache == nil {
return a != nil && string(desc.Digest) == a.Digest, nil, nil // digest matches
}
// Pass digest to the cache key, digest is more stable than tag, because tag could be updated
if len(art.Digest) == 0 {
art.Digest = string(desc.Digest)
}
err = c.cache.Fetch(ctx, manifestListKey(art.Repository, art), &content)
if err != nil {
if errors.Is(err, cache.ErrNotFound) {
log.Debugf("Digest is not found in manifest list cache, key=cache:%v", manifestListKey(art.Repository, art))
} else {
log.Errorf("Failed to get manifest list from cache, error: %v", err)
}
return a != nil && string(desc.Digest) == a.Digest, nil, nil
}
err = c.cache.Fetch(ctx, manifestListContentTypeKey(art.Repository, art), &contentType)
if err != nil {
log.Debugf("failed to get the manifest list content type, not use local. error:%v", err)
return false, nil, nil
}
log.Debugf("Get the manifest list with key=cache:%v", manifestListKey(art.Repository, art))
return true, &ManifestList{content, string(desc.Digest), contentType}, nil
}
func manifestListKey(repo string, art lib.ArtifactInfo) string {
// actual redis key format is cache:manifestlist:<repo name>:<tag> or cache:manifestlist:<repo name>:sha256:xxxx
return "manifestlist:" + repo + ":" + getReference(art)
}
func manifestListContentTypeKey(rep string, art lib.ArtifactInfo) string {
return manifestListKey(rep, art) + ":contenttype"
}
func (c *controller) ProxyManifest(ctx context.Context, art lib.ArtifactInfo, remote RemoteInterface) (distribution.Manifest, error) {
var man distribution.Manifest
remoteRepo := getRemoteRepo(art)
ref := getReference(art)
man, dig, err := remote.Manifest(remoteRepo, ref)
if err != nil {
if errors.IsNotFoundErr(err) {
go func() {
c.local.DeleteManifest(remoteRepo, art.Tag)
}()
}
return man, err
}
ct, _, err := man.Payload()
if err != nil {
return man, err
}
// Push manifest in background
go func(operator string) {
bCtx := orm.Copy(ctx)
a, err := c.local.GetManifest(bCtx, art)
if err != nil {
log.Errorf("failed to get manifest, error %v", err)
}
// Push manifest to local when pull with digest, or artifact not found, or digest mismatch
if len(art.Tag) == 0 || a == nil || a.Digest != dig {
artInfo := art
if len(artInfo.Digest) == 0 {
artInfo.Digest = dig
}
c.waitAndPushManifest(bCtx, remoteRepo, man, artInfo, ct, remote)
}
// Query artifact after push
if a == nil {
a, err = c.local.GetManifest(bCtx, art)
if err != nil {
log.Errorf("failed to get manifest, error %v", err)
}
}
if a != nil {
SendPullEvent(bCtx, a, art.Tag, operator)
}
}(operator.FromContext(ctx))
return man, nil
}
func (c *controller) HeadManifest(_ context.Context, art lib.ArtifactInfo, remote RemoteInterface) (bool, *distribution.Descriptor, error) {
remoteRepo := getRemoteRepo(art)
ref := getReference(art)
return remote.ManifestExist(remoteRepo, ref)
}
func (c *controller) ProxyBlob(ctx context.Context, p *proModels.Project, art lib.ArtifactInfo) (int64, io.ReadCloser, error) {
remoteRepo := getRemoteRepo(art)
log.Debugf("The blob doesn't exist, proxy the request to the target server, url:%v", remoteRepo)
rHelper, err := NewRemoteHelper(ctx, p.RegistryID)
if err != nil {
return 0, nil, err
}
size, bReader, err := rHelper.BlobReader(remoteRepo, art.Digest)
if err != nil {
log.Errorf("failed to pull blob, error %v", err)
return 0, nil, err
}
desc := distribution.Descriptor{Size: size, Digest: digest.Digest(art.Digest)}
go func() {
err := c.putBlobToLocal(remoteRepo, art.Repository, desc, rHelper)
if err != nil {
log.Errorf("error while putting blob to local repo, %v", err)
}
}()
return size, bReader, nil
}
func (c *controller) putBlobToLocal(remoteRepo string, localRepo string, desc distribution.Descriptor, r RemoteInterface) error {
log.Debugf("Put blob to local registry!, sourceRepo:%v, localRepo:%v, digest: %v", remoteRepo, localRepo, desc.Digest)
_, bReader, err := r.BlobReader(remoteRepo, string(desc.Digest))
if err != nil {
log.Errorf("failed to create blob reader, error %v", err)
return err
}
defer bReader.Close()
err = c.local.PushBlob(localRepo, desc, bReader)
return err
}
func (c *controller) waitAndPushManifest(ctx context.Context, remoteRepo string, man distribution.Manifest, art lib.ArtifactInfo, contType string, r RemoteInterface) {
h, ok := c.handlerRegistry[contType]
if !ok {
h, ok = c.handlerRegistry[defaultHandler]
if !ok {
return
}
}
h.CacheContent(ctx, remoteRepo, man, art, r, contType)
}
// getRemoteRepo get the remote repository name, used in proxy cache
func getRemoteRepo(art lib.ArtifactInfo) string {
return strings.TrimPrefix(art.Repository, art.ProjectName+"/")
}
func getReference(art lib.ArtifactInfo) string {
if len(art.Digest) > 0 {
return art.Digest
}
return art.Tag
}