Skip to content

Commit

Permalink
support async datastores
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed Dec 5, 2019
1 parent 00ccc6b commit 5e38739
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 8 deletions.
34 changes: 33 additions & 1 deletion core/coreapi/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
blockservice "github.com/ipfs/go-blockservice"
cid "github.com/ipfs/go-cid"
cidutil "github.com/ipfs/go-cidutil"
filestore "github.com/ipfs/go-filestore"
bstore "github.com/ipfs/go-ipfs-blockstore"
files "github.com/ipfs/go-ipfs-files"
ipld "github.com/ipfs/go-ipld-format"
Expand Down Expand Up @@ -96,7 +97,29 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
bserv := blockservice.New(addblockstore, exch) // hash security 001
dserv := dag.NewDAGService(bserv)

fileAdder, err := coreunix.NewAdder(ctx, pinning, addblockstore, dserv)
// add a sync call to the DagService
// this ensures that data written to the DagService is persisted to the underlying datastore
// TODO: propagate the Sync function from the datastore through the blockstore, blockservice and dagservice
var syncDserv *syncDagService
if settings.OnlyHash {
syncDserv = &syncDagService{
DAGService: dserv,
syncFn: func(c cid.Cid) error { return nil },
}
} else {
syncDserv = &syncDagService{
DAGService: dserv,
syncFn: func(c cid.Cid) error {
ds := api.repo.Datastore()
if err := ds.Sync(bstore.BlockPrefix.ChildString(c.String())); err != nil {
return err
}
return ds.Sync(filestore.FilestorePrefix)
},
}
}

