-
Notifications
You must be signed in to change notification settings - Fork 35
/
copy.go
170 lines (158 loc) · 4.52 KB
/
copy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package storage
import (
"fmt"
"github.com/pkg/errors"
"github.com/viant/afs/option"
"github.com/viant/endly"
"github.com/viant/endly/internal/udf"
"github.com/viant/endly/model/location"
"github.com/viant/endly/service/system/storage/copy"
)
// CopyRequest represents a resources Copy request
type CopyRequest struct {
*copy.Rule `description:"if asset uses relative path it will be joined with this URL" json:",inline"`
Assets copy.Assets `description:"map entry can either represent a transfer struct or simple key is the source and the value destination relative path"` // transfers
Transfers []*copy.Rule `description:"actual transfer assets, if empty it derives from assets or source/desc "`
Udf string `description:"custom user defined function to return github.com/viant/afs/option.Modifier type to modify copied content"`
}
// CopyResponse represents a resources Copy response.
type CopyResponse struct {
// URLs collects one entry per transfer that completed without error:
// the URL reported by the source object that was looked up for that transfer.
URLs []string //transferred URLs
}
// Copy runs every transfer of the request and reports the transferred URLs.
//
// The response is always returned, even alongside a non-nil error, so it may
// hold the URLs of the transfers that completed before the failure.
func (s *service) Copy(context *endly.Context, request *CopyRequest) (*CopyResponse, error) {
	response := &CopyResponse{URLs: []string{}}
	err := s.copy(context, request, response)
	return response, err
}
// copy resolves the optional UDF content modifier and then executes the
// request transfers one by one, stopping at the first transfer that fails.
//
// It returns an error when the UDF cannot be obtained, when the UDF result is
// not an option.Modifier, or when any transfer fails.
func (s *service) copy(context *endly.Context, request *CopyRequest, response *CopyResponse) error {
	var udfModifier option.Modifier
	if name := request.Udf; name != "" {
		transformed, err := udf.TransformWithUDF(context, name, "", nil)
		if err != nil {
			return errors.Wrapf(err, "failed to get udf: %v", name)
		}
		modifier, isModifier := transformed.(option.Modifier)
		if !isModifier {
			// modifier is only used here to report the expected type.
			return fmt.Errorf("udf %v does not implement %T", transformed, modifier)
		}
		udfModifier = modifier
	}
	for i := range request.Transfers {
		err := s.transfer(context, request.Transfers[i], udfModifier, response)
		if err != nil {
			return err
		}
	}
	return nil
}
// transfer copies a single rule's source to its destination and, on success,
// appends the source object's URL to response.URLs.
//
// When the rule asks for compression and both the source and destination
// schemes are compressable, the source is compressed before the copy and the
// target is decompressed after it. A missing source is reported as
// "<url>: source not found".
func (s *service) transfer(context *endly.Context, rule *copy.Rule, udfModifier option.Modifier, response *CopyResponse) error {
	source, sourceOpts, err := getSourceWithOptions(context, rule)
	if err != nil {
		return err
	}

	dest, destOpts, err := getDestWithOptions(context, rule, udfModifier)
	if err != nil {
		return err
	}

	storage, err := StorageService(context, source, dest)
	if err != nil {
		return err
	}

	compress := rule.Compress && IsCompressable(source.Scheme()) && IsCompressable(dest.Scheme())

	// Look the source up first so a missing source fails before anything is modified.
	sourceObject, err := storage.Object(context.Background(), source.URL)
	if err != nil {
		return errors.Wrapf(err, "%v: source not found", source.URL)
	}

	if compress {
		if err = s.compressSource(context, source, dest, sourceObject); err != nil {
			return err
		}
	}

	if err = storage.Copy(context.Background(), source.URL, dest.URL, sourceOpts, destOpts); err != nil {
		return err
	}

	if compress {
		if err = s.decompressTarget(context, source, dest, sourceObject); err != nil {
			return err
		}
	}

	response.URLs = append(response.URLs, sourceObject.URL())
	return nil
}
// CopyRequest creates a new Copy request
func NewCopyRequest(assets copy.Assets, transfers ...*copy.Rule) *CopyRequest {
var super *copy.Rule
if len(transfers) > 0 {
super = transfers[0]
transfers = transfers[1:]
}
return &CopyRequest{
Rule: super,
Assets: assets,
Transfers: transfers,
}
}
// NewCopyRequestFromURL creates a new request by decoding the resource at the
// supplied URL (JSON or YAML format are supported). It returns a nil request
// together with the decoding error when the resource cannot be decoded.
func NewCopyRequestFromURL(URL string) (*CopyRequest, error) {
	request := new(CopyRequest)
	err := location.NewResource(URL).Decode(request)
	if err != nil {
		return nil, err
	}
	return request, nil
}
// Init initialises request
func (r *CopyRequest) Init() error {
if r.Rule == nil {
r.Rule = ©.Rule{}
} else {
if err := r.Rule.Init(); err != nil {
return err
}
}
hasAssets := len(r.Assets) > 0
hasTransfers := len(r.Transfers) > 0
if hasTransfers {
if r.Source == nil && r.Dest == nil {
return nil
}
for _, rule := range r.Transfers {
if rule.Source != nil {
if r.Source == nil {
r.Source = location.NewResource("/")
}
rule.Source = copy.JoinIfNeeded(r.Source, rule.Source.URL)
}
if rule.Dest != nil {
rule.Dest = copy.JoinIfNeeded(r.Dest, rule.Dest.URL)
}
}
return nil
}
if !hasAssets {
if r.Source != nil && r.Dest != nil {
r.Transfers = []*copy.Rule{
r.Rule,
}
}
return nil
}
r.Transfers = r.Assets.AsTransfer(r.Rule)
return nil
}
// Validate checks if request is valid: it must hold at least one transfer and
// every transfer rule must pass its own validation. The first failure found
// is returned.
func (r *CopyRequest) Validate() error {
	transfers := r.Transfers
	if len(transfers) < 1 {
		return errors.New("transfers were empty")
	}
	for i := range transfers {
		err := transfers[i].Validate()
		if err != nil {
			return err
		}
	}
	return nil
}