Skip to content

Commit

Permalink
reflow: lazy object fetching and bottom-up evaluation
Browse files Browse the repository at this point in the history
Summary:
This change introduces two changes to Reflow's evaluator:

(1) Lazy object fetching. Objects are now fetched from cache by
need, instead of at lookup time. This means that fetches are also
precise: we never retrieve from cache more than is required for the
job at hand. It is required to do bottom-up evaluation (efficiently),
but introduces some additional complexities, namely that there is
a race between cache lookup and object retrieval.  If an object is
removed after lookup but before retrieval, evaluation is stuck. In
this case, we opt to terminate evaluation with a restartable error.
This should be very rare in practice, and we can recover from it
safely in this manner [1].

(2) Bottom-up evaluation. We add a mode to the evaluator where it
performs bottom-up evaluation only. This effectively replays the
entire computation (but cached steps are skipped). It simplifies
reasoning about execution, and makes each execution transcript
self-contained.  Bottom-up evaluation also admits different digesting
schemes. For example, we could store execs also by their physical
keys (since these are always known in bottom-up mode).
The main hazards of bottom-up mode are: (1) it reduces our ability
to handle huge, incremental compute graphs since we have to explore
the whole graph for every evaluation; (2) it has to recompute results
on partial cache misses.

[1] Ideally we'd be able to recompute the missing object, however,
the exec that computed it may not actually be deterministic, at
which point we'd be stuck, and would instead have to invalidate the
whole path from the dependee to the dependent flows. The simplest
way of doing this is anyway a full restart. Future evaluation changes
may allow for these dependencies to be expressed through "symbolic"
values instead of concrete ones, in which case we'd be able to
recompute just the required object. However this would entail a great
deal of complexity that may not be worthwhile.

[2] Here's an example transcript from a bottom-up evaluation using
a real pipeline:

