Skip to content

Commit

Permalink
Return full execution data on every request if under max specified si…
Browse files Browse the repository at this point in the history
…ze (flyteorg#109)
  • Loading branch information
katrogan authored Aug 12, 2020
1 parent 615d2c3 commit c246090
Show file tree
Hide file tree
Showing 11 changed files with 312 additions and 71 deletions.
3 changes: 2 additions & 1 deletion flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/jinzhu/gorm v1.9.12
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/lib/pq v1.3.0
github.com/lyft/flyteidl v0.18.1
github.com/lyft/flyteidl v0.18.2
github.com/lyft/flytepropeller v0.3.7
github.com/lyft/flytestdlib v0.3.9
github.com/magiconair/properties v1.8.1
Expand Down Expand Up @@ -52,4 +52,5 @@ replace (
k8s.io/api => github.com/lyft/api v0.0.0-20191031200350-b49a72c274e0
k8s.io/apimachinery => github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f
k8s.io/client-go => k8s.io/client-go v0.0.0-20191016111102-bec269661e48

)
10 changes: 2 additions & 8 deletions flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -457,16 +457,10 @@ github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f/go.mod h1:llRdnz
github.com/lyft/datacatalog v0.2.1/go.mod h1:ktrPvzTDUwHO5Lv0hLH38zLHnOJ++rGoAO0iQ/sIPJ4=
github.com/lyft/flyteidl v0.17.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.18.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.18.1 h1:COKkZi5k6bQvUYOk5gE70+FJX9/NUn0WOQ1uMrw3Qio=
github.com/lyft/flyteidl v0.18.1/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.4.2/go.mod h1:8zhqFG9BzbHNQGEXzGYltTJLD+KTmQZkanxXgeFI25c=
github.com/lyft/flyteidl v0.18.2 h1:znA8yy8ImnVUGRa2j6z/4zaRbPHNgFCUp84UTquHDJk=
github.com/lyft/flyteidl v0.18.2/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.4.4/go.mod h1:8zhqFG9BzbHNQGEXzGYltTJLD+KTmQZkanxXgeFI25c=
github.com/lyft/flytepropeller v0.3.4 h1:BaxfEJGNnFtYffsjnMhQesUlW06D1zrMM8v/wOos/nU=
github.com/lyft/flytepropeller v0.3.4/go.mod h1:8Stq6f16u7qL9U0CDx7mNkPi80JRl0gtbIMzeqRegug=
github.com/lyft/flytepropeller v0.3.5-0.20200807170205-cc336537cedf h1:pewwhbuOjXI3oiFtKwsmj4qBU29oP1ezP5YG8Ervujs=
github.com/lyft/flytepropeller v0.3.5-0.20200807170205-cc336537cedf/go.mod h1:8JuHCbR2MKMIlLr839NGJidj2tjBGmB/CQt8/zsHc1E=
github.com/lyft/flytepropeller v0.3.6 h1:bMpa96VqvxpRmMCEQw/rp64BC8voPZhjEoKF4FhmzIw=
github.com/lyft/flytepropeller v0.3.6/go.mod h1:1Iw3ngmJBP+52coloHL1rOxcX7EDDUUvTYFQQy2WYzk=
github.com/lyft/flytepropeller v0.3.7 h1:l2AguhyhiUDCvqjHYF8XJw46gPW9j4XNZwJEAJdiEtI=
github.com/lyft/flytepropeller v0.3.7/go.mod h1:8sNP7ZnEngNRYBMewmH4PtiRR0pus8RkjNoPqelyKX8=
github.com/lyft/flytestdlib v0.3.0 h1:nIkX4MlyYdcLLzaF35RI2P5BhARt+qMgHoFto8eVNzU=
Expand Down
38 changes: 33 additions & 5 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@ type executionSystemMetrics struct {
}

type executionUserMetrics struct {
Scope promutils.Scope
ScheduledExecutionDelays projectDomainScopedStopWatchMap
WorkflowExecutionDurations projectDomainScopedStopWatchMap
Scope promutils.Scope
ScheduledExecutionDelays projectDomainScopedStopWatchMap
WorkflowExecutionDurations projectDomainScopedStopWatchMap
WorkflowExecutionInputBytes prometheus.Summary
WorkflowExecutionOutputBytes prometheus.Summary
}

type ExecutionManager struct {
Expand Down Expand Up @@ -1019,10 +1021,32 @@ func (m *ExecutionManager) GetExecutionData(
if err != nil {
return nil, err
}
return &admin.WorkflowExecutionGetDataResponse{
response := &admin.WorkflowExecutionGetDataResponse{
Outputs: &signedOutputsURLBlob,
Inputs: &inputsURLBlob,
}, nil
}
maxDataSize := m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes
if maxDataSize == 0 || inputsURLBlob.Bytes < maxDataSize {
var fullInputs core.LiteralMap
err := m.storageClient.ReadProtobuf(ctx, executionModel.InputsURI, &fullInputs)
if err != nil {
logger.Warningf(ctx, "Failed to read inputs from URI [%s] with err: %v", executionModel.InputsURI, err)
}
response.FullInputs = &fullInputs
}
if maxDataSize == 0 || (signedOutputsURLBlob.Bytes < maxDataSize && execution.Closure.GetOutputs() != nil) {
var fullOutputs core.LiteralMap
outputsURI := execution.Closure.GetOutputs().GetUri()
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(outputsURI), &fullOutputs)
if err != nil {
logger.Warningf(ctx, "Failed to read outputs from URI [%s] with err: %v", outputsURI, err)
}
response.FullOutputs = &fullOutputs
}

m.userMetrics.WorkflowExecutionInputBytes.Observe(float64(response.Inputs.Bytes))
m.userMetrics.WorkflowExecutionOutputBytes.Observe(float64(response.Outputs.Bytes))
return response, nil
}

func (m *ExecutionManager) ListExecutions(
Expand Down Expand Up @@ -1234,6 +1258,10 @@ func NewExecutionManager(
Scope: userScope,
ScheduledExecutionDelays: make(map[string]map[string]*promutils.StopWatch),
WorkflowExecutionDurations: make(map[string]map[string]*promutils.StopWatch),
WorkflowExecutionInputBytes: userScope.MustNewSummary("input_size_bytes",
"size in bytes of serialized execution inputs"),
WorkflowExecutionOutputBytes: userScope.MustNewSummary("output_size_bytes",
"size in bytes of serialized execution outputs"),
}

resourceManager := resources.NewResourceManager(db, config.ApplicationConfiguration())
Expand Down
36 changes: 34 additions & 2 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func getMockStorageForExecTest(ctx context.Context) *storage.DataStore {
return nil
}
workflowClosure := testutils.GetWorkflowClosure()
if err := mockStorage.WriteProtobuf(ctx, storage.DataReference(remoteClosureIdentifier), defaultStorageOptions, workflowClosure); err != nil {
if err := mockStorage.WriteProtobuf(ctx, remoteClosureIdentifier, defaultStorageOptions, workflowClosure); err != nil {
return nil
}
return mockStorage
Expand Down Expand Up @@ -2013,10 +2013,34 @@ func TestGetExecutionData(t *testing.T) {

return admin.UrlBlob{}, errors.New("unexpected input")
}
mockStorage := commonMocks.GetMockStorageClient()
fullInputs := &core.LiteralMap{
Literals: map[string]*core.Literal{
"foo": testutils.MakeStringLiteral("foo-value-1"),
},
}
fullOutputs := &core.LiteralMap{
Literals: map[string]*core.Literal{
"bar": testutils.MakeStringLiteral("bar-value-1"),
},
}
mockStorage.ComposedProtobufStore.(*commonMocks.TestDataStore).ReadProtobufCb = func(
ctx context.Context, reference storage.DataReference, msg proto.Message) error {
if reference.String() == "inputs" {
marshalled, _ := proto.Marshal(fullInputs)
_ = proto.Unmarshal(marshalled, msg)
return nil
} else if reference.String() == outputURI {
marshalled, _ := proto.Marshal(fullOutputs)
_ = proto.Unmarshal(marshalled, msg)
return nil
}
return fmt.Errorf("unexpected call to find value in storage [%v]", reference.String())
}

repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc)
execManager := NewExecutionManager(
repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
repository, getMockExecutionsConfigProvider(), mockStorage, workflowengineMocks.NewMockExecutor(),
mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
dataResponse, err := execManager.GetExecutionData(context.Background(), admin.WorkflowExecutionGetDataRequest{
Id: &executionIdentifier,
Expand All @@ -2031,6 +2055,8 @@ func TestGetExecutionData(t *testing.T) {
Url: "inputs",
Bytes: 200,
},
FullInputs: fullInputs,
FullOutputs: fullOutputs,
}, dataResponse))
}

Expand Down Expand Up @@ -2215,6 +2241,12 @@ func TestGetExecutionData_LegacyModel(t *testing.T) {
Url: "inputs",
Bytes: 200,
},
FullInputs: &core.LiteralMap{
Literals: map[string]*core.Literal{
"foo": testutils.MakeStringLiteral("foo-value-1"),
},
},
FullOutputs: &core.LiteralMap{},
}, dataResponse))
var inputs core.LiteralMap
err = storageClient.ReadProtobuf(context.Background(), storage.DataReference("s3://bucket/metadata/project/domain/name/inputs"), &inputs)
Expand Down
56 changes: 46 additions & 10 deletions flyteadmin/pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"strconv"

