Skip to content

Commit

Permalink
Move API upload endpoint to package service
Browse files Browse the repository at this point in the history
- Remove the upload service and package.
- Set up and close the upload bucket in main.
- Update configuration:
  - Make upload max size configurable.
  - Use `go.artefactual.dev/tools/bucket` config struct.

[skip-codecov]
  • Loading branch information
jraddaoui committed Sep 4, 2024
1 parent 0ecec2a commit a0d454d
Show file tree
Hide file tree
Showing 52 changed files with 1,620 additions and 2,627 deletions.
5 changes: 2 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ gen-mock: $(MOCKGEN)
mockgen -typed -destination=./internal/storage/fake/mock_client.go -package=fake github.com/artefactual-sdps/enduro/internal/storage Client
mockgen -typed -destination=./internal/storage/fake/mock_storage.go -package=fake github.com/artefactual-sdps/enduro/internal/storage Service
mockgen -typed -destination=./internal/storage/persistence/fake/mock_persistence.go -package=fake github.com/artefactual-sdps/enduro/internal/storage/persistence Storage
mockgen -typed -destination=./internal/upload/fake/mock_upload.go -package=fake github.com/artefactual-sdps/enduro/internal/upload Service
mockgen -typed -destination=./internal/watcher/fake/mock_service.go -package=fake github.com/artefactual-sdps/enduro/internal/watcher Service
mockgen -typed -destination=./internal/watcher/fake/mock_watcher.go -package=fake github.com/artefactual-sdps/enduro/internal/watcher Watcher

Expand Down Expand Up @@ -249,11 +248,11 @@ tparse: $(TPARSE)
go test -count=1 -json -cover $(TEST_PACKAGES) | tparse -follow -all -notests

upload-sample-transfer: # @HELP Upload sample transfer (small.zip).
upload-sample-transfer: ADDRESS ?= localhost:9000
upload-sample-transfer: ADDRESS ?= localhost:9002
upload-sample-transfer:
curl \
-F "file=@$(CURDIR)/internal/testdata/zipped_transfer/small.zip" \
http://$(ADDRESS)/upload/upload
http://$(ADDRESS)/package/upload

workflowcheck: # @HELP Detect non-determinism in workflow functions.
workflowcheck: $(WORKFLOWCHECK)
Expand Down
2 changes: 2 additions & 0 deletions cmd/enduro-a3m-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ func main() {
&auth.NoopTokenVerifier{},
nil,
cfg.Temporal.TaskQueue,
nil,
0,
)
}

Expand Down
2 changes: 2 additions & 0 deletions cmd/enduro-am-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ func main() {
&auth.NoopTokenVerifier{},
nil,
cfg.Temporal.TaskQueue,
nil,
0,
)
}

Expand Down
41 changes: 15 additions & 26 deletions cmd/enduro/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/pflag"
"go.artefactual.dev/tools/bucket"
"go.artefactual.dev/tools/log"
temporal_tools "go.artefactual.dev/tools/temporal"
"go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace"
Expand Down Expand Up @@ -50,7 +51,6 @@ import (
storage_entdb "github.com/artefactual-sdps/enduro/internal/storage/persistence/ent/db"
storage_workflows "github.com/artefactual-sdps/enduro/internal/storage/workflows"
"github.com/artefactual-sdps/enduro/internal/telemetry"
"github.com/artefactual-sdps/enduro/internal/upload"
"github.com/artefactual-sdps/enduro/internal/version"
"github.com/artefactual-sdps/enduro/internal/watcher"
"github.com/artefactual-sdps/enduro/internal/workflow"
Expand Down Expand Up @@ -215,6 +215,14 @@ func main() {
)
}

// Set up upload bucket.
uploadBucket, err := bucket.NewWithConfig(ctx, &cfg.Upload.Bucket)
if err != nil {
logger.Error(err, "Error setting up upload bucket.")
os.Exit(1)
}
defer uploadBucket.Close()

