Skip to content

Commit

Permalink
Use tempdir setting in service mode. Clean up closing syncers to ensu…
Browse files Browse the repository at this point in the history
…re that we write partial c1zs back correctly.
  • Loading branch information
jirwin committed Oct 14, 2023
1 parent 4fe08aa commit 459aa9f
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 25 deletions.
15 changes: 8 additions & 7 deletions pkg/connectorrunner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ func (c *connectorRunner) Run(ctx context.Context) error {
return err
}

err = c.Close(ctx)
if err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -185,7 +180,13 @@ func (c *connectorRunner) run(ctx context.Context) error {
}

func (c *connectorRunner) Close(ctx context.Context) error {
return nil
var retErr error

if err := c.cw.Close(); err != nil {
retErr = errors.Join(retErr, err)
}

return retErr
}

type Option func(ctx context.Context, cfg *runnerConfig) error
Expand Down Expand Up @@ -405,7 +406,7 @@ func NewConnectorRunner(ctx context.Context, c types.ConnectorServer, opts ...Op
return runner, nil
}

tm, err := c1api.NewC1TaskManager(ctx, cfg.clientID, cfg.clientSecret)
tm, err := c1api.NewC1TaskManager(ctx, cfg.clientID, cfg.clientSecret, cfg.tempDir)
if err != nil {
return nil, err
}
Expand Down
49 changes: 34 additions & 15 deletions pkg/tasks/c1api/full_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package c1api

