Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bandwidth Calculations #162

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cmd/ipfs/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ Advanced Commands:

mount Mount an ipfs read-only mountpoint.
serve Serve an interface to ipfs.

net-diag Print network diagnostic
net-diag Print network diagnostic

Use "ipfs help <command>" for more information about a command.
`,
Expand Down
24 changes: 15 additions & 9 deletions core/commands/diag.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,22 @@ import (
"time"

"github.com/jbenet/go-ipfs/core"
diagn "github.com/jbenet/go-ipfs/diagnostics"
)

func PrintDiagnostics(info []*diagn.DiagInfo, out io.Writer) {
for _, i := range info {
fmt.Fprintf(out, "Peer: %s\n", i.ID)
fmt.Fprintf(out, "\tUp for: %s\n", i.LifeSpan.String())
fmt.Fprintf(out, "\tConnected To:\n")
for _, c := range i.Connections {
fmt.Fprintf(out, "\t%s\n\t\tLatency = %s\n", c.ID, c.Latency.String())
}
fmt.Fprintln(out)
}

}

func Diag(n *core.IpfsNode, args []string, opts map[string]interface{}, out io.Writer) error {
if n.Diagnostics == nil {
return errors.New("Cannot run diagnostic in offline mode!")
Expand All @@ -29,15 +43,7 @@ func Diag(n *core.IpfsNode, args []string, opts map[string]interface{}, out io.W
return err
}
} else {
for _, i := range info {
fmt.Fprintf(out, "Peer: %s\n", i.ID)
fmt.Fprintf(out, "\tUp for: %s\n", i.LifeSpan.String())
fmt.Fprintf(out, "\tConnected To:\n")
for _, c := range i.Connections {
fmt.Fprintf(out, "\t%s\n\t\tLatency = %s\n", c.ID, c.Latency.String())
}
fmt.Fprintln(out)
}
PrintDiagnostics(info, out)
}
return nil
}
3 changes: 3 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
route *dht.IpfsDHT
exchangeSession exchange.Interface
diagnostics *diag.Diagnostics
network inet.Network
)

if online {
Expand Down Expand Up @@ -135,6 +136,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
if err != nil {
return nil, err
}
network = net

diagnostics = diag.NewDiagnostics(local, net, diagService)
diagService.SetHandler(diagnostics)
Expand Down Expand Up @@ -173,6 +175,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
Routing: route,
Namesys: ns,
Diagnostics: diagnostics,
Network: network,
}, nil
}

Expand Down
24 changes: 14 additions & 10 deletions diagnostics/diag.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package diagnostic
package diagnostics

import (
"bytes"
Expand Down Expand Up @@ -48,15 +48,17 @@ type connDiagInfo struct {
ID string
}

type diagInfo struct {
type DiagInfo struct {
ID string
Connections []connDiagInfo
Keys []string
LifeSpan time.Duration
BwIn uint64
BwOut uint64
CodeVersion string
}

func (di *diagInfo) Marshal() []byte {
func (di *DiagInfo) Marshal() []byte {
b, err := json.Marshal(di)
if err != nil {
panic(err)
Expand All @@ -69,12 +71,13 @@ func (d *Diagnostics) getPeers() []*peer.Peer {
return d.network.GetPeerList()
}

func (d *Diagnostics) getDiagInfo() *diagInfo {
di := new(diagInfo)
func (d *Diagnostics) getDiagInfo() *DiagInfo {
di := new(DiagInfo)
di.CodeVersion = "github.com/jbenet/go-ipfs"
di.ID = d.self.ID.Pretty()
di.LifeSpan = time.Since(d.birth)
di.Keys = nil // Currently no way to query datastore
di.BwIn, di.BwOut = d.network.GetBandwidthTotals()

for _, p := range d.getPeers() {
di.Connections = append(di.Connections, connDiagInfo{p.GetLatency(), p.ID.Pretty()})
Expand All @@ -88,7 +91,7 @@ func newID() string {
return string(id)
}

func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*DiagInfo, error) {
log.Debug("Getting diagnostic.")
ctx, _ := context.WithTimeout(context.TODO(), timeout)

Expand All @@ -102,7 +105,7 @@ func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error)
peers := d.getPeers()
log.Debug("Sending diagnostic request to %d peers.", len(peers))

var out []*diagInfo
var out []*DiagInfo
di := d.getDiagInfo()
out = append(out, di)

Expand Down Expand Up @@ -134,15 +137,15 @@ func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error)
return out, nil
}

func AppendDiagnostics(data []byte, cur []*diagInfo) []*diagInfo {
func AppendDiagnostics(data []byte, cur []*DiagInfo) []*DiagInfo {
buf := bytes.NewBuffer(data)
dec := json.NewDecoder(buf)
for {
di := new(diagInfo)
di := new(DiagInfo)
err := dec.Decode(di)
if err != nil {
if err != io.EOF {
log.Error("error decoding diagInfo: %v", err)
log.Error("error decoding DiagInfo: %v", err)
}
break
}
Expand Down Expand Up @@ -216,6 +219,7 @@ func (d *Diagnostics) handleDiagnostic(p *peer.Peer, pmes *Message) (*Message, e
sendcount := 0
for _, p := range d.getPeers() {
log.Debug("Sending diagnostic request to peer: %s", p)
sendcount++
go func(p *peer.Peer) {
out, err := d.getDiagnosticFromPeer(ctx, p, pmes)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions diagnostics/message.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion diagnostics/message.proto
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package diagnostic;
package diagnostics;

message Message {
required string DiagID = 1;
Expand Down
21 changes: 13 additions & 8 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@ func NetMessageSession(parent context.Context, p *peer.Peer,
net inet.Network, srv inet.Service, directory bsnet.Routing,
d ds.Datastore, nice bool) exchange.Interface {

networkAdapter := bsnet.NetMessageAdapter(srv, nil)
networkAdapter := bsnet.NetMessageAdapter(srv, net, nil)
bs := &bitswap{
blockstore: blockstore.NewBlockstore(d),
notifications: notifications.New(),
strategy: strategy.New(nice),
routing: directory,
network: net,
sender: networkAdapter,
wantlist: u.NewKeySet(),
}
Expand All @@ -42,9 +41,6 @@ func NetMessageSession(parent context.Context, p *peer.Peer,
// bitswap instances implement the bitswap protocol.
type bitswap struct {

// network maintains connections to the outside world.
network inet.Network

// sender delivers messages on behalf of the session
sender bsnet.Adapter

Expand Down Expand Up @@ -85,11 +81,20 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
message.AppendWanted(wanted)
}
message.AppendWanted(k)
for iiiii := range peersToQuery {
log.Debug("bitswap got peersToQuery: %s", iiiii)
for peerToQuery := range peersToQuery {
log.Debug("bitswap got peersToQuery: %s", peerToQuery)
go func(p *peer.Peer) {

log.Debug("bitswap dialing peer: %s", p)
err := bs.sender.DialPeer(p)
if err != nil {
log.Error("Error sender.DialPeer(%s)", p)
return
}

response, err := bs.sender.SendRequest(ctx, p, message)
if err != nil {
log.Error("Error sender.SendRequest(%s)", p)
return
}
// FIXME ensure accounting is handled correctly when
Expand All @@ -101,7 +106,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
return
}
bs.ReceiveMessage(ctx, p, response)
}(iiiii)
}(peerToQuery)
}
}()

Expand Down
3 changes: 3 additions & 0 deletions exchange/bitswap/network/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
// Adapter provides network connectivity for BitSwap sessions
type Adapter interface {

// DialPeer ensures there is a connection to peer.
DialPeer(*peer.Peer) error

// SendMessage sends a BitSwap message to a peer.
SendMessage(
context.Context,
Expand Down
8 changes: 7 additions & 1 deletion exchange/bitswap/network/net_message_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
)

// NetMessageAdapter wraps a NetMessage network service
func NetMessageAdapter(s inet.Service, r Receiver) Adapter {
func NetMessageAdapter(s inet.Service, n inet.Network, r Receiver) Adapter {
adapter := impl{
nms: s,
net: n,
receiver: r,
}
s.SetHandler(&adapter)
Expand All @@ -22,6 +23,7 @@ func NetMessageAdapter(s inet.Service, r Receiver) Adapter {
// implements an Adapter that integrates with a NetMessage network service
type impl struct {
nms inet.Service
net inet.Network

// inbound messages from the network are forwarded to the receiver
receiver Receiver
Expand Down Expand Up @@ -58,6 +60,10 @@ func (adapter *impl) HandleMessage(
return outgoing
}

func (adapter *impl) DialPeer(p *peer.Peer) error {
return adapter.net.DialPeer(p)
}

func (adapter *impl) SendMessage(
ctx context.Context,
p *peer.Peer,
Expand Down
16 changes: 16 additions & 0 deletions exchange/bitswap/testnet/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package bitswap
import (
"bytes"
"errors"
"fmt"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
Expand All @@ -14,6 +15,8 @@ import (
type Network interface {
Adapter(*peer.Peer) bsnet.Adapter

HasPeer(*peer.Peer) bool

SendMessage(
ctx context.Context,
from *peer.Peer,
Expand Down Expand Up @@ -49,6 +52,11 @@ func (n *network) Adapter(p *peer.Peer) bsnet.Adapter {
return client
}

func (n *network) HasPeer(p *peer.Peer) bool {
_, found := n.clients[p.Key()]
return found
}

// TODO should this be completely asynchronous?
// TODO what does the network layer do with errors received from services?
func (n *network) SendMessage(
Expand Down Expand Up @@ -155,6 +163,14 @@ func (nc *networkClient) SendRequest(
return nc.network.SendRequest(ctx, nc.local, to, message)
}

func (nc *networkClient) DialPeer(p *peer.Peer) error {
// no need to do anything because dialing isn't a thing in this test net.
if !nc.network.HasPeer(p) {
return fmt.Errorf("Peer not in network: %s", p)
}
return nil
}

func (nc *networkClient) SetDelegate(r bsnet.Receiver) {
nc.Receiver = r
}
1 change: 0 additions & 1 deletion msgproto/msgproto.go

This file was deleted.

Loading