Skip to content

Commit

Permalink
add /existing-labels fast return of all valid bodies
Browse files Browse the repository at this point in the history
  • Loading branch information
DocSavage committed Feb 15, 2024
1 parent e450d0a commit d0b9648
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 1 deletion.
13 changes: 13 additions & 0 deletions datatype/labelmap/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,19 @@ func (d *Data) handleIndex(ctx *datastore.VersionedCtx, w http.ResponseWriter, r
timedLog.Infof("HTTP %s index for label %d (%s)", r.Method, label, r.URL)
}

func (d *Data) handleExistingLabels(ctx *datastore.VersionedCtx, w http.ResponseWriter, r *http.Request) {
// GET <api URL>/node/<UUID>/<data name>/existing-labels

if strings.ToLower(r.Method) != "get" {
server.BadRequest(w, r, "only GET action allowed for /existing-labels endpoint")
return
}

if err := d.writeExistingIndices(ctx, w, nil); err != nil {
dvid.Errorf("unable to write existing labels: %v", err)
}
}

func (d *Data) handleListLabels(ctx *datastore.VersionedCtx, w http.ResponseWriter, r *http.Request) {
// GET <api URL>/node/<UUID>/<data name>/listlabels
const MaxReturnedLabels = 10000000
Expand Down
56 changes: 56 additions & 0 deletions datatype/labelmap/labelidx.go
Original file line number Diff line number Diff line change
Expand Up @@ -1765,6 +1765,62 @@ func (d *Data) indexThread(f *os.File, mu *sync.Mutex, wg *sync.WaitGroup, chunk
}
}

// scan all label indices for the presence of its key, writing existing key's body id
// into a stream and/or setting it in an internal map.
func (d *Data) writeExistingIndices(ctx *datastore.VersionedCtx, w http.ResponseWriter, bodymap map[uint64]struct{}) error {
timedLog := dvid.NewTimeLog()

store, err := datastore.GetOrderedKeyValueDB(d)
if err != nil {
return fmt.Errorf("problem getting store for data %q: %v", d.DataName(), err)
}

minTKey := NewLabelIndexTKey(0)
maxTKey := NewLabelIndexTKey(math.MaxUint64)
keyChan := make(storage.KeyChan)
go func() {
store.SendKeysInRange(ctx, minTKey, maxTKey, keyChan)
close(keyChan)
}()

if w != nil {
w.Header().Set("Content-type", "application/json")
fmt.Fprintf(w, "[")
}

var numExist uint64
for key := range keyChan {
if key != nil {
tkey, err := storage.TKeyFromKey(key)
if err != nil {
dvid.Errorf("Unable to get TKey from key %v for data %q: %v\n", key, d.DataName(), err)
continue
}
label, err := DecodeLabelIndexTKey(tkey)
if err != nil {
dvid.Errorf("Couldn't decode label index key %v for data %q\n", key, d.DataName())
continue
}
if bodymap != nil {
bodymap[label] = struct{}{}
}
if w != nil {
if numExist > 0 {
fmt.Fprintf(w, ",")
}
fmt.Fprintf(w, "%d", label)
}
numExist++
}
}
if w != nil {
fmt.Fprintf(w, "]")
}

timedLog.Infof("Finished writing %d existing label indices.", numExist)
return nil
}

// scan all label indices in this labelmap instance, writing Blocks data into a given file
func (d *Data) writeIndices(f *os.File, outPath string, v dvid.VersionID) {
timedLog := dvid.NewTimeLog()
Expand Down
12 changes: 12 additions & 0 deletions datatype/labelmap/labelmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -1484,6 +1484,15 @@ POST <api URL>/node/<UUID>/<data name>/index/<label>
a 404 not found is returned.
GET <api URL>/node/<UUID>/<data name>/existing-labels
Streams JSON list of all existing labels in the most computationally efficient manner,
particularly when backed by a Badger key-value store or other backend that has fast
key scans.
Note that because the data is streamed over HTTP, an error code cannot be sent once data is
in flight.
GET <api URL>/node/<UUID>/<data name>/listlabels[?queryopts]
Streams labels and optionally their voxel counts in numerical order up to a maximum
Expand Down Expand Up @@ -2886,6 +2895,9 @@ func (d *Data) ServeHTTP(uuid dvid.UUID, ctx *datastore.VersionedCtx, w http.Res
case "labels":
d.handleLabels(ctx, w, r)

case "existing-labels":
d.handleExistingLabels(ctx, w, r)

case "listlabels":
d.handleListLabels(ctx, w, r)

Expand Down
9 changes: 8 additions & 1 deletion datatype/labelmap/labelmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,12 +1261,19 @@ func TestMultiscaleIngest(t *testing.T) {
server.TestBadHTTP(t, "GET", reqStr, nil)

reqStr = fmt.Sprintf("%snode/%s/labels/sizes", server.WebAPIPath, uuid)
bodystr := "[1, 2, 3, 13, 209, 311]"
bodystr := "[1,2,3,13,209,311]"
r := server.TestHTTP(t, "GET", reqStr, bytes.NewBufferString(bodystr))
if string(r) != "[64000,64000,0,64000,64000,64000]" {
t.Errorf("bad batch sizes result. got: %s\n", string(r))
}

// Verify the existing label streaming endpoint.
reqStr = fmt.Sprintf("%snode/%s/labels/existing-labels", server.WebAPIPath, uuid)
r = server.TestHTTP(t, "GET", reqStr, nil)
if string(r) != "[1,2,13,209,311]" {
t.Errorf("expected [1,2,13,209,311] from /existing-labels but got %s\n", string(r))
}

// Verify the label streaming endpoint.
reqStr = fmt.Sprintf("%snode/%s/labels/listlabels", server.WebAPIPath, uuid)
r = server.TestHTTP(t, "GET", reqStr, nil)
Expand Down
7 changes: 7 additions & 0 deletions datatype/labelmap/mutate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2248,6 +2248,13 @@ func TestMergeCleave(t *testing.T) {
t.Errorf("Expected supervoxel 4 original voxels to remain. Instead some were removed.\n")
}

// Verify using existing label streaming endpoint.
reqStr = fmt.Sprintf("%snode/%s/labels/existing-labels", server.WebAPIPath, uuid)
r = server.TestHTTP(t, "GET", reqStr, nil)
if string(r) != "[1,2,4]" {
t.Errorf("expected [1,2,4] from /existing-labels but got %s\n", string(r))
}

// Cleave supervoxel 3 out of label 4.
reqStr = fmt.Sprintf("%snode/%s/labels/cleave/4?u=mrsmith&app=myapp", server.WebAPIPath, uuid)
r = server.TestHTTP(t, "POST", reqStr, bytes.NewBufferString("[3]"))
Expand Down

0 comments on commit d0b9648

Please sign in to comment.