import (
"context"
"errors"
"io"
"os"

Expand All @@ -20,13 +21,44 @@ type fullSyncHelpers interface {
Upload(ctx context.Context, r io.ReadSeeker) error
FinishTask(ctx context.Context, annos annotations.Annotations, err error) error
HeartbeatTask(ctx context.Context, annos annotations.Annotations) (context.Context, error)
TempDir() string
}

type fullSyncTaskHandler struct {
task *v1.Task
helpers fullSyncHelpers
}

func (c *fullSyncTaskHandler) sync(ctx context.Context, c1zPath string) error {
l := ctxzap.Extract(ctx).With(zap.String("task_id", c.task.GetId()), zap.Stringer("task_type", tasks.GetType(c.task)))
syncer, err := sdkSync.NewSyncer(ctx, c.helpers.ConnectorClient(), sdkSync.WithC1ZPath(c1zPath), sdkSync.WithTmpDir(c.helpers.TempDir()))
if err != nil {
l.Error("failed to create syncer", zap.Error(err))
return err
}

// TODO(jirwin): Should we attempt to retry at all before failing the task?
err = syncer.Sync(ctx)
if err != nil {
l.Error("failed to sync", zap.Error(err))

// We don't defer syncer.Close() in order to capture the error without named return values.
if closeErr := syncer.Close(ctx); closeErr != nil {
l.Error("failed to close syncer after sync error", zap.Error(err))
err = errors.Join(err, closeErr)
}

return err
}

if err := syncer.Close(ctx); err != nil {
l.Error("failed to close syncer", zap.Error(err))
return err
}

return nil
}

// TODO(morgabra) We should handle task resumption here. The task should contain at least an active sync id so we can
// resume syncing if we get restarted or fail to heartbeat temporarily.
// TODO(morgabra) Ideally we can tell the difference between a task cancellation and a task failure via the result
Expand All @@ -42,7 +74,7 @@ func (c *fullSyncTaskHandler) HandleTask(ctx context.Context) error {
l := ctxzap.Extract(ctx).With(zap.String("task_id", c.task.GetId()), zap.Stringer("task_type", tasks.GetType(c.task)))
l.Info("Handling full sync task.")

assetFile, err := os.CreateTemp("", "baton-sdk-sync-upload")
assetFile, err := os.CreateTemp(c.helpers.TempDir(), "baton-sdk-sync-upload")
if err != nil {
l.Error("failed to create temp file", zap.Error(err))
return c.helpers.FinishTask(ctx, nil, err)
Expand All @@ -53,32 +85,19 @@ func (c *fullSyncTaskHandler) HandleTask(ctx context.Context) error {
return c.helpers.FinishTask(ctx, nil, err)
}

syncer, err := sdkSync.NewSyncer(ctx, c.helpers.ConnectorClient(), sdkSync.WithC1ZPath(c1zPath))
if err != nil {
l.Error("failed to create syncer", zap.Error(err))
return c.helpers.FinishTask(ctx, nil, err)
}

// TODO(morgabra) Add annotation for for sync_id, or come up with some other way to track sync state.
ctx, err = c.helpers.HeartbeatTask(ctx, nil)
if err != nil {
l.Error("failed to heartbeat task", zap.Error(err))
return err
}

// TODO(jirwin): Should we attempt to retry at all before failing the task?
err = syncer.Sync(ctx)
err = c.sync(ctx, c1zPath)
if err != nil {
l.Error("failed to sync", zap.Error(err))
return c.helpers.FinishTask(ctx, nil, err)
}

err = syncer.Close(ctx)
if err != nil {
l.Error("failed to close syncer", zap.Error(err))
return c.helpers.FinishTask(ctx, nil, err)
}

c1zF, err := os.Open(c1zPath)
if err != nil {
l.Error("failed to open sync asset prior to upload", zap.Error(err))
Expand Down
5 changes: 4 additions & 1 deletion pkg/tasks/c1api/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type c1ApiTaskManager struct {
started bool
queue []*v1.Task
serviceClient BatonServiceClient
tempDir string
}

// getHeartbeatInterval returns an appropriate heartbeat interval. If the interval is 0, it will return the default heartbeat interval.
Expand Down Expand Up @@ -198,6 +199,7 @@ func (c *c1ApiTaskManager) Process(ctx context.Context, task *v1.Task, cc types.
cc: cc,
serviceClient: c.serviceClient,
taskFinisher: c.finishTask,
tempDir: c.tempDir,
}

// Based on the task type, call a handler to process the task.
Expand Down Expand Up @@ -230,13 +232,14 @@ func (c *c1ApiTaskManager) Process(ctx context.Context, task *v1.Task, cc types.
return nil
}

func NewC1TaskManager(ctx context.Context, clientID string, clientSecret string) (tasks.Manager, error) {
func NewC1TaskManager(ctx context.Context, clientID string, clientSecret string, tempDir string) (tasks.Manager, error) {
serviceClient, err := newServiceClient(ctx, clientID, clientSecret)
if err != nil {
return nil, err
}

return &c1ApiTaskManager{
serviceClient: serviceClient,
tempDir: tempDir,
}, nil
}
5 changes: 5 additions & 0 deletions pkg/tasks/c1api/task_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type taskHelpers struct {
task *v1.Task
serviceClient BatonServiceClient
cc types.ConnectorClient
tempDir string

taskFinisher func(ctx context.Context, task *v1.Task, annos annotations.Annotations, err error) error
}
Expand All @@ -46,6 +47,10 @@ func (t *taskHelpers) HelloClient() batonHelloClient {
return t.serviceClient
}

func (t *taskHelpers) TempDir() string {
return t.tempDir
}

// HeartbeatTask will call the heartbeat service endpoint for the task until the context is cancelled. An initial heartbeat is
// synchronously run, and if successful a goroutine is spawned that will heartbeat periodically respecting the returned heartbeat interval.
// If the given context is cancelled, the returned context will be cancelled with the same cause.
Expand Down
7 changes: 5 additions & 2 deletions pkg/tasks/local/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package local

import (
"context"
"errors"
"sync"
"time"

Expand Down Expand Up @@ -43,11 +44,13 @@ func (m *localSyncer) Process(ctx context.Context, task *v1.Task, cc types.Conne

err = syncer.Sync(ctx)
if err != nil {
if closeErr := syncer.Close(ctx); closeErr != nil {
err = errors.Join(err, closeErr)
}
return err
}

err = syncer.Close(ctx)
if err != nil {
if err := syncer.Close(ctx); err != nil {
return err
}

Expand Down

0 comments on commit 459aa9f

Please sign in to comment.