From 0ecec2a2b746a5ecd23c5e38468337ad88680727 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Raddaoui=20Mar=C3=ADn?= Date: Wed, 4 Sep 2024 02:54:57 +0200 Subject: [PATCH] Use logger interceptor in Temporal workers Follow https://enduro.readthedocs.io/dev-manual/logging recommendations. - Use `go.artefactual.dev/tools/temporal` to set the clients logger. - Use replay-aware Temporal SDK logger from processing workflow. - Add logger interceptor to all workers. - Use `go.artefactual.dev/tools/temporal` `GetLogger` in activities. - Remove logger usage from local activities. --- cmd/enduro-a3m-worker/main.go | 13 ++++--- cmd/enduro-am-worker/main.go | 20 +++++----- cmd/enduro/main.go | 14 ++++--- internal/a3m/a3m.go | 8 ++-- internal/a3m/a3m_test.go | 2 - internal/am/delete_transfer.go | 10 ++--- internal/am/delete_transfer_test.go | 3 +- internal/am/poll_ingest.go | 8 ++-- internal/am/poll_ingest_test.go | 2 - internal/am/poll_transfer.go | 7 +--- internal/am/poll_transfer_test.go | 2 - internal/am/start_transfer.go | 17 ++++----- internal/am/start_transfer_test.go | 3 +- internal/am/upload_transfer.go | 13 ++----- internal/am/upload_transfer_test.go | 3 +- internal/temporal/log.go | 35 ------------------ internal/workflow/activities/bundle.go | 12 +++--- internal/workflow/activities/bundle_test.go | 3 +- .../workflow/activities/classify_package.go | 13 +++---- .../activities/classify_package_test.go | 3 +- internal/workflow/activities/download.go | 8 ++-- internal/workflow/activities/download_test.go | 3 +- internal/workflow/local_activities.go | 5 --- internal/workflow/move.go | 5 +-- internal/workflow/move_test.go | 4 +- internal/workflow/processing.go | 27 +++++++------- internal/workflow/processing_test.go | 37 ++++++------------- 27 files changed, 100 insertions(+), 180 deletions(-) delete mode 100644 internal/temporal/log.go diff --git a/cmd/enduro-a3m-worker/main.go b/cmd/enduro-a3m-worker/main.go index f04030847..a25f54e67 100644 --- a/cmd/enduro-a3m-worker/main.go +++ b/cmd/enduro-a3m-worker/main.go @@ -22,6 +22,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/pflag" "go.artefactual.dev/tools/log" + temporal_tools "go.artefactual.dev/tools/temporal" "go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" temporalsdk_activity "go.temporal.io/sdk/activity" @@ -118,7 +119,7 @@ func main() { temporalClient, err := temporalsdk_client.Dial(temporalsdk_client.Options{ Namespace: cfg.Temporal.Namespace, HostPort: cfg.Temporal.Address, - Logger: temporal.Logger(logger.WithName("temporal-client")), + Logger: temporal_tools.Logger(logger.WithName("temporal-client")), Interceptors: []temporalsdk_interceptor.ClientInterceptor{tracingInterceptor}, }) if err != nil { @@ -212,6 +213,9 @@ func main() { EnableSessionWorker: true, MaxConcurrentSessionExecutionSize: cfg.A3m.Capacity, MaxConcurrentActivityExecutionSize: 1, + Interceptors: []temporalsdk_interceptor.WorkerInterceptor{ + temporal_tools.NewLoggerInterceptor(logger), + }, } w := temporalsdk_worker.New(temporalClient, temporal.A3mWorkerTaskQueue, workerOpts) if err != nil { @@ -220,7 +224,7 @@ func main() { } w.RegisterActivityWithOptions( - activities.NewDownloadActivity(logger, tp.Tracer(activities.DownloadActivityName), wsvc).Execute, + activities.NewDownloadActivity(tp.Tracer(activities.DownloadActivityName), wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName}, ) w.RegisterActivityWithOptions( @@ -228,7 +232,7 @@ func main() { temporalsdk_activity.RegisterOptions{Name: archiveextract.Name}, ) w.RegisterActivityWithOptions( - activities.NewClassifyPackageActivity(logger).Execute, + activities.NewClassifyPackageActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.ClassifyPackageActivityName}, ) w.RegisterActivityWithOptions( @@ -236,12 +240,11 @@ func main() { temporalsdk_activity.RegisterOptions{Name: bagvalidate.Name}, ) w.RegisterActivityWithOptions( - activities.NewBundleActivity(logger).Execute, + activities.NewBundleActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName}, ) w.RegisterActivityWithOptions( a3m.NewCreateAIPActivity( - logger, tp.Tracer(a3m.CreateAIPActivityName), a3mClient.TransferClient, &cfg.A3m, diff --git a/cmd/enduro-am-worker/main.go b/cmd/enduro-am-worker/main.go index 6524d0cd5..43e6f9856 100644 --- a/cmd/enduro-am-worker/main.go +++ b/cmd/enduro-am-worker/main.go @@ -26,6 +26,7 @@ import ( "github.com/spf13/pflag" "go.artefactual.dev/amclient" "go.artefactual.dev/tools/log" + temporal_tools "go.artefactual.dev/tools/temporal" "go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel/trace" @@ -125,7 +126,7 @@ func main() { temporalClient, err := temporalsdk_client.Dial(temporalsdk_client.Options{ Namespace: cfg.Temporal.Namespace, HostPort: cfg.Temporal.Address, - Logger: temporal.Logger(logger.WithName("temporal-client")), + Logger: temporal_tools.Logger(logger.WithName("temporal-client")), Interceptors: []temporalsdk_interceptor.ClientInterceptor{tracingInterceptor}, }) if err != nil { @@ -209,6 +210,9 @@ func main() { EnableSessionWorker: true, MaxConcurrentSessionExecutionSize: cfg.AM.Capacity, MaxConcurrentActivityExecutionSize: 1, + Interceptors: []temporalsdk_interceptor.WorkerInterceptor{ + temporal_tools.NewLoggerInterceptor(logger), + }, } w := temporalsdk_worker.New(temporalClient, temporal.AmWorkerTaskQueue, workerOpts) if err != nil { @@ -228,7 +232,7 @@ func main() { sftpClient := sftp.NewGoClient(logger, cfg.AM.SFTP) w.RegisterActivityWithOptions( - activities.NewDownloadActivity(logger, tp.Tracer(activities.DownloadActivityName), wsvc).Execute, + activities.NewDownloadActivity(tp.Tracer(activities.DownloadActivityName), wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName}, ) w.RegisterActivityWithOptions( @@ -236,7 +240,7 @@ func main() { temporalsdk_activity.RegisterOptions{Name: archiveextract.Name}, ) w.RegisterActivityWithOptions( - activities.NewClassifyPackageActivity(logger).Execute, + activities.NewClassifyPackageActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.ClassifyPackageActivityName}, ) w.RegisterActivityWithOptions( @@ -244,7 +248,7 @@ func main() { temporalsdk_activity.RegisterOptions{Name: bagvalidate.Name}, ) w.RegisterActivityWithOptions( - activities.NewBundleActivity(logger).Execute, + activities.NewBundleActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName}, ) w.RegisterActivityWithOptions( @@ -256,20 +260,19 @@ func main() { temporalsdk_activity.RegisterOptions{Name: archivezip.Name}, ) w.RegisterActivityWithOptions( - am.NewUploadTransferActivity(logger, sftpClient, cfg.AM.PollInterval).Execute, + am.NewUploadTransferActivity(sftpClient, cfg.AM.PollInterval).Execute, temporalsdk_activity.RegisterOptions{Name: am.UploadTransferActivityName}, ) w.RegisterActivityWithOptions( - am.NewDeleteTransferActivity(logger, sftpClient).Execute, + am.NewDeleteTransferActivity(sftpClient).Execute, temporalsdk_activity.RegisterOptions{Name: am.DeleteTransferActivityName}, ) w.RegisterActivityWithOptions( - am.NewStartTransferActivity(logger, &cfg.AM, amc.Package).Execute, + am.NewStartTransferActivity(&cfg.AM, amc.Package).Execute, temporalsdk_activity.RegisterOptions{Name: am.StartTransferActivityName}, ) w.RegisterActivityWithOptions( am.NewPollTransferActivity( - logger, &cfg.AM, clockwork.NewRealClock(), amc.Transfer, @@ -280,7 +283,6 @@ func main() { ) w.RegisterActivityWithOptions( am.NewPollIngestActivity( - logger, &cfg.AM, clockwork.NewRealClock(), amc.Ingest, diff --git a/cmd/enduro/main.go b/cmd/enduro/main.go index 27d20fa70..a99d7d6e4 100644 --- a/cmd/enduro/main.go +++ b/cmd/enduro/main.go @@ -20,6 +20,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/pflag" "go.artefactual.dev/tools/log" + temporal_tools "go.artefactual.dev/tools/temporal" "go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel/codes" @@ -49,7 +50,6 @@ import ( storage_entdb "github.com/artefactual-sdps/enduro/internal/storage/persistence/ent/db" storage_workflows "github.com/artefactual-sdps/enduro/internal/storage/workflows" "github.com/artefactual-sdps/enduro/internal/telemetry" - "github.com/artefactual-sdps/enduro/internal/temporal" "github.com/artefactual-sdps/enduro/internal/upload" "github.com/artefactual-sdps/enduro/internal/version" "github.com/artefactual-sdps/enduro/internal/watcher" @@ -149,7 +149,7 @@ func main() { temporalClient, err := temporalsdk_client.Dial(temporalsdk_client.Options{ Namespace: cfg.Temporal.Namespace, HostPort: cfg.Temporal.Address, - Logger: temporal.Logger(logger.WithName("temporal-client")), + Logger: temporal_tools.Logger(logger.WithName("temporal-client")), Interceptors: []temporalsdk_interceptor.ClientInterceptor{tracingInterceptor}, }) if err != nil { @@ -423,7 +423,11 @@ func main() { // Workflow and activity worker. { done := make(chan struct{}) - workerOpts := temporalsdk_worker.Options{} + workerOpts := temporalsdk_worker.Options{ + Interceptors: []temporalsdk_interceptor.WorkerInterceptor{ + temporal_tools.NewLoggerInterceptor(logger.WithName("worker")), + }, + } w := temporalsdk_worker.New(temporalClient, cfg.Temporal.TaskQueue, workerOpts) if err != nil { logger.Error(err, "Error creating Temporal worker.") @@ -431,7 +435,7 @@ func main() { } w.RegisterWorkflowWithOptions( - workflow.NewProcessingWorkflow(logger, cfg, rand.Reader, pkgsvc, wsvc).Execute, + workflow.NewProcessingWorkflow(cfg, rand.Reader, pkgsvc, wsvc).Execute, temporalsdk_workflow.RegisterOptions{Name: package_.ProcessingWorkflowName}, ) w.RegisterActivityWithOptions( @@ -458,7 +462,7 @@ func main() { ) w.RegisterWorkflowWithOptions( - workflow.NewMoveWorkflow(logger, pkgsvc).Execute, + workflow.NewMoveWorkflow(pkgsvc).Execute, temporalsdk_workflow.RegisterOptions{Name: package_.MoveWorkflowName}, ) diff --git a/internal/a3m/a3m.go b/internal/a3m/a3m.go index ef9685f8a..8aa96318d 100644 --- a/internal/a3m/a3m.go +++ b/internal/a3m/a3m.go @@ -9,8 +9,8 @@ import ( "buf.build/gen/go/artefactual/a3m/grpc/go/a3m/api/transferservice/v1beta1/transferservicev1beta1grpc" transferservice "buf.build/gen/go/artefactual/a3m/protocolbuffers/go/a3m/api/transferservice/v1beta1" - "github.com/go-logr/logr" "github.com/oklog/run" + temporal_tools "go.artefactual.dev/tools/temporal" "go.opentelemetry.io/otel/trace" temporalsdk_activity "go.temporal.io/sdk/activity" "google.golang.org/grpc" @@ -24,7 +24,6 @@ import ( const CreateAIPActivityName = "create-aip-activity" type CreateAIPActivity struct { - logger logr.Logger tracer trace.Tracer client transferservicev1beta1grpc.TransferServiceClient cfg *Config @@ -43,14 +42,12 @@ type CreateAIPActivityResult struct { } func NewCreateAIPActivity( - logger logr.Logger, tracer trace.Tracer, client transferservicev1beta1grpc.TransferServiceClient, cfg *Config, pkgsvc package_.Service, ) *CreateAIPActivity { return &CreateAIPActivity{ - logger: logger, tracer: tracer, client: client, cfg: cfg, @@ -62,6 +59,7 @@ func (a *CreateAIPActivity) Execute( ctx context.Context, opts *CreateAIPActivityParams, ) (*CreateAIPActivityResult, error) { + logger := temporal_tools.GetLogger(ctx) result := &CreateAIPActivityResult{} var g run.Group @@ -147,7 +145,7 @@ func (a *CreateAIPActivity) Execute( } result.Path = fmt.Sprintf("%s/completed/%s-%s.7z", a.cfg.ShareDir, opts.Name, result.UUID) - a.logger.Info("We have run a3m successfully", "path", result.Path) + logger.Info("We have run a3m successfully", "path", result.Path) break } diff --git a/internal/a3m/a3m_test.go b/internal/a3m/a3m_test.go index c39147e2d..c0d53a7e6 100644 --- a/internal/a3m/a3m_test.go +++ b/internal/a3m/a3m_test.go @@ -6,7 +6,6 @@ import ( "time" transferservice "buf.build/gen/go/artefactual/a3m/protocolbuffers/go/a3m/api/transferservice/v1beta1" - "github.com/go-logr/logr" "go.artefactual.dev/tools/mockutil" "go.opentelemetry.io/otel/trace/noop" temporalsdk_activity "go.temporal.io/sdk/activity" @@ -75,7 +74,6 @@ func TestCreateAIPActivity(t *testing.T) { env.RegisterActivityWithOptions( a3m.NewCreateAIPActivity( - logr.Discard(), noop.Tracer{}, a3mTransferServiceClient, &a3m.Config{}, diff --git a/internal/am/delete_transfer.go b/internal/am/delete_transfer.go index 7707a0eae..ccd5cd2ce 100644 --- a/internal/am/delete_transfer.go +++ b/internal/am/delete_transfer.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - "github.com/go-logr/logr" + temporal_tools "go.artefactual.dev/tools/temporal" "github.com/artefactual-sdps/enduro/internal/sftp" ) @@ -17,20 +17,20 @@ type DeleteTransferActivityParams struct { type DeleteTransferActivity struct { client sftp.Client - logger logr.Logger } type DeleteTransferActivityResult struct{} -func NewDeleteTransferActivity(logger logr.Logger, client sftp.Client) *DeleteTransferActivity { - return &DeleteTransferActivity{client: client, logger: logger} +func NewDeleteTransferActivity(client sftp.Client) *DeleteTransferActivity { + return &DeleteTransferActivity{client: client} } func (a *DeleteTransferActivity) Execute( ctx context.Context, params *DeleteTransferActivityParams, ) (*DeleteTransferActivityResult, error) { - a.logger.V(1).Info("Execute DeleteTransferActivity", + logger := temporal_tools.GetLogger(ctx) + logger.V(1).Info("Execute DeleteTransferActivity", "destination", params.Destination, ) diff --git a/internal/am/delete_transfer_test.go b/internal/am/delete_transfer_test.go index 4c4d56875..635d67d94 100644 --- a/internal/am/delete_transfer_test.go +++ b/internal/am/delete_transfer_test.go @@ -5,7 +5,6 @@ import ( "fmt" "testing" - "github.com/go-logr/logr" "go.artefactual.dev/tools/mockutil" temporalsdk_activity "go.temporal.io/sdk/activity" temporalsdk_testsuite "go.temporal.io/sdk/testsuite" @@ -90,7 +89,7 @@ func TestDeleteTransferActivity(t *testing.T) { ctrl := gomock.NewController(t) env.RegisterActivityWithOptions( - am.NewDeleteTransferActivity(logr.Discard(), tt.mock(ctrl)).Execute, + am.NewDeleteTransferActivity(tt.mock(ctrl)).Execute, temporalsdk_activity.RegisterOptions{ Name: am.DeleteTransferActivityName, }, diff --git a/internal/am/poll_ingest.go b/internal/am/poll_ingest.go index 9b303acb0..1d3bac932 100644 --- a/internal/am/poll_ingest.go +++ b/internal/am/poll_ingest.go @@ -5,9 +5,9 @@ import ( "fmt" "time" - "github.com/go-logr/logr" "github.com/jonboulle/clockwork" "go.artefactual.dev/amclient" + temporal_tools "go.artefactual.dev/tools/temporal" temporalsdk_activity "go.temporal.io/sdk/activity" "github.com/artefactual-sdps/enduro/internal/package_" @@ -21,7 +21,6 @@ type PollIngestActivityParams struct { } type PollIngestActivity struct { - logger logr.Logger cfg *Config clock clockwork.Clock ingSvc amclient.IngestService @@ -35,7 +34,6 @@ type PollIngestActivityResult struct { } func NewPollIngestActivity( - logger logr.Logger, cfg *Config, clock clockwork.Clock, ingSvc amclient.IngestService, @@ -43,7 +41,6 @@ func NewPollIngestActivity( pkgSvc package_.Service, ) *PollIngestActivity { return &PollIngestActivity{ - logger: logger, cfg: cfg, clock: clock, ingSvc: ingSvc, @@ -63,7 +60,8 @@ func (a *PollIngestActivity) Execute( ctx context.Context, params *PollIngestActivityParams, ) (*PollIngestActivityResult, error) { - a.logger.V(1).Info("Executing PollIngestActivity", + logger := temporal_tools.GetLogger(ctx) + logger.V(1).Info("Executing PollIngestActivity", "PresActionID", params.PresActionID, "SIPID", params.SIPID, ) diff --git a/internal/am/poll_ingest_test.go b/internal/am/poll_ingest_test.go index f269c22b2..8956bbae0 100644 --- a/internal/am/poll_ingest_test.go +++ b/internal/am/poll_ingest_test.go @@ -5,7 +5,6 @@ import ( "testing" "time" - "github.com/go-logr/logr" "github.com/google/uuid" "github.com/jonboulle/clockwork" "go.artefactual.dev/amclient" @@ -265,7 +264,6 @@ func TestPollIngestActivity(t *testing.T) { env.RegisterActivityWithOptions( am.NewPollIngestActivity( - logr.Discard(), &am.Config{PollInterval: time.Millisecond * 10}, clock, ingSvc, diff --git a/internal/am/poll_transfer.go b/internal/am/poll_transfer.go index 2f7da52be..dc19fa027 100644 --- a/internal/am/poll_transfer.go +++ b/internal/am/poll_transfer.go @@ -6,7 +6,6 @@ import ( "fmt" "time" - "github.com/go-logr/logr" "github.com/jonboulle/clockwork" "go.artefactual.dev/amclient" temporal_tools "go.artefactual.dev/tools/temporal" @@ -23,7 +22,6 @@ type PollTransferActivityParams struct { } type PollTransferActivity struct { - logger logr.Logger cfg *Config clock clockwork.Clock tfrSvc amclient.TransferService @@ -38,7 +36,6 @@ type PollTransferActivityResult struct { } func NewPollTransferActivity( - logger logr.Logger, cfg *Config, clock clockwork.Clock, tfrSvc amclient.TransferService, @@ -46,7 +43,6 @@ func NewPollTransferActivity( pkgSvc package_.Service, ) *PollTransferActivity { return &PollTransferActivity{ - logger: logger, cfg: cfg, clock: clock, jobSvc: jobSvc, @@ -71,7 +67,8 @@ func (a *PollTransferActivity) Execute( ) (*PollTransferActivityResult, error) { var taskCount int - a.logger.V(1).Info("Executing PollTransferActivity", + logger := temporal_tools.GetLogger(ctx) + logger.V(1).Info("Executing PollTransferActivity", "PresActionID", params.PresActionID, "TransferID", params.TransferID, ) diff --git a/internal/am/poll_transfer_test.go b/internal/am/poll_transfer_test.go index 7175c8138..5a34fe7c0 100644 --- a/internal/am/poll_transfer_test.go +++ b/internal/am/poll_transfer_test.go @@ -5,7 +5,6 @@ import ( "testing" "time" - "github.com/go-logr/logr" "github.com/google/uuid" "github.com/jonboulle/clockwork" "go.artefactual.dev/amclient" @@ -295,7 +294,6 @@ func TestPollTransferActivity(t *testing.T) { env.RegisterActivityWithOptions( am.NewPollTransferActivity( - logr.Discard(), &am.Config{PollInterval: time.Millisecond * 10}, clockwork.NewFakeClock(), trfSvc, diff --git a/internal/am/start_transfer.go b/internal/am/start_transfer.go index e7043d859..14287149e 100644 --- a/internal/am/start_transfer.go +++ b/internal/am/start_transfer.go @@ -4,16 +4,15 @@ import ( context "context" "path/filepath" - "github.com/go-logr/logr" "go.artefactual.dev/amclient" + temporal_tools "go.artefactual.dev/tools/temporal" ) const StartTransferActivityName = "start-transfer-activity" type StartTransferActivity struct { - logger logr.Logger - cfg *Config - amps amclient.PackageService + cfg *Config + amps amclient.PackageService } type StartTransferActivityParams struct { @@ -29,11 +28,10 @@ type StartTransferActivityResult struct { TransferID string } -func NewStartTransferActivity(logger logr.Logger, cfg *Config, amps amclient.PackageService) *StartTransferActivity { +func NewStartTransferActivity(cfg *Config, amps amclient.PackageService) *StartTransferActivity { return &StartTransferActivity{ - logger: logger, - cfg: cfg, - amps: amps, + cfg: cfg, + amps: amps, } } @@ -45,7 +43,8 @@ func (a *StartTransferActivity) Execute( ctx context.Context, opts *StartTransferActivityParams, ) (*StartTransferActivityResult, error) { - a.logger.V(1).Info( + logger := temporal_tools.GetLogger(ctx) + logger.V(1).Info( "Executing StartTransferActivity", "Name", opts.Name, "RelativePath", opts.RelativePath, diff --git a/internal/am/start_transfer_test.go b/internal/am/start_transfer_test.go index ed4e03567..74b05748a 100644 --- a/internal/am/start_transfer_test.go +++ b/internal/am/start_transfer_test.go @@ -4,7 +4,6 @@ import ( "net/http" "testing" - "github.com/go-logr/logr" "github.com/google/uuid" "go.artefactual.dev/amclient" "go.artefactual.dev/amclient/amclienttest" @@ -104,7 +103,7 @@ func TestStartTransferActivity(t *testing.T) { } env.RegisterActivityWithOptions( - am.NewStartTransferActivity(logr.Discard(), &am.Config{}, amps).Execute, + am.NewStartTransferActivity(&am.Config{}, amps).Execute, temporalsdk_activity.RegisterOptions{ Name: am.StartTransferActivityName, }, diff --git a/internal/am/upload_transfer.go b/internal/am/upload_transfer.go index 252024031..3bc847c78 100644 --- a/internal/am/upload_transfer.go +++ b/internal/am/upload_transfer.go @@ -7,8 +7,8 @@ import ( "path/filepath" "time" - "github.com/go-logr/logr" "go.artefactual.dev/tools/temporal" + temporal_tools "go.artefactual.dev/tools/temporal" temporalsdk_activity "go.temporal.io/sdk/activity" "github.com/artefactual-sdps/enduro/internal/sftp" @@ -34,20 +34,14 @@ type UploadTransferActivityResult struct { // a periodic Temporal Heartbeat at the given heartRate. type UploadTransferActivity struct { client sftp.Client - logger logr.Logger heartRate time.Duration } // NewUploadTransferActivity initializes and returns a new // UploadTransferActivity. -func NewUploadTransferActivity( - logger logr.Logger, - client sftp.Client, - heartRate time.Duration, -) *UploadTransferActivity { +func NewUploadTransferActivity(client sftp.Client, heartRate time.Duration) *UploadTransferActivity { return &UploadTransferActivity{ client: client, - logger: logger, heartRate: heartRate, } } @@ -57,7 +51,8 @@ func (a *UploadTransferActivity) Execute( ctx context.Context, params *UploadTransferActivityParams, ) (*UploadTransferActivityResult, error) { - a.logger.V(1).Info("Execute UploadTransferActivity", "SourcePath", params.SourcePath) + logger := temporal_tools.GetLogger(ctx) + logger.V(1).Info("Execute UploadTransferActivity", "SourcePath", params.SourcePath) src, err := os.Open(params.SourcePath) if err != nil { diff --git a/internal/am/upload_transfer_test.go b/internal/am/upload_transfer_test.go index 488151354..220047771 100644 --- a/internal/am/upload_transfer_test.go +++ b/internal/am/upload_transfer_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/go-logr/logr" "go.artefactual.dev/tools/mockutil" "go.artefactual.dev/tools/temporal" temporalsdk_activity "go.temporal.io/sdk/activity" @@ -151,7 +150,7 @@ func TestUploadTransferActivity(t *testing.T) { } env.RegisterActivityWithOptions( - am.NewUploadTransferActivity(logr.Discard(), client, 2*time.Millisecond).Execute, + am.NewUploadTransferActivity(client, 2*time.Millisecond).Execute, temporalsdk_activity.RegisterOptions{ Name: am.UploadTransferActivityName, }, diff --git a/internal/temporal/log.go b/internal/temporal/log.go deleted file mode 100644 index 14dd2394a..000000000 --- a/internal/temporal/log.go +++ /dev/null @@ -1,35 +0,0 @@ -package temporal - -import ( - "errors" - - "github.com/go-logr/logr" - temporalsdk_log "go.temporal.io/sdk/log" -) - -// logrWrapper implements temporalsdk_log.Logger. -type logrWrapper struct { - logger logr.Logger -} - -var _ temporalsdk_log.Logger = (*logrWrapper)(nil) - -func (l logrWrapper) Debug(msg string, keyvals ...interface{}) { - l.logger.WithValues("level", "debug").Info(msg, keyvals...) -} - -func (l logrWrapper) Info(msg string, keyvals ...interface{}) { - l.logger.WithValues("level", "info").Info(msg, keyvals...) -} - -func (l logrWrapper) Warn(msg string, keyvals ...interface{}) { - l.logger.WithValues("level", "warn").Info(msg, keyvals...) -} - -func (l logrWrapper) Error(msg string, keyvals ...interface{}) { - l.logger.Error(errors.New(msg), "error", keyvals...) -} - -func Logger(logger logr.Logger) temporalsdk_log.Logger { - return logrWrapper{logger.WithCallDepth(1)} -} diff --git a/internal/workflow/activities/bundle.go b/internal/workflow/activities/bundle.go index d8ab3fcc3..1336a41ca 100644 --- a/internal/workflow/activities/bundle.go +++ b/internal/workflow/activities/bundle.go @@ -10,7 +10,6 @@ import ( "strings" securejoin "github.com/cyphar/filepath-securejoin" - "github.com/go-logr/logr" "github.com/otiai10/copy" temporal_tools "go.artefactual.dev/tools/temporal" @@ -24,12 +23,10 @@ const ( ModeFile = 0o640 ) -type BundleActivity struct { - logger logr.Logger -} +type BundleActivity struct{} -func NewBundleActivity(logger logr.Logger) *BundleActivity { - return &BundleActivity{logger: logger} +func NewBundleActivity() *BundleActivity { + return &BundleActivity{} } type BundleActivityParams struct { @@ -54,7 +51,8 @@ func (a *BundleActivity) Execute(ctx context.Context, params *BundleActivityPara err error ) - a.logger.V(1).Info("Executing BundleActivity", + logger := temporal_tools.GetLogger(ctx) + logger.V(1).Info("Executing BundleActivity", "SourcePath", params.SourcePath, "TransferDir", params.TransferDir, "IsDir", params.IsDir, diff --git a/internal/workflow/activities/bundle_test.go b/internal/workflow/activities/bundle_test.go index 79b8c4720..04023de3f 100644 --- a/internal/workflow/activities/bundle_test.go +++ b/internal/workflow/activities/bundle_test.go @@ -4,7 +4,6 @@ import ( "path/filepath" "testing" - "github.com/go-logr/logr" "go.artefactual.dev/tools/temporal" temporalsdk_activity "go.temporal.io/sdk/activity" temporalsdk_testsuite "go.temporal.io/sdk/testsuite" @@ -118,7 +117,7 @@ e91f941be5973ff71f1dccbdd1a32d598881893a7f21be516aca743da38b1689 bagit.txt ts := &temporalsdk_testsuite.WorkflowTestSuite{} env := ts.NewTestActivityEnvironment() env.RegisterActivityWithOptions( - activities.NewBundleActivity(logr.Discard()).Execute, + activities.NewBundleActivity().Execute, temporalsdk_activity.RegisterOptions{ Name: activities.BundleActivityName, }, diff --git a/internal/workflow/activities/classify_package.go b/internal/workflow/activities/classify_package.go index f055e71cd..4c5635fa9 100644 --- a/internal/workflow/activities/classify_package.go +++ b/internal/workflow/activities/classify_package.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - "github.com/go-logr/logr" + "go.artefactual.dev/tools/temporal" "github.com/artefactual-sdps/enduro/internal/bagit" "github.com/artefactual-sdps/enduro/internal/enums" @@ -13,9 +13,7 @@ import ( const ClassifyPackageActivityName = "classify-package-activity" type ( - ClassifyPackageActivity struct { - logger logr.Logger - } + ClassifyPackageActivity struct{} ClassifyPackageActivityParams struct { // Path is the full path of the package. Path string @@ -26,15 +24,16 @@ type ( } ) -func NewClassifyPackageActivity(logger logr.Logger) *ClassifyPackageActivity { - return &ClassifyPackageActivity{logger: logger} +func NewClassifyPackageActivity() *ClassifyPackageActivity { + return &ClassifyPackageActivity{} } func (a *ClassifyPackageActivity) Execute( ctx context.Context, params ClassifyPackageActivityParams, ) (*ClassifyPackageActivityResult, error) { - a.logger.V(1).Info( + logger := temporal.GetLogger(ctx) + logger.V(1).Info( fmt.Sprintf("Executing %s", ClassifyPackageActivityName), "Path", params.Path, ) diff --git a/internal/workflow/activities/classify_package_test.go b/internal/workflow/activities/classify_package_test.go index 30e294df0..44d1afff7 100644 --- a/internal/workflow/activities/classify_package_test.go +++ b/internal/workflow/activities/classify_package_test.go @@ -3,7 +3,6 @@ package activities_test import ( "testing" - "github.com/go-logr/logr" temporalsdk_activity "go.temporal.io/sdk/activity" temporalsdk_testsuite "go.temporal.io/sdk/testsuite" "gotest.tools/v3/assert" @@ -50,7 +49,7 @@ func TestClassifyPackageActivity(t *testing.T) { ts := &temporalsdk_testsuite.WorkflowTestSuite{} env := ts.NewTestActivityEnvironment() env.RegisterActivityWithOptions( - activities.NewClassifyPackageActivity(logr.Discard()).Execute, + activities.NewClassifyPackageActivity().Execute, temporalsdk_activity.RegisterOptions{ Name: activities.ClassifyPackageActivityName, }, diff --git a/internal/workflow/activities/download.go b/internal/workflow/activities/download.go index 58ff91706..bfa76365b 100644 --- a/internal/workflow/activities/download.go +++ b/internal/workflow/activities/download.go @@ -6,7 +6,6 @@ import ( "os" "path/filepath" - "github.com/go-logr/logr" temporal_tools "go.artefactual.dev/tools/temporal" "go.opentelemetry.io/otel/trace" @@ -16,7 +15,6 @@ import ( // DownloadActivity downloads the blob into the processing directory. type DownloadActivity struct { - logger logr.Logger tracer trace.Tracer wsvc watcher.Service } @@ -31,9 +29,8 @@ type DownloadActivityResult struct { Path string } -func NewDownloadActivity(logger logr.Logger, tracer trace.Tracer, wsvc watcher.Service) *DownloadActivity { +func NewDownloadActivity(tracer trace.Tracer, wsvc watcher.Service) *DownloadActivity { return &DownloadActivity{ - logger: logger, tracer: tracer, wsvc: wsvc, } @@ -43,7 +40,8 @@ func (a *DownloadActivity) Execute( ctx context.Context, params *DownloadActivityParams, ) (*DownloadActivityResult, error) { - a.logger.V(1).Info("Executing DownloadActivity", + logger := temporal_tools.GetLogger(ctx) + logger.V(1).Info("Executing DownloadActivity", "Key", params.Key, "WatcherName", params.WatcherName, ) diff --git a/internal/workflow/activities/download_test.go b/internal/workflow/activities/download_test.go index ac40f8ee4..5fd318866 100644 --- a/internal/workflow/activities/download_test.go +++ b/internal/workflow/activities/download_test.go @@ -6,7 +6,6 @@ import ( "os" "testing" - "github.com/go-logr/logr" "go.artefactual.dev/tools/mockutil" "go.artefactual.dev/tools/temporal" "go.opentelemetry.io/otel/trace/noop" @@ -80,7 +79,7 @@ func TestDownloadActivity(t *testing.T) { } env.RegisterActivityWithOptions( - activities.NewDownloadActivity(logr.Discard(), noop.Tracer{}, wsvc).Execute, + activities.NewDownloadActivity(noop.Tracer{}, wsvc).Execute, temporalsdk_activity.RegisterOptions{ Name: activities.DownloadActivityName, }, diff --git a/internal/workflow/local_activities.go b/internal/workflow/local_activities.go index 90150fb94..d051fe224 100644 --- a/internal/workflow/local_activities.go +++ b/internal/workflow/local_activities.go @@ -7,7 +7,6 @@ import ( "io" "time" - "github.com/go-logr/logr" "github.com/google/uuid" temporalsdk_activity "go.temporal.io/sdk/activity" @@ -23,7 +22,6 @@ type createPackageLocalActivityParams struct { func createPackageLocalActivity( ctx context.Context, - logger logr.Logger, pkgsvc package_.Service, params *createPackageLocalActivityParams, ) (uint, error) { @@ -37,7 +35,6 @@ func createPackageLocalActivity( } if err := pkgsvc.Create(ctx, col); err != nil { - logger.Error(err, "Error creating package") return 0, err } @@ -56,7 +53,6 @@ type updatePackageLocalActivityResult struct{} func updatePackageLocalActivity( ctx context.Context, - logger logr.Logger, pkgsvc package_.Service, params *updatePackageLocalActivityParams, ) (*updatePackageLocalActivityResult, error) { @@ -73,7 +69,6 @@ func updatePackageLocalActivity( params.StoredAt, ) if err != nil { - logger.Error(err, "Error updating package") return &updatePackageLocalActivityResult{}, err } diff --git a/internal/workflow/move.go b/internal/workflow/move.go index bc2009048..2dddb9c6f 100644 --- a/internal/workflow/move.go +++ b/internal/workflow/move.go @@ -1,7 +1,6 @@ package workflow import ( - "github.com/go-logr/logr" temporalsdk_workflow "go.temporal.io/sdk/workflow" "github.com/artefactual-sdps/enduro/internal/enums" @@ -10,13 +9,11 @@ import ( ) type MoveWorkflow struct { - logger logr.Logger pkgsvc package_.Service } -func NewMoveWorkflow(logger logr.Logger, pkgsvc package_.Service) *MoveWorkflow { +func NewMoveWorkflow(pkgsvc package_.Service) *MoveWorkflow { return &MoveWorkflow{ - logger: logger, pkgsvc: pkgsvc, } } diff --git a/internal/workflow/move_test.go b/internal/workflow/move_test.go index 21cea4ea8..70fd166b2 100644 --- a/internal/workflow/move_test.go +++ b/internal/workflow/move_test.go @@ -4,7 +4,6 @@ import ( "errors" "testing" - "github.com/go-logr/logr" "github.com/google/uuid" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -35,7 +34,6 @@ func (s *MoveWorkflowTestSuite) SetupTest() { s.env.SetWorkerOptions(temporalsdk_worker.Options{EnableSessionWorker: true}) ctrl := gomock.NewController(s.T()) - logger := logr.Discard() pkgsvc := packagefake.NewMockService(ctrl) s.env.RegisterActivityWithOptions( @@ -47,7 +45,7 @@ func (s *MoveWorkflowTestSuite) SetupTest() { temporalsdk_activity.RegisterOptions{Name: activities.PollMoveToPermanentStorageActivityName}, ) - s.workflow = NewMoveWorkflow(logger, pkgsvc) + s.workflow = NewMoveWorkflow(pkgsvc) } func (s *MoveWorkflowTestSuite) AfterTest(suiteName, testName string) { diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index 4bc86a4f2..4132b738c 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -19,11 +19,11 @@ import ( "github.com/artefactual-sdps/temporal-activities/bagcreate" "github.com/artefactual-sdps/temporal-activities/bagvalidate" "github.com/artefactual-sdps/temporal-activities/removepaths" - "github.com/go-logr/logr" "github.com/google/uuid" "go.artefactual.dev/tools/ref" temporal_tools "go.artefactual.dev/tools/temporal" temporalapi_enums "go.temporal.io/api/enums/v1" + temporalsdk_log "go.temporal.io/sdk/log" temporalsdk_temporal "go.temporal.io/sdk/temporal" temporalsdk_workflow "go.temporal.io/sdk/workflow" @@ -42,7 +42,7 @@ import ( ) type ProcessingWorkflow struct { - logger logr.Logger + logger temporalsdk_log.Logger cfg config.Configuration rng io.Reader pkgsvc package_.Service @@ -50,14 +50,12 @@ type ProcessingWorkflow struct { } func NewProcessingWorkflow( - logger logr.Logger, cfg config.Configuration, rng io.Reader, pkgsvc package_.Service, wsvc watcher.Service, ) *ProcessingWorkflow { return &ProcessingWorkflow{ - logger: logger, cfg: cfg, rng: rng, pkgsvc: pkgsvc, @@ -148,7 +146,8 @@ func (w *ProcessingWorkflow) sessionCleanup(ctx temporalsdk_workflow.Context, cl removepaths.Params{Paths: cleanup.tempDirs}, ).Get(ctx, nil) if err != nil { - w.logger.V(1).Info("session cleanup: error(s) removing temporary directories", + w.logger.Error( + "session cleanup: error(s) removing temporary directories", "errors", err.Error(), ) } @@ -165,8 +164,6 @@ func (w *ProcessingWorkflow) sessionCleanup(ctx temporalsdk_workflow.Context, cl // the API. func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *package_.ProcessingWorkflowRequest) error { var ( - logger = temporalsdk_workflow.GetLogger(ctx) - tinfo = &TransferInfo{ req: *req, IsDir: req.IsDir, @@ -181,20 +178,22 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *pack paStatus = enums.PreservationActionStatusUnspecified ) + w.logger = temporalsdk_workflow.GetLogger(ctx) + // Persist package as early as possible. { activityOpts := withLocalActivityOpts(ctx) var err error if req.PackageID == 0 { - err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, createPackageLocalActivity, w.logger, w.pkgsvc, &createPackageLocalActivityParams{ + err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, createPackageLocalActivity, w.pkgsvc, &createPackageLocalActivityParams{ Key: req.Key, Status: status, }). Get(activityOpts, &tinfo.req.PackageID) } else { // TODO: investigate better way to reset the package_. - err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.pkgsvc, &updatePackageLocalActivityParams{ + err = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.pkgsvc, &updatePackageLocalActivityParams{ PackageID: req.PackageID, Key: req.Key, SIPID: "", @@ -219,7 +218,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *pack // Use disconnected context so it also runs after cancellation. dctx, _ := temporalsdk_workflow.NewDisconnectedContext(ctx) activityOpts := withLocalActivityOpts(dctx) - _ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.pkgsvc, &updatePackageLocalActivityParams{ + _ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.pkgsvc, &updatePackageLocalActivityParams{ PackageID: tinfo.req.PackageID, Key: tinfo.req.Key, SIPID: tinfo.SIPID, @@ -271,7 +270,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *pack return nil } - logger.Error( + w.logger.Error( "Session failed, will retry shortly (10s)...", "err", ctx.Err(), "attemptFailed", attempt, @@ -300,7 +299,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *pack if tinfo.req.RetentionPeriod != nil { err := temporalsdk_workflow.NewTimer(ctx, *tinfo.req.RetentionPeriod).Get(ctx, nil) if err != nil { - logger.Warn("Retention policy timer failed", "err", err.Error()) + w.logger.Warn("Retention policy timer failed", "err", err.Error()) } else { activityOpts := withActivityOptsForRequest(ctx) _ = temporalsdk_workflow.ExecuteActivity(activityOpts, activities.DeleteOriginalActivityName, tinfo.req.WatcherName, tinfo.req.Key).Get(activityOpts, nil) @@ -312,7 +311,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *pack } } - logger.Info( + w.logger.Info( "Workflow completed successfully!", "packageID", tinfo.req.PackageID, "watcher", tinfo.req.WatcherName, @@ -513,7 +512,7 @@ func (w *ProcessingWorkflow) SessionHandler( // Persist SIPID. { activityOpts := withLocalActivityOpts(sessCtx) - _ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.logger, w.pkgsvc, &updatePackageLocalActivityParams{ + _ = temporalsdk_workflow.ExecuteLocalActivity(activityOpts, updatePackageLocalActivity, w.pkgsvc, &updatePackageLocalActivityParams{ PackageID: tinfo.req.PackageID, Key: tinfo.req.Key, SIPID: tinfo.SIPID, diff --git a/internal/workflow/processing_test.go b/internal/workflow/processing_test.go index 53753ec88..f3a0f52fc 100644 --- a/internal/workflow/processing_test.go +++ b/internal/workflow/processing_test.go @@ -12,7 +12,6 @@ import ( "github.com/artefactual-sdps/temporal-activities/bagcreate" "github.com/artefactual-sdps/temporal-activities/bagvalidate" "github.com/artefactual-sdps/temporal-activities/removepaths" - "github.com/go-logr/logr" "github.com/google/uuid" "github.com/jonboulle/clockwork" "github.com/stretchr/testify/mock" @@ -101,13 +100,12 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(cfg config.Configuration s.env.SetStartTime(startTime) ctrl := gomock.NewController(s.T()) - logger := logr.Discard() pkgsvc := packagefake.NewMockService(ctrl) wsvc := watcherfake.NewMockService(ctrl) rng := rand.New(rand.NewSource(1)) // #nosec: G404 s.env.RegisterActivityWithOptions( - activities.NewDownloadActivity(logger, noop.Tracer{}, wsvc).Execute, + activities.NewDownloadActivity(noop.Tracer{}, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName}, ) s.env.RegisterActivityWithOptions( @@ -119,15 +117,15 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(cfg config.Configuration temporalsdk_activity.RegisterOptions{Name: bagvalidate.Name}, ) s.env.RegisterActivityWithOptions( - activities.NewClassifyPackageActivity(logger).Execute, + activities.NewClassifyPackageActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.ClassifyPackageActivityName}, ) // Set up AM taskqueue. if cfg.Preservation.TaskQueue == temporal.AmWorkerTaskQueue { - s.setupAMWorkflowTest(logger, &cfg.AM, ctrl, pkgsvc) + s.setupAMWorkflowTest(&cfg.AM, ctrl, pkgsvc) } else { - s.setupA3mWorkflowTest(logger, ctrl, pkgsvc) + s.setupA3mWorkflowTest(ctrl, pkgsvc) } s.env.RegisterWorkflowWithOptions( @@ -143,11 +141,10 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(cfg config.Configuration temporalsdk_activity.RegisterOptions{Name: activities.DeleteOriginalActivityName}, ) - s.workflow = NewProcessingWorkflow(logger, cfg, rng, pkgsvc, wsvc) + s.workflow = NewProcessingWorkflow(cfg, rng, pkgsvc, wsvc) } func (s *ProcessingWorkflowTestSuite) setupAMWorkflowTest( - logger logr.Logger, cfg *am.Config, ctrl *gomock.Controller, pkgsvc package_.Service, @@ -164,16 +161,15 @@ func (s *ProcessingWorkflowTestSuite) setupAMWorkflowTest( temporalsdk_activity.RegisterOptions{Name: archivezip.Name}, ) s.env.RegisterActivityWithOptions( - am.NewUploadTransferActivity(logger, sftpc, 10*time.Second).Execute, + am.NewUploadTransferActivity(sftpc, 10*time.Second).Execute, temporalsdk_activity.RegisterOptions{Name: am.UploadTransferActivityName}, ) s.env.RegisterActivityWithOptions( - am.NewStartTransferActivity(logger, &am.Config{}, amclienttest.NewMockPackageService(ctrl)).Execute, + am.NewStartTransferActivity(&am.Config{}, amclienttest.NewMockPackageService(ctrl)).Execute, temporalsdk_activity.RegisterOptions{Name: am.StartTransferActivityName}, ) s.env.RegisterActivityWithOptions( am.NewPollTransferActivity( - logger, cfg, clock, amclienttest.NewMockTransferService(ctrl), @@ -184,7 +180,6 @@ func (s *ProcessingWorkflowTestSuite) setupAMWorkflowTest( ) s.env.RegisterActivityWithOptions( am.NewPollIngestActivity( - logger, cfg, clock, amclienttest.NewMockIngestService(ctrl), @@ -198,7 +193,7 @@ func (s *ProcessingWorkflowTestSuite) setupAMWorkflowTest( temporalsdk_activity.RegisterOptions{Name: activities.CreateStoragePackageActivityName}, ) s.env.RegisterActivityWithOptions( - am.NewDeleteTransferActivity(logger, sftpc).Execute, + am.NewDeleteTransferActivity(sftpc).Execute, temporalsdk_activity.RegisterOptions{ Name: am.DeleteTransferActivityName, }, @@ -206,18 +201,17 @@ func (s *ProcessingWorkflowTestSuite) setupAMWorkflowTest( } func (s *ProcessingWorkflowTestSuite) setupA3mWorkflowTest( - logger logr.Logger, ctrl *gomock.Controller, pkgsvc package_.Service, ) { tsvc := a3mfake.NewMockTransferServiceClient(ctrl) s.env.RegisterActivityWithOptions( - activities.NewBundleActivity(logger).Execute, + activities.NewBundleActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName}, ) s.env.RegisterActivityWithOptions( - a3m.NewCreateAIPActivity(logger, noop.Tracer{}, tsvc, &a3m.Config{}, pkgsvc).Execute, + a3m.NewCreateAIPActivity(noop.Tracer{}, tsvc, &a3m.Config{}, pkgsvc).Execute, temporalsdk_activity.RegisterOptions{Name: a3m.CreateAIPActivityName}, ) s.env.RegisterActivityWithOptions( @@ -376,7 +370,6 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() { retentionPeriod := 1 * time.Second ctx := mock.AnythingOfType("*context.valueCtx") sessionCtx := mock.AnythingOfType("*context.timerCtx") - logger := s.workflow.logger pkgsvc := s.workflow.pkgsvc rng := rand.New(rand.NewSource(1)) // #nosec: G404 @@ -384,7 +377,6 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() { s.env.OnActivity( createPackageLocalActivity, ctx, - logger, pkgsvc, &createPackageLocalActivityParams{Key: key, Status: enums.PackageStatusQueued}, ).Return(pkgID, nil).Once() @@ -482,7 +474,7 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() { s.env.OnActivity(a3m.CreateAIPActivityName, sessionCtx, mock.AnythingOfType("*a3m.CreateAIPActivityParams")). Return(nil, nil). Once() - s.env.OnActivity(updatePackageLocalActivity, ctx, logger, pkgsvc, mock.AnythingOfType("*workflow.updatePackageLocalActivityParams")). + s.env.OnActivity(updatePackageLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.updatePackageLocalActivityParams")). Return(nil, nil). Times(2) @@ -571,12 +563,10 @@ func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() { } s.SetupWorkflowTest(cfg) - logger := s.workflow.logger pkgsvc := s.workflow.pkgsvc // Activity mocks/assertions sequence s.env.OnActivity(createPackageLocalActivity, ctx, - logger, pkgsvc, &createPackageLocalActivityParams{Key: key, Status: enums.PackageStatusQueued}, ).Return(pkgID, nil) @@ -672,7 +662,6 @@ func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() { s.env.OnActivity( updatePackageLocalActivity, ctx, - logger, pkgsvc, mock.AnythingOfType("*workflow.updatePackageLocalActivityParams"), ).Return(nil, nil) @@ -841,7 +830,6 @@ func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() { retentionPeriod := 1 * time.Second ctx := mock.AnythingOfType("*context.valueCtx") sessionCtx := mock.AnythingOfType("*context.timerCtx") - logger := s.workflow.logger pkgsvc := s.workflow.pkgsvc downloadDir := strings.Replace(tempPath, "/tmp/", cfg.Preprocessing.SharedPath, 1) @@ -851,7 +839,6 @@ func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() { s.env.OnActivity( createPackageLocalActivity, ctx, - logger, pkgsvc, &createPackageLocalActivityParams{Key: key, Status: enums.PackageStatusQueued}, ).Return(pkgID, nil) @@ -993,7 +980,7 @@ func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() { s.env.OnActivity(a3m.CreateAIPActivityName, sessionCtx, mock.AnythingOfType("*a3m.CreateAIPActivityParams")). Return(nil, nil). Once() - s.env.OnActivity(updatePackageLocalActivity, ctx, logger, pkgsvc, mock.AnythingOfType("*workflow.updatePackageLocalActivityParams")). + s.env.OnActivity(updatePackageLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.updatePackageLocalActivityParams")). Return(nil, nil). Times(2)