Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v16] re-add grace period to Upload completer (again) #44978

Merged
merged 3 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/events/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,9 @@ type StreamPart struct {
Number int64
// ETag is a part e-tag
ETag string
// LastModified is the time of last modification of this part (if
// available).
LastModified time.Time
}

// StreamUpload represents stream multipart upload
Expand Down
4 changes: 4 additions & 0 deletions lib/events/auditlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ const (
// AbandonedUploadPollingRate defines how often to check for
// abandoned uploads which need to be completed.
AbandonedUploadPollingRate = apidefaults.SessionTrackerTTL / 6

// UploadCompleterGracePeriod is the default period after which an upload's
// session tracker will be checked to see if it's an abandoned upload.
UploadCompleterGracePeriod = 24 * time.Hour
)

var (
Expand Down
21 changes: 17 additions & 4 deletions lib/events/azsessions/azsessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"slices"
"strconv"
"strings"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
Expand Down Expand Up @@ -450,7 +451,8 @@ func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, pa

// our parts are just over 5 MiB (events.MinUploadPartSizeBytes) so we can
// upload them in one shot
if _, err := cErr(partBlob.Upload(ctx, streaming.NopCloser(partBody), nil)); err != nil {
response, err := cErr(partBlob.Upload(ctx, streaming.NopCloser(partBody), nil))
if err != nil {
return nil, trace.Wrap(err)
}
h.log.WithFields(logrus.Fields{
Expand All @@ -459,7 +461,11 @@ func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, pa
fieldPartNumber: partNumber,
}).Debug("Uploaded part.")

return &events.StreamPart{Number: partNumber}, nil
var lastModified time.Time
if response.LastModified != nil {
lastModified = *response.LastModified
}
return &events.StreamPart{Number: partNumber, LastModified: lastModified}, nil
}

// ListParts implements [events.MultipartUploader].
Expand Down Expand Up @@ -492,8 +498,15 @@ func (h *Handler) ListParts(ctx context.Context, upload events.StreamUpload) ([]
if err != nil {
continue
}

parts = append(parts, events.StreamPart{Number: partNumber})
var lastModified time.Time
if b.Properties != nil &&
b.Properties.LastModified != nil {
lastModified = *b.Properties.LastModified
}
parts = append(parts, events.StreamPart{
Number: partNumber,
LastModified: lastModified,
})
}
}

Expand Down
26 changes: 26 additions & 0 deletions lib/events/complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package events

import (
"cmp"
"context"
"fmt"
"os"
Expand Down Expand Up @@ -59,6 +60,11 @@ type UploadCompleterConfig struct {
Component string
// CheckPeriod is a period for checking the upload
CheckPeriod time.Duration
// GracePeriod is the period after which an upload's session tracker will be
// checked to see if it's an abandoned upload. A duration of zero will
// result in a sensible default, any negative value will result in no grace
// period.
GracePeriod time.Duration
// Clock is used to override clock in tests
Clock clockwork.Clock
// ClusterName identifies the originating teleport cluster
Expand Down Expand Up @@ -221,11 +227,21 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error {
}
}()

gracePeriod := cmp.Or(u.cfg.GracePeriod, UploadCompleterGracePeriod)
incompleteSessionUploads.Set(float64(len(uploads)))
// Complete upload for any uploads without an active session tracker
for _, upload := range uploads {
log := u.log.WithField("upload", upload.ID).WithField("session", upload.SessionID)

if gracePeriod > 0 && u.cfg.Clock.Since(upload.Initiated) <= gracePeriod {
log.Debug("Found incomplete upload within grace period, terminating check early.")
// not only we can skip this upload, but since uploads are sorted by
// Initiated oldest-to-newest, we can actually just stop checking as
// all further uploads will be closer in time to now and thus they
// will all be within the grace period
break
}

switch _, err := u.cfg.SessionTracker.GetSessionTracker(ctx, upload.SessionID.String()); {
case err == nil: // session is still in progress, continue to other uploads
log.Debug("session has active tracker and is not ready to be uploaded")
Expand All @@ -247,6 +263,16 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error {
}
return trace.Wrap(err, "listing parts")
}
var lastModified time.Time
for _, part := range parts {
if part.LastModified.After(lastModified) {
lastModified = part.LastModified
}
}
if u.cfg.Clock.Since(lastModified) <= gracePeriod {
log.Debug("Found incomplete upload with recently uploaded part, skipping.")
continue
}

log.Debugf("upload has %d parts", len(parts))

Expand Down
74 changes: 73 additions & 1 deletion lib/events/complete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) {
SessionTracker: sessionTrackerService,
Clock: clock,
ClusterName: "teleport-cluster",
GracePeriod: 24 * time.Hour,
})
require.NoError(t, err)

