Skip to content

Commit

Permalink
fix /fields to handle field deletions
Browse files Browse the repository at this point in the history
  • Loading branch information
DocSavage committed Oct 10, 2024
1 parent b7754a3 commit 3a7eb62
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 23 deletions.
10 changes: 5 additions & 5 deletions datatype/neuronjson/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ func (d *Data) getMemDBbyVersion(v dvid.VersionID) (db *memdb, found bool) {
// in-memory neuron annotations with sorted body id list for optional sorted iteration.
type memdb struct {
data map[uint64]NeuronJSON
ids []uint64 // sorted list of body ids
fields map[string]struct{} // list of all fields among the annotations across versions
fieldTimes map[string]string // timestamp of last update for each field in HEAD
ids []uint64 // sorted list of body ids
fields map[string]int64 // list of all fields and their counts for HEAD
fieldTimes map[string]string // timestamp of last update for each field in HEAD
mu sync.RWMutex
}

Expand All @@ -69,7 +69,7 @@ func (d *Data) initMemoryDB(versions []string) error {
for _, versionSpec := range versions {
mdb := &memdb{
data: make(map[uint64]NeuronJSON),
fields: make(map[string]struct{}),
fields: make(map[string]int64),
fieldTimes: make(map[string]string),
ids: []uint64{},
}
Expand Down Expand Up @@ -195,7 +195,7 @@ func (mdb *memdb) addAnnotation(bodyid uint64, annotation NeuronJSON) {
mdb.data[bodyid] = annotation
mdb.ids = append(mdb.ids, bodyid)
for field := range annotation {
mdb.fields[field] = struct{}{}
mdb.fields[field]++
}
}

Expand Down
89 changes: 71 additions & 18 deletions datatype/neuronjson/neuronjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"encoding/gob"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math"
"net/http"
Expand Down Expand Up @@ -210,12 +211,16 @@ GET <api URL>/node/<UUID>/<data name>/keys
[key1, key2, ...]
GET <api URL>/node/<UUID>/<data name>/fields
GET <api URL>/node/<UUID>/<data name>/fields[?counts=true]
Returns all field names in annotations for the most recent version:
By default, returns all field names in annotations for the given version:
["field1", "field2", ...]
If the query string "counts=true" is set, the response will be a JSON object:
{"field1": count1, "field2": count2, ...}
GET <api URL>/node/<UUID>/<data name>/fieldtimes
Returns the RFC3339 timestamps for each field in the most recent version:
Expand Down Expand Up @@ -986,6 +991,35 @@ func (d *Data) processStoreRange(ctx storage.Context, f func(key string, value N
return err
}

// return the field counts for all annotations of a particular version
func (d *Data) getFieldCounts(ctx storage.Context) (fields map[string]int64, err error) {
var db storage.OrderedKeyValueDB
if db, err = datastore.GetOrderedKeyValueDB(d); err != nil {
return
}
first := storage.MinTKey(keyAnnotation)
last := storage.MaxTKey(keyAnnotation)
fields = make(map[string]int64)
err = db.ProcessRange(ctx, first, last, &storage.ChunkOp{}, func(c *storage.Chunk) error {
if c == nil || c.TKeyValue == nil {
return nil
}
kv := c.TKeyValue
if kv.V == nil {
return nil
}
var value NeuronJSON
if err := value.UnmarshalJSON(kv.V); err != nil {
return err
}
for field := range value {
fields[field]++
}
return nil
})
return
}

// Initialize loads mutable properties of the neuronjson data instance,
// which in this case is the in-memory neuron json map for the specified versions.
func (d *Data) Initialize() {
Expand Down Expand Up @@ -1179,20 +1213,18 @@ func (d *Data) GetKeys(ctx storage.VersionedCtx) (out []string, err error) {
return out, nil
}

func (d *Data) GetFields(ctx storage.VersionedCtx) ([]string, error) {
func (d *Data) GetFieldCounts(ctx storage.VersionedCtx) (map[string]int64, error) {
mdb, found := d.getMemDBbyVersion(ctx.VersionID())
if !found {
return nil, fmt.Errorf("unable to get fields because no in-memory db for neuronjson %q, version %d", d.DataName(), ctx.VersionID())
}
mdb.mu.RLock()
fields := make([]string, len(mdb.fields))
i := 0
for field := range mdb.fields {
fields[i] = field
i++
if found {
mdb.mu.RLock()
fields := make(map[string]int64, len(mdb.fields))
for field, count := range mdb.fields {
fields[field] = count
}
mdb.mu.RUnlock()
return fields, nil
}
mdb.mu.RUnlock()
return fields, nil
return d.getFieldCounts(ctx)
}

func (d *Data) GetFieldTimes(ctx storage.VersionedCtx) (map[string]string, error) {
Expand Down Expand Up @@ -1440,8 +1472,11 @@ func (d *Data) storeAndUpdate(ctx *datastore.VersionedCtx, keyStr string, newDat
mdb.data[bodyid] = newData

// cache updated field and field timestamps
for field := range origData {
mdb.fields[field]--
}
for field := range newData {
mdb.fields[field] = struct{}{}
mdb.fields[field]++
if strings.HasSuffix(field, "_time") {
rootField := field[:len(field)-5]
mdb.fieldTimes[rootField] = newData[field].(string)
Expand Down Expand Up @@ -1568,6 +1603,9 @@ func (d *Data) DeleteData(ctx storage.VersionedCtx, keyStr string) error {
mdb.mu.Lock()
_, found := mdb.data[bodyid]
if found {
for field := range mdb.data[bodyid] {
mdb.fields[field]--
}
delete(mdb.data, bodyid)
mdb.deleteBodyID(bodyid)
}
Expand Down Expand Up @@ -2202,12 +2240,27 @@ func (d *Data) ServeHTTP(uuid dvid.UUID, ctx *datastore.VersionedCtx, w http.Res
comment = "HTTP GET keys"

case "fields":
fieldList, err := d.GetFields(ctx)
returnCounts := r.URL.Query().Get("counts") == "true"
fieldCount, err := d.GetFieldCounts(ctx)
if err != nil {
server.BadRequest(w, r, err)
return
}
jsonBytes, err := json.Marshal(fieldList)
var result interface{}
if returnCounts {
result = fieldCount
} else {
fields := make([]string, len(fieldCount))
i := 0
for field, count := range fieldCount {
if count > 0 {
fields[i] = field
}
i++
}
result = fields
}
jsonBytes, err := json.Marshal(result)
if err != nil {
server.BadRequest(w, r, err)
return
Expand Down Expand Up @@ -2354,7 +2407,7 @@ func (d *Data) ServeHTTP(uuid dvid.UUID, ctx *datastore.VersionedCtx, w http.Res
comment = fmt.Sprintf("HTTP DELETE data with key %q of neuronjson %q (%s)", keyStr, d.DataName(), url)

case "post":
data, err := ioutil.ReadAll(r.Body)
data, err := io.ReadAll(r.Body)
if err != nil {
server.BadRequest(w, r, err)
return
Expand Down
74 changes: 74 additions & 0 deletions datatype/neuronjson/neuronjson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,80 @@ var testData2 = []struct {
{"40000", `{"position": [151, 251, 301], "bodyid": 40000, "soma_side": "LHS", "baz": "some string"}`},
}

func TestFieldNames(t *testing.T) {
if err := server.OpenTest(); err != nil {
t.Fatalf("can't open test server: %v\n", err)
}
defer server.CloseTest()

uuid, _ := initTestRepo()
payload := bytes.NewBufferString(`{"typename": "neuronjson", "dataname": "neurons"}`)
apiStr := fmt.Sprintf("%srepo/%s/instance", server.WebAPIPath, uuid)
server.TestHTTP(t, "POST", apiStr, payload)

for _, td := range testData {
keyreq := fmt.Sprintf("%snode/%s/neurons/key/%s?u=tester", server.WebAPIPath, uuid, td.key)
server.TestHTTP(t, "POST", keyreq, strings.NewReader(td.val))
}

// Test fields request
fieldsReq := fmt.Sprintf("%snode/%s/neurons/fields", server.WebAPIPath, uuid)
returnValue := server.TestHTTP(t, "GET", fieldsReq, nil)

expectedFields := []string{"bodyid", "a number", "position", "baz", "bar", "somefield", "a list", "soma_side"}
var fields []string
if err := json.Unmarshal(returnValue, &fields); err != nil {
t.Fatalf("Unable to parse return from /fields request: %s\nError: %v\n", string(returnValue), err)
}
expectedNumFields := (len(expectedFields)-1)*3 + 1 // fields have _user and _time except bodyid
if len(fields) != expectedNumFields {
t.Fatalf("Expected %d fields, got %d\n", len(expectedFields), len(fields))
}
for _, field := range expectedFields {
found := false
for _, f := range fields {
if f == field {
found = true
break
}
}
if !found {
t.Errorf("Expected field %q not found in fields request\n", field)
}
}

// If we repost without the only instance of a field, it should no longer be in the fields request
postReq := fmt.Sprintf("%snode/%s/neurons/key/4000?u=changer&replace=true", server.WebAPIPath, uuid)
server.TestHTTP(t, "POST", postReq,
strings.NewReader(`{"bodyid": 4000, "position": [151, 251, 301], "baz": "some string"}`),
)

returnValue = server.TestHTTP(t, "GET", fieldsReq, nil)
if err := json.Unmarshal(returnValue, &fields); err != nil {
t.Fatalf("Unable to parse return from /fields request: %s\nError: %v\n", string(returnValue), err)
}
for _, field := range fields {
if field == "soma_side" {
t.Errorf("Found deleted field soma_side in fields request\n")
}
}

// If we delete an annotation, it should no longer contribute to the fields request.
// In this case, we delete the only "bar" field.
delReq := fmt.Sprintf("%snode/%s/neurons/key/2000?u=changer", server.WebAPIPath, uuid)
server.TestHTTP(t, "DELETE", delReq, nil)

returnValue = server.TestHTTP(t, "GET", fieldsReq, nil)
if err := json.Unmarshal(returnValue, &fields); err != nil {
t.Fatalf("Unable to parse return from /fields request: %s\nError: %v\n", string(returnValue), err)
}
for _, field := range fields {
if field == "bar" {
t.Errorf("Found deleted field bar in fields request: %v\n", fields)
}
}
}

func TestStressConcurrentRW(t *testing.T) {
if err := server.OpenTest(); err != nil {
t.Fatalf("can't open test server: %v\n", err)
Expand Down

0 comments on commit 3a7eb62

Please sign in to comment.