From eb4ee454508f96f3a76bb020c13e956c5b16a77a Mon Sep 17 00:00:00 2001 From: Zac Bergquist Date: Wed, 2 Mar 2022 14:44:43 -0700 Subject: [PATCH] Complete empty uploads The upload completer scans for uploads that need to be completed, likely due to an error or process restart. Prior to this change, it only completed uploads that had 1 or more parts. Since completing an upload is what cleans up the directory on disk (or in the case of cloud storage, finishes the multipart upload), it was possible for us to leave behind empty directories (or multipart uploads) for uploads with no parts. This change makes it valid to complete uploads with no parts, which ensures that these directories get cleaned up. Also fix an issue with the GCS uploader, which failed to properly calculate the upload ID from the path. This is because strings.Split(s, "/") returns an empty string as the last element when s ends with a /. Updates #9646 --- lib/events/complete.go | 36 ++++++++---- lib/events/complete_test.go | 71 ++++++++++++++++++++++++ lib/events/filesessions/filestream.go | 3 - lib/events/gcssessions/gcsstream.go | 7 ++- lib/events/gcssessions/gcsstream_test.go | 34 ++++++++++++ lib/events/s3sessions/s3stream.go | 17 ++++++ lib/events/stream.go | 5 ++ lib/service/service.go | 30 ---------- 8 files changed, 158 insertions(+), 45 deletions(-) create mode 100644 lib/events/complete_test.go diff --git a/lib/events/complete.go b/lib/events/complete.go index b7ab99c3a8844..ddd1c0e9d9a43 100644 --- a/lib/events/complete.go +++ b/lib/events/complete.go @@ -23,7 +23,6 @@ import ( "github.com/gravitational/teleport" apidefaults "github.com/gravitational/teleport/api/defaults" "github.com/gravitational/teleport/api/types/events" - apievents "github.com/gravitational/teleport/api/types/events" apiutils "github.com/gravitational/teleport/api/utils" "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/utils" @@ -137,28 +136,43 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error { if err != nil { return trace.Wrap(err) } - if len(parts) == 0 { - continue - } - u.log.Debugf("Upload %v grace period is over. Trying to complete.", upload) + + u.log.Debugf("Upload %v grace period is over. Trying to complete.", upload.ID) if err := u.cfg.Uploader.CompleteUpload(ctx, upload, parts); err != nil { return trace.Wrap(err) } u.log.Debugf("Completed upload %v.", upload) completed++ - uploadData := u.cfg.Uploader.GetUploadMetadata(upload.SessionID) - err = u.ensureSessionEndEvent(ctx, uploadData) - if err != nil { - return trace.Wrap(err) + + if len(parts) == 0 { + continue } + + uploadData := u.cfg.Uploader.GetUploadMetadata(upload.SessionID) + + // Schedule a background operation to check for (and emit) a session end event. + // This is necessary because we'll need to download the session in order to + // enumerate its events, and the S3 API takes a little while after the upload + // is completed before version metadata becomes available. + go func() { + select { + case <-ctx.Done(): + return + case <-time.After(2 * time.Minute): + u.log.Debugf("checking for session end event for session %v", upload.SessionID) + if err := u.ensureSessionEndEvent(ctx, uploadData); err != nil { + u.log.WithError(err).Warningf("failed to ensure session end event") + } + } + }() session := &events.SessionUpload{ - Metadata: apievents.Metadata{ + Metadata: events.Metadata{ Type: SessionUploadEvent, Code: SessionUploadCode, ID: uuid.New(), Index: SessionUploadIndex, }, - SessionMetadata: apievents.SessionMetadata{ + SessionMetadata: events.SessionMetadata{ SessionID: string(uploadData.SessionID), }, SessionURL: uploadData.URL, diff --git a/lib/events/complete_test.go b/lib/events/complete_test.go new file mode 100644 index 0000000000000..baa3151ac0f71 --- /dev/null +++ b/lib/events/complete_test.go @@ -0,0 +1,71 @@ +/* +Copyright 2022 Gravitational, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package events + +import ( + "context" + "testing" + "time" + + apievents "github.com/gravitational/teleport/api/types/events" + "github.com/gravitational/teleport/lib/session" + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/require" +) + +// TestUploadCompleterCompletesEmptyUploads verifies that the upload completer +// completes uploads that have no parts. This ensures that we don't leave empty +// directories behind. +func TestUploadCompleterCompletesEmptyUploads(t *testing.T) { + clock := clockwork.NewFakeClock() + mu := NewMemoryUploader() + mu.Clock = clock + + log := &MockAuditLog{} + + uc, err := NewUploadCompleter(UploadCompleterConfig{ + Unstarted: true, + Uploader: mu, + AuditLog: log, + GracePeriod: 2 * time.Hour, + }) + require.NoError(t, err) + + upload, err := mu.CreateUpload(context.Background(), session.NewID()) + require.NoError(t, err) + clock.Advance(3 * time.Hour) + + err = uc.CheckUploads(context.Background()) + require.NoError(t, err) + + require.True(t, mu.uploads[upload.ID].completed) +} + +type MockAuditLog struct { + DiscardAuditLog + + emitter MockEmitter + sessionEvents []EventFields +} + +func (m *MockAuditLog) GetSessionEvents(namespace string, sid session.ID, after int, includePrintEvents bool) ([]EventFields, error) { + return m.sessionEvents, nil +} + +func (m *MockAuditLog) EmitAuditEvent(ctx context.Context, event apievents.AuditEvent) error { + return m.emitter.EmitAuditEvent(ctx, event) +} diff --git a/lib/events/filesessions/filestream.go b/lib/events/filesessions/filestream.go index beacdfd5a8499..bbfb3d746c82a 100644 --- a/lib/events/filesessions/filestream.go +++ b/lib/events/filesessions/filestream.go @@ -96,9 +96,6 @@ func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, pa // CompleteUpload completes the upload func (h *Handler) CompleteUpload(ctx context.Context, upload events.StreamUpload, parts []events.StreamPart) error { - if len(parts) == 0 { - return trace.BadParameter("need at least one part to complete the upload") - } if err := checkUpload(upload); err != nil { return trace.Wrap(err) } diff --git a/lib/events/gcssessions/gcsstream.go b/lib/events/gcssessions/gcsstream.go index 40c0633d00cf0..7506bef8a1584 100644 --- a/lib/events/gcssessions/gcsstream.go +++ b/lib/events/gcssessions/gcsstream.go @@ -125,6 +125,11 @@ func (h *Handler) CompleteUpload(ctx context.Context, upload events.StreamUpload return convertGCSError(err) } + // If there are no parts to complete, move to cleanup + if len(parts) == 0 { + return h.cleanupUpload(ctx, upload) + } + objects := h.partsToObjects(upload, parts) for len(objects) > maxParts { h.Logger.Debugf("Got %v objects for upload %v, performing temp merge.", @@ -365,7 +370,7 @@ func uploadFromPath(path string) (*events.StreamUpload, error) { if err := sessionID.Check(); err != nil { return nil, trace.Wrap(err) } - parts := strings.Split(dir, slash) + parts := strings.Split(strings.TrimSuffix(dir, slash), slash) if len(parts) < 2 { return nil, trace.BadParameter("expected format uploads/, got %v", dir) } diff --git a/lib/events/gcssessions/gcsstream_test.go b/lib/events/gcssessions/gcsstream_test.go index cb578ae5d4b2b..7fb63ec05af59 100644 --- a/lib/events/gcssessions/gcsstream_test.go +++ b/lib/events/gcssessions/gcsstream_test.go @@ -34,6 +34,40 @@ import ( "github.com/gravitational/trace" ) +func TestUploadFromPath(t *testing.T) { + for _, test := range []struct { + path string + sessionID, uploadID string + assertErr require.ErrorAssertionFunc + }{ + { + path: "uploads/73de0358-2a40-4940-ae26-0c06877e35d9/cf9e08d5-6651-4ddd-a472-52d2286d6bb4.upload", + sessionID: "cf9e08d5-6651-4ddd-a472-52d2286d6bb4", + uploadID: "73de0358-2a40-4940-ae26-0c06877e35d9", + assertErr: require.NoError, + }, + { + path: "uploads/73de0358-2a40-4940-ae26-0c06877e35d9/cf9e08d5-6651-4ddd-a472-52d2286d6bb4.BADEXTENSION", + assertErr: require.Error, + }, + { + path: "no-dir.upload", + assertErr: require.Error, + }, + } { + t.Run(test.path, func(t *testing.T) { + upload, err := uploadFromPath(test.path) + test.assertErr(t, err) + if test.sessionID != "" { + require.Equal(t, test.sessionID, string(upload.SessionID)) + } + if test.uploadID != "" { + require.Equal(t, test.uploadID, upload.ID) + } + }) + } +} + // TestStreams tests various streaming upload scenarios func TestStreams(t *testing.T) { ctx := context.Background() diff --git a/lib/events/s3sessions/s3stream.go b/lib/events/s3sessions/s3stream.go index 201723c01b529..bef290faff5b0 100644 --- a/lib/events/s3sessions/s3stream.go +++ b/lib/events/s3sessions/s3stream.go @@ -91,8 +91,25 @@ func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, pa return &events.StreamPart{ETag: *resp.ETag, Number: partNumber}, nil } +func (h *Handler) abortUpload(ctx context.Context, upload events.StreamUpload) error { + req := &s3.AbortMultipartUploadInput{ + Bucket: aws.String(h.Bucket), + Key: aws.String(h.path(upload.SessionID)), + UploadId: aws.String(upload.ID), + } + _, err := h.client.AbortMultipartUploadWithContext(ctx, req) + if err != nil { + return ConvertS3Error(err) + } + return nil +} + // CompleteUpload completes the upload func (h *Handler) CompleteUpload(ctx context.Context, upload events.StreamUpload, parts []events.StreamPart) error { + if len(parts) == 0 { + return h.abortUpload(ctx, upload) + } + start := time.Now() defer func() { h.Infof("UploadPart(%v) completed in %v.", upload.ID, time.Since(start)) }() diff --git a/lib/events/stream.go b/lib/events/stream.go index b2666ff3421a1..1cfac7a156471 100644 --- a/lib/events/stream.go +++ b/lib/events/stream.go @@ -1084,6 +1084,8 @@ type MemoryUploader struct { uploads map[string]*MemoryUpload objects map[session.ID][]byte eventsC chan UploadEvent + + Clock clockwork.Clock } // MemoryUpload is used in tests @@ -1124,6 +1126,9 @@ func (m *MemoryUploader) CreateUpload(ctx context.Context, sessionID session.ID) ID: uuid.New(), SessionID: sessionID, } + if m.Clock != nil { + upload.Initiated = m.Clock.Now() + } m.uploads[upload.ID] = &MemoryUpload{ id: upload.ID, sessionID: sessionID, diff --git a/lib/service/service.go b/lib/service/service.go index bb46f140a3f7a..b657bc901a59c 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -2128,36 +2128,6 @@ func (process *TeleportProcess) initUploaderService(streamer events.Streamer, au } } - // DELETE IN (5.1.0) - // this uploader was superseded by filesessions.Uploader, - // see below - uploader, err := events.NewUploader(events.UploaderConfig{ - DataDir: filepath.Join(process.Config.DataDir, teleport.LogsDir), - Namespace: apidefaults.Namespace, - ServerID: teleport.ComponentUpload, - AuditLog: auditLog, - EventsC: process.Config.UploadEventsC, - }) - if err != nil { - return trace.Wrap(err) - } - process.RegisterFunc("uploader.service", func() error { - err := uploader.Serve() - if err != nil { - log.Errorf("Uploader server exited with error: %v.", err) - } - return nil - }) - - process.OnExit("uploader.shutdown", func(payload interface{}) { - log.Infof("Shutting down.") - warnOnErr(uploader.Stop(), log) - log.Infof("Exited.") - }) - - // This uploader supersedes the events.Uploader above, - // that is kept for backwards compatibility purposes for one release. - // Delete this comment once the uploader above is phased out. fileUploader, err := filesessions.NewUploader(filesessions.UploaderConfig{ ScanDir: filepath.Join(streamingDir...), Streamer: streamer,