Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

External Plugin Service (grpc) #330

Merged
merged 81 commits into from
Apr 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
e44b90e
Add fastapi plugin
pingsutw Jan 21, 2023
9bf6784
Add dummy plugin
pingsutw Jan 24, 2023
2aff416
nit
pingsutw Jan 24, 2023
e0361d2
test
pingsutw Feb 16, 2023
c34859b
test
pingsutw Feb 16, 2023
e25bdea
test
pingsutw Feb 17, 2023
23c4b89
test
pingsutw Feb 17, 2023
14c8d1e
wip
pingsutw Feb 17, 2023
d78e1b8
wip
pingsutw Feb 17, 2023
0a6cc94
wip
pingsutw Feb 17, 2023
fc8a0ac
wip
pingsutw Feb 17, 2023
71163cb
wip
pingsutw Feb 17, 2023
c96b6d1
wip
pingsutw Feb 17, 2023
3b23bd4
wip
pingsutw Feb 17, 2023
cffd3fa
wip
pingsutw Feb 17, 2023
54593b6
wip
pingsutw Feb 17, 2023
212dd17
wip
pingsutw Feb 17, 2023
0499cfd
wip
pingsutw Feb 18, 2023
640233f
wip
pingsutw Feb 18, 2023
a1a2132
wip
pingsutw Feb 18, 2023
eb06a38
wip
pingsutw Feb 18, 2023
a2881f3
grpc plugin
pingsutw Feb 24, 2023
4342e5f
updated idl version
pingsutw Feb 24, 2023
bc11ffc
merged master
pingsutw Feb 24, 2023
5f4b1ff
wip
pingsutw Feb 24, 2023
8e569f3
wip
pingsutw Feb 24, 2023
2f2dd00
wip
pingsutw Feb 24, 2023
994e67b
wip
pingsutw Feb 24, 2023
2d491c8
wip
pingsutw Feb 24, 2023
f6f20ac
wip
pingsutw Feb 24, 2023
3db2afb
wip
pingsutw Feb 25, 2023
16f97aa
add grpc plugin
pingsutw Feb 27, 2023
f7bf1f5
nit
pingsutw Feb 28, 2023
ec42cf9
nit
pingsutw Feb 28, 2023
5c6957c
nit
pingsutw Feb 28, 2023
0efb30a
wip
pingsutw Feb 28, 2023
eb80e5b
wip
pingsutw Feb 28, 2023
a62cae8
wip
pingsutw Feb 28, 2023
0025427
wip
pingsutw Feb 28, 2023
d326926
wip
pingsutw Feb 28, 2023
5184e62
wip
pingsutw Feb 28, 2023
762fd94
wip
pingsutw Feb 28, 2023
8b9429d
wip
pingsutw Feb 28, 2023
d54e691
wip
pingsutw Feb 28, 2023
145422f
wip
pingsutw Feb 28, 2023
c185a89
wip
pingsutw Mar 6, 2023
1c20b26
wip
pingsutw Mar 6, 2023
ffe3a18
wip
pingsutw Mar 13, 2023
f90710a
Merge branch 'master' of github.com:flyteorg/flyteplugins into backen…
pingsutw Mar 13, 2023
ed36d16
wip
pingsutw Mar 13, 2023
70f5790
wip
pingsutw Mar 13, 2023
94e4271
wip
pingsutw Mar 13, 2023
a0745f8
update idl
pingsutw Mar 14, 2023
b36c082
more tests
pingsutw Mar 14, 2023
e156b50
merged master
pingsutw Mar 14, 2023
cc241e8
lint
pingsutw Mar 14, 2023
3e8ee33
write output
pingsutw Mar 14, 2023
0caea6d
remove prev state
pingsutw Mar 14, 2023
658efd4
bump idl
pingsutw Mar 14, 2023
d931a43
lint
pingsutw Mar 15, 2023
ee545cf
wip
pingsutw Mar 23, 2023
76d9490
test
pingsutw Mar 24, 2023
d52fd9a
merged master
pingsutw Mar 31, 2023
66919af
update idl
pingsutw Mar 31, 2023
17343dd
rename
pingsutw Mar 31, 2023
f66e0d8
Merge branch 'master' of github.com:flyteorg/flyteplugins into backen…
pingsutw Mar 31, 2023
6666895
more test
pingsutw Mar 31, 2023
14a9d2b
remove grpcTokenKey
pingsutw Apr 3, 2023
430d244
Add SupportedTaskTypes
pingsutw Apr 3, 2023
9b8864e
nit
pingsutw Apr 4, 2023
1cda1d0
nit
pingsutw Apr 4, 2023
43909bd
cache connection
pingsutw Apr 12, 2023
394d241
Merge branch 'master' of github.com:flyteorg/flyteplugins into backen…
pingsutw Apr 12, 2023
bb18b14
more tests
pingsutw Apr 12, 2023
4617457
fixes tests
pingsutw Apr 12, 2023
2a52592
fixes tests
pingsutw Apr 12, 2023
2338b32
more tests
pingsutw Apr 13, 2023
ca9338e
lint
pingsutw Apr 13, 2023
899682e
remove bigquery_query_job_task
pingsutw Apr 19, 2023
9f07a38
add bigquery_query_job_task
pingsutw Apr 19, 2023
05e46ef
set random value for SupportedTaskTypes
pingsutw Apr 21, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/athena v1.0.0
github.com/bstadlbauer/dask-k8s-operator-go-client v0.1.0
github.com/coocood/freecache v1.1.1
github.com/flyteorg/flyteidl v1.3.14
github.com/flyteorg/flyteidl v1.3.16
github.com/flyteorg/flytestdlib v1.0.15
github.com/go-test/deep v1.0.7
github.com/golang/protobuf v1.5.2
Expand Down Expand Up @@ -86,6 +86,7 @@ require (
github.com/google/gofuzz v1.2.0 // indirect
github.com/googleapis/gax-go/v2 v2.3.0 // indirect
github.com/googleapis/go-type-adapters v1.0.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
Expand Down
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,8 @@ github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQL
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/flyteorg/flyteidl v1.3.14 h1:o5M0g/r6pXTPu5PEurbYxbQmuOu3hqqsaI2M6uvK0N8=
github.com/flyteorg/flyteidl v1.3.14/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flyteidl v1.3.16 h1:mRq1VeUl5LP12dezbGHLQcrLuAmO9kawK9X7arqCInM=
github.com/flyteorg/flyteidl v1.3.16/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0=
github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s=
github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk=
Expand Down Expand Up @@ -443,6 +443,7 @@ github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:Fecb
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
Expand Down
72 changes: 72 additions & 0 deletions go/tasks/plugins/webapi/grpc/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package grpc

import (
"time"

pluginsConfig "github.com/flyteorg/flyteplugins/go/tasks/config"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/webapi"
"github.com/flyteorg/flytestdlib/config"
)

var (
defaultConfig = Config{
WebAPI: webapi.PluginConfig{
ResourceQuotas: map[core.ResourceNamespace]int{
"default": 1000,
},
ReadRateLimiter: webapi.RateLimiterConfig{
Burst: 100,
QPS: 10,
},
WriteRateLimiter: webapi.RateLimiterConfig{
Burst: 100,
QPS: 10,
},
Caching: webapi.CachingConfig{
Size: 500000,
ResyncInterval: config.Duration{Duration: 30 * time.Second},
Workers: 10,
MaxSystemFailures: 5,
},
ResourceMeta: nil,
},
ResourceConstraints: core.ResourceConstraintsSpec{
ProjectScopeResourceConstraint: &core.ResourceConstraint{
Value: 100,
},
NamespaceScopeResourceConstraint: &core.ResourceConstraint{
Value: 50,
},
},
DefaultGrpcEndpoint: "dns:///external-plugin-service.flyte.svc.cluster.local:80",
SupportedTaskTypes: []string{"task_type_1", "task_type_2"},
}

configSection = pluginsConfig.MustRegisterSubSection("external-plugin-service", &defaultConfig)
)

// Config is config for 'databricks' plugin
type Config struct {
// WebAPI defines config for the base WebAPI plugin
WebAPI webapi.PluginConfig `json:"webApi" pflag:",Defines config for the base WebAPI plugin."`

// ResourceConstraints defines resource constraints on how many executions to be created per project/overall at any given time
ResourceConstraints core.ResourceConstraintsSpec `json:"resourceConstraints" pflag:"-,Defines resource constraints on how many executions to be created per project/overall at any given time."`

DefaultGrpcEndpoint string `json:"defaultGrpcEndpoint" pflag:",The default grpc endpoint of external plugin service."`

// Maps endpoint to their plugin handler. {TaskType: Endpoint}
EndpointForTaskTypes map[string]string `json:"endpointForTaskTypes" pflag:"-,"`
pingsutw marked this conversation as resolved.
Show resolved Hide resolved

// SupportedTaskTypes is a list of task types that are supported by this plugin.
SupportedTaskTypes []string `json:"supportedTaskTypes" pflag:"-,Defines a list of task types that are supported by this plugin."`
}

func GetConfig() *Config {
return configSection.GetConfig().(*Config)
}

func SetConfig(cfg *Config) error {
return configSection.SetConfig(cfg)
}
17 changes: 17 additions & 0 deletions go/tasks/plugins/webapi/grpc/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package grpc

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestGetAndSetConfig(t *testing.T) {
cfg := defaultConfig
cfg.WebAPI.Caching.Workers = 1
cfg.WebAPI.Caching.ResyncInterval.Duration = 5 * time.Second
err := SetConfig(&cfg)
assert.NoError(t, err)
assert.Equal(t, &cfg, GetConfig())
}
268 changes: 268 additions & 0 deletions go/tasks/plugins/webapi/grpc/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
package grpc

import (
"context"
"encoding/json"
"fmt"
"sync/atomic"
"testing"
"time"

flyteIdlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
pluginCoreMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks"
ioMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks"

"github.com/flyteorg/flyteidl/clients/go/coreutils"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/plugins"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/webapi"
"github.com/flyteorg/flyteplugins/tests"
"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"
"github.com/flyteorg/flytestdlib/storage"
"github.com/flyteorg/flytestdlib/utils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/util/rand"
)

type MockPlugin struct {
Plugin
}

type MockClient struct {
}

func (m *MockClient) CreateTask(_ context.Context, _ *service.TaskCreateRequest, _ ...grpc.CallOption) (*service.TaskCreateResponse, error) {
return &service.TaskCreateResponse{JobId: "job-id"}, nil
}

func (m *MockClient) GetTask(_ context.Context, _ *service.TaskGetRequest, _ ...grpc.CallOption) (*service.TaskGetResponse, error) {
return &service.TaskGetResponse{State: service.State_SUCCEEDED, Outputs: &flyteIdlCore.LiteralMap{
Literals: map[string]*flyteIdlCore.Literal{
"arr": coreutils.MustMakeLiteral([]interface{}{[]interface{}{"a", "b"}, []interface{}{1, 2}}),
},
}}, nil
}

func (m *MockClient) DeleteTask(_ context.Context, _ *service.TaskDeleteRequest, _ ...grpc.CallOption) (*service.TaskDeleteResponse, error) {
return &service.TaskDeleteResponse{}, nil
}

func mockGetClientFunc(_ context.Context, _ string, _ map[string]*grpc.ClientConn) (service.ExternalPluginServiceClient, error) {
return &MockClient{}, nil
}

func mockGetBadClientFunc(_ context.Context, _ string, _ map[string]*grpc.ClientConn) (service.ExternalPluginServiceClient, error) {
return nil, fmt.Errorf("error")
}

func TestEndToEnd(t *testing.T) {
iter := func(ctx context.Context, tCtx pluginCore.TaskExecutionContext) error {
return nil
}

cfg := defaultConfig
cfg.WebAPI.ResourceQuotas = map[core.ResourceNamespace]int{}
cfg.WebAPI.Caching.Workers = 1
cfg.WebAPI.Caching.ResyncInterval.Duration = 5 * time.Second
err := SetConfig(&cfg)
assert.NoError(t, err)

databricksConfDict := map[string]interface{}{
"name": "flytekit databricks plugin example",
"new_cluster": map[string]string{
"spark_version": "11.0.x-scala2.12",
"node_type_id": "r3.xlarge",
"num_workers": "4",
},
"timeout_seconds": 3600,
"max_retries": 1,
}
databricksConfig, err := utils.MarshalObjToStruct(databricksConfDict)
assert.NoError(t, err)
sparkJob := plugins.SparkJob{DatabricksConf: databricksConfig, DatabricksToken: "token", SparkConf: map[string]string{"spark.driver.bindAddress": "127.0.0.1"}}
st, err := utils.MarshalPbToStruct(&sparkJob)
assert.NoError(t, err)

inputs, _ := coreutils.MakeLiteralMap(map[string]interface{}{"x": 1})
template := flyteIdlCore.TaskTemplate{
Type: "bigquery_query_job_task",
Custom: st,
}
basePrefix := storage.DataReference("fake://bucket/prefix/")

t.Run("run a job", func(t *testing.T) {
pluginEntry := pluginmachinery.CreateRemotePlugin(newMockGrpcPlugin())
plugin, err := pluginEntry.LoadPlugin(context.TODO(), newFakeSetupContext("test1"))
assert.NoError(t, err)

phase := tests.RunPluginEndToEndTest(t, plugin, &template, inputs, nil, nil, iter)
assert.Equal(t, true, phase.Phase().IsSuccess())
})

t.Run("failed to create a job", func(t *testing.T) {
grpcPlugin := newMockGrpcPlugin()
grpcPlugin.PluginLoader = func(ctx context.Context, iCtx webapi.PluginSetupContext) (webapi.AsyncPlugin, error) {
return &MockPlugin{
Plugin{
metricScope: iCtx.MetricsScope(),
cfg: GetConfig(),
getClient: mockGetBadClientFunc,
},
}, nil
}
pluginEntry := pluginmachinery.CreateRemotePlugin(grpcPlugin)
plugin, err := pluginEntry.LoadPlugin(context.TODO(), newFakeSetupContext("test2"))
assert.NoError(t, err)

tCtx := getTaskContext(t)
tr := &pluginCoreMocks.TaskReader{}
tr.OnRead(context.Background()).Return(&template, nil)
tCtx.OnTaskReader().Return(tr)
inputReader := &ioMocks.InputReader{}
inputReader.OnGetInputPrefixPath().Return(basePrefix)
inputReader.OnGetInputPath().Return(basePrefix + "/inputs.pb")
inputReader.OnGetMatch(mock.Anything).Return(inputs, nil)
tCtx.OnInputReader().Return(inputReader)

trns, err := plugin.Handle(context.Background(), tCtx)
assert.Error(t, err)
assert.Equal(t, trns.Info().Phase(), core.PhaseUndefined)
err = plugin.Abort(context.Background(), tCtx)
assert.Nil(t, err)
})

t.Run("failed to read task template", func(t *testing.T) {
tCtx := getTaskContext(t)
tr := &pluginCoreMocks.TaskReader{}
tr.OnRead(context.Background()).Return(nil, fmt.Errorf("read fail"))
tCtx.OnTaskReader().Return(tr)

grpcPlugin := newMockGrpcPlugin()
pluginEntry := pluginmachinery.CreateRemotePlugin(grpcPlugin)
plugin, err := pluginEntry.LoadPlugin(context.TODO(), newFakeSetupContext("test3"))
assert.NoError(t, err)

trns, err := plugin.Handle(context.Background(), tCtx)
assert.Error(t, err)
assert.Equal(t, trns.Info().Phase(), core.PhaseUndefined)
})

t.Run("failed to read inputs", func(t *testing.T) {
tCtx := getTaskContext(t)
tr := &pluginCoreMocks.TaskReader{}
tr.OnRead(context.Background()).Return(&template, nil)
tCtx.OnTaskReader().Return(tr)
inputReader := &ioMocks.InputReader{}
inputReader.OnGetInputPrefixPath().Return(basePrefix)
inputReader.OnGetInputPath().Return(basePrefix + "/inputs.pb")
inputReader.OnGetMatch(mock.Anything).Return(nil, fmt.Errorf("read fail"))
tCtx.OnInputReader().Return(inputReader)

grpcPlugin := newMockGrpcPlugin()
pluginEntry := pluginmachinery.CreateRemotePlugin(grpcPlugin)
plugin, err := pluginEntry.LoadPlugin(context.TODO(), newFakeSetupContext("test4"))
assert.NoError(t, err)

trns, err := plugin.Handle(context.Background(), tCtx)
assert.Error(t, err)
assert.Equal(t, trns.Info().Phase(), core.PhaseUndefined)
})
}

func getTaskContext(t *testing.T) *pluginCoreMocks.TaskExecutionContext {
latestKnownState := atomic.Value{}
pluginStateReader := &pluginCoreMocks.PluginStateReader{}
pluginStateReader.OnGetMatch(mock.Anything).Return(0, nil).Run(func(args mock.Arguments) {
o := args.Get(0)
x, err := json.Marshal(latestKnownState.Load())
assert.NoError(t, err)
assert.NoError(t, json.Unmarshal(x, &o))
})
pluginStateWriter := &pluginCoreMocks.PluginStateWriter{}
pluginStateWriter.OnPutMatch(mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {
latestKnownState.Store(args.Get(1))
})

pluginStateWriter.OnReset().Return(nil).Run(func(args mock.Arguments) {
latestKnownState.Store(nil)
})

execID := rand.String(3)
tID := &pluginCoreMocks.TaskExecutionID{}
tID.OnGetGeneratedName().Return(execID + "-my-task-1")
tID.OnGetID().Return(flyteIdlCore.TaskExecutionIdentifier{
TaskId: &flyteIdlCore.Identifier{
ResourceType: flyteIdlCore.ResourceType_TASK,
Project: "a",
Domain: "d",
Name: "n",
Version: "abc",
},
NodeExecutionId: &flyteIdlCore.NodeExecutionIdentifier{
NodeId: "node1",
ExecutionId: &flyteIdlCore.WorkflowExecutionIdentifier{
Project: "a",
Domain: "d",
Name: "exec",
},
},
RetryAttempt: 0,
})
tMeta := &pluginCoreMocks.TaskExecutionMetadata{}
tMeta.OnGetTaskExecutionID().Return(tID)
resourceManager := &pluginCoreMocks.ResourceManager{}
resourceManager.OnAllocateResourceMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(pluginCore.AllocationStatusGranted, nil)
resourceManager.OnReleaseResourceMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil)

basePrefix := storage.DataReference("fake://bucket/prefix/" + execID)
outputWriter := &ioMocks.OutputWriter{}
outputWriter.OnGetRawOutputPrefix().Return("/sandbox/")
outputWriter.OnGetOutputPrefixPath().Return(basePrefix)
outputWriter.OnGetErrorPath().Return(basePrefix + "/error.pb")
outputWriter.OnGetOutputPath().Return(basePrefix + "/outputs.pb")
outputWriter.OnGetCheckpointPrefix().Return("/checkpoint")
outputWriter.OnGetPreviousCheckpointsPrefix().Return("/prev")

tCtx := &pluginCoreMocks.TaskExecutionContext{}
tCtx.OnOutputWriter().Return(outputWriter)
tCtx.OnResourceManager().Return(resourceManager)
tCtx.OnPluginStateReader().Return(pluginStateReader)
tCtx.OnPluginStateWriter().Return(pluginStateWriter)
tCtx.OnTaskExecutionMetadata().Return(tMeta)
return tCtx
}

func newMockGrpcPlugin() webapi.PluginEntry {
return webapi.PluginEntry{
ID: "external-plugin-service",
SupportedTaskTypes: []core.TaskType{"bigquery_query_job_task"},
PluginLoader: func(ctx context.Context, iCtx webapi.PluginSetupContext) (webapi.AsyncPlugin, error) {
return &MockPlugin{
Plugin{
metricScope: iCtx.MetricsScope(),
cfg: GetConfig(),
getClient: mockGetClientFunc,
},
}, nil
},
}
}

func newFakeSetupContext(name string) *pluginCoreMocks.SetupContext {
fakeResourceRegistrar := pluginCoreMocks.ResourceRegistrar{}
fakeResourceRegistrar.On("RegisterResourceQuota", mock.Anything, mock.Anything, mock.Anything).Return(nil)
labeled.SetMetricKeys(contextutils.NamespaceKey)

fakeSetupContext := pluginCoreMocks.SetupContext{}
fakeSetupContext.OnMetricsScope().Return(promutils.NewScope(name))
fakeSetupContext.OnResourceRegistrar().Return(&fakeResourceRegistrar)

return &fakeSetupContext
}
Loading