```
% reflow0.5bu1 run -eval=bottomup main.rf [redacted]
2017/12/12 21:42:49 run name: [email protected]/8c3cc468
2017/12/12 21:43:03 (<-) wgs.Score.model dca28714 ok   intern 0s 21.6MiB
2017/12/12 21:43:03 (<-) main.splitFlowcell 9909853c ok   intern 0s 23.1GiB
2017/12/12 21:43:03 (<-) wgs.alignmentGenome 8e635b5b ok   intern 0s 11.5GiB
2017/12/12 21:43:03 (<-) align.align.aligned fa304c07 ok     exec 0s 3.3GiB
2017/12/12 21:43:03 (<-) align.align.aligned 211cc684 ok     exec 0s 3.3GiB
2017/12/12 21:43:03 (<-) cnv.Bins     1dd306c7 ok     exec 0s 3.6MiB
2017/12/12 21:43:03 (<-) align.align.aligned 1e5acadc ok     exec 0s 3.3GiB
2017/12/12 21:43:03 (<-) align.align.aligned c7099eb8 ok     exec 0s 3.3GiB
2017/12/12 21:43:03 (<-) align.align.aligned 34ab0835 ok     exec 0s 3.2GiB
2017/12/12 21:43:03 (<-) align.align.aligned 24dec3f4 ok     exec 0s 3.3GiB
2017/12/12 21:43:03 (<-) bam.Sort     db8553be ok     exec 0s 2.8GiB
2017/12/12 21:43:03 (<-) align.align.aligned e16e2821 ok     exec 0s 3.3GiB
2017/12/12 21:43:03 (<-) bam.Sort     bf1847ec ok     exec 0s 2.8GiB
2017/12/12 21:43:03 (<-) bam.Sort     1091b5a0 ok     exec 0s 2.9GiB
2017/12/12 21:43:03 (<-) bam.Sort     4a2575cf ok     exec 0s 2.8GiB
2017/12/12 21:43:03 (<-) bam.Sort     81ba0741 ok     exec 0s 2.8GiB
2017/12/12 21:43:03 (<-) bam.Sort     02d4245b ok     exec 0s 2.8GiB
2017/12/12 21:43:03 (<-) align.align.aligned bf825cfc ok     exec 0s 3.3GiB
2017/12/12 21:43:03 (<-) bam.Sort     f7ddbf71 ok     exec 0s 2.8GiB
2017/12/12 21:43:03 (<-) bam.Sort     32360181 ok     exec 0s 2.9GiB
2017/12/12 21:43:03 (<-) bam.Merge    635b21cb ok     exec 0s 19.4GiB
2017/12/12 21:43:03 (<-) bam.MarkDuplicates.out d9882213 ok     exec 0s 19.5GiB
2017/12/12 21:43:03 (<-) bam.FilterDedupAndMapQ 53687918 ok     exec 0s 14.8GiB
2017/12/12 21:43:03 (<-) cnv.BinCounts f38c1773 ok     exec 0s 1.8MiB
2017/12/12 21:43:04 (<-) cnv.ScoreSample cf271177 ok     exec 0s 481.8KiB
2017/12/12 21:43:04 (<-) qc.samtoolsStats fefbcb86 ok     exec 0s 90.7KiB
2017/12/12 21:43:04 (<-) qc.samtoolsStats e5904373 ok     exec 0s 93.9KiB
2017/12/12 21:43:04 (<-) cnv.BinCounts 49519f66 ok     exec 0s 1.8MiB
2017/12/12 21:43:04 (<-) qc.samtoolsStats bce7027d ok     exec 0s 94.0KiB
2017/12/12 21:43:04 (<-) bam.Index    87f20eef ok     exec 0s 8.8MiB
2017/12/12 21:43:04 (<-) bam.Index    7ea10e99 ok     exec 0s 8.3MiB
2017/12/12 21:43:04 (<-) wgs.dbSNP    4e8599ae ok   intern 0s 17.4GiB
2017/12/12 21:43:04 (<-) qc.parseSamtoolsStats 1b01597e ok     exec 0s 1.8MiB
2017/12/12 21:43:04 (<-) qc.parseSamtoolsStats d99419bf ok     exec 0s 1.9MiB
2017/12/12 21:43:04 (<-) qc.parseSamtoolsStats 5b611fea ok     exec 0s 1.9MiB
2017/12/12 21:43:04 (<-) snp.tsvs     88bae92e ok     exec 0s 645.9MiB
2017/12/12 21:43:04 (<-) snp.tsvs     fb99b95c ok     exec 0s 647.1MiB
2017/12/12 21:43:04 (<-) snp.cat      de85e290 ok     exec 0s 645.9MiB
2017/12/12 21:43:04 (<-) snp.cat      b0b20e68 ok     exec 0s 647.1MiB
2017/12/12 21:43:04 (<-) main.Main    40d7f8af ok   extern 0s 0B
2017/12/12 21:43:04 total n=40 time=1s
        ident                  n   ncache transfer runtime(m) cpu mem(GiB) disk(GiB) tmp(GiB)
        bam.FilterDedupAndMapQ 1   1      0B
        bam.Index              2   2      0B
        qc.parseSamtoolsStats  3   3      0B
        cnv.ScoreSample        1   1      0B
        cnv.Bins               1   1      0B
        wgs.dbSNP              1   1      0B
        align.align.aligned    8   8      0B
        cnv.BinCounts          2   2      0B
        qc.samtoolsStats       3   3      0B
        wgs.alignmentGenome    1   1      0B
        main.splitFlowcell     1   1      0B
        main.Main              1   1      0B
        snp.cat                2   2      0B
        bam.Merge              1   1      0B
        snp.tsvs               2   2      0B
        bam.MarkDuplicates.out 1   1      0B
        bam.Sort               8   8      0B
        wgs.Score.model        1   1      0B

val<>
```

and after changing snp.cat:

