Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor echo plugin #5565

Merged
merged 5 commits into from
Jul 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 60 additions & 34 deletions flyteplugins/go/tasks/plugins/testing/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import (
"context"
"fmt"
"sync"
"time"

idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
Expand All @@ -20,6 +21,7 @@
type EchoPlugin struct {
enqueueOwner core.EnqueueOwner
taskStartTimes map[string]time.Time
sync.Mutex
}

func (e *EchoPlugin) GetID() string {
Expand All @@ -30,9 +32,11 @@
return core.PluginProperties{}
}

func (e *EchoPlugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) {
echoConfig := ConfigSection.GetConfig().(*Config)

// Enqueue the task to be re-evaluated after SleepDuration.
// If the task is already enqueued, return the start time of the task.
func (e *EchoPlugin) addTask(ctx context.Context, tCtx core.TaskExecutionContext) time.Time {
e.Lock()
defer e.Unlock()

Check warning on line 39 in flyteplugins/go/tasks/plugins/testing/echo.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/testing/echo.go#L37-L39

Added lines #L37 - L39 were not covered by tests
var startTime time.Time
var exists bool
taskExecutionID := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
Expand All @@ -42,47 +46,34 @@

// start timer to enqueue owner once task sleep duration has elapsed
go func() {
echoConfig := ConfigSection.GetConfig().(*Config)

Check warning on line 49 in flyteplugins/go/tasks/plugins/testing/echo.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/testing/echo.go#L49

Added line #L49 was not covered by tests
time.Sleep(echoConfig.SleepDuration.Duration)
if err := e.enqueueOwner(tCtx.TaskExecutionMetadata().GetOwnerID()); err != nil {
logger.Warnf(ctx, "failed to enqueue owner [%s]: %v", tCtx.TaskExecutionMetadata().GetOwnerID(), err)
}
}()
}
return startTime

Check warning on line 56 in flyteplugins/go/tasks/plugins/testing/echo.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/testing/echo.go#L56

Added line #L56 was not covered by tests
}

if time.Since(startTime) >= echoConfig.SleepDuration.Duration {
// copy inputs to outputs
inputToOutputVariableMappings, err := compileInputToOutputVariableMappings(ctx, tCtx)
if err != nil {
return core.UnknownTransition, err
}

if len(inputToOutputVariableMappings) > 0 {
inputLiterals, err := tCtx.InputReader().Get(ctx)
if err != nil {
return core.UnknownTransition, err
}

outputLiterals := make(map[string]*idlcore.Literal, len(inputToOutputVariableMappings))
for inputVariableName, outputVariableName := range inputToOutputVariableMappings {
outputLiterals[outputVariableName] = inputLiterals.Literals[inputVariableName]
}
// Remove the task from the taskStartTimes map.
func (e *EchoPlugin) removeTask(taskExecutionID string) {
e.Lock()
defer e.Unlock()
delete(e.taskStartTimes, taskExecutionID)

Check warning on line 63 in flyteplugins/go/tasks/plugins/testing/echo.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/testing/echo.go#L60-L63

Added lines #L60 - L63 were not covered by tests
}

outputLiteralMap := &idlcore.LiteralMap{
Literals: outputLiterals,
}
func (e *EchoPlugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) {
echoConfig := ConfigSection.GetConfig().(*Config)

Check warning on line 67 in flyteplugins/go/tasks/plugins/testing/echo.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/testing/echo.go#L66-L67

Added lines #L66 - L67 were not covered by tests

outputFile := tCtx.OutputWriter().GetOutputPath()
if err := tCtx.DataStore().WriteProtobuf(ctx, outputFile, storage.Options{}, outputLiteralMap); err != nil {
return core.UnknownTransition, err
}
if echoConfig.SleepDuration.Duration == time.Duration(0) {
return copyInputsToOutputs(ctx, tCtx)

Check warning on line 70 in flyteplugins/go/tasks/plugins/testing/echo.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/testing/echo.go#L69-L70

Added lines #L69 - L70 were not covered by tests
}

or := ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), 0)
if err = tCtx.OutputWriter().Put(ctx, or); err != nil {
return core.UnknownTransition, err
}
}
startTime := e.addTask(ctx, tCtx)

Check warning on line 73 in flyteplugins/go/tasks/plugins/testing/echo.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/testing/echo.go#L73

Added line #L73 was not covered by tests

return core.DoTransition(core.PhaseInfoSuccess(nil)), nil
if time.Since(startTime) >= echoConfig.SleepDuration.Duration {
return copyInputsToOutputs(ctx, tCtx)

Check warning on line 76 in flyteplugins/go/tasks/plugins/testing/echo.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/testing/echo.go#L75-L76

Added lines #L75 - L76 were not covered by tests
}

return core.DoTransition(core.PhaseInfoRunning(core.DefaultPhaseVersion, nil)), nil
Expand All @@ -94,10 +85,45 @@

func (e *EchoPlugin) Finalize(ctx context.Context, tCtx core.TaskExecutionContext) error {
taskExecutionID := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
delete(e.taskStartTimes, taskExecutionID)
e.removeTask(taskExecutionID)

Check warning on line 88 in flyteplugins/go/tasks/plugins/testing/echo.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/testing/echo.go#L88

Added line #L88 was not covered by tests
return nil
}

// copyInputsToOutputs copies the input literals to the output location.
func copyInputsToOutputs(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) {
inputToOutputVariableMappings, err := compileInputToOutputVariableMappings(ctx, tCtx)
if err != nil {
return core.UnknownTransition, err

Check warning on line 96 in flyteplugins/go/tasks/plugins/testing/echo.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/testing/echo.go#L93-L96

Added lines #L93 - L96 were not covered by tests
}

if len(inputToOutputVariableMappings) > 0 {
inputLiterals, err := tCtx.InputReader().Get(ctx)
if err != nil {
return core.UnknownTransition, err

Check warning on line 102 in flyteplugins/go/tasks/plugins/testing/echo.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/testing/echo.go#L99-L102

Added lines #L99 - L102 were not covered by tests
}

outputLiterals := make(map[string]*idlcore.Literal, len(inputToOutputVariableMappings))
for inputVariableName, outputVariableName := range inputToOutputVariableMappings {
outputLiterals[outputVariableName] = inputLiterals.Literals[inputVariableName]

Check warning on line 107 in flyteplugins/go/tasks/plugins/testing/echo.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/testing/echo.go#L105-L107

Added lines #L105 - L107 were not covered by tests
}

outputLiteralMap := &idlcore.LiteralMap{
Literals: outputLiterals,

Check warning on line 111 in flyteplugins/go/tasks/plugins/testing/echo.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/testing/echo.go#L110-L111

Added lines #L110 - L111 were not covered by tests
}

outputFile := tCtx.OutputWriter().GetOutputPath()
if err := tCtx.DataStore().WriteProtobuf(ctx, outputFile, storage.Options{}, outputLiteralMap); err != nil {
return core.UnknownTransition, err

Check warning on line 116 in flyteplugins/go/tasks/plugins/testing/echo.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/testing/echo.go#L114-L116

Added lines #L114 - L116 were not covered by tests
}

or := ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), 0)
if err = tCtx.OutputWriter().Put(ctx, or); err != nil {
return core.UnknownTransition, err

Check warning on line 121 in flyteplugins/go/tasks/plugins/testing/echo.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/testing/echo.go#L119-L121

Added lines #L119 - L121 were not covered by tests
}
}
return core.DoTransition(core.PhaseInfoSuccess(nil)), nil

Check warning on line 124 in flyteplugins/go/tasks/plugins/testing/echo.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/testing/echo.go#L124

Added line #L124 was not covered by tests
}

func compileInputToOutputVariableMappings(ctx context.Context, tCtx core.TaskExecutionContext) (map[string]string, error) {
// validate outputs are castable from inputs otherwise error as this plugin is not applicable
taskTemplate, err := tCtx.TaskReader().Read(ctx)
Expand Down
Loading