// Set up the package service.
var pkgsvc package_.Service
{
Expand All @@ -227,6 +235,8 @@ func main() {
tokenVerifier,
ticketProvider,
cfg.Temporal.TaskQueue,
uploadBucket,
cfg.Upload.MaxSize,
)
}

Expand Down Expand Up @@ -264,17 +274,6 @@ func main() {
}
}

// Set up the upload service.
var uploadsvc upload.Service
{
uploadsvc, err = upload.NewService(logger.WithName("upload"), cfg.Upload, upload.UPLOAD_MAX_SIZE, tokenVerifier)
if err != nil {
logger.Error(err, "Error setting up upload service.")
os.Exit(1)
}
defer uploadsvc.Close()
}

// Set up the watcher service.
var wsvc watcher.Service
{
Expand All @@ -293,7 +292,7 @@ func main() {

g.Add(
func() error {
srv = api.HTTPServer(logger, tp, &cfg.API, pkgsvc, storagesvc, uploadsvc)
srv = api.HTTPServer(logger, tp, &cfg.API, pkgsvc, storagesvc)
return srv.ListenAndServe()
},
func(err error) {
Expand All @@ -317,6 +316,8 @@ func main() {
&auth.NoopTokenVerifier{},
ticketProvider,
cfg.Temporal.TaskQueue,
uploadBucket,
cfg.Upload.MaxSize,
)

storagesvc, err = storage.NewService(
Expand All @@ -332,23 +333,11 @@ func main() {
os.Exit(1)
}

uploadsvc, err = upload.NewService(
logger.WithName("internal-upload"),
cfg.Upload,
upload.UPLOAD_MAX_SIZE,
&auth.NoopTokenVerifier{},
)
if err != nil {
logger.Error(err, "Error setting up internal upload service.")
os.Exit(1)
}
defer uploadsvc.Close()

var srv *http.Server

g.Add(
func() error {
srv = api.HTTPServer(logger, tp, &cfg.InternalAPI, pkgsvc, storagesvc, uploadsvc)
srv = api.HTTPServer(logger, tp, &cfg.InternalAPI, pkgsvc, storagesvc)
return srv.ListenAndServe()
},
func(err error) {
Expand Down
2 changes: 1 addition & 1 deletion docs/src/admin-manual/iac.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ The `*` attribute will provide full access to the API.
| POST | /package/{id}/move | `package:move` |
| GET | /package/{id}/preservation-actions | `package:listActions` |
| POST | /package/{id}/reject | `package:review` |
| POST | /upload/upload | `package:upload` |
| POST | /package/upload | `package:upload` |
| GET | /storage/location | `storage:location:list` |
| POST | /storage/location | `storage:location:create` |
| GET | /storage/location/{uuid} | `storage:location:read` |
Expand Down
14 changes: 11 additions & 3 deletions enduro.toml
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ pollInterval = "10s"
# no time limit.
transferDeadline = "1h"

# TransferSourcePath is the path to an Archivematica transfer source directory.
# transferSourcePath is the path to an Archivematica transfer source directory.
# It is used in the POST /api/v2beta/package "path" parameter to start a
# transfer via the API. TransferSourcePath must be prefixed with the UUID of an
# AMSS transfer source directory, optionally followed by a relative path from
Expand Down Expand Up @@ -180,10 +180,18 @@ path = ""
passphrase = "" # Secret: set (if required) with env var ENDURO_AM_SFTP_PRIVATEKEY_PASSPHRASE.

[upload]
# maxSize is the maximum upload size allowed by the server in bytes.
# Default: 102400000.
maxSize = 102400000

# upload.bucket section configures a bucket where the files will be placed.
# Make sure it matches the configuration from one of the watchers to trigger
# the processing workflow after upload.
[upload.bucket]
endpoint = "http://minio.enduro-sdps:9000"
pathStyle = true
key = "minio"
secret = "minio123"
accessKey = "minio"
secretKey = "minio123"
region = "us-west-1"
bucket = "sips"

Expand Down
10 changes: 0 additions & 10 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,10 @@ import (
packagesvr "github.com/artefactual-sdps/enduro/internal/api/gen/http/package_/server"
storagesvr "github.com/artefactual-sdps/enduro/internal/api/gen/http/storage/server"
swaggersvr "github.com/artefactual-sdps/enduro/internal/api/gen/http/swagger/server"
uploadsvr "github.com/artefactual-sdps/enduro/internal/api/gen/http/upload/server"
"github.com/artefactual-sdps/enduro/internal/api/gen/package_"
"github.com/artefactual-sdps/enduro/internal/api/gen/storage"
"github.com/artefactual-sdps/enduro/internal/api/gen/upload"
intpkg "github.com/artefactual-sdps/enduro/internal/package_"
intstorage "github.com/artefactual-sdps/enduro/internal/storage"
intupload "github.com/artefactual-sdps/enduro/internal/upload"
"github.com/artefactual-sdps/enduro/internal/version"
)

Expand All @@ -46,7 +43,6 @@ func HTTPServer(
config *Config,
pkgsvc intpkg.Service,
storagesvc intstorage.Service,
uploadsvc intupload.Service,
) *http.Server {
dec := goahttp.RequestDecoder
enc := goahttp.ResponseEncoder
Expand All @@ -70,12 +66,6 @@ func HTTPServer(
storageServer.Download = writeTimeout(intstorage.Download(storagesvc, mux, dec), 0)
storagesvr.Mount(mux, storageServer)

// Upload service.
uploadEndpoints := upload.NewEndpoints(uploadsvc)
uploadErrorHandler := errorHandler(logger, "Upload error.")
uploadServer := uploadsvr.New(uploadEndpoints, mux, dec, enc, uploadErrorHandler, nil)
uploadsvr.Mount(mux, uploadServer)

// Swagger service.
swaggerService := swaggersvr.New(nil, nil, nil, nil, nil, nil, http.FS(openAPIJSON))
swaggersvr.Mount(mux, swaggerService)
Expand Down
2 changes: 1 addition & 1 deletion internal/api/design/design.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var _ = API("enduro", func() {
Title("Enduro API")
Randomizer(expr.NewDeterministicRandomizer())
Server("enduro", func() {
Services("package", "storage", "swagger", "upload")
Services("package", "storage", "swagger")
Host("localhost", func() {
URI("http://localhost:9000")
})
Expand Down
41 changes: 41 additions & 0 deletions internal/api/design/package_.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,47 @@ var _ = Service("package", func() {
Response("failed_dependency", StatusFailedDependency)
})
})
Method("upload", func() {
Description("Upload a package to trigger an ingest workflow")
Security(JWTAuth, func() {
Scope("package:upload")
})
Payload(func() {
Attribute("content_type", String, "Content-Type header, must define value for multipart boundary.", func() {
Default("multipart/form-data; boundary=goa")
Pattern("multipart/[^;]+; boundary=.+")
Example("multipart/form-data; boundary=goa")
})
Token("token", String)
})

Error(
"invalid_media_type",
ErrorResult,
"Error returned when the Content-Type header does not define a multipart request.",
)
Error(
"invalid_multipart_request",
ErrorResult,
"Error returned when the request body is not a valid multipart content.",
)
Error("internal_error", ErrorResult, "Fault while processing upload.")

HTTP(func() {
POST("/upload")
Header("content_type:Content-Type")

// Bypass request body decoder code generation to alleviate need for
// loading the entire request body in memory. The service gets
// direct access to the HTTP request body reader.
SkipRequestBodyEncodeDecode()

// Define error HTTP statuses.
Response("invalid_media_type", StatusBadRequest)
Response("invalid_multipart_request", StatusBadRequest)
Response("internal_error", StatusInternalServerError)
})
})
})

var EnumPackageStatus = func() {
Expand Down
57 changes: 0 additions & 57 deletions internal/api/design/upload.go

This file was deleted.

Loading

0 comments on commit a0d454d

Please sign in to comment.