Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some fixes to bitswap + dht #155

Merged
merged 63 commits into from
Oct 19, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
1b1ef6a
add local to net/conn
jbenet Oct 11, 2014
d2671af
command help spacing
jbenet Oct 11, 2014
1555ce7
bitswap dials peers
jbenet Oct 11, 2014
0117fb1
dht handleAddProviders adds addr in msg
jbenet Oct 11, 2014
a2d55f3
removed DialAddr
jbenet Oct 11, 2014
d113aa8
no longer store incoming addrs
jbenet Oct 11, 2014
577f8fe
commenting out platform specific code
jbenet Oct 11, 2014
0e22892
remove old pkg
jbenet Oct 11, 2014
a5a7d99
meant to call net.DialPeer
jbenet Oct 13, 2014
4b5906e
logging + tweaks
jbenet Oct 13, 2014
c894b1d
iiii -> peerToQuery
jbenet Oct 13, 2014
c77ed6d
fix up FindProvidersAsync
whyrusleeping Oct 11, 2014
4189d50
fix bug in diagnostics, and add more peers to closer peer responses
whyrusleeping Oct 13, 2014
afe85ce
add in basic bandwidth tracking to the muxer
whyrusleeping Oct 13, 2014
a8330f1
add methods on net interface to retrieve bandwidth values
whyrusleeping Oct 13, 2014
be5f976
put bandwidth totals into the diagnostic messages
whyrusleeping Oct 13, 2014
b2bd684
fix core NewNode not setting network field, and added new json serial…
whyrusleeping Oct 13, 2014
779af0e
add file i had forgotten to
whyrusleeping Oct 14, 2014
0051629
Add test to test conncurrent connects between two peers
whyrusleeping Oct 14, 2014
1a7fac4
make test fail instead of hang
whyrusleeping Oct 15, 2014
3a28466
make vendor
jbenet Oct 15, 2014
f10b4bd
fixed old swarm test
jbenet Oct 16, 2014
60cd0f1
some dht cleanup, and make DHTs take a master context
whyrusleeping Oct 15, 2014
0b97d29
small changes to auxiliary dht functions
whyrusleeping Oct 18, 2014
18cfe02
dht tests with context
jbenet Oct 18, 2014
e989d6f
move IDFromPubKey to peer pkg
jbenet Oct 16, 2014
ccaa490
better peer gen
jbenet Oct 16, 2014
5681e27
reworked Conn
jbenet Oct 16, 2014
1edc5a4
updated Conn and Swarm
jbenet Oct 16, 2014
e7d7133
colored logfmt
jbenet Oct 17, 2014
5d9b1f8
swarm bugfix: dial peer out
jbenet Oct 17, 2014
08af98d
logging friendliness
jbenet Oct 17, 2014
8aed79c
fixed data races
jbenet Oct 17, 2014
e45a6ce
can just use ctx.Done
jbenet Oct 17, 2014
7a7bf8d
conn: raw []byte, not msg
jbenet Oct 18, 2014
8065b61
Added ContextCloser abstraction
jbenet Oct 18, 2014
ffba031
test closing/cancellation
jbenet Oct 18, 2014
afed188
separated out secure conn
jbenet Oct 18, 2014
d47115b
swarm: msg wrapping
jbenet Oct 18, 2014
f2e428d
moved versionhandshake to conn
jbenet Oct 18, 2014
3ab3170
IPFS_ADDRESS_RPC env var for changing rpc target
whyrusleeping Oct 19, 2014
20d1d35
moved XOR keyspace -> util
jbenet Oct 18, 2014
0078264
added to net/conn interface
jbenet Oct 18, 2014
f8d70f3
simultaneous open should work for now
jbenet Oct 19, 2014
331e433
keyspace XOR naming
jbenet Oct 19, 2014
c2e649b
make vendor
jbenet Oct 19, 2014
2308143
Fixed panic on closer
jbenet Oct 19, 2014
4783332
fixed tests
jbenet Oct 19, 2014
c2a228f
use ContextCloser better (listener fix)
jbenet Oct 19, 2014
68b85c9
broke out dial + listen
jbenet Oct 19, 2014
a4e4923
added multiconn
jbenet Oct 19, 2014
113c44f
listen: conn fate sharing
jbenet Oct 19, 2014
fc5b0c2
close listeners. + multiconn test
jbenet Oct 19, 2014
58fdcad
multiconn: map + close on children close
jbenet Oct 19, 2014
63d6ee6
multiconn in swarm
jbenet Oct 19, 2014
29ab6de
added msg counters to logs
jbenet Oct 19, 2014
aa70bba
evil deadlock that wasn't.
jbenet Oct 19, 2014
d17292a
differentiate ports cause timing.
jbenet Oct 19, 2014
3d2ba37
moved ctxcloser to own pkg
jbenet Oct 19, 2014
565f9b8
leaking goroutine ++ in travis
jbenet Oct 19, 2014
b29367a
in travis, leak tests dont work well
jbenet Oct 19, 2014
4c178f8
close conns directly in tests
jbenet Oct 19, 2014
7c4596a
more lenient time
jbenet Oct 19, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/ipfs/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/commander"
config "github.com/jbenet/go-ipfs/config"
ci "github.com/jbenet/go-ipfs/crypto"
spipe "github.com/jbenet/go-ipfs/crypto/spipe"
peer "github.com/jbenet/go-ipfs/peer"
updates "github.com/jbenet/go-ipfs/updates"
u "github.com/jbenet/go-ipfs/util"
)
Expand Down Expand Up @@ -121,7 +121,7 @@ func initCmd(c *commander.Command, inp []string) error {
}
cfg.Identity.PrivKey = base64.StdEncoding.EncodeToString(skbytes)