"github.com/lyft/flytestdlib/storage"

"github.com/lyft/flytestdlib/contextutils"

"github.com/lyft/flyteadmin/pkg/manager/impl/shared"
Expand All @@ -25,6 +27,7 @@ import (
repoInterfaces "github.com/lyft/flyteadmin/pkg/repositories/interfaces"
"github.com/lyft/flyteadmin/pkg/repositories/models"
"github.com/lyft/flyteadmin/pkg/repositories/transformers"
runtimeInterfaces "github.com/lyft/flyteadmin/pkg/runtime/interfaces"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"google.golang.org/grpc/codes"
Expand All @@ -38,12 +41,16 @@ type nodeExecutionMetrics struct {
NodeExecutionEventsCreated prometheus.Counter
MissingWorkflowExecution prometheus.Counter
ClosureSizeBytes prometheus.Summary
NodeExecutionInputBytes prometheus.Summary
NodeExecutionOutputBytes prometheus.Summary
}

type NodeExecutionManager struct {
db repositories.RepositoryInterface
metrics nodeExecutionMetrics
urlData dataInterfaces.RemoteURLInterface
db repositories.RepositoryInterface
config runtimeInterfaces.Configuration
storageClient *storage.DataStore
metrics nodeExecutionMetrics
urlData dataInterfaces.RemoteURLInterface
}

type updateNodeExecutionStatus int
Expand Down Expand Up @@ -391,15 +398,38 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
return nil, err
}
}
return &admin.NodeExecutionGetDataResponse{
response := &admin.NodeExecutionGetDataResponse{
Inputs: &signedInputsURLBlob,
Outputs: &signedOutputsURLBlob,
}, nil
}
maxDataSize := m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes
if maxDataSize == 0 || signedInputsURLBlob.Bytes < maxDataSize {
var fullInputs core.LiteralMap
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(nodeExecution.InputUri), &fullInputs)
if err != nil {
logger.Warningf(ctx, "Failed to read inputs from URI [%s] with err: %v", nodeExecution.InputUri, err)
}
response.FullInputs = &fullInputs
}
if maxDataSize == 0 || (signedOutputsURLBlob.Bytes < maxDataSize && len(nodeExecution.Closure.GetOutputUri()) > 0) {
var fullOutputs core.LiteralMap
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(nodeExecution.Closure.GetOutputUri()), &fullOutputs)
if err != nil {
logger.Warningf(ctx, "Failed to read outputs from URI [%s] with err: %v",
nodeExecution.Closure.GetOutputUri(), err)
}
response.FullOutputs = &fullOutputs
}

