Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve cmd of downloading and uploading #90

Merged
merged 13 commits into from
Sep 21, 2023
103 changes: 85 additions & 18 deletions cmd/cmd_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"os"
"path"
"path/filepath"
"strings"
"time"
Expand Down Expand Up @@ -74,6 +75,11 @@ $ gnfd-cmd object put --recursive folderName gnfd://bucket-name`,
Value: false,
Usage: "performed on all files or objects under the specified directory or prefix in a recursive way",
},
&cli.BoolFlag{
Name: bypassSealFlag,
Value: false,
Usage: "if set this flag as true, it will not wait for the file to be sealed after the uploading is completed.",
},
},
}
}
Expand Down Expand Up @@ -382,8 +388,7 @@ func uploadFolder(urlInfo string, ctx *cli.Context,
}
// upload folder
for id, info := range fileInfos {
// pathList := strings.Split(info.Name(), "/")
objectName := filePaths[id]
objectName := path.Base(filePaths[id])
if uploadErr := uploadFile(bucketName, objectName, filePaths[id], urlInfo, ctx, gnfdClient, false, false, info.Size()); uploadErr != nil {
fmt.Printf("failed to upload object: %s, error:%v \n", objectName, uploadErr)
}
Expand All @@ -394,11 +399,12 @@ func uploadFolder(urlInfo string, ctx *cli.Context,

func uploadFile(bucketName, objectName, filePath, urlInfo string, ctx *cli.Context,
gnfdClient client.Client, uploadSigleFolder, printTxnHash bool, objectSize int64) error {

var file *os.File
contentType := ctx.String(contentTypeFlag)
secondarySPAccs := ctx.String(secondarySPFlag)
partSize := ctx.Uint64(partSizeFlag)
resumableUpload := ctx.Bool(resumableFlag)
bypassSeal := ctx.Bool(bypassSealFlag)

opts := sdktypes.CreateObjectOptions{}
if contentType != "" {
Expand Down Expand Up @@ -438,7 +444,7 @@ func uploadFile(bucketName, objectName, filePath, urlInfo string, ctx *cli.Conte
}
} else {
// Open the referenced file.
file, err := os.Open(filePath)
file, err = os.Open(filePath)
if err != nil {
return err
}
Expand Down Expand Up @@ -475,25 +481,58 @@ func uploadFile(bucketName, objectName, filePath, urlInfo string, ctx *cli.Conte
}
defer reader.Close()

if err = gnfdClient.PutObject(c, bucketName, objectName,
objectSize, reader, opt); err != nil {
return toCmdErr(err)
// if the file is more than 2G , it needs to force use resume uploading
if objectSize > maxPutWithoutResumeSize {
opt.DisableResumable = false
}

// Check if object is sealed
timeout := time.After(15 * time.Second)
ticker := time.NewTicker(2 * time.Second)
if opt.DisableResumable {
progressReader := &ProgressReader{
Reader: reader,
Total: objectSize,
StartTime: time.Now(),
LastPrinted: time.Now(),
}

if objectSize > objectLargerSize {
progressReader.LastPrinted = time.Now().Add(2 * time.Second)
}

if err = gnfdClient.PutObject(c, bucketName, objectName,
objectSize, progressReader, opt); err != nil {
return toCmdErr(err)
}
} else {
if err = gnfdClient.PutObject(c, bucketName, objectName,
objectSize, reader, opt); err != nil {
return toCmdErr(err)
}
}

if bypassSeal {
fmt.Printf("\nupload %s to %s \n", objectName, urlInfo)
return nil
}

// Check if object is sealed
timeout := time.After(1 * time.Hour)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it need to be that long?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After our discussion, we changed the seal printing strategy here. Users can manually cancel the wait for seal or set --bypassSeal now. If they do not cancel, they will wait until seal by default, so this time can actually be infinite.

ticker := time.NewTicker(3 * time.Second)
count := 0
fmt.Println()
fmt.Println("sealing...")
for {
select {
case <-timeout:
return toCmdErr(errors.New("object not sealed after 15 seconds"))
flywukong marked this conversation as resolved.
Show resolved Hide resolved
case <-ticker.C:
count++
headObjOutput, queryErr := gnfdClient.HeadObject(c, bucketName, objectName)
if queryErr != nil {
return queryErr
}

if count%10 == 0 {
fmt.Println("sealing...")
}
if headObjOutput.ObjectInfo.GetObjectStatus().String() == "OBJECT_STATUS_SEALED" {
ticker.Stop()
fmt.Printf("upload %s to %s \n", objectName, urlInfo)
Expand All @@ -505,6 +544,7 @@ func uploadFile(bucketName, objectName, filePath, urlInfo string, ctx *cli.Conte

// getObject download the object payload from sp
func getObject(ctx *cli.Context) error {
var err error
if ctx.NArg() < 1 {
return toCmdErr(fmt.Errorf("args number less than one"))
}
Expand All @@ -523,7 +563,7 @@ func getObject(ctx *cli.Context) error {
c, cancelGetObject := context.WithCancel(globalContext)
defer cancelGetObject()

_, err = gnfdClient.HeadObject(c, bucketName, objectName)
chainInfo, err := gnfdClient.HeadObject(c, bucketName, objectName)
if err != nil {
return toCmdErr(ErrObjectNotExist)
}
Expand All @@ -545,6 +585,11 @@ func getObject(ctx *cli.Context) error {
}
}

filePath, err = checkIfDownloadFileExist(filePath, objectName)
if err != nil {
return toCmdErr(err)
}

opt := sdktypes.GetObjectOptions{}
startOffset := ctx.Int64(startOffsetFlag)
endOffset := ctx.Int64(endOffsetFlag)
Expand All @@ -566,24 +611,46 @@ func getObject(ctx *cli.Context) error {
}
fmt.Printf("resumable download object %s, the file path is %s \n", objectName, filePath)
} else {
// If file exist, open it in append mode
fd, err := os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0660)
var fd *os.File
dir := filepath.Dir(filePath)
fileName := "." + filepath.Base(filePath) + ".tmp"
tempFilePath := filepath.Join(dir, fileName)

tempFilePath, err = checkIfDownloadFileExist(tempFilePath, objectName)
if err != nil {
return toCmdErr(err)
}
// download to the temp file firstly
fd, err = os.OpenFile(tempFilePath, os.O_CREATE|os.O_WRONLY, 0660)
if err != nil {
return err
}

defer fd.Close()

body, info, err := gnfdClient.GetObject(c, bucketName, objectName, opt)
if err != nil {
pw := &ProgressWriter{
Writer: fd,
Total: int64(chainInfo.ObjectInfo.PayloadSize),
StartTime: time.Now(),
LastPrinted: time.Now(),
}

body, info, getErr := gnfdClient.GetObject(c, bucketName, objectName, opt)
if getErr != nil {
return toCmdErr(err)
}

_, err = io.Copy(fd, body)
_, err = io.Copy(pw, body)
if err != nil {
return toCmdErr(err)
}
fmt.Printf("download object %s, the file path is %s, content length:%d \n", objectName, filePath, uint64(info.Size))

err = os.Rename(tempFilePath, filePath)
if err != nil {
fmt.Printf("failed to rename %s to %s \n", tempFilePath, filePath)
return nil
}
fmt.Printf("\ndownload object %s, the file path is %s, content length:%d \n", objectName, filePath, uint64(info.Size))
}

return nil
Expand Down
166 changes: 138 additions & 28 deletions cmd/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,34 +28,36 @@ import (
)

const (
Version = "v0.1.0"
maxFileSize = 10 * 1024 * 1024 * 1024
publicReadType = "public-read"
privateType = "private"
inheritType = "inherit"
effectAllow = "allow"
effectDeny = "deny"
primarySPFlag = "primarySP"
chargeQuotaFlag = "chargedQuota"
visibilityFlag = "visibility"
paymentFlag = "paymentAddress"
secondarySPFlag = "secondarySPs"
contentTypeFlag = "contentType"
startOffsetFlag = "start"
endOffsetFlag = "end"
recursiveFlag = "recursive"
addMemberFlag = "addMembers"
removeMemberFlag = "removeMembers"
renewMemberFlag = "renewMembers"
groupOwnerFlag = "groupOwner"
groupMemberExpireFlag = "expireTime"
groupIDFlag = "groupId"
granteeFlag = "grantee"
actionsFlag = "actions"
effectFlag = "effect"
expireTimeFlag = "expire"
IdFlag = "id"
DestChainIdFlag = "destChainId"
Version = "v0.1.0"
maxFileSize = 64 * 1024 * 1024 * 1024
maxPutWithoutResumeSize = 2 * 1024 * 1024 * 1024
publicReadType = "public-read"
privateType = "private"
inheritType = "inherit"
effectAllow = "allow"
effectDeny = "deny"
primarySPFlag = "primarySP"
chargeQuotaFlag = "chargedQuota"
visibilityFlag = "visibility"
paymentFlag = "paymentAddress"
secondarySPFlag = "secondarySPs"
contentTypeFlag = "contentType"
startOffsetFlag = "start"
endOffsetFlag = "end"
recursiveFlag = "recursive"
bypassSealFlag = "bypassSeal"
addMemberFlag = "addMembers"
removeMemberFlag = "removeMembers"
renewMemberFlag = "renewMembers"
groupOwnerFlag = "groupOwner"
groupMemberExpireFlag = "expireTime"
groupIDFlag = "groupId"
granteeFlag = "grantee"
actionsFlag = "actions"
effectFlag = "effect"
expireTimeFlag = "expire"
IdFlag = "id"
DestChainIdFlag = "destChainId"

ownerAddressFlag = "owner"
addressFlag = "address"
Expand Down Expand Up @@ -103,6 +105,7 @@ const (

noBalanceErr = "key not found"
maxListMemberNum = 1000
objectLargerSize = 10 * 1024 * 1024
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a little bit confused the naming

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

)

var (
Expand Down Expand Up @@ -536,3 +539,110 @@ func parseFileByArg(ctx *cli.Context, argIndex int) (int64, error) {
}
return objectSize, nil
}

type ProgressReader struct {
io.Reader
Total int64
Current int64
StartTime time.Time
LastPrinted time.Time
LastPrintedStr string
}

func (pr *ProgressReader) Read(p []byte) (int, error) {
n, err := pr.Reader.Read(p)
pr.Current += int64(n)
pr.printProgress()
return n, err
}

func (pr *ProgressReader) printProgress() {
progress := float64(pr.Current) / float64(pr.Total) * 100
now := time.Now()
elapsed := now.Sub(pr.StartTime)
uploadSpeed := float64(pr.Current) / elapsed.Seconds()

if now.Sub(pr.LastPrinted) >= time.Second { // print rate every second
progressStr := fmt.Sprintf("uploading progress: %.2f%% [ %s / %s ], rate: %s",
progress, getConvertSize(pr.Current), getConvertSize(pr.Total), getConvertRate(uploadSpeed))
// Clear current line
fmt.Print("\r", strings.Repeat(" ", len(pr.LastPrintedStr)), "\r")
// Print new progress
fmt.Print(progressStr)

pr.LastPrinted = now
}
}

type ProgressWriter struct {
io.Writer
Total int64
Current int64
StartTime time.Time
LastPrinted time.Time
}

func (pw *ProgressWriter) Write(p []byte) (int, error) {
n, err := pw.Writer.Write(p)
pw.Current += int64(n)
pw.printProgress()
return n, err
}

func (pw *ProgressWriter) printProgress() {
progress := float64(pw.Current) / float64(pw.Total) * 100
now := time.Now()

elapsed := now.Sub(pw.StartTime)
downloadedBytes := pw.Current
downloadSpeed := float64(downloadedBytes) / elapsed.Seconds()

if now.Sub(pw.LastPrinted) >= time.Second { // print rate every second
fmt.Printf("\rdownloding progress: %.2f%% [ %s / %s ], rate: %s ",
progress, getConvertSize(pw.Current), getConvertSize(pw.Total), getConvertRate(downloadSpeed))
pw.LastPrinted = now
}
}

func getConvertSize(fileSize int64) string {
var convertedSize string
if fileSize > 1<<30 {
convertedSize = fmt.Sprintf("%.2fG", float64(fileSize)/(1<<30))
} else if fileSize > 1<<20 {
convertedSize = fmt.Sprintf("%.2fM", float64(fileSize)/(1<<20))
} else if fileSize > 1<<10 {
convertedSize = fmt.Sprintf("%.2fK", float64(fileSize)/(1<<10))
} else {
convertedSize = fmt.Sprintf("%dB", fileSize)
}
return convertedSize
}

func getConvertRate(rate float64) string {
const (
KB = 1024
MB = 1024 * KB
)

switch {
case rate >= MB:
return fmt.Sprintf("%.2f MB/s", rate/MB)
case rate >= KB:
return fmt.Sprintf("%.2f KB/s", rate/KB)
default:
return fmt.Sprintf("%.2f Byte/s", rate)
}
}

func checkIfDownloadFileExist(filePath, objectName string) (string, error) {
st, err := os.Stat(filePath)
if err == nil {
// If the destination exists and is a directory.
if st.IsDir() {
filePath = filePath + "/" + objectName
return filePath, nil
}
return filePath, fmt.Errorf("download file:%s already exist\n", filePath)
}
return filePath, nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return filePath, nil -> return filePath, err

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this file does not exist, it will be created for the user using CREAT mode open, so there is no need to return err.

}
Loading