Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove working dirs when worker session completes #946

Merged
merged 2 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/enduro-a3m-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"ariga.io/sqlcomment"
"entgo.io/ent/dialect/sql"
"github.com/artefactual-sdps/temporal-activities/archive"
"github.com/artefactual-sdps/temporal-activities/filesys"
"github.com/hashicorp/go-cleanhttp"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -224,8 +225,8 @@
temporalsdk_activity.RegisterOptions{Name: a3m.CreateAIPActivityName},
)
w.RegisterActivityWithOptions(
activities.NewCleanUpActivity().Execute,
temporalsdk_activity.RegisterOptions{Name: activities.CleanUpActivityName},
filesys.NewRemoveActivity().Execute,
temporalsdk_activity.RegisterOptions{Name: filesys.RemoveActivityName},

Check warning on line 229 in cmd/enduro-a3m-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-a3m-worker/main.go#L228-L229

Added lines #L228 - L229 were not covered by tests
)

httpClient := cleanhttp.DefaultPooledClient()
Expand Down
5 changes: 3 additions & 2 deletions cmd/enduro-am-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"ariga.io/sqlcomment"
"entgo.io/ent/dialect/sql"
"github.com/artefactual-sdps/temporal-activities/archive"
"github.com/artefactual-sdps/temporal-activities/filesys"
"github.com/hashicorp/go-cleanhttp"
"github.com/jonboulle/clockwork"
"github.com/oklog/run"
Expand Down Expand Up @@ -267,8 +268,8 @@
temporalsdk_activity.RegisterOptions{Name: activities.CreateStoragePackageActivityName},
)
w.RegisterActivityWithOptions(
activities.NewCleanUpActivity().Execute,
temporalsdk_activity.RegisterOptions{Name: activities.CleanUpActivityName},
filesys.NewRemoveActivity().Execute,
temporalsdk_activity.RegisterOptions{Name: filesys.RemoveActivityName},

Check warning on line 272 in cmd/enduro-am-worker/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/enduro-am-worker/main.go#L271-L272

Added lines #L271 - L272 were not covered by tests
)

