diff --git a/changelog/unreleased/remove-job-on-cancel-transfer.md b/changelog/unreleased/remove-job-on-cancel-transfer.md new file mode 100644 index 00000000000..2063fea34cd --- /dev/null +++ b/changelog/unreleased/remove-job-on-cancel-transfer.md @@ -0,0 +1,4 @@ +Bugfix: Remove transfer on cancel should also remove transfer job + +https://github.com/cs3org/reva/pull/3882 +https://github.com/cs3org/reva/issues/3881 \ No newline at end of file diff --git a/docs/content/en/docs/tutorials/data-transfer-tutorial.md b/docs/content/en/docs/tutorials/datatx-tutorial.md similarity index 97% rename from docs/content/en/docs/tutorials/data-transfer-tutorial.md rename to docs/content/en/docs/tutorials/datatx-tutorial.md index 48aa76d94a9..2da5a5ddf6a 100644 --- a/docs/content/en/docs/tutorials/data-transfer-tutorial.md +++ b/docs/content/en/docs/tutorials/datatx-tutorial.md @@ -158,9 +158,12 @@ transfer-retry -txId fe671ae3-0fbf-4b06-b7df-32418c2ebfcb ``` ## 6 Cleanup transfers -Transfers will be removed from the db using the `transfer-cancel` command when the configuration property `remove_on_cancel` of the datatx service has been set to `true` as follows: +Transfers will be removed from the db using the `transfer-cancel` command when the configuration property `remove_transfer_on_cancel` and `remove_transfer_job_on_cancel` of the datatx service and rclone driver respectively have been set to `true` as follows: ``` [grpc.services.datatx] -remove_on_cancel = true +remove_transfer_on_cancel = true + +[grpc.services.datatx.txdrivers.rclone] +remove_transfer_job_on_cancel = true ``` Currently this setting is recommended. \ No newline at end of file diff --git a/examples/datatx/datatx.toml b/examples/datatx/datatx.toml index bb19821f9e9..2a4633758ed 100644 --- a/examples/datatx/datatx.toml +++ b/examples/datatx/datatx.toml @@ -13,6 +13,9 @@ txdriver = "rclone" tx_shares_file = "" # base folder of the data transfers (eg. /home/DataTransfers) data_transfers_folder = "" +# if set to 'true' the transfer will always be removed from the db upon cancel request +# recommended value is true +remove_transfer_on_cancel = true # rclone driver [grpc.services.datatx.txdrivers.rclone] @@ -33,6 +36,9 @@ file = "" job_status_check_interval = 2000 # the job timeout in milliseconds (must be long enough for big transfers!) job_timeout = 120000 +# if set to 'true' the transfer job will always be removed from the db upon transfer cancel request +# recommended value is true +remove_transfer_job_on_cancel = true [http.services.ocdav] # reva supports http third party copy diff --git a/internal/grpc/services/datatx/datatx.go b/internal/grpc/services/datatx/datatx.go index b3ac0b43983..e2be50749b3 100644 --- a/internal/grpc/services/datatx/datatx.go +++ b/internal/grpc/services/datatx/datatx.go @@ -46,10 +46,10 @@ type config struct { TxDriver string `mapstructure:"txdriver"` TxDrivers map[string]map[string]interface{} `mapstructure:"txdrivers"` // storage driver to persist share/transfer relation - StorageDriver string `mapstructure:"storage_driver"` - StorageDrivers map[string]map[string]interface{} `mapstructure:"storage_drivers"` - TxSharesFile string `mapstructure:"tx_shares_file"` - RemoveOnCancel bool `mapstructure:"remove_on_cancel"` + StorageDriver string `mapstructure:"storage_driver"` + StorageDrivers map[string]map[string]interface{} `mapstructure:"storage_drivers"` + TxSharesFile string `mapstructure:"tx_shares_file"` + RemoveTransferOnCancel bool `mapstructure:"remove_transfer_on_cancel"` } type service struct { @@ -209,7 +209,7 @@ func (s *service) CancelTransfer(ctx context.Context, req *datatx.CancelTransfer } transferRemovedMessage := "" - if s.conf.RemoveOnCancel { + if s.conf.RemoveTransferOnCancel { delete(s.txShareDriver.model.TxShares, req.TxId.GetOpaqueId()) if err := s.txShareDriver.model.saveTxShare(); err != nil { err = errors.Wrap(err, "datatx service: error deleting transfer: "+datatx.Status_STATUS_INVALID.String()) diff --git a/internal/grpc/services/gateway/ocmshareprovider.go b/internal/grpc/services/gateway/ocmshareprovider.go index 1470318bde0..5effeab9248 100644 --- a/internal/grpc/services/gateway/ocmshareprovider.go +++ b/internal/grpc/services/gateway/ocmshareprovider.go @@ -170,7 +170,7 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive }, nil } - // retrieve the persisted received share + // retrieve the current received share getShareReq := &ocm.GetReceivedOCMShareRequest{ Ref: &ocm.ShareReference{ Spec: &ocm.ShareReference_Id{ @@ -234,7 +234,16 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive } } // handle transfer in case it has not already been accepted - if s.isTransferShare(share) && req.GetShare().State == ocm.ShareState_SHARE_STATE_ACCEPTED && share.State != ocm.ShareState_SHARE_STATE_ACCEPTED { + if s.isTransferShare(share) && req.GetShare().State == ocm.ShareState_SHARE_STATE_ACCEPTED { + if share.State == ocm.ShareState_SHARE_STATE_ACCEPTED { + log.Err(err).Msg("gateway: error calling UpdateReceivedShare, share already accepted.") + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{ + Code: rpc.Code_CODE_FAILED_PRECONDITION, + Message: "Share already accepted.", + }, + }, err + } // get provided destination path transferDestinationPath, err := s.getTransferDestinationPath(ctx, req) if err != nil { diff --git a/pkg/datatx/manager/rclone/rclone.go b/pkg/datatx/manager/rclone/rclone.go index 21e3d6e3f0a..0e9d2dfe554 100644 --- a/pkg/datatx/manager/rclone/rclone.go +++ b/pkg/datatx/manager/rclone/rclone.go @@ -62,14 +62,15 @@ func (c *config) init(m map[string]interface{}) { } type config struct { - Endpoint string `mapstructure:"endpoint"` - AuthUser string `mapstructure:"auth_user"` // rclone basicauth user - AuthPass string `mapstructure:"auth_pass"` // rclone basicauth pass - AuthHeader string `mapstructure:"auth_header"` - File string `mapstructure:"file"` - JobStatusCheckInterval int `mapstructure:"job_status_check_interval"` - JobTimeout int `mapstructure:"job_timeout"` - Insecure bool `mapstructure:"insecure"` + Endpoint string `mapstructure:"endpoint"` + AuthUser string `mapstructure:"auth_user"` // rclone basicauth user + AuthPass string `mapstructure:"auth_pass"` // rclone basicauth pass + AuthHeader string `mapstructure:"auth_header"` + File string `mapstructure:"file"` + JobStatusCheckInterval int `mapstructure:"job_status_check_interval"` + JobTimeout int `mapstructure:"job_timeout"` + Insecure bool `mapstructure:"insecure"` + RemoveTransferJobOnCancel bool `mapstructure:"remove_transfer_job_on_cancel"` } type rclone struct { @@ -644,10 +645,24 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d Ctime: nil, }, err } + cTime, _ := strconv.ParseInt(transfer.Ctime, 10, 64) + transferRemovedMessage := "" + if driver.config.RemoveTransferJobOnCancel { + delete(driver.pDriver.model.Transfers, transfer.TransferID) + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferID}, + Status: datatx.Status_STATUS_INVALID, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, err + } + transferRemovedMessage = "(transfer job successfully removed)" + } + _, endStatusFound := txEndStatuses[transfer.TransferStatus.String()] if endStatusFound { - err := errors.New("rclone driver: transfer already in end state") + err := errors.Wrapf(errors.New("rclone driver: transfer already in end state"), transferRemovedMessage) return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_INVALID, @@ -665,7 +680,7 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d data, err := json.Marshal(rcloneCancelTransferReq) if err != nil { - err = errors.Wrap(err, "rclone driver: error marshalling rclone req data") + err := errors.Wrapf(errors.New("rclone driver: error marshalling rclone req data"), transferRemovedMessage) return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_INVALID, @@ -677,7 +692,7 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d u, err := url.Parse(driver.config.Endpoint) if err != nil { - err = errors.Wrap(err, "rclone driver: error parsing driver endpoint") + err := errors.Wrapf(errors.New("rclone driver: error parsing driver endpoint"), transferRemovedMessage) return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_INVALID, @@ -689,7 +704,7 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d req, err := http.NewRequest(http.MethodPost, requestURL, bytes.NewReader(data)) if err != nil { - err = errors.Wrap(err, "rclone driver: error framing post request") + err := errors.Wrapf(errors.New("rclone driver: error framing post request"), transferRemovedMessage) return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_INVALID, @@ -702,7 +717,7 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d res, err := driver.client.Do(req) if err != nil { - err = errors.Wrap(err, "rclone driver: error sending post request") + err := errors.Wrapf(errors.New("rclone driver: error sending post request"), transferRemovedMessage) return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_INVALID, @@ -715,14 +730,14 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d if res.StatusCode != http.StatusOK { var errorResData rcloneHTTPErrorRes if err = json.NewDecoder(res.Body).Decode(&errorResData); err != nil { - err = errors.Wrap(err, "rclone driver: error decoding response data") + err := errors.Wrapf(errors.New("rclone driver: error decoding response data"), transferRemovedMessage) return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_INVALID, Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, }, err } - err = errors.Wrap(errors.Errorf("status: %v, error: %v", errorResData.Status, errorResData.Error), "rclone driver: rclone request responded with error") + err = errors.Wrap(errors.Errorf("%v, status: %v, error: %v", transferRemovedMessage, errorResData.Status, errorResData.Error), "rclone driver: rclone request responded with error") return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_INVALID, @@ -744,7 +759,7 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d } var resData rcloneCancelTransferResJSON if err = json.NewDecoder(res.Body).Decode(&resData); err != nil { - err = errors.Wrap(err, "rclone driver: error decoding response data") + err := errors.Wrapf(errors.New("rclone driver: error decoding response data"), transferRemovedMessage) return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_INVALID,