Skip to content

Commit

Permalink
Add support Databricks plugin (flyteorg#299)
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw authored Dec 19, 2022
1 parent 428b7a4 commit ab3ee8a
Show file tree
Hide file tree
Showing 22 changed files with 757 additions and 80 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.0.0
github.com/aws/aws-sdk-go-v2/service/athena v1.0.0
github.com/coocood/freecache v1.1.1
github.com/flyteorg/flyteidl v1.2.3
github.com/flyteorg/flyteidl v1.3.1
github.com/flyteorg/flytestdlib v1.0.11
github.com/go-test/deep v1.0.7
github.com/golang/protobuf v1.5.2
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,8 @@ github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGE
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v1.2.3 h1:4A90rFyGXiUtFnQIgSPxPzBZRy9RoAPsfxs7OWYHfFA=
github.com/flyteorg/flyteidl v1.2.3/go.mod h1:f0AFl7RFycH7+JLq2th0ReH7v+Xse+QTw4jGdIxiS8I=
github.com/flyteorg/flyteidl v1.3.1 h1:KIE7hXRmhBHttpyvfWo6X2LYLtx66Oj/nbKMG2V5bs0=
github.com/flyteorg/flyteidl v1.3.1/go.mod h1:OJAq333OpInPnMhvVz93AlEjmlQ+t0FAD4aakIYE4OU=
github.com/flyteorg/flytestdlib v1.0.0/go.mod h1:QSVN5wIM1lM9d60eAEbX7NwweQXW96t5x4jbyftn89c=
github.com/flyteorg/flytestdlib v1.0.11 h1:f7B8x2/zMuimEVi4Jx0zqzvNhdi7aq7+ZWoqHsbp4F4=
github.com/flyteorg/flytestdlib v1.0.11/go.mod h1:nIBmBHtjTJvhZEn3e/EwVC/iMkR2tUX8hEiXjRBpH/s=
Expand Down Expand Up @@ -845,6 +845,7 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljT
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw=
github.com/vektra/mockery v1.1.2/go.mod h1:VcfZjKaFOPO+MpN4ZvwPjs4c48lkq1o3Ym8yHZJu0jU=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down Expand Up @@ -1200,6 +1201,7 @@ golang.org/x/tools v0.0.0-20200224181240-023911ca70b2/go.mod h1:TB2adYChydJhpapK
golang.org/x/tools v0.0.0-20200227222343-706bc42d1f0d/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200304193943-95d2e580d8eb/go.mod h1:o4KQGtdN14AW+yjsvvwRTJJuXz8XRtIHtEnmAXLyFUw=
golang.org/x/tools v0.0.0-20200312045724-11d5b4c81c7d/go.mod h1:o4KQGtdN14AW+yjsvvwRTJJuXz8XRtIHtEnmAXLyFUw=
golang.org/x/tools v0.0.0-20200323144430-8dcfad9e016e/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8=
golang.org/x/tools v0.0.0-20200331025713-a30bf2db82d4/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8=
golang.org/x/tools v0.0.0-20200501065659-ab2804fb9c9d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200505023115-26f46d2f7ef8/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
Expand Down
1 change: 0 additions & 1 deletion go/tasks/pluginmachinery/core/allocationstatus_enumer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion go/tasks/pluginmachinery/core/phase_enumer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

71 changes: 37 additions & 34 deletions go/tasks/pluginmachinery/core/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,42 +37,45 @@ type ResourceRegistrar interface {

// ResourceManager Interface
// 1. Terms and definitions
// - Resource: resource is an abstraction of anything that has a limited quota of units and can be claimed in a
// single unit or multiple units at once. At Flyte's current state, a resource means a logical
// separation (e.g., a cluster) of an external service that allows a limited number of outstanding
// requests to be sent to.
// - Token: Flyte uses a token to serve as the placeholder to represent a unit of resource. Flyte resource manager
// manages resources by managing the tokens of the resources.
// 2. Description
// ResourceManager provides a task-type-specific pooling system for Flyte Tasks. Plugin writers can optionally
// request for resources in their tasks, in single quantity.
// 3. Usage
// A Flyte plugin registers the resources and the desired quota of each resource with ResourceRegistrar at the
// setup time of Flyte Propeller. At the end of the setup time, Flyte Propeller builds a ResourceManager based on
// these registration requests.
// - Resource: resource is an abstraction of anything that has a limited quota of units and can be claimed in a
// single unit or multiple units at once. At Flyte's current state, a resource means a logical
// separation (e.g., a cluster) of an external service that allows a limited number of outstanding
// requests to be sent to.
// - Token: Flyte uses a token to serve as the placeholder to represent a unit of resource. Flyte resource manager
// manages resources by managing the tokens of the resources.
//
// During runtime, the ResourceManager does two simple things: allocating tokens and releasing tokens. When a Flyte
// task execution wants to send a request to an external service, the plugin should claim a unit of the corresponding
// resource. Specifically, an execution needs to generate a unique token, and register the token with ResourceManager
// by calling ResourceManager's AllocateResource() function. ResourceManager will check its current utilization and
// the allocation policy to decide whether or not to grant the request. Only when receiving the "AllocationGranted"
// status shall this execution move forward and send out the request. The granted token will be recorded in a token
// pool corresponding to the resource and managed by ResourceManager. When the request is done, the plugin will ask
// the resource manager to release the token by calling ResourceManager's ReleaseResource(), and the token will be
// erased from the corresponding pool.
// 4. Example
// Flyte has a built-on Qubole plugin that allows Flyte tasks to send out Hive commands to Qubole.
// In the plugin, a single Qubole cluster is a resource, and sending out a single Hive command to a Qubole cluster consumes
// a token of the corresponding resource. The resource allocation is achieved by the Qubole plugin calling
// status, err := AllocateResource(ctx, <cluster name>, <token string>, <constraint spec>)
// and the de-allocation is achieved by the plugin calling
// status, err := AllocateResource(ctx, <cluster name>, <token string>, <constraint spec>)
// 2. Description
// ResourceManager provides a task-type-specific pooling system for Flyte Tasks. Plugin writers can optionally
// request for resources in their tasks, in single quantity.
//
// For example,
// status, err := AllocateResource(ctx, "default_cluster", "flkgiwd13-akjdoe-0", ResourceConstraintsSpec{})
// When the corresponding Hive command finishes, the plugin needs to make the following function call to release
// the corresponding token
// err := ReleaseResource(ctx, "default_cluster", "flkgiwd13-akjdoe-0")
// 3. Usage
// A Flyte plugin registers the resources and the desired quota of each resource with ResourceRegistrar at the
// setup time of Flyte Propeller. At the end of the setup time, Flyte Propeller builds a ResourceManager based on
// these registration requests.
//
// During runtime, the ResourceManager does two simple things: allocating tokens and releasing tokens. When a Flyte
// task execution wants to send a request to an external service, the plugin should claim a unit of the corresponding
// resource. Specifically, an execution needs to generate a unique token, and register the token with ResourceManager
// by calling ResourceManager's AllocateResource() function. ResourceManager will check its current utilization and
// the allocation policy to decide whether or not to grant the request. Only when receiving the "AllocationGranted"
// status shall this execution move forward and send out the request. The granted token will be recorded in a token
// pool corresponding to the resource and managed by ResourceManager. When the request is done, the plugin will ask
// the resource manager to release the token by calling ResourceManager's ReleaseResource(), and the token will be
// erased from the corresponding pool.
//
// 4. Example
// Flyte has a built-on Qubole plugin that allows Flyte tasks to send out Hive commands to Qubole.
// In the plugin, a single Qubole cluster is a resource, and sending out a single Hive command to a Qubole cluster consumes
// a token of the corresponding resource. The resource allocation is achieved by the Qubole plugin calling
// status, err := AllocateResource(ctx, <cluster name>, <token string>, <constraint spec>)
// and the de-allocation is achieved by the plugin calling
// status, err := AllocateResource(ctx, <cluster name>, <token string>, <constraint spec>)
//
// For example,
// status, err := AllocateResource(ctx, "default_cluster", "flkgiwd13-akjdoe-0", ResourceConstraintsSpec{})
// When the corresponding Hive command finishes, the plugin needs to make the following function call to release
// the corresponding token
// err := ReleaseResource(ctx, "default_cluster", "flkgiwd13-akjdoe-0")
type ResourceManager interface {
GetID() string
// During execution time, plugins can call AllocateResource() to register a token to the token pool associated with a resource with the resource manager.
Expand Down
22 changes: 11 additions & 11 deletions go/tasks/pluginmachinery/core/template/template.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
// Package template exports the Render method
// Render Evaluates templates in each command with the equivalent value from passed args. Templates are case-insensitive
// Supported templates are:
// - {{ .InputFile }} to receive the input file path. The protocol used will depend on the underlying system
// configuration. E.g. s3://bucket/key/to/file.pb or /var/run/local.pb are both valid.
// - {{ .OutputPrefix }} to receive the path prefix for where to store the outputs.
// - {{ .Inputs.myInput }} to receive the actual value of the input passed. See docs on LiteralMapToTemplateArgs for how
// what to expect each literal type to be serialized as.
// - {{ .RawOutputDataPrefix }} to receive a path where the raw output data should be ideally written. It is guaranteed
// to be unique per retry and finally one will be saved as the output path
// - {{ .PerRetryUniqueKey }} A key/id/str that is generated per retry and is guaranteed to be unique. Useful in query
// - {{ .InputFile }} to receive the input file path. The protocol used will depend on the underlying system
// configuration. E.g. s3://bucket/key/to/file.pb or /var/run/local.pb are both valid.
// - {{ .OutputPrefix }} to receive the path prefix for where to store the outputs.
// - {{ .Inputs.myInput }} to receive the actual value of the input passed. See docs on LiteralMapToTemplateArgs for how
// what to expect each literal type to be serialized as.
// - {{ .RawOutputDataPrefix }} to receive a path where the raw output data should be ideally written. It is guaranteed
// to be unique per retry and finally one will be saved as the output path
// - {{ .PerRetryUniqueKey }} A key/id/str that is generated per retry and is guaranteed to be unique. Useful in query
// manipulations
// - {{ .TaskTemplatePath }} A path in blobstore/metadata store (e.g. s3, gcs etc) to where an offloaded version of the
// - {{ .TaskTemplatePath }} A path in blobstore/metadata store (e.g. s3, gcs etc) to where an offloaded version of the
// task template exists and can be accessed by the container / task execution environment. The template is a
// a serialized protobuf
// - {{ .PrevCheckpointPrefix }} A path to the checkpoint directory for the previous attempt. If this is the first attempt
// - {{ .PrevCheckpointPrefix }} A path to the checkpoint directory for the previous attempt. If this is the first attempt
// then this is replaced by an empty string
// - {{ .CheckpointOutputPrefix }} A Flyte aware path where the current execution should write the checkpoints.
// - {{ .CheckpointOutputPrefix }} A Flyte aware path where the current execution should write the checkpoints.
package template

import (
Expand Down
1 change: 0 additions & 1 deletion go/tasks/pluginmachinery/core/transitiontype_enumer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion go/tasks/pluginmachinery/internal/webapi/phase_enumer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions go/tasks/pluginmachinery/webapi/example/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ func (p Plugin) Create(ctx context.Context, tCtx webapi.TaskExecutionContextRead
// This API will be called asynchronously and periodically to update the set of tasks currently in progress. It's
// acceptable if this API is blocking since it'll be called from a background go-routine.
// Best practices:
// 1) Instead of returning the entire response object retrieved from the WebAPI, construct a smaller object that
// 1. Instead of returning the entire response object retrieved from the WebAPI, construct a smaller object that
// has enough information to construct the status/phase, error and/or output.
// 2) This object will NOT be serialized/marshaled. It's, therefore, not a requirement to make it so.
// 3) There is already client-side throttling in place. If the WebAPI returns a throttling error, you should return
// 2. This object will NOT be serialized/marshaled. It's, therefore, not a requirement to make it so.
// 3. There is already client-side throttling in place. If the WebAPI returns a throttling error, you should return
// it as is so that the appropriate metrics are updated and the system administrator can update throttling
// params accordingly.
func (p Plugin) Get(ctx context.Context, tCtx webapi.GetContext) (latest webapi.Resource, err error) {
Expand Down
1 change: 0 additions & 1 deletion go/tasks/pluginmachinery/workqueue/workstatus_enumer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion go/tasks/plugins/array/awsbatch/jobs_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,6 @@ type JobStore struct {
}

// Submits a new job to AWS Batch and retrieves job info. Note that submitted jobs will not have status populated.
//
func (s JobStore) SubmitJob(ctx context.Context, input *batch.SubmitJobInput) (jobID string, err error) {
name := *input.JobName
if item, err := s.AutoRefresh.Get(name); err == nil {
Expand Down
1 change: 0 additions & 1 deletion go/tasks/plugins/array/core/phase_enumer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 17 additions & 17 deletions go/tasks/plugins/hive/client/qubole_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,14 @@ func (q *quboleClient) executeRequest(ctx context.Context, method string, u *url
}

/*
Execute Hive Command on the QuboleClient Hive Cluster and return the CommandID
param: context.Context ctx: The default go context.
param: string commandStr: the query to execute
param: uint32 timeoutVal: timeout for the query to execute in seconds
param: string ClusterLabel: label for cluster on which to execute the Hive Command.
param: CommandMetadata _: additional labels for the command
return: *int64: CommandID for the command executed
return: error: error in-case of a failure
Execute Hive Command on the QuboleClient Hive Cluster and return the CommandID
param: context.Context ctx: The default go context.
param: string commandStr: the query to execute
param: uint32 timeoutVal: timeout for the query to execute in seconds
param: string ClusterLabel: label for cluster on which to execute the Hive Command.
param: CommandMetadata _: additional labels for the command
return: *int64: CommandID for the command executed
return: error: error in-case of a failure
*/
func (q *quboleClient) ExecuteHiveCommand(
ctx context.Context,
Expand Down Expand Up @@ -231,10 +231,10 @@ func (q *quboleClient) ExecuteHiveCommand(
}

/*
Terminate a QuboleClient command
param: context.Context ctx: The default go context.
param: string CommandID: the CommandID to terminate.
return: error: error in-case of a failure
Terminate a QuboleClient command
param: context.Context ctx: The default go context.
param: string CommandID: the CommandID to terminate.
return: error: error in-case of a failure
*/
func (q *quboleClient) KillCommand(ctx context.Context, commandID string, accountKey string) error {
commandStatus, err := url.Parse(commandID)
Expand All @@ -249,11 +249,11 @@ func (q *quboleClient) KillCommand(ctx context.Context, commandID string, accoun
}

/*
Get the status of a QuboleClient command
param: context.Context ctx: The default go context.
param: string CommandID: the CommandID to fetch the status for
return: *string: commandStatus for the CommandID passed
return: error: error in-case of a failure
Get the status of a QuboleClient command
param: context.Context ctx: The default go context.
param: string CommandID: the CommandID to fetch the status for
return: *string: commandStatus for the CommandID passed
return: error: error in-case of a failure
*/
func (q *quboleClient) GetCommandStatus(ctx context.Context, commandID string, accountKey string) (QuboleStatus, error) {
commandStatus, err := url.Parse(commandID)
Expand Down
5 changes: 3 additions & 2 deletions go/tasks/plugins/hive/executions_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,9 @@ func (q *QuboleHiveExecutionsCache) SyncQuboleQuery(ctx context.Context, batch c

// We need some way to translate results we get from Qubole, into a plugin phase
// NB: This function should only return plugin phases that are greater than (">") phases that represent states before
// the query was kicked off. That is, it will never make sense to go back to PhaseNotStarted, after we've
// submitted the query to Qubole.
//
// the query was kicked off. That is, it will never make sense to go back to PhaseNotStarted, after we've
// submitted the query to Qubole.
func QuboleStatusToExecutionPhase(s client.QuboleStatus) (ExecutionPhase, error) {
switch s {
case client.QuboleStatusDone:
Expand Down
1 change: 0 additions & 1 deletion go/tasks/plugins/presto/client/prestostatus_enumer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit ab3ee8a

Please sign in to comment.