Expand All @@ -81,7 +82,18 @@ func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) {
require.NoError(t, err)
require.False(t, mu.IsCompleted(upload.ID))

clock.Advance(1 * time.Hour)
// enough to expire the session tracker, not enough to pass the grace period
clock.Advance(2 * time.Hour)

err = uc.CheckUploads(context.Background())
require.NoError(t, err)
require.False(t, mu.IsCompleted(upload.ID))

trackers, err := sessionTrackerService.GetActiveSessionTrackers(context.Background())
require.NoError(t, err)
require.Empty(t, trackers)

clock.Advance(22*time.Hour + time.Nanosecond)

err = uc.CheckUploads(context.Background())
require.NoError(t, err)
Expand Down Expand Up @@ -147,6 +159,7 @@ func TestUploadCompleterAcquiresSemaphore(t *testing.T) {
},
acquireErr: nil,
},
GracePeriod: -1,
})
require.NoError(t, err)

Expand Down Expand Up @@ -193,6 +206,7 @@ func TestUploadCompleterEmitsSessionEnd(t *testing.T) {
Clock: clock,
SessionTracker: &mockSessionTrackerService{},
ClusterName: "teleport-cluster",
GracePeriod: -1,
})
require.NoError(t, err)

Expand Down Expand Up @@ -224,6 +238,63 @@ func TestUploadCompleterEmitsSessionEnd(t *testing.T) {
}
}

func TestCheckUploadsSkipsUploadsInProgress(t *testing.T) {
clock := clockwork.NewFakeClock()
sessionTrackers := []types.SessionTracker{}

sessionTrackerService := &mockSessionTrackerService{
clock: clock,
trackers: sessionTrackers,
}

// simulate an upload that started well before the grace period,
// but the most recently uploaded part is still within the grace period
gracePeriod := 10 * time.Minute
uploadInitiated := clock.Now().Add(-3 * gracePeriod)
lastPartUploaded := clock.Now().Add(-2 * gracePeriod / 3)

var completedUploads []events.StreamUpload

uploader := &eventstest.MockUploader{
MockListUploads: func(ctx context.Context) ([]events.StreamUpload, error) {
return []events.StreamUpload{
{
ID: "upload-1234",
SessionID: session.NewID(),
Initiated: uploadInitiated,
},
}, nil
},
MockListParts: func(ctx context.Context, upload events.StreamUpload) ([]events.StreamPart, error) {
return []events.StreamPart{
{
Number: int64(1),
ETag: "foo",
LastModified: lastPartUploaded,
},
}, nil
},
MockCompleteUpload: func(ctx context.Context, upload events.StreamUpload, parts []events.StreamPart) error {
completedUploads = append(completedUploads, upload)
return nil
},
}

uc, err := events.NewUploadCompleter(events.UploadCompleterConfig{
Uploader: uploader,
AuditLog: &eventstest.MockAuditLog{},
SessionTracker: sessionTrackerService,
Clock: clock,
ClusterName: "teleport-cluster",
GracePeriod: gracePeriod,
})
require.NoError(t, err)

uc.CheckUploads(context.Background())
require.Empty(t, completedUploads)

}

func TestCheckUploadsContinuesOnError(t *testing.T) {
clock := clockwork.NewFakeClock()
expires := clock.Now().Add(time.Hour * 1)
Expand Down Expand Up @@ -286,6 +357,7 @@ func TestCheckUploadsContinuesOnError(t *testing.T) {
SessionTracker: sessionTrackerService,
Clock: clock,
ClusterName: "teleport-cluster",
GracePeriod: -1,
})
require.NoError(t, err)

Expand Down
37 changes: 26 additions & 11 deletions lib/events/eventstest/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type MemoryUploader struct {
objects map[session.ID][]byte
eventsC chan events.UploadEvent

// Clock is an optional [clockwork.Clock] to determine the time to associate
// with uploads and parts.
Clock clockwork.Clock
}