```
% reflow0.5bu1 run -eval=bottomup main.rf [redacted]
2017/12/12 21:45:16 run name: [email protected]/03498998
2017/12/12 21:45:18 (<-) wgs.Score.model dca28714 ok   intern 0s 21.6MiB
2017/12/12 21:45:18 (<-) wgs.alignmentGenome 8e635b5b ok   intern 0s 11.5GiB
2017/12/12 21:45:19 (<-) cnv.Bins     1dd306c7 ok     exec 0s 3.6MiB
2017/12/12 21:45:19 (<-) main.splitFlowcell 9909853c ok   intern 0s 23.1GiB
2017/12/12 21:45:19 (<-) align.align.aligned 1e5acadc ok     exec 0s 3.3GiB
2017/12/12 21:45:19 (<-) align.align.aligned 211cc684 ok     exec 0s 3.3GiB
2017/12/12 21:45:19 (<-) align.align.aligned c7099eb8 ok     exec 0s 3.3GiB
2017/12/12 21:45:19 (<-) align.align.aligned fa304c07 ok     exec 0s 3.3GiB
2017/12/12 21:45:19 (<-) align.align.aligned 34ab0835 ok     exec 0s 3.2GiB
2017/12/12 21:45:19 (<-) align.align.aligned 24dec3f4 ok     exec 0s 3.3GiB
2017/12/12 21:45:19 (<-) align.align.aligned e16e2821 ok     exec 0s 3.3GiB
2017/12/12 21:45:19 (<-) align.align.aligned bf825cfc ok     exec 0s 3.3GiB
2017/12/12 21:45:19 (<-) bam.Sort     1091b5a0 ok     exec 0s 2.9GiB
2017/12/12 21:45:19 (<-) bam.Sort     81ba0741 ok     exec 0s 2.8GiB
2017/12/12 21:45:19 (<-) bam.Sort     02d4245b ok     exec 0s 2.8GiB
2017/12/12 21:45:19 (<-) bam.Sort     bf1847ec ok     exec 0s 2.8GiB
2017/12/12 21:45:19 (<-) bam.Sort     4a2575cf ok     exec 0s 2.8GiB
2017/12/12 21:45:19 (<-) bam.Sort     f7ddbf71 ok     exec 0s 2.8GiB
2017/12/12 21:45:19 (<-) bam.Sort     32360181 ok     exec 0s 2.9GiB
2017/12/12 21:45:19 (<-) bam.Sort     db8553be ok     exec 0s 2.8GiB
2017/12/12 21:45:19 (<-) bam.Merge    635b21cb ok     exec 0s 19.4GiB
2017/12/12 21:45:19 (<-) bam.MarkDuplicates.out d9882213 ok     exec 0s 19.5GiB
2017/12/12 21:45:19 (<-) bam.FilterDedupAndMapQ 53687918 ok     exec 0s 14.8GiB
2017/12/12 21:45:19 (<-) cnv.BinCounts f38c1773 ok     exec 0s 1.8MiB
2017/12/12 21:45:20 (<-) cnv.ScoreSample cf271177 ok     exec 0s 481.8KiB
2017/12/12 21:45:20 (<-) cnv.BinCounts 49519f66 ok     exec 0s 1.8MiB
2017/12/12 21:45:20 (<-) bam.Index    7ea10e99 ok     exec 0s 8.3MiB
2017/12/12 21:45:20 (<-) wgs.dbSNP    4e8599ae ok   intern 0s 17.4GiB
2017/12/12 21:45:20 (<-) qc.samtoolsStats bce7027d ok     exec 0s 94.0KiB
2017/12/12 21:45:20 (<-) bam.Index    87f20eef ok     exec 0s 8.8MiB
2017/12/12 21:45:20 (<-) qc.samtoolsStats e5904373 ok     exec 0s 93.9KiB
2017/12/12 21:45:20 (<-) qc.parseSamtoolsStats 5b611fea ok     exec 0s 1.9MiB
2017/12/12 21:45:20 (<-) snp.tsvs     88bae92e ok     exec 0s 645.9MiB
2017/12/12 21:45:20 (<-) snp.tsvs     fb99b95c ok     exec 0s 647.1MiB
2017/12/12 21:45:20 (<-) qc.parseSamtoolsStats d99419bf ok     exec 0s 1.9MiB
2017/12/12 21:45:21  ->  snp.cat      72c26af1 xfer   exec 645.9MiB
2017/12/12 21:45:21  ->  snp.cat      6a32f7ba xfer   exec 647.1MiB
2017/12/12 21:45:21 (<-) qc.samtoolsStats fefbcb86 ok     exec 0s 90.7KiB
2017/12/12 21:45:21 (<-) qc.parseSamtoolsStats 1b01597e ok     exec 0s 1.8MiB
2017/12/12 21:45:22  ->  snp.cat      72c26af1 run    exec ..cbd59de34d17b1bc6160b5679ebc5ef9b17abc snps={{snps}}.cat ${snps}/_ch..000001_None.bed >> {{merged}}
2017/12/12 21:45:22  ->  snp.cat      6a32f7ba run    exec ..cbd59de34d17b1bc6160b5679ebc5ef9b17abc snps={{snps}}.cat ${snps}/_ch..000001_None.bed >> {{merged}}
2017/12/12 21:45:25  <-  snp.cat      72c26af1 ok     exec 0s 645.9MiB
2017/12/12 21:45:26  <-  snp.cat      6a32f7ba ok     exec 0s 647.1MiB
2017/12/12 21:45:26  ->  main.Main    432e2d40 xfer extern 55.0GiB
2017/12/12 21:47:20  ->  main.Main    432e2d40 run  extern s3://grail-marius/WGS/test-2017-06-27/ 74.4GiB
2017/12/12 21:59:01  <-  main.Main    432e2d40 ok   extern 11m40s 0B
2017/12/12 21:59:01 total n=40 time=13m43s
        ident                  n   ncache transfer runtime(m) cpu         mem(GiB)    disk(GiB)   tmp(GiB)
        snp.tsvs               2   2      0B
        bam.Sort               8   8      0B
        wgs.dbSNP              1   1      0B
        bam.Index              2   2      0B
        main.Main              1   0      55.0GiB
        qc.samtoolsStats       3   3      0B
        qc.parseSamtoolsStats  3   3      0B
        cnv.BinCounts          2   2      0B
        main.splitFlowcell     1   1      0B
        wgs.alignmentGenome    1   1      0B
        align.align.aligned    8   8      0B
        bam.Merge              1   1      0B
        cnv.ScoreSample        1   1      0B
        cnv.Bins               1   1      0B
        wgs.Score.model        1   1      0B
        snp.cat                2   0      1.3GiB   0/0/0      0.0/0.0/0.0 0.0/0.0/0.0 0.3/0.4/0.6 0.0/0.0/0.0
        bam.FilterDedupAndMapQ 1   1      0B
        bam.MarkDuplicates.out 1   1      0B

val<>
```