m.metrics.NodeExecutionInputBytes.Observe(float64(response.Inputs.Bytes))
m.metrics.NodeExecutionOutputBytes.Observe(float64(response.Outputs.Bytes))

return response, nil
}

func NewNodeExecutionManager(
db repositories.RepositoryInterface, scope promutils.Scope,
urlData dataInterfaces.RemoteURLInterface) interfaces.NodeExecutionInterface {
db repositories.RepositoryInterface, config runtimeInterfaces.Configuration, storageClient *storage.DataStore,
scope promutils.Scope, urlData dataInterfaces.RemoteURLInterface) interfaces.NodeExecutionInterface {
metrics := nodeExecutionMetrics{
Scope: scope,
ActiveNodeExecutions: scope.MustNewGauge("active_node_executions",
Expand All @@ -414,10 +444,16 @@ func NewNodeExecutionManager(
"overall count of node execution events received that are missing a parent workflow execution"),
ClosureSizeBytes: scope.MustNewSummary("closure_size_bytes",
"size in bytes of serialized node execution closure"),
NodeExecutionInputBytes: scope.MustNewSummary("input_size_bytes",
"size in bytes of serialized node execution inputs"),
NodeExecutionOutputBytes: scope.MustNewSummary("output_size_bytes",
"size in bytes of serialized node execution outputs"),
}
return &NodeExecutionManager{
db: db,
metrics: metrics,
urlData: urlData,
db: db,
config: config,
storageClient: storageClient,
metrics: metrics,
urlData: urlData,
}
}
Loading

0 comments on commit c246090

Please sign in to comment.