Skip to content

Commit

Permalink
fmt.Println debugging
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
  • Loading branch information
butonic committed Nov 24, 2023
1 parent b9e0f3e commit f3b262b
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 3 deletions.
30 changes: 27 additions & 3 deletions pkg/storage/utils/decomposedfs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error {

// restore logger from file info
log, err := logger.FromConfig(&logger.LogConf{
Output: "stdout",
Output: "stderr",
Mode: "json",
Level: uploadMetadata.LogLevel,
})
Expand All @@ -295,6 +295,7 @@ func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error {
// TODO only calculate the checksum in sync that was requested to match, the rest could be async ... but the tests currently expect all to be present
// TODO the hashes all implement BinaryMarshaler so we could try to persist the state for resumable upload. we would neet do keep track of the copied bytes ...

log.Debug().Msg("calculating checksums")
sha1h := sha1.New()
md5h := md5.New()
adler32h := adler32.New()
Expand Down Expand Up @@ -362,6 +363,7 @@ func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error {
uploadMetadata.ChecksumMD5 = md5h.Sum(nil)
uploadMetadata.ChecksumADLER32 = adler32h.Sum(nil)

log.Debug().Str("id", info.ID).Msg("upload.UpdateMetadata")
uploadMetadata, n, err := upload.UpdateMetadata(ctx, fs.lu, info.ID, info.Size, uploadMetadata)
if err != nil {
upload.Cleanup(ctx, fs.lu, n, info.ID, uploadMetadata.MTime, true)
Expand All @@ -388,6 +390,7 @@ func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error {
return err
}

log.Debug().Str("id", info.ID).Msg("events.Publish BytesReceived")
if err := events.Publish(ctx, fs.stream, events.BytesReceived{
UploadID: info.ID,
URL: s,
Expand All @@ -402,7 +405,8 @@ func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error {
}

if n.Exists {
// copy metadata to a revision node
// // copy metadata to a revision node
log.Debug().Str("id", info.ID).Msg("copy metadata to a revision node")
currentAttrs, err := n.Xattrs(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -435,9 +439,13 @@ func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error {
sizeDiff := info.Size - n.Blobsize
if !fs.o.AsyncFileUploads {
// handle postprocessing synchronously
log.Debug().Str("id", info.ID).Msg("upload.Finalize")
err = upload.Finalize(ctx, fs.blobstore, uploadMetadata.MTime, info, n, uploadMetadata.BlobID) // moving or copying the blob only reads the blobid, no need to change the revision nodes nodeid

log.Debug().Str("id", info.ID).Msg("upload.Cleanup")
upload.Cleanup(ctx, fs.lu, n, info.ID, uploadMetadata.MTime, err != nil)
if tup, ok := up.(tusd.TerminatableUpload); ok {
log.Debug().Str("id", info.ID).Msg("tup.Terminate")
terr := tup.Terminate(ctx)
if terr != nil {
log.Error().Err(terr).Interface("info", info).Msg("failed to terminate upload")
Expand All @@ -447,13 +455,14 @@ func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error {
log.Error().Err(err).Msg("failed to upload")
return err
}
log.Debug().Str("id", info.ID).Msg("upload.SetNodeToUpload")
sizeDiff, err = upload.SetNodeToUpload(ctx, fs.lu, n, uploadMetadata)
if err != nil {
log.Error().Err(err).Msg("failed update Node to revision")
return err
}
}

log.Debug().Str("id", info.ID).Msg("fs.tp.Propagate")
return fs.tp.Propagate(ctx, n, sizeDiff)
}

Expand Down Expand Up @@ -500,28 +509,34 @@ func checkHash(expected string, h hash.Hash) error {
// TODO Upload (and InitiateUpload) needs a way to receive the expected checksum.
// Maybe in metadata as 'checksum' => 'sha1 aeosvp45w5xaeoe' = lowercase, space separated?
func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, uff storage.UploadFinishedFunc) (provider.ResourceInfo, error) {
sublog := appctx.GetLogger(ctx).With().Str("path", req.Ref.Path).Int64("uploadLength", req.Length).Logger()
up, err := fs.tusDataStore.GetUpload(ctx, req.Ref.GetPath())
if err != nil {
sublog.Debug().Err(err).Msg("Decomposedfs: error retrieving upload")
return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error retrieving upload")
}

uploadInfo, err := up.GetInfo(ctx)
if err != nil {
sublog.Debug().Err(err).Msg("Decomposedfs: error retrieving upload info")
return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error retrieving upload info")
}

uploadMetadata, err := upload.ReadMetadata(ctx, fs.lu, uploadInfo.ID)
if err != nil {
sublog.Debug().Err(err).Msg("Decomposedfs: error retrieving upload metadata")
return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error retrieving upload metadata")
}

if chunking.IsChunked(uploadMetadata.Chunk) { // check chunking v1, TODO, actually there is a 'OC-Chunked: 1' header, at least when the testsuite uses chunking v1
var assembledFile, p string
p, assembledFile, err = fs.chunkHandler.WriteChunk(uploadMetadata.Chunk, req.Body)
if err != nil {
sublog.Debug().Err(err).Msg("Decomposedfs: could not write chunk")
return provider.ResourceInfo{}, err
}
if p == "" {
sublog.Debug().Err(err).Str("chunk", uploadMetadata.Chunk).Msg("Decomposedfs: wrote chunk")
return provider.ResourceInfo{}, errtypes.PartialContent(req.Ref.String())
}
uploadMetadata.Filename = p
Expand Down Expand Up @@ -572,12 +587,14 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u
if req.Length > 0 {
if ldx, ok := up.(tusd.LengthDeclarableUpload); ok {
if err := ldx.DeclareLength(ctx, req.Length); err != nil {
sublog.Debug().Err(err).Msg("Decomposedfs: error declaring length")
return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error declaring length")
}
}
}
bytesWritten, err := up.WriteChunk(ctx, 0, req.Body)
if err != nil {
sublog.Debug().Err(err).Msg("Decomposedfs: error writing to binary file")
return provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error writing to binary file")
}
uploadInfo.Offset += bytesWritten
Expand All @@ -589,18 +606,23 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u
}

// This finishes the tus upload
sublog.Debug().Msg("finishing upload")
if err := up.FinishUpload(ctx); err != nil {
sublog.Debug().Err(err).Msg("Decomposedfs: error finishing upload")
return provider.ResourceInfo{}, err
}

// we now need to handle to move/copy&delete to the target blobstore
sublog.Debug().Msg("executing tusd prefinish callback")
err = fs.PreFinishResponseCallback(tusd.HookEvent{Upload: uploadInfo})
if err != nil {
sublog.Debug().Err(err).Msg("Decomposedfs: tusd callback failed")
return provider.ResourceInfo{}, err
}

n, err := upload.ReadNode(ctx, fs.lu, uploadMetadata)
if err != nil {
sublog.Debug().Err(err).Msg("Decomposedfs: error reading node")
return provider.ResourceInfo{}, err
}

Expand All @@ -620,6 +642,7 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u
return provider.ResourceInfo{}, errtypes.PreconditionFailed("error getting user from context")
}

sublog.Debug().Msg("calling upload finished func")
uff(n.SpaceOwnerOrManager(ctx), excutant.Id, uploadRef)
}

Expand All @@ -644,6 +667,7 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u
if mtime, err := utils.MTimeToTS(uploadInfo.MetaData["mtime"]); err == nil {
ri.Mtime = &mtime
}
sublog.Debug().Msg("Decomposedfs: finished upload")

return ri, nil
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/utils/metadata/cs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"os"
Expand Down Expand Up @@ -184,7 +185,9 @@ func (cs3 *CS3) Upload(ctx context.Context, req UploadRequest) (*UploadResponse,
// FIXME ... we need a better way to transport filesize
ifuReq.Opaque = utils.AppendPlainToOpaque(ifuReq.Opaque, net.HeaderUploadLength, strconv.FormatInt(int64(len(req.Content)), 10))

fmt.Println("client.InitiateFileUpload")
res, err := client.InitiateFileUpload(ctx, ifuReq)
fmt.Printf("client.InitiateFileUpload err=%v\n", err)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -214,7 +217,9 @@ func (cs3 *CS3) Upload(ctx context.Context, req UploadRequest) (*UploadResponse,

md, _ := metadata.FromOutgoingContext(ctx)
httpReq.Header.Add(ctxpkg.TokenHeader, md.Get(ctxpkg.TokenHeader)[0])
fmt.Println("cs3.dataGatewayClient.Do")
resp, err := cs3.dataGatewayClient.Do(httpReq)
fmt.Printf("cs3.dataGatewayClient.Do err=%v\n", err)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit f3b262b

Please sign in to comment.