Notice here in particular that reflow needs only transfer the precise
objects required to recompute snp.cat; and that even after this,
55GiB must be transfered in order to perform the extern. This hints
at a future optimization: we can direct transfer externs, or skip
objects that match checksums altogether.

Reviewers: pgopal, sbagaria

Reviewed By: pgopal

Differential Revision: https://phabricator.grailbio.com/D8713

fbshipit-source-id: 7f51bc2
  • Loading branch information
mariusae authored and grailbot committed Dec 18, 2017
1 parent e9e6d90 commit 2093b25
Show file tree
Hide file tree
Showing 16 changed files with 445 additions and 136 deletions.
17 changes: 16 additions & 1 deletion cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,27 @@ type Cache interface {
// Lookup returns the value associated with a (digest) key.
// Lookup returns an error flagged errors.NotExist when there
// is no such value.
//
// Lookup should also check to make sure that the objects
// actually exist, and provide a reasonable guarantee that they'll
// be available for transfer.
//
// TODO(marius): allow the caller to maintain a lease on the desired
// objects so that garbage collection can (safely) be run
// concurrently with flows. This isn't a correctness concern (the
// flows may be restarted), but rather one of efficiency.
Lookup(context.Context, digest.Digest) (Fileset, error)

// Transfer transmits the file objects associated with value v
// (usually retrieved by Lookup) to the repository dst.
// (usually retrieved by Lookup) to the repository dst. Transfer
// should be used in place of direct (cache) repository access since
// it may apply additional policies (e.g., rate limiting, etc.)
Transfer(ctx context.Context, dst Repository, v Fileset) error

// NeedTransfer returns the set of files in the Fileset v that are absent
// in the provided repository.
NeedTransfer(ctx context.Context, dst Repository, v Fileset) ([]File, error)

// Write stores the Value v, whose file objects exist in Repository repo,
// under the key id.
Write(ctx context.Context, id digest.Digest, v Fileset, repo Repository) error
Expand Down
29 changes: 28 additions & 1 deletion cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ import (
"context"
"encoding/json"

"github.com/grailbio/base/data"
"github.com/grailbio/base/digest"
"github.com/grailbio/base/limiter"
"github.com/grailbio/reflow"
"github.com/grailbio/reflow/errors"
"github.com/grailbio/reflow/repository"
)

// An Assoc is an associative array mapping digests to other digests.
Expand Down Expand Up @@ -79,7 +82,9 @@ func (c *Cache) Write(ctx context.Context, id digest.Digest, v reflow.Fileset, r

// Lookup returns the value associated with a (digest) key. Lookup
// returns an error flagged errors.NotExist when there is no such
// value.
// value. Lookup also checks that the objects are available in the
// cache's repository; an errors.NotExist error is returned if any
// object is missing.
func (c *Cache) Lookup(ctx context.Context, id digest.Digest) (reflow.Fileset, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand All @@ -100,6 +105,23 @@ func (c *Cache) Lookup(ctx context.Context, id digest.Digest) (reflow.Fileset, e
var v reflow.Fileset
err = json.NewDecoder(rc).Decode(&v)
rc.Close()
if err != nil {
return reflow.Fileset{}, err
}
// Also check that the objects exist.
missing, err := repository.Missing(ctx, c.Repository, v.Files()...)
if err != nil {
return reflow.Fileset{}, err
}
if len(missing) > 0 {
var total int64
for _, file := range missing {
total += file.Size
}
return reflow.Fileset{}, errors.E(
errors.NotExist, "cache.Lookup",
errors.Errorf("missing %d files (%s)", len(missing), data.Size(total)))
}
return v, err
}

Expand All @@ -113,3 +135,8 @@ func (c *Cache) Delete(ctx context.Context, id digest.Digest) error {
func (c *Cache) Transfer(ctx context.Context, dst reflow.Repository, v reflow.Fileset) error {
return c.Transferer.Transfer(ctx, dst, c.Repository, v.Files()...)
}

// NeedTransfer returns the file objects in v that are missing from repository dst.
func (c *Cache) NeedTransfer(ctx context.Context, dst reflow.Repository, v reflow.Fileset) ([]reflow.File, error) {
return c.Transferer.NeedTransfer(ctx, dst, v.Files()...)
}
4 changes: 4 additions & 0 deletions cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func (transferer) Transfer(ctx context.Context, dst, src reflow.Repository, file
return nil
}

func (transferer) NeedTransfer(ctx context.Context, dst reflow.Repository, files ...reflow.File) ([]reflow.File, error) {
return repository.Missing(ctx, dst, files...)
}

func TestCache(t *testing.T) {
ctx := context.Background()
cache := &Cache{
Expand Down
Loading

0 comments on commit 2093b25

Please sign in to comment.