Skip to content

Commit

Permalink
add tests from Yi-Cheng
Browse files Browse the repository at this point in the history
Signed-off-by: Future-Outlier <[email protected]>
  • Loading branch information
Future-Outlier committed Nov 27, 2024
1 parent ab04192 commit 54aa165
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 19 deletions.
1 change: 1 addition & 0 deletions flyteadmin/pkg/repositories/transformers/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func addNodeRunningState(request *admin.NodeExecutionEventRequest, nodeExecution
"failed to marshal occurredAt into a timestamp proto with error: %v", err)
}
closure.StartedAt = startedAtProto
closure.DeckUri = request.GetEvent().GetDeckUri()
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var childExecutionID = &core.WorkflowExecutionIdentifier{
const dynamicWorkflowClosureRef = "s3://bucket/admin/metadata/workflow"

const testInputURI = "fake://bucket/inputs.pb"
const DeckURI = "fake://bucket/deck.html"

var testInputs = &core.LiteralMap{
Literals: map[string]*core.Literal{
Expand All @@ -65,6 +66,7 @@ func TestAddRunningState(t *testing.T) {
Event: &event.NodeExecutionEvent{
Phase: core.NodeExecution_RUNNING,
OccurredAt: startedAtProto,
DeckUri: DeckURI,
},
}
nodeExecutionModel := models.NodeExecution{}
Expand All @@ -73,6 +75,7 @@ func TestAddRunningState(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, startedAt, *nodeExecutionModel.StartedAt)
assert.True(t, proto.Equal(startedAtProto, closure.GetStartedAt()))
assert.Equal(t, DeckURI, closure.GetDeckUri())
}

func TestAddTerminalState_OutputURI(t *testing.T) {
Expand All @@ -84,6 +87,7 @@ func TestAddTerminalState_OutputURI(t *testing.T) {
OutputUri: outputURI,
},
OccurredAt: occurredAtProto,
DeckUri: DeckURI,
},
}
startedAt := occurredAt.Add(-time.Minute)
Expand All @@ -99,6 +103,7 @@ func TestAddTerminalState_OutputURI(t *testing.T) {
assert.Nil(t, err)
assert.EqualValues(t, outputURI, closure.GetOutputUri())
assert.Equal(t, time.Minute, nodeExecutionModel.Duration)
assert.Equal(t, DeckURI, closure.GetDeckUri())
}

func TestAddTerminalState_OutputData(t *testing.T) {
Expand Down
96 changes: 77 additions & 19 deletions flytepropeller/pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,43 @@ func getPluginMetricKey(pluginID, taskType string) string {
return taskType + "_" + pluginID
}

func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, deckPath *storage.DataReference, entry catalog.Entry) {
func (p *pluginRequestedTransition) AddDeckURI(ctx context.Context, tCtx *taskExecutionContext) {
var deckURI *storage.DataReference
deckURIValue := tCtx.ow.GetDeckPath()
deckURI = &deckURIValue

if p.execInfo.OutputInfo == nil {
p.execInfo.OutputInfo = &handler.OutputInfo{}
}

p.execInfo.OutputInfo.DeckURI = deckURI
}

// RemoveNonexistentDeckURI removes the deck URI from the plugin execution info if the URI does not exist in remote storage.
func (p *pluginRequestedTransition) RemoveNonexistentDeckURI(ctx context.Context, tCtx *taskExecutionContext) error {
reader := tCtx.ow.GetReader()
if reader == nil && p.execInfo.OutputInfo != nil {
p.execInfo.OutputInfo.DeckURI = nil
return nil
}

exists, err := reader.DeckExists(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err)
return regErrors.Wrapf(err, "failed to check existence of deck file")
}

if !exists && p.execInfo.OutputInfo != nil {
p.execInfo.OutputInfo.DeckURI = nil
}

return nil
}

func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, entry catalog.Entry) {
p.ttype = handler.TransitionTypeEphemeral
p.pInfo = pluginCore.PhaseInfoSuccess(nil)
p.ObserveSuccess(outputPath, deckPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()})
p.ObserveSuccess(outputPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()})
}

func (p *pluginRequestedTransition) PopulateCacheInfo(entry catalog.Entry) {
Expand Down Expand Up @@ -144,10 +177,13 @@ func (p *pluginRequestedTransition) FinalTaskEvent(input ToTaskExecutionEventInp
return ToTaskExecutionEvent(input)
}

func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, deckPath *storage.DataReference, taskMetadata *event.TaskNodeMetadata) {
p.execInfo.OutputInfo = &handler.OutputInfo{
OutputURI: outputPath,
DeckURI: deckPath,
func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, taskMetadata *event.TaskNodeMetadata) {
if p.execInfo.OutputInfo == nil {
p.execInfo.OutputInfo = &handler.OutputInfo{
OutputURI: outputPath,
}
} else {
p.execInfo.OutputInfo.OutputURI = outputPath
}

p.execInfo.TaskNodeInfo = &handler.TaskNodeInfo{
Expand All @@ -171,7 +207,7 @@ func (p *pluginRequestedTransition) FinalTransition(ctx context.Context) (handle
}

logger.Debugf(ctx, "Task still running")
return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(nil)), nil
return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(&p.execInfo)), nil
}

// The plugin interface available especially for testing.
Expand Down Expand Up @@ -464,8 +500,19 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta
}
}

// Regardless of the observed phase, we always add the DeckUri to support real-time deck functionality.
// The deck should be accessible even if the task is still running or has failed.
// It's possible that the deck URI may not exist in remote storage yet or will never exist.
// So, it is console's responsibility to handle the case when the deck URI actually does not exist.
pluginTrns.AddDeckURI(ctx, tCtx)

switch pluginTrns.pInfo.Phase() {
case pluginCore.PhaseSuccess:
// This is to prevent the console from potentially checking the deck URI that does not exist if in final phase(PhaseSuccess).
err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx)
if err != nil {
return pluginTrns, err
}
// -------------------------------------
// TODO: @kumare create Issue# Remove the code after we use closures to handle dynamic nodes
// This code only exists to support Dynamic tasks. Eventually dynamic tasks will use closure nodes to execute
Expand Down Expand Up @@ -501,25 +548,36 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
})
} else {
var deckURI *storage.DataReference
if tCtx.ow.GetReader() != nil {
exists, err := tCtx.ow.GetReader().DeckExists(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err)
return pluginTrns, regErrors.Wrapf(err, "failed to check existence of deck file")
} else if exists {
deckURIValue := tCtx.ow.GetDeckPath()
deckURI = &deckURIValue
}
}
pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), deckURI,
pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(),
&event.TaskNodeMetadata{
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
})

////var deckURI *storage.DataReference
//if tCtx.ow.GetReader() != nil {
// exists, err := tCtx.ow.GetReader().DeckExists(ctx)
// if err != nil {
// logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err)
// return pluginTrns, regErrors.Wrapf(err, "failed to check existence of deck file")
// } else if exists {
// deckURIValue := tCtx.ow.GetDeckPath()
// deckURI = &deckURIValue
// }
//}
//pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(),
// &event.TaskNodeMetadata{
// CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
// })
}
case pluginCore.PhaseRetryableFailure:
fallthrough
case pluginCore.PhasePermanentFailure:
// This is to prevent the console from potentially checking the deck URI that does not exist if in final
// phase(PhaseFailure).
err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx)
if err != nil {
return pluginTrns, err
}
pluginTrns.ObservedFailure(
&event.TaskNodeMetadata{
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
Expand Down

0 comments on commit 54aa165

Please sign in to comment.