Skip to content

Commit

Permalink
consolidate metadata writing
Browse files Browse the repository at this point in the history
  • Loading branch information
discentem committed May 27, 2023
1 parent d697a81 commit e493648
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 44 deletions.
5 changes: 4 additions & 1 deletion internal/config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "config",
srcs = ["config.go"],
srcs = [
"config.go",
"options.go",
],
importpath = "github.com/discentem/cavorite/internal/config",
visibility = ["//:__subpackages__"],
deps = [
Expand Down
2 changes: 2 additions & 0 deletions internal/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

const MetadataFileExtension string = "cfile"

var ErrMetadataFileExtensionEmpty = fmt.Errorf("options.MetadatafileExtension cannot be %q", "")

type ObjectMetaData struct {
Name string `json:"name"`
Checksum string `json:"checksum"`
Expand Down
29 changes: 14 additions & 15 deletions internal/stores/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package stores

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"strings"
Expand Down Expand Up @@ -63,6 +61,10 @@ func (s *GCSStore) GetOptions() Options {
return s.Options
}

func (s *GCSStore) GetFsys() afero.Fs {
return s.fsys
}

// Upload generates the metadata, writes it s.fsys and uploads the file to the GCS bucket
func (s *GCSStore) Upload(ctx context.Context, objects ...string) error {
if s.Options.MetadataFileExtension == "" {
Expand All @@ -77,22 +79,10 @@ func (s *GCSStore) Upload(ctx context.Context, objects ...string) error {
}
defer f.Close()

// Generate cavorite metadata
m, err := metadata.GenerateFromFile(f)
if err != nil {
return err
}
logger.V(2).Infof("%s has a checksum of %q", o, m.Checksum)
// convert metadata to json
blob, err := json.MarshalIndent(m, "", " ")
cleanup, err := WriteMetadataToFsys(s, o, f)
if err != nil {
return err
}
// Write metadata to fsys
metadataPath := fmt.Sprintf("%s.%s", o, s.Options.MetadataFileExtension)
if err := afero.WriteFile(s.fsys, metadataPath, blob, 0644); err != nil {
return err
}

// ToDo(natewalck) Expose this timeout as a setting
ctx, cancel := context.WithTimeout(ctx, time.Second*1800)
Expand All @@ -106,10 +96,16 @@ func (s *GCSStore) Upload(ctx context.Context, objects ...string) error {
// Reset to the start of the file because metadata generation has already read it once
_, err = f.Seek(0, io.SeekStart)
if err != nil {
if err := cleanup(); err != nil {
return err
}
return err
}
_, err = io.Copy(wc, f)
if err != nil {
if err := cleanup(); err != nil {
return err
}
logger.V(2).Infof("Failed to upload %s", o)
return err
}
Expand All @@ -119,6 +115,9 @@ func (s *GCSStore) Upload(ctx context.Context, objects ...string) error {
if strings.Contains(err.Error(), "conditionNotMet") {
logger.Infof("%s already exists, skipping...", o)
} else {
if err := cleanup(); err != nil {
return err
}
return err
}
}
Expand Down
45 changes: 17 additions & 28 deletions internal/stores/s3.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package stores

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"strings"
Expand Down Expand Up @@ -130,6 +127,10 @@ func (s *S3Store) GetOptions() Options {
return s.Options
}

func (s *S3Store) GetFsys() afero.Fs {
return s.fsys
}

// TODO(discentem): #34 largely copy-pasted from stores/local/local.go. Can be consolidated
// Upload generates the metadata, writes it to disk and uploads the file to the S3 bucket
func (s *S3Store) Upload(ctx context.Context, objects ...string) error {
Expand All @@ -138,37 +139,19 @@ func (s *S3Store) Upload(ctx context.Context, objects ...string) error {
}

for _, o := range objects {
logger.V(2).Infof("object: %s", o)
f, err := s.fsys.Open(o)
if err != nil {
return err
}
defer f.Close()
// TODO(discentem): probably inefficient, reading entire file into memory
b, err := afero.ReadFile(s.fsys, o)
fi, err := s.fsys.Open(o)
if err != nil {
return err
}

// generate metadata
m, err := metadata.GenerateFromFile(f)
cleanup, err := WriteMetadataToFsys(s, o, fi)
if err != nil {
return err
}
logger.V(2).Infof("%s has a checksum of %q", o, m.Checksum)
// convert metadata to json
blob, err := json.MarshalIndent(m, "", " ")
_, err = fi.Seek(0, io.SeekStart)
if err != nil {
return err
}
// Create path for metadata if it doesn't already exist
if err := s.fsys.MkdirAll(filepath.Dir(filepath.Dir(o)), os.ModePerm); err != nil {
return err
}
// Write metadata to disk
metadataPath := fmt.Sprintf("%s.%s", o, s.Options.MetadataFileExtension)
logger.V(2).Infof("writing metadata to %s", metadataPath)
if err := afero.WriteFile(s.fsys, metadataPath, blob, 0644); err != nil {
if err := cleanup(); err != nil {
return err
}
return err
}

Expand All @@ -177,13 +160,19 @@ func (s *S3Store) Upload(ctx context.Context, objects ...string) error {
obj := s3.PutObjectInput{
Bucket: aws.String(buck),
Key: &o,
Body: bytes.NewReader(b),
Body: fi,
}
out, err := s.s3Uploader.Upload(ctx, &obj)
if err != nil {
if err := cleanup(); err != nil {
return fmt.Errorf("cleanup() failed after Upload failure: %w", err)
}
logger.Error(out)
return err
}
if err := fi.Close(); err != nil {
return err
}
}
return nil
}
Expand Down
43 changes: 43 additions & 0 deletions internal/stores/stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package stores

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"strings"

"github.com/discentem/cavorite/internal/metadata"
"github.com/google/logger"
"github.com/spf13/afero"
)

Expand All @@ -26,6 +30,7 @@ type Store interface {
Upload(ctx context.Context, objects ...string) error
Retrieve(ctx context.Context, objects ...string) error
GetOptions() Options
GetFsys() afero.Fs
}

func openOrCreateFile(fsys afero.Fs, filename string) (afero.File, error) {
Expand All @@ -39,3 +44,41 @@ func openOrCreateFile(fsys afero.Fs, filename string) (afero.File, error) {
func inferObjPath(cfilePath string) string {
return strings.TrimSuffix(cfilePath, filepath.Ext(cfilePath))
}

// WriteMetadata generates metadata for
func WriteMetadataToFsys(s Store, fullObjectPath string, f afero.File) (cleanup func() error, err error) {
opts := s.GetOptions()
if opts.MetadataFileExtension == "" {
return nil, metadata.ErrMetadataFileExtensionEmpty
}
fsys := s.GetFsys()
logger.V(2).Infof("object: %s", fullObjectPath)

// generate metadata
m, err := metadata.GenerateFromFile(f)
if err != nil {
return nil, err
}
logger.V(2).Infof("%s has a checksum of %q", fullObjectPath, m.Checksum)
// convert metadata to json
blob, err := json.MarshalIndent(m, "", " ")
if err != nil {
return nil, err
}
// Create path for metadata if it doesn't already exist
if err := fsys.MkdirAll(filepath.Dir(filepath.Dir(fullObjectPath)), os.ModePerm); err != nil {
return nil, err
}
// Write metadata to disk
metadataPath := fmt.Sprintf("%s.%s", fullObjectPath, opts.MetadataFileExtension)
logger.V(2).Infof("writing metadata to %s", metadataPath)
if err := afero.WriteFile(fsys, metadataPath, blob, 0644); err != nil {
return nil, err
}

cleanup = func() error {
return fsys.Remove(metadataPath)
}

return cleanup, nil
}

0 comments on commit e493648

Please sign in to comment.