Skip to content

Commit

Permalink
Merge pull request #6 from lyft/craj-flyte-mozart
Browse files Browse the repository at this point in the history
Generalize HiveExecutor constructor method to accept custom executor …
  • Loading branch information
chetcode authored Sep 10, 2019
2 parents 99d622c + a725b4d commit aea1f39
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 11 deletions.
11 changes: 7 additions & 4 deletions go/tasks/v1/qubole/client/qubole_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

const url = "https://api.qubole.com/api"
const apiPath = "/v1.2/commands"
const QuboleLogLinkFormat = "https://api.qubole.com/v2/analyze?command_id=%s"
const QuboleLogLinkFormat = "https://api.qubole.com/v2/analyze?command_id=%d"

const tokenKeyForAth = "X-AUTH-TOKEN"
const acceptHeaderKey = "Accept"
Expand All @@ -35,9 +35,12 @@ type quboleCmdDetailsInternal struct {
Status string
}

type QuboleUri = string

type QuboleCommandDetails struct {
ID int64
Status QuboleStatus
Uri QuboleUri
}

// QuboleClient API Request Body, meant to be passed into JSON.marshal
Expand Down Expand Up @@ -194,8 +197,8 @@ func (q *quboleClient) ExecuteHiveCommand(
return nil, err
}

status := newQuboleStatus(ctx, cmd.Status)
return &QuboleCommandDetails{ID: cmd.ID, Status: status}, nil
status := NewQuboleStatus(ctx, cmd.Status)
return &QuboleCommandDetails{ID: cmd.ID, Status: status, Uri: fmt.Sprintf(QuboleLogLinkFormat, cmd.ID)}, nil
}

/*
Expand Down Expand Up @@ -242,7 +245,7 @@ func (q *quboleClient) GetCommandStatus(ctx context.Context, commandID string, a
return QuboleStatusUnknown, err
}

cmdStatus := newQuboleStatus(ctx, cmd.Status)
cmdStatus := NewQuboleStatus(ctx, cmd.Status)
return cmdStatus, nil
}

Expand Down
2 changes: 1 addition & 1 deletion go/tasks/v1/qubole/client/qubole_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var QuboleStatuses = map[QuboleStatus]struct{}{
QuboleStatusCancelled: {},
}

func newQuboleStatus(ctx context.Context, status string) QuboleStatus {
func NewQuboleStatus(ctx context.Context, status string) QuboleStatus {
upperCased := strings.ToUpper(status)
if _, ok := QuboleStatuses[QuboleStatus(upperCased)]; ok {
return QuboleStatus(upperCased)
Expand Down
9 changes: 7 additions & 2 deletions go/tasks/v1/qubole/hive_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ func (h HiveExecutor) CheckTaskStatus(ctx context.Context, taskCtx types.TaskCon
commandId := strconv.FormatInt(cmdDetails.ID, 10)
logger.Infof(ctx, "Created Qubole ID %s for %s", commandId, workCacheKey)
item.CommandId = commandId
item.CommandUri = cmdDetails.Uri
item.Status = QuboleWorkRunning
item.Query = "" // Clear the query to save space in etcd once we've successfully launched
err := h.executionBuffer.ConfirmExecution(ctx, workCacheKey, commandId)
Expand Down Expand Up @@ -553,10 +554,14 @@ func (h *HiveExecutor) SyncQuboleQuery(ctx context.Context, obj utils2.CacheItem
}

func NewHiveTaskExecutorWithCache(ctx context.Context) (*HiveExecutor, error) {
return NewHiveTaskExecutor(ctx, hiveExecutorId, client.NewQuboleClient())
}

func NewHiveTaskExecutor(ctx context.Context, executorId string, executorClient client.QuboleClient) (*HiveExecutor, error) {
hiveExecutor := HiveExecutor{
id: hiveExecutorId,
id: executorId,
secretsManager: NewSecretsManager(),
quboleClient: client.NewQuboleClient(),
quboleClient: executorClient,
}

return &hiveExecutor, nil
Expand Down
11 changes: 7 additions & 4 deletions go/tasks/v1/qubole/qubole_work.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package qubole

import (
"fmt"
"encoding/json"
"fmt"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flyteplugins/go/tasks/v1/events"
"github.com/lyft/flyteplugins/go/tasks/v1/qubole/client"
Expand Down Expand Up @@ -43,6 +44,8 @@ type QuboleWorkItem struct {
Query string `json:"query,omitempty"`

TimeoutSec uint32 `json:"timeout,omitempty"`

CommandUri string `json:"command_uri,omitempty"`
}

// This ID will be used in a process-wide cache, so it needs to be unique across all concurrent work being done by
Expand Down Expand Up @@ -148,10 +151,10 @@ func constructEventInfoFromQuboleWorkItems(taskCtx types.TaskContext, quboleWork
workItem := v.(QuboleWorkItem)
if workItem.CommandId != "" {
logs = append(logs, &core.TaskLog{
Name: fmt.Sprintf("Retry: %d Status: %s [%s]",
Name: fmt.Sprintf("Retry: %d Status: %s [%s]",
taskCtx.GetTaskExecutionID().GetID().RetryAttempt, workItem.Status, workItem.CommandId),
MessageFormat: core.TaskLog_UNKNOWN,
Uri: fmt.Sprintf(client.QuboleLogLinkFormat, workItem.CommandId),
Uri: workItem.CommandUri,
})
}
}
Expand Down Expand Up @@ -199,4 +202,4 @@ func InterfaceConverter(cachedInterface interface{}) (QuboleWorkItem, error) {
}

return *item, nil
}
}

0 comments on commit aea1f39

Please sign in to comment.