Expand All @@ -63,7 +65,7 @@ type MemoryUpload struct {
// id is the upload ID
id string
// parts is the upload parts
parts map[int64][]byte
parts map[int64]part
// sessionID is the session ID associated with the upload
sessionID session.ID
//completed specifies upload as completed
Expand All @@ -73,6 +75,11 @@ type MemoryUpload struct {
Initiated time.Time
}

type part struct {
data []byte
lastModified time.Time
}

func (m *MemoryUploader) trySendEvent(event events.UploadEvent) {
if m.eventsC == nil {
return
Expand All @@ -98,14 +105,15 @@ func (m *MemoryUploader) CreateUpload(ctx context.Context, sessionID session.ID)
upload := &events.StreamUpload{
ID: uuid.New().String(),
SessionID: sessionID,
Initiated: time.Now(),
}
if m.Clock != nil {
upload.Initiated = m.Clock.Now()
}
m.uploads[upload.ID] = &MemoryUpload{
id: upload.ID,
sessionID: sessionID,
parts: make(map[int64][]byte),
parts: make(map[int64]part),
Initiated: upload.Initiated,
}
return upload, nil
Expand All @@ -127,11 +135,11 @@ func (m *MemoryUploader) CompleteUpload(ctx context.Context, upload events.Strea
partsSet := make(map[int64]bool, len(parts))
for _, part := range parts {
partsSet[part.Number] = true
data, ok := up.parts[part.Number]
upPart, ok := up.parts[part.Number]
if !ok {
return trace.NotFound("part %v has not been uploaded", part.Number)
}
result = append(result, data...)
result = append(result, upPart.data...)
}
// exclude parts that are not requested to be completed
for number := range up.parts {
Expand All @@ -157,8 +165,15 @@ func (m *MemoryUploader) UploadPart(ctx context.Context, upload events.StreamUpl
if !ok {
return nil, trace.NotFound("upload %q is not found", upload.ID)
}
up.parts[partNumber] = data
return &events.StreamPart{Number: partNumber}, nil
lastModified := time.Now()
if m.Clock != nil {
lastModified = m.Clock.Now()
}
up.parts[partNumber] = part{
data: data,
lastModified: lastModified,
}
return &events.StreamPart{Number: partNumber, LastModified: lastModified}, nil
}

// ListUploads lists uploads that have been initiated but not completed with
Expand Down Expand Up @@ -199,7 +214,7 @@ func (m *MemoryUploader) GetParts(uploadID string) ([][]byte, error) {
return partNumbers[i] < partNumbers[j]
})
for _, partNumber := range partNumbers {
sortedParts = append(sortedParts, up.parts[partNumber])
sortedParts = append(sortedParts, up.parts[partNumber].data)
}
return sortedParts, nil
}
Expand Down Expand Up @@ -290,8 +305,8 @@ type MockUploader struct {

CreateUploadError error
ReserveUploadPartError error
ListPartsError error

MockListParts func(ctx context.Context, upload events.StreamUpload) ([]events.StreamPart, error)
MockListUploads func(ctx context.Context) ([]events.StreamUpload, error)
MockCompleteUpload func(ctx context.Context, upload events.StreamUpload, parts []events.StreamPart) error
}
Expand All @@ -311,9 +326,9 @@ func (m *MockUploader) ReserveUploadPart(_ context.Context, _ events.StreamUploa
return m.ReserveUploadPartError
}

func (m *MockUploader) ListParts(_ context.Context, _ events.StreamUpload) ([]events.StreamPart, error) {
if m.ListPartsError != nil {
return nil, m.ListPartsError
func (m *MockUploader) ListParts(ctx context.Context, upload events.StreamUpload) ([]events.StreamPart, error) {
if m.MockListParts != nil {
return m.MockListParts(ctx, upload)
}

return []events.StreamPart{}, nil
Expand Down
14 changes: 11 additions & 3 deletions lib/events/filesessions/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,19 @@ func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, pa
}

// Rename reservation to part file.
err = os.Rename(reservationPath, h.partPath(upload, partNumber))
partPath := h.partPath(upload, partNumber)
err = os.Rename(reservationPath, partPath)
if err != nil {
return nil, trace.ConvertSystemError(err)
}

return &events.StreamPart{Number: partNumber}, nil
var lastModified time.Time
fi, err := os.Stat(partPath)
if err == nil {
lastModified = fi.ModTime()
}

return &events.StreamPart{Number: partNumber, LastModified: lastModified}, nil
}

// CompleteUpload completes the upload
Expand Down Expand Up @@ -254,7 +261,8 @@ func (h *Handler) ListParts(ctx context.Context, upload events.StreamUpload) ([]
return nil
}
parts = append(parts, events.StreamPart{
Number: part,
Number: part,
LastModified: info.ModTime(),
})
return nil
})
Expand Down
Loading
Loading