Skip to content

Commit

Permalink
return 409 conflict when a file was already created
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
  • Loading branch information
butonic committed Oct 1, 2024
1 parent 2213b55 commit 4a57767
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 30 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/fix-return-code-on-upload-error.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: return 409 conflict when a file was already created

We now return the correct 409 conflict status code when a file was already created by another upload.

https://github.com/cs3org/reva/pull/4872
72 changes: 42 additions & 30 deletions pkg/storage/utils/decomposedfs/upload/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"hash"
"io"
"io/fs"
"net/http"
"os"
"strconv"
"strings"
Expand All @@ -47,14 +48,16 @@ import (
"go.opentelemetry.io/otel/trace"
)

var tracer trace.Tracer
var (
tracer trace.Tracer
ErrAlreadyExists = tusd.NewError("ERR_ALREADY_EXISTS", "file already exists", http.StatusConflict)
defaultFilePerm = os.FileMode(0664)
)

func init() {
tracer = otel.Tracer("github.com/cs3org/reva/pkg/storage/utils/decomposedfs/upload")
}

var defaultFilePerm = os.FileMode(0664)

// WriteChunk writes the stream from the reader to the given offset of the upload
func (session *OcisSession) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
ctx, span := tracer.Start(session.Context(ctx), "WriteChunk")
Expand Down Expand Up @@ -165,7 +168,15 @@ func (session *OcisSession) FinishUpload(ctx context.Context) error {
n, err := session.store.CreateNodeForUpload(session, attrs)
if err != nil {
session.store.Cleanup(ctx, session, true, false, false)
return err
// we need to return a tusd error here to make the tusd handler return the correct status code
switch err.(type) {
case errtypes.AlreadyExists:
return ErrAlreadyExists
case errtypes.Aborted:
return tusd.NewError("ERR_PRECONDITION_FAILED", err.Error(), http.StatusPreconditionFailed)
default:
return err
}
}

// increase the processing counter for every started processing
Expand Down Expand Up @@ -302,34 +313,35 @@ func (session *OcisSession) Cleanup(revertNodeMetadata, cleanBin, cleanInfo bool
n, err := session.Node(ctx)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("sessionid", session.ID()).Msg("reading node for session failed")
}
if session.NodeExists() && session.info.MetaData["versionsPath"] != "" {
p := session.info.MetaData["versionsPath"]
if err := session.store.lu.CopyMetadata(ctx, p, n.InternalPath(), func(attributeName string, value []byte) (newValue []byte, copy bool) {
return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
attributeName == prefixes.TypeAttr ||
attributeName == prefixes.BlobIDAttr ||
attributeName == prefixes.BlobsizeAttr ||
attributeName == prefixes.MTimeAttr
}, true); err != nil {
appctx.GetLogger(ctx).Info().Str("versionpath", p).Str("nodepath", n.InternalPath()).Err(err).Msg("renaming version node failed")
}

if err := os.RemoveAll(p); err != nil {
appctx.GetLogger(ctx).Info().Str("versionpath", p).Str("nodepath", n.InternalPath()).Err(err).Msg("error removing version")
}

} else {
// if no other upload session is in progress (processing id != session id) or has finished (processing id == "")
latestSession, err := n.ProcessingID(ctx)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("uploadid", session.ID()).Msg("reading processingid for session failed")
}
if latestSession == session.ID() {
// actually delete the node
session.removeNode(ctx)
if session.NodeExists() && session.info.MetaData["versionsPath"] != "" {
p := session.info.MetaData["versionsPath"]
if err := session.store.lu.CopyMetadata(ctx, p, n.InternalPath(), func(attributeName string, value []byte) (newValue []byte, copy bool) {
return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
attributeName == prefixes.TypeAttr ||
attributeName == prefixes.BlobIDAttr ||
attributeName == prefixes.BlobsizeAttr ||
attributeName == prefixes.MTimeAttr
}, true); err != nil {
appctx.GetLogger(ctx).Info().Str("versionpath", p).Str("nodepath", n.InternalPath()).Err(err).Msg("renaming version node failed")
}

if err := os.RemoveAll(p); err != nil {
appctx.GetLogger(ctx).Info().Str("versionpath", p).Str("nodepath", n.InternalPath()).Err(err).Msg("error removing version")
}

} else {
// if no other upload session is in progress (processing id != session id) or has finished (processing id == "")
latestSession, err := n.ProcessingID(ctx)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("uploadid", session.ID()).Msg("reading processingid for session failed")
}
if latestSession == session.ID() {
// actually delete the node
session.removeNode(ctx)
}
// FIXME else if the upload has become a revision, delete the revision, or if it is the last one, delete the node
}
// FIXME else if the upload has become a revision, delete the revision, or if it is the last one, delete the node
}
}

Expand Down

0 comments on commit 4a57767

Please sign in to comment.