Skip to content

Commit

Permalink
Remove job on cancel transfer (cs3org#3882)
Browse files Browse the repository at this point in the history
* Inform in case the share has already been accepted.

* Add transfer jobs remove conf option on transfer cancel (cs3org#3881)

* Add conf flag for removing transfer jobs on cancel

* Rename remove on cancel conf flags

* Rename datatx tutorial and add conf changes 'Cleanup transfers' section

---------

Co-authored-by: Antoon P <[email protected]>
  • Loading branch information
2 people authored and gmgigi96 committed Jun 28, 2023
1 parent 194532c commit 8f96b21
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 25 deletions.
4 changes: 4 additions & 0 deletions changelog/unreleased/remove-job-on-cancel-transfer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Bugfix: Remove transfer on cancel should also remove transfer job

https://github.com/cs3org/reva/pull/3882
https://github.com/cs3org/reva/issues/3881
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,12 @@ transfer-retry -txId fe671ae3-0fbf-4b06-b7df-32418c2ebfcb
```

## 6 Cleanup transfers
Transfers will be removed from the db using the `transfer-cancel` command when the configuration property `remove_on_cancel` of the datatx service has been set to `true` as follows:
Transfers will be removed from the db using the `transfer-cancel` command when the configuration property `remove_transfer_on_cancel` and `remove_transfer_job_on_cancel` of the datatx service and rclone driver respectively have been set to `true` as follows:
```
[grpc.services.datatx]
remove_on_cancel = true
remove_transfer_on_cancel = true
[grpc.services.datatx.txdrivers.rclone]
remove_transfer_job_on_cancel = true
```
Currently this setting is recommended.
6 changes: 6 additions & 0 deletions examples/datatx/datatx.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ txdriver = "rclone"
tx_shares_file = ""
# base folder of the data transfers (eg. /home/DataTransfers)
data_transfers_folder = ""
# if set to 'true' the transfer will always be removed from the db upon cancel request
# recommended value is true
remove_transfer_on_cancel = true

# rclone driver
[grpc.services.datatx.txdrivers.rclone]
Expand All @@ -33,6 +36,9 @@ file = ""
job_status_check_interval = 2000
# the job timeout in milliseconds (must be long enough for big transfers!)
job_timeout = 120000
# if set to 'true' the transfer job will always be removed from the db upon transfer cancel request
# recommended value is true
remove_transfer_job_on_cancel = true

[http.services.ocdav]
# reva supports http third party copy
Expand Down
10 changes: 5 additions & 5 deletions internal/grpc/services/datatx/datatx.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ type config struct {
TxDriver string `mapstructure:"txdriver"`
TxDrivers map[string]map[string]interface{} `mapstructure:"txdrivers"`
// storage driver to persist share/transfer relation
StorageDriver string `mapstructure:"storage_driver"`
StorageDrivers map[string]map[string]interface{} `mapstructure:"storage_drivers"`
TxSharesFile string `mapstructure:"tx_shares_file"`
RemoveOnCancel bool `mapstructure:"remove_on_cancel"`
StorageDriver string `mapstructure:"storage_driver"`
StorageDrivers map[string]map[string]interface{} `mapstructure:"storage_drivers"`
TxSharesFile string `mapstructure:"tx_shares_file"`
RemoveTransferOnCancel bool `mapstructure:"remove_transfer_on_cancel"`
}

type service struct {
Expand Down Expand Up @@ -209,7 +209,7 @@ func (s *service) CancelTransfer(ctx context.Context, req *datatx.CancelTransfer
}

transferRemovedMessage := ""
if s.conf.RemoveOnCancel {
if s.conf.RemoveTransferOnCancel {
delete(s.txShareDriver.model.TxShares, req.TxId.GetOpaqueId())
if err := s.txShareDriver.model.saveTxShare(); err != nil {
err = errors.Wrap(err, "datatx service: error deleting transfer: "+datatx.Status_STATUS_INVALID.String())
Expand Down
13 changes: 11 additions & 2 deletions internal/grpc/services/gateway/ocmshareprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive
}, nil
}

// retrieve the persisted received share
// retrieve the current received share
getShareReq := &ocm.GetReceivedOCMShareRequest{
Ref: &ocm.ShareReference{
Spec: &ocm.ShareReference_Id{
Expand Down Expand Up @@ -234,7 +234,16 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive
}
}
// handle transfer in case it has not already been accepted
if s.isTransferShare(share) && req.GetShare().State == ocm.ShareState_SHARE_STATE_ACCEPTED && share.State != ocm.ShareState_SHARE_STATE_ACCEPTED {
if s.isTransferShare(share) && req.GetShare().State == ocm.ShareState_SHARE_STATE_ACCEPTED {
if share.State == ocm.ShareState_SHARE_STATE_ACCEPTED {
log.Err(err).Msg("gateway: error calling UpdateReceivedShare, share already accepted.")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{
Code: rpc.Code_CODE_FAILED_PRECONDITION,
Message: "Share already accepted.",
},
}, err
}
// get provided destination path
transferDestinationPath, err := s.getTransferDestinationPath(ctx, req)
if err != nil {
Expand Down
47 changes: 31 additions & 16 deletions pkg/datatx/manager/rclone/rclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ func (c *config) init(m map[string]interface{}) {
}

type config struct {
Endpoint string `mapstructure:"endpoint"`
AuthUser string `mapstructure:"auth_user"` // rclone basicauth user
AuthPass string `mapstructure:"auth_pass"` // rclone basicauth pass
AuthHeader string `mapstructure:"auth_header"`
File string `mapstructure:"file"`
JobStatusCheckInterval int `mapstructure:"job_status_check_interval"`
JobTimeout int `mapstructure:"job_timeout"`
Insecure bool `mapstructure:"insecure"`
Endpoint string `mapstructure:"endpoint"`
AuthUser string `mapstructure:"auth_user"` // rclone basicauth user
AuthPass string `mapstructure:"auth_pass"` // rclone basicauth pass
AuthHeader string `mapstructure:"auth_header"`
File string `mapstructure:"file"`
JobStatusCheckInterval int `mapstructure:"job_status_check_interval"`
JobTimeout int `mapstructure:"job_timeout"`
Insecure bool `mapstructure:"insecure"`
RemoveTransferJobOnCancel bool `mapstructure:"remove_transfer_job_on_cancel"`
}

type rclone struct {
Expand Down Expand Up @@ -644,10 +645,24 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d
Ctime: nil,
}, err
}

cTime, _ := strconv.ParseInt(transfer.Ctime, 10, 64)
transferRemovedMessage := ""
if driver.config.RemoveTransferJobOnCancel {
delete(driver.pDriver.model.Transfers, transfer.TransferID)
if err := driver.pDriver.model.saveTransfer(nil); err != nil {
return &datatx.TxInfo{
Id: &datatx.TxId{OpaqueId: transferID},
Status: datatx.Status_STATUS_INVALID,
Ctime: &typespb.Timestamp{Seconds: uint64(cTime)},
}, err
}
transferRemovedMessage = "(transfer job successfully removed)"
}

_, endStatusFound := txEndStatuses[transfer.TransferStatus.String()]
if endStatusFound {
err := errors.New("rclone driver: transfer already in end state")
err := errors.Wrapf(errors.New("rclone driver: transfer already in end state"), transferRemovedMessage)
return &datatx.TxInfo{
Id: &datatx.TxId{OpaqueId: transferID},
Status: datatx.Status_STATUS_INVALID,
Expand All @@ -665,7 +680,7 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d

data, err := json.Marshal(rcloneCancelTransferReq)
if err != nil {
err = errors.Wrap(err, "rclone driver: error marshalling rclone req data")
err := errors.Wrapf(errors.New("rclone driver: error marshalling rclone req data"), transferRemovedMessage)
return &datatx.TxInfo{
Id: &datatx.TxId{OpaqueId: transferID},
Status: datatx.Status_STATUS_INVALID,
Expand All @@ -677,7 +692,7 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d

u, err := url.Parse(driver.config.Endpoint)
if err != nil {
err = errors.Wrap(err, "rclone driver: error parsing driver endpoint")
err := errors.Wrapf(errors.New("rclone driver: error parsing driver endpoint"), transferRemovedMessage)
return &datatx.TxInfo{
Id: &datatx.TxId{OpaqueId: transferID},
Status: datatx.Status_STATUS_INVALID,
Expand All @@ -689,7 +704,7 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d

req, err := http.NewRequest(http.MethodPost, requestURL, bytes.NewReader(data))
if err != nil {
err = errors.Wrap(err, "rclone driver: error framing post request")
err := errors.Wrapf(errors.New("rclone driver: error framing post request"), transferRemovedMessage)
return &datatx.TxInfo{
Id: &datatx.TxId{OpaqueId: transferID},
Status: datatx.Status_STATUS_INVALID,
Expand All @@ -702,7 +717,7 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d

res, err := driver.client.Do(req)
if err != nil {
err = errors.Wrap(err, "rclone driver: error sending post request")
err := errors.Wrapf(errors.New("rclone driver: error sending post request"), transferRemovedMessage)
return &datatx.TxInfo{
Id: &datatx.TxId{OpaqueId: transferID},
Status: datatx.Status_STATUS_INVALID,
Expand All @@ -715,14 +730,14 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d
if res.StatusCode != http.StatusOK {
var errorResData rcloneHTTPErrorRes
if err = json.NewDecoder(res.Body).Decode(&errorResData); err != nil {
err = errors.Wrap(err, "rclone driver: error decoding response data")
err := errors.Wrapf(errors.New("rclone driver: error decoding response data"), transferRemovedMessage)
return &datatx.TxInfo{
Id: &datatx.TxId{OpaqueId: transferID},
Status: datatx.Status_STATUS_INVALID,
Ctime: &typespb.Timestamp{Seconds: uint64(cTime)},
}, err
}
err = errors.Wrap(errors.Errorf("status: %v, error: %v", errorResData.Status, errorResData.Error), "rclone driver: rclone request responded with error")
err = errors.Wrap(errors.Errorf("%v, status: %v, error: %v", transferRemovedMessage, errorResData.Status, errorResData.Error), "rclone driver: rclone request responded with error")
return &datatx.TxInfo{
Id: &datatx.TxId{OpaqueId: transferID},
Status: datatx.Status_STATUS_INVALID,
Expand All @@ -744,7 +759,7 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d
}
var resData rcloneCancelTransferResJSON
if err = json.NewDecoder(res.Body).Decode(&resData); err != nil {
err = errors.Wrap(err, "rclone driver: error decoding response data")
err := errors.Wrapf(errors.New("rclone driver: error decoding response data"), transferRemovedMessage)
return &datatx.TxInfo{
Id: &datatx.TxId{OpaqueId: transferID},
Status: datatx.Status_STATUS_INVALID,
Expand Down

0 comments on commit 8f96b21

Please sign in to comment.