g.Add(
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
entgo.io/ent v0.13.1
github.com/XSAM/otelsql v0.29.0
github.com/alicebob/miniredis/v2 v2.32.1
github.com/artefactual-sdps/temporal-activities v0.0.0-20240429162703-eae3871dc67b
github.com/artefactual-sdps/temporal-activities v0.0.0-20240530153613-e610eaf238ed
github.com/coreos/go-oidc/v3 v3.9.0
github.com/cyphar/filepath-securejoin v0.2.4
github.com/dolmen-go/contextio v1.0.0
Expand All @@ -28,7 +28,7 @@ require (
github.com/jonboulle/clockwork v0.4.0
github.com/mattn/go-sqlite3 v1.14.22
github.com/mitchellh/mapstructure v1.5.0
github.com/nyudlts/go-bagit v0.2.0-alpha
github.com/nyudlts/go-bagit v0.3.0-alpha.0.20240515212815-8dab411c23af
github.com/oklog/run v1.1.0
github.com/otiai10/copy v1.14.0
github.com/pkg/sftp v1.13.6
Expand All @@ -42,7 +42,7 @@ require (
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.9.0
go.artefactual.dev/amclient v0.3.0
go.artefactual.dev/tools v0.10.0
go.artefactual.dev/tools v0.14.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.49.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,8 @@ github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuW
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apparentlymart/go-textseg/v15 v15.0.0 h1:uYvfpb3DyLSCGWnctWKGj857c6ew1u1fNQOlOtuGxQY=
github.com/apparentlymart/go-textseg/v15 v15.0.0/go.mod h1:K8XmNZdhEBkdlyDdvbmmsvpAG721bKi0joRfFdHIWJ4=
github.com/artefactual-sdps/temporal-activities v0.0.0-20240429162703-eae3871dc67b h1:B5OB22TprYqH2qHMU8ywVf2BnE9rlVZT9xuXZs2nZnI=
github.com/artefactual-sdps/temporal-activities v0.0.0-20240429162703-eae3871dc67b/go.mod h1:uf0jIGyZGHi3oTfhg+QkwCkyTaGhtAwromhwouC1FhU=
github.com/artefactual-sdps/temporal-activities v0.0.0-20240530153613-e610eaf238ed h1:VtNg8qJkRCbavlocpZweq+PwOT5UpQSeRrZmx4Y3XmM=
github.com/artefactual-sdps/temporal-activities v0.0.0-20240530153613-e610eaf238ed/go.mod h1:C6z/8k6xFm9wrF4GSMKs13v941MtdrOzH2fn8hQEHtA=
github.com/aws/aws-sdk-go v1.50.36 h1:PjWXHwZPuTLMR1NIb8nEjLucZBMzmf84TLoLbD8BZqk=
github.com/aws/aws-sdk-go v1.50.36/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA=
Expand Down Expand Up @@ -812,8 +812,8 @@ github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
github.com/nwaples/rardecode/v2 v2.0.0-beta.2 h1:e3mzJFJs4k83GXBEiTaQ5HgSc/kOK8q0rDaRO0MPaOk=
github.com/nwaples/rardecode/v2 v2.0.0-beta.2/go.mod h1:yntwv/HfMc/Hbvtq9I19D1n58te3h6KsqCf3GxyfBGY=
github.com/nyudlts/go-bagit v0.2.0-alpha h1:S3BHNx3qL2AcqD3JP3NAusnmWFHMiMVu9NxXeXlHpig=
github.com/nyudlts/go-bagit v0.2.0-alpha/go.mod h1:W9m2WmCtv7IRASziKJ/8tqVCkIU8ffe7QecZCRtqGy0=
github.com/nyudlts/go-bagit v0.3.0-alpha.0.20240515212815-8dab411c23af h1:I3StjEXH279zjQyXyBFuTyf+ga1sdySf0C2xtpHU0Ag=
github.com/nyudlts/go-bagit v0.3.0-alpha.0.20240515212815-8dab411c23af/go.mod h1:ASz84B/bXWNXm84rt+eYAs4vUJqa2C7V/kzHSVVRxl8=
github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
Expand Down Expand Up @@ -929,8 +929,8 @@ github.com/zclconf/go-cty v1.14.1 h1:t9fyA35fwjjUMcmL5hLER+e/rEPqrbCK1/OSE4SI9KA
github.com/zclconf/go-cty v1.14.1/go.mod h1:VvMs5i0vgZdhYawQNq5kePSpLAoz8u1xvZgrPIxfnZE=
go.artefactual.dev/amclient v0.3.0 h1:biXZobSWJCzwpJOYtlQzOkClOv2u5mB1TKA4023B7zw=
go.artefactual.dev/amclient v0.3.0/go.mod h1:jAJ98cSJ7QDSgMq/YUkmGpAJ4ssSzbIUN7XpynCa6uA=
go.artefactual.dev/tools v0.10.0 h1:+LeZS5oHupAQBXvLQ4aGIuZyqf7zCpD7s3UpyDl9zn4=
go.artefactual.dev/tools v0.10.0/go.mod h1:PIy0RtC45gC4sASb4r26g0aCU24kSWIp+mcV1p2gtpY=
go.artefactual.dev/tools v0.14.0 h1:ESLbemsnkdIPmYXtz0uZTcPqVnTUXIEZd9DSTRyTZqY=
go.artefactual.dev/tools v0.14.0/go.mod h1:5RJ7ObocHZv/zQFYFv/zG9cW/UVRGPFywcJx/oQ+TG8=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
Expand Down
2 changes: 2 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1394,6 +1394,7 @@ github.com/neo4j/neo4j-go-driver v1.8.1-0.20200803113522-b626aa943eba/go.mod h1:
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/nyudlts/go-bagit v0.3.0-alpha.0.20240515212815-8dab411c23af h1:I3StjEXH279zjQyXyBFuTyf+ga1sdySf0C2xtpHU0Ag=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc=
Expand Down Expand Up @@ -1490,6 +1491,7 @@ github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
gitlab.com/nyarla/go-crypt v0.0.0-20160106005555-d9a5dc2b789b h1:7gd+rd8P3bqcn/96gOZa3F5dpJr/vEiDQYlNb/y2uNs=
gitlab.com/nyarla/go-crypt v0.0.0-20160106005555-d9a5dc2b789b/go.mod h1:T3BPAOm2cqquPa0MKWeNkmOM5RQsRhkrwMWonFMN7fE=
go.artefactual.dev/tools v0.14.0 h1:ESLbemsnkdIPmYXtz0uZTcPqVnTUXIEZd9DSTRyTZqY=
go.einride.tech/aip v0.66.0 h1:XfV+NQX6L7EOYK11yoHHFtndeaWh3KbD9/cN/6iWEt8=
go.einride.tech/aip v0.66.0/go.mod h1:qAhMsfT7plxBX+Oy7Huol6YUvZ0ZzdUz26yZsQwfl1M=
go.etcd.io/etcd/api/v3 v3.5.10 h1:szRajuUUbLyppkhs9K6BRtjY37l66XQQmw7oZRANE4k=
Expand Down
19 changes: 17 additions & 2 deletions internal/bagit/bagit.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,25 @@ import (
go_bagit "github.com/nyudlts/go-bagit"
)

// Complete tests whether the bag at path has the expected number of files and
// total size on disk indicated by the Payload-Oxum, but doesn't do checksum
// validation.
//
// It returns the error from go_bagit.GetExistingBag when the bag at path
// cannot be loaded; otherwise it returns the result of validating the bag.
func Complete(path string) error {
	bag, err := go_bagit.GetExistingBag(path)
	if err != nil {
		return err
	}

	// NOTE(review): the second argument presumably restricts validation to
	// completeness (no checksum verification) — confirm against go-bagit.
	return bag.ValidateBag(false, true)
}

// Valid tests whether the bag at path is complete and the file checksums are
// valid.
//
// It returns the error from go_bagit.GetExistingBag when the bag at path
// cannot be loaded; otherwise it returns the result of validating the bag.
func Valid(path string) error {
	bag, err := go_bagit.GetExistingBag(path)
	if err != nil {
		return err
	}

	// NOTE(review): (false, false) presumably requests full validation
	// including checksums — confirm against go-bagit.
	return bag.ValidateBag(false, false)
}
12 changes: 11 additions & 1 deletion internal/bagit/bagit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,17 @@ func TestBagit(t *testing.T) {
)
assert.ErrorContains(
t,
bagit.Complete("./tests/test-bagged-transfer-with-invalid-checksums"),
bagit.Complete("./tests/nobag"),
"- ERROR - input ./tests/nobag directory does not exist",
)
assert.ErrorContains(
t,
bagit.Valid("./tests/test-bagged-transfer-with-invalid-checksums"),
"Bag validation failed: data/adios.txt sha256 validation failed",
)
assert.ErrorContains(
t,
bagit.Valid("./tests/nobag"),
"- ERROR - input ./tests/nobag directory does not exist",
)
}
32 changes: 0 additions & 32 deletions internal/workflow/activities/cleanup.go

This file was deleted.

68 changes: 53 additions & 15 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
"errors"
"fmt"
"path/filepath"
"slices"
"time"

"github.com/artefactual-sdps/temporal-activities/archive"
"github.com/artefactual-sdps/temporal-activities/filesys"
"github.com/go-logr/logr"
"github.com/google/uuid"
"go.artefactual.dev/tools/ref"
Expand Down Expand Up @@ -103,6 +105,44 @@
return fsutil.BaseNoExt(t.req.Key)
}

// cleanupRegistry contains items that should be cleaned up when a workflow
// session completes.
type cleanupRegistry struct {
	// tempDirs are working directories registered for deletion during cleanup.
	tempDirs []string
}

// registerPath registers a filepath for deletion when a workflow session
// completes.
//
// The path is normalised with filepath.Clean before it is stored, so
// equivalent spellings (e.g. "a/b" and "a/b/") are only registered once.
// Empty paths, the current directory (".") and filesystem roots are ignored:
// callers often pass filepath.Dir(p), which yields "." rather than "" when p
// has no directory component, and removing those locations would never be the
// intent of a temporary-directory cleanup.
func (c *cleanupRegistry) registerPath(path string) {
	if path == "" {
		return
	}

	clean := filepath.Clean(path)

	// filepath.Dir returns its argument unchanged only for "." and for a
	// filesystem root, which are exactly the locations that must not be
	// scheduled for removal.
	if filepath.Dir(clean) == clean {
		return
	}

	if slices.Contains(c.tempDirs, clean) {
		return
	}
	c.tempDirs = append(c.tempDirs, clean)
}

func (w *ProcessingWorkflow) sessionCleanup(ctx temporalsdk_workflow.Context, cleanup *cleanupRegistry) {
ctx = temporalsdk_workflow.WithActivityOptions(ctx, temporalsdk_workflow.ActivityOptions{
StartToCloseTimeout: time.Second,
RetryPolicy: &temporalsdk_temporal.RetryPolicy{
MaximumAttempts: 1,
},
})

err := temporalsdk_workflow.ExecuteActivity(
ctx,
filesys.RemoveActivityName,
filesys.RemoveActivityParams{Paths: cleanup.tempDirs},
).Get(ctx, nil)
if err != nil {
w.logger.V(1).Info("session cleanup: error(s) removing temporary directories",
"errors", err.Error(),
)

Check warning on line 140 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L138-L140

Added lines #L138 - L140 were not covered by tests
}

temporalsdk_workflow.CompleteSession(ctx)
}

// ProcessingWorkflow orchestrates all the activities related to the processing
// of a SIP in Archivematica, including its retrieval, creation of transfer,
// etc...
Expand Down Expand Up @@ -276,7 +316,8 @@
attempt int,
tinfo *TransferInfo,
) error {
defer temporalsdk_workflow.CompleteSession(sessCtx)
var cleanup cleanupRegistry
defer w.sessionCleanup(sessCtx, &cleanup)

packageStartedAt := temporalsdk_workflow.Now(sessCtx).UTC()

Expand Down Expand Up @@ -332,6 +373,9 @@
return err
}
tinfo.TempPath = downloadResult.Path

