Skip to content

Commit

Permalink
DynamoDB Nested Values Bug (#4570)
Browse files Browse the repository at this point in the history
* Add tests to ExerciseBackend to expose nested-values bug

* Update DynamoDB physical backend Delete and hasChildren logic to prevent overzealous cleanup of folders and values
  • Loading branch information
MattSurabian authored and jefferai committed May 16, 2018
1 parent 305a45e commit 1886e45
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 10 deletions.
64 changes: 54 additions & 10 deletions physical/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,15 +340,33 @@ func (d *DynamoDBBackend) Delete(ctx context.Context, key string) error {
},
}}

// clean up now empty 'folders'
// Clean up empty "folders" by looping through all levels of the path to the item being deleted looking for
// children. Loop from deepest path to shallowest, and only consider items children if they are not going to be
// deleted by our batch delete request. If a path has no valid children, then it should be considered an empty
// "folder" and be deleted along with the original item in our batch job. Because we loop from deepest path to
// shallowest, once we find a path level that contains valid children we can stop the cleanup operation.
prefixes := physical.Prefixes(key)
sort.Sort(sort.Reverse(sort.StringSlice(prefixes)))
for _, prefix := range prefixes {
hasChildren, err := d.hasChildren(prefix)
for index, prefix := range prefixes {
// Because delete batches its requests, we need to pass keys we know are going to be deleted into
// hasChildren so it can exclude those when it determines if there WILL be any children left after
// the delete operations have completed.
var excluded []string
if index == 0 {
// This is the value we know for sure is being deleted
excluded = append(excluded, recordKeyForVaultKey(key))
} else {
// The previous path doesn't count as a child, since if we're still looping, we've found no children
excluded = append(excluded, recordKeyForVaultKey(prefixes[index-1]))
}

hasChildren, err := d.hasChildren(prefix, excluded)
if err != nil {
return err
}

if !hasChildren {
// If there are no children other than ones we know are being deleted then cleanup empty "folder" pointers
requests = append(requests, &dynamodb.WriteRequest{
DeleteRequest: &dynamodb.DeleteRequest{
Key: map[string]*dynamodb.AttributeValue{
Expand All @@ -357,6 +375,12 @@ func (d *DynamoDBBackend) Delete(ctx context.Context, key string) error {
},
},
})
} else {
// This loop starts at the deepest path and works backwards looking for children
// once a deeper level of the path has been found to have children there is no
// more cleanup that needs to happen, otherwise we might remove folder pointers
// to that deeper path making it "undiscoverable" with the list operation
break
}
}

Expand Down Expand Up @@ -406,9 +430,12 @@ func (d *DynamoDBBackend) List(ctx context.Context, prefix string) ([]string, er
}

// hasChildren returns true if there exist items below a certain path prefix.
// To do so, the method fetches such items from DynamoDB. If there are more
// than one item (which is the "directory" item), there are children.
func (d *DynamoDBBackend) hasChildren(prefix string) (bool, error) {
// To do so, the method fetches such items from DynamoDB. This method is primarily
// used by Delete. Because DynamoDB requests are batched this method is being called
// before any deletes take place. To account for that hasChildren accepts a slice of
// strings representing values we expect to find that should NOT be counted as children
// because they are going to be deleted.
func (d *DynamoDBBackend) hasChildren(prefix string, exclude []string) (bool, error) {
prefix = strings.TrimSuffix(prefix, "/")
prefix = escapeEmptyPath(prefix)

Expand All @@ -424,9 +451,10 @@ func (d *DynamoDBBackend) hasChildren(prefix string) (bool, error) {
},
},
// Avoid fetching too many items from DynamoDB for performance reasons.
// We need at least two because one is the directory item, all others
// are children.
Limit: aws.Int64(2),
// We want to know if there are any children we don't expect to see.
// Answering that question requires fetching a minimum of one more item
// than the number we expect. In most cases this value will be 2
Limit: aws.Int64(int64(len(exclude) + 1)),
}

d.permitPool.Acquire()
Expand All @@ -436,7 +464,23 @@ func (d *DynamoDBBackend) hasChildren(prefix string) (bool, error) {
if err != nil {
return false, err
}
return len(out.Items) > 1, nil
var childrenExist bool
for _, item := range out.Items {
for _, excluded := range exclude {
// Check if we've found an item we didn't expect to. Look for "folder" pointer keys (trailing slash)
// and regular value keys (no trailing slash)
if *item["Key"].S != excluded && *item["Key"].S != fmt.Sprintf("%s/", excluded) {
childrenExist = true
break
}
}
if childrenExist {
// We only need to find ONE child we didn't expect to.
break
}
}

return childrenExist, nil
}

// LockWith is used for mutual exclusion based on the given key.
Expand Down
48 changes: 48 additions & 0 deletions physical/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,54 @@ func ExerciseBackend(t testing.TB, b Backend) {
if len(keys) != 0 {
t.Errorf("should be empty at end: %v", keys)
}

// When the root path is empty, adding and removing deep nested values should not break listing
e = &Entry{Key: "foo/nested1/nested2/value1", Value: []byte("baz")}
err = b.Put(context.Background(), e)
if err != nil {
t.Fatalf("deep nest: %v", err)
}

e = &Entry{Key: "foo/nested1/nested2/value2", Value: []byte("baz")}
err = b.Put(context.Background(), e)
if err != nil {
t.Fatalf("deep nest: %v", err)
}

err = b.Delete(context.Background(), "foo/nested1/nested2/value2")
if err != nil {
t.Fatalf("failed to remove deep nest: %v", err)
}

keys, err = b.List(context.Background(), "")
if err != nil {
t.Fatalf("listing of root failed after deletion: %v", err)
}
if len(keys) == 0 {
t.Errorf("root is returning empty after deleting a single nested value, expected nested1/: %v", keys)
keys, err = b.List(context.Background(), "foo/nested1")
if err != nil {
t.Fatalf("listing of expected nested path 'foo/nested1' failed: %v", err)
}
// prove that the root should not be empty and that foo/nested1 exists
if len(keys) != 0 {
t.Logf(" keys can still be listed from nested1/ so it's not empty, expected nested2/: %v", keys)
}
}

// cleanup left over listing bug test value
err = b.Delete(context.Background(), "foo/nested1/nested2/value1")
if err != nil {
t.Fatalf("failed to remove deep nest: %v", err)
}

keys, err = b.List(context.Background(), "")
if err != nil {
t.Fatalf("listing of root failed after delete of deep nest: %v", err)
}
if len(keys) != 0 {
t.Errorf("should be empty at end: %v", keys)
}
}

func ExerciseBackend_ListPrefix(t testing.TB, b Backend) {
Expand Down

0 comments on commit 1886e45

Please sign in to comment.