Skip to content

Commit

Permalink
feat: modify computetask failure report (#277)
Browse files Browse the repository at this point in the history
## Companion PR

- Substra/substra-backend#727
- Substra/substra-frontend#240

## Description
Modify `FailureReport`:
- add field `asset_type` containing the kind of asset the failure report
connect to
- rename `compute_task_key` to `asset_key`, which is a [wire compatible
change](https://groups.google.com/g/protobuf/c/hX4Mj0P4N0w) (i.e. does
not need to be declared as a new field)

## How has this been tested?

As this is going to be merged on a branch that is going to be merged to
a POC branch, we use MNIST as a baseline of a working model. We will
deal with failing tests on the POC before merging on main.

The e2e tests are also broken due to an issue on producing dumps during
release, but passed locally.

## Checklist

- [x] [changelog](../CHANGELOG.md) was updated with notable changes
- [ ] documentation was updated

---------

Signed-off-by: Guilhem Barthes <[email protected]>
Signed-off-by: Guilhem Barthés <[email protected]>
  • Loading branch information
guilhem-barthes committed Sep 13, 2023
1 parent 9ecf6f4 commit f529721
Show file tree
Hide file tree
Showing 21 changed files with 339 additions and 116 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Enum `FailedAssetKind` ([#277](https://github.com/Substra/orchestrator/pull/277))
- Field `asset_type` of type `FailedAssetKind` in `FailureReport` ([#277](https://github.com/Substra/orchestrator/pull/277))


### Changed

- Renamed `compute_task_key`by `asset_key` in `FailureReport` ([#277](https://github.com/Substra/orchestrator/pull/277))
- `FailureReport` now can be reference a `ComputeTask` or a `Function` through `asset_key` + `asset_type` ([#277](https://github.com/Substra/orchestrator/pull/277))


## [0.35.2](https://github.com/Substra/orchestrator/releases/tag/0.35.2) - 2023-07-25

### Changed
Expand Down
2 changes: 1 addition & 1 deletion chaincode/failurereport/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *SmartContract) GetFailureReport(ctx ledger.TransactionContext, wrapper
return nil, err
}

model, err := service.GetFailureReport(params.GetComputeTaskKey())
model, err := service.GetFailureReport(params.GetAssetKey())
if err != nil {
s.logger.Error().Err(err).Msg("failed to fetch failure report")
return nil, err
Expand Down
12 changes: 7 additions & 5 deletions chaincode/failurereport/contract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ func TestRegisterFailureReport(t *testing.T) {
mspid := "org"

newFailureReport := &asset.NewFailureReport{
ComputeTaskKey: "taskUUID",
LogsAddress: &asset.Addressable{},
AssetKey: "taskUUID",
AssetType: asset.FailedAssetKind_FAILED_ASSET_COMPUTE_TASK,
LogsAddress: &asset.Addressable{},
}
wrapper, err := communication.Wrap(context.Background(), newFailureReport)
assert.NoError(t, err)
Expand All @@ -55,15 +56,16 @@ func TestGetFailureReport(t *testing.T) {
contract := &SmartContract{}

param := &asset.GetFailureReportParam{
ComputeTaskKey: "uuid",
AssetKey: "uuid",
}
wrapper, err := communication.Wrap(context.Background(), param)
assert.NoError(t, err)

ctx := new(ledger.MockTransactionContext)

failureReport := &asset.FailureReport{
ComputeTaskKey: param.ComputeTaskKey,
AssetKey: param.AssetKey,
AssetType: asset.FailedAssetKind_FAILED_ASSET_COMPUTE_TASK,
}
mockService := getMockedService(ctx)
mockService.On("GetFailureReport", "uuid").Return(failureReport, nil).Once()
Expand All @@ -74,7 +76,7 @@ func TestGetFailureReport(t *testing.T) {
resp := new(asset.FailureReport)
err = wrapped.Unwrap(resp)
assert.NoError(t, err)
assert.Equal(t, resp.ComputeTaskKey, param.ComputeTaskKey)
assert.Equal(t, resp.AssetKey, param.AssetKey)

mockService.AssertExpectations(t)
}
10 changes: 5 additions & 5 deletions chaincode/ledger/dbal_failure.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
"google.golang.org/protobuf/encoding/protojson"
)

func (db *DB) GetFailureReport(computeTaskKey string) (*asset.FailureReport, error) {
func (db *DB) GetFailureReport(assetKey string) (*asset.FailureReport, error) {
failureReport := new(asset.FailureReport)

b, err := db.getState(asset.FailureReportKind, computeTaskKey)
b, err := db.getState(asset.FailureReportKind, assetKey)
if err != nil {
return nil, err
}
Expand All @@ -22,17 +22,17 @@ func (db *DB) GetFailureReport(computeTaskKey string) (*asset.FailureReport, err
}

func (db *DB) AddFailureReport(failureReport *asset.FailureReport) error {
exists, err := db.hasKey(asset.FailureReportKind, failureReport.GetComputeTaskKey())
exists, err := db.hasKey(asset.FailureReportKind, failureReport.GetAssetKey())
if err != nil {
return err
}
if exists {
return errors.NewConflict(asset.FailureReportKind, failureReport.GetComputeTaskKey())
return errors.NewConflict(asset.FailureReportKind, failureReport.GetAssetKey())
}
bytes, err := marshaller.Marshal(failureReport)
if err != nil {
return err
}

return db.putState(asset.FailureReportKind, failureReport.GetComputeTaskKey(), bytes)
return db.putState(asset.FailureReportKind, failureReport.GetAssetKey(), bytes)
}
13 changes: 7 additions & 6 deletions e2e/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,10 +609,11 @@ func (c *TestClient) QueryPlans(filter *asset.PlanQueryFilter, pageToken string,
return resp
}

func (c *TestClient) RegisterFailureReport(taskRef string) *asset.FailureReport {
func (c *TestClient) RegisterFailureReport(assetRef string) *asset.FailureReport {
newFailureReport := &asset.NewFailureReport{
ComputeTaskKey: c.ks.GetKey(taskRef),
ErrorType: asset.ErrorType_ERROR_TYPE_EXECUTION,
AssetKey: c.ks.GetKey(assetRef),
AssetType: asset.FailedAssetKind_FAILED_ASSET_COMPUTE_TASK,
ErrorType: asset.ErrorType_ERROR_TYPE_EXECUTION,
LogsAddress: &asset.Addressable{
Checksum: "5e12e1a2687d81b268558217856547f8a4519f9688933351386a7f902cf1ce5d",
StorageAddress: "http://somewhere.local/failure/" + uuid.NewString(),
Expand All @@ -628,12 +629,12 @@ func (c *TestClient) RegisterFailureReport(taskRef string) *asset.FailureReport
return failureReport
}

func (c *TestClient) GetFailureReport(taskRef string) *asset.FailureReport {
func (c *TestClient) GetFailureReport(assetRef string) *asset.FailureReport {
param := &asset.GetFailureReportParam{
ComputeTaskKey: c.ks.GetKey(taskRef),
AssetKey: c.ks.GetKey(assetRef),
}

c.logger.Debug().Str("task key", param.ComputeTaskKey).Msg("getting failure report")
c.logger.Debug().Str("asset key", param.AssetKey).Str("asset type", param.AssetType).Msg("getting failure report")
failureReport, err := c.failureReportService.GetFailureReport(c.ctx, param)
if err != nil {
c.logger.Fatal().Err(err).Msg("GetFailureReport failed")
Expand Down
1 change: 0 additions & 1 deletion e2e/client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,6 @@ func DefaultSimpleFunctionOptions() *FunctionOptions {
Outputs: map[string]*asset.FunctionOutput{
"model": {Kind: asset.AssetKind_ASSET_MODEL},
},
Status: asset.FunctionStatus_FUNCTION_STATUS_CREATED,
}
}

Expand Down
4 changes: 2 additions & 2 deletions e2e/failure_report_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ func TestRegisterFailureReport(t *testing.T) {
registeredFailureReport := appClient.RegisterFailureReport(client.DefaultTrainTaskRef)
task := appClient.GetComputeTask(client.DefaultTrainTaskRef)

require.Equal(t, task.Key, registeredFailureReport.ComputeTaskKey)
require.Equal(t, task.Key, registeredFailureReport.AssetKey)
require.Equal(t, asset.ComputeTaskStatus_STATUS_FAILED, task.Status)

retrievedFailureReport := appClient.GetFailureReport(client.DefaultTrainTaskRef)
e2erequire.ProtoEqual(t, registeredFailureReport, retrievedFailureReport)

eventResp := appClient.QueryEvents(&asset.EventQueryFilter{
AssetKey: registeredFailureReport.ComputeTaskKey,
AssetKey: registeredFailureReport.AssetKey,
AssetKind: asset.AssetKind_ASSET_FAILURE_REPORT,
EventKind: asset.EventKind_EVENT_ASSET_CREATED,
}, "", 100)
Expand Down
2 changes: 1 addition & 1 deletion lib/asset/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestMarshalUnmarshalEventAsset(t *testing.T) {
},
"failureReport": {
AssetKind: AssetKind_ASSET_FAILURE_REPORT,
Asset: &Event_FailureReport{FailureReport: &FailureReport{ComputeTaskKey: "failed-task"}},
Asset: &Event_FailureReport{FailureReport: &FailureReport{AssetKey: "failed-task"}},
},
"model": {
AssetKind: AssetKind_ASSET_MODEL,
Expand Down
20 changes: 15 additions & 5 deletions lib/asset/failure_report.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,40 @@ enum ErrorType {
ERROR_TYPE_INTERNAL = 3;
}

enum FailedAssetKind {
FAILED_ASSET_UNKNOWN = 0;
FAILED_ASSET_COMPUTE_TASK = 1;
FAILED_ASSET_FUNCTION = 2;
}

// FailureReport is used to store information related to a failed ComputeTask.
message FailureReport {
string compute_task_key = 1;
string asset_key = 1;
ErrorType error_type = 2;
Addressable logs_address = 3;
google.protobuf.Timestamp creation_date = 4;
// The owner of a failure report matches the 'worker' field of the associated compute task but can differ from
// In the case of a compute task failure, the owner of a failure report matches the 'worker' field of the associated compute task but can differ from
// the owner of the compute task. Indeed, a task belonging to some user can be executed on an organization belonging
// to another user. The failure report generated will be located on the execution organization and belong to the owner
// to another user.
// In the case of a function, the owner will be the owner of the function (which builds the function).
// The failure report generated will be located on the execution organization and belong to the owner
// of this organization.
string owner = 5;
FailedAssetKind asset_type = 6;
}

// NewFailureReport is used to register a FailureReport.
// It will be processed into a FailureReport.
message NewFailureReport {
string compute_task_key = 1;
string asset_key = 1;
ErrorType error_type = 2;
Addressable logs_address = 3;
FailedAssetKind asset_type = 4;
}

// GetFailureReportParam is used to fetch a Failure.
message GetFailureReportParam {
string compute_task_key = 1;
string asset_key = 1;
}

service FailureReportService {
Expand Down
3 changes: 2 additions & 1 deletion lib/asset/failure_report_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
// Validate returns an error if the new FailureReport object is not valid.
func (f *NewFailureReport) Validate() error {
return validation.ValidateStruct(f,
validation.Field(&f.ComputeTaskKey, validation.Required, is.UUID),
validation.Field(&f.AssetKey, validation.Required, is.UUID),
validation.Field(&f.ErrorType, validation.In(ErrorType_ERROR_TYPE_BUILD, ErrorType_ERROR_TYPE_EXECUTION, ErrorType_ERROR_TYPE_INTERNAL)),
validation.Field(&f.AssetType, validation.In(FailedAssetKind_FAILED_ASSET_UNKNOWN, FailedAssetKind_FAILED_ASSET_FUNCTION, FailedAssetKind_FAILED_ASSET_COMPUTE_TASK)),
validation.Field(&f.LogsAddress, validation.When(utils.SliceContains([]ErrorType{ErrorType_ERROR_TYPE_EXECUTION, ErrorType_ERROR_TYPE_BUILD}, f.ErrorType), validation.Required).Else(validation.Nil)),
)
}
57 changes: 35 additions & 22 deletions lib/asset/failure_report_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,53 @@ func TestFailureReportValidate(t *testing.T) {

cases := map[string]failureReportTestCase{
"empty": {&NewFailureReport{}, false},
"invalidComputeTaskKey": {&NewFailureReport{
ComputeTaskKey: "notUUID",
ErrorType: ErrorType_ERROR_TYPE_BUILD,
LogsAddress: nil,
"invalidAssetKeyFunction": {&NewFailureReport{
AssetKey: "notUUID",
AssetType: FailedAssetKind_FAILED_ASSET_FUNCTION,
ErrorType: ErrorType_ERROR_TYPE_BUILD,
LogsAddress: nil,
}, false},
"invalidAssetKeyComputeTask": {&NewFailureReport{
AssetKey: "notUUID",
AssetType: FailedAssetKind_FAILED_ASSET_COMPUTE_TASK,
ErrorType: ErrorType_ERROR_TYPE_BUILD,
LogsAddress: nil,
}, false},
"validBuildError": {&NewFailureReport{
ComputeTaskKey: "08680966-97ae-4573-8b2d-6c4db2b3c532",
ErrorType: ErrorType_ERROR_TYPE_BUILD,
LogsAddress: validAddressable,
AssetKey: "08680966-97ae-4573-8b2d-6c4db2b3c532",
AssetType: FailedAssetKind_FAILED_ASSET_FUNCTION,
ErrorType: ErrorType_ERROR_TYPE_BUILD,
LogsAddress: validAddressable,
}, true},
"invalidBuildError": {&NewFailureReport{
ComputeTaskKey: "08680966-97ae-4573-8b2d-6c4db2b3c532",
ErrorType: ErrorType_ERROR_TYPE_BUILD,
LogsAddress: nil,
AssetKey: "08680966-97ae-4573-8b2d-6c4db2b3c532",
AssetType: FailedAssetKind_FAILED_ASSET_FUNCTION,
ErrorType: ErrorType_ERROR_TYPE_BUILD,
LogsAddress: nil,
}, false},
"validExecutionError": {&NewFailureReport{
ComputeTaskKey: "08680966-97ae-4573-8b2d-6c4db2b3c532",
ErrorType: ErrorType_ERROR_TYPE_EXECUTION,
LogsAddress: validAddressable,
AssetKey: "08680966-97ae-4573-8b2d-6c4db2b3c532",
AssetType: FailedAssetKind_FAILED_ASSET_COMPUTE_TASK,
ErrorType: ErrorType_ERROR_TYPE_EXECUTION,
LogsAddress: validAddressable,
}, true},
"invalidExecutionError": {&NewFailureReport{
ComputeTaskKey: "08680966-97ae-4573-8b2d-6c4db2b3c532",
ErrorType: ErrorType_ERROR_TYPE_EXECUTION,
LogsAddress: nil,
AssetKey: "08680966-97ae-4573-8b2d-6c4db2b3c532",
AssetType: FailedAssetKind_FAILED_ASSET_COMPUTE_TASK,
ErrorType: ErrorType_ERROR_TYPE_EXECUTION,
LogsAddress: nil,
}, false},
"validInternalError": {&NewFailureReport{
ComputeTaskKey: "08680966-97ae-4573-8b2d-6c4db2b3c532",
ErrorType: ErrorType_ERROR_TYPE_INTERNAL,
LogsAddress: nil,
AssetKey: "08680966-97ae-4573-8b2d-6c4db2b3c532",
AssetType: FailedAssetKind_FAILED_ASSET_COMPUTE_TASK,
ErrorType: ErrorType_ERROR_TYPE_INTERNAL,
LogsAddress: nil,
}, true},
"invalidInternalError": {&NewFailureReport{
ComputeTaskKey: "08680966-97ae-4573-8b2d-6c4db2b3c532",
ErrorType: ErrorType_ERROR_TYPE_INTERNAL,
LogsAddress: validAddressable,
AssetKey: "08680966-97ae-4573-8b2d-6c4db2b3c532",
AssetType: FailedAssetKind_FAILED_ASSET_COMPUTE_TASK,
ErrorType: ErrorType_ERROR_TYPE_INTERNAL,
LogsAddress: validAddressable,
}, false},
}

Expand Down
23 changes: 23 additions & 0 deletions lib/asset/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,26 @@ func (fs *FunctionStatus) Scan(value interface{}) error {

return nil
}

// Value implements the driver.Valuer interface.
// Simply returns the string representation of the FunctionStatus.
func (fak *FailedAssetKind) Value() (driver.Value, error) {
return fak.String(), nil
}

// Scan implements the sql.Scanner interface.
// Simply decodes a string into the FunctionStatus.
func (fak *FailedAssetKind) Scan(value interface{}) error {
s, ok := value.(string)
if !ok {
return errors.NewInternal("cannot scan failed asset kind: invalid string")
}

v, ok := FailedAssetKind_value[s]
if !ok {
return errors.NewInternal("cannot scan failed asset kind: unknown value")
}
*fak = FailedAssetKind(v)

return nil
}
14 changes: 14 additions & 0 deletions lib/asset/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,17 @@ func TestEventKindValue(t *testing.T) {

assert.Equal(t, kind, scanned)
}

func TestFailedAssetKindKindValue(t *testing.T) {
k := FailedAssetKind_FAILED_ASSET_UNKNOWN
kind := &k

value, err := kind.Value()
assert.NoError(t, err, "failed asset kind serialization should not fail")

scanned := new(FailedAssetKind)
err = scanned.Scan(value)
assert.NoError(t, err, "failed asset kind scan should not fail")

assert.Equal(t, kind, scanned)
}
2 changes: 1 addition & 1 deletion lib/persistence/failure_report_dbal.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

type FailureReportDBAL interface {
GetFailureReport(computeTaskKey string) (*asset.FailureReport, error)
GetFailureReport(assetKey string) (*asset.FailureReport, error)
AddFailureReport(f *asset.FailureReport) error
}

Expand Down
Loading

0 comments on commit f529721

Please sign in to comment.