Skip to content

Commit

Permalink
Use logger interceptor in Temporal workers
Browse files Browse the repository at this point in the history
Follow https://enduro.readthedocs.io/dev-manual/logging recommendations.

- Use `go.artefactual.dev/tools/temporal` to set the clients logger.
- Use replay-aware Temporal SDK logger from processing workflow.
- Add logger interceptor to all workers.
- Use `go.artefactual.dev/tools/temporal` `GetLogger` in activities.
- Remove logger usage from local activities.
  • Loading branch information
jraddaoui committed Sep 4, 2024
1 parent 8aa0da8 commit 0ecec2a
Show file tree
Hide file tree
Showing 27 changed files with 100 additions and 180 deletions.
13 changes: 8 additions & 5 deletions cmd/enduro-a3m-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/pflag"
"go.artefactual.dev/tools/log"
temporal_tools "go.artefactual.dev/tools/temporal"
"go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
temporalsdk_activity "go.temporal.io/sdk/activity"
Expand Down Expand Up @@ -118,7 +119,7 @@ func main() {
temporalClient, err := temporalsdk_client.Dial(temporalsdk_client.Options{
Namespace: cfg.Temporal.Namespace,
HostPort: cfg.Temporal.Address,
Logger: temporal.Logger(logger.WithName("temporal-client")),
Logger: temporal_tools.Logger(logger.WithName("temporal-client")),
Interceptors: []temporalsdk_interceptor.ClientInterceptor{tracingInterceptor},
})
if err != nil {
Expand Down Expand Up @@ -212,6 +213,9 @@ func main() {
EnableSessionWorker: true,
MaxConcurrentSessionExecutionSize: cfg.A3m.Capacity,
MaxConcurrentActivityExecutionSize: 1,
Interceptors: []temporalsdk_interceptor.WorkerInterceptor{
temporal_tools.NewLoggerInterceptor(logger),
},
}
w := temporalsdk_worker.New(temporalClient, temporal.A3mWorkerTaskQueue, workerOpts)
if err != nil {
Expand All @@ -220,28 +224,27 @@ func main() {
}

w.RegisterActivityWithOptions(
activities.NewDownloadActivity(logger, tp.Tracer(activities.DownloadActivityName), wsvc).Execute,
activities.NewDownloadActivity(tp.Tracer(activities.DownloadActivityName), wsvc).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName},
)
w.RegisterActivityWithOptions(
archiveextract.New(cfg.ExtractActivity).Execute,
temporalsdk_activity.RegisterOptions{Name: archiveextract.Name},
)
w.RegisterActivityWithOptions(
activities.NewClassifyPackageActivity(logger).Execute,
activities.NewClassifyPackageActivity().Execute,
temporalsdk_activity.RegisterOptions{Name: activities.ClassifyPackageActivityName},
)
w.RegisterActivityWithOptions(
bagvalidate.New(validator).Execute,
temporalsdk_activity.RegisterOptions{Name: bagvalidate.Name},
)
w.RegisterActivityWithOptions(
activities.NewBundleActivity(logger).Execute,
activities.NewBundleActivity().Execute,
temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName},
)
w.RegisterActivityWithOptions(
a3m.NewCreateAIPActivity(
logger,
tp.Tracer(a3m.CreateAIPActivityName),
a3mClient.TransferClient,
&cfg.A3m,
Expand Down
20 changes: 11 additions & 9 deletions cmd/enduro-am-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/spf13/pflag"
"go.artefactual.dev/amclient"
"go.artefactual.dev/tools/log"
temporal_tools "go.artefactual.dev/tools/temporal"
"go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -125,7 +126,7 @@ func main() {
temporalClient, err := temporalsdk_client.Dial(temporalsdk_client.Options{
Namespace: cfg.Temporal.Namespace,
HostPort: cfg.Temporal.Address,
Logger: temporal.Logger(logger.WithName("temporal-client")),
Logger: temporal_tools.Logger(logger.WithName("temporal-client")),
Interceptors: []temporalsdk_interceptor.ClientInterceptor{tracingInterceptor},
})
if err != nil {
Expand Down Expand Up @@ -209,6 +210,9 @@ func main() {
EnableSessionWorker: true,
MaxConcurrentSessionExecutionSize: cfg.AM.Capacity,
MaxConcurrentActivityExecutionSize: 1,
Interceptors: []temporalsdk_interceptor.WorkerInterceptor{
temporal_tools.NewLoggerInterceptor(logger),
},
}
w := temporalsdk_worker.New(temporalClient, temporal.AmWorkerTaskQueue, workerOpts)
if err != nil {
Expand All @@ -228,23 +232,23 @@ func main() {
sftpClient := sftp.NewGoClient(logger, cfg.AM.SFTP)

w.RegisterActivityWithOptions(
activities.NewDownloadActivity(logger, tp.Tracer(activities.DownloadActivityName), wsvc).Execute,
activities.NewDownloadActivity(tp.Tracer(activities.DownloadActivityName), wsvc).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName},
)
w.RegisterActivityWithOptions(
archiveextract.New(cfg.ExtractActivity).Execute,
temporalsdk_activity.RegisterOptions{Name: archiveextract.Name},
)
w.RegisterActivityWithOptions(
activities.NewClassifyPackageActivity(logger).Execute,
activities.NewClassifyPackageActivity().Execute,
temporalsdk_activity.RegisterOptions{Name: activities.ClassifyPackageActivityName},
)
w.RegisterActivityWithOptions(
bagvalidate.New(validator).Execute,
temporalsdk_activity.RegisterOptions{Name: bagvalidate.Name},
)
w.RegisterActivityWithOptions(
activities.NewBundleActivity(logger).Execute,
activities.NewBundleActivity().Execute,
temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName},
)
w.RegisterActivityWithOptions(
Expand All @@ -256,20 +260,19 @@ func main() {
temporalsdk_activity.RegisterOptions{Name: archivezip.Name},
)
w.RegisterActivityWithOptions(
am.NewUploadTransferActivity(logger, sftpClient, cfg.AM.PollInterval).Execute,
am.NewUploadTransferActivity(sftpClient, cfg.AM.PollInterval).Execute,
temporalsdk_activity.RegisterOptions{Name: am.UploadTransferActivityName},
)
w.RegisterActivityWithOptions(
am.NewDeleteTransferActivity(logger, sftpClient).Execute,
am.NewDeleteTransferActivity(sftpClient).Execute,
temporalsdk_activity.RegisterOptions{Name: am.DeleteTransferActivityName},
)
w.RegisterActivityWithOptions(
am.NewStartTransferActivity(logger, &cfg.AM, amc.Package).Execute,
am.NewStartTransferActivity(&cfg.AM, amc.Package).Execute,
temporalsdk_activity.RegisterOptions{Name: am.StartTransferActivityName},
)
w.RegisterActivityWithOptions(
am.NewPollTransferActivity(
logger,
&cfg.AM,
clockwork.NewRealClock(),
amc.Transfer,
Expand All @@ -280,7 +283,6 @@ func main() {
)
w.RegisterActivityWithOptions(
am.NewPollIngestActivity(
logger,
&cfg.AM,
clockwork.NewRealClock(),
amc.Ingest,
Expand Down
14 changes: 9 additions & 5 deletions cmd/enduro/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/pflag"
"go.artefactual.dev/tools/log"
temporal_tools "go.artefactual.dev/tools/temporal"
"go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/codes"
Expand Down Expand Up @@ -49,7 +50,6 @@ import (
storage_entdb "github.com/artefactual-sdps/enduro/internal/storage/persistence/ent/db"
storage_workflows "github.com/artefactual-sdps/enduro/internal/storage/workflows"
"github.com/artefactual-sdps/enduro/internal/telemetry"
"github.com/artefactual-sdps/enduro/internal/temporal"
"github.com/artefactual-sdps/enduro/internal/upload"
"github.com/artefactual-sdps/enduro/internal/version"
"github.com/artefactual-sdps/enduro/internal/watcher"
Expand Down Expand Up @@ -149,7 +149,7 @@ func main() {
temporalClient, err := temporalsdk_client.Dial(temporalsdk_client.Options{
Namespace: cfg.Temporal.Namespace,
HostPort: cfg.Temporal.Address,
Logger: temporal.Logger(logger.WithName("temporal-client")),
Logger: temporal_tools.Logger(logger.WithName("temporal-client")),
Interceptors: []temporalsdk_interceptor.ClientInterceptor{tracingInterceptor},
})
if err != nil {
Expand Down Expand Up @@ -423,15 +423,19 @@ func main() {
// Workflow and activity worker.
{
done := make(chan struct{})
workerOpts := temporalsdk_worker.Options{}
workerOpts := temporalsdk_worker.Options{
Interceptors: []temporalsdk_interceptor.WorkerInterceptor{
temporal_tools.NewLoggerInterceptor(logger.WithName("worker")),
},
}
w := temporalsdk_worker.New(temporalClient, cfg.Temporal.TaskQueue, workerOpts)
if err != nil {
logger.Error(err, "Error creating Temporal worker.")
os.Exit(1)
}

w.RegisterWorkflowWithOptions(
workflow.NewProcessingWorkflow(logger, cfg, rand.Reader, pkgsvc, wsvc).Execute,
workflow.NewProcessingWorkflow(cfg, rand.Reader, pkgsvc, wsvc).Execute,
temporalsdk_workflow.RegisterOptions{Name: package_.ProcessingWorkflowName},
)
w.RegisterActivityWithOptions(
Expand All @@ -458,7 +462,7 @@ func main() {
)

w.RegisterWorkflowWithOptions(
workflow.NewMoveWorkflow(logger, pkgsvc).Execute,
workflow.NewMoveWorkflow(pkgsvc).Execute,
temporalsdk_workflow.RegisterOptions{Name: package_.MoveWorkflowName},
)

Expand Down
8 changes: 3 additions & 5 deletions internal/a3m/a3m.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (

"buf.build/gen/go/artefactual/a3m/grpc/go/a3m/api/transferservice/v1beta1/transferservicev1beta1grpc"
transferservice "buf.build/gen/go/artefactual/a3m/protocolbuffers/go/a3m/api/transferservice/v1beta1"
"github.com/go-logr/logr"
"github.com/oklog/run"
temporal_tools "go.artefactual.dev/tools/temporal"
"go.opentelemetry.io/otel/trace"
temporalsdk_activity "go.temporal.io/sdk/activity"
"google.golang.org/grpc"
Expand All @@ -24,7 +24,6 @@ import (
const CreateAIPActivityName = "create-aip-activity"

type CreateAIPActivity struct {
logger logr.Logger
tracer trace.Tracer
client transferservicev1beta1grpc.TransferServiceClient
cfg *Config
Expand All @@ -43,14 +42,12 @@ type CreateAIPActivityResult struct {
}

func NewCreateAIPActivity(
logger logr.Logger,
tracer trace.Tracer,
client transferservicev1beta1grpc.TransferServiceClient,
cfg *Config,
pkgsvc package_.Service,
) *CreateAIPActivity {
return &CreateAIPActivity{
logger: logger,
tracer: tracer,
client: client,
cfg: cfg,
Expand All @@ -62,6 +59,7 @@ func (a *CreateAIPActivity) Execute(
ctx context.Context,
opts *CreateAIPActivityParams,
) (*CreateAIPActivityResult, error) {
logger := temporal_tools.GetLogger(ctx)
result := &CreateAIPActivityResult{}

var g run.Group
Expand Down Expand Up @@ -147,7 +145,7 @@ func (a *CreateAIPActivity) Execute(
}

result.Path = fmt.Sprintf("%s/completed/%s-%s.7z", a.cfg.ShareDir, opts.Name, result.UUID)
a.logger.Info("We have run a3m successfully", "path", result.Path)
logger.Info("We have run a3m successfully", "path", result.Path)

break
}
Expand Down
2 changes: 0 additions & 2 deletions internal/a3m/a3m_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

transferservice "buf.build/gen/go/artefactual/a3m/protocolbuffers/go/a3m/api/transferservice/v1beta1"
"github.com/go-logr/logr"
"go.artefactual.dev/tools/mockutil"
"go.opentelemetry.io/otel/trace/noop"
temporalsdk_activity "go.temporal.io/sdk/activity"
Expand Down Expand Up @@ -75,7 +74,6 @@ func TestCreateAIPActivity(t *testing.T) {

env.RegisterActivityWithOptions(
a3m.NewCreateAIPActivity(
logr.Discard(),
noop.Tracer{},
a3mTransferServiceClient,
&a3m.Config{},
Expand Down
10 changes: 5 additions & 5 deletions internal/am/delete_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"fmt"

"github.com/go-logr/logr"
temporal_tools "go.artefactual.dev/tools/temporal"

"github.com/artefactual-sdps/enduro/internal/sftp"
)
Expand All @@ -17,20 +17,20 @@ type DeleteTransferActivityParams struct {

type DeleteTransferActivity struct {
client sftp.Client
logger logr.Logger
}

type DeleteTransferActivityResult struct{}

func NewDeleteTransferActivity(logger logr.Logger, client sftp.Client) *DeleteTransferActivity {
return &DeleteTransferActivity{client: client, logger: logger}
func NewDeleteTransferActivity(client sftp.Client) *DeleteTransferActivity {
return &DeleteTransferActivity{client: client}
}

func (a *DeleteTransferActivity) Execute(
ctx context.Context,
params *DeleteTransferActivityParams,
) (*DeleteTransferActivityResult, error) {
a.logger.V(1).Info("Execute DeleteTransferActivity",
logger := temporal_tools.GetLogger(ctx)
logger.V(1).Info("Execute DeleteTransferActivity",
"destination", params.Destination,
)

Expand Down
3 changes: 1 addition & 2 deletions internal/am/delete_transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"testing"

"github.com/go-logr/logr"
"go.artefactual.dev/tools/mockutil"
temporalsdk_activity "go.temporal.io/sdk/activity"
temporalsdk_testsuite "go.temporal.io/sdk/testsuite"
Expand Down Expand Up @@ -90,7 +89,7 @@ func TestDeleteTransferActivity(t *testing.T) {
ctrl := gomock.NewController(t)

env.RegisterActivityWithOptions(
am.NewDeleteTransferActivity(logr.Discard(), tt.mock(ctrl)).Execute,
am.NewDeleteTransferActivity(tt.mock(ctrl)).Execute,
temporalsdk_activity.RegisterOptions{
Name: am.DeleteTransferActivityName,
},
Expand Down
8 changes: 3 additions & 5 deletions internal/am/poll_ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"fmt"
"time"

"github.com/go-logr/logr"
"github.com/jonboulle/clockwork"
"go.artefactual.dev/amclient"
temporal_tools "go.artefactual.dev/tools/temporal"
temporalsdk_activity "go.temporal.io/sdk/activity"

"github.com/artefactual-sdps/enduro/internal/package_"
Expand All @@ -21,7 +21,6 @@ type PollIngestActivityParams struct {
}

type PollIngestActivity struct {
logger logr.Logger
cfg *Config
clock clockwork.Clock
ingSvc amclient.IngestService
Expand All @@ -35,15 +34,13 @@ type PollIngestActivityResult struct {
}

func NewPollIngestActivity(
logger logr.Logger,
cfg *Config,
clock clockwork.Clock,
ingSvc amclient.IngestService,
jobSvc amclient.JobsService,
pkgSvc package_.Service,
) *PollIngestActivity {
return &PollIngestActivity{
logger: logger,
cfg: cfg,
clock: clock,
ingSvc: ingSvc,
Expand All @@ -63,7 +60,8 @@ func (a *PollIngestActivity) Execute(
ctx context.Context,
params *PollIngestActivityParams,
) (*PollIngestActivityResult, error) {
a.logger.V(1).Info("Executing PollIngestActivity",
logger := temporal_tools.GetLogger(ctx)
logger.V(1).Info("Executing PollIngestActivity",
"PresActionID", params.PresActionID,
"SIPID", params.SIPID,
)
Expand Down
2 changes: 0 additions & 2 deletions internal/am/poll_ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"
"time"

"github.com/go-logr/logr"
"github.com/google/uuid"
"github.com/jonboulle/clockwork"
"go.artefactual.dev/amclient"
Expand Down Expand Up @@ -265,7 +264,6 @@ func TestPollIngestActivity(t *testing.T) {

env.RegisterActivityWithOptions(
am.NewPollIngestActivity(
logr.Discard(),
&am.Config{PollInterval: time.Millisecond * 10},
clock,
ingSvc,
Expand Down
Loading

0 comments on commit 0ecec2a

Please sign in to comment.