Skip to content

Commit

Permalink
Implement ipns republisher
Browse files Browse the repository at this point in the history
This commit adds a very basic process that will periodically go through
a list of given ids and republish the values for their ipns entries.

License: MIT
Signed-off-by: Jeromy <[email protected]>
  • Loading branch information
whyrusleeping committed Sep 22, 2015
1 parent dac65e7 commit 8f5c610
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 7 deletions.
8 changes: 8 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
ipnsfs "github.com/ipfs/go-ipfs/ipnsfs"
merkledag "github.com/ipfs/go-ipfs/merkledag"
namesys "github.com/ipfs/go-ipfs/namesys"
ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher"
path "github.com/ipfs/go-ipfs/path"
pin "github.com/ipfs/go-ipfs/pin"
repo "github.com/ipfs/go-ipfs/repo"
Expand Down Expand Up @@ -104,6 +105,7 @@ type IpfsNode struct {
Diagnostics *diag.Diagnostics // the diagnostics service
Ping *ping.PingService
Reprovider *rp.Reprovider // the value reprovider system
IpnsRepub *ipnsrp.Republisher

IpnsFs *ipnsfs.Filesystem

Expand Down Expand Up @@ -226,6 +228,12 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost
// setup name system
n.Namesys = namesys.NewNameSystem(n.Routing)

// setup ipns republishing
n.IpnsRepub = ipnsrp.NewRepublisher(n.Routing, n.Peerstore)
n.IpnsRepub.AddName(n.Identity)

n.Process().Go(n.IpnsRepub.Run)

return nil
}

Expand Down
19 changes: 12 additions & 7 deletions namesys/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,12 @@ func NewRoutingPublisher(route routing.IpfsRouting) Publisher {
func (p *ipnsPublisher) Publish(ctx context.Context, k ci.PrivKey, value path.Path) error {
log.Debugf("Publish %s", value)

pubkey := k.GetPublic()
pkbytes, err := pubkey.Bytes()
pkbytes, err := k.GetPublic().Bytes()
if err != nil {
return err
}

nameb := u.Hash(pkbytes)
namekey := key.Key("/pk/" + string(nameb))
ipnskey := key.Key("/ipns/" + string(nameb))
namekey, ipnskey := IpnsKeysForID(pkbytes)

// get previous records sequence number, and add one to it
var seqnum uint64
Expand All @@ -73,7 +70,7 @@ func (p *ipnsPublisher) Publish(ctx context.Context, k ci.PrivKey, value path.Pa

_ = prevrec

data, err := createRoutingEntryData(k, value, seqnum)
data, err := CreateRoutingEntryData(k, value, seqnum)
if err != nil {
return err
}
Expand All @@ -98,7 +95,7 @@ func (p *ipnsPublisher) Publish(ctx context.Context, k ci.PrivKey, value path.Pa
return nil
}

func createRoutingEntryData(pk ci.PrivKey, val path.Path, seq uint64) ([]byte, error) {
func CreateRoutingEntryData(pk ci.PrivKey, val path.Path, seq uint64) ([]byte, error) {
entry := new(pb.IpnsEntry)

entry.Value = []byte(val)
Expand Down Expand Up @@ -206,3 +203,11 @@ func InitializeKeyspace(ctx context.Context, ds dag.DAGService, pub Publisher, p

return nil
}

func IpnsKeysForID(pkbytes []byte) (name, ipns key.Key) {
nameb := u.Hash(pkbytes)
namekey := key.Key("/pk/" + string(nameb))
ipnskey := key.Key("/ipns/" + string(nameb))

return namekey, ipnskey
}
108 changes: 108 additions & 0 deletions namesys/republisher/repub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package republisher

import (
"sync"
"time"

namesys "github.com/ipfs/go-ipfs/namesys"
pb "github.com/ipfs/go-ipfs/namesys/pb"
peer "github.com/ipfs/go-ipfs/p2p/peer"
path "github.com/ipfs/go-ipfs/path"
"github.com/ipfs/go-ipfs/routing"

proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
goprocess "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
gpctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
logging "github.com/ipfs/go-ipfs/vendor/go-log-v1.0.0"
)

var log = logging.Logger("ipns-repub")

const DefaultRebroadcastInterval = time.Hour * 12

type Republisher struct {
r routing.IpfsRouting
resolver namesys.Resolver
pub namesys.Publisher
ps peer.Peerstore

Interval time.Duration

entrylock sync.Mutex
entries map[peer.ID]struct{}
}

func NewRepublisher(r routing.IpfsRouting, ps peer.Peerstore) *Republisher {
return &Republisher{
r: r,
resolver: namesys.NewRoutingResolver(r),
pub: namesys.NewRoutingPublisher(r),
ps: ps,
entries: make(map[peer.ID]struct{}),
Interval: DefaultRebroadcastInterval,
}
}

func (rp *Republisher) AddName(id peer.ID) {
rp.entrylock.Lock()
defer rp.entrylock.Unlock()
rp.entries[id] = struct{}{}
}

func (rp *Republisher) Run(proc goprocess.Process) {
tick := time.NewTicker(rp.Interval)
defer tick.Stop()

for {
select {
case <-tick.C:
err := rp.republishEntries(proc)
if err != nil {
log.Error(err)
}
case <-proc.Closing():
return
}
}
}

func (rp *Republisher) republishEntries(p goprocess.Process) error {
ctx, cancel := context.WithCancel(gpctx.OnClosingContext(p))
defer cancel()

for id, _ := range rp.entries {
log.Infof("republishing ipns entry for %s", id)
priv := rp.ps.PrivKey(id)

pkb, err := priv.GetPublic().Bytes()
if err != nil {
return err
}

// Look for it locally only
_, ipnskey := namesys.IpnsKeysForID(pkb)
vals, err := rp.r.GetValues(ctx, ipnskey, 0)
if err != nil {
// not found means we dont have a previously published entry
continue
}

// extract published data from record
val := vals[0].Val
e := new(pb.IpnsEntry)
err = proto.Unmarshal(val, e)
if err != nil {
return err
}
p := path.Path(e.Value)

// republish it
err = rp.pub.Publish(ctx, priv, p)
if err != nil {
return err
}
}

return nil
}

0 comments on commit 8f5c610

Please sign in to comment.