Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First pass at adding support for blob time-to-live. #1

Merged
merged 7 commits into from
Feb 8, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func BuildRouter(db core.MetadataDatabase, store core.BlobStore, logRequests boo
r.Post("/data", handler.CreateBlob)
r.Get("/", handler.SearchBlobs)
r.Get("/data/latest", handler.GetLatestBlobData)
r.Get("/{combined-id}", handler.MakeBlobEndpoint(handler.BlobMetadataResponse))
r.Get("/{combined-id}/data", handler.MakeBlobEndpoint(handler.BlobDataResponse))
r.Get("/{combined-id}", handler.MakeBlobEndpoint(handler.BlobMetadataResponse, 0 * time.Second))
r.Get("/{combined-id}/data", handler.MakeBlobEndpoint(handler.BlobDataResponse, 30 * time.Minute))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we add the expiration datetime to the response headers at the data endpoint

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course! I don't know why I forgot that.

})
})

Expand Down Expand Up @@ -152,6 +152,9 @@ func CreateBlobInfo(r *http.Request, blob *core.BlobInfo) map[string]interface{}

info := make(map[string]interface{})
info["lastModified"] = blob.CreatedAt.UTC().Format(time.RFC3339Nano)
if blob.ExpiresAt != nil {
info["expires"] = blob.ExpiresAt.UTC().Format(time.RFC3339Nano)
}

info["subject"] = blob.Key.Subject
if blob.Tags.ContentType != nil {
Expand Down
27 changes: 21 additions & 6 deletions api/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
log "github.com/sirupsen/logrus"
"net/http"
"regexp"
"time"
)

