From 7ac026b51e4c75db043fad924f9ec3ba8a003c41 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 23 May 2023 13:35:56 -0700 Subject: [PATCH 1/7] Lazy load grpc plugin Signed-off-by: Kevin Su --- go/tasks/plugins/webapi/grpc/plugin.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/tasks/plugins/webapi/grpc/plugin.go b/go/tasks/plugins/webapi/grpc/plugin.go index b180fdb88..6861dadb7 100644 --- a/go/tasks/plugins/webapi/grpc/plugin.go +++ b/go/tasks/plugins/webapi/grpc/plugin.go @@ -198,7 +198,7 @@ func newGrpcPlugin() webapi.PluginEntry { } } -func init() { +func RegisterGrpcPlugin() { gob.Register(ResourceMetaWrapper{}) gob.Register(ResourceWrapper{}) From 7305f64a20a210292bfe949b15a46b0707690d36 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 30 May 2023 11:47:33 -0700 Subject: [PATCH 2/7] rename Signed-off-by: Kevin Su --- go.mod | 2 +- go.sum | 4 ++-- .../plugins/webapi/{grpc => agent}/config.go | 10 ++++---- .../webapi/{grpc => agent}/config_test.go | 2 +- .../{grpc => agent}/integration_test.go | 6 ++--- .../plugins/webapi/{grpc => agent}/plugin.go | 24 +++++++++---------- .../webapi/{grpc => agent}/plugin_test.go | 8 +++---- 7 files changed, 28 insertions(+), 28 deletions(-) rename go/tasks/plugins/webapi/{grpc => agent}/config.go (87%) rename go/tasks/plugins/webapi/{grpc => agent}/config_test.go (95%) rename go/tasks/plugins/webapi/{grpc => agent}/integration_test.go (98%) rename go/tasks/plugins/webapi/{grpc => agent}/plugin.go (88%) rename go/tasks/plugins/webapi/{grpc => agent}/plugin_test.go (93%) diff --git a/go.mod b/go.mod index 7d73674eb..2a8168412 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/athena v1.0.0 github.com/bstadlbauer/dask-k8s-operator-go-client v0.1.0 github.com/coocood/freecache v1.1.1 - github.com/flyteorg/flyteidl v1.5.2 + github.com/flyteorg/flyteidl v1.5.9 github.com/flyteorg/flytestdlib v1.0.15 github.com/go-test/deep v1.0.7 github.com/golang/protobuf v1.5.2 diff --git a/go.sum b/go.sum index 1af063022..1f996f878 100644 --- a/go.sum +++ b/go.sum @@ -232,8 +232,8 @@ github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQL github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/flyteorg/flyteidl v1.5.2 h1:DZPzYkTg92qA4e17fd0ZW1M+gh1gJKh/VOK+F4bYgM8= -github.com/flyteorg/flyteidl v1.5.2/go.mod h1:ckLjB51moX4L0oQml+WTCrPK50zrJf6IZJ6LPC0RB4I= +github.com/flyteorg/flyteidl v1.5.9 h1:jqoenDx6p1Uncja1LMSzWmq3mBrMQ6vOdzN7/Ma3P28= +github.com/flyteorg/flyteidl v1.5.9/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0= github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s= github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk= diff --git a/go/tasks/plugins/webapi/grpc/config.go b/go/tasks/plugins/webapi/agent/config.go similarity index 87% rename from go/tasks/plugins/webapi/grpc/config.go rename to go/tasks/plugins/webapi/agent/config.go index 2fef371a0..14993b240 100644 --- a/go/tasks/plugins/webapi/grpc/config.go +++ b/go/tasks/plugins/webapi/agent/config.go @@ -1,4 +1,4 @@ -package grpc +package agent import ( "time" @@ -39,14 +39,14 @@ var ( Value: 50, }, }, - DefaultGrpcEndpoint: "dns:///external-plugin-service.flyte.svc.cluster.local:80", + DefaultGrpcEndpoint: "dns:///flyte-agent.flyte.svc.cluster.local:80", SupportedTaskTypes: []string{"task_type_1", "task_type_2"}, } - configSection = pluginsConfig.MustRegisterSubSection("external-plugin-service", &defaultConfig) + configSection = pluginsConfig.MustRegisterSubSection("agent-service", &defaultConfig) ) -// Config is config for 'databricks' plugin +// Config is config for 'agent' plugin type Config struct { // WebAPI defines config for the base WebAPI plugin WebAPI webapi.PluginConfig `json:"webApi" pflag:",Defines config for the base WebAPI plugin."` @@ -54,7 +54,7 @@ type Config struct { // ResourceConstraints defines resource constraints on how many executions to be created per project/overall at any given time ResourceConstraints core.ResourceConstraintsSpec `json:"resourceConstraints" pflag:"-,Defines resource constraints on how many executions to be created per project/overall at any given time."` - DefaultGrpcEndpoint string `json:"defaultGrpcEndpoint" pflag:",The default grpc endpoint of external plugin service."` + DefaultGrpcEndpoint string `json:"defaultGrpcEndpoint" pflag:",The default grpc endpoint of agent service."` // Maps endpoint to their plugin handler. {TaskType: Endpoint} EndpointForTaskTypes map[string]string `json:"endpointForTaskTypes" pflag:"-,"` diff --git a/go/tasks/plugins/webapi/grpc/config_test.go b/go/tasks/plugins/webapi/agent/config_test.go similarity index 95% rename from go/tasks/plugins/webapi/grpc/config_test.go rename to go/tasks/plugins/webapi/agent/config_test.go index 9e994f07f..e7201a2b9 100644 --- a/go/tasks/plugins/webapi/grpc/config_test.go +++ b/go/tasks/plugins/webapi/agent/config_test.go @@ -1,4 +1,4 @@ -package grpc +package agent import ( "testing" diff --git a/go/tasks/plugins/webapi/grpc/integration_test.go b/go/tasks/plugins/webapi/agent/integration_test.go similarity index 98% rename from go/tasks/plugins/webapi/grpc/integration_test.go rename to go/tasks/plugins/webapi/agent/integration_test.go index a4f1d4284..1c6473bbf 100644 --- a/go/tasks/plugins/webapi/grpc/integration_test.go +++ b/go/tasks/plugins/webapi/agent/integration_test.go @@ -1,4 +1,4 @@ -package grpc +package agent import ( "context" @@ -54,11 +54,11 @@ func (m *MockClient) DeleteTask(_ context.Context, _ *service.TaskDeleteRequest, return &service.TaskDeleteResponse{}, nil } -func mockGetClientFunc(_ context.Context, _ string, _ map[string]*grpc.ClientConn) (service.ExternalPluginServiceClient, error) { +func mockGetClientFunc(_ context.Context, _ string, _ map[string]*grpc.ClientConn) (service.AgentServiceClient, error) { return &MockClient{}, nil } -func mockGetBadClientFunc(_ context.Context, _ string, _ map[string]*grpc.ClientConn) (service.ExternalPluginServiceClient, error) { +func mockGetBadClientFunc(_ context.Context, _ string, _ map[string]*grpc.ClientConn) (service.AgentServiceClient, error) { return nil, fmt.Errorf("error") } diff --git a/go/tasks/plugins/webapi/grpc/plugin.go b/go/tasks/plugins/webapi/agent/plugin.go similarity index 88% rename from go/tasks/plugins/webapi/grpc/plugin.go rename to go/tasks/plugins/webapi/agent/plugin.go index 6861dadb7..85efac744 100644 --- a/go/tasks/plugins/webapi/grpc/plugin.go +++ b/go/tasks/plugins/webapi/agent/plugin.go @@ -1,4 +1,4 @@ -package grpc +package agent import ( "context" @@ -19,7 +19,7 @@ import ( "google.golang.org/grpc" ) -type GetClientFunc func(ctx context.Context, endpoint string, connectionCache map[string]*grpc.ClientConn) (service.ExternalPluginServiceClient, error) +type GetClientFunc func(ctx context.Context, endpoint string, connectionCache map[string]*grpc.ClientConn) (service.AgentServiceClient, error) type Plugin struct { metricScope promutils.Scope @@ -67,7 +67,7 @@ func (p Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContextR endpoint := getFinalEndpoint(taskTemplate.Type, p.cfg.DefaultGrpcEndpoint, p.cfg.EndpointForTaskTypes) client, err := p.getClient(ctx, endpoint, p.connectionCache) if err != nil { - return nil, nil, fmt.Errorf("failed to connect external plugin service with error: %v", err) + return nil, nil, fmt.Errorf("failed to connect to agent with error: %v", err) } res, err := client.CreateTask(ctx, &service.TaskCreateRequest{Inputs: inputs, Template: taskTemplate, OutputPrefix: outputPrefix}) @@ -89,7 +89,7 @@ func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest weba endpoint := getFinalEndpoint(metadata.TaskType, p.cfg.DefaultGrpcEndpoint, p.cfg.EndpointForTaskTypes) client, err := p.getClient(ctx, endpoint, p.connectionCache) if err != nil { - return nil, fmt.Errorf("failed to connect external plugin service with error: %v", err) + return nil, fmt.Errorf("failed to connect to agent with error: %v", err) } res, err := client.GetTask(ctx, &service.TaskGetRequest{TaskType: metadata.TaskType, JobId: metadata.JobID}) @@ -112,7 +112,7 @@ func (p Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error endpoint := getFinalEndpoint(metadata.TaskType, p.cfg.DefaultGrpcEndpoint, p.cfg.EndpointForTaskTypes) client, err := p.getClient(ctx, endpoint, p.connectionCache) if err != nil { - return fmt.Errorf("failed to connect external plugin service with error: %v", err) + return fmt.Errorf("failed to connect to agent with error: %v", err) } _, err = client.DeleteTask(ctx, &service.TaskDeleteRequest{TaskType: metadata.TaskType, JobId: metadata.JobID}) @@ -150,10 +150,10 @@ func getFinalEndpoint(taskType, defaultEndpoint string, endpointForTaskTypes map return defaultEndpoint } -func getClientFunc(ctx context.Context, endpoint string, connectionCache map[string]*grpc.ClientConn) (service.ExternalPluginServiceClient, error) { +func getClientFunc(ctx context.Context, endpoint string, connectionCache map[string]*grpc.ClientConn) (service.AgentServiceClient, error) { conn, ok := connectionCache[endpoint] if ok { - return service.NewExternalPluginServiceClient(conn), nil + return service.NewAgentServiceClient(conn), nil } var opts []grpc.DialOption var err error @@ -178,14 +178,14 @@ func getClientFunc(ctx context.Context, endpoint string, connectionCache map[str } }() }() - return service.NewExternalPluginServiceClient(conn), nil + return service.NewAgentServiceClient(conn), nil } -func newGrpcPlugin() webapi.PluginEntry { +func newAgentPlugin() webapi.PluginEntry { supportedTaskTypes := GetConfig().SupportedTaskTypes return webapi.PluginEntry{ - ID: "external-plugin-service", + ID: "agent-service", SupportedTaskTypes: supportedTaskTypes, PluginLoader: func(ctx context.Context, iCtx webapi.PluginSetupContext) (webapi.AsyncPlugin, error) { return &Plugin{ @@ -198,9 +198,9 @@ func newGrpcPlugin() webapi.PluginEntry { } } -func RegisterGrpcPlugin() { +func RegisterAgentPlugin() { gob.Register(ResourceMetaWrapper{}) gob.Register(ResourceWrapper{}) - pluginmachinery.PluginRegistry().RegisterRemotePlugin(newGrpcPlugin()) + pluginmachinery.PluginRegistry().RegisterRemotePlugin(newAgentPlugin()) } diff --git a/go/tasks/plugins/webapi/grpc/plugin_test.go b/go/tasks/plugins/webapi/agent/plugin_test.go similarity index 93% rename from go/tasks/plugins/webapi/grpc/plugin_test.go rename to go/tasks/plugins/webapi/agent/plugin_test.go index 93bf11ec9..f9218de6d 100644 --- a/go/tasks/plugins/webapi/grpc/plugin_test.go +++ b/go/tasks/plugins/webapi/agent/plugin_test.go @@ -1,4 +1,4 @@ -package grpc +package agent import ( "context" @@ -38,10 +38,10 @@ func TestPlugin(t *testing.T) { assert.Equal(t, plugin.cfg.ResourceConstraints, constraints) }) - t.Run("tet newGrpcPlugin", func(t *testing.T) { - p := newGrpcPlugin() + t.Run("tet newAgentPlugin", func(t *testing.T) { + p := newAgentPlugin() assert.NotNil(t, p) - assert.Equal(t, p.ID, "external-plugin-service") + assert.Equal(t, p.ID, "flyte-agent") assert.NotNil(t, p.PluginLoader) }) From b466e7acdc2698835ed9e409b4d0cc9e21f01498 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 30 May 2023 11:54:11 -0700 Subject: [PATCH 3/7] rename Signed-off-by: Kevin Su --- .../plugins/webapi/agent/integration_test.go | 18 +++++++++--------- go/tasks/plugins/webapi/agent/plugin_test.go | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/go/tasks/plugins/webapi/agent/integration_test.go b/go/tasks/plugins/webapi/agent/integration_test.go index 1c6473bbf..de3b21a73 100644 --- a/go/tasks/plugins/webapi/agent/integration_test.go +++ b/go/tasks/plugins/webapi/agent/integration_test.go @@ -98,7 +98,7 @@ func TestEndToEnd(t *testing.T) { basePrefix := storage.DataReference("fake://bucket/prefix/") t.Run("run a job", func(t *testing.T) { - pluginEntry := pluginmachinery.CreateRemotePlugin(newMockGrpcPlugin()) + pluginEntry := pluginmachinery.CreateRemotePlugin(newMockAgentPlugin()) plugin, err := pluginEntry.LoadPlugin(context.TODO(), newFakeSetupContext("test1")) assert.NoError(t, err) @@ -107,8 +107,8 @@ func TestEndToEnd(t *testing.T) { }) t.Run("failed to create a job", func(t *testing.T) { - grpcPlugin := newMockGrpcPlugin() - grpcPlugin.PluginLoader = func(ctx context.Context, iCtx webapi.PluginSetupContext) (webapi.AsyncPlugin, error) { + agentPlugin := newMockAgentPlugin() + agentPlugin.PluginLoader = func(ctx context.Context, iCtx webapi.PluginSetupContext) (webapi.AsyncPlugin, error) { return &MockPlugin{ Plugin{ metricScope: iCtx.MetricsScope(), @@ -117,7 +117,7 @@ func TestEndToEnd(t *testing.T) { }, }, nil } - pluginEntry := pluginmachinery.CreateRemotePlugin(grpcPlugin) + pluginEntry := pluginmachinery.CreateRemotePlugin(agentPlugin) plugin, err := pluginEntry.LoadPlugin(context.TODO(), newFakeSetupContext("test2")) assert.NoError(t, err) @@ -144,8 +144,8 @@ func TestEndToEnd(t *testing.T) { tr.OnRead(context.Background()).Return(nil, fmt.Errorf("read fail")) tCtx.OnTaskReader().Return(tr) - grpcPlugin := newMockGrpcPlugin() - pluginEntry := pluginmachinery.CreateRemotePlugin(grpcPlugin) + agentPlugin := newAgentPlugin() + pluginEntry := pluginmachinery.CreateRemotePlugin(agentPlugin) plugin, err := pluginEntry.LoadPlugin(context.TODO(), newFakeSetupContext("test3")) assert.NoError(t, err) @@ -165,8 +165,8 @@ func TestEndToEnd(t *testing.T) { inputReader.OnGetMatch(mock.Anything).Return(nil, fmt.Errorf("read fail")) tCtx.OnInputReader().Return(inputReader) - grpcPlugin := newMockGrpcPlugin() - pluginEntry := pluginmachinery.CreateRemotePlugin(grpcPlugin) + agentPlugin := newMockAgentPlugin() + pluginEntry := pluginmachinery.CreateRemotePlugin(agentPlugin) plugin, err := pluginEntry.LoadPlugin(context.TODO(), newFakeSetupContext("test4")) assert.NoError(t, err) @@ -239,7 +239,7 @@ func getTaskContext(t *testing.T) *pluginCoreMocks.TaskExecutionContext { return tCtx } -func newMockGrpcPlugin() webapi.PluginEntry { +func newMockAgentPlugin() webapi.PluginEntry { return webapi.PluginEntry{ ID: "external-plugin-service", SupportedTaskTypes: []core.TaskType{"bigquery_query_job_task"}, diff --git a/go/tasks/plugins/webapi/agent/plugin_test.go b/go/tasks/plugins/webapi/agent/plugin_test.go index f9218de6d..328415cc8 100644 --- a/go/tasks/plugins/webapi/agent/plugin_test.go +++ b/go/tasks/plugins/webapi/agent/plugin_test.go @@ -25,7 +25,7 @@ func TestPlugin(t *testing.T) { cfg := defaultConfig cfg.WebAPI.Caching.Workers = 1 cfg.WebAPI.Caching.ResyncInterval.Duration = 5 * time.Second - cfg.DefaultGrpcEndpoint = "test-service.flyte.svc.cluster.local:80" + cfg.DefaultGrpcEndpoint = "test-agent.flyte.svc.cluster.local:80" cfg.EndpointForTaskTypes = map[string]string{"spark": "localhost:80"} err := SetConfig(&cfg) assert.NoError(t, err) From ca5d6e2eec3e38f3a79896f520fa64fa2a2f0d5b Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 30 May 2023 12:02:11 -0700 Subject: [PATCH 4/7] nit Signed-off-by: Kevin Su --- go/tasks/plugins/webapi/agent/plugin_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/tasks/plugins/webapi/agent/plugin_test.go b/go/tasks/plugins/webapi/agent/plugin_test.go index 328415cc8..174115eea 100644 --- a/go/tasks/plugins/webapi/agent/plugin_test.go +++ b/go/tasks/plugins/webapi/agent/plugin_test.go @@ -41,7 +41,7 @@ func TestPlugin(t *testing.T) { t.Run("tet newAgentPlugin", func(t *testing.T) { p := newAgentPlugin() assert.NotNil(t, p) - assert.Equal(t, p.ID, "flyte-agent") + assert.Equal(t, p.ID, "agent-service") assert.NotNil(t, p.PluginLoader) }) From ad3148f60b2c5cfae9b7d41314c92053ae20c250 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 2 Jun 2023 15:15:07 -0700 Subject: [PATCH 5/7] rename Signed-off-by: Kevin Su --- go.mod | 2 +- go.sum | 4 +- .../plugins/webapi/agent/integration_test.go | 21 +++++---- go/tasks/plugins/webapi/agent/plugin.go | 47 ++++++++++--------- 4 files changed, 38 insertions(+), 36 deletions(-) diff --git a/go.mod b/go.mod index 2a8168412..919ed121f 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/athena v1.0.0 github.com/bstadlbauer/dask-k8s-operator-go-client v0.1.0 github.com/coocood/freecache v1.1.1 - github.com/flyteorg/flyteidl v1.5.9 + github.com/flyteorg/flyteidl v1.5.10 github.com/flyteorg/flytestdlib v1.0.15 github.com/go-test/deep v1.0.7 github.com/golang/protobuf v1.5.2 diff --git a/go.sum b/go.sum index 1f996f878..7c1295f2c 100644 --- a/go.sum +++ b/go.sum @@ -232,8 +232,8 @@ github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQL github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/flyteorg/flyteidl v1.5.9 h1:jqoenDx6p1Uncja1LMSzWmq3mBrMQ6vOdzN7/Ma3P28= -github.com/flyteorg/flyteidl v1.5.9/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= +github.com/flyteorg/flyteidl v1.5.10 h1:SHeiaWRt8EAVuFsat+BJswtc07HTZ4DqhfTEYSm621k= +github.com/flyteorg/flyteidl v1.5.10/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0= github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s= github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk= diff --git a/go/tasks/plugins/webapi/agent/integration_test.go b/go/tasks/plugins/webapi/agent/integration_test.go index de3b21a73..49dd11449 100644 --- a/go/tasks/plugins/webapi/agent/integration_test.go +++ b/go/tasks/plugins/webapi/agent/integration_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "sync/atomic" "testing" "time" @@ -38,27 +39,27 @@ type MockPlugin struct { type MockClient struct { } -func (m *MockClient) CreateTask(_ context.Context, _ *service.TaskCreateRequest, _ ...grpc.CallOption) (*service.TaskCreateResponse, error) { - return &service.TaskCreateResponse{JobId: "job-id"}, nil +func (m *MockClient) CreateTask(_ context.Context, _ *admin.CreateTaskRequest, _ ...grpc.CallOption) (*admin.CreateTaskResponse, error) { + return &admin.CreateTaskResponse{ResourceMeta: []byte{1, 2, 3, 4}}, nil } -func (m *MockClient) GetTask(_ context.Context, _ *service.TaskGetRequest, _ ...grpc.CallOption) (*service.TaskGetResponse, error) { - return &service.TaskGetResponse{State: service.State_SUCCEEDED, Outputs: &flyteIdlCore.LiteralMap{ +func (m *MockClient) GetTask(_ context.Context, _ *admin.GetTaskRequest, _ ...grpc.CallOption) (*admin.GetTaskResponse, error) { + return &admin.GetTaskResponse{Resource: &admin.Resource{State: admin.State_SUCCEEDED, Outputs: &flyteIdlCore.LiteralMap{ Literals: map[string]*flyteIdlCore.Literal{ "arr": coreutils.MustMakeLiteral([]interface{}{[]interface{}{"a", "b"}, []interface{}{1, 2}}), }, - }}, nil + }}}, nil } -func (m *MockClient) DeleteTask(_ context.Context, _ *service.TaskDeleteRequest, _ ...grpc.CallOption) (*service.TaskDeleteResponse, error) { - return &service.TaskDeleteResponse{}, nil +func (m *MockClient) DeleteTask(_ context.Context, _ *admin.DeleteTaskRequest, _ ...grpc.CallOption) (*admin.DeleteTaskResponse, error) { + return &admin.DeleteTaskResponse{}, nil } -func mockGetClientFunc(_ context.Context, _ string, _ map[string]*grpc.ClientConn) (service.AgentServiceClient, error) { +func mockGetClientFunc(_ context.Context, _ string, _ map[string]*grpc.ClientConn) (service.AsyncAgentServiceClient, error) { return &MockClient{}, nil } -func mockGetBadClientFunc(_ context.Context, _ string, _ map[string]*grpc.ClientConn) (service.AgentServiceClient, error) { +func mockGetBadClientFunc(_ context.Context, _ string, _ map[string]*grpc.ClientConn) (service.AsyncAgentServiceClient, error) { return nil, fmt.Errorf("error") } @@ -241,7 +242,7 @@ func getTaskContext(t *testing.T) *pluginCoreMocks.TaskExecutionContext { func newMockAgentPlugin() webapi.PluginEntry { return webapi.PluginEntry{ - ID: "external-plugin-service", + ID: "agent-service", SupportedTaskTypes: []core.TaskType{"bigquery_query_job_task"}, PluginLoader: func(ctx context.Context, iCtx webapi.PluginSetupContext) (webapi.AsyncPlugin, error) { return &MockPlugin{ diff --git a/go/tasks/plugins/webapi/agent/plugin.go b/go/tasks/plugins/webapi/agent/plugin.go index 85efac744..2a4dc2976 100644 --- a/go/tasks/plugins/webapi/agent/plugin.go +++ b/go/tasks/plugins/webapi/agent/plugin.go @@ -4,6 +4,7 @@ import ( "context" "encoding/gob" "fmt" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "google.golang.org/grpc/grpclog" @@ -19,7 +20,7 @@ import ( "google.golang.org/grpc" ) -type GetClientFunc func(ctx context.Context, endpoint string, connectionCache map[string]*grpc.ClientConn) (service.AgentServiceClient, error) +type GetClientFunc func(ctx context.Context, endpoint string, connectionCache map[string]*grpc.ClientConn) (service.AsyncAgentServiceClient, error) type Plugin struct { metricScope promutils.Scope @@ -29,15 +30,15 @@ type Plugin struct { } type ResourceWrapper struct { - State service.State + State admin.State Outputs *flyteIdl.LiteralMap } type ResourceMetaWrapper struct { - OutputPrefix string - Token string - JobID string - TaskType string + OutputPrefix string + Token string + AgentResourceMeta []byte + TaskType string } func (p Plugin) GetConfig() webapi.PluginConfig { @@ -70,17 +71,17 @@ func (p Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContextR return nil, nil, fmt.Errorf("failed to connect to agent with error: %v", err) } - res, err := client.CreateTask(ctx, &service.TaskCreateRequest{Inputs: inputs, Template: taskTemplate, OutputPrefix: outputPrefix}) + res, err := client.CreateTask(ctx, &admin.CreateTaskRequest{Inputs: inputs, Template: taskTemplate, OutputPrefix: outputPrefix}) if err != nil { return nil, nil, err } return &ResourceMetaWrapper{ - OutputPrefix: outputPrefix, - JobID: res.GetJobId(), - Token: "", - TaskType: taskTemplate.Type, - }, &ResourceWrapper{State: service.State_RUNNING}, nil + OutputPrefix: outputPrefix, + AgentResourceMeta: res.GetResourceMeta(), + Token: "", + TaskType: taskTemplate.Type, + }, &ResourceWrapper{State: admin.State_RUNNING}, nil } func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest webapi.Resource, err error) { @@ -92,14 +93,14 @@ func (p Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest weba return nil, fmt.Errorf("failed to connect to agent with error: %v", err) } - res, err := client.GetTask(ctx, &service.TaskGetRequest{TaskType: metadata.TaskType, JobId: metadata.JobID}) + res, err := client.GetTask(ctx, &admin.GetTaskRequest{TaskType: metadata.TaskType, ResourceMeta: metadata.AgentResourceMeta}) if err != nil { return nil, err } return &ResourceWrapper{ - State: res.State, - Outputs: res.Outputs, + State: res.Resource.State, + Outputs: res.Resource.Outputs, }, nil } @@ -115,7 +116,7 @@ func (p Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error return fmt.Errorf("failed to connect to agent with error: %v", err) } - _, err = client.DeleteTask(ctx, &service.TaskDeleteRequest{TaskType: metadata.TaskType, JobId: metadata.JobID}) + _, err = client.DeleteTask(ctx, &admin.DeleteTaskRequest{TaskType: metadata.TaskType, ResourceMeta: metadata.AgentResourceMeta}) return err } @@ -124,13 +125,13 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase taskInfo := &core.TaskInfo{} switch resource.State { - case service.State_RUNNING: + case admin.State_RUNNING: return core.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, taskInfo), nil - case service.State_PERMANENT_FAILURE: + case admin.State_PERMANENT_FAILURE: return core.PhaseInfoFailure(pluginErrors.TaskFailedWithError, "failed to run the job", taskInfo), nil - case service.State_RETRYABLE_FAILURE: + case admin.State_RETRYABLE_FAILURE: return core.PhaseInfoRetryableFailure(pluginErrors.TaskFailedWithError, "failed to run the job", taskInfo), nil - case service.State_SUCCEEDED: + case admin.State_SUCCEEDED: if resource.Outputs != nil { err := taskCtx.OutputWriter().Put(ctx, ioutils.NewInMemoryOutputReader(resource.Outputs, nil, nil)) if err != nil { @@ -150,10 +151,10 @@ func getFinalEndpoint(taskType, defaultEndpoint string, endpointForTaskTypes map return defaultEndpoint } -func getClientFunc(ctx context.Context, endpoint string, connectionCache map[string]*grpc.ClientConn) (service.AgentServiceClient, error) { +func getClientFunc(ctx context.Context, endpoint string, connectionCache map[string]*grpc.ClientConn) (service.AsyncAgentServiceClient, error) { conn, ok := connectionCache[endpoint] if ok { - return service.NewAgentServiceClient(conn), nil + return service.NewAsyncAgentServiceClient(conn), nil } var opts []grpc.DialOption var err error @@ -178,7 +179,7 @@ func getClientFunc(ctx context.Context, endpoint string, connectionCache map[str } }() }() - return service.NewAgentServiceClient(conn), nil + return service.NewAsyncAgentServiceClient(conn), nil } func newAgentPlugin() webapi.PluginEntry { From 1942b75282e82c535884218f6e9ebbdfe3330cc5 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 2 Jun 2023 15:28:15 -0700 Subject: [PATCH 6/7] lint Signed-off-by: Kevin Su --- go/tasks/plugins/webapi/agent/integration_test.go | 3 ++- go/tasks/plugins/webapi/agent/plugin.go | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go/tasks/plugins/webapi/agent/integration_test.go b/go/tasks/plugins/webapi/agent/integration_test.go index 49dd11449..1def45156 100644 --- a/go/tasks/plugins/webapi/agent/integration_test.go +++ b/go/tasks/plugins/webapi/agent/integration_test.go @@ -4,11 +4,12 @@ import ( "context" "encoding/json" "fmt" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "sync/atomic" "testing" "time" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + flyteIdlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" pluginCoreMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks" diff --git a/go/tasks/plugins/webapi/agent/plugin.go b/go/tasks/plugins/webapi/agent/plugin.go index 2a4dc2976..70a335021 100644 --- a/go/tasks/plugins/webapi/agent/plugin.go +++ b/go/tasks/plugins/webapi/agent/plugin.go @@ -4,6 +4,7 @@ import ( "context" "encoding/gob" "fmt" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "google.golang.org/grpc/grpclog" From 44e29a6bb3d30ef83202ab76cd3e33f846b15f77 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 5 Jun 2023 10:05:37 -0700 Subject: [PATCH 7/7] nit Signed-off-by: Kevin Su --- go/tasks/plugins/webapi/agent/integration_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/go/tasks/plugins/webapi/agent/integration_test.go b/go/tasks/plugins/webapi/agent/integration_test.go index 1def45156..30036ede7 100644 --- a/go/tasks/plugins/webapi/agent/integration_test.go +++ b/go/tasks/plugins/webapi/agent/integration_test.go @@ -8,14 +8,13 @@ import ( "testing" "time" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" - flyteIdlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" pluginCoreMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks" ioMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks" "github.com/flyteorg/flyteidl/clients/go/coreutils" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/plugins" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery"