-
Notifications
You must be signed in to change notification settings - Fork 82
/
handleArtifacts.go
300 lines (246 loc) · 8.56 KB
/
handleArtifacts.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package fleet
import (
"bytes"
"context"
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"encoding/json"
"io"
"net/http"
"time"
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/limit"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/throttle"
"github.com/julienschmidt/httprouter"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
// Hard-coded tuning knobs for artifact serving.
const (
	// defaultMaxParallel is the capacity handed to throttle.NewThrottle, bounding
	// how many Elasticsearch artifact fetches may be outstanding at once.
	// TODO: configurable
	defaultMaxParallel = 8

	// defaultCacheTTL is how long a decoded artifact is kept in the cache after
	// being fetched from Elasticsearch.
	// TODO: configurable
	defaultCacheTTL = 24 * time.Hour

	// defaultThrottleTTL is the TTL passed when acquiring a per-sha2 throttle token.
	// TODO: configurable
	defaultThrottleTTL = 1 * time.Minute
)
// Sentinel errors produced while serving an artifact request.
var (
	// ErrorBadSha2 is returned when the sha2 URL parameter is not 64 hex characters.
	ErrorBadSha2 = errors.New("malformed sha256")

	// ErrorMismatchSha2 is returned when a payload's sha256 does not match the expected digest.
	ErrorMismatchSha2 = errors.New("mismatched sha256")

	// ErrorRecord is returned when the retrieved artifact's identifier or
	// decoded sha256 differs from the values requested in the URL.
	ErrorRecord = errors.New("artifact record mismatch")

	// ErrorThrottle is returned when no throttle token is available for an
	// Elasticsearch fetch.
	ErrorThrottle = errors.New("cannot acquire throttle token")
)
// ArtifactT holds the shared dependencies used to serve artifact downloads.
// It is constructed by NewArtifactT.
type ArtifactT struct {
	bulker     bulk.Bulk          // bulk client passed to authAgent and dl.FindArtifact
	cache      cache.Cache        // holds decoded artifacts; also passed to authAgent
	esThrottle *throttle.Throttle // bounds concurrent Elasticsearch fetches (see fetchArtifact)
	limit      *limit.Limiter     // acquired for the duration of ArtifactT.handleArtifacts
}
// NewArtifactT logs the configured artifact limits and returns an ArtifactT
// wired with the given bulk client and cache. It also creates a limiter built
// from cfg.Limits.ArtifactLimit and an Elasticsearch throttle of capacity
// defaultMaxParallel.
func NewArtifactT(cfg *config.Server, bulker bulk.Bulk, cache cache.Cache) *ArtifactT {
	limits := &cfg.Limits.ArtifactLimit

	log.Info().
		Interface("limits", *limits).
		Int("maxParallel", defaultMaxParallel).
		Msg("Artifact install limits")

	at := &ArtifactT{
		bulker: bulker,
		cache:  cache,
	}
	// Same construction order as before: limiter first, then throttle.
	at.limit = limit.NewLimiter(limits)
	at.esThrottle = throttle.NewThrottle(defaultMaxParallel)

	return at
}
// handleArtifacts is the HTTP handler for artifact downloads. It reads the
// "id" and "sha2" route parameters, delegates lookup and authentication to
// rt.at.handleArtifacts, and streams the resulting payload to the client.
//
// Failures before any bytes are written produce a structured error response
// built by NewErrorResp. Failures while streaming are only counted and logged.
// At that point the status line and part of the body may already have been
// sent, so writing an error response would emit a superfluous WriteHeader and
// corrupt the artifact body with appended JSON.
func (rt Router) handleArtifacts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
	start := time.Now()

	var (
		id   = ps.ByName("id")   // Identifier in the artifact record
		sha2 = ps.ByName("sha2") // DecodedSha256 in the artifact record
	)

	reqID := r.Header.Get(logger.HeaderRequestID)

	zlog := log.With().
		Str("id", id).
		Str("sha2", sha2).
		Str("remoteAddr", r.RemoteAddr).
		Str(EcsHttpRequestId, reqID).
		Logger()

	rdr, err := rt.at.handleArtifacts(r, zlog, id, sha2)
	if err != nil {
		// Nothing has been written to w yet, so a proper error response can be sent.
		cntArtifacts.IncError(err)
		resp := NewErrorResp(err)

		zlog.WithLevel(resp.Level).
			Err(err).
			Int(EcsHttpResponseCode, resp.StatusCode).
			Int64(EcsHttpResponseBodyBytes, 0).
			Int64(EcsEventDuration, time.Since(start).Nanoseconds()).
			Msg("fail artifact")

		if err := resp.Write(w); err != nil {
			zlog.Error().Err(err).Msg("fail writing error response")
		}
		return
	}

	nWritten, err := io.Copy(w, rdr)
	zlog.Trace().
		Err(err).
		Int64(EcsHttpResponseBodyBytes, nWritten).
		Int64(EcsEventDuration, time.Since(start).Nanoseconds()).
		Msg("Response sent")
	cntArtifacts.bodyOut.Add(uint64(nWritten))

	if err != nil {
		// Streaming has already begun (typically a client disconnect), so the
		// response status can no longer be changed. Record the failure, but do
		// not attempt to write an error response on top of the partial body.
		cntArtifacts.IncError(err)
		resp := NewErrorResp(err)

		zlog.WithLevel(resp.Level).
			Err(err).
			Int64(EcsHttpResponseBodyBytes, nWritten).
			Int64(EcsEventDuration, time.Since(start).Nanoseconds()).
			Msg("fail artifact")
	}
}
// handleArtifacts performs the request-level gating for an artifact download
// and then delegates to handle.
//
// Steps, in order:
//   - acquire a slot from at.limit (its error is returned as-is);
//   - authenticate the request's API key via authAgent;
//   - start the cntArtifacts metric;
//   - add the agent's API key id and agent id to the logger.
//
// The limiter slot and the metric are released when this function returns,
// i.e. before the caller streams the returned reader.
func (at ArtifactT) handleArtifacts(r *http.Request, zlog zerolog.Logger, id, sha2 string) (io.Reader, error) {
	release, err := at.limit.Acquire()
	if err != nil {
		return nil, err
	}
	defer release()

	// Authenticate the APIKey and retrieve the agent record. Even with a cached
	// api key this is slowish: to confirm the agent still owns the key, the
	// agent record is fetched from elastic.
	agent, authErr := authAgent(r, "", at.bulker, at.cache)
	if authErr != nil {
		return nil, authErr
	}

	// Metrics; serenity now. IncStart runs immediately; its returned func runs on exit.
	defer cntArtifacts.IncStart()()

	agentLog := zlog.With().
		Str("APIKeyId", agent.AccessApiKeyId).
		Str("agentId", agent.Id).
		Logger()

	return at.handle(r.Context(), agentLog, agent, id, sha2)
}
// artHandler bundles a logger, a bulk client and a cache.
// NOTE(review): nothing in this file references artHandler; it may be used
// elsewhere in the package or be dead code — verify before removing.
type artHandler struct {
	zlog   zerolog.Logger
	bulker bulk.Bulk
	c      cache.Cache
}
// handle validates the requested sha2, checks authorization, retrieves the
// artifact (cache first, then Elasticsearch) and returns a reader over its
// decoded body.
//
// Errors: ErrorBadSha2 for a malformed sha2, any error from authorizeArtifact
// or getArtifact, and ErrorRecord if the retrieved record does not carry the
// requested identifier and sha2.
func (at ArtifactT) handle(ctx context.Context, zlog zerolog.Logger, agent *model.Agent, id, sha2 string) (io.Reader, error) {
	// Reject malformed hashes before touching the cache or Elasticsearch.
	if err := validateSha2String(sha2); err != nil {
		return nil, err
	}

	// Does this agent have access to the artifact?
	if err := at.authorizeArtifact(ctx, agent, id, sha2); err != nil {
		zlog.Warn().Err(err).Msg("Unauthorized GET on artifact")
		return nil, err
	}

	art, err := at.getArtifact(ctx, zlog, id, sha2)
	if err != nil {
		return nil, err
	}

	// Defensive check in case a lower layer returned the wrong record.
	idMatches := art.Identifier == id
	shaMatches := art.DecodedSha256 == sha2
	if !(idMatches && shaMatches) {
		zlog.Info().
			Err(ErrorRecord).
			Str("artifact_id", art.Identifier).
			Str("artifact_sha2", art.DecodedSha256).
			Msg("Identifier mismatch on url")
		return nil, ErrorRecord
	}

	zlog.Debug().
		Int("sz", len(art.Body)).
		Int64("decodedSz", art.DecodedSize).
		Str("compression", art.CompressionAlgorithm).
		Str("encryption", art.EncryptionAlgorithm).
		Str("created", art.Created).
		Msg("Artifact GET")

	return bytes.NewReader(art.Body), nil
}
// authorizeArtifact is meant to decide whether agent may download the artifact
// identified by ident and sha2. It is currently a stub that always returns nil,
// so every request that passed authAgent is allowed through.
//
// TODO: Pull the policy record for this agent and validate that the
// requested artifact is assigned to this policy. This will prevent
// agents from retrieving artifacts that they do not have access to.
// Note that such a check is racy: the policy could have changed to allow
// an artifact before this Fleet Server instance has updated its local copy
// of the policy. Take these race conditions into consideration.
//
// Until then, access control relies on security by obscurity; i.e. it
// should be difficult for an attacker to guess an artifact's identifier
// together with its sha256.
func (at ArtifactT) authorizeArtifact(ctx context.Context, agent *model.Agent, ident, sha2 string) error {
	return nil // TODO
}
// getArtifact returns the artifact for ident/sha2, serving it from the cache
// when present and otherwise fetching it from Elasticsearch.
//
// On a cache miss the stored Body (a JSON string holding base64 data) is
// unwrapped and base64-decoded. The decoded bytes are checked against the
// record's EncodedSha256, and the record with its decoded Body is cached for
// defaultCacheTTL. Storing the decoded form means cache hits never pay for
// decoding again.
func (at ArtifactT) getArtifact(ctx context.Context, zlog zerolog.Logger, ident, sha2 string) (*model.Artifact, error) {
	if cached, hit := at.cache.GetArtifact(ident, sha2); hit {
		return &cached, nil
	}

	art, err := at.fetchArtifact(ctx, zlog, ident, sha2)
	if err != nil {
		zlog.Info().Err(err).Msg("Fail retrieve artifact")
		return nil, err
	}

	// Body is a raw JSON value; unwrap it into the base64 string it contains.
	var encoded string
	if err := json.Unmarshal(art.Body, &encoded); err != nil {
		zlog.Error().Err(err).Msg("Cannot unmarshal artifact payload")
		return nil, err
	}

	decoded, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		zlog.Error().Err(err).Msg("Fail base64 decode artifact")
		return nil, err
	}

	// Hygiene: make sure the decoded bytes match the digest on the record.
	if err := validateSha2Data(decoded, art.EncodedSha256); err != nil {
		zlog.Error().Err(err).Msg("Fail sha2 hash validation")
		return nil, err
	}

	art.Body = decoded
	at.cache.SetArtifact(*art, defaultCacheTTL)

	return art, nil
}
// fetchArtifact looks the artifact up in Elasticsearch via dl.FindArtifact.
// The lookup is guarded by at.esThrottle: if no token can be acquired for this
// sha2, ErrorThrottle is returned without querying Elasticsearch. Any lookup
// error is wrapped with "fetchArtifact"; a nil error stays nil.
//
// TODO: Design a mechanism to mitigate a DDOS attack on bogus hashes.
// Perhaps keep a cache of the most recently used hashes; items not in that
// cache could still do a lookup, but throttled as below. The cache could be
// refreshed every 10m or so.
func (at ArtifactT) fetchArtifact(ctx context.Context, zlog zerolog.Logger, ident, sha2 string) (*model.Artifact, error) {
	// Throttle prevents more than N outstanding requests to elastic globally and per sha2.
	token := at.esThrottle.Acquire(sha2, defaultThrottleTTL)
	if token == nil {
		return nil, ErrorThrottle
	}
	defer token.Release()

	began := time.Now()
	artifact, err := dl.FindArtifact(ctx, at.bulker, ident, sha2)
	elapsed := time.Since(began)

	zlog.Info().
		Err(err).
		Int64(EcsEventDuration, elapsed.Nanoseconds()).
		Msg("fetch artifact")

	return artifact, errors.Wrap(err, "fetchArtifact")
}
// validateSha2String reports whether sha2 looks like a hex-encoded sha256
// digest: exactly 64 characters, all valid hex (either case). It returns nil
// when valid and ErrorBadSha2 otherwise.
func validateSha2String(sha2 string) error {
	// A sha256 digest is 32 bytes, which hex-encodes to 64 characters.
	const wantLen = 64

	if len(sha2) == wantLen {
		if _, decodeErr := hex.DecodeString(sha2); decodeErr == nil {
			return nil
		}
	}
	return ErrorBadSha2
}
// validateSha2Data checks that sha256(data) equals the hex-encoded digest sha2.
// It returns a wrapped decode error if sha2 is not valid hex, ErrorMismatchSha2
// if the digests differ, and nil on a match.
func validateSha2Data(data []byte, sha2 string) error {
	want, err := hex.DecodeString(sha2)
	if err != nil {
		return errors.Wrap(err, "sha2 hex decode")
	}

	got := sha256.Sum256(data)
	if bytes.Equal(got[:], want) {
		return nil
	}
	return ErrorMismatchSha2
}