Send PIPs to Archivematica as BagIt bags
Fixes #805

- Change the package type to "zipped bag" when starting a transfer via
  the Archivematica API
- Bag the PIP before sending it to Archivematica (if it's not already a
  bag)
- Add a "TransferSourcePath" config value to specify the API path to the
  Transfer Source directory where PIPs are uploaded
djjuhasz committed Aug 16, 2024
1 parent 35208ef commit 0c97e80
Showing 11 changed files with 115 additions and 37 deletions.
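
Taken together, the changes below mean the AM worker bags the PIP (when it is not already a bag), zips it, uploads it over SFTP, and then starts the transfer with a package-create request along the following lines. This is a minimal sketch, not the actual worker code: the UUID, package name, and relative path are placeholders, and the field names mirror the amclient usage in internal/am/start_transfer.go.

package main

import (
	"fmt"
	"path/filepath"

	"go.artefactual.dev/amclient"
)

func main() {
	// Hypothetical values for illustration only.
	transferSourcePath := "749ef452-fbed-4d50-9072-5f98bc01e52e:sftp_upload" // AMSS source location UUID + relative dir
	relativePath := "my-pip.zip"                                             // the zipped bag uploaded to the transfer source

	req := &amclient.PackageCreateRequest{
		Name:             "my-pip",
		Type:             "zipped bag", // previously "zipfile"
		Path:             filepath.Join(transferSourcePath, relativePath),
		ProcessingConfig: "automated",
		AutoApprove:      true,
	}
	fmt.Println(req.Path) // 749ef452-fbed-4d50-9072-5f98bc01e52e:sftp_upload/my-pip.zip
}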
4 changes: 4 additions & 0 deletions cmd/enduro-am-worker/main.go
@@ -245,6 +245,10 @@ func main() {
activities.NewBundleActivity(logger).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName},
)
w.RegisterActivityWithOptions(
bagit_activity.NewCreateBagActivity(cfg.BagIt).Execute,
temporalsdk_activity.RegisterOptions{Name: bagit_activity.CreateBagActivityName},
)

w.RegisterActivityWithOptions(
activities.NewZipActivity(
logger,
24 changes: 20 additions & 4 deletions enduro.toml
@@ -20,7 +20,7 @@ corsOrigin = "http://localhost"

[api.auth]
# Enable API authentication. OIDC is the only protocol supported at the
# moment. When enabled the API verifies the access token submitted with
# each request. The API client is responsible for obtaining an access
# token from the provider.
enabled = true
@@ -29,7 +29,7 @@ enabled = true
# OIDC provider URL. Required when auth. is enabled.
providerURL = "http://keycloak:7470/realms/artefactual"
# OIDC client ID. The client ID must be included in the `aud` claim of
# the access token. Required when auth. is enabled.
clientID = "enduro"

[api.auth.oidc.abac]
@@ -39,7 +39,7 @@ clientID = "enduro"
enabled = true
# Claim path of the Enduro attributes within the access token. If the claim
# path is nested then include all fields separated by `claimPathSeparator`
# (see below). E.g. "attributes.enduro" with `claimPathSeparator = "."`.
# Required when ABAC is enabled.
claimPath = "enduro"
# Separator used to split the claim path fields. The default value of "" will
@@ -152,12 +152,28 @@ pollInterval = "10s"
# no time limit.
transferDeadline = "1h"

# TransferSourcePath is the path to an Archivematica transfer source directory.
# It is used in the POST /api/v2beta/package "path" parameter to start a
# transfer via the API. TransferSourcePath must be prefixed with the UUID of an
# AMSS transfer source directory, optionally followed by a relative path from
# the source dir (e.g. "749ef452-fbed-4d50-9072-5f98bc01e52e:sftp_upload"). If
# no transferSourcePath is specified, the default transfer source path will be
# used.
transferSourcePath = ""

[am.sftp]
host = "" # The Archivematica Storage Service hostname.
port = ""
user = ""

# knownHostsFile is the absolute path to a local SSH "known_hosts" file that
# includes a public host key for the AM SFTP server.
# Default: "/home/[user]/.ssh/known_hosts" (where [user] is your local user).
knownHostsFile = ""
remoteDir = "/transfer_source"

# remoteDir is the directory path, relative to the SFTP root directory, where
# PIPs should be uploaded.
remoteDir = ""

[am.sftp.privateKey]
path = ""
5 changes: 5 additions & 0 deletions hack/kube/overlays/dev-am/enduro-am.yaml
@@ -81,6 +81,11 @@ spec:
secretKeyRef:
name: enduro-am-secret
key: api_key
- name: ENDURO_AM_TRANSFERSOURCEPATH
valueFrom:
secretKeyRef:
name: enduro-am-secret
key: transfer_source_path
- name: ENDURO_AM_SFTP_HOST
valueFrom:
secretKeyRef:
8 changes: 8 additions & 0 deletions internal/am/config.go
@@ -22,6 +22,14 @@ type Config struct {
// SFTP configuration for uploading transfers to Archivematica.
SFTP sftp.Config

// TransferSourcePath is the path to an Archivematica transfer source
// directory. It is used in the POST /api/v2beta/package "path" parameter
// to start a transfer via the API. TransferSourcePath must be prefixed with
// the UUID of an AMSS transfer source directory, optionally followed by a
// relative path from the source dir (e.g.
// "749ef452-fbed-4d50-9072-5f98bc01e52e:sftp_upload").
TransferSourcePath string

// Capacity sets the maximum number of worker sessions the worker can
// handle at one time (default: 1).
Capacity int
17 changes: 13 additions & 4 deletions internal/am/start_transfer.go
@@ -2,6 +2,7 @@ package am

import (
context "context"
"path/filepath"

"github.com/go-logr/logr"
"go.artefactual.dev/amclient"
@@ -16,8 +17,12 @@ type StartTransferActivity struct {
}

type StartTransferActivityParams struct {
// Name of the transfer.
Name string
Path string

// RelativePath is the PIP path relative to the Archivematica transfer
// source directory.
RelativePath string
}

type StartTransferActivityResult struct {
@@ -40,7 +45,11 @@ func (a *StartTransferActivity) Execute(
ctx context.Context,
opts *StartTransferActivityParams,
) (*StartTransferActivityResult, error) {
a.logger.V(1).Info("Executing StartTransferActivity", "Name", opts.Name, "Path", opts.Path)
a.logger.V(1).Info(
"Executing StartTransferActivity",
"Name", opts.Name,
"RelativePath", opts.RelativePath,
)

processingConfig := a.cfg.ProcessingConfig
if processingConfig == "" {
@@ -49,8 +58,8 @@ func (a *StartTransferActivity) Execute(

payload, resp, err := a.amps.Create(ctx, &amclient.PackageCreateRequest{
Name: opts.Name,
Type: "zipfile",
Path: opts.Path,
Type: "zipped bag",
Path: filepath.Join(a.cfg.TransferSourcePath, opts.RelativePath),
ProcessingConfig: processingConfig,
AutoApprove: true,
})
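
One detail worth noting about the new Path parameter: filepath.Join treats the "UUID:" prefix as ordinary text in the first path element, and it drops empty elements entirely, so an unset TransferSourcePath leaves the relative path untouched, which matches the expectations in the test file below. A quick sketch with a made-up transfer source path (output shown for a Unix-like OS):

package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	// With a configured transfer source path (hypothetical UUID):
	fmt.Println(filepath.Join("749ef452-fbed-4d50-9072-5f98bc01e52e:sftp_upload", "transfer.zip"))
	// Prints: 749ef452-fbed-4d50-9072-5f98bc01e52e:sftp_upload/transfer.zip

	// With an empty transfer source path, the relative path is used as-is:
	fmt.Println(filepath.Join("", "/tmp"))
	// Prints: /tmp
}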
13 changes: 7 additions & 6 deletions internal/am/start_transfer_test.go
@@ -23,17 +23,17 @@ func TestStartTransferActivity(t *testing.T) {

transferID := uuid.New().String()
opts := am.StartTransferActivityParams{
Name: "Testing",
Path: "/tmp",
Name: "Testing",
RelativePath: "/tmp",
}

amcrDefault := func(m *amclienttest.MockPackageServiceMockRecorder, st http.Response) {
m.Create(
mockutil.Context(),
&amclient.PackageCreateRequest{
Name: opts.Name,
Type: "zipfile",
Path: opts.Path,
Type: "zipped bag",
Path: opts.RelativePath,
ProcessingConfig: "automated",
AutoApprove: true,
},
@@ -59,8 +59,8 @@
mockutil.Context(),
&amclient.PackageCreateRequest{
Name: opts.Name,
Type: "zipfile",
Path: opts.Path,
Type: "zipped bag",
Path: opts.RelativePath,
ProcessingConfig: "automated",
AutoApprove: true,
},
@@ -117,6 +117,7 @@

return
}
assert.NilError(t, err)

var r am.StartTransferActivityResult
err = future.Get(&r)
15 changes: 9 additions & 6 deletions internal/config/config.go
@@ -9,6 +9,7 @@ import (
"time"

"github.com/artefactual-sdps/temporal-activities/archive"
"github.com/artefactual-sdps/temporal-activities/bagit"
"github.com/google/uuid"
"github.com/mitchellh/mapstructure"
"github.com/spf13/viper"
@@ -40,6 +41,7 @@
AM am.Config
InternalAPI api.Config
API api.Config
BagIt bagit.Config
Database db.Config
Event event.Config
ExtractActivity archive.Config
@@ -52,13 +54,14 @@
Telemetry telemetry.Config
}

func (c Configuration) Validate() error {
func (c *Configuration) Validate() error {
// TODO: should this validate all the fields in Configuration?
apiAuthErr := c.API.Auth.Validate()
preprocessingErr := c.Preprocessing.Validate()
uploadErr := c.Upload.Validate()

return errors.Join(apiAuthErr, preprocessingErr, uploadErr)
return errors.Join(
c.API.Auth.Validate(),
c.BagIt.Validate(),
c.Preprocessing.Validate(),
c.Upload.Validate(),
)
}

func Read(config *Configuration, configFile string) (found bool, configFileUsed string, err error) {
1 change: 1 addition & 0 deletions internal/config/config_test.go
@@ -72,6 +72,7 @@ func TestConfig(t *testing.T) {
assert.Equal(t, c.AM.Capacity, 1)
assert.Equal(t, c.AM.PollInterval, 10*time.Second)
assert.Equal(t, c.API.Listen, "127.0.0.1:9000")
assert.Equal(t, c.BagIt.ChecksumAlgorithm, "sha512")
assert.Equal(t, c.DebugListen, "127.0.0.1:9001")
assert.Equal(t, c.Preservation.TaskQueue, temporal.A3mWorkerTaskQueue)
assert.Equal(t, c.Storage.TaskQueue, temporal.GlobalTaskQueue)
3 changes: 2 additions & 1 deletion internal/sftp/config.go
@@ -25,7 +25,8 @@ type Config struct {
// Private key used for authentication.
PrivateKey PrivateKey

// Default directory on SFTP server for file transfers.
// RemoteDir is the directory path, relative to the SFTP root directory,
// where PIPs should be uploaded.
RemoteDir string
}

50 changes: 35 additions & 15 deletions internal/workflow/processing.go
@@ -772,7 +772,11 @@ func (w *ProcessingWorkflow) waitForReview(ctx temporalsdk_workflow.Context) *pa
return &review
}

func (w *ProcessingWorkflow) transferA3m(sessCtx temporalsdk_workflow.Context, tinfo *TransferInfo, cleanup *cleanupRegistry) error {
func (w *ProcessingWorkflow) transferA3m(
sessCtx temporalsdk_workflow.Context,
tinfo *TransferInfo,
cleanup *cleanupRegistry,
) error {
// Bundle PIP as an Archivematica standard transfer.
{
activityOpts := withActivityOptsForLongLivedRequest(sessCtx)
@@ -814,7 +818,8 @@ func (w *ProcessingWorkflow) transferA3m(sessCtx temporalsdk_workflow.Context, t
}

result := a3m.CreateAIPActivityResult{}
err := temporalsdk_workflow.ExecuteActivity(activityOpts, a3m.CreateAIPActivityName, params).Get(sessCtx, &result)
err := temporalsdk_workflow.ExecuteActivity(activityOpts, a3m.CreateAIPActivityName, params).
Get(sessCtx, &result)
if err != nil {
return err

}
@@ -827,11 +832,26 @@ func (w *ProcessingWorkflow) transferA3m(sessCtx temporalsdk_workflow.Context, t
return nil
}

func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, tinfo *TransferInfo) error {
func (w *ProcessingWorkflow) transferAM(ctx temporalsdk_workflow.Context, tinfo *TransferInfo) error {
var err error

// Zip transfer.
activityOpts := withActivityOptsForLongLivedRequest(sessCtx)
// Bag PIP if it's not already a bag.
if tinfo.PackageType != enums.PackageTypeBagIt {
lctx := withActivityOptsForLocalAction(ctx)
var zipResult bagit_activity.CreateBagActivityResult
err = temporalsdk_workflow.ExecuteActivity(
lctx,
bagit_activity.CreateBagActivityName,
&bagit_activity.CreateBagActivityParams{SourcePath: tinfo.TempPath},
).Get(lctx, &zipResult)
if err != nil {
return err

}
tinfo.PackageType = enums.PackageTypeBagIt
}

// Zip PIP.
activityOpts := withActivityOptsForLongLivedRequest(ctx)
var zipResult activities.ZipActivityResult
err = temporalsdk_workflow.ExecuteActivity(
activityOpts,
@@ -842,8 +862,8 @@ func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, ti
return err
}

// Upload transfer to AMSS.
activityOpts = temporalsdk_workflow.WithActivityOptions(sessCtx,
// Upload PIP to AMSS.
activityOpts = temporalsdk_workflow.WithActivityOptions(ctx,
temporalsdk_workflow.ActivityOptions{
StartToCloseTimeout: time.Hour * 2,
HeartbeatTimeout: 2 * tinfo.req.PollInterval,
@@ -868,22 +888,22 @@ func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, ti
}

// Start AM transfer.
activityOpts = withActivityOptsForRequest(sessCtx)
activityOpts = withActivityOptsForRequest(ctx)
transferResult := am.StartTransferActivityResult{}
err = temporalsdk_workflow.ExecuteActivity(
activityOpts,
am.StartTransferActivityName,
&am.StartTransferActivityParams{
Name: tinfo.req.Key,
Path: uploadResult.RemoteFullPath,
Name: tinfo.req.Key,
RelativePath: uploadResult.RemoteRelativePath,
},
).Get(activityOpts, &transferResult)
if err != nil {
return err
}

pollOpts := temporalsdk_workflow.WithActivityOptions(
sessCtx,
ctx,
temporalsdk_workflow.ActivityOptions{
HeartbeatTimeout: 2 * tinfo.req.PollInterval,
StartToCloseTimeout: tinfo.req.TransferDeadline,
@@ -928,11 +948,11 @@ func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, ti
}

// Set AIP "stored at" time.
tinfo.StoredAt = temporalsdk_workflow.Now(sessCtx).UTC()
tinfo.StoredAt = temporalsdk_workflow.Now(ctx).UTC()

// Set package location
{
ctx := withLocalActivityOpts(sessCtx)
ctx := withLocalActivityOpts(ctx)
err := temporalsdk_workflow.ExecuteLocalActivity(
ctx,
setLocationIDLocalActivity,
Expand All @@ -947,7 +967,7 @@ func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, ti

// Create storage package record and set location to AMSS location.
{
activityOpts := withLocalActivityOpts(sessCtx)
activityOpts := withLocalActivityOpts(ctx)
err := temporalsdk_workflow.ExecuteActivity(
activityOpts,
activities.CreateStoragePackageActivityName,
Expand All @@ -965,7 +985,7 @@ func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, ti
}

// Delete transfer.
activityOpts = withActivityOptsForRequest(sessCtx)
activityOpts = withActivityOptsForRequest(ctx)
err = temporalsdk_workflow.ExecuteActivity(activityOpts, am.DeleteTransferActivityName, am.DeleteTransferActivityParams{
Destination: uploadResult.RemoteRelativePath,
}).
12 changes: 11 additions & 1 deletion internal/workflow/processing_test.go
@@ -152,6 +152,10 @@ func (s *ProcessingWorkflowTestSuite) setupAMWorkflowTest(
clock := clockwork.NewFakeClock()
sftpc := sftp_fake.NewMockClient(ctrl)

s.env.RegisterActivityWithOptions(
bagit_activity.NewCreateBagActivity(bagit_activity.Config{}).Execute,
temporalsdk_activity.RegisterOptions{Name: bagit_activity.CreateBagActivityName},
)
s.env.RegisterActivityWithOptions(
activities.NewZipActivity(logger).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.ZipActivityName},
@@ -602,6 +606,12 @@ func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() {
)

// Archivematica specific activities.
s.env.OnActivity(bagit_activity.CreateBagActivityName, sessionCtx,
&bagit_activity.CreateBagActivityParams{SourcePath: extractPath},
).Return(
&bagit_activity.CreateBagActivityResult{BagPath: extractPath}, nil,
)

s.env.OnActivity(activities.ZipActivityName, sessionCtx,
&activities.ZipActivityParams{SourceDir: extractPath},
).Return(
Expand All @@ -618,7 +628,7 @@ func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() {
)

s.env.OnActivity(am.StartTransferActivityName, sessionCtx,
&am.StartTransferActivityParams{Name: key, Path: "transfer.zip"},
&am.StartTransferActivityParams{Name: key, RelativePath: "transfer.zip"},
).Return(
&am.StartTransferActivityResult{TransferID: transferID.String()}, nil,
)
