Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Sync Plugin Interface #402

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,11 @@ require (
)

replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d

// replace github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin => /mnt/c/code/dev/flyteidl/gen/pb-go/flyteidl/admin

// replace github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin => /mnt/c/code/dev/flyteidl/gen/pb-go/flyteidl/admin

// replace github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service => /mnt/c/code/dev/flyteidl/gen/pb-go/flyteidl/service

replace github.com/flyteorg/flyteidl v1.5.13 => /mnt/c/code/dev/flyteidl
9 changes: 9 additions & 0 deletions go/tasks/config/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package config

import (
"context"

"github.com/flyteorg/flytestdlib/config"
"github.com/flyteorg/flytestdlib/logger"
)

const configSectionKey = "plugins"
Expand All @@ -21,6 +24,12 @@ func GetConfig() *Config {
return rootSection.GetConfig().(*Config)
}

/*
resourcemanager&{noop 1000 {[] 0}}
admin-launcher&{100 10 10000 10}
workflowStore&{ResourceVersionCache}
*/
func MustRegisterSubSection(subSectionKey string, section config.Config) config.Section {
logger.Error(context.TODO(), "@@@ MustRegisterSubSection subSectionKey->[%v], section->[%v]", subSectionKey, section)
return rootSection.MustRegisterSection(subSectionKey, section)
}
6 changes: 5 additions & 1 deletion go/tasks/pluginmachinery/internal/webapi/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package webapi