// Delete download tmp dir when session ends.
cleanup.registerPath(filepath.Dir(downloadResult.Path))
}

// Unarchive the transfer if it's not a directory and it's not part of the preprocessing child workflow.
Expand Down Expand Up @@ -361,7 +405,7 @@
return err
}

// Bundle.
// Bundle transfer as an Archivematica standard transfer.
{
// For the a3m workflow bundle the transfer to a directory shared with
// the a3m container.
Expand All @@ -386,19 +430,10 @@
}

tinfo.Bundle = bundleResult
}

// Delete local temporary files.
defer func() {
// TODO: call clean up here to enforce that we always destroy TempDir.
if tinfo.Bundle.FullPath != "" {
activityOpts := withActivityOptsForRequest(sessCtx)
_ = temporalsdk_workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{
FullPath: tinfo.Bundle.FullPath,
}).
Get(activityOpts, nil)
}
}()
// Delete bundled transfer when session ends.
cleanup.registerPath(bundleResult.FullPath)
}

// Do preservation activities.
{
Expand Down Expand Up @@ -698,12 +733,15 @@

result := a3m.CreateAIPActivityResult{}
err := temporalsdk_workflow.ExecuteActivity(activityOpts, a3m.CreateAIPActivityName, params).Get(sessCtx, &result)
if err != nil {
return err

Check warning on line 737 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L737

Added line #L737 was not covered by tests
}

tinfo.SIPID = result.UUID
tinfo.AIPPath = result.Path
tinfo.StoredAt = temporalsdk_workflow.Now(sessCtx).UTC()

return err
return nil
}

func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, tinfo *TransferInfo) error {
Expand Down
Loading
Loading