diff --git a/lib/events/complete.go b/lib/events/complete.go index 3b8a2b050f9d1..bfbc818c85ad8 100644 --- a/lib/events/complete.go +++ b/lib/events/complete.go @@ -23,7 +23,6 @@ import ( "github.com/gravitational/teleport" apidefaults "github.com/gravitational/teleport/api/defaults" "github.com/gravitational/teleport/api/types/events" - apievents "github.com/gravitational/teleport/api/types/events" apiutils "github.com/gravitational/teleport/api/utils" "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/utils" @@ -137,28 +136,43 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error { if err != nil { return trace.Wrap(err) } - if len(parts) == 0 { - continue - } - u.log.Debugf("Upload %v grace period is over. Trying to complete.", upload) + + u.log.Debugf("Upload %v grace period is over. Trying to complete.", upload.ID) if err := u.cfg.Uploader.CompleteUpload(ctx, upload, parts); err != nil { return trace.Wrap(err) } u.log.Debugf("Completed upload %v.", upload) completed++ - uploadData := u.cfg.Uploader.GetUploadMetadata(upload.SessionID) - err = u.ensureSessionEndEvent(ctx, uploadData) - if err != nil { - return trace.Wrap(err) + + if len(parts) == 0 { + continue } + + uploadData := u.cfg.Uploader.GetUploadMetadata(upload.SessionID) + + // Schedule a background operation to check for (and emit) a session end event. + // This is necessary because we'll need to download the session in order to + // enumerate its events, and the S3 API takes a little while after the upload + // is completed before version metadata becomes available. + go func() { + select { + case <-ctx.Done(): + return + case <-time.After(2 * time.Minute): + u.log.Debugf("checking for session end event for session %v", upload.SessionID) + if err := u.ensureSessionEndEvent(ctx, uploadData); err != nil { + u.log.WithError(err).Warningf("failed to ensure session end event") + } + } + }() session := &events.SessionUpload{ - Metadata: apievents.Metadata{ + Metadata: events.Metadata{ Type: SessionUploadEvent, Code: SessionUploadCode, ID: uuid.New().String(), Index: SessionUploadIndex, }, - SessionMetadata: apievents.SessionMetadata{ + SessionMetadata: events.SessionMetadata{ SessionID: string(uploadData.SessionID), }, SessionURL: uploadData.URL, diff --git a/lib/events/complete_test.go b/lib/events/complete_test.go new file mode 100644 index 0000000000000..baa3151ac0f71 --- /dev/null +++ b/lib/events/complete_test.go @@ -0,0 +1,71 @@ +/* +Copyright 2022 Gravitational, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package events + +import ( + "context" + "testing" + "time" + + apievents "github.com/gravitational/teleport/api/types/events" + "github.com/gravitational/teleport/lib/session" + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/require" +) + +// TestUploadCompleterCompletesEmptyUploads verifies that the upload completer +// completes uploads that have no parts. This ensures that we don't leave empty +// directories behind. +func TestUploadCompleterCompletesEmptyUploads(t *testing.T) { + clock := clockwork.NewFakeClock() + mu := NewMemoryUploader() + mu.Clock = clock + + log := &MockAuditLog{} + + uc, err := NewUploadCompleter(UploadCompleterConfig{ + Unstarted: true, + Uploader: mu, + AuditLog: log, + GracePeriod: 2 * time.Hour, + }) + require.NoError(t, err) + + upload, err := mu.CreateUpload(context.Background(), session.NewID()) + require.NoError(t, err) + clock.Advance(3 * time.Hour) + + err = uc.CheckUploads(context.Background()) + require.NoError(t, err) + + require.True(t, mu.uploads[upload.ID].completed) +} + +type MockAuditLog struct { + DiscardAuditLog + + emitter MockEmitter + sessionEvents []EventFields +} + +func (m *MockAuditLog) GetSessionEvents(namespace string, sid session.ID, after int, includePrintEvents bool) ([]EventFields, error) { + return m.sessionEvents, nil +} + +func (m *MockAuditLog) EmitAuditEvent(ctx context.Context, event apievents.AuditEvent) error { + return m.emitter.EmitAuditEvent(ctx, event) +} diff --git a/lib/events/filesessions/filestream.go b/lib/events/filesessions/filestream.go index c7415c866b702..29669e57a6bed 100644 --- a/lib/events/filesessions/filestream.go +++ b/lib/events/filesessions/filestream.go @@ -96,9 +96,6 @@ func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, pa // CompleteUpload completes the upload func (h *Handler) CompleteUpload(ctx context.Context, upload events.StreamUpload, parts []events.StreamPart) error { - if len(parts) == 0 { - return trace.BadParameter("need at least one part to complete the upload") - } if err := checkUpload(upload); err != nil { return trace.Wrap(err) } diff --git a/lib/events/gcssessions/gcsstream.go b/lib/events/gcssessions/gcsstream.go index 4663841b1eb9b..b1ef99f57892c 100644 --- a/lib/events/gcssessions/gcsstream.go +++ b/lib/events/gcssessions/gcsstream.go @@ -125,6 +125,11 @@ func (h *Handler) CompleteUpload(ctx context.Context, upload events.StreamUpload return convertGCSError(err) } + // If there are no parts to complete, move to cleanup + if len(parts) == 0 { + return h.cleanupUpload(ctx, upload) + } + objects := h.partsToObjects(upload, parts) for len(objects) > maxParts { h.Logger.Debugf("Got %v objects for upload %v, performing temp merge.", @@ -365,7 +370,7 @@ func uploadFromPath(path string) (*events.StreamUpload, error) { if err := sessionID.Check(); err != nil { return nil, trace.Wrap(err) } - parts := strings.Split(dir, slash) + parts := strings.Split(strings.TrimSuffix(dir, slash), slash) if len(parts) < 2 { return nil, trace.BadParameter("expected format uploads/, got %v", dir) } diff --git a/lib/events/gcssessions/gcsstream_test.go b/lib/events/gcssessions/gcsstream_test.go index cb578ae5d4b2b..7fb63ec05af59 100644 --- a/lib/events/gcssessions/gcsstream_test.go +++ b/lib/events/gcssessions/gcsstream_test.go @@ -34,6 +34,40 @@ import ( "github.com/gravitational/trace" ) +func TestUploadFromPath(t *testing.T) { + for _, test := range []struct { + path string + sessionID, uploadID string + assertErr require.ErrorAssertionFunc + }{ + { + path: "uploads/73de0358-2a40-4940-ae26-0c06877e35d9/cf9e08d5-6651-4ddd-a472-52d2286d6bb4.upload", + sessionID: "cf9e08d5-6651-4ddd-a472-52d2286d6bb4", + uploadID: "73de0358-2a40-4940-ae26-0c06877e35d9", + assertErr: require.NoError, + }, + { + path: "uploads/73de0358-2a40-4940-ae26-0c06877e35d9/cf9e08d5-6651-4ddd-a472-52d2286d6bb4.BADEXTENSION", + assertErr: require.Error, + }, + { + path: "no-dir.upload", + assertErr: require.Error, + }, + } { + t.Run(test.path, func(t *testing.T) { + upload, err := uploadFromPath(test.path) + test.assertErr(t, err) + if test.sessionID != "" { + require.Equal(t, test.sessionID, string(upload.SessionID)) + } + if test.uploadID != "" { + require.Equal(t, test.uploadID, upload.ID) + } + }) + } +} + // TestStreams tests various streaming upload scenarios func TestStreams(t *testing.T) { ctx := context.Background() diff --git a/lib/events/s3sessions/s3stream.go b/lib/events/s3sessions/s3stream.go index 201723c01b529..bef290faff5b0 100644 --- a/lib/events/s3sessions/s3stream.go +++ b/lib/events/s3sessions/s3stream.go @@ -91,8 +91,25 @@ func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, pa return &events.StreamPart{ETag: *resp.ETag, Number: partNumber}, nil } +func (h *Handler) abortUpload(ctx context.Context, upload events.StreamUpload) error { + req := &s3.AbortMultipartUploadInput{ + Bucket: aws.String(h.Bucket), + Key: aws.String(h.path(upload.SessionID)), + UploadId: aws.String(upload.ID), + } + _, err := h.client.AbortMultipartUploadWithContext(ctx, req) + if err != nil { + return ConvertS3Error(err) + } + return nil +} + // CompleteUpload completes the upload func (h *Handler) CompleteUpload(ctx context.Context, upload events.StreamUpload, parts []events.StreamPart) error { + if len(parts) == 0 { + return h.abortUpload(ctx, upload) + } + start := time.Now() defer func() { h.Infof("UploadPart(%v) completed in %v.", upload.ID, time.Since(start)) }() diff --git a/lib/events/stream.go b/lib/events/stream.go index c00e31c1d4fa0..a33a192eed37f 100644 --- a/lib/events/stream.go +++ b/lib/events/stream.go @@ -1074,6 +1074,8 @@ type MemoryUploader struct { uploads map[string]*MemoryUpload objects map[session.ID][]byte eventsC chan UploadEvent + + Clock clockwork.Clock } // MemoryUpload is used in tests @@ -1114,6 +1116,9 @@ func (m *MemoryUploader) CreateUpload(ctx context.Context, sessionID session.ID) ID: uuid.New().String(), SessionID: sessionID, } + if m.Clock != nil { + upload.Initiated = m.Clock.Now() + } m.uploads[upload.ID] = &MemoryUpload{ id: upload.ID, sessionID: sessionID, diff --git a/lib/service/service.go b/lib/service/service.go index bfa5d33818800..7f952a828cd6b 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -2130,36 +2130,6 @@ func (process *TeleportProcess) initUploaderService(streamer events.Streamer, au } } - // DELETE IN (5.1.0) - // this uploader was superseded by filesessions.Uploader, - // see below - uploader, err := events.NewUploader(events.UploaderConfig{ - DataDir: filepath.Join(process.Config.DataDir, teleport.LogsDir), - Namespace: apidefaults.Namespace, - ServerID: teleport.ComponentUpload, - AuditLog: auditLog, - EventsC: process.Config.UploadEventsC, - }) - if err != nil { - return trace.Wrap(err) - } - process.RegisterFunc("uploader.service", func() error { - err := uploader.Serve() - if err != nil { - log.Errorf("Uploader server exited with error: %v.", err) - } - return nil - }) - - process.OnExit("uploader.shutdown", func(payload interface{}) { - log.Infof("Shutting down.") - warnOnErr(uploader.Stop(), log) - log.Infof("Exited.") - }) - - // This uploader supersedes the events.Uploader above, - // that is kept for backwards compatibility purposes for one release. - // Delete this comment once the uploader above is phased out. fileUploader, err := filesessions.NewUploader(filesessions.UploaderConfig{ ScanDir: filepath.Join(streamingDir...), Streamer: streamer,