From 8fb8cb76d35cf183495183a77f020a5c55e21457 Mon Sep 17 00:00:00 2001 From: Zac Bergquist Date: Wed, 2 Mar 2022 14:44:43 -0700 Subject: [PATCH] Complete empty uploads The upload completer scans for uploads that need to be completed, likely due to an error or process restart. Prior to this change, it only completed uploads that had 1 or more parts. Since completing an upload is what cleans up the directory on disk (or in the case of cloud storage, finishes the multipart upload), it was possible for us to leave behind empty directories (or multipart uploads) for uploads with no parts. This change makes it valid to complete uploads with no parts, which ensures that these directories get cleaned up. Also fix an issue with the GCS uploader, which failed to properly calculate the upload ID from the path. This is because strings.Split(s, "/") returns an empty string as the last element when s ends with a /. Updates #9646 --- lib/events/complete.go | 36 ++++++++---- lib/events/complete_test.go | 71 ++++++++++++++++++++++++ lib/events/filesessions/filestream.go | 3 - lib/events/gcssessions/gcsstream.go | 7 ++- lib/events/gcssessions/gcsstream_test.go | 34 ++++++++++++ lib/events/s3sessions/s3stream.go | 17 ++++++ lib/events/stream.go | 5 ++ lib/service/service.go | 30 ---------- 8 files changed, 158 insertions(+), 45 deletions(-) create mode 100644 lib/events/complete_test.go diff --git a/lib/events/complete.go b/lib/events/complete.go index 3b8a2b050f9d1..bfbc818c85ad8 100644 --- a/lib/events/complete.go +++ b/lib/events/complete.go @@ -23,7 +23,6 @@ import ( "github.com/gravitational/teleport" apidefaults "github.com/gravitational/teleport/api/defaults" "github.com/gravitational/teleport/api/types/events" - apievents "github.com/gravitational/teleport/api/types/events" apiutils "github.com/gravitational/teleport/api/utils" "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/utils" @@ -137,28 +136,43 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error { if err != nil { return trace.Wrap(err) } - if len(parts) == 0 { - continue - } - u.log.Debugf("Upload %v grace period is over. Trying to complete.", upload) + + u.log.Debugf("Upload %v grace period is over. Trying to complete.", upload.ID) if err := u.cfg.Uploader.CompleteUpload(ctx, upload, parts); err != nil { return trace.Wrap(err) } u.log.Debugf("Completed upload %v.", upload) completed++ - uploadData := u.cfg.Uploader.GetUploadMetadata(upload.SessionID) - err = u.ensureSessionEndEvent(ctx, uploadData) - if err != nil { - return trace.Wrap(err) + + if len(parts) == 0 { + continue } + + uploadData := u.cfg.Uploader.GetUploadMetadata(upload.SessionID) + + // Schedule a background operation to check for (and emit) a session end event. + // This is necessary because we'll need to download the session in order to + // enumerate its events, and the S3 API takes a little while after the upload + // is completed before version metadata becomes available. + go func() { + select { + case <-ctx.Done(): + return + case <-time.After(2 * time.Minute): + u.log.Debugf("checking for session end event for session %v", upload.SessionID) + if err := u.ensureSessionEndEvent(ctx, uploadData); err != nil { + u.log.WithError(err).Warningf("failed to ensure session end event") + } + } + }() session := &events.SessionUpload{ - Metadata: apievents.Metadata{ + Metadata: events.Metadata{ Type: SessionUploadEvent, Code: SessionUploadCode, ID: uuid.New().String(), Index: SessionUploadIndex, }, - SessionMetadata: apievents.SessionMetadata{ + SessionMetadata: events.SessionMetadata{ SessionID: string(uploadData.SessionID), }, SessionURL: uploadData.URL, diff --git a/lib/events/complete_test.go b/lib/events/complete_test.go new file mode 100644 index 0000000000000..baa3151ac0f71 --- /dev/null +++ b/lib/events/complete_test.go @@ -0,0 +1,71 @@ +/* +Copyright 2022 Gravitational, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package events + +import ( + "context" + "testing" + "time" + + apievents "github.com/gravitational/teleport/api/types/events" + "github.com/gravitational/teleport/lib/session" + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/require" +) + +// TestUploadCompleterCompletesEmptyUploads verifies that the upload completer +// completes uploads that have no parts. This ensures that we don't leave empty +// directories behind. +func TestUploadCompleterCompletesEmptyUploads(t *testing.T) { + clock := clockwork.NewFakeClock() + mu := NewMemoryUploader() + mu.Clock = clock + + log := &MockAuditLog{} + + uc, err := NewUploadCompleter(UploadCompleterConfig{ + Unstarted: true, + Uploader: mu, + AuditLog: log, + GracePeriod: 2 * time.Hour, + }) + require.NoError(t, err) + + upload, err := mu.CreateUpload(context.Background(), session.NewID()) + require.NoError(t, err) + clock.Advance(3 * time.Hour) + + err = uc.CheckUploads(context.Background()) + require.NoError(t, err) + + require.True(t, mu.uploads[upload.ID].completed) +} + +type MockAuditLog struct { + DiscardAuditLog + + emitter MockEmitter + sessionEvents []EventFields +} + +func (m *MockAuditLog) GetSessionEvents(namespace string, sid session.ID, after int, includePrintEvents bool) ([]EventFields, error) { + return m.sessionEvents, nil +} + +func (m *MockAuditLog) EmitAuditEvent(ctx context.Context, event apievents.AuditEvent) error { + return m.emitter.EmitAuditEvent(ctx, event) +} diff --git a/lib/events/filesessions/filestream.go b/lib/events/filesessions/filestream.go index c7415c866b702..29669e57a6bed 100644 --- a/lib/events/filesessions/filestream.go +++ b/lib/events/filesessions/filestream.go @@ -96,9 +96,6 @@ func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, pa // CompleteUpload completes the upload func (h *Handler) CompleteUpload(ctx context.Context, upload events.StreamUpload, parts []events.StreamPart) error { - if len(parts) == 0 { - return trace.BadParameter("need at least one part to complete the upload") - } if err := checkUpload(upload); err != nil { return trace.Wrap(err) } diff --git a/lib/events/gcssessions/gcsstream.go b/lib/events/gcssessions/gcsstream.go index 4663841b1eb9b..b1ef99f57892c 100644 --- a/lib/events/gcssessions/gcsstream.go +++ b/lib/events/gcssessions/gcsstream.go @@ -125,6 +125,11 @@ func (h *Handler) CompleteUpload(ctx context.Context, upload events.StreamUpload return convertGCSError(err) } + // If there are no parts to complete, move to cleanup + if len(parts) == 0 { + return h.cleanupUpload(ctx, upload) + } + objects := h.partsToObjects(upload, parts) for len(objects) > maxParts { h.Logger.Debugf("Got %v objects for upload %v, performing temp merge.", @@ -365,7 +370,7 @@ func uploadFromPath(path string) (*events.StreamUpload, error) { if err := sessionID.Check(); err != nil { return nil, trace.Wrap(err) } - parts := strings.Split(dir, slash) + parts := strings.Split(strings.TrimSuffix(dir, slash), slash) if len(parts) < 2 { return nil, trace.BadParameter("expected format uploads/, got %v", dir) } diff --git a/lib/events/gcssessions/gcsstream_test.go b/lib/events/gcssessions/gcsstream_test.go index cb578ae5d4b2b..7fb63ec05af59 100644 --- a/lib/events/gcssessions/gcsstream_test.go +++ b/lib/events/gcssessions/gcsstream_test.go @@ -34,6 +34,40 @@ import ( "github.com/gravitational/trace" ) +func TestUploadFromPath(t *testing.T) { + for _, test := range []struct { + path string + sessionID, uploadID string + assertErr require.ErrorAssertionFunc + }{ + { + path: "uploads/73de0358-2a40-4940-ae26-0c06877e35d9/cf9e08d5-6651-4ddd-a472-52d2286d6bb4.upload", + sessionID: "cf9e08d5-6651-4ddd-a472-52d2286d6bb4", + uploadID: "73de0358-2a40-4940-ae26-0c06877e35d9", + assertErr: require.NoError, + }, + { + path: "uploads/73de0358-2a40-4940-ae26-0c06877e35d9/cf9e08d5-6651-4ddd-a472-52d2286d6bb4.BADEXTENSION", + assertErr: require.Error, + }, + { + path: "no-dir.upload", + assertErr: require.Error, + }, + } { + t.Run(test.path, func(t *testing.T) { + upload, err := uploadFromPath(test.path) + test.assertErr(t, err) + if test.sessionID != "" { + require.Equal(t, test.sessionID, string(upload.SessionID)) + } + if test.uploadID != "" { + require.Equal(t, test.uploadID, upload.ID) + } + }) + } +} + // TestStreams tests various streaming upload scenarios func TestStreams(t *testing.T) { ctx := context.Background() diff --git a/lib/events/s3sessions/s3stream.go b/lib/events/s3sessions/s3stream.go index 201723c01b529..bef290faff5b0 100644 --- a/lib/events/s3sessions/s3stream.go +++ b/lib/events/s3sessions/s3stream.go @@ -91,8 +91,25 @@ func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, pa return &events.StreamPart{ETag: *resp.ETag, Number: partNumber}, nil } +func (h *Handler) abortUpload(ctx context.Context, upload events.StreamUpload) error { + req := &s3.AbortMultipartUploadInput{ + Bucket: aws.String(h.Bucket), + Key: aws.String(h.path(upload.SessionID)), + UploadId: aws.String(upload.ID), + } + _, err := h.client.AbortMultipartUploadWithContext(ctx, req) + if err != nil { + return ConvertS3Error(err) + } + return nil +} + // CompleteUpload completes the upload func (h *Handler) CompleteUpload(ctx context.Context, upload events.StreamUpload, parts []events.StreamPart) error { + if len(parts) == 0 { + return h.abortUpload(ctx, upload) + } + start := time.Now() defer func() { h.Infof("UploadPart(%v) completed in %v.", upload.ID, time.Since(start)) }() diff --git a/lib/events/stream.go b/lib/events/stream.go index c00e31c1d4fa0..a33a192eed37f 100644 --- a/lib/events/stream.go +++ b/lib/events/stream.go @@ -1074,6 +1074,8 @@ type MemoryUploader struct { uploads map[string]*MemoryUpload objects map[session.ID][]byte eventsC chan UploadEvent + + Clock clockwork.Clock } // MemoryUpload is used in tests @@ -1114,6 +1116,9 @@ func (m *MemoryUploader) CreateUpload(ctx context.Context, sessionID session.ID) ID: uuid.New().String(), SessionID: sessionID, } + if m.Clock != nil { + upload.Initiated = m.Clock.Now() + } m.uploads[upload.ID] = &MemoryUpload{ id: upload.ID, sessionID: sessionID, diff --git a/lib/service/service.go b/lib/service/service.go index bfa5d33818800..7f952a828cd6b 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -2130,36 +2130,6 @@ func (process *TeleportProcess) initUploaderService(streamer events.Streamer, au } } - // DELETE IN (5.1.0) - // this uploader was superseded by filesessions.Uploader, - // see below - uploader, err := events.NewUploader(events.UploaderConfig{ - DataDir: filepath.Join(process.Config.DataDir, teleport.LogsDir), - Namespace: apidefaults.Namespace, - ServerID: teleport.ComponentUpload, - AuditLog: auditLog, - EventsC: process.Config.UploadEventsC, - }) - if err != nil { - return trace.Wrap(err) - } - process.RegisterFunc("uploader.service", func() error { - err := uploader.Serve() - if err != nil { - log.Errorf("Uploader server exited with error: %v.", err) - } - return nil - }) - - process.OnExit("uploader.shutdown", func(payload interface{}) { - log.Infof("Shutting down.") - warnOnErr(uploader.Stop(), log) - log.Infof("Exited.") - }) - - // This uploader supersedes the events.Uploader above, - // that is kept for backwards compatibility purposes for one release. - // Delete this comment once the uploader above is phased out. fileUploader, err := filesessions.NewUploader(filesessions.UploaderConfig{ ScanDir: filepath.Join(streamingDir...), Streamer: streamer,