import (
"context"
"runtime"

"github.com/flyteorg/flytestdlib/promutils"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -55,6 +56,9 @@ type CacheItem struct {
func (q *ResourceCache) SyncResource(ctx context.Context, batch cache.Batch) (
updatedBatch []cache.ItemSyncResponse, err error) {

pc, file, line, _ := runtime.Caller(1)
funcName := runtime.FuncForPC(pc).Name()
logger.Infof(context.TODO(), "@@@ webapi cache.go SyncResource was called by file [%v] [%v]:[%v]", file, funcName, line)
resp := make([]cache.ItemSyncResponse, 0, len(batch))
for _, resource := range batch {
// Cast the item back to the thing we want to work with.
Expand Down Expand Up @@ -98,7 +102,7 @@ func (q *ResourceCache) SyncResource(ctx context.Context, batch cache.Batch) (
}

// Get an updated status
logger.Debugf(ctx, "Querying AsyncPlugin for %s", resource.GetID())
logger.Infof(ctx, "@@@ Querying AsyncPlugin for %s", resource.GetID())
newResource, err := q.client.Get(ctx, newPluginContext(cacheItem.ResourceMeta, cacheItem.Resource, "", nil))
if err != nil {
logger.Infof(ctx, "Error retrieving resource [%s]. Error: %v", resource.GetID(), err)
Expand Down
139 changes: 114 additions & 25 deletions go/tasks/pluginmachinery/internal/webapi/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/gob"
"fmt"
"runtime"
"time"

"k8s.io/utils/clock"
Expand Down Expand Up @@ -36,6 +37,7 @@ const (
type CorePlugin struct {
id string
p webapi.AsyncPlugin
sp webapi.SyncPlugin
cache cache.AutoRefresh
tokenAllocator tokenAllocator
metrics Metrics
Expand Down Expand Up @@ -68,7 +70,45 @@ func (c CorePlugin) GetProperties() core.PluginProperties {
return core.PluginProperties{}
}

func (c CorePlugin) syncHandle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) {
/*
_, err := c.unmarshalState(ctx, tCtx.PluginStateReader())
if err != nil {
return core.UnknownTransition, err
}

*/

incomingState, err := c.unmarshalState(ctx, tCtx.PluginStateReader())
if err != nil {
return core.UnknownTransition, err
}

if incomingState.Phase == PhaseNotStarted {
// nextState, phaseInfo, err = c.sp.Do(ctx, tCtx, &incomingState)
} else {
return core.UnknownTransition, err
}

// write a function, syncLaunch in launcher.go
logger.Infof(ctx, "@@@ SyncHandle was called")
return core.UnknownTransition, nil
}

/*
1. call sp.do
2. check error
3. write state
*/
func (c CorePlugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) {
if c.sp != nil {
return c.syncHandle(ctx, tCtx)
}

pc, file, line, _ := runtime.Caller(1)
funcName := runtime.FuncForPC(pc).Name()
logger.Infof(context.TODO(), "@@@ core.go Handle by [%v] [%v]:[%v]", file, funcName, line)

incomingState, err := c.unmarshalState(ctx, tCtx.PluginStateReader())
if err != nil {
return core.UnknownTransition, err
Expand Down Expand Up @@ -165,47 +205,92 @@ func createRemotePlugin(pluginEntry webapi.PluginEntry, c clock.Clock) core.Plug
RegisteredTaskTypes: pluginEntry.SupportedTaskTypes,
LoadPlugin: func(ctx context.Context, iCtx core.SetupContext) (
core.Plugin, error) {
p, err := pluginEntry.PluginLoader(ctx, iCtx)
p, sp, err := pluginEntry.PluginLoader(ctx, iCtx)
if err != nil {
return nil, err
}

err = validateConfig(p.GetConfig())
if err != nil {
return nil, fmt.Errorf("config validation failed. Error: %w", err)
}
if p != nil {
err = validateConfig(p.GetConfig())
if err != nil {
return nil, fmt.Errorf("config validation failed. Error: %w", err)
}

// If the plugin will use a custom state, register it to be able to
// serialize/deserialize interfaces later.
if customState := p.GetConfig().ResourceMeta; customState != nil {
gob.Register(customState)
}
// If the plugin will use a custom state, register it to be able to
// serialize/deserialize interfaces later.
if customState := p.GetConfig().ResourceMeta; customState != nil {
gob.Register(customState)
}

if quotas := p.GetConfig().ResourceQuotas; len(quotas) > 0 {
for ns, quota := range quotas {
err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, ns, quota)
if err != nil {
return nil, err
if quotas := p.GetConfig().ResourceQuotas; len(quotas) > 0 {
for ns, quota := range quotas {
err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, ns, quota)
if err != nil {
return nil, err
}
}
}
}

resourceCache, err := NewResourceCache(ctx, pluginEntry.ID, p, p.GetConfig().Caching,
iCtx.MetricsScope().NewSubScope("cache"))
resourceCache, err := NewResourceCache(ctx, pluginEntry.ID, p, p.GetConfig().Caching,
iCtx.MetricsScope().NewSubScope("cache"))

if err != nil {
return nil, err
}
if err != nil {
return nil, err
}

err = resourceCache.Start(ctx)
if err != nil {
return nil, err
err = resourceCache.Start(ctx)
if err != nil {
return nil, err
}

return CorePlugin{
id: pluginEntry.ID,
p: p,
sp: sp,
cache: resourceCache,
metrics: newMetrics(iCtx.MetricsScope()),
tokenAllocator: newTokenAllocator(c),
}, nil

} else if sp != nil {
err = validateConfig(p.GetConfig())
if err != nil {
return nil, fmt.Errorf("config validation failed. Error: %w", err)
}

// If the plugin will use a custom state, register it to be able to
// serialize/deserialize interfaces later.
if customState := sp.GetConfig().ResourceMeta; customState != nil {
gob.Register(customState)
}

if quotas := sp.GetConfig().ResourceQuotas; len(quotas) > 0 {
for ns, quota := range quotas {
err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, ns, quota)
if err != nil {
return nil, err
}
}
}

// resourceCache, err := NewResourceCache(ctx, pluginEntry.ID, p, sp.GetConfig().Caching,
// iCtx.MetricsScope().NewSubScope("cache"))

// if err != nil {
// return nil, err
// }

// err = resourceCache.Start(ctx)
// if err != nil {
// return nil, err
// }
}

return CorePlugin{
id: pluginEntry.ID,
p: p,
cache: resourceCache,
sp: sp,
cache: nil,
metrics: newMetrics(iCtx.MetricsScope()),
tokenAllocator: newTokenAllocator(c),
}, nil
Expand All @@ -214,5 +299,9 @@ func createRemotePlugin(pluginEntry webapi.PluginEntry, c clock.Clock) core.Plug
}

func CreateRemotePlugin(pluginEntry webapi.PluginEntry) core.PluginEntry {
pc, file, line, _ := runtime.Caller(1)
funcName := runtime.FuncForPC(pc).Name()
logger.Infof(context.TODO(), "@@@ core.go func CreateRemotePlugin was called by file [%v] [%v]:[%v]", file, funcName, line)

return createRemotePlugin(pluginEntry, clock.RealClock{})
}
4 changes: 2 additions & 2 deletions go/tasks/pluginmachinery/internal/webapi/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ func TestCreateRemotePlugin(t *testing.T) {
CreateRemotePlugin(webapi.PluginEntry{
ID: "MyTestPlugin",
SupportedTaskTypes: []core.TaskType{"test-task"},
PluginLoader: func(ctx context.Context, iCtx webapi.PluginSetupContext) (webapi.AsyncPlugin, error) {
PluginLoader: func(ctx context.Context, iCtx webapi.PluginSetupContext) (webapi.AsyncPlugin, webapi.SyncPlugin, error) {
return newPluginWithProperties(webapi.PluginConfig{
Caching: webapi.CachingConfig{
Size: 10,
},
}), nil
}), nil, nil
},
IsDefault: false,
DefaultForTaskTypes: []core.TaskType{"test-task"},
Expand Down
6 changes: 6 additions & 0 deletions go/tasks/pluginmachinery/internal/webapi/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package webapi

import (
"context"
"runtime"
"time"

"github.com/flyteorg/flytestdlib/cache"
Expand All @@ -13,6 +14,11 @@ import (

func launch(ctx context.Context, p webapi.AsyncPlugin, tCtx core.TaskExecutionContext, cache cache.AutoRefresh,
state *State) (newState *State, phaseInfo core.PhaseInfo, err error) {

pc, file, line, _ := runtime.Caller(1)
funcName := runtime.FuncForPC(pc).Name()
logger.Infof(context.TODO(), "@@@ launcher.go launch by [%v] [%v]:[%v]", file, funcName, line)

rMeta, r, err := p.Create(ctx, tCtx)
if err != nil {
logger.Errorf(ctx, "Failed to create resource. Error: %v", err)
Expand Down
3 changes: 3 additions & 0 deletions go/tasks/pluginmachinery/internal/webapi/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ const (
// remotely.
PhaseAllocationTokenAcquired

// PhaseResourcesCreating indicates the task is being created remotely.
PhaseResourcesCreating

// PhaseResourcesCreated indicates the task has been created remotely.
PhaseResourcesCreated

Expand Down
32 changes: 32 additions & 0 deletions go/tasks/pluginmachinery/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pluginmachinery

import (
"context"
"runtime"
"sync"

internalRemote "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/internal/webapi"
Expand All @@ -27,6 +28,14 @@ func PluginRegistry() TaskPluginRegistry {
}

func (p *taskPluginRegistry) RegisterRemotePlugin(info webapi.PluginEntry) {
pc, file, line, _ := runtime.Caller(1)
funcName := runtime.FuncForPC(pc).Name()
logger.Infof(context.TODO(), "@@@ RegisterRemotePlugin was called by file [%v] [%v]:[%v]", file, funcName, line)

logger.Infof(context.TODO(), "@@@ Registering Remote plugin Info ID [%v]", info.ID)
logger.Infof(context.TODO(), "@@@ Registering Remote plugin Info SupportedTaskTypes [%v]", info.SupportedTaskTypes)
logger.Infof(context.TODO(), "@@@ Registering Remote plugin Info PluginLoader [%v]", info.PluginLoader)
logger.Infof(context.TODO(), "@@@ Registering Remote plugin Info [%v]", info)
ctx := context.Background()
if info.ID == "" {
logger.Panicf(ctx, "ID is required attribute for k8s plugin")
Expand All @@ -43,6 +52,10 @@ func (p *taskPluginRegistry) RegisterRemotePlugin(info webapi.PluginEntry) {
p.m.Lock()
defer p.m.Unlock()
p.corePlugin = append(p.corePlugin, internalRemote.CreateRemotePlugin(info))
pluginEntry := internalRemote.CreateRemotePlugin(info)
logger.Infof(context.TODO(), "@@@ CreateRemotePlugin ID: %s", pluginEntry.ID)
logger.Infof(context.TODO(), "@@@ CreateRemotePlugin RegisteredTaskTypes: %v", pluginEntry.RegisteredTaskTypes)
// logger.Infof(context.TODO(), "@@@ internalRemote.CreateRemotePlugin(info) [%v]", internalRemote.CreateRemotePlugin(info))
}

func CreateRemotePlugin(pluginEntry webapi.PluginEntry) core.PluginEntry {
Expand All @@ -51,6 +64,14 @@ func CreateRemotePlugin(pluginEntry webapi.PluginEntry) core.PluginEntry {

// Use this method to register Kubernetes Plugins
func (p *taskPluginRegistry) RegisterK8sPlugin(info k8s.PluginEntry) {
pc, file, line, _ := runtime.Caller(1)
funcName := runtime.FuncForPC(pc).Name()
logger.Infof(context.TODO(), "@@@ RegisterK8sPlugin was called by file [%v] [%v]:[%v]", file, funcName, line)

logger.Infof(context.TODO(), "@@@ Registering K8s plugin Info ID [%v]", info.ID)
logger.Infof(context.TODO(), "@@@ Registering K8s plugin Info RegisteredTaskTypes [%v]", info.RegisteredTaskTypes)
logger.Infof(context.TODO(), "@@@ Registering K8s plugin Info ResourceToWatch [%v]", info.ResourceToWatch)
logger.Infof(context.TODO(), "@@@ Registering K8s plugin Info [%v]", info)
if info.ID == "" {
logger.Panicf(context.TODO(), "ID is required attribute for k8s plugin")
}
Expand All @@ -70,10 +91,19 @@ func (p *taskPluginRegistry) RegisterK8sPlugin(info k8s.PluginEntry) {
p.m.Lock()
defer p.m.Unlock()
p.k8sPlugin = append(p.k8sPlugin, info)
logger.Infof(context.TODO(), "@@@ p.k8sPlugin [%v]", p.corePlugin)
}

// Use this method to register core plugins
func (p *taskPluginRegistry) RegisterCorePlugin(info core.PluginEntry) {
pc, file, line, _ := runtime.Caller(1)
funcName := runtime.FuncForPC(pc).Name()
logger.Infof(context.TODO(), "@@@ RegisterCorePlugin was called by file [%v] [%v]:[%v]", file, funcName, line)

logger.Infof(context.TODO(), "@@@ Registering core plugin Info ID [%v]", info.ID)
logger.Infof(context.TODO(), "@@@ Registering core plugin Info RegisteredTaskTypes [%v]", info.RegisteredTaskTypes)
logger.Infof(context.TODO(), "@@@ Registering core plugin Info LoadPlugin [%v]", info.LoadPlugin)
logger.Infof(context.TODO(), "@@@ Registering core plugin Info [%v]", info)
if info.ID == "" {
logger.Panicf(context.TODO(), "ID is required attribute for k8s plugin")
}
Expand All @@ -87,6 +117,7 @@ func (p *taskPluginRegistry) RegisterCorePlugin(info core.PluginEntry) {
p.m.Lock()
defer p.m.Unlock()
p.corePlugin = append(p.corePlugin, info)
logger.Infof(context.TODO(), "@@@ p.corePlugin [%v]", p.corePlugin)
}

// Returns a snapshot of all the registered core plugins.
Expand All @@ -107,6 +138,7 @@ type TaskPluginRegistry interface {
RegisterK8sPlugin(info k8s.PluginEntry)
RegisterCorePlugin(info core.PluginEntry)
RegisterRemotePlugin(info webapi.PluginEntry)
// RegisterRemoteSyncPlugin(info webapi.PluginEntry)
GetCorePlugins() []core.PluginEntry
GetK8sPlugins() []k8s.PluginEntry
}
Loading