const (
Expand All @@ -26,6 +27,7 @@ var (
tagNameRegex, _ = regexp.Compile(`(^[a-zA-Z][a-zA-Z0-9_\-]{0,63}$)|$null`)
commonTagValidator = CombineTagValidators(ValidateTagName, ValidateGenericTagValues)
systemTagValidator = CombineTagValidators(commonTagValidator, ValidateOnlyOneTag)
ttlTagValidator = CombineTagValidators(ValidateTimeToLive, systemTagValidator)
subjectTagValidator = CombineTagValidators(ValidateSubjectTagValue, systemTagValidator)
)

Expand Down Expand Up @@ -80,7 +82,7 @@ func (handler *Handler) CreateBlob(w http.ResponseWriter, r *http.Request) {
if err := handler.store.SaveBlob(r.Context(), r.Body, key); err != nil {
log.Errorf("Failed to save blob: %v", err)

err = handler.db.RevertStagedBlobMetadata(r.Context(), key)
err = handler.db.DeleteBlobMetadata(r.Context(), key)
if err != nil {
log.Errorf("Failed to revert staged blob metadata: %v", err)
}
Expand Down Expand Up @@ -138,6 +140,17 @@ func ValidateSubjectTagValue(tagName string, tagValues []string) error {
return nil
}

func ValidateTimeToLive(tagName string, tagValues []string) error {

for _, t := range tagValues {
if _, err := time.ParseDuration(t); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should validate that the the ttl is not negative

return err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we add the parameter name to the error. Right now, we return this message:
"message": "time: invalid duration \"never\""

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem, good errors are worth their weight in gold.

}
}

return nil
}

func CombineTagValidators(validators ...TagValidator) TagValidator {
return func(tagName string, tagValues []string) error {
for _, v := range validators {
Expand All @@ -150,8 +163,8 @@ func CombineTagValidators(validators ...TagValidator) TagValidator {
}
}

func ValidateAndStoreOptionalSystemTag(tagName string, tagValues []string, field **string) error {
if err := systemTagValidator(tagName, tagValues); err != nil {
func ValidateAndStoreOptionalSystemTag(tagName string, tagValues []string, field **string, validator TagValidator) error {
if err := validator(tagName, tagValues); err != nil {
return err
}

Expand All @@ -162,11 +175,13 @@ func ValidateAndStoreOptionalSystemTag(tagName string, tagValues []string, field
func ValidateAndStoreTag(tags *core.BlobTags, tagName string, tagValues []string) error {
switch tagName {
case "device":
return ValidateAndStoreOptionalSystemTag(tagName, tagValues, &tags.Device)
return ValidateAndStoreOptionalSystemTag(tagName, tagValues, &tags.Device, systemTagValidator)
case "name":
return ValidateAndStoreOptionalSystemTag(tagName, tagValues, &tags.Name)
return ValidateAndStoreOptionalSystemTag(tagName, tagValues, &tags.Name, systemTagValidator)
case "session":
return ValidateAndStoreOptionalSystemTag(tagName, tagValues, &tags.Session)
return ValidateAndStoreOptionalSystemTag(tagName, tagValues, &tags.Session, systemTagValidator)
case "ttl":
return ValidateAndStoreOptionalSystemTag(tagName, tagValues, &tags.TimeToLive, ttlTagValidator)
default:
if err := commonTagValidator(tagName, tagValues); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion api/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestStorageWriteFailureRevertsStagedMetadata(t *testing.T) {
Return(nil, nil)

mockMetadataDatabase.EXPECT().
RevertStagedBlobMetadata(gomock.Any(), gomock.Any()).
DeleteBlobMetadata(gomock.Any(), gomock.Any()).
Return(nil)

mockBlobStore.EXPECT().
Expand Down
5 changes: 3 additions & 2 deletions api/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"errors"
"net/http"
"time"

"github.com/go-chi/chi/v5"
"github.com/ismrmrd/mrd-storage-server/core"
Expand All @@ -12,7 +13,7 @@ import (

type Responder func(http.ResponseWriter, *http.Request, *core.BlobInfo)

func (handler *Handler) MakeBlobEndpoint(responder Responder) http.HandlerFunc {
func (handler *Handler) MakeBlobEndpoint(responder Responder, grace time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {

combinedId := chi.URLParam(r, "combined-id")
Expand All @@ -22,7 +23,7 @@ func (handler *Handler) MakeBlobEndpoint(responder Responder) http.HandlerFunc {
return
}

blobInfo, err := handler.db.GetBlobMetadata(r.Context(), key)
blobInfo, err := handler.db.GetBlobMetadata(r.Context(), key, time.Now().Add(-grace))
if err != nil {
if errors.Is(err, core.ErrRecordNotFound) {
w.WriteHeader(http.StatusNotFound)
Expand Down
4 changes: 2 additions & 2 deletions api/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (handler *Handler) SearchBlobs(w http.ResponseWriter, r *http.Request) {
return
}

results, ct, err := handler.db.SearchBlobMetadata(r.Context(), query, at, ct, pageSize)
results, ct, err := handler.db.SearchBlobMetadata(r.Context(), query, at, ct, pageSize, time.Now())

if err != nil {
if errors.Is(err, core.ErrInvalidContinuationToken) {
Expand Down Expand Up @@ -60,7 +60,7 @@ func (handler *Handler) GetLatestBlobData(w http.ResponseWriter, r *http.Request
return
}

results, _, err := handler.db.SearchBlobMetadata(r.Context(), query, at, nil, 1)
results, _, err := handler.db.SearchBlobMetadata(r.Context(), query, at, nil, 1, time.Now())

if err != nil {
log.Errorf("Failed to search blobs in DB: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions core/garbage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
// records in the medatada database.
func CollectGarbage(ctx context.Context, db MetadataDatabase, store BlobStore, olderThan time.Time) error {
for {
expiredKeys, err := db.GetPageOfExpiredStagedBlobMetadata(ctx, olderThan)
expiredKeys, err := db.GetPageOfExpiredBlobMetadata(ctx, olderThan)
if err != nil {
return err
}
Expand All @@ -39,7 +39,7 @@ func processExpiredKey(ctx context.Context, db MetadataDatabase, store BlobStore
return err
}

if err := db.RevertStagedBlobMetadata(ctx, key); err != nil && !errors.Is(err, ErrStagedRecordNotFound) {
if err := db.DeleteBlobMetadata(ctx, key); err != nil && !errors.Is(err, ErrBlobNotFound) {
return err
}

Expand Down
10 changes: 5 additions & 5 deletions core/garbage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"github.com/stretchr/testify/assert"
)

// Ensure garbage collection completes even when RevertStagedBlobMetadata
// returns ErrStagedRecordNotFound, which suggests that another instance
// Ensure garbage collection completes even when DeleteBlobMetadata
// returns ErrBlobNotFound, which suggests that another instance
// is performing garbage collection at the same time.
func TestConcurrentGarbageCollection(t *testing.T) {
mockCtrl := gomock.NewController(t)
Expand All @@ -24,13 +24,13 @@ func TestConcurrentGarbageCollection(t *testing.T) {
key := core.BlobKey{Subject: "s", Id: uuid.UUID{}}

db.EXPECT().
GetPageOfExpiredStagedBlobMetadata(gomock.Any(), gomock.Any()).
GetPageOfExpiredBlobMetadata(gomock.Any(), gomock.Any()).
Return([]core.BlobKey{key}, nil)

db.EXPECT().RevertStagedBlobMetadata(gomock.Any(), key).Return(core.ErrStagedRecordNotFound)
db.EXPECT().DeleteBlobMetadata(gomock.Any(), key).Return(core.ErrBlobNotFound)

db.EXPECT().
GetPageOfExpiredStagedBlobMetadata(gomock.Any(), gomock.Any()).
GetPageOfExpiredBlobMetadata(gomock.Any(), gomock.Any()).
Return([]core.BlobKey{}, nil)

store.EXPECT().DeleteBlob(gomock.Any(), key)
Expand Down
16 changes: 9 additions & 7 deletions core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ type BlobTags struct {
Device *string
Session *string
ContentType *string
TimeToLive *string
CustomTags map[string][]string
}

type BlobInfo struct {
Key BlobKey
CreatedAt time.Time
Tags BlobTags
Key BlobKey
Tags BlobTags
CreatedAt time.Time
ExpiresAt *time.Time
}

type ContinutationToken string
Expand All @@ -47,10 +49,10 @@ func UnixTimeMsToTime(timeValueMs int64) time.Time {
type MetadataDatabase interface {
StageBlobMetadata(ctx context.Context, key BlobKey, tags *BlobTags) (*BlobInfo, error)
CompleteStagedBlobMetadata(ctx context.Context, key BlobKey) error
RevertStagedBlobMetadata(ctx context.Context, key BlobKey) error
GetPageOfExpiredStagedBlobMetadata(ctx context.Context, olderThan time.Time) ([]BlobKey, error)
GetBlobMetadata(ctx context.Context, key BlobKey) (*BlobInfo, error)
SearchBlobMetadata(ctx context.Context, tags map[string][]string, at *time.Time, ct *ContinutationToken, pageSize int) ([]BlobInfo, *ContinutationToken, error)
DeleteBlobMetadata(ctx context.Context, key BlobKey) error
GetPageOfExpiredBlobMetadata(ctx context.Context, olderThan time.Time) ([]BlobKey, error)
GetBlobMetadata(ctx context.Context, key BlobKey, expiresAfter time.Time) (*BlobInfo, error)
SearchBlobMetadata(ctx context.Context, tags map[string][]string, at *time.Time, ct *ContinutationToken, pageSize int, expiresAfter time.Time) ([]BlobInfo, *ContinutationToken, error)
}

type BlobStore interface {
Expand Down
Loading