Skip to content

Commit

Permalink
S3 Gateway enhance error handling for unsupported operations
Browse files Browse the repository at this point in the history
  • Loading branch information
nopcoder committed Nov 28, 2023
1 parent ea4c66d commit 3ef34d9
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 44 deletions.
44 changes: 22 additions & 22 deletions pkg/gateway/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,29 +198,22 @@ func OperationLookupHandler(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
o := ctx.Value(ContextKeyOperation).(*operations.Operation)
repoID := ctx.Value(ContextKeyRepositoryID).(string)
o.OperationID = operations.OperationIDOperationNotFound
if repoID == "" {
if req.Method == http.MethodGet {
o.OperationID = operations.OperationIDListBuckets
} else {
_ = o.EncodeError(w, req, nil, gatewayerrors.ERRLakeFSNotSupported.ToAPIErr())
return
}
} else {
ref := ctx.Value(ContextKeyRef).(string)
pth := ctx.Value(ContextKeyPath).(string)
switch {
case ref != "" && pth != "":
req = req.WithContext(ctx)
o.OperationID = pathBasedOperationID(req.Method)
case ref == "" && pth == "":
o.OperationID = repositoryBasedOperationID(req.Method)
default:
w.WriteHeader(http.StatusNotFound)
return
}

repoID := ctx.Value(ContextKeyRepositoryID).(string)
ref := ctx.Value(ContextKeyRef).(string)
pth := ctx.Value(ContextKeyPath).(string)

// based on the operation level, we can determine the operation id
switch {
case repoID == "":
o.OperationID = rootBasedOperationID(req.Method)
case ref != "" && pth != "":
o.OperationID = pathBasedOperationID(req.Method)
case ref == "" && pth == "":
o.OperationID = repositoryBasedOperationID(req.Method)
}

req = req.WithContext(logging.AddFields(ctx, logging.Fields{"operation_id": o.OperationID}))
next.ServeHTTP(w, req)
})
Expand Down Expand Up @@ -277,7 +270,7 @@ func ParseRequestParts(host string, urlPath string, bareDomains []string) Reques
}