fileAdder, err := coreunix.NewAdder(ctx, pinning, addblockstore, syncDserv)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -272,3 +295,12 @@ func (api *UnixfsAPI) lsFromLinks(ctx context.Context, ndlinks []*ipld.Link, set
func (api *UnixfsAPI) core() *CoreAPI {
return (*CoreAPI)(api)
}

type syncDagService struct {
ipld.DAGService
syncFn func(c cid.Cid) error
}

func (s *syncDagService) Sync(c cid.Cid) error {
return s.syncFn(c)
}
14 changes: 12 additions & 2 deletions core/coreunix/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,13 @@ type Link struct {
Size uint64
}

type SyncDAGService interface {
ipld.DAGService
Sync(c cid.Cid) error
}

// NewAdder Returns a new Adder used for a file add operation.
func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCLocker, ds ipld.DAGService) (*Adder, error) {
func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCLocker, ds SyncDAGService) (*Adder, error) {
bufferedDS := ipld.NewBufferedDAG(ctx, ds)

return &Adder{
Expand All @@ -60,7 +65,7 @@ type Adder struct {
ctx context.Context
pinning pin.Pinner
gcLocker bstore.GCLocker
dagService ipld.DAGService
dagService SyncDAGService
bufferedDS *ipld.BufferedDAG
Out chan<- interface{}
Progress bool
Expand Down Expand Up @@ -316,6 +321,11 @@ func (adder *Adder) AddAllAndPin(file files.Node) (ipld.Node, error) {
return nil, err
}

err = adder.dagService.Sync(nd.Cid())
if err != nil {
return nil, err
}

if !adder.Pin {
return nd, nil
}
Expand Down
30 changes: 27 additions & 3 deletions core/node/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-filestore"
"github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-ipfs-exchange-interface"
"github.com/ipfs/go-ipfs-exchange-offline"
Expand Down Expand Up @@ -41,18 +42,33 @@ func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interf
// Pinning creates new pinner which tells GC which blocks should be kept
func Pinning(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo) (pin.Pinner, error) {
internalDag := merkledag.NewDAGService(blockservice.New(bstore, offline.Exchange(bstore)))
pinning, err := pin.LoadPinner(repo.Datastore(), ds, internalDag)
rootDS := repo.Datastore()

syncFn := func() error { return rootDS.Sync(blockstore.BlockPrefix) }
syncDs := &syncDagService{ds, syncFn}
syncInternalDag := &syncDagService{internalDag, syncFn}

pinning, err := pin.LoadPinner(rootDS, syncDs, syncInternalDag)
if err != nil {
// TODO: we should move towards only running 'NewPinner' explicitly on
// node init instead of implicitly here as a result of the pinner keys
// not being found in the datastore.
// this is kinda sketchy and could cause data loss
pinning = pin.NewPinner(repo.Datastore(), ds, internalDag)
pinning = pin.NewPinner(rootDS, syncDs, syncInternalDag)
}

return pinning, nil
}

type syncDagService struct {
format.DAGService
syncFn func() error
}

func (s *syncDagService) Sync() error {
return s.syncFn()
}

// Dag creates new DAGService
func Dag(bs blockservice.BlockService) format.DAGService {
return merkledag.NewDAGService(bs)
Expand All @@ -77,7 +93,15 @@ func OnlineExchange(provide bool) interface{} {
func Files(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, error) {
dsk := datastore.NewKey("/local/filesroot")
pf := func(ctx context.Context, c cid.Cid) error {
return repo.Datastore().Put(dsk, c.Bytes())
rootDS := repo.Datastore()
if err := rootDS.Sync(blockstore.BlockPrefix.ChildString(c.String())); err != nil {
return err
}
if err := rootDS.Sync(filestore.FilestorePrefix); err != nil {
return err
}

return rootDS.Put(dsk, c.Bytes())
}

var nd *merkledag.ProtoNode
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ require (
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.4
github.com/ipfs/go-ipfs-pinner v0.0.2
github.com/ipfs/go-ipfs-pinner v0.0.3-0.20191205171545-5901eab20ba3
github.com/ipfs/go-ipfs-posinfo v0.0.1
github.com/ipfs/go-ipfs-provider v0.3.0
github.com/ipfs/go-ipfs-routing v0.1.0
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ github.com/davidlazar/go-crypto v0.0.0-20170701192655-dcfb0a7ac018/go.mod h1:rQY
github.com/dgraph-io/badger v1.5.5-0.20190226225317-8115aed38f8f/go.mod h1:VZxzAIRPHRVNRKRo6AXrX9BJegn6il06VMTZVJYCIjQ=
github.com/dgraph-io/badger v1.6.0-rc1 h1:JphPpoBZJ3WHha133BGYlQqltSGIhV+VsEID0++nN9A=
github.com/dgraph-io/badger v1.6.0-rc1/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo=
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
Expand Down Expand Up @@ -172,19 +173,23 @@ github.com/ipfs/go-datastore v0.1.0/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAK
github.com/ipfs/go-datastore v0.1.1 h1:F4k0TkTAZGLFzBOrVKDAvch6JZtuN4NHkfdcEZL50aI=
github.com/ipfs/go-datastore v0.1.1/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw=
github.com/ipfs/go-datastore v0.2.0/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw=
github.com/ipfs/go-datastore v0.3.0 h1:9au0tYi/+n7xeUnGHG6davnS8x9hWbOzP/388Vx3CMs=
github.com/ipfs/go-datastore v0.3.0/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw=
github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=
github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
github.com/ipfs/go-ds-badger v0.0.2/go.mod h1:Y3QpeSFWQf6MopLTiZD+VT6IC1yZqaGmjvRcKeSGij8=
github.com/ipfs/go-ds-badger v0.0.5 h1:dxKuqw5T1Jm8OuV+lchA76H9QZFyPKZeLuT6bN42hJQ=
github.com/ipfs/go-ds-badger v0.0.5/go.mod h1:g5AuuCGmr7efyzQhLL8MzwqcauPojGPUaHzfGTzuE3s=
github.com/ipfs/go-ds-badger v0.1.0 h1:SFZHSLw4pLVzAQTX/oqTmRekvz/6qE46OzHyNgkCJU4=
github.com/ipfs/go-ds-badger v0.1.0/go.mod h1:qPnPoxI+M1UA1cD/wwk/ArA+Mv3hhUx3ODDyyIaG2LU=
github.com/ipfs/go-ds-flatfs v0.1.0 h1:6HFkVYkNTBrpPpM7jl5UspR9tkBED4Oj8INQhijgmoo=
github.com/ipfs/go-ds-flatfs v0.1.0/go.mod h1:g1RoobUt7Nr/BIpSmt9WqNzamRmDE+fQIt3Md+XS744=
github.com/ipfs/go-ds-flatfs v0.3.0 h1:f4NWqZmQLpbzCB2JKBnRPA+guSpm1n3l5vJqoL+hU9g=
github.com/ipfs/go-ds-flatfs v0.3.0/go.mod h1:E9qUWez2BdiJZ0kvKwG4SBv/mZzF77iekZlBm5rRXUI=
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-leveldb v0.1.0 h1:OsCuIIh1LMTk4WIQ1UJH7e3j01qlOP+KWVhNS6lBDZY=
github.com/ipfs/go-ds-leveldb v0.1.0/go.mod h1:hqAW8y4bwX5LWcCtku2rFNX3vjDZCy5LZCg+cSZvYb8=
github.com/ipfs/go-ds-leveldb v0.3.0 h1:esjh4VgM8vWMnzyZ+cj6yQueU6wsRVzS1M532zJ7Wok=
github.com/ipfs/go-ds-leveldb v0.3.0/go.mod h1:uBqlFwWCp5iEYeSsyjs7azuSyav+0/gAtSrPHsNjvHk=
github.com/ipfs/go-ds-measure v0.0.2 h1:/r6KXXr9x9vyf1EcSQhojdcMmWkskjOrbD3wEqEleww=
github.com/ipfs/go-ds-measure v0.0.2/go.mod h1:wiH6bepKsgyNKpz3nyb4erwhhIVpIxnZbsjN1QpVbbE=
Expand Down Expand Up @@ -221,6 +226,8 @@ github.com/ipfs/go-ipfs-files v0.0.4/go.mod h1:INEFm0LL2LWXBhNJ2PMIIb2w45hpXgPjN
github.com/ipfs/go-ipfs-flags v0.0.1/go.mod h1:RnXBb9WV53GSfTrSDVK61NLTFKvWc60n+K9EgCDh+rA=
github.com/ipfs/go-ipfs-pinner v0.0.2 h1:KRXt2V0TzoTd3mO1aONSw8C9wnZtl7RLpPruN/XDnlQ=
github.com/ipfs/go-ipfs-pinner v0.0.2/go.mod h1:KZGyGAR+yLthGEkG9tuA2zweB7O6auXaJNjX6IbEbOs=
github.com/ipfs/go-ipfs-pinner v0.0.3-0.20191205171545-5901eab20ba3 h1:pm8ztvVQ/JyiJJxby3eERLa5hoCFzPKszMnX2KN82mg=
github.com/ipfs/go-ipfs-pinner v0.0.3-0.20191205171545-5901eab20ba3/go.mod h1:s4kFZWLWGDudN8Jyd/GTpt222A12C2snA2+OTdy/7p8=
github.com/ipfs/go-ipfs-posinfo v0.0.1 h1:Esoxj+1JgSjX0+ylc0hUmJCOv6V2vFoZiETLR6OtpRs=
github.com/ipfs/go-ipfs-posinfo v0.0.1/go.mod h1:SwyeVP+jCwiDu0C313l/8jg6ZxM0qqtlt2a0vILTc1A=
github.com/ipfs/go-ipfs-pq v0.0.1 h1:zgUotX8dcAB/w/HidJh1zzc1yFq6Vm8J7T2F4itj/RU=
Expand Down
6 changes: 5 additions & 1 deletion namesys/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,11 @@ func (p *IpnsPublisher) updateRecord(ctx context.Context, k ci.PrivKey, value pa
}

// Put the new record.
if err := p.ds.Put(IpnsDsKey(id), data); err != nil {
key := IpnsDsKey(id)
if err := p.ds.Put(key, data); err != nil {
return nil, err
}
if err := p.ds.Sync(key); err != nil {
return nil, err
}
return entry, nil
Expand Down

0 comments on commit 5e38739

Please sign in to comment.