Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebAPI plugins optimization #5237

Merged
merged 14 commits into from
Apr 22, 2024
Merged

WebAPI plugins optimization #5237

merged 14 commits into from
Apr 22, 2024

Conversation

pingsutw
Copy link
Member

@pingsutw pingsutw commented Apr 16, 2024

Tracking issue

NA

Why are the changes needed?

There are some issues in the webapi plugin:

  • Push duplicate items into the workqueue since we add obj into the workqueue instead of itemID
  • It doesn't correctly handle stale CRD. Propeller transitions the status to Queue after the task succeeds since the plugin returns PhaseInfoQueued when an item is being deleted from the cache.
  • It pushed the item to the workqueue after the task was completed because it only marked the item as complete when evaluating the node. Therefore, in the below case, the propeller will continue to send the Get request to the external system.
Sync Cycle Async Cache Propeller
1 Add Item
2 plugin.get()
3 plugin.get()
4 plugin.get()
5 plugin.get()
6 Update the phase in the item
7 Remove the item
image

What changes were proposed in this pull request?

  • Add processing set to prevent handling the same item from different workers
  • Add IsTerminal to the webapi.Resource for early termination of the processed item.
  • Add PhaseVersion to cache.state. If the cache is missed, propeller will return the previous phase and version and wait for the item to be processed by the async cache.
  • Fixes cold start issue in the AutoRefreshCache by pushing the item to the workqueue when it's being added to the lru cache.

How was this patch tested?

Local/remote cluster

Setup process

from flytekit import workflow, task, LaunchPlan
from flytekitplugins.mock_agent import Sleep


@task(task_config=Sleep(duration=2))
def sleep_task() -> str:
    return "Hello World!"


@workflow()
def sleep_wf():
    for i in range(100):
        sleep_task()


@workflow
def load_test_wf():
    sleep_lp = LaunchPlan.get_or_create(name="fixed_inputs", workflow=sleep_wf, max_parallelism=25)
    for i in range(20):
        sleep_lp()


if __name__ == "__main__":
    sleep_task()

Screenshots

Before:
~5 mins / per workflow

image

~30s for a single task

image

After:
~3 mins / per workflow

image

~15s for a single task

image

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

NA

Docs link

NA

pingsutw added 12 commits April 15, 2024 04:23
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
@dosubot dosubot bot added size:M This PR changes 30-99 lines, ignoring generated files. enhancement New feature or request labels Apr 16, 2024
Signed-off-by: Kevin Su <[email protected]>
@pingsutw pingsutw requested a review from EngHabu April 16, 2024 18:06
@@ -363,6 +376,7 @@ func NewAutoRefreshBatchedCache(name string, createBatches CreateBatchesFunc, sy
createBatchesCb: createBatches,
syncCb: syncCb,
lruMap: lruCache,
Processing: newSyncSet(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
Processing: newSyncSet(),
processing: newSyncSet(),

@@ -40,6 +40,11 @@ type ResourceWrapper struct {
LogLinks []*flyteIdl.TaskLog
}

// IsTerminal is used to avoid making network calls to the agent service if the resource is already in a terminal state.
func (r ResourceWrapper) IsTerminal() bool {
return r.Phase == flyteIdl.TaskExecution_SUCCEEDED || r.Phase == flyteIdl.TaskExecution_FAILED || r.Phase == flyteIdl.TaskExecution_ABORTED
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add TIMED_OUT too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no timeout.

message TaskExecution{
enum Phase {
UNDEFINED = 0;
QUEUED = 1;
RUNNING = 2;
SUCCEEDED = 3;
ABORTED = 4;
FAILED = 5;
// To indicate cases where task is initializing, like: ErrImagePull, ContainerCreating, PodInitializing
INITIALIZING = 6;
// To address cases, where underlying resource is not available: Backoff error, Resource quota exceeded
WAITING_FOR_RESOURCES = 7;
}

@@ -38,6 +38,8 @@ func launch(ctx context.Context, p webapi.AsyncPlugin, tCtx core.TaskExecutionCo
// Store the created resource name, and update our state.
state.ResourceMeta = rMeta
state.Phase = PhaseResourcesCreated
state.PhaseVersion = 2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why 2?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was 2.

return state, core.PhaseInfoQueued(time.Now(), 2, "launched"), nil

because we set it to 0 when we allocate a token.

}, core.PhaseInfoQueued(a.clock.Now(), 0, "Allocation token required"), nil

Signed-off-by: Kevin Su <[email protected]>
@dosubot dosubot bot added the lgtm This PR has been approved by a maintainer label Apr 22, 2024
@pingsutw pingsutw merged commit 4ff2707 into master Apr 22, 2024
46 checks passed
@pingsutw pingsutw deleted the fix-cache branch April 22, 2024 22:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request lgtm This PR has been approved by a maintainer size:M This PR changes 30-99 lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants