diff --git a/flyteadmin/go.mod b/flyteadmin/go.mod index 8bb2f6d630..32e362cc69 100644 --- a/flyteadmin/go.mod +++ b/flyteadmin/go.mod @@ -22,7 +22,7 @@ require ( github.com/jinzhu/gorm v1.9.12 github.com/kelseyhightower/envconfig v1.4.0 // indirect github.com/lib/pq v1.3.0 - github.com/lyft/flyteidl v0.18.1 + github.com/lyft/flyteidl v0.18.2 github.com/lyft/flytepropeller v0.3.7 github.com/lyft/flytestdlib v0.3.9 github.com/magiconair/properties v1.8.1 @@ -52,4 +52,5 @@ replace ( k8s.io/api => github.com/lyft/api v0.0.0-20191031200350-b49a72c274e0 k8s.io/apimachinery => github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f k8s.io/client-go => k8s.io/client-go v0.0.0-20191016111102-bec269661e48 + ) diff --git a/flyteadmin/go.sum b/flyteadmin/go.sum index cc08541c33..6807af4230 100644 --- a/flyteadmin/go.sum +++ b/flyteadmin/go.sum @@ -457,16 +457,10 @@ github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f/go.mod h1:llRdnz github.com/lyft/datacatalog v0.2.1/go.mod h1:ktrPvzTDUwHO5Lv0hLH38zLHnOJ++rGoAO0iQ/sIPJ4= github.com/lyft/flyteidl v0.17.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteidl v0.18.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= -github.com/lyft/flyteidl v0.18.1 h1:COKkZi5k6bQvUYOk5gE70+FJX9/NUn0WOQ1uMrw3Qio= github.com/lyft/flyteidl v0.18.1/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= -github.com/lyft/flyteplugins v0.4.2/go.mod h1:8zhqFG9BzbHNQGEXzGYltTJLD+KTmQZkanxXgeFI25c= +github.com/lyft/flyteidl v0.18.2 h1:znA8yy8ImnVUGRa2j6z/4zaRbPHNgFCUp84UTquHDJk= +github.com/lyft/flyteidl v0.18.2/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteplugins v0.4.4/go.mod h1:8zhqFG9BzbHNQGEXzGYltTJLD+KTmQZkanxXgeFI25c= -github.com/lyft/flytepropeller v0.3.4 h1:BaxfEJGNnFtYffsjnMhQesUlW06D1zrMM8v/wOos/nU= -github.com/lyft/flytepropeller v0.3.4/go.mod h1:8Stq6f16u7qL9U0CDx7mNkPi80JRl0gtbIMzeqRegug= -github.com/lyft/flytepropeller v0.3.5-0.20200807170205-cc336537cedf h1:pewwhbuOjXI3oiFtKwsmj4qBU29oP1ezP5YG8Ervujs= -github.com/lyft/flytepropeller v0.3.5-0.20200807170205-cc336537cedf/go.mod h1:8JuHCbR2MKMIlLr839NGJidj2tjBGmB/CQt8/zsHc1E= -github.com/lyft/flytepropeller v0.3.6 h1:bMpa96VqvxpRmMCEQw/rp64BC8voPZhjEoKF4FhmzIw= -github.com/lyft/flytepropeller v0.3.6/go.mod h1:1Iw3ngmJBP+52coloHL1rOxcX7EDDUUvTYFQQy2WYzk= github.com/lyft/flytepropeller v0.3.7 h1:l2AguhyhiUDCvqjHYF8XJw46gPW9j4XNZwJEAJdiEtI= github.com/lyft/flytepropeller v0.3.7/go.mod h1:8sNP7ZnEngNRYBMewmH4PtiRR0pus8RkjNoPqelyKX8= github.com/lyft/flytestdlib v0.3.0 h1:nIkX4MlyYdcLLzaF35RI2P5BhARt+qMgHoFto8eVNzU= diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index bafe0ae177..94374104e5 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -65,9 +65,11 @@ type executionSystemMetrics struct { } type executionUserMetrics struct { - Scope promutils.Scope - ScheduledExecutionDelays projectDomainScopedStopWatchMap - WorkflowExecutionDurations projectDomainScopedStopWatchMap + Scope promutils.Scope + ScheduledExecutionDelays projectDomainScopedStopWatchMap + WorkflowExecutionDurations projectDomainScopedStopWatchMap + WorkflowExecutionInputBytes prometheus.Summary + WorkflowExecutionOutputBytes prometheus.Summary } type ExecutionManager struct { @@ -1019,10 +1021,32 @@ func (m *ExecutionManager) GetExecutionData( if err != nil { return nil, err } - return &admin.WorkflowExecutionGetDataResponse{ + response := &admin.WorkflowExecutionGetDataResponse{ Outputs: &signedOutputsURLBlob, Inputs: &inputsURLBlob, - }, nil + } + maxDataSize := m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes + if maxDataSize == 0 || inputsURLBlob.Bytes < maxDataSize { + var fullInputs core.LiteralMap + err := m.storageClient.ReadProtobuf(ctx, executionModel.InputsURI, &fullInputs) + if err != nil { + logger.Warningf(ctx, "Failed to read inputs from URI [%s] with err: %v", executionModel.InputsURI, err) + } + response.FullInputs = &fullInputs + } + if maxDataSize == 0 || (signedOutputsURLBlob.Bytes < maxDataSize && execution.Closure.GetOutputs() != nil) { + var fullOutputs core.LiteralMap + outputsURI := execution.Closure.GetOutputs().GetUri() + err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(outputsURI), &fullOutputs) + if err != nil { + logger.Warningf(ctx, "Failed to read outputs from URI [%s] with err: %v", outputsURI, err) + } + response.FullOutputs = &fullOutputs + } + + m.userMetrics.WorkflowExecutionInputBytes.Observe(float64(response.Inputs.Bytes)) + m.userMetrics.WorkflowExecutionOutputBytes.Observe(float64(response.Outputs.Bytes)) + return response, nil } func (m *ExecutionManager) ListExecutions( @@ -1234,6 +1258,10 @@ func NewExecutionManager( Scope: userScope, ScheduledExecutionDelays: make(map[string]map[string]*promutils.StopWatch), WorkflowExecutionDurations: make(map[string]map[string]*promutils.StopWatch), + WorkflowExecutionInputBytes: userScope.MustNewSummary("input_size_bytes", + "size in bytes of serialized execution inputs"), + WorkflowExecutionOutputBytes: userScope.MustNewSummary("output_size_bytes", + "size in bytes of serialized execution outputs"), } resourceManager := resources.NewResourceManager(db, config.ApplicationConfiguration()) diff --git a/flyteadmin/pkg/manager/impl/execution_manager_test.go b/flyteadmin/pkg/manager/impl/execution_manager_test.go index 9b42efe1ab..93b51bd37d 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/execution_manager_test.go @@ -168,7 +168,7 @@ func getMockStorageForExecTest(ctx context.Context) *storage.DataStore { return nil } workflowClosure := testutils.GetWorkflowClosure() - if err := mockStorage.WriteProtobuf(ctx, storage.DataReference(remoteClosureIdentifier), defaultStorageOptions, workflowClosure); err != nil { + if err := mockStorage.WriteProtobuf(ctx, remoteClosureIdentifier, defaultStorageOptions, workflowClosure); err != nil { return nil } return mockStorage @@ -2013,10 +2013,34 @@ func TestGetExecutionData(t *testing.T) { return admin.UrlBlob{}, errors.New("unexpected input") } + mockStorage := commonMocks.GetMockStorageClient() + fullInputs := &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "foo": testutils.MakeStringLiteral("foo-value-1"), + }, + } + fullOutputs := &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "bar": testutils.MakeStringLiteral("bar-value-1"), + }, + } + mockStorage.ComposedProtobufStore.(*commonMocks.TestDataStore).ReadProtobufCb = func( + ctx context.Context, reference storage.DataReference, msg proto.Message) error { + if reference.String() == "inputs" { + marshalled, _ := proto.Marshal(fullInputs) + _ = proto.Unmarshal(marshalled, msg) + return nil + } else if reference.String() == outputURI { + marshalled, _ := proto.Marshal(fullOutputs) + _ = proto.Unmarshal(marshalled, msg) + return nil + } + return fmt.Errorf("unexpected call to find value in storage [%v]", reference.String()) + } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc) execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), + repository, getMockExecutionsConfigProvider(), mockStorage, workflowengineMocks.NewMockExecutor(), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) dataResponse, err := execManager.GetExecutionData(context.Background(), admin.WorkflowExecutionGetDataRequest{ Id: &executionIdentifier, @@ -2031,6 +2055,8 @@ func TestGetExecutionData(t *testing.T) { Url: "inputs", Bytes: 200, }, + FullInputs: fullInputs, + FullOutputs: fullOutputs, }, dataResponse)) } @@ -2215,6 +2241,12 @@ func TestGetExecutionData_LegacyModel(t *testing.T) { Url: "inputs", Bytes: 200, }, + FullInputs: &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "foo": testutils.MakeStringLiteral("foo-value-1"), + }, + }, + FullOutputs: &core.LiteralMap{}, }, dataResponse)) var inputs core.LiteralMap err = storageClient.ReadProtobuf(context.Background(), storage.DataReference("s3://bucket/metadata/project/domain/name/inputs"), &inputs) diff --git a/flyteadmin/pkg/manager/impl/node_execution_manager.go b/flyteadmin/pkg/manager/impl/node_execution_manager.go index 91ae00193a..3e37e9c87c 100644 --- a/flyteadmin/pkg/manager/impl/node_execution_manager.go +++ b/flyteadmin/pkg/manager/impl/node_execution_manager.go @@ -4,6 +4,8 @@ import ( "context" "strconv" + "github.com/lyft/flytestdlib/storage" + "github.com/lyft/flytestdlib/contextutils" "github.com/lyft/flyteadmin/pkg/manager/impl/shared" @@ -25,6 +27,7 @@ import ( repoInterfaces "github.com/lyft/flyteadmin/pkg/repositories/interfaces" "github.com/lyft/flyteadmin/pkg/repositories/models" "github.com/lyft/flyteadmin/pkg/repositories/transformers" + runtimeInterfaces "github.com/lyft/flyteadmin/pkg/runtime/interfaces" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "google.golang.org/grpc/codes" @@ -38,12 +41,16 @@ type nodeExecutionMetrics struct { NodeExecutionEventsCreated prometheus.Counter MissingWorkflowExecution prometheus.Counter ClosureSizeBytes prometheus.Summary + NodeExecutionInputBytes prometheus.Summary + NodeExecutionOutputBytes prometheus.Summary } type NodeExecutionManager struct { - db repositories.RepositoryInterface - metrics nodeExecutionMetrics - urlData dataInterfaces.RemoteURLInterface + db repositories.RepositoryInterface + config runtimeInterfaces.Configuration + storageClient *storage.DataStore + metrics nodeExecutionMetrics + urlData dataInterfaces.RemoteURLInterface } type updateNodeExecutionStatus int @@ -391,15 +398,38 @@ func (m *NodeExecutionManager) GetNodeExecutionData( return nil, err } } - return &admin.NodeExecutionGetDataResponse{ + response := &admin.NodeExecutionGetDataResponse{ Inputs: &signedInputsURLBlob, Outputs: &signedOutputsURLBlob, - }, nil + } + maxDataSize := m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes + if maxDataSize == 0 || signedInputsURLBlob.Bytes < maxDataSize { + var fullInputs core.LiteralMap + err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(nodeExecution.InputUri), &fullInputs) + if err != nil { + logger.Warningf(ctx, "Failed to read inputs from URI [%s] with err: %v", nodeExecution.InputUri, err) + } + response.FullInputs = &fullInputs + } + if maxDataSize == 0 || (signedOutputsURLBlob.Bytes < maxDataSize && len(nodeExecution.Closure.GetOutputUri()) > 0) { + var fullOutputs core.LiteralMap + err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(nodeExecution.Closure.GetOutputUri()), &fullOutputs) + if err != nil { + logger.Warningf(ctx, "Failed to read outputs from URI [%s] with err: %v", + nodeExecution.Closure.GetOutputUri(), err) + } + response.FullOutputs = &fullOutputs + } + + m.metrics.NodeExecutionInputBytes.Observe(float64(response.Inputs.Bytes)) + m.metrics.NodeExecutionOutputBytes.Observe(float64(response.Outputs.Bytes)) + + return response, nil } func NewNodeExecutionManager( - db repositories.RepositoryInterface, scope promutils.Scope, - urlData dataInterfaces.RemoteURLInterface) interfaces.NodeExecutionInterface { + db repositories.RepositoryInterface, config runtimeInterfaces.Configuration, storageClient *storage.DataStore, + scope promutils.Scope, urlData dataInterfaces.RemoteURLInterface) interfaces.NodeExecutionInterface { metrics := nodeExecutionMetrics{ Scope: scope, ActiveNodeExecutions: scope.MustNewGauge("active_node_executions", @@ -414,10 +444,16 @@ func NewNodeExecutionManager( "overall count of node execution events received that are missing a parent workflow execution"), ClosureSizeBytes: scope.MustNewSummary("closure_size_bytes", "size in bytes of serialized node execution closure"), + NodeExecutionInputBytes: scope.MustNewSummary("input_size_bytes", + "size in bytes of serialized node execution inputs"), + NodeExecutionOutputBytes: scope.MustNewSummary("output_size_bytes", + "size in bytes of serialized node execution outputs"), } return &NodeExecutionManager{ - db: db, - metrics: metrics, - urlData: urlData, + db: db, + config: config, + storageClient: storageClient, + metrics: metrics, + urlData: urlData, } } diff --git a/flyteadmin/pkg/manager/impl/node_execution_manager_test.go b/flyteadmin/pkg/manager/impl/node_execution_manager_test.go index 3e8d89d1f2..676baa495a 100644 --- a/flyteadmin/pkg/manager/impl/node_execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/node_execution_manager_test.go @@ -3,12 +3,17 @@ package impl import ( "context" "errors" + "fmt" "testing" "time" + "github.com/lyft/flyteadmin/pkg/manager/impl/testutils" + "github.com/lyft/flytestdlib/storage" + "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" "github.com/lyft/flyteadmin/pkg/common" + commonMocks "github.com/lyft/flyteadmin/pkg/common/mocks" dataMocks "github.com/lyft/flyteadmin/pkg/data/mocks" flyteAdminErrors "github.com/lyft/flyteadmin/pkg/errors" "github.com/lyft/flyteadmin/pkg/repositories" @@ -129,7 +134,8 @@ func TestCreateNodeEvent(t *testing.T) { }, *input) return nil }) - nodeExecManager := NewNodeExecutionManager(repository, mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request) assert.Nil(t, err) assert.NotNil(t, resp) @@ -184,7 +190,8 @@ func TestCreateNodeEvent_Update(t *testing.T) { return nil }) - nodeExecManager := NewNodeExecutionManager(repository, mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request) assert.Nil(t, err) assert.NotNil(t, resp) @@ -197,7 +204,8 @@ func TestCreateNodeEvent_MissingExecution(t *testing.T) { func(ctx context.Context, input interfaces.GetResourceInput) (models.Execution, error) { return models.Execution{}, expectedErr }) - nodeExecManager := NewNodeExecutionManager(repository, mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request) assert.EqualError(t, err, "failed to get existing execution id: [project:\"project\""+ " domain:\"domain\" name:\"name\" ] with err: expected error") @@ -217,7 +225,8 @@ func TestCreateNodeEvent_CreateDatabaseError(t *testing.T) { func(ctx context.Context, event *models.NodeExecutionEvent, input *models.NodeExecution) error { return expectedErr }) - nodeExecManager := NewNodeExecutionManager(repository, mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request) assert.EqualError(t, err, expectedErr.Error()) assert.Nil(t, resp) @@ -252,7 +261,8 @@ func TestCreateNodeEvent_UpdateDatabaseError(t *testing.T) { func(ctx context.Context, event *models.NodeExecutionEvent, nodeExecution *models.NodeExecution) error { return expectedErr }) - nodeExecManager := NewNodeExecutionManager(repository, mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request) assert.EqualError(t, err, expectedErr.Error()) assert.Nil(t, resp) @@ -281,7 +291,8 @@ func TestCreateNodeEvent_UpdateTerminalEventError(t *testing.T) { StartedAt: &occurredAt, }, nil }) - nodeExecManager := NewNodeExecutionManager(repository, mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request) assert.Nil(t, resp) assert.NotNil(t, err) @@ -316,7 +327,8 @@ func TestCreateNodeEvent_UpdateDuplicateEventError(t *testing.T) { StartedAt: &occurredAt, }, nil }) - nodeExecManager := NewNodeExecutionManager(repository, mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) resp, err := nodeExecManager.CreateNodeEvent(context.Background(), request) assert.Equal(t, codes.AlreadyExists, err.(flyteAdminErrors.FlyteAdminError).Code()) assert.Nil(t, resp) @@ -329,7 +341,8 @@ func TestCreateNodeEvent_FirstEventIsTerminal(t *testing.T) { func(ctx context.Context, input interfaces.GetNodeExecutionInput) (models.NodeExecution, error) { return models.NodeExecution{}, flyteAdminErrors.NewFlyteAdminError(codes.NotFound, "foo") }) - nodeExecManager := NewNodeExecutionManager(repository, mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) succeededRequest := admin.NodeExecutionEventRequest{ RequestId: "request id", Event: &event.NodeExecutionEvent{ @@ -390,7 +403,8 @@ func TestGetNodeExecution(t *testing.T) { NodeExecutionMetadata: metadataBytes, }, nil }) - nodeExecManager := NewNodeExecutionManager(repository, mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) nodeExecution, err := nodeExecManager.GetNodeExecution(context.Background(), admin.NodeExecutionGetRequest{ Id: &nodeExecutionIdentifier, }) @@ -453,7 +467,8 @@ func TestGetNodeExecutionParentNode(t *testing.T) { }, }, nil }) - nodeExecManager := NewNodeExecutionManager(repository, mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) nodeExecution, err := nodeExecManager.GetNodeExecution(context.Background(), admin.NodeExecutionGetRequest{ Id: &nodeExecutionIdentifier, }) @@ -474,7 +489,8 @@ func TestGetNodeExecution_DatabaseError(t *testing.T) { func(ctx context.Context, input interfaces.GetNodeExecutionInput) (models.NodeExecution, error) { return models.NodeExecution{}, expectedErr }) - nodeExecManager := NewNodeExecutionManager(repository, mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) nodeExecution, err := nodeExecManager.GetNodeExecution(context.Background(), admin.NodeExecutionGetRequest{ Id: &nodeExecutionIdentifier, }) @@ -501,7 +517,8 @@ func TestGetNodeExecution_TransformerError(t *testing.T) { Closure: []byte("i'm invalid"), }, nil }) - nodeExecManager := NewNodeExecutionManager(repository, mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) nodeExecution, err := nodeExecManager.GetNodeExecution(context.Background(), admin.NodeExecutionGetRequest{ Id: &nodeExecutionIdentifier, }) @@ -570,7 +587,8 @@ func TestListNodeExecutionsLevelZero(t *testing.T) { }, }, nil }) - nodeExecManager := NewNodeExecutionManager(repository, mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) nodeExecutions, err := nodeExecManager.ListNodeExecutions(context.Background(), admin.NodeExecutionListRequest{ WorkflowExecutionId: &core.WorkflowExecutionIdentifier{ Project: "project", @@ -669,7 +687,8 @@ func TestListNodeExecutionsWithParent(t *testing.T) { }, }, nil }) - nodeExecManager := NewNodeExecutionManager(repository, mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) nodeExecutions, err := nodeExecManager.ListNodeExecutions(context.Background(), admin.NodeExecutionListRequest{ WorkflowExecutionId: &core.WorkflowExecutionIdentifier{ Project: "project", @@ -703,7 +722,8 @@ func TestListNodeExecutionsWithParent(t *testing.T) { } func TestListNodeExecutions_InvalidParams(t *testing.T) { - nodeExecManager := NewNodeExecutionManager(nil, mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(nil, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) _, err := nodeExecManager.ListNodeExecutions(context.Background(), admin.NodeExecutionListRequest{ Filters: "eq(execution.project, project)", }) @@ -729,7 +749,8 @@ func TestListNodeExecutions_DatabaseError(t *testing.T) { interfaces.NodeExecutionCollectionOutput, error) { return interfaces.NodeExecutionCollectionOutput{}, expectedErr }) - nodeExecManager := NewNodeExecutionManager(repository, mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) nodeExecutions, err := nodeExecManager.ListNodeExecutions(context.Background(), admin.NodeExecutionListRequest{ WorkflowExecutionId: &core.WorkflowExecutionIdentifier{ Project: "project", @@ -767,7 +788,8 @@ func TestListNodeExecutions_TransformerError(t *testing.T) { }, }, nil }) - nodeExecManager := NewNodeExecutionManager(repository, mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) nodeExecutions, err := nodeExecManager.ListNodeExecutions(context.Background(), admin.NodeExecutionListRequest{ WorkflowExecutionId: &core.WorkflowExecutionIdentifier{ Project: "project", @@ -795,7 +817,8 @@ func TestListNodeExecutions_NothingToReturn(t *testing.T) { listExecutionsCalled = true return interfaces.ExecutionCollectionOutput{}, nil }) - nodeExecManager := NewNodeExecutionManager(repository, mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) _, err := nodeExecManager.ListNodeExecutions(context.Background(), admin.NodeExecutionListRequest{ WorkflowExecutionId: &core.WorkflowExecutionIdentifier{ Project: "project", @@ -892,7 +915,8 @@ func TestListNodeExecutionsForTask(t *testing.T) { }, }, nil }) - nodeExecManager := NewNodeExecutionManager(repository, mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL) nodeExecutions, err := nodeExecManager.ListNodeExecutionsForTask(context.Background(), admin.NodeExecutionForTaskListRequest{ TaskExecutionId: &core.TaskExecutionIdentifier{ NodeExecutionId: &core.NodeExecutionIdentifier{ @@ -993,7 +1017,32 @@ func TestGetNodeExecutionData(t *testing.T) { return admin.UrlBlob{}, errors.New("unexpected input") } - nodeExecManager := NewNodeExecutionManager(repository, mockScope.NewTestScope(), mockNodeExecutionRemoteURL) + mockStorage := commonMocks.GetMockStorageClient() + fullInputs := &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "foo": testutils.MakeStringLiteral("foo-value-1"), + }, + } + fullOutputs := &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "bar": testutils.MakeStringLiteral("bar-value-1"), + }, + } + mockStorage.ComposedProtobufStore.(*commonMocks.TestDataStore).ReadProtobufCb = func( + ctx context.Context, reference storage.DataReference, msg proto.Message) error { + if reference.String() == "input uri" { + marshalled, _ := proto.Marshal(fullInputs) + _ = proto.Unmarshal(marshalled, msg) + return nil + } else if reference.String() == "output uri" { + marshalled, _ := proto.Marshal(fullOutputs) + _ = proto.Unmarshal(marshalled, msg) + return nil + } + return fmt.Errorf("unexpected call to find value in storage [%v]", reference.String()) + } + nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), mockStorage, + mockScope.NewTestScope(), mockNodeExecutionRemoteURL) dataResponse, err := nodeExecManager.GetNodeExecutionData(context.Background(), admin.NodeExecutionGetDataRequest{ Id: &nodeExecutionIdentifier, }) @@ -1007,5 +1056,7 @@ func TestGetNodeExecutionData(t *testing.T) { Url: "outputs", Bytes: 200, }, + FullInputs: fullInputs, + FullOutputs: fullOutputs, }, dataResponse)) } diff --git a/flyteadmin/pkg/manager/impl/task_execution_manager.go b/flyteadmin/pkg/manager/impl/task_execution_manager.go index d8d488f03a..ddb927e7cd 100644 --- a/flyteadmin/pkg/manager/impl/task_execution_manager.go +++ b/flyteadmin/pkg/manager/impl/task_execution_manager.go @@ -5,6 +5,8 @@ import ( "fmt" "strconv" + "github.com/lyft/flytestdlib/storage" + "github.com/lyft/flytestdlib/contextutils" "github.com/lyft/flytestdlib/promutils" @@ -20,6 +22,7 @@ import ( repoInterfaces "github.com/lyft/flyteadmin/pkg/repositories/interfaces" "github.com/lyft/flyteadmin/pkg/repositories/models" "github.com/lyft/flyteadmin/pkg/repositories/transformers" + runtimeInterfaces "github.com/lyft/flyteadmin/pkg/runtime/interfaces" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flytestdlib/logger" @@ -35,12 +38,16 @@ type taskExecutionMetrics struct { MissingTaskExecution prometheus.Counter MissingTaskDefinition prometheus.Counter ClosureSizeBytes prometheus.Summary + TaskExecutionInputBytes prometheus.Summary + TaskExecutionOutputBytes prometheus.Summary } type TaskExecutionManager struct { - db repositories.RepositoryInterface - metrics taskExecutionMetrics - urlData dataInterfaces.RemoteURLInterface + db repositories.RepositoryInterface + config runtimeInterfaces.Configuration + storageClient *storage.DataStore + metrics taskExecutionMetrics + urlData dataInterfaces.RemoteURLInterface } func getTaskExecutionContext(ctx context.Context, identifier *core.TaskExecutionIdentifier) context.Context { @@ -275,14 +282,36 @@ func (m *TaskExecutionManager) GetTaskExecutionData( return nil, err } } - return &admin.TaskExecutionGetDataResponse{ + response := &admin.TaskExecutionGetDataResponse{ Inputs: &signedInputsURLBlob, Outputs: &signedOutputsURLBlob, - }, nil + } + maxDataSize := m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes + if maxDataSize == 0 || signedInputsURLBlob.Bytes < maxDataSize { + var fullInputs core.LiteralMap + err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(taskExecution.InputUri), &fullInputs) + if err != nil { + logger.Warningf(ctx, "Failed to read inputs from URI [%s] with err: %v", taskExecution.InputUri, err) + } + response.FullInputs = &fullInputs + } + if maxDataSize == 0 || (signedOutputsURLBlob.Bytes < maxDataSize && len(taskExecution.Closure.GetOutputUri()) > 0) { + var fullOutputs core.LiteralMap + err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(taskExecution.Closure.GetOutputUri()), &fullOutputs) + if err != nil { + logger.Warningf(ctx, "Failed to read outputs from URI [%s] with err: %v", + taskExecution.Closure.GetOutputUri(), err) + } + response.FullOutputs = &fullOutputs + } + + m.metrics.TaskExecutionInputBytes.Observe(float64(response.Inputs.Bytes)) + m.metrics.TaskExecutionOutputBytes.Observe(float64(response.Outputs.Bytes)) + return response, nil } func NewTaskExecutionManager( - db repositories.RepositoryInterface, + db repositories.RepositoryInterface, config runtimeInterfaces.Configuration, storageClient *storage.DataStore, scope promutils.Scope, urlData dataInterfaces.RemoteURLInterface) interfaces.TaskExecutionInterface { metrics := taskExecutionMetrics{ Scope: scope, @@ -300,10 +329,16 @@ func NewTaskExecutionManager( "overall count of task execution events received that are missing a task definition"), ClosureSizeBytes: scope.MustNewSummary("closure_size_bytes", "size in bytes of serialized task execution closure"), + TaskExecutionInputBytes: scope.MustNewSummary("input_size_bytes", + "size in bytes of serialized node execution inputs"), + TaskExecutionOutputBytes: scope.MustNewSummary("output_size_bytes", + "size in bytes of serialized node execution outputs"), } return &TaskExecutionManager{ - db: db, - metrics: metrics, - urlData: urlData, + db: db, + config: config, + storageClient: storageClient, + metrics: metrics, + urlData: urlData, } } diff --git a/flyteadmin/pkg/manager/impl/task_execution_manager_test.go b/flyteadmin/pkg/manager/impl/task_execution_manager_test.go index 31c8c9b286..9cfb181b54 100644 --- a/flyteadmin/pkg/manager/impl/task_execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/task_execution_manager_test.go @@ -3,13 +3,18 @@ package impl import ( "context" "errors" + "fmt" "testing" "time" + "github.com/lyft/flyteadmin/pkg/manager/impl/testutils" + "github.com/lyft/flytestdlib/storage" + "github.com/lyft/flyteadmin/pkg/common" "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" + commonMocks "github.com/lyft/flyteadmin/pkg/common/mocks" dataMocks "github.com/lyft/flyteadmin/pkg/data/mocks" flyteAdminErrors "github.com/lyft/flyteadmin/pkg/errors" "github.com/lyft/flyteadmin/pkg/repositories" @@ -170,7 +175,8 @@ func TestCreateTaskEvent(t *testing.T) { }, input) return nil }) - taskExecManager := NewTaskExecutionManager(repository, mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) assert.True(t, getTaskCalled) assert.True(t, createTaskCalled) @@ -283,7 +289,8 @@ func TestCreateTaskEvent_Update(t *testing.T) { OutputUri: expectedOutputResult.OutputUri, } - taskExecManager := NewTaskExecutionManager(repository, mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) assert.True(t, getTaskCalled) assert.True(t, updateTaskCalled) @@ -298,7 +305,8 @@ func TestCreateTaskEvent_MissingExecution(t *testing.T) { func(ctx context.Context, input interfaces.GetNodeExecutionInput) (models.NodeExecution, error) { return models.NodeExecution{}, expectedErr }) - taskExecManager := NewTaskExecutionManager(repository, mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) assert.EqualError(t, err, "failed to get existing node execution id: [node_id:\"node-id\""+ " execution_id: ] "+ @@ -318,7 +326,8 @@ func TestCreateTaskEvent_CreateDatabaseError(t *testing.T) { func(ctx context.Context, input models.TaskExecution) error { return expectedErr }) - taskExecManager := NewTaskExecutionManager(repository, mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) assert.EqualError(t, err, expectedErr.Error()) assert.Nil(t, resp) @@ -358,7 +367,8 @@ func TestCreateTaskEvent_UpdateDatabaseError(t *testing.T) { func(ctx context.Context, execution models.TaskExecution) error { return expectedErr }) - nodeExecManager := NewTaskExecutionManager(repository, mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + nodeExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) resp, err := nodeExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) assert.EqualError(t, err, expectedErr.Error()) assert.Nil(t, resp) @@ -391,7 +401,8 @@ func TestCreateTaskEvent_UpdateTerminalEventError(t *testing.T) { }, nil }) taskEventRequest.Event.Phase = core.TaskExecution_RUNNING - taskExecManager := NewTaskExecutionManager(repository, mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) assert.Nil(t, resp) @@ -458,7 +469,8 @@ func TestCreateTaskEvent_PhaseVersionChange(t *testing.T) { taskEventRequest.Event.PhaseVersion = uint32(1) taskEventRequest.Event.OccurredAt = taskEventUpdatedAtProto - taskExecManager := NewTaskExecutionManager(repository, mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest) assert.True(t, getTaskCalled) assert.True(t, updateTaskCalled) @@ -524,7 +536,8 @@ func TestGetTaskExecution(t *testing.T) { }, }, nil }) - taskExecManager := NewTaskExecutionManager(repository, mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) taskExecution, err := taskExecManager.GetTaskExecution(context.Background(), admin.TaskExecutionGetRequest{ Id: &core.TaskExecutionIdentifier{ TaskId: sampleTaskID, @@ -574,7 +587,8 @@ func TestGetTaskExecution_TransformerError(t *testing.T) { Closure: []byte("i'm an invalid task closure"), }, nil }) - taskExecManager := NewTaskExecutionManager(repository, mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) taskExecution, err := taskExecManager.GetTaskExecution(context.Background(), admin.TaskExecutionGetRequest{ Id: &core.TaskExecutionIdentifier{ TaskId: sampleTaskID, @@ -684,7 +698,8 @@ func TestListTaskExecutions(t *testing.T) { }, }, nil }) - taskExecManager := NewTaskExecutionManager(repository, mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) taskExecutions, err := taskExecManager.ListTaskExecutions(context.Background(), admin.TaskExecutionListRequest{ NodeExecutionId: &core.NodeExecutionIdentifier{ NodeId: "nodey b", @@ -755,7 +770,8 @@ func TestListTaskExecutions_NoFilters(t *testing.T) { listTaskCalled = true return interfaces.TaskExecutionCollectionOutput{}, nil }) - taskExecManager := NewTaskExecutionManager(repository, mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) _, err := taskExecManager.ListTaskExecutions(context.Background(), admin.TaskExecutionListRequest{ Token: "1", Limit: 99, @@ -773,7 +789,8 @@ func TestListTaskExecutions_NoLimit(t *testing.T) { getTaskCalled = true return interfaces.TaskExecutionCollectionOutput{}, nil }) - taskExecManager := NewTaskExecutionManager(repository, mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) _, err := taskExecManager.ListTaskExecutions(context.Background(), admin.TaskExecutionListRequest{ Limit: 0, }) @@ -804,7 +821,8 @@ func TestListTaskExecutions_NothingToReturn(t *testing.T) { listTasksCalled = true return interfaces.TaskCollectionOutput{}, nil }) - taskExecManager := NewTaskExecutionManager(repository, mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), + getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL) _, err := taskExecManager.ListTaskExecutions(context.Background(), admin.TaskExecutionListRequest{ NodeExecutionId: &core.NodeExecutionIdentifier{ ExecutionId: &core.WorkflowExecutionIdentifier{ @@ -882,7 +900,32 @@ func TestGetTaskExecutionData(t *testing.T) { return admin.UrlBlob{}, errors.New("unexpected input") } - taskExecManager := NewTaskExecutionManager(repository, mockScope.NewTestScope(), mockTaskExecutionRemoteURL) + mockStorage := commonMocks.GetMockStorageClient() + fullInputs := &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "foo": testutils.MakeStringLiteral("foo-value-1"), + }, + } + fullOutputs := &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "bar": testutils.MakeStringLiteral("bar-value-1"), + }, + } + mockStorage.ComposedProtobufStore.(*commonMocks.TestDataStore).ReadProtobufCb = func( + ctx context.Context, reference storage.DataReference, msg proto.Message) error { + if reference.String() == "input-uri.pb" { + marshalled, _ := proto.Marshal(fullInputs) + _ = proto.Unmarshal(marshalled, msg) + return nil + } else if reference.String() == "test-output.pb" { + marshalled, _ := proto.Marshal(fullOutputs) + _ = proto.Unmarshal(marshalled, msg) + return nil + } + return fmt.Errorf("unexpected call to find value in storage [%v]", reference.String()) + } + taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), mockStorage, + mockScope.NewTestScope(), mockTaskExecutionRemoteURL) dataResponse, err := taskExecManager.GetTaskExecutionData(context.Background(), admin.TaskExecutionGetDataRequest{ Id: &core.TaskExecutionIdentifier{ TaskId: sampleTaskID, @@ -901,5 +944,7 @@ func TestGetTaskExecutionData(t *testing.T) { Url: "outputs", Bytes: 200, }, + FullInputs: fullInputs, + FullOutputs: fullOutputs, }, dataResponse)) } diff --git a/flyteadmin/pkg/manager/impl/testutils/mock_closures.go b/flyteadmin/pkg/manager/impl/testutils/mock_closures.go index 8b698118d9..fea23a0b40 100644 --- a/flyteadmin/pkg/manager/impl/testutils/mock_closures.go +++ b/flyteadmin/pkg/manager/impl/testutils/mock_closures.go @@ -48,3 +48,20 @@ func GetWorkflowClosureBytes() []byte { var workflowClosureBytes, _ = proto.Marshal(GetWorkflowClosure()) return workflowClosureBytes } + +func MakeStringLiteral(value string) *core.Literal { + p := &core.Primitive{ + Value: &core.Primitive_StringValue{ + StringValue: value, + }, + } + return &core.Literal{ + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: p, + }, + }, + }, + } +} diff --git a/flyteadmin/pkg/rpc/adminservice/base.go b/flyteadmin/pkg/rpc/adminservice/base.go index bedf4bd885..1b7bb9f2f2 100644 --- a/flyteadmin/pkg/rpc/adminservice/base.go +++ b/flyteadmin/pkg/rpc/adminservice/base.go @@ -157,9 +157,9 @@ func NewAdminServer(kubeConfig, master string) *AdminService { ExecutionManager: executionManager, NamedEntityManager: namedEntityManager, NodeExecutionManager: manager.NewNodeExecutionManager( - db, adminScope.NewSubScope("node_execution_manager"), urlData), + db, configuration, dataStorageClient, adminScope.NewSubScope("node_execution_manager"), urlData), TaskExecutionManager: manager.NewTaskExecutionManager( - db, adminScope.NewSubScope("task_execution_manager"), urlData), + db, configuration, dataStorageClient, adminScope.NewSubScope("task_execution_manager"), urlData), ProjectManager: manager.NewProjectManager(db, configuration), ResourceManager: resources.NewResourceManager(db, configuration.ApplicationConfiguration()), Metrics: InitMetrics(adminScope), diff --git a/flyteadmin/pkg/runtime/interfaces/application_configuration.go b/flyteadmin/pkg/runtime/interfaces/application_configuration.go index 21c4da1f66..a39741b423 100644 --- a/flyteadmin/pkg/runtime/interfaces/application_configuration.go +++ b/flyteadmin/pkg/runtime/interfaces/application_configuration.go @@ -110,6 +110,8 @@ type RemoteDataConfig struct { // Some cloud providers require a region to be set. Region string `json:"region"` SignedURL SignedURL `json:"signedUrls"` + // Specifies the max size in bytes for which execution data such as inputs and outputs will be populated in line. + MaxSizeInBytes int64 `json:"maxSizeInBytes"` } // This section handles configuration for the workflow notifications pipeline.