WebAPI plugins optimization #5237

Merged 14 commits on Apr 22, 2024
@@ -44,11 +44,15 @@ type ResourceCache struct
// A wrapper for each item in the cache.
type CacheItem struct {
State

Resource webapi.Resource
}

func (c CacheItem) IsTerminal() bool {
if c.Resource != nil {
if resource, ok := c.Resource.(interface{ IsTerminal() bool }); ok {
return resource.IsTerminal()
}
}
return c.State.Phase.IsTerminal()
}
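
For illustration only (the type below is hypothetical, not part of this PR): any plugin resource stored in the cache can opt into the fast path above simply by defining an IsTerminal() bool method, which the type assertion picks up without adding a new interface to the webapi package.

// exampleResource is a hypothetical plugin resource used only to illustrate
// the optional interface; real plugins define their own resource types.
type exampleResource struct {
	done bool
}

// IsTerminal reports whether this resource has reached a terminal state.
func (r exampleResource) IsTerminal() bool {
	return r.done
}
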

@@ -80,7 +84,7 @@ func (q *ResourceCache) SyncResource(ctx context.Context, batch cache.Batch) (
logger.Debugf(ctx, "Sync loop - processing resource with cache key [%s]",
resource.GetID())

if cacheItem.State.Phase.IsTerminal() {
if cacheItem.IsTerminal() {
logger.Debugf(ctx, "Sync loop - resource cache key [%v] in terminal state [%s]",
resource.GetID())
resp = append(resp, cache.ItemSyncResponse{
@@ -38,6 +38,8 @@ func launch(ctx context.Context, p webapi.AsyncPlugin, tCtx core.TaskExecutionCo
// Store the created resource name, and update our state.
state.ResourceMeta = rMeta
state.Phase = PhaseResourcesCreated
state.PhaseVersion = 2

Contributor: Why 2?

Member Author: It was 2.

return state, core.PhaseInfoQueued(time.Now(), 2, "launched"), nil

because we set it to 0 when we allocate a token.

}, core.PhaseInfoQueued(a.clock.Now(), 0, "Allocation token required"), nil


cacheItem := CacheItem{
State: *state,
}
@@ -49,5 +51,5 @@ func launch(ctx context.Context, p webapi.AsyncPlugin, tCtx core.TaskExecutionCo
return nil, core.PhaseInfo{}, err
}

return state, core.PhaseInfoQueued(time.Now(), 2, "launched"), nil
return state, core.PhaseInfoQueued(time.Now(), state.PhaseVersion, "launched"), nil
}
@@ -27,6 +27,7 @@ func Test_launch(t *testing.T) {
s := State{
ResourceMeta: "abc",
Phase: PhaseResourcesCreated,
PhaseVersion: 2,
}
c.OnGetOrCreate("my-id", CacheItem{State: s}).Return(CacheItem{State: s}, nil)

@@ -51,6 +52,7 @@ func Test_launch(t *testing.T) {
c := &mocks2.AutoRefresh{}
s := State{
Phase: PhaseResourcesCreated,
PhaseVersion: 2,
ResourceMeta: "abc",
}

@@ -96,6 +98,7 @@ func Test_launch(t *testing.T) {
c := &mocks2.AutoRefresh{}
s := State{
Phase: PhaseResourcesCreated,
PhaseVersion: 2,
ResourceMeta: "my-id",
}
c.OnGetOrCreate("my-id", CacheItem{State: s}).Return(CacheItem{State: s}, fmt.Errorf("failed to cache"))
@@ -39,7 +39,7 @@ func monitor(ctx context.Context, tCtx core.TaskExecutionContext, p Client, cach
}
return state, core.PhaseInfoFailure(errors.CacheFailed, cacheItem.ErrorMessage, nil), nil
}
return state, core.PhaseInfoQueued(time.Now(), core.DefaultPhaseVersion, "job submitted"), nil
return state, core.PhaseInfoQueued(time.Now(), cacheItem.PhaseVersion, "job submitted"), nil
}

newPhase, err := p.Status(ctx, newPluginContext(cacheItem.ResourceMeta, cacheItem.Resource, "", tCtx))
@@ -57,6 +57,7 @@ func monitor(ctx context.Context, tCtx core.TaskExecutionContext, p Client, cach
}

cacheItem.Phase = newPluginPhase
cacheItem.PhaseVersion = newPhase.Version()

if newPluginPhase.IsTerminal() {
// Queue item for deletion in the cache.
@@ -41,6 +41,9 @@ type State struct {
// Phase current phase of the resource.
Phase Phase `json:"phase,omitempty"`

// PhaseVersion is the version of the phase. This is used to detect if the phase has changed since the last time it was checked.
PhaseVersion uint32

// ResourceMeta contain metadata about resource this task created. This can be a complex structure or a simple type
// (e.g. a string). It should contain enough information for the plugin to interact (retrieve, check status, delete)
// with the resource through the remote service.
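
As a hedged illustration of how this field is meant to be consumed (the helper below is hypothetical, not part of the PR), a caller can treat the pair (Phase, PhaseVersion) as the value to compare against its last observation:

// hasStateChanged is an illustrative, hypothetical helper: a new event is worth
// reporting when either the phase or its version differs from the last one seen.
func hasStateChanged(prev, cur State) bool {
	return prev.Phase != cur.Phase || prev.PhaseVersion != cur.PhaseVersion
}
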
5 changes: 5 additions & 0 deletions flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
@@ -40,6 +40,11 @@ type ResourceWrapper struct {
LogLinks []*flyteIdl.TaskLog
}

// IsTerminal is used to avoid making network calls to the agent service if the resource is already in a terminal state.
func (r ResourceWrapper) IsTerminal() bool {
return r.Phase == flyteIdl.TaskExecution_SUCCEEDED || r.Phase == flyteIdl.TaskExecution_FAILED || r.Phase == flyteIdl.TaskExecution_ABORTED
}

Contributor: Add TIMED_OUT too?

Member Author: There is no timeout.

message TaskExecution{
enum Phase {
UNDEFINED = 0;
QUEUED = 1;
RUNNING = 2;
SUCCEEDED = 3;
ABORTED = 4;
FAILED = 5;
// To indicate cases where task is initializing, like: ErrImagePull, ContainerCreating, PodInitializing
INITIALIZING = 6;
// To address cases, where underlying resource is not available: Backoff error, Resource quota exceeded
WAITING_FOR_RESOURCES = 7;
}

}

type ResourceMetaWrapper struct {
OutputPrefix string
AgentResourceMeta []byte
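
One quick way to pin down the behavior discussed above is a table-driven check over the enum. This is only a sketch (a hypothetical test, not included in the PR) that assumes the plugin package's flyteIdl import alias and the standard testing package:

func TestResourceWrapper_IsTerminal(t *testing.T) {
	// Only SUCCEEDED, FAILED and ABORTED are terminal; there is no TIMED_OUT phase.
	cases := map[flyteIdl.TaskExecution_Phase]bool{
		flyteIdl.TaskExecution_UNDEFINED:             false,
		flyteIdl.TaskExecution_QUEUED:                false,
		flyteIdl.TaskExecution_RUNNING:               false,
		flyteIdl.TaskExecution_SUCCEEDED:             true,
		flyteIdl.TaskExecution_ABORTED:               true,
		flyteIdl.TaskExecution_FAILED:                true,
		flyteIdl.TaskExecution_INITIALIZING:          false,
		flyteIdl.TaskExecution_WAITING_FOR_RESOURCES: false,
	}
	for phase, want := range cases {
		if got := (ResourceWrapper{Phase: phase}).IsTerminal(); got != want {
			t.Errorf("phase %v: IsTerminal() = %v, want %v", phase, got, want)
		}
	}
}
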
28 changes: 21 additions & 7 deletions flytestdlib/cache/auto_refresh.go
@@ -116,11 +116,14 @@ type autoRefresh struct {
syncCb SyncFunc
createBatchesCb CreateBatchesFunc
lruMap *lru.Cache
toDelete *syncSet
syncPeriod time.Duration
workqueue workqueue.RateLimitingInterface
parallelizm int
lock sync.RWMutex
// Items that are currently being processed are in the processing set.
// It will prevent the same item from being processed multiple times by different workers.
processing *syncSet
toDelete *syncSet
syncPeriod time.Duration
workqueue workqueue.RateLimitingInterface
parallelizm int
lock sync.RWMutex
}
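
For context on the new processing field: a concurrency-safe set such as syncSet can be as simple as a map guarded by a mutex. The sketch below is illustrative only (the actual syncSet in flytestdlib may be implemented differently, e.g. on top of sync.Map) and assumes the standard sync package is imported.

// sketchSyncSet is an illustrative stand-in for syncSet: a set guarded by an RWMutex.
type sketchSyncSet struct {
	mu sync.RWMutex
	m  map[interface{}]struct{}
}

func newSketchSyncSet() *sketchSyncSet {
	return &sketchSyncSet{m: map[interface{}]struct{}{}}
}

// Insert adds a key to the set.
func (s *sketchSyncSet) Insert(key interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.m[key] = struct{}{}
}

// Contains reports whether the key is present.
func (s *sketchSyncSet) Contains(key interface{}) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	_, ok := s.m[key]
	return ok
}

// Remove deletes a key from the set.
func (s *sketchSyncSet) Remove(key interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.m, key)
}
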

func getEvictionFunction(counter prometheus.Counter) func(key interface{}, value interface{}) {
@@ -211,6 +214,13 @@ func (w *autoRefresh) GetOrCreate(id ItemID, item Item) (Item, error) {

w.lruMap.Add(id, item)
w.metrics.CacheMiss.Inc()

// This fixes the cold-start issue in the AutoRefreshCache by adding the item to the workqueue when it is created.
// This way, the item will be processed without waiting for the next sync cycle (30s by default).
batch := make([]ItemWrapper, 0, 1)
batch = append(batch, itemWrapper{id: id, item: item})
w.workqueue.AddRateLimited(&batch)
w.processing.Insert(id)
return item, nil
}
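
The itemWrapper literal above is built from an ID and an Item. Its shape is only assumed here (a sketch; the real unexported type in flytestdlib may differ), but something like the following would satisfy the GetID accessor used by the sync loop:

// itemWrapperSketch is an assumed shape of the wrapper pairing an item with its ID.
type itemWrapperSketch struct {
	id   ItemID
	item Item
}

// GetID returns the cached item's ID.
func (i itemWrapperSketch) GetID() ItemID { return i.id }

// GetItem returns the cached item itself (assumed from the ItemWrapper interface).
func (i itemWrapperSketch) GetItem() Item { return i.item }
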

@@ -236,7 +246,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error {
// If not ok, it means the item was evicted between getting the keys and this update loop,
// which is fine, we can just ignore.
if value, ok := w.lruMap.Peek(k); ok {
if item, ok := value.(Item); !ok || (ok && !item.IsTerminal()) {
if item, ok := value.(Item); !ok || (ok && !item.IsTerminal() && !w.processing.Contains(k)) {
snapshot = append(snapshot, itemWrapper{
id: k.(ItemID),
item: value.(Item),
@@ -253,6 +263,9 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error {
for _, batch := range batches {
b := batch
w.workqueue.AddRateLimited(&b)
for i := 1; i < len(b); i++ {
w.processing.Insert(b[i].GetID())
}
}

return nil
@@ -295,7 +308,6 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) {
logger.Debugf(ctx, "Shutting down worker")
return nil
}

// Since we create batches every time we sync, we will just remove the item from the queue here
// regardless of whether it succeeded the sync or not.
w.workqueue.Forget(batch)
@@ -304,6 +316,7 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) {
newBatch := make(Batch, 0, len(*batch.(*Batch)))
for _, b := range *batch.(*Batch) {
itemID := b.GetID()
w.processing.Remove(itemID)
item, ok := w.lruMap.Get(itemID)
if !ok {
logger.Debugf(ctx, "item with id [%v] not found in cache", itemID)
@@ -363,6 +376,7 @@ func NewAutoRefreshBatchedCache(name string, createBatches CreateBatchesFunc, sy
createBatchesCb: createBatches,
syncCb: syncCb,
lruMap: lruCache,
processing: newSyncSet(),
toDelete: newSyncSet(),
syncPeriod: resyncPeriod,
workqueue: workqueue.NewNamedRateLimitingQueue(syncRateLimiter, scope.CurrentScope()),