Skip to content

Commit

Permalink
Pass the upload file size to the http layer if WriteUsesLocalTemp is …
Browse files Browse the repository at this point in the history
…true
  • Loading branch information
root committed May 27, 2021
1 parent 4f62e04 commit 328a1da
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 7 deletions.
12 changes: 9 additions & 3 deletions pkg/eosclient/eosgrpc/eos_http/eoshttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"io"
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -354,10 +355,10 @@ func (c *Client) GETFile(ctx context.Context, remoteuser, uid, gid, urlpath stri
}

// PUTFile does an entire PUT to upload a full file, taking the data from a stream
func (c *Client) PUTFile(ctx context.Context, remoteuser, uid, gid, urlpath string, stream io.ReadCloser) error {
func (c *Client) PUTFile(ctx context.Context, remoteuser, uid, gid, urlpath string, stream io.ReadCloser, length int64) error {

log := appctx.GetLogger(ctx)
log.Info().Str("func", "PUTFile").Str("remoteuser", remoteuser).Str("uid,gid", uid+","+gid).Str("path", urlpath).Msg("")
log.Info().Str("func", "PUTFile").Str("remoteuser", remoteuser).Str("uid,gid", uid+","+gid).Str("path", urlpath).Int64("length", length).Msg("")

// Now send the req and see what happens
finalurl, err := c.buildFullURL(urlpath, uid, gid)
Expand Down Expand Up @@ -391,7 +392,7 @@ func (c *Client) PUTFile(ctx context.Context, remoteuser, uid, gid, urlpath stri
log.Debug().Str("func", "PUTFile").Msg("sending req")
resp, err := c.cl.Do(req)

// Let's support redirections... and if we retry we have to retry at the same FST, avoid going back to the MGM
// Let's support redirections... and if we retry we retry at the same FST
if resp != nil && resp.StatusCode == 307 {

// io.Copy(ioutil.Discard, resp.Body)
Expand All @@ -407,6 +408,11 @@ func (c *Client) PUTFile(ctx context.Context, remoteuser, uid, gid, urlpath stri
Transport: httpTransport}

req, err = http.NewRequestWithContext(ctx, "PUT", loc.String(), stream)
if length >= 0 {
log.Debug().Str("func", "PUTFile").Int64("Content-Length", length).Msg("setting header")
req.Header.Set("Content-Length", strconv.FormatInt(length, 10))

}
if err != nil {
log.Error().Str("func", "PUTFile").Str("url", loc.String()).Str("err", err.Error()).Msg("can't create redirected request")
return err
Expand Down
12 changes: 8 additions & 4 deletions pkg/eosclient/eosgrpc/eosgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type Options struct {

// Set to true to use the local disk as a buffer for chunk
// writes to EOS. Default is false, i.e. pure streaming
// Beware: in pure streaming mode the FST must support
// the HTTP chunked encoding
WriteUsesLocalTemp bool

// Location of the xrdcopy binary.
Expand Down Expand Up @@ -1198,8 +1200,10 @@ func (c *Client) Read(ctx context.Context, uid, gid, path string) (io.ReadCloser
func (c *Client) Write(ctx context.Context, uid, gid, path string, stream io.ReadCloser) error {
log := appctx.GetLogger(ctx)
log.Info().Str("func", "Write").Str("uid,gid", uid+","+gid).Str("path", path).Msg("")
var length int64
length = -1

if c.opt.ReadUsesLocalTemp {
if c.opt.WriteUsesLocalTemp {
fd, err := ioutil.TempFile(c.opt.CacheDirectory, "eoswrite-")
if err != nil {
return err
Expand All @@ -1209,7 +1213,7 @@ func (c *Client) Write(ctx context.Context, uid, gid, path string, stream io.Rea

log.Info().Str("func", "Write").Str("uid,gid", uid+","+gid).Str("path", path).Str("tempfile", fd.Name()).Msg("")
// copy stream to local temp file
_, err = io.Copy(fd, stream)
length, err = io.Copy(fd, stream)
if err != nil {
return err
}
Expand All @@ -1221,10 +1225,10 @@ func (c *Client) Write(ctx context.Context, uid, gid, path string, stream io.Rea
defer wfd.Close()
defer os.RemoveAll(fd.Name())

return c.GetHTTPCl().PUTFile(ctx, "", uid, gid, path, wfd)
return c.GetHTTPCl().PUTFile(ctx, "", uid, gid, path, wfd, length)
}

return c.GetHTTPCl().PUTFile(ctx, "", uid, gid, path, stream)
return c.GetHTTPCl().PUTFile(ctx, "", uid, gid, path, stream, length)

// return c.GetHttpCl().PUTFile(ctx, remoteuser, uid, gid, urlpathng, stream)
// return c.WriteFile(ctx, uid, gid, path, fd.Name())
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/utils/eosfs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,7 @@ type Config struct {
// Normally the eosgrpc plugin streams data on the fly.
// Setting this to true will make reva use the temp cachedirectory
// as intermediate step for write operations
// Beware: in pure streaming mode the FST must support
// the HTTP chunked encoding
WriteUsesLocalTemp bool `mapstructure:"write_uses_local_temp"`
}

0 comments on commit 328a1da

Please sign in to comment.