From b34d41e01ed32a1cfbdcd96f2a34c8b38345a56d Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 1 Oct 2015 15:02:22 -0700 Subject: [PATCH] fix publish fail on prexisting bad record dont error out if prexisting record is bad, just grab its sequence number and continue on with the publish. License: MIT Signed-off-by: Jeromy --- core/core.go | 4 +- fuse/ipns/common.go | 2 +- namesys/namesys.go | 5 +- namesys/publisher.go | 65 ++++++++++++++---- namesys/republisher/repub_test.go | 2 +- namesys/resolve_test.go | 94 +++++++++++++++++++++++++- routing/dht/handlers.go | 104 +++++++++++++++++++++-------- routing/dht/pb/dht.pb.go | 15 ++++- routing/dht/pb/dht.proto | 3 + routing/dht/records.go | 9 +++ routing/mock/centralized_client.go | 31 ++++++--- routing/mock/centralized_server.go | 2 +- 12 files changed, 275 insertions(+), 61 deletions(-) diff --git a/core/core.go b/core/core.go index fee58a92862..0cebce350cf 100644 --- a/core/core.go +++ b/core/core.go @@ -227,7 +227,7 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Blockstore, alwaysSendToPeer) // setup name system - n.Namesys = namesys.NewNameSystem(n.Routing) + n.Namesys = namesys.NewNameSystem(n.Routing, n.Repo.Datastore()) // setup ipns republishing err = n.setupIpnsRepublisher() @@ -456,7 +456,7 @@ func (n *IpfsNode) SetupOfflineRouting() error { n.Routing = offroute.NewOfflineRouter(n.Repo.Datastore(), n.PrivateKey) - n.Namesys = namesys.NewNameSystem(n.Routing) + n.Namesys = namesys.NewNameSystem(n.Routing, n.Repo.Datastore()) return nil } diff --git a/fuse/ipns/common.go b/fuse/ipns/common.go index 95b2e0c8d2f..2ec4c7fb157 100644 --- a/fuse/ipns/common.go +++ b/fuse/ipns/common.go @@ -33,7 +33,7 @@ func InitializeKeyspace(n *core.IpfsNode, key ci.PrivKey) error { return err } - pub := nsys.NewRoutingPublisher(n.Routing) + pub := nsys.NewRoutingPublisher(n.Routing, n.Repo.Datastore()) if err := pub.Publish(ctx, key, path.FromKey(nodek)); err != nil { return err } diff --git a/namesys/namesys.go b/namesys/namesys.go index ed03b4cc2e7..12b73218ba3 100644 --- a/namesys/namesys.go +++ b/namesys/namesys.go @@ -4,6 +4,7 @@ import ( "strings" "time" + ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" ci "github.com/ipfs/go-ipfs/p2p/crypto" path "github.com/ipfs/go-ipfs/path" @@ -25,7 +26,7 @@ type mpns struct { } // NewNameSystem will construct the IPFS naming system based on Routing -func NewNameSystem(r routing.IpfsRouting) NameSystem { +func NewNameSystem(r routing.IpfsRouting, ds ds.Datastore) NameSystem { return &mpns{ resolvers: map[string]resolver{ "dns": newDNSResolver(), @@ -33,7 +34,7 @@ func NewNameSystem(r routing.IpfsRouting) NameSystem { "dht": newRoutingResolver(r), }, publishers: map[string]Publisher{ - "/ipns/": NewRoutingPublisher(r), + "/ipns/": NewRoutingPublisher(r, ds), }, } } diff --git a/namesys/publisher.go b/namesys/publisher.go index 521ce4cc8e6..e31217dff33 100644 --- a/namesys/publisher.go +++ b/namesys/publisher.go @@ -18,6 +18,7 @@ import ( path "github.com/ipfs/go-ipfs/path" pin "github.com/ipfs/go-ipfs/pin" routing "github.com/ipfs/go-ipfs/routing" + dhtpb "github.com/ipfs/go-ipfs/routing/dht/pb" record "github.com/ipfs/go-ipfs/routing/record" ft "github.com/ipfs/go-ipfs/unixfs" u "github.com/ipfs/go-ipfs/util" @@ -37,11 +38,15 @@ var PublishPutValTimeout = time.Minute // routing system. type ipnsPublisher struct { routing routing.IpfsRouting + ds ds.Datastore } // NewRoutingPublisher constructs a publisher for the IPFS Routing name system. -func NewRoutingPublisher(route routing.IpfsRouting) *ipnsPublisher { - return &ipnsPublisher{routing: route} +func NewRoutingPublisher(route routing.IpfsRouting, ds ds.Datastore) *ipnsPublisher { + if ds == nil { + panic("nil datastore") + } + return &ipnsPublisher{routing: route, ds: ds} } // Publish implements Publisher. Accepts a keypair and a value, @@ -62,22 +67,58 @@ func (p *ipnsPublisher) PublishWithEOL(ctx context.Context, k ci.PrivKey, value _, ipnskey := IpnsKeysForID(id) - // get previous records sequence number, and add one to it - var seqnum uint64 - prevrec, err := p.routing.GetValues(ctx, ipnskey, 0) + // get previous records sequence number + seqnum, err := p.getPreviousSeqNo(ctx, ipnskey) + if err != nil { + return err + } + + // increment it + seqnum++ + + return PutRecordToRouting(ctx, k, value, seqnum, eol, p.routing, id) +} + +func (p *ipnsPublisher) getPreviousSeqNo(ctx context.Context, ipnskey key.Key) (uint64, error) { + prevrec, err := p.ds.Get(ipnskey.DsKey()) + if err != nil && err != ds.ErrNotFound { + // None found, lets start at zero! + return 0, err + } + var val []byte if err == nil { - e := new(pb.IpnsEntry) - err := proto.Unmarshal(prevrec[0].Val, e) + prbytes, ok := prevrec.([]byte) + if !ok { + return 0, fmt.Errorf("unexpected type returned from datastore: %#v", prevrec) + } + dhtrec := new(dhtpb.Record) + err := proto.Unmarshal(prbytes, dhtrec) if err != nil { - return err + return 0, err } - seqnum = e.GetSequence() + 1 - } else if err != ds.ErrNotFound { - return err + val = dhtrec.GetValue() + } else { + // try and check the dht for a record + ctx, cancel := context.WithTimeout(ctx, time.Second*30) + defer cancel() + + rv, err := p.routing.GetValue(ctx, ipnskey) + if err != nil { + // no such record found, start at zero! + return 0, nil + } + + val = rv } - return PutRecordToRouting(ctx, k, value, seqnum, eol, p.routing, id) + e := new(pb.IpnsEntry) + err = proto.Unmarshal(val, e) + if err != nil { + return 0, err + } + + return e.GetSequence(), nil } func PutRecordToRouting(ctx context.Context, k ci.PrivKey, value path.Path, seqnum uint64, eol time.Time, r routing.IpfsRouting, id peer.ID) error { diff --git a/namesys/republisher/repub_test.go b/namesys/republisher/repub_test.go index 66d137a70d7..ef7fcf7e342 100644 --- a/namesys/republisher/repub_test.go +++ b/namesys/republisher/repub_test.go @@ -54,7 +54,7 @@ func TestRepublish(t *testing.T) { // have one node publish a record that is valid for 1 second publisher := nodes[3] p := path.FromString("/ipfs/QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn") // does not need to be valid - rp := namesys.NewRoutingPublisher(publisher.Routing) + rp := namesys.NewRoutingPublisher(publisher.Routing, publisher.Repo.Datastore()) err := rp.PublishWithEOL(ctx, publisher.PrivateKey, p, time.Now().Add(time.Second)) if err != nil { t.Fatal(err) diff --git a/namesys/resolve_test.go b/namesys/resolve_test.go index 3dde211ad45..4d81751f1b5 100644 --- a/namesys/resolve_test.go +++ b/namesys/resolve_test.go @@ -1,10 +1,14 @@ package namesys import ( + "errors" "testing" + "time" + ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" key "github.com/ipfs/go-ipfs/blocks/key" + peer "github.com/ipfs/go-ipfs/p2p/peer" path "github.com/ipfs/go-ipfs/path" mockrouting "github.com/ipfs/go-ipfs/routing/mock" u "github.com/ipfs/go-ipfs/util" @@ -13,9 +17,10 @@ import ( func TestRoutingResolve(t *testing.T) { d := mockrouting.NewServer().Client(testutil.RandIdentityOrFatal(t)) + dstore := ds.NewMapDatastore() resolver := NewRoutingResolver(d) - publisher := NewRoutingPublisher(d) + publisher := NewRoutingPublisher(d, dstore) privk, pubk, err := testutil.RandTestKeyPair(512) if err != nil { @@ -43,3 +48,90 @@ func TestRoutingResolve(t *testing.T) { t.Fatal("Got back incorrect value.") } } + +func TestPrexistingExpiredRecord(t *testing.T) { + dstore := ds.NewMapDatastore() + d := mockrouting.NewServer().ClientWithDatastore(context.Background(), testutil.RandIdentityOrFatal(t), dstore) + + resolver := NewRoutingResolver(d) + publisher := NewRoutingPublisher(d, dstore) + + privk, pubk, err := testutil.RandTestKeyPair(512) + if err != nil { + t.Fatal(err) + } + + id, err := peer.IDFromPublicKey(pubk) + if err != nil { + t.Fatal(err) + } + + // Make an expired record and put it in the datastore + h := path.FromString("/ipfs/QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN") + eol := time.Now().Add(time.Hour * -1) + err = PutRecordToRouting(context.Background(), privk, h, 0, eol, d, id) + if err != nil { + t.Fatal(err) + } + + // Now, with an old record in the system already, try and publish a new one + err = publisher.Publish(context.Background(), privk, h) + if err != nil { + t.Fatal(err) + } + + err = verifyCanResolve(resolver, id.Pretty(), h) + if err != nil { + t.Fatal(err) + } +} + +func TestPrexistingRecord(t *testing.T) { + dstore := ds.NewMapDatastore() + d := mockrouting.NewServer().ClientWithDatastore(context.Background(), testutil.RandIdentityOrFatal(t), dstore) + + resolver := NewRoutingResolver(d) + publisher := NewRoutingPublisher(d, dstore) + + privk, pubk, err := testutil.RandTestKeyPair(512) + if err != nil { + t.Fatal(err) + } + + id, err := peer.IDFromPublicKey(pubk) + if err != nil { + t.Fatal(err) + } + + // Make a good record and put it in the datastore + h := path.FromString("/ipfs/QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN") + eol := time.Now().Add(time.Hour) + err = PutRecordToRouting(context.Background(), privk, h, 0, eol, d, id) + if err != nil { + t.Fatal(err) + } + + // Now, with an old record in the system already, try and publish a new one + err = publisher.Publish(context.Background(), privk, h) + if err != nil { + t.Fatal(err) + } + + err = verifyCanResolve(resolver, id.Pretty(), h) + if err != nil { + t.Fatal(err) + } +} + +func verifyCanResolve(r Resolver, name string, exp path.Path) error { + res, err := r.Resolve(context.Background(), name) + if err != nil { + return err + } + + if res != exp { + return errors.New("got back wrong record!") + } + + return nil +} diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index 137995bed0b..6fa4d3f9b3a 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -3,6 +3,7 @@ package dht import ( "errors" "fmt" + "time" proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto" ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" @@ -10,6 +11,7 @@ import ( key "github.com/ipfs/go-ipfs/blocks/key" peer "github.com/ipfs/go-ipfs/p2p/peer" pb "github.com/ipfs/go-ipfs/routing/dht/pb" + u "github.com/ipfs/go-ipfs/util" lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables" ) @@ -46,41 +48,17 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) // first, is there even a key? - k := pmes.GetKey() + k := key.Key(pmes.GetKey()) if k == "" { return nil, errors.New("handleGetValue but no key was provided") // TODO: send back an error response? could be bad, but the other node's hanging. } - // let's first check if we have the value locally. - log.Debugf("%s handleGetValue looking into ds", dht.self) - dskey := key.Key(k).DsKey() - iVal, err := dht.datastore.Get(dskey) - log.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, iVal) - - // if we got an unexpected error, bail. - if err != nil && err != ds.ErrNotFound { + rec, err := dht.checkLocalDatastore(k) + if err != nil { return nil, err } - - // if we have the value, send it back - if err == nil { - log.Debugf("%s handleGetValue success!", dht.self) - - byts, ok := iVal.([]byte) - if !ok { - return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey) - } - - rec := new(pb.Record) - err := proto.Unmarshal(byts, rec) - if err != nil { - log.Debug("Failed to unmarshal dht record from datastore") - return nil, err - } - - resp.Record = rec - } + resp.Record = rec // Find closest peer on given cluster to desired key and reply with that info closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount) @@ -102,6 +80,69 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess return resp, nil } +func (dht *IpfsDHT) checkLocalDatastore(k key.Key) (*pb.Record, error) { + log.Debugf("%s handleGetValue looking into ds", dht.self) + dskey := k.DsKey() + iVal, err := dht.datastore.Get(dskey) + log.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, iVal) + + if err == ds.ErrNotFound { + return nil, nil + } + + // if we got an unexpected error, bail. + if err != nil { + return nil, err + } + + // if we have the value, send it back + log.Debugf("%s handleGetValue success!", dht.self) + + byts, ok := iVal.([]byte) + if !ok { + return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey) + } + + rec := new(pb.Record) + err = proto.Unmarshal(byts, rec) + if err != nil { + log.Debug("Failed to unmarshal dht record from datastore") + return nil, err + } + + // if its our record, dont bother checking the times on it + if peer.ID(rec.GetAuthor()) == dht.self { + return rec, nil + } + + var recordIsBad bool + recvtime, err := u.ParseRFC3339(rec.GetTimeReceived()) + if err != nil { + log.Info("either no receive time set on record, or it was invalid: ", err) + recordIsBad = true + } + + if time.Now().Sub(recvtime) > MaxRecordAge { + log.Debug("old record found, tossing.") + recordIsBad = true + } + + // NOTE: we do not verify the record here beyond checking these timestamps. + // we put the burden of checking the records on the requester as checking a record + // may be computationally expensive + + if recordIsBad { + err := dht.datastore.Delete(dskey) + if err != nil { + log.Error("Failed to delete bad record from datastore: ", err) + } + + return nil, nil // can treat this as not having the record at all + } + + return rec, nil +} + // Store a value in this peer local storage func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { defer log.EventBegin(ctx, "handlePutValue", p).Done() @@ -112,7 +153,12 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess return nil, err } - data, err := proto.Marshal(pmes.GetRecord()) + rec := pmes.GetRecord() + + // record the time we receive every record + rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now())) + + data, err := proto.Marshal(rec) if err != nil { return nil, err } diff --git a/routing/dht/pb/dht.pb.go b/routing/dht/pb/dht.pb.go index 9a313a89749..4b8501180bb 100644 --- a/routing/dht/pb/dht.pb.go +++ b/routing/dht/pb/dht.pb.go @@ -14,7 +14,7 @@ It has these top-level messages: */ package dht_pb -import proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto" +import proto "github.com/gogo/protobuf/proto" import math "math" // Reference imports to suppress errors if they are not otherwise used. @@ -221,8 +221,10 @@ type Record struct { // hash of the authors public key Author *string `protobuf:"bytes,3,opt,name=author" json:"author,omitempty"` // A PKI signature for the key+value+author - Signature []byte `protobuf:"bytes,4,opt,name=signature" json:"signature,omitempty"` - XXX_unrecognized []byte `json:"-"` + Signature []byte `protobuf:"bytes,4,opt,name=signature" json:"signature,omitempty"` + // Time the record was received, set by receiver + TimeReceived *string `protobuf:"bytes,5,opt,name=timeReceived" json:"timeReceived,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *Record) Reset() { *m = Record{} } @@ -257,6 +259,13 @@ func (m *Record) GetSignature() []byte { return nil } +func (m *Record) GetTimeReceived() string { + if m != nil && m.TimeReceived != nil { + return *m.TimeReceived + } + return "" +} + func init() { proto.RegisterEnum("dht.pb.Message_MessageType", Message_MessageType_name, Message_MessageType_value) proto.RegisterEnum("dht.pb.Message_ConnectionType", Message_ConnectionType_name, Message_ConnectionType_value) diff --git a/routing/dht/pb/dht.proto b/routing/dht/pb/dht.proto index 91c8d8e0436..de88c3451b8 100644 --- a/routing/dht/pb/dht.proto +++ b/routing/dht/pb/dht.proto @@ -75,4 +75,7 @@ message Record { // A PKI signature for the key+value+author optional bytes signature = 4; + + // Time the record was received, set by receiver + optional string timeReceived = 5; } diff --git a/routing/dht/records.go b/routing/dht/records.go index 3c7d1d17649..49a06d55744 100644 --- a/routing/dht/records.go +++ b/routing/dht/records.go @@ -2,6 +2,7 @@ package dht import ( "fmt" + "time" ctxfrac "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-context/frac" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" @@ -12,6 +13,14 @@ import ( record "github.com/ipfs/go-ipfs/routing/record" ) +// MaxRecordAge specifies the maximum time that any node will hold onto a record +// from the time its received. This does not apply to any other forms of validity that +// the record may contain. +// For example, a record may contain an ipns entry with an EOL saying its valid +// until the year 2020 (a great time in the future). For that record to stick around +// it must be rebroadcasted more frequently than once every 'MaxRecordAge' +const MaxRecordAge = time.Hour * 36 + func (dht *IpfsDHT) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) { log.Debugf("getPublicKey for: %s", p) diff --git a/routing/mock/centralized_client.go b/routing/mock/centralized_client.go index 09f17e61e25..9d9a1f6da2c 100644 --- a/routing/mock/centralized_client.go +++ b/routing/mock/centralized_client.go @@ -4,12 +4,15 @@ import ( "errors" "time" + proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto" ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" key "github.com/ipfs/go-ipfs/blocks/key" peer "github.com/ipfs/go-ipfs/p2p/peer" routing "github.com/ipfs/go-ipfs/routing" + dhtpb "github.com/ipfs/go-ipfs/routing/dht/pb" + u "github.com/ipfs/go-ipfs/util" "github.com/ipfs/go-ipfs/util/testutil" logging "github.com/ipfs/go-ipfs/vendor/go-log-v1.0.0" ) @@ -25,7 +28,16 @@ type client struct { // FIXME(brian): is this method meant to simulate putting a value into the network? func (c *client) PutValue(ctx context.Context, key key.Key, val []byte) error { log.Debugf("PutValue: %s", key) - return c.datastore.Put(key.DsKey(), val) + rec := new(dhtpb.Record) + rec.Value = val + rec.Key = proto.String(string(key)) + rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now())) + data, err := proto.Marshal(rec) + if err != nil { + return err + } + + return c.datastore.Put(key.DsKey(), data) } // FIXME(brian): is this method meant to simulate getting a value from the network? @@ -41,21 +53,22 @@ func (c *client) GetValue(ctx context.Context, key key.Key) ([]byte, error) { return nil, errors.New("could not cast value from datastore") } - return data, nil + rec := new(dhtpb.Record) + err = proto.Unmarshal(data, rec) + if err != nil { + return nil, err + } + + return rec.GetValue(), nil } func (c *client) GetValues(ctx context.Context, key key.Key, count int) ([]routing.RecvdVal, error) { - log.Debugf("GetValue: %s", key) - v, err := c.datastore.Get(key.DsKey()) + log.Debugf("GetValues: %s", key) + data, err := c.GetValue(ctx, key) if err != nil { return nil, err } - data, ok := v.([]byte) - if !ok { - return nil, errors.New("could not cast value from datastore") - } - return []routing.RecvdVal{{Val: data, From: c.peer.ID()}}, nil } diff --git a/routing/mock/centralized_server.go b/routing/mock/centralized_server.go index c7bd239ed30..a62f64f8d7a 100644 --- a/routing/mock/centralized_server.go +++ b/routing/mock/centralized_server.go @@ -80,7 +80,7 @@ func (rs *s) Client(p testutil.Identity) Client { func (rs *s) ClientWithDatastore(_ context.Context, p testutil.Identity, datastore ds.Datastore) Client { return &client{ peer: p, - datastore: ds.NewMapDatastore(), + datastore: datastore, server: rs, } }