id, err := spipe.IDFromPubKey(pk)
id, err := peer.IDFromPubKey(pk)
if err != nil {
return err
}
Expand Down
3 changes: 1 addition & 2 deletions cmd/ipfs/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ Advanced Commands:

mount Mount an ipfs read-only mountpoint.
serve Serve an interface to ipfs.

net-diag Print network diagnostic
net-diag Print network diagnostic

Use "ipfs help <command>" for more information about a command.
`,
Expand Down
24 changes: 15 additions & 9 deletions core/commands/diag.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,22 @@ import (
"time"

"github.com/jbenet/go-ipfs/core"
diagn "github.com/jbenet/go-ipfs/diagnostics"
)

func PrintDiagnostics(info []*diagn.DiagInfo, out io.Writer) {
for _, i := range info {
fmt.Fprintf(out, "Peer: %s\n", i.ID)
fmt.Fprintf(out, "\tUp for: %s\n", i.LifeSpan.String())
fmt.Fprintf(out, "\tConnected To:\n")
for _, c := range i.Connections {
fmt.Fprintf(out, "\t%s\n\t\tLatency = %s\n", c.ID, c.Latency.String())
}
fmt.Fprintln(out)
}

}

func Diag(n *core.IpfsNode, args []string, opts map[string]interface{}, out io.Writer) error {
if n.Diagnostics == nil {
return errors.New("Cannot run diagnostic in offline mode!")
Expand All @@ -29,15 +43,7 @@ func Diag(n *core.IpfsNode, args []string, opts map[string]interface{}, out io.W
return err
}
} else {
for _, i := range info {
fmt.Fprintf(out, "Peer: %s\n", i.ID)
fmt.Fprintf(out, "\tUp for: %s\n", i.LifeSpan.String())
fmt.Fprintf(out, "\tConnected To:\n")
for _, c := range i.Connections {
fmt.Fprintf(out, "\t%s\n\t\tLatency = %s\n", c.ID, c.Latency.String())
}
fmt.Fprintln(out)
}
PrintDiagnostics(info, out)
}
return nil
}
5 changes: 4 additions & 1 deletion core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
route *dht.IpfsDHT
exchangeSession exchange.Interface
diagnostics *diag.Diagnostics
network inet.Network
)

if online {
Expand Down Expand Up @@ -135,11 +136,12 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
if err != nil {
return nil, err
}
network = net

diagnostics = diag.NewDiagnostics(local, net, diagService)
diagService.SetHandler(diagnostics)

route = dht.NewDHT(local, peerstore, net, dhtService, d)
route = dht.NewDHT(ctx, local, peerstore, net, dhtService, d)
// TODO(brian): perform this inside NewDHT factory method
dhtService.SetHandler(route) // wire the handler to the service.

Expand Down Expand Up @@ -173,6 +175,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
Routing: route,
Namesys: ns,
Diagnostics: diagnostics,
Network: network,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion crypto/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func GenerateEKeyPair(curveName string) ([]byte, GenSharedKey, error) {
}

pubKey := elliptic.Marshal(curve, x, y)
log.Debug("GenerateEKeyPair %d", len(pubKey))
// log.Debug("GenerateEKeyPair %d", len(pubKey))

done := func(theirPub []byte) ([]byte, error) {
// Verify and unpack node's public key.
Expand Down
39 changes: 24 additions & 15 deletions crypto/spipe/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"hash"

proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"

ci "github.com/jbenet/go-ipfs/crypto"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
Expand Down Expand Up @@ -204,7 +205,7 @@ func (s *SecurePipe) handshake() error {
}

if bytes.Compare(resp2, finished) != 0 {
return errors.New("Negotiation failed.")
return fmt.Errorf("Negotiation failed, got: %s", resp2)
}

log.Debug("%s handshake: Got node id: %s", s.local, s.remote)
Expand All @@ -229,7 +230,15 @@ func (s *SecurePipe) handleSecureIn(hashType string, tIV, tCKey, tMKey []byte) {
theirMac, macSize := makeMac(hashType, tMKey)

for {
data, ok := <-s.insecure.In
var data []byte
ok := true

select {
case <-s.ctx.Done():
ok = false // return out
case data, ok = <-s.insecure.In:
}

if !ok {
close(s.Duplex.In)
return
Expand Down Expand Up @@ -266,8 +275,17 @@ func (s *SecurePipe) handleSecureOut(hashType string, mIV, mCKey, mMKey []byte)
myMac, macSize := makeMac(hashType, mMKey)

for {
data, ok := <-s.Out
var data []byte
ok := true

select {
case <-s.ctx.Done():
ok = false // return out
case data, ok = <-s.Out:
}

if !ok {
close(s.insecure.Out)
return
}

Expand All @@ -288,16 +306,6 @@ func (s *SecurePipe) handleSecureOut(hashType string, mIV, mCKey, mMKey []byte)
}
}

// IDFromPubKey retrieves a Public Key from the peer given by pk
func IDFromPubKey(pk ci.PubKey) (peer.ID, error) {
b, err := pk.Bytes()
if err != nil {
return nil, err
}
hash := u.Hash(b)
return peer.ID(hash), nil
}

// Determines which algorithm to use. Note: f(a, b) = f(b, a)
func selectBest(myPrefs, theirPrefs string) (string, error) {
// Person with greatest hash gets first choice.
Expand Down Expand Up @@ -334,7 +342,7 @@ func selectBest(myPrefs, theirPrefs string) (string, error) {
// else, construct it.
func getOrConstructPeer(peers peer.Peerstore, rpk ci.PubKey) (*peer.Peer, error) {

rid, err := IDFromPubKey(rpk)
rid, err := peer.IDFromPubKey(rpk)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -373,7 +381,8 @@ func getOrConstructPeer(peers peer.Peerstore, rpk ci.PubKey) (*peer.Peer, error)
// this shouldn't ever happen, given we hashed, etc, but it could mean
// expected code (or protocol) invariants violated.
if !npeer.PubKey.Equals(rpk) {
return nil, fmt.Errorf("WARNING: PubKey mismatch: %v", npeer)
log.Error("WARNING: PubKey mismatch: %v", npeer)
panic("secure channel pubkey mismatch")
}
return npeer, nil
}
36 changes: 16 additions & 20 deletions crypto/spipe/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,34 +34,29 @@ type params struct {

// NewSecurePipe constructs a pipe with channels of a given buffer size.
func NewSecurePipe(ctx context.Context, bufsize int, local *peer.Peer,
peers peer.Peerstore) (*SecurePipe, error) {
peers peer.Peerstore, insecure Duplex) (*SecurePipe, error) {

ctx, cancel := context.WithCancel(ctx)

sp := &SecurePipe{
Duplex: Duplex{
In: make(chan []byte, bufsize),
Out: make(chan []byte, bufsize),
},
local: local,
peers: peers,
}
return sp, nil
}
local: local,
peers: peers,
insecure: insecure,

// Wrap creates a secure connection on top of an insecure duplex channel.
func (s *SecurePipe) Wrap(ctx context.Context, insecure Duplex) error {
if s.ctx != nil {
return errors.New("Pipe in use")
ctx: ctx,
cancel: cancel,
}

s.insecure = insecure
s.ctx, s.cancel = context.WithCancel(ctx)

if err := s.handshake(); err != nil {
s.cancel()
return err
if err := sp.handshake(); err != nil {
sp.Close()
return nil, err
}

return nil
return sp, nil
}

// LocalPeer retrieves the local peer.
Expand All @@ -76,11 +71,12 @@ func (s *SecurePipe) RemotePeer() *peer.Peer {

// Close closes the secure pipe
func (s *SecurePipe) Close() error {
if s.cancel == nil {
return errors.New("pipe already closed")
select {
case <-s.ctx.Done():
return errors.New("already closed")
default:
}

s.cancel()
s.cancel = nil
return nil
}
39 changes: 28 additions & 11 deletions daemon/daemon_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,43 @@ func getDaemonAddr(confdir string) (string, error) {
// over network RPC API. The address of the daemon is retrieved from the config
// directory, where live daemons write their addresses to special files.
func SendCommand(command *Command, confdir string) error {
//check if daemon is running
log.Info("Checking if daemon is running...")
server := os.Getenv("IPFS_ADDRESS_RPC")

if server == "" {
//check if daemon is running
log.Info("Checking if daemon is running...")
if !serverIsRunning(confdir) {
return ErrDaemonNotRunning
}

log.Info("Daemon is running!")

var err error
server, err = getDaemonAddr(confdir)
if err != nil {
return err
}
}

return serverComm(server, command)
}

func serverIsRunning(confdir string) bool {
var err error
confdir, err = u.TildeExpansion(confdir)
if err != nil {
return err
log.Error("Tilde Expansion Failed: %s", err)
return false
}
lk, err := daemonLock(confdir)
if err == nil {
lk.Close()
return ErrDaemonNotRunning
}

log.Info("Daemon is running! [reason = %s]", err)

server, err := getDaemonAddr(confdir)
if err != nil {
return err
return false
}
return true
}

func serverComm(server string, command *Command) error {
log.Info("Daemon address: %s", server)
maddr, err := ma.NewMultiaddr(server)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
config "github.com/jbenet/go-ipfs/config"
core "github.com/jbenet/go-ipfs/core"
ci "github.com/jbenet/go-ipfs/crypto"
spipe "github.com/jbenet/go-ipfs/crypto/spipe"
peer "github.com/jbenet/go-ipfs/peer"
)

func TestInitializeDaemonListener(t *testing.T) {
Expand All @@ -23,7 +23,7 @@ func TestInitializeDaemonListener(t *testing.T) {
t.Fatal(err)
}

ident, _ := spipe.IDFromPubKey(pub)
ident, _ := peer.IDFromPubKey(pub)
privKey := base64.StdEncoding.EncodeToString(prbytes)
pID := ident.Pretty()

Expand Down
Loading