Skip to content

Commit

Permalink
Add more filesystemWatcher tests
Browse files Browse the repository at this point in the history
  • Loading branch information
djjuhasz committed Mar 1, 2024
1 parent edf47a7 commit de41b2f
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 27 deletions.
10 changes: 7 additions & 3 deletions internal/watcher/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import (
"github.com/fsnotify/fsnotify"
cp "github.com/otiai10/copy"
"gocloud.dev/blob"
"gocloud.dev/blob/fileblob"
_ "gocloud.dev/blob/fileblob"

"github.com/artefactual-sdps/enduro/internal/bucket"
"github.com/artefactual-sdps/enduro/internal/filenotify"
"github.com/artefactual-sdps/enduro/internal/fsutil"
)
Expand Down Expand Up @@ -156,8 +157,10 @@ func (w *filesystemWatcher) Path() string {
return w.path
}

func (w *filesystemWatcher) OpenBucket(context.Context) (*blob.Bucket, error) {
return fileblob.OpenBucket(w.path, nil)
func (w *filesystemWatcher) OpenBucket(ctx context.Context) (*blob.Bucket, error) {
return bucket.Open(ctx, &bucket.Config{
URL: fmt.Sprintf("file://%s", w.path),
})
}

func (w *filesystemWatcher) RemoveAll(key string) error {
Expand All @@ -179,6 +182,7 @@ func (w *filesystemWatcher) Dispose(key string) error {
// of a directory or file.
func (w *filesystemWatcher) Download(ctx context.Context, dest, key string) error {
src := filepath.Clean(filepath.Join(w.path, key))
dest = filepath.Clean(filepath.Join(dest, key))
if err := cp.Copy(src, dest); err != nil {
return fmt.Errorf("filesystem watcher: download: %v", err)

Check warning on line 187 in internal/watcher/filesystem.go

View check run for this annotation

Codecov / codecov/patch

internal/watcher/filesystem.go#L187

Added line #L187 was not covered by tests
}
Expand Down
195 changes: 171 additions & 24 deletions internal/watcher/filesystem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"path/filepath"
"testing"
"time"

Expand All @@ -14,38 +15,184 @@ import (
"github.com/artefactual-sdps/enduro/internal/watcher"
)

type file struct {
name string
contents []byte
}

func TestFileSystemWatcher(t *testing.T) {
t.Run("Polling watcher returns a file event", func(t *testing.T) {
td := fs.NewDir(t, "enduro-test-fs-watcher")
ctx := context.Background()
td := fs.NewDir(t, "enduro-test-fs-watcher")
type test struct {
name string
config *watcher.FilesystemConfig
file file
want *watcher.BlobEvent
}
for _, tt := range []test{
{
name: "Polling watcher returns a blob event",
config: &watcher.FilesystemConfig{
Name: "filesystem",
Path: t.TempDir(),
PollInterval: time.Millisecond * 5,
},
file: file{name: "test.txt"},
want: &watcher.BlobEvent{Key: "test.txt"},
},
{
name: "Inotify watcher returns a blob event",
config: &watcher.FilesystemConfig{
Name: "filesystem",
Path: t.TempDir(),
Inotify: true,
},
file: file{name: "test.txt"},
want: &watcher.BlobEvent{Key: "test.txt"},
},
} {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

w, err := watcher.NewFilesystemWatcher(ctx, tt.config)
assert.NilError(t, err)

check := func(t poll.LogT) poll.Result {
got, _, err := w.Watch(ctx)
if err != nil {
return poll.Error(fmt.Errorf("watcher error: %w", err))
}
if got.Key != tt.want.Key || got.IsDir != tt.want.IsDir {
return poll.Error(fmt.Errorf(
"expected: *watcher.BlobEvent(Key: %q, IsDir: %t); got: *watcher.BlobEvent(Key: %q, IsDir: %t)",
tt.want.Key, tt.want.IsDir, got.Key, got.IsDir,
))
}

return poll.Success()
}

if err = os.WriteFile(
filepath.Join(tt.config.Path, tt.file.name),
tt.file.contents,
0o600,
); err != nil {
t.Fatalf("Couldn't create text.txt in %q", td.Path())
}

poll.WaitOn(t, check, poll.WithTimeout(time.Millisecond*15))
})
}

t.Run("Path returns the watcher path", func(t *testing.T) {
t.Parallel()

td := t.TempDir()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

w, err := watcher.NewFilesystemWatcher(ctx, &watcher.FilesystemConfig{
Name: "filesystem",
Path: td,
})
assert.NilError(t, err)
assert.Equal(t, w.Path(), td)
})

t.Run("OpenBucket returns a bucket", func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

w, err := watcher.NewFilesystemWatcher(ctx, &watcher.FilesystemConfig{
Name: "filesystem",
Path: t.TempDir(),
})
assert.NilError(t, err)

b, err := w.OpenBucket(ctx)
assert.NilError(t, err)
assert.Equal(t, fmt.Sprintf("%T", b), "*blob.Bucket")
b.Close()
})

t.Run("RemoveAll deletes a directory", func(t *testing.T) {
t.Parallel()

td := fs.NewDir(t, "enduro-test-fswatcher",
fs.WithDir("transfer", fs.WithFile("test.txt", "A test file.")),
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

w, err := watcher.NewFilesystemWatcher(ctx, &watcher.FilesystemConfig{
Name: "filesystem",
Path: td.Path(),
})
assert.NilError(t, err)

err = w.RemoveAll("transfer")
assert.NilError(t, err)
assert.Assert(t, fs.Equal(w.Path(), fs.Expected(t)))
})

t.Run("Dispose moves transfer to CompletedDir", func(t *testing.T) {
t.Parallel()

src := fs.NewDir(t, "enduro-test-fswatcher",
fs.WithDir("transfer", fs.WithFile("test.txt", "A test file.")),
)
dest := fs.NewDir(t, "enduro-test-fswatcher")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

w, err := watcher.NewFilesystemWatcher(ctx, &watcher.FilesystemConfig{
Name: "filesystem",
Path: td.Path(),
PollInterval: time.Millisecond * 5,
Path: src.Path(),
CompletedDir: dest.Path(),
})
assert.NilError(t, err)

check := func(t poll.LogT) poll.Result {
event, _, err := w.Watch(ctx)
if err != nil {
return poll.Error(fmt.Errorf("watcher error: %w", err))
}
if event.Key != "test.txt" || event.IsDir {
return poll.Error(
fmt.Errorf(
"unexpected event (Key %s, IsDir %T)",
event.Key, event.IsDir,
),
)
}
err = w.Dispose("transfer")
assert.NilError(t, err)
assert.Assert(t, fs.Equal(dest.Path(), fs.Expected(t,
fs.WithDir("transfer", fs.WithFile("test.txt", "A test file.")),
)))
})

return poll.Success()
}
t.Run("Download copies a directory", func(t *testing.T) {
t.Parallel()

if err = os.WriteFile(td.Join("test.txt"), []byte("testing"), 0o600); err != nil {
t.Fatalf("Couldn't create text.txt in %q", td.Path())
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

poll.WaitOn(t, check, poll.WithTimeout(time.Millisecond*10))
src := fs.NewDir(t, "enduro-test-fswatcher",
fs.WithDir("transfer",
fs.WithFile("test.txt", "A test file."),
fs.WithFile("test2", "Another test file."),
),
)
dest := fs.NewDir(t, "enduro-test-fswatcher")

w, err := watcher.NewFilesystemWatcher(ctx, &watcher.FilesystemConfig{
Name: "filesystem",
Path: src.Path(),
Inotify: true,
})
assert.NilError(t, err)

err = w.Download(context.Background(), dest.Path(), "transfer")
assert.NilError(t, err)
assert.Assert(t, fs.Equal(dest.Path(), fs.Expected(t, fs.WithMode(0o700),
fs.WithDir("transfer", fs.WithMode(0o755),
fs.WithFile("test.txt", "A test file.", fs.WithMode(0o644)),
fs.WithFile("test2", "Another test file.", fs.WithMode(0o644)),
),
)))
})
}

0 comments on commit de41b2f

Please sign in to comment.