Skip to content

Commit

Permalink
Complete empty uploads
Browse files Browse the repository at this point in the history
The upload completer scans for uploads that need to be completed,
likely due to an error or process restart. Prior to this change,
it only completed uploads that had 1 or more parts. Since completing
an upload is what cleans up the directory on disk (or in the case of
cloud storage, finishes the multipart upload), it was possible
for us to leave behind empty directories (or multipart uploads)
for uploads with no parts.

This change makes it valid to complete uploads with no parts, which
ensures that these directories get cleaned up.

Also fix an issue with the GCS uploader, which failed to properly calculate
the upload ID from the path. This is because strings.Split(s, "/") returns an empty
string as the last element when s ends with a /.

Updates #9646
  • Loading branch information
zmb3 committed Mar 4, 2022
1 parent 5b3e5b0 commit eb4ee45
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 45 deletions.
36 changes: 25 additions & 11 deletions lib/events/complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/gravitational/teleport"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/types/events"
apievents "github.com/gravitational/teleport/api/types/events"
apiutils "github.com/gravitational/teleport/api/utils"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/utils"
Expand Down Expand Up @@ -137,28 +136,43 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error {
if err != nil {
return trace.Wrap(err)
}
if len(parts) == 0 {
continue
}
u.log.Debugf("Upload %v grace period is over. Trying to complete.", upload)

u.log.Debugf("Upload %v grace period is over. Trying to complete.", upload.ID)
if err := u.cfg.Uploader.CompleteUpload(ctx, upload, parts); err != nil {
return trace.Wrap(err)
}
u.log.Debugf("Completed upload %v.", upload)
completed++
uploadData := u.cfg.Uploader.GetUploadMetadata(upload.SessionID)
err = u.ensureSessionEndEvent(ctx, uploadData)
if err != nil {
return trace.Wrap(err)

if len(parts) == 0 {
continue
}

uploadData := u.cfg.Uploader.GetUploadMetadata(upload.SessionID)

// Schedule a background operation to check for (and emit) a session end event.
// This is necessary because we'll need to download the session in order to
// enumerate its events, and the S3 API takes a little while after the upload
// is completed before version metadata becomes available.
go func() {
select {
case <-ctx.Done():
return
case <-time.After(2 * time.Minute):
u.log.Debugf("checking for session end event for session %v", upload.SessionID)
if err := u.ensureSessionEndEvent(ctx, uploadData); err != nil {
u.log.WithError(err).Warningf("failed to ensure session end event")
}
}
}()
session := &events.SessionUpload{
Metadata: apievents.Metadata{
Metadata: events.Metadata{
Type: SessionUploadEvent,
Code: SessionUploadCode,
ID: uuid.New(),
Index: SessionUploadIndex,
},
SessionMetadata: apievents.SessionMetadata{
SessionMetadata: events.SessionMetadata{
SessionID: string(uploadData.SessionID),
},
SessionURL: uploadData.URL,
Expand Down
71 changes: 71 additions & 0 deletions lib/events/complete_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
Copyright 2022 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package events

import (
"context"
"testing"
"time"

apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/session"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/require"
)

// TestUploadCompleterCompletesEmptyUploads verifies that the upload completer
// completes uploads that have no parts. This ensures that we don't leave empty
// directories behind.
func TestUploadCompleterCompletesEmptyUploads(t *testing.T) {
clock := clockwork.NewFakeClock()
mu := NewMemoryUploader()
mu.Clock = clock

log := &MockAuditLog{}

uc, err := NewUploadCompleter(UploadCompleterConfig{
Unstarted: true,
Uploader: mu,
AuditLog: log,
GracePeriod: 2 * time.Hour,
})
require.NoError(t, err)

upload, err := mu.CreateUpload(context.Background(), session.NewID())
require.NoError(t, err)
clock.Advance(3 * time.Hour)

err = uc.CheckUploads(context.Background())
require.NoError(t, err)

require.True(t, mu.uploads[upload.ID].completed)
}

type MockAuditLog struct {
DiscardAuditLog

emitter MockEmitter
sessionEvents []EventFields
}

func (m *MockAuditLog) GetSessionEvents(namespace string, sid session.ID, after int, includePrintEvents bool) ([]EventFields, error) {
return m.sessionEvents, nil
}

func (m *MockAuditLog) EmitAuditEvent(ctx context.Context, event apievents.AuditEvent) error {
return m.emitter.EmitAuditEvent(ctx, event)
}
3 changes: 0 additions & 3 deletions lib/events/filesessions/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,6 @@ func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, pa

// CompleteUpload completes the upload
func (h *Handler) CompleteUpload(ctx context.Context, upload events.StreamUpload, parts []events.StreamPart) error {
if len(parts) == 0 {
return trace.BadParameter("need at least one part to complete the upload")
}
if err := checkUpload(upload); err != nil {
return trace.Wrap(err)
}
Expand Down
7 changes: 6 additions & 1 deletion lib/events/gcssessions/gcsstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ func (h *Handler) CompleteUpload(ctx context.Context, upload events.StreamUpload
return convertGCSError(err)
}

// If there are no parts to complete, move to cleanup
if len(parts) == 0 {
return h.cleanupUpload(ctx, upload)
}

objects := h.partsToObjects(upload, parts)
for len(objects) > maxParts {
h.Logger.Debugf("Got %v objects for upload %v, performing temp merge.",
Expand Down Expand Up @@ -365,7 +370,7 @@ func uploadFromPath(path string) (*events.StreamUpload, error) {
if err := sessionID.Check(); err != nil {
return nil, trace.Wrap(err)
}
parts := strings.Split(dir, slash)
parts := strings.Split(strings.TrimSuffix(dir, slash), slash)
if len(parts) < 2 {
return nil, trace.BadParameter("expected format uploads/<upload-id>, got %v", dir)
}
Expand Down
34 changes: 34 additions & 0 deletions lib/events/gcssessions/gcsstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,40 @@ import (
"github.com/gravitational/trace"
)

func TestUploadFromPath(t *testing.T) {
for _, test := range []struct {
path string
sessionID, uploadID string
assertErr require.ErrorAssertionFunc
}{
{
path: "uploads/73de0358-2a40-4940-ae26-0c06877e35d9/cf9e08d5-6651-4ddd-a472-52d2286d6bb4.upload",
sessionID: "cf9e08d5-6651-4ddd-a472-52d2286d6bb4",
uploadID: "73de0358-2a40-4940-ae26-0c06877e35d9",
assertErr: require.NoError,
},
{
path: "uploads/73de0358-2a40-4940-ae26-0c06877e35d9/cf9e08d5-6651-4ddd-a472-52d2286d6bb4.BADEXTENSION",
assertErr: require.Error,
},
{
path: "no-dir.upload",
assertErr: require.Error,
},
} {
t.Run(test.path, func(t *testing.T) {
upload, err := uploadFromPath(test.path)
test.assertErr(t, err)
if test.sessionID != "" {
require.Equal(t, test.sessionID, string(upload.SessionID))
}
if test.uploadID != "" {
require.Equal(t, test.uploadID, upload.ID)
}
})
}
}

// TestStreams tests various streaming upload scenarios
func TestStreams(t *testing.T) {
ctx := context.Background()
Expand Down
17 changes: 17 additions & 0 deletions lib/events/s3sessions/s3stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,25 @@ func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, pa
return &events.StreamPart{ETag: *resp.ETag, Number: partNumber}, nil
}

func (h *Handler) abortUpload(ctx context.Context, upload events.StreamUpload) error {
req := &s3.AbortMultipartUploadInput{
Bucket: aws.String(h.Bucket),
Key: aws.String(h.path(upload.SessionID)),
UploadId: aws.String(upload.ID),
}
_, err := h.client.AbortMultipartUploadWithContext(ctx, req)
if err != nil {
return ConvertS3Error(err)
}
return nil
}

// CompleteUpload completes the upload
func (h *Handler) CompleteUpload(ctx context.Context, upload events.StreamUpload, parts []events.StreamPart) error {
if len(parts) == 0 {
return h.abortUpload(ctx, upload)
}

start := time.Now()
defer func() { h.Infof("UploadPart(%v) completed in %v.", upload.ID, time.Since(start)) }()

Expand Down
5 changes: 5 additions & 0 deletions lib/events/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,8 @@ type MemoryUploader struct {
uploads map[string]*MemoryUpload
objects map[session.ID][]byte
eventsC chan UploadEvent

Clock clockwork.Clock
}

// MemoryUpload is used in tests
Expand Down Expand Up @@ -1124,6 +1126,9 @@ func (m *MemoryUploader) CreateUpload(ctx context.Context, sessionID session.ID)
ID: uuid.New(),
SessionID: sessionID,
}
if m.Clock != nil {
upload.Initiated = m.Clock.Now()
}
m.uploads[upload.ID] = &MemoryUpload{
id: upload.ID,
sessionID: sessionID,
Expand Down
30 changes: 0 additions & 30 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2128,36 +2128,6 @@ func (process *TeleportProcess) initUploaderService(streamer events.Streamer, au
}
}

// DELETE IN (5.1.0)
// this uploader was superseded by filesessions.Uploader,
// see below
uploader, err := events.NewUploader(events.UploaderConfig{
DataDir: filepath.Join(process.Config.DataDir, teleport.LogsDir),
Namespace: apidefaults.Namespace,
ServerID: teleport.ComponentUpload,
AuditLog: auditLog,
EventsC: process.Config.UploadEventsC,
})
if err != nil {
return trace.Wrap(err)
}
process.RegisterFunc("uploader.service", func() error {
err := uploader.Serve()
if err != nil {
log.Errorf("Uploader server exited with error: %v.", err)
}
return nil
})

process.OnExit("uploader.shutdown", func(payload interface{}) {
log.Infof("Shutting down.")
warnOnErr(uploader.Stop(), log)
log.Infof("Exited.")
})

// This uploader supersedes the events.Uploader above,
// that is kept for backwards compatibility purposes for one release.
// Delete this comment once the uploader above is phased out.
fileUploader, err := filesessions.NewUploader(filesessions.UploaderConfig{
ScanDir: filepath.Join(streamingDir...),
Streamer: streamer,
Expand Down

0 comments on commit eb4ee45

Please sign in to comment.