diff --git a/pkg/connectorrunner/runner.go b/pkg/connectorrunner/runner.go index fb5250e7..2bdae035 100644 --- a/pkg/connectorrunner/runner.go +++ b/pkg/connectorrunner/runner.go @@ -55,11 +55,6 @@ func (c *connectorRunner) Run(ctx context.Context) error { return err } - err = c.Close(ctx) - if err != nil { - return err - } - return nil } @@ -185,7 +180,13 @@ func (c *connectorRunner) run(ctx context.Context) error { } func (c *connectorRunner) Close(ctx context.Context) error { - return nil + var retErr error + + if err := c.cw.Close(); err != nil { + retErr = errors.Join(retErr, err) + } + + return retErr } type Option func(ctx context.Context, cfg *runnerConfig) error @@ -405,7 +406,7 @@ func NewConnectorRunner(ctx context.Context, c types.ConnectorServer, opts ...Op return runner, nil } - tm, err := c1api.NewC1TaskManager(ctx, cfg.clientID, cfg.clientSecret) + tm, err := c1api.NewC1TaskManager(ctx, cfg.clientID, cfg.clientSecret, cfg.tempDir) if err != nil { return nil, err } diff --git a/pkg/tasks/c1api/full_sync.go b/pkg/tasks/c1api/full_sync.go index 6f5d5828..0ac09a25 100644 --- a/pkg/tasks/c1api/full_sync.go +++ b/pkg/tasks/c1api/full_sync.go @@ -2,6 +2,7 @@ package c1api import ( "context" + "errors" "io" "os" @@ -20,6 +21,7 @@ type fullSyncHelpers interface { Upload(ctx context.Context, r io.ReadSeeker) error FinishTask(ctx context.Context, annos annotations.Annotations, err error) error HeartbeatTask(ctx context.Context, annos annotations.Annotations) (context.Context, error) + TempDir() string } type fullSyncTaskHandler struct { @@ -27,6 +29,36 @@ type fullSyncTaskHandler struct { helpers fullSyncHelpers } +func (c *fullSyncTaskHandler) sync(ctx context.Context, c1zPath string) error { + l := ctxzap.Extract(ctx).With(zap.String("task_id", c.task.GetId()), zap.Stringer("task_type", tasks.GetType(c.task))) + syncer, err := sdkSync.NewSyncer(ctx, c.helpers.ConnectorClient(), sdkSync.WithC1ZPath(c1zPath), sdkSync.WithTmpDir(c.helpers.TempDir())) + if err != nil { + l.Error("failed to create syncer", zap.Error(err)) + return err + } + + // TODO(jirwin): Should we attempt to retry at all before failing the task? + err = syncer.Sync(ctx) + if err != nil { + l.Error("failed to sync", zap.Error(err)) + + // We don't defer syncer.Close() in order to capture the error without named return values. + if closeErr := syncer.Close(ctx); closeErr != nil { + l.Error("failed to close syncer after sync error", zap.Error(err)) + err = errors.Join(err, closeErr) + } + + return err + } + + if err := syncer.Close(ctx); err != nil { + l.Error("failed to close syncer", zap.Error(err)) + return err + } + + return nil +} + // TODO(morgabra) We should handle task resumption here. The task should contain at least an active sync id so we can // resume syncing if we get restarted or fail to heartbeat temporarily. // TODO(morgabra) Ideally we can tell the difference between a task cancellation and a task failure via the result @@ -42,7 +74,7 @@ func (c *fullSyncTaskHandler) HandleTask(ctx context.Context) error { l := ctxzap.Extract(ctx).With(zap.String("task_id", c.task.GetId()), zap.Stringer("task_type", tasks.GetType(c.task))) l.Info("Handling full sync task.") - assetFile, err := os.CreateTemp("", "baton-sdk-sync-upload") + assetFile, err := os.CreateTemp(c.helpers.TempDir(), "baton-sdk-sync-upload") if err != nil { l.Error("failed to create temp file", zap.Error(err)) return c.helpers.FinishTask(ctx, nil, err) @@ -53,12 +85,6 @@ func (c *fullSyncTaskHandler) HandleTask(ctx context.Context) error { return c.helpers.FinishTask(ctx, nil, err) } - syncer, err := sdkSync.NewSyncer(ctx, c.helpers.ConnectorClient(), sdkSync.WithC1ZPath(c1zPath)) - if err != nil { - l.Error("failed to create syncer", zap.Error(err)) - return c.helpers.FinishTask(ctx, nil, err) - } - // TODO(morgabra) Add annotation for for sync_id, or come up with some other way to track sync state. ctx, err = c.helpers.HeartbeatTask(ctx, nil) if err != nil { @@ -66,19 +92,12 @@ func (c *fullSyncTaskHandler) HandleTask(ctx context.Context) error { return err } - // TODO(jirwin): Should we attempt to retry at all before failing the task? - err = syncer.Sync(ctx) + err = c.sync(ctx, c1zPath) if err != nil { l.Error("failed to sync", zap.Error(err)) return c.helpers.FinishTask(ctx, nil, err) } - err = syncer.Close(ctx) - if err != nil { - l.Error("failed to close syncer", zap.Error(err)) - return c.helpers.FinishTask(ctx, nil, err) - } - c1zF, err := os.Open(c1zPath) if err != nil { l.Error("failed to open sync asset prior to upload", zap.Error(err)) diff --git a/pkg/tasks/c1api/manager.go b/pkg/tasks/c1api/manager.go index 4fbbd3bf..e23529a8 100644 --- a/pkg/tasks/c1api/manager.go +++ b/pkg/tasks/c1api/manager.go @@ -42,6 +42,7 @@ type c1ApiTaskManager struct { started bool queue []*v1.Task serviceClient BatonServiceClient + tempDir string } // getHeartbeatInterval returns an appropriate heartbeat interval. If the interval is 0, it will return the default heartbeat interval. @@ -198,6 +199,7 @@ func (c *c1ApiTaskManager) Process(ctx context.Context, task *v1.Task, cc types. cc: cc, serviceClient: c.serviceClient, taskFinisher: c.finishTask, + tempDir: c.tempDir, } // Based on the task type, call a handler to process the task. @@ -230,7 +232,7 @@ func (c *c1ApiTaskManager) Process(ctx context.Context, task *v1.Task, cc types. return nil } -func NewC1TaskManager(ctx context.Context, clientID string, clientSecret string) (tasks.Manager, error) { +func NewC1TaskManager(ctx context.Context, clientID string, clientSecret string, tempDir string) (tasks.Manager, error) { serviceClient, err := newServiceClient(ctx, clientID, clientSecret) if err != nil { return nil, err @@ -238,5 +240,6 @@ func NewC1TaskManager(ctx context.Context, clientID string, clientSecret string) return &c1ApiTaskManager{ serviceClient: serviceClient, + tempDir: tempDir, }, nil } diff --git a/pkg/tasks/c1api/task_helpers.go b/pkg/tasks/c1api/task_helpers.go index 5f02e210..5f8af76c 100644 --- a/pkg/tasks/c1api/task_helpers.go +++ b/pkg/tasks/c1api/task_helpers.go @@ -20,6 +20,7 @@ type taskHelpers struct { task *v1.Task serviceClient BatonServiceClient cc types.ConnectorClient + tempDir string taskFinisher func(ctx context.Context, task *v1.Task, annos annotations.Annotations, err error) error } @@ -46,6 +47,10 @@ func (t *taskHelpers) HelloClient() batonHelloClient { return t.serviceClient } +func (t *taskHelpers) TempDir() string { + return t.tempDir +} + // HeartbeatTask will call the heartbeat service endpoint for the task until the context is cancelled. An initial heartbeat is // synchronously run, and if successful a goroutine is spawned that will heartbeat periodically respecting the returned heartbeat interval. // If the given context is cancelled, the returned context will be cancelled with the same cause. diff --git a/pkg/tasks/local/syncer.go b/pkg/tasks/local/syncer.go index f2c0f22c..b879b83d 100644 --- a/pkg/tasks/local/syncer.go +++ b/pkg/tasks/local/syncer.go @@ -2,6 +2,7 @@ package local import ( "context" + "errors" "sync" "time" @@ -43,11 +44,13 @@ func (m *localSyncer) Process(ctx context.Context, task *v1.Task, cc types.Conne err = syncer.Sync(ctx) if err != nil { + if closeErr := syncer.Close(ctx); closeErr != nil { + err = errors.Join(err, closeErr) + } return err } - err = syncer.Close(ctx) - if err != nil { + if err := syncer.Close(ctx); err != nil { return err }