if !parts.MatchedHost {
// assume path based for domains we don't explicitly know
// assume path-based for domains we don't explicitly know
p = strings.SplitN(urlPath, path.Separator, 3) //nolint: gomnd
parts.Repository = p[0]
if len(p) >= 1 {
Expand All @@ -295,6 +288,13 @@ func ParseRequestParts(host string, urlPath string, bareDomains []string) Reques
return parts
}

func rootBasedOperationID(method string) operations.OperationID {
if method == http.MethodGet {
return operations.OperationIDListBuckets
}
return operations.OperationIDOperationNotFound
}

func pathBasedOperationID(method string) operations.OperationID {
switch method {
case http.MethodDelete:
Expand Down
10 changes: 10 additions & 0 deletions pkg/gateway/operations/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"slices"

"github.com/treeverse/lakefs/pkg/auth"
"github.com/treeverse/lakefs/pkg/auth/keys"
Expand Down Expand Up @@ -84,6 +85,15 @@ func (o *Operation) EncodeXMLBytes(w http.ResponseWriter, req *http.Request, t [
}
}

func (o *Operation) HandleUnsupported(w http.ResponseWriter, req *http.Request, keys ...string) bool {
query := req.URL.Query()
if slices.ContainsFunc(keys, query.Has) {
_ = o.EncodeError(w, req, nil, gwerrors.ERRLakeFSNotSupported.ToAPIErr())
return true
}
return false
}

func EncodeResponse(w http.ResponseWriter, entity interface{}, statusCode int) error {
// We don't indent the XML document because of Java.
// See: https://github.com/spulec/moto/issues/1870
Expand Down
7 changes: 4 additions & 3 deletions pkg/gateway/operations/deleteobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,11 @@ func (controller *DeleteObject) HandleAbortMultipartUpload(w http.ResponseWriter
}

func (controller *DeleteObject) Handle(w http.ResponseWriter, req *http.Request, o *PathOperation) {
if o.HandleUnsupported(w, req, "tagging", "acl", "torrent") {
return
}
query := req.URL.Query()

_, hasUploadID := query[QueryParamUploadID]
if hasUploadID {
if query.Has(QueryParamUploadID) {
controller.HandleAbortMultipartUpload(w, req, o)
return
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/gateway/operations/deleteobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ func (controller *DeleteObjects) RequiredPermissions(_ *http.Request, _ string)
}

func (controller *DeleteObjects) Handle(w http.ResponseWriter, req *http.Request, o *RepoOperation) {
// verify we only handle delete request
query := req.URL.Query()
if !query.Has("delete") {
_ = o.EncodeError(w, req, nil, gerrors.ERRLakeFSNotSupported.ToAPIErr())
return
}

o.Incr("delete_objects", o.Principal, o.Repository.Name, "")
decodedXML := &serde.Delete{}
err := DecodeXMLBody(req.Body, decodedXML)
Expand Down
4 changes: 4 additions & 0 deletions pkg/gateway/operations/getobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ func (controller *GetObject) RequiredPermissions(_ *http.Request, repoID, _, pat
}

func (controller *GetObject) Handle(w http.ResponseWriter, req *http.Request, o *PathOperation) {
// TODO(barak): do we like to reject tagging or return empty tagging?
if o.HandleUnsupported(w, req, "torrent", "acl", "retention", "legal-hold", "lambdaArn") {
return
}
o.Incr("get_object", o.Principal, o.Repository.Name, o.Reference)
ctx := req.Context()
query := req.URL.Query()
Expand Down
5 changes: 4 additions & 1 deletion pkg/gateway/operations/headbucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ func (controller *HeadBucket) RequiredPermissions(_ *http.Request, repoID string
}, nil
}

func (controller *HeadBucket) Handle(w http.ResponseWriter, _ *http.Request, o *RepoOperation) {
func (controller *HeadBucket) Handle(w http.ResponseWriter, req *http.Request, o *RepoOperation) {
if o.HandleUnsupported(w, req, "acl") {
return
}
o.Incr("get_repo", o.Principal, o.Repository.Name, "")
w.WriteHeader(http.StatusOK)
}
4 changes: 4 additions & 0 deletions pkg/gateway/operations/listbuckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ func (controller *ListBuckets) RequiredPermissions(_ *http.Request) (permissions

// Handle - list buckets (repositories)
func (controller *ListBuckets) Handle(w http.ResponseWriter, req *http.Request, o *AuthorizedOperation) {
if o.HandleUnsupported(w, req, "events") {
return
}

o.Incr("list_repos", o.Principal, "", "")

buckets := make([]serde.Bucket, 0)
Expand Down
11 changes: 10 additions & 1 deletion pkg/gateway/operations/listobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,13 +349,22 @@ func (controller *ListObjects) ListV1(w http.ResponseWriter, req *http.Request,
}

func (controller *ListObjects) Handle(w http.ResponseWriter, req *http.Request, o *RepoOperation) {
// TODO(barak): do we like to reject "versioning" or return empty response?
// TODO(barak): do we like to support "location" (getbucketlocation)?
if o.HandleUnsupported(w, req, "inventory", "metrics", "publicAccessBlock", "ownershipControls",
"intelligent-tiering", "analytics", "location", "policy", "lifecycle", "encryption", "object-lock", "replication",
"notification", "events", "acl", "cors", "website", "accelerate",
"requestPayment", "logging", "tagging", "uploads", "versions", "policyStatus") {
return
}

o.Incr("list_objects", o.Principal, o.Repository.Name, "")
// parse request parameters
// GET /example?list-type=2&prefix=main%2F&delimiter=%2F&encoding-type=url HTTP/1.1

// handle GET /?versions
query := req.URL.Query()
if _, found := query["versions"]; found {
if query.Has("versioning") {
o.EncodeXMLBytes(w, req, []byte(serde.VersioningResponse), http.StatusOK)
return
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/gateway/operations/postobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,20 +163,20 @@ func normalizeMultipartUploadCompletion(list *block.MultipartUploadCompletion) {
}

func (controller *PostObject) Handle(w http.ResponseWriter, req *http.Request, o *PathOperation) {
if o.HandleUnsupported(w, req, "select", "restore") {
return
}

// POST is only supported for CreateMultipartUpload/CompleteMultipartUpload
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
_, mpuCreateParamExist := req.URL.Query()[CreateMultipartUploadQueryParam]
if mpuCreateParamExist {
query := req.URL.Query()
switch {
case query.Has(CreateMultipartUploadQueryParam):
controller.HandleCreateMultipartUpload(w, req, o)
return
}

_, mpuCompleteParamExist := req.URL.Query()[CompleteMultipartUploadQueryParam]
if mpuCompleteParamExist {
case query.Has(CompleteMultipartUploadQueryParam):
controller.HandleCompleteMultipartUpload(w, req, o)
return
default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
// otherwise
w.WriteHeader(http.StatusMethodNotAllowed)
}
11 changes: 6 additions & 5 deletions pkg/gateway/operations/putbucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ func (controller *PutBucket) RequiredPermissions(_ *http.Request, repoID string)
}

func (controller *PutBucket) Handle(w http.ResponseWriter, req *http.Request, o *RepoOperation) {
o.Incr("put_repo", o.Principal, o.Repository.Name, "")
if o.Repository == nil {
// No repo, would have to create it, but not enough
// information -- so not supported.
o.EncodeError(w, req, nil, gatewayerrors.ERRLakeFSNotSupported.ToAPIErr())
if o.HandleUnsupported(w, req, "cors", "metrics", "website", "logging", "accelerate",
"requestPayment", "acl", "publicAccessBlock", "ownershipControls", "intelligent-tiering", "analytics",
"lifecycle", "replication", "encryption", "policy", "object-lock", "tagging", "versioning") {
return
}

o.Incr("put_repo", o.Principal, o.Repository.Name, "")
o.EncodeError(w, req, nil, gatewayerrors.ErrBucketAlreadyExists.ToAPIErr())
}
7 changes: 5 additions & 2 deletions pkg/gateway/operations/putobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ func handleUploadPart(w http.ResponseWriter, req *http.Request, o *PathOperation
}

func (controller *PutObject) Handle(w http.ResponseWriter, req *http.Request, o *PathOperation) {
if o.HandleUnsupported(w, req, "torrent", "acl") {
return
}

// verify branch before we upload data - fail early
branchExists, err := o.Catalog.BranchExists(req.Context(), o.Repository.Name, o.Reference)
if err != nil {
Expand All @@ -230,8 +234,7 @@ func (controller *PutObject) Handle(w http.ResponseWriter, req *http.Request, o
query := req.URL.Query()

// check if this is a multipart upload creation call
_, hasUploadID := query[QueryParamUploadID]
if hasUploadID {
if query.Has(QueryParamUploadID) {
handleUploadPart(w, req, o)
return
}
Expand Down

0 comments on commit 3ef34d9

Please sign in to comment.