Skip to content

Commit

Permalink
Complete empty uploads
Browse files Browse the repository at this point in the history
The upload completer scans for uploads that need to be completed,
likely due to an error or process restart. Prior to this change,
it only completed uploads that had 1 or more parts. Since completing
an upload is what cleans up the directory on disk, it was possible
for us to leave behind empty directories for uploads with no parts.

This change makes it valid to complete uploads with no parts, which
ensures that these directories get cleaned up.

In addition, the upload completer checks the upload for a session.end
event, and emits one if there is not already an end event present.
This logic only handled session.end, and not
windows.desktop.session.end, so that has been updated as well.

Updates #9646
  • Loading branch information
zmb3 committed Feb 25, 2022
1 parent 46bcaf5 commit 04cd3dc
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 37 deletions.
94 changes: 60 additions & 34 deletions lib/events/complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/gravitational/teleport"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/types/events"
apievents "github.com/gravitational/teleport/api/types/events"
apiutils "github.com/gravitational/teleport/api/utils"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/utils"
Expand Down Expand Up @@ -137,28 +136,31 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error {
if err != nil {
return trace.Wrap(err)
}
if len(parts) == 0 {
continue
}

u.log.Debugf("Upload %v grace period is over. Trying to complete.", upload)
if err := u.cfg.Uploader.CompleteUpload(ctx, upload, parts); err != nil {
return trace.Wrap(err)
}
u.log.Debugf("Completed upload %v.", upload)
completed++

if len(parts) == 0 {
continue
}

uploadData := u.cfg.Uploader.GetUploadMetadata(upload.SessionID)
err = u.ensureSessionEndEvent(ctx, uploadData)
if err != nil {
return trace.Wrap(err)
}
session := &events.SessionUpload{
Metadata: apievents.Metadata{
Metadata: events.Metadata{
Type: SessionUploadEvent,
Code: SessionUploadCode,
ID: uuid.New().String(),
Index: SessionUploadIndex,
},
SessionMetadata: apievents.SessionMetadata{
SessionMetadata: events.SessionMetadata{
SessionID: string(uploadData.SessionID),
},
SessionURL: uploadData.URL,
Expand All @@ -184,7 +186,7 @@ func (u *UploadCompleter) ensureSessionEndEvent(ctx context.Context, uploadData
var serverID, clusterName, user, login, hostname, namespace, serverAddr string
var interactive bool

// Get session events to find fields for constructed session end
// Get session events to find fields for constructed session end)
sessionEvents, err := u.cfg.AuditLog.GetSessionEvents(apidefaults.Namespace, uploadData.SessionID, 0, false)
if err != nil {
return trace.Wrap(err)
Expand All @@ -193,17 +195,20 @@ func (u *UploadCompleter) ensureSessionEndEvent(ctx context.Context, uploadData
return nil
}

// Return if session.end event already exists
// Return if session end event already exists
for _, event := range sessionEvents {
if event.GetType() == SessionEndEvent {
switch event.GetType() {
case SessionEndEvent, WindowsDesktopSessionEndEvent:
return nil
}
}

// Session start event is the first of session events
sessionStart := sessionEvents[0]
if sessionStart.GetType() != SessionStartEvent {
return trace.BadParameter("invalid session, session start is not the first event")
switch typ := sessionStart.GetType(); typ {
case SessionStartEvent, WindowsDesktopSessionStartEvent:
default:
return trace.BadParameter("invalid session, session start is not the first event (%v)", typ)
}

// Set variables
Expand All @@ -222,30 +227,51 @@ func (u *UploadCompleter) ensureSessionEndEvent(ctx context.Context, uploadData
lastEvent := sessionEvents[len(sessionEvents)-1]

participants := getParticipants(sessionEvents)
sessionMeta := events.SessionMetadata{
SessionID: string(uploadData.SessionID),
}
userMeta := events.UserMetadata{
User: user,
Login: login,
}

var sessionEndEvent events.AuditEvent
switch sessionStart.GetType() {
case SessionStartEvent:
sessionEndEvent = &events.SessionEnd{
Metadata: events.Metadata{
Type: SessionEndEvent,
Code: SessionEndCode,
ClusterName: clusterName,
},
ServerMetadata: events.ServerMetadata{
ServerID: serverID,
ServerNamespace: namespace,
ServerHostname: hostname,
ServerAddr: serverAddr,
},
SessionMetadata: sessionMeta,
UserMetadata: userMeta,

sessionEndEvent := &events.SessionEnd{
Metadata: events.Metadata{
Type: SessionEndEvent,
Code: SessionEndCode,
ClusterName: clusterName,
},
ServerMetadata: events.ServerMetadata{
ServerID: serverID,
ServerNamespace: namespace,
ServerHostname: hostname,
ServerAddr: serverAddr,
},
SessionMetadata: events.SessionMetadata{
SessionID: string(uploadData.SessionID),
},
UserMetadata: events.UserMetadata{
User: user,
Login: login,
},
Participants: participants,
Interactive: interactive,
StartTime: sessionStart.GetTime(EventTime),
EndTime: lastEvent.GetTime(EventTime),
Participants: participants,
Interactive: interactive,
StartTime: sessionStart.GetTime(EventTime),
EndTime: lastEvent.GetTime(EventTime),
}
case WindowsDesktopSessionStartEvent:
sessionEndEvent = &events.WindowsDesktopSessionEnd{
Metadata: events.Metadata{
Type: WindowsDesktopSessionEndEvent,
Code: DesktopSessionEndCode,
ClusterName: clusterName,
},
SessionMetadata: sessionMeta,
UserMetadata: userMeta,

Participants: participants,
StartTime: sessionStart.GetTime(EventTime),
EndTime: lastEvent.GetTime(EventTime),
}
}

// Check and set event fields
Expand Down
126 changes: 126 additions & 0 deletions lib/events/complete_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
Copyright 2022 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package events

import (
"context"
"strings"
"testing"
"time"

apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/session"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/require"
)

// TestUploadCompleterEmitsSessionEnd verifies that the upload completer
// emits session.end or windows.desktop.session.end events for sessions
// that are completed.
func TestUploadCompleterEmitsSessionEnd(t *testing.T) {
for _, test := range []struct {
startEvent, endEvent string
}{
{SessionStartEvent, SessionEndEvent},
{WindowsDesktopSessionStartEvent, WindowsDesktopSessionEndEvent},
} {
t.Run(test.endEvent, func(t *testing.T) {
clock := clockwork.NewFakeClock()
mu := NewMemoryUploader()
mu.Clock = clock

log := &MockAuditLog{
sessionEvents: []EventFields{
{
EventType: test.startEvent,
SessionClusterName: "cluster-name",
},
},
}

uc, err := NewUploadCompleter(UploadCompleterConfig{
Unstarted: true,
Uploader: mu,
AuditLog: log,
GracePeriod: 2 * time.Hour,
})
require.NoError(t, err)

upload, err := mu.CreateUpload(context.Background(), session.NewID())
require.NoError(t, err)
clock.Advance(3 * time.Hour)

// session end events are only emitted if there's at least one
// part to be uploaded, so create that here
_, err = mu.UploadPart(context.Background(), *upload, 0, strings.NewReader("part"))
require.NoError(t, err)

err = uc.CheckUploads(context.Background())
require.NoError(t, err)

// expect two events - a session end and a session upload
require.Len(t, log.emitter.Events(), 2)
require.Equal(t, test.endEvent, log.emitter.Events()[0].GetType())
require.Equal(t, "cluster-name", log.emitter.Events()[0].GetClusterName())

require.IsType(t, &apievents.SessionUpload{}, log.emitter.Events()[1])
})
}
}

// TestUploadCompleterCompletesEmptyUploads verifies that the upload completer
// completes uploads that have no parts. This ensures that we don't leave empty
// directories behind.
func TestUploadCompleterCompletesEmptyUploads(t *testing.T) {
clock := clockwork.NewFakeClock()
mu := NewMemoryUploader()
mu.Clock = clock

log := &MockAuditLog{}

uc, err := NewUploadCompleter(UploadCompleterConfig{
Unstarted: true,
Uploader: mu,
AuditLog: log,
GracePeriod: 2 * time.Hour,
})
require.NoError(t, err)

upload, err := mu.CreateUpload(context.Background(), session.NewID())
require.NoError(t, err)
clock.Advance(3 * time.Hour)

err = uc.CheckUploads(context.Background())
require.NoError(t, err)

require.True(t, mu.uploads[upload.ID].completed)
}

type MockAuditLog struct {
DiscardAuditLog

emitter MockEmitter
sessionEvents []EventFields
}

func (m *MockAuditLog) GetSessionEvents(namespace string, sid session.ID, after int, includePrintEvents bool) ([]EventFields, error) {
return m.sessionEvents, nil
}

func (m *MockAuditLog) EmitAuditEvent(ctx context.Context, event apievents.AuditEvent) error {
return m.emitter.EmitAuditEvent(ctx, event)
}
3 changes: 0 additions & 3 deletions lib/events/filesessions/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,6 @@ func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, pa

// CompleteUpload completes the upload
func (h *Handler) CompleteUpload(ctx context.Context, upload events.StreamUpload, parts []events.StreamPart) error {
if len(parts) == 0 {
return trace.BadParameter("need at least one part to complete the upload")
}
if err := checkUpload(upload); err != nil {
return trace.Wrap(err)
}
Expand Down
5 changes: 5 additions & 0 deletions lib/events/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,8 @@ type MemoryUploader struct {
uploads map[string]*MemoryUpload
objects map[session.ID][]byte
eventsC chan UploadEvent

Clock clockwork.Clock
}

// MemoryUpload is used in tests
Expand Down Expand Up @@ -1114,6 +1116,9 @@ func (m *MemoryUploader) CreateUpload(ctx context.Context, sessionID session.ID)
ID: uuid.New().String(),
SessionID: sessionID,
}
if m.Clock != nil {
upload.Initiated = m.Clock.Now()
}
m.uploads[upload.ID] = &MemoryUpload{
id: upload.ID,
sessionID: sessionID,
Expand Down

0 comments on commit 04cd3dc

Please sign in to comment.