diff --git a/lib/events/api.go b/lib/events/api.go index 9a700529da890..7998ada4c120a 100644 --- a/lib/events/api.go +++ b/lib/events/api.go @@ -875,6 +875,9 @@ type StreamPart struct { Number int64 // ETag is a part e-tag ETag string + // LastModified is the time of last modification of this part (if + // available). + LastModified time.Time } // StreamUpload represents stream multipart upload diff --git a/lib/events/auditlog.go b/lib/events/auditlog.go index 709e2c7a23d95..5cb3ab744c31d 100644 --- a/lib/events/auditlog.go +++ b/lib/events/auditlog.go @@ -111,6 +111,10 @@ const ( // AbandonedUploadPollingRate defines how often to check for // abandoned uploads which need to be completed. AbandonedUploadPollingRate = apidefaults.SessionTrackerTTL / 6 + + // UploadCompleterGracePeriod is the default period after which an upload's + // session tracker will be checked to see if it's an abandoned upload. + UploadCompleterGracePeriod = 24 * time.Hour ) var ( diff --git a/lib/events/azsessions/azsessions.go b/lib/events/azsessions/azsessions.go index 85aa13d39084d..527f024670825 100644 --- a/lib/events/azsessions/azsessions.go +++ b/lib/events/azsessions/azsessions.go @@ -28,6 +28,7 @@ import ( "slices" "strconv" "strings" + "time" "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" @@ -450,7 +451,8 @@ func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, pa // our parts are just over 5 MiB (events.MinUploadPartSizeBytes) so we can // upload them in one shot - if _, err := cErr(partBlob.Upload(ctx, streaming.NopCloser(partBody), nil)); err != nil { + response, err := cErr(partBlob.Upload(ctx, streaming.NopCloser(partBody), nil)) + if err != nil { return nil, trace.Wrap(err) } h.log.WithFields(logrus.Fields{ @@ -459,7 +461,11 @@ func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, pa fieldPartNumber: partNumber, }).Debug("Uploaded part.") - return 
&events.StreamPart{Number: partNumber}, nil + var lastModified time.Time + if response.LastModified != nil { + lastModified = *response.LastModified + } + return &events.StreamPart{Number: partNumber, LastModified: lastModified}, nil } // ListParts implements [events.MultipartUploader]. @@ -492,8 +498,15 @@ func (h *Handler) ListParts(ctx context.Context, upload events.StreamUpload) ([] if err != nil { continue } - - parts = append(parts, events.StreamPart{Number: partNumber}) + var lastModified time.Time + if b.Properties != nil && + b.Properties.LastModified != nil { + lastModified = *b.Properties.LastModified + } + parts = append(parts, events.StreamPart{ + Number: partNumber, + LastModified: lastModified, + }) } } diff --git a/lib/events/complete.go b/lib/events/complete.go index b335363e667c2..62e610df1d7ce 100644 --- a/lib/events/complete.go +++ b/lib/events/complete.go @@ -19,6 +19,7 @@ package events import ( + "cmp" "context" "fmt" "os" @@ -59,6 +60,11 @@ type UploadCompleterConfig struct { Component string // CheckPeriod is a period for checking the upload CheckPeriod time.Duration + // GracePeriod is the period after which an upload's session tracker will be + // checked to see if it's an abandoned upload. A duration of zero will + // result in a sensible default, any negative value will result in no grace + // period. 
+ GracePeriod time.Duration // Clock is used to override clock in tests Clock clockwork.Clock // ClusterName identifies the originating teleport cluster ClusterName string @@ -221,11 +227,21 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error { } }() + gracePeriod := cmp.Or(u.cfg.GracePeriod, UploadCompleterGracePeriod) incompleteSessionUploads.Set(float64(len(uploads))) // Complete upload for any uploads without an active session tracker for _, upload := range uploads { log := u.log.WithField("upload", upload.ID).WithField("session", upload.SessionID) + if gracePeriod > 0 && u.cfg.Clock.Since(upload.Initiated) <= gracePeriod { + log.Debug("Found incomplete upload within grace period, terminating check early.") + // not only can we skip this upload, but since uploads are sorted by + // Initiated oldest-to-newest, we can actually just stop checking as + // all further uploads will be closer in time to now and thus they + // will all be within the grace period + break + } + switch _, err := u.cfg.SessionTracker.GetSessionTracker(ctx, upload.SessionID.String()); { case err == nil: // session is still in progress, continue to other uploads log.Debug("session has active tracker and is not ready to be uploaded") @@ -247,6 +263,16 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error { } return trace.Wrap(err, "listing parts") } + var lastModified time.Time + for _, part := range parts { + if part.LastModified.After(lastModified) { + lastModified = part.LastModified + } + } + if u.cfg.Clock.Since(lastModified) <= gracePeriod { + log.Debug("Found incomplete upload with recently uploaded part, skipping.") + continue + } log.Debugf("upload has %d parts", len(parts)) diff --git a/lib/events/complete_test.go b/lib/events/complete_test.go index e21b20e1e2058..a810f690682f2 100644 --- a/lib/events/complete_test.go +++ b/lib/events/complete_test.go @@ -71,6 +71,7 @@ func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) { SessionTracker: 
sessionTrackerService, Clock: clock, ClusterName: "teleport-cluster", + GracePeriod: 24 * time.Hour, }) require.NoError(t, err) @@ -81,7 +82,18 @@ func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) { require.NoError(t, err) require.False(t, mu.IsCompleted(upload.ID)) - clock.Advance(1 * time.Hour) + // enough to expire the session tracker, not enough to pass the grace period + clock.Advance(2 * time.Hour) + + err = uc.CheckUploads(context.Background()) + require.NoError(t, err) + require.False(t, mu.IsCompleted(upload.ID)) + + trackers, err := sessionTrackerService.GetActiveSessionTrackers(context.Background()) + require.NoError(t, err) + require.Empty(t, trackers) + + clock.Advance(22*time.Hour + time.Nanosecond) err = uc.CheckUploads(context.Background()) require.NoError(t, err) @@ -147,6 +159,7 @@ func TestUploadCompleterAcquiresSemaphore(t *testing.T) { }, acquireErr: nil, }, + GracePeriod: -1, }) require.NoError(t, err) @@ -193,6 +206,7 @@ func TestUploadCompleterEmitsSessionEnd(t *testing.T) { Clock: clock, SessionTracker: &mockSessionTrackerService{}, ClusterName: "teleport-cluster", + GracePeriod: -1, }) require.NoError(t, err) @@ -224,6 +238,63 @@ func TestUploadCompleterEmitsSessionEnd(t *testing.T) { } } +func TestCheckUploadsSkipsUploadsInProgress(t *testing.T) { + clock := clockwork.NewFakeClock() + sessionTrackers := []types.SessionTracker{} + + sessionTrackerService := &mockSessionTrackerService{ + clock: clock, + trackers: sessionTrackers, + } + + // simulate an upload that started well before the grace period, + // but the most recently uploaded part is still within the grace period + gracePeriod := 10 * time.Minute + uploadInitiated := clock.Now().Add(-3 * gracePeriod) + lastPartUploaded := clock.Now().Add(-2 * gracePeriod / 3) + + var completedUploads []events.StreamUpload + + uploader := &eventstest.MockUploader{ + MockListUploads: func(ctx context.Context) ([]events.StreamUpload, error) { + return []events.StreamUpload{ + { + ID: 
"upload-1234", + SessionID: session.NewID(), + Initiated: uploadInitiated, + }, + }, nil + }, + MockListParts: func(ctx context.Context, upload events.StreamUpload) ([]events.StreamPart, error) { + return []events.StreamPart{ + { + Number: int64(1), + ETag: "foo", + LastModified: lastPartUploaded, + }, + }, nil + }, + MockCompleteUpload: func(ctx context.Context, upload events.StreamUpload, parts []events.StreamPart) error { + completedUploads = append(completedUploads, upload) + return nil + }, + } + + uc, err := events.NewUploadCompleter(events.UploadCompleterConfig{ + Uploader: uploader, + AuditLog: &eventstest.MockAuditLog{}, + SessionTracker: sessionTrackerService, + Clock: clock, + ClusterName: "teleport-cluster", + GracePeriod: gracePeriod, + }) + require.NoError(t, err) + + err = uc.CheckUploads(context.Background()) + require.NoError(t, err) + require.Empty(t, completedUploads) +} + func TestCheckUploadsContinuesOnError(t *testing.T) { clock := clockwork.NewFakeClock() expires := clock.Now().Add(time.Hour * 1) @@ -286,6 +357,7 @@ func TestCheckUploadsContinuesOnError(t *testing.T) { SessionTracker: sessionTrackerService, Clock: clock, ClusterName: "teleport-cluster", + GracePeriod: -1, }) require.NoError(t, err) diff --git a/lib/events/eventstest/uploader.go b/lib/events/eventstest/uploader.go index 63a30cd242684..99cee3b1e3f94 100644 --- a/lib/events/eventstest/uploader.go +++ b/lib/events/eventstest/uploader.go @@ -55,6 +55,8 @@ type MemoryUploader struct { objects map[session.ID][]byte eventsC chan events.UploadEvent + // Clock is an optional [clockwork.Clock] to determine the time to associate + // with uploads and parts. 
Clock clockwork.Clock } @@ -63,7 +65,7 @@ type MemoryUpload struct { // id is the upload ID id string // parts is the upload parts - parts map[int64][]byte + parts map[int64]part // sessionID is the session ID associated with the upload sessionID session.ID //completed specifies upload as completed @@ -73,6 +75,11 @@ type MemoryUpload struct { Initiated time.Time } +type part struct { + data []byte + lastModified time.Time +} + func (m *MemoryUploader) trySendEvent(event events.UploadEvent) { if m.eventsC == nil { return @@ -98,6 +105,7 @@ func (m *MemoryUploader) CreateUpload(ctx context.Context, sessionID session.ID) upload := &events.StreamUpload{ ID: uuid.New().String(), SessionID: sessionID, + Initiated: time.Now(), } if m.Clock != nil { upload.Initiated = m.Clock.Now() @@ -105,7 +113,7 @@ func (m *MemoryUploader) CreateUpload(ctx context.Context, sessionID session.ID) m.uploads[upload.ID] = &MemoryUpload{ id: upload.ID, sessionID: sessionID, - parts: make(map[int64][]byte), + parts: make(map[int64]part), Initiated: upload.Initiated, } return upload, nil @@ -127,11 +135,11 @@ func (m *MemoryUploader) CompleteUpload(ctx context.Context, upload events.Strea partsSet := make(map[int64]bool, len(parts)) for _, part := range parts { partsSet[part.Number] = true - data, ok := up.parts[part.Number] + upPart, ok := up.parts[part.Number] if !ok { return trace.NotFound("part %v has not been uploaded", part.Number) } - result = append(result, data...) + result = append(result, upPart.data...) 
} // exclude parts that are not requested to be completed for number := range up.parts { @@ -157,8 +165,15 @@ func (m *MemoryUploader) UploadPart(ctx context.Context, upload events.StreamUpl if !ok { return nil, trace.NotFound("upload %q is not found", upload.ID) } - up.parts[partNumber] = data - return &events.StreamPart{Number: partNumber}, nil + lastModified := time.Now() + if m.Clock != nil { + lastModified = m.Clock.Now() + } + up.parts[partNumber] = part{ + data: data, + lastModified: lastModified, + } + return &events.StreamPart{Number: partNumber, LastModified: lastModified}, nil } // ListUploads lists uploads that have been initiated but not completed with @@ -199,7 +214,7 @@ func (m *MemoryUploader) GetParts(uploadID string) ([][]byte, error) { return partNumbers[i] < partNumbers[j] }) for _, partNumber := range partNumbers { - sortedParts = append(sortedParts, up.parts[partNumber]) + sortedParts = append(sortedParts, up.parts[partNumber].data) } return sortedParts, nil } @@ -290,8 +305,8 @@ type MockUploader struct { CreateUploadError error ReserveUploadPartError error - ListPartsError error + MockListParts func(ctx context.Context, upload events.StreamUpload) ([]events.StreamPart, error) MockListUploads func(ctx context.Context) ([]events.StreamUpload, error) MockCompleteUpload func(ctx context.Context, upload events.StreamUpload, parts []events.StreamPart) error } @@ -311,9 +326,9 @@ func (m *MockUploader) ReserveUploadPart(_ context.Context, _ events.StreamUploa return m.ReserveUploadPartError } -func (m *MockUploader) ListParts(_ context.Context, _ events.StreamUpload) ([]events.StreamPart, error) { - if m.ListPartsError != nil { - return nil, m.ListPartsError +func (m *MockUploader) ListParts(ctx context.Context, upload events.StreamUpload) ([]events.StreamPart, error) { + if m.MockListParts != nil { + return m.MockListParts(ctx, upload) } return []events.StreamPart{}, nil diff --git a/lib/events/filesessions/filestream.go 
b/lib/events/filesessions/filestream.go index c9c5ff5ccd855..0e0399ce89835 100644 --- a/lib/events/filesessions/filestream.go +++ b/lib/events/filesessions/filestream.go @@ -124,12 +124,19 @@ func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, pa } // Rename reservation to part file. - err = os.Rename(reservationPath, h.partPath(upload, partNumber)) + partPath := h.partPath(upload, partNumber) + err = os.Rename(reservationPath, partPath) if err != nil { return nil, trace.ConvertSystemError(err) } - return &events.StreamPart{Number: partNumber}, nil + var lastModified time.Time + fi, err := os.Stat(partPath) + if err == nil { + lastModified = fi.ModTime() + } + + return &events.StreamPart{Number: partNumber, LastModified: lastModified}, nil } // CompleteUpload completes the upload @@ -254,7 +261,8 @@ func (h *Handler) ListParts(ctx context.Context, upload events.StreamUpload) ([] return nil } parts = append(parts, events.StreamPart{ - Number: part, + Number: part, + LastModified: info.ModTime(), }) return nil }) diff --git a/lib/events/gcssessions/gcsstream.go b/lib/events/gcssessions/gcsstream.go index f18487fed85e9..f51a5df111b22 100644 --- a/lib/events/gcssessions/gcsstream.go +++ b/lib/events/gcssessions/gcsstream.go @@ -99,7 +99,7 @@ func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, pa if err != nil { return nil, convertGCSError(err) } - return &events.StreamPart{Number: partNumber}, nil + return &events.StreamPart{Number: partNumber, LastModified: writer.Attrs().Created}, nil } // CompleteUpload completes the upload @@ -249,6 +249,7 @@ func (h *Handler) ListParts(ctx context.Context, upload events.StreamUpload) ([] if err != nil { return nil, trace.Wrap(err) } + part.LastModified = attrs.Updated parts = append(parts, *part) } return parts, nil diff --git a/lib/events/s3sessions/s3stream.go b/lib/events/s3sessions/s3stream.go index c855ca564180f..ec04d7ae9d761 100644 --- a/lib/events/s3sessions/s3stream.go 
+++ b/lib/events/s3sessions/s3stream.go @@ -103,9 +112,18 @@ func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, pa return nil, trace.Wrap(awsutils.ConvertS3Error(err), "UploadPart(upload %v) part(%v) session(%v)", upload.ID, partNumber, upload.SessionID) } - + // TODO(espadolini): the AWS SDK v1 doesn't expose the Date of the response + // in [s3.UploadPartOutput] so we use the current time instead; AWS SDK v2 + // might expose the returned Date as part of the metadata, so we should + // check if that matches the actual LastModified of the part. It doesn't + // make much sense to do an additional request to check the LastModified of + // the part we just uploaded, however. log.Infof("Uploaded part %v in %v", partNumber, time.Since(start)) - return &events.StreamPart{ETag: aws.StringValue(resp.ETag), Number: partNumber}, nil + return &events.StreamPart{ + ETag: aws.StringValue(resp.ETag), + Number: partNumber, + LastModified: time.Now(), + }, nil } func (h *Handler) abortUpload(ctx context.Context, upload events.StreamUpload) error { @@ -205,10 +214,11 @@ func (h *Handler) ListParts(ctx context.Context, upload events.StreamUpload) ([] return nil, awsutils.ConvertS3Error(err) } for _, part := range re.Parts { - parts = append(parts, events.StreamPart{ - Number: aws.Int64Value(part.PartNumber), - ETag: aws.StringValue(part.ETag), + parts = append(parts, events.StreamPart{ + Number: aws.Int64Value(part.PartNumber), + ETag: aws.StringValue(part.ETag), + LastModified: aws.TimeValue(part.LastModified), }) } if !aws.BoolValue(re.IsTruncated) { diff --git a/lib/events/stream_test.go b/lib/events/stream_test.go index 3371b5c56ca19..6b1dab52e6575 100644 --- a/lib/events/stream_test.go +++ b/lib/events/stream_test.go @@ -134,8 +134,12 @@ func TestNewStreamErrors(t *testing.T) { expectedErr error }{ { - desc: "ListPartsError", - uploader: &eventstest.MockUploader{ListPartsError: expectedErr}, + desc: "ListPartsError", + uploader: &eventstest.MockUploader{ + MockListParts: func(ctx 
context.Context, upload events.StreamUpload) ([]events.StreamPart, error) { + return nil, expectedErr + }, + }, }, { desc: "ReserveUploadPartError",