s3Compat.go
package s3Compat

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"sync"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)
// Params holds the connection settings and the underlying S3 client.
type Params struct {
	REGION     string
	BUCKET     string
	ACCESS_KEY string
	SECRET_KEY string
	ENDPOINT   string
	Service    *s3.Client
}
// PutObjectStream uploads a non-seekable stream (such as stdin) to S3 using a
// multipart upload with up to 10 concurrent part uploads.
func (p *Params) PutObjectStream(key string, data io.Reader) error {
	ctx := context.TODO() // Replace with your context if necessary

	resp, err := p.Service.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
		Bucket: &p.BUCKET,
		Key:    &key,
	})
	if err != nil {
		return fmt.Errorf("unable to initiate multipart upload: %w", err)
	}

	// Abort the multipart upload if anything goes wrong so incomplete parts
	// are not left behind in the bucket.
	abort := func() {
		_, _ = p.Service.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
			Bucket:   &p.BUCKET,
			Key:      &key,
			UploadId: resp.UploadId,
		})
	}

	var (
		wg             sync.WaitGroup
		mu             sync.Mutex // guards completedParts and uploadErr
		uploadErr      error
		completedParts []types.CompletedPart
	)
	sem := make(chan struct{}, 10)       // limit to 10 concurrent part uploads
	partSize := int64(1024 * 1024 * 200) // 200MB part size

	for partNum := 0; ; partNum++ {
		partBuffer := make([]byte, partSize)
		bytesRead, err := io.ReadFull(data, partBuffer)
		if err == io.EOF {
			break // no more data
		} else if err != nil && err != io.ErrUnexpectedEOF {
			abort()
			return fmt.Errorf("unable to read from input data: %w", err)
		}
		partBuffer = partBuffer[:bytesRead] // trim buffer to the bytes actually read

		// Grow the result slice on the main goroutine before the upload starts,
		// so each worker fills a slot that already exists.
		mu.Lock()
		completedParts = append(completedParts, types.CompletedPart{})
		mu.Unlock()

		wg.Add(1)
		sem <- struct{}{} // block if there are already 10 active uploads
		go func(pn int, pb []byte) {
			defer wg.Done()
			defer func() { <-sem }() // release the semaphore when the upload finishes

			uploadResult, err := p.Service.UploadPart(ctx, &s3.UploadPartInput{
				Body:       bytes.NewReader(pb),
				Bucket:     &p.BUCKET,
				Key:        &key,
				PartNumber: aws.Int32(int32(pn + 1)), // part numbers start at 1
				UploadId:   resp.UploadId,
			})

			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				if uploadErr == nil {
					uploadErr = fmt.Errorf("unable to upload part %d: %w", pn+1, err)
				}
				return
			}
			completedParts[pn] = types.CompletedPart{
				ETag:       uploadResult.ETag,
				PartNumber: aws.Int32(int32(pn + 1)),
			}
		}(partNum, partBuffer)
	}
	wg.Wait()

	if uploadErr != nil {
		abort()
		return uploadErr
	}

	_, err = p.Service.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
		Bucket:   &p.BUCKET,
		Key:      &key,
		UploadId: resp.UploadId,
		MultipartUpload: &types.CompletedMultipartUpload{
			Parts: completedParts,
		},
	})
	if err != nil {
		return fmt.Errorf("unable to complete multipart upload: %w", err)
	}
	return nil
}
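
// As a rough usage sketch (illustrative only; "client" is assumed to come from
// New below, and reading from os.Stdin would require the caller to import "os"):
//
//	client, _ := s3Compat.New(s3Compat.Params{ /* ... */ })
//	if err := client.PutObjectStream("backup.tar", os.Stdin); err != nil {
//		// handle the upload error
//	}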
// PutObjectSeekableStream uploads a seekable stream to S3 using the SDK's upload manager.
func (p *Params) PutObjectSeekableStream(key string, data io.Reader) error {
	// Create a context
	ctx := context.TODO() // Replace with your context if necessary
	// Create an uploader with the client and default options
	uploader := manager.NewUploader(p.Service)
	// Perform the upload.
	_, err := uploader.Upload(ctx, &s3.PutObjectInput{
		Bucket: &p.BUCKET,
		Key:    &key,
		Body:   data,
	})
	if err != nil {
		return fmt.Errorf("unable to upload %q to %q: %w", key, p.BUCKET, err)
	}
	return nil
}
// GetObjectStreamSimple returns the object body as a stream; the caller must close it.
func (p *Params) GetObjectStreamSimple(key string) (io.ReadCloser, error) {
	// Set up the input for the GetObject method.
	input := &s3.GetObjectInput{
		Bucket: &p.BUCKET,
		Key:    &key,
	}
	// Create a context
	ctx := context.TODO() // Replace with your context if necessary
	// Call S3's GetObject method and return the body stream.
	result, err := p.Service.GetObject(ctx, input)
	if err != nil {
		return nil, err
	}
	return result.Body, nil
}
// GetObjectStream downloads an object in 500MB ranged chunks using three parallel
// workers, buffers the chunks in memory, and returns them as a single ordered stream.
// Note that the returned reader does not yield data until every chunk has finished
// downloading, so the whole object is held in memory.
func (p *Params) GetObjectStream(key string) (io.ReadCloser, error) {
	const chunkSize int64 = 500 * 1024 * 1024 // 500MB
	// Create a context
	ctx := context.TODO() // Replace with your context if necessary

	// Get the object size
	headInput := &s3.HeadObjectInput{
		Bucket: &p.BUCKET,
		Key:    &key,
	}
	headResult, err := p.Service.HeadObject(ctx, headInput)
	if err != nil {
		return nil, err
	}
	objectSize := *headResult.ContentLength

	// Calculate the number of chunks
	numChunks := int(objectSize / chunkSize)
	if objectSize%chunkSize != 0 {
		numChunks++
	}

	// Buffer to hold chunks with range metadata
	type Chunk struct {
		RangeStart int64
		Data       *bytes.Buffer
	}
	buffer := make([]*Chunk, numChunks)
	var bufferMutex sync.Mutex
	var downloadErr error

	// Wait group to wait for all download workers
	var wg sync.WaitGroup

	// Launch 3 download workers; worker i handles chunks i, i+3, i+6, ...
	for worker := 0; worker < 3; worker++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			for chunkIndex := worker; chunkIndex < numChunks; chunkIndex += 3 {
				start := int64(chunkIndex) * chunkSize
				end := start + chunkSize - 1
				if chunkIndex == numChunks-1 {
					end = objectSize - 1
				}
				// Create the GetObjectInput with the Range parameter
				rangeStr := fmt.Sprintf("bytes=%d-%d", start, end)
				input := &s3.GetObjectInput{
					Bucket: &p.BUCKET,
					Key:    &key,
					Range:  &rangeStr,
				}
				// Call S3's GetObject method for this chunk
				result, err := p.Service.GetObject(ctx, input)
				if err != nil {
					bufferMutex.Lock()
					if downloadErr == nil {
						downloadErr = fmt.Errorf("unable to download range %s: %w", rangeStr, err)
					}
					bufferMutex.Unlock()
					return
				}
				// Read the chunk into a buffer and close the body right away,
				// rather than deferring inside the loop.
				buf := new(bytes.Buffer)
				_, err = io.Copy(buf, result.Body)
				result.Body.Close()
				if err != nil {
					bufferMutex.Lock()
					if downloadErr == nil {
						downloadErr = fmt.Errorf("unable to read range %s: %w", rangeStr, err)
					}
					bufferMutex.Unlock()
					return
				}
				// Store the chunk in its slot with proper synchronization
				bufferMutex.Lock()
				buffer[chunkIndex] = &Chunk{RangeStart: start, Data: buf}
				bufferMutex.Unlock()
			}
		}(worker)
	}

	// Pipe the buffered chunks to the caller in the correct order once all
	// downloads have finished; surface any worker error through the pipe.
	reader, writer := io.Pipe()
	go func() {
		wg.Wait() // Wait for all download workers to finish
		if downloadErr != nil {
			writer.CloseWithError(downloadErr)
			return
		}
		for _, chunk := range buffer {
			if chunk == nil {
				writer.CloseWithError(errors.New("missing chunk in download buffer"))
				return
			}
			if _, err := writer.Write(chunk.Data.Bytes()); err != nil {
				writer.CloseWithError(err)
				return
			}
		}
		writer.Close()
	}()
	return reader, nil
}
// PutObject uploads a byte slice to S3 under the given key.
func (p *Params) PutObject(key string, data []byte) error {
	// Set up the input for the PutObject method.
	input := &s3.PutObjectInput{
		Body:   bytes.NewReader(data),
		Bucket: &p.BUCKET,
		Key:    &key,
	}
	// Create a context
	ctx := context.TODO() // Replace with your context if necessary
	// Call S3's PutObject method and handle the error (if any).
	_, err := p.Service.PutObject(ctx, input)
	if err != nil {
		return err
	}
	return nil
}
// SetMetadata replaces the metadata stored on an existing object.
func (p *Params) SetMetadata(key string, metadata map[string]string) error {
	// Create a context
	ctx := context.TODO() // Replace with your context if necessary
	// Prepare the CopyObjectInput; copying the object onto itself with
	// MetadataDirectiveReplace rewrites its metadata.
	input := &s3.CopyObjectInput{
		Bucket:            &p.BUCKET,
		CopySource:        aws.String(fmt.Sprintf("%s/%s", p.BUCKET, key)),
		Key:               &key,
		Metadata:          metadata,
		MetadataDirective: types.MetadataDirectiveReplace,
	}
	// Perform the CopyObject operation to update metadata
	_, err := p.Service.CopyObject(ctx, input)
	if err != nil {
		return err
	}
	return nil
}
// GetMetadata returns the user-defined metadata stored on the given key.
func (p *Params) GetMetadata(key string) (map[string]string, error) {
	// Create a context
	ctx := context.TODO() // Replace with your context if necessary
	// Prepare the HeadObjectInput
	input := &s3.HeadObjectInput{
		Bucket: &p.BUCKET,
		Key:    &key,
	}
	// Call S3's HeadObject method
	output, err := p.Service.HeadObject(ctx, input)
	if err != nil {
		return nil, err
	}
	// Extract metadata from the output
	return output.Metadata, nil
}
// GetObject downloads an object from S3 and returns its contents as a byte slice.
func (p *Params) GetObject(key string) ([]byte, error) {
	// Set up the input for the GetObject method.
	input := &s3.GetObjectInput{
		Bucket: &p.BUCKET,
		Key:    &key,
	}
	// Create a context
	ctx := context.TODO() // Replace with your context if necessary
	// Call S3's GetObject method and handle the error (if any).
	result, err := p.Service.GetObject(ctx, input)
	if err != nil {
		return nil, err
	}
	defer result.Body.Close()
	// Read the full body into memory.
	buf := new(bytes.Buffer)
	_, err = io.Copy(buf, result.Body)
	if err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}
// DeleteObject removes an object from S3.
func (p *Params) DeleteObject(key string) error {
	// Set up the input for the DeleteObject method.
	input := &s3.DeleteObjectInput{
		Bucket: &p.BUCKET,
		Key:    &key,
	}
	// Create a context
	ctx := context.TODO() // Replace with your context if necessary
	// Call S3's DeleteObject method and handle the error (if any).
	_, err := p.Service.DeleteObject(ctx, input)
	if err != nil {
		return err
	}
	return nil
}
// ListObjects returns the keys of all objects in the bucket, following pagination.
func (p *Params) ListObjects() ([]string, error) {
	var names []string
	// ListObjectsV2 returns at most 1000 keys per call, so walk every page
	// of results with the paginator.
	paginator := s3.NewListObjectsV2Paginator(p.Service, &s3.ListObjectsV2Input{
		Bucket: &p.BUCKET,
	})
	for paginator.HasMorePages() {
		page, err := paginator.NextPage(context.TODO())
		if err != nil {
			return nil, err
		}
		for _, object := range page.Contents {
			names = append(names, *object.Key)
		}
	}
	return names, nil
}
// New validates the parameters, builds an S3 client for the given endpoint, and
// returns a ready-to-use *Params.
func New(params Params) (*Params, error) {
	// Apply defaults where possible and reject missing required fields.
	if params.REGION == "" {
		params.REGION = "auto"
	}
	if params.BUCKET == "" {
		return nil, errors.New("bucket name is required")
	}
	if params.ACCESS_KEY == "" {
		return nil, errors.New("access key is required")
	}
	if params.SECRET_KEY == "" {
		return nil, errors.New("secret key is required")
	}
	if params.ENDPOINT == "" {
		return nil, errors.New("endpoint is required")
	}
	// Resolve every request to the custom endpoint (e.g. an S3-compatible provider).
	customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
		return aws.Endpoint{
			URL:           params.ENDPOINT,
			SigningRegion: params.REGION,
		}, nil
	})
	cfg, err := config.LoadDefaultConfig(context.TODO(),
		config.WithEndpointResolverWithOptions(customResolver),
		config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(params.ACCESS_KEY, params.SECRET_KEY, "")),
	)
	if err != nil {
		return nil, err
	}
	cfg.Region = params.REGION
	// Create a new S3 service client.
	params.Service = s3.NewFromConfig(cfg)
	return &params, nil
}
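
// Usage sketch (illustrative only; the bucket, endpoint, and credential values
// below are placeholders, and the caller would need to import "log"):
//
//	client, err := s3Compat.New(s3Compat.Params{
//		BUCKET:     "my-bucket",
//		ACCESS_KEY: "AKIA...",
//		SECRET_KEY: "secret",
//		ENDPOINT:   "https://s3.example.com",
//	})
//	if err != nil {
//		log.Fatal(err)
//	}
//	if err := client.PutObject("hello.txt", []byte("hello")); err != nil {
//		log.Fatal(err)
//	}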