Skip to content

Commit

Permalink
Use a struct rather than a map[string]string for unreachable notifica…
Browse files Browse the repository at this point in the history
…tions. Fixes traceroute.
  • Loading branch information
ghjm committed Oct 13, 2020
1 parent ceb1c65 commit bd84c8d
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 14 deletions.
24 changes: 16 additions & 8 deletions pkg/netceptor/netceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ type serviceAdvertisementFull struct {
Cancel bool
}

// UnreachableMessage is the data associated with an unreachable message
// UnreachableMessage is the on-the-wire data associated with an unreachable message
type UnreachableMessage struct {
FromNode string
ToNode string
Expand All @@ -198,6 +198,12 @@ type UnreachableMessage struct {
Problem string
}

// UnreachableNotification includes additional information returned from SubscribeUnreachable
type UnreachableNotification struct {
UnreachableMessage
ReceivedFromNode string
}

var networkNames = make([]string, 0)
var networkNamesLock = sync.Mutex{}

Expand Down Expand Up @@ -265,8 +271,8 @@ func New(ctx context.Context, NodeID string, AllowedPeers []string) *Netceptor {
s.clientTLSConfigs["default"] = &tls.Config{}
s.addNameHash(NodeID)
s.context, s.cancelFunc = context.WithCancel(ctx)
s.unreachableBroker = utils.NewBroker(s.context)
s.routingUpdateBroker = utils.NewBroker(s.context)
s.unreachableBroker = utils.NewBroker(s.context, reflect.TypeOf(UnreachableNotification{}))
s.routingUpdateBroker = utils.NewBroker(s.context, reflect.TypeOf(map[string]string{}))
s.updateRoutingTableChan = tickrunner.Run(s.context, s.updateRoutingTable, time.Hour*24, time.Millisecond*100)
s.sendRouteFloodChan = tickrunner.Run(s.context, s.sendRoutingUpdate, RouteUpdateTime, time.Millisecond*100)
s.sendServiceAdsChan = tickrunner.Run(s.context, s.sendServiceAds, ServiceAdTime, time.Second*5)
Expand Down Expand Up @@ -985,15 +991,17 @@ func (s *Netceptor) handlePing(md *messageData) error {

// Handles an unreachable response
func (s *Netceptor) handleUnreachable(md *messageData) error {
unrData := make(map[string]string)
err := json.Unmarshal(md.Data, &unrData)
unrMsg := UnreachableMessage{}
err := json.Unmarshal(md.Data, &unrMsg)
if err != nil {
return err
}
unrData["ReceivedFromNode"] = md.FromNode
unrData := UnreachableNotification{
UnreachableMessage: unrMsg,
ReceivedFromNode: md.FromNode,
}
logger.Warning("Received unreachable message from %s", md.FromNode)
s.unreachableBroker.Publish(unrData)
return nil
return s.unreachableBroker.Publish(unrData)
}

// Sends an unreachable response
Expand Down
13 changes: 7 additions & 6 deletions pkg/netceptor/packetconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"github.com/project-receptor/receptor/pkg/utils"
"net"
"reflect"
"time"
)

Expand Down Expand Up @@ -70,32 +71,32 @@ func (s *Netceptor) ListenPacketAndAdvertise(service string, tags map[string]str
// startUnreachable starts monitoring the netceptor unreachable channel and forwarding relevant messages
func (pc *PacketConn) startUnreachable() {
pc.context, pc.cancel = context.WithCancel(pc.s.context)
pc.unreachableSubs = utils.NewBroker(pc.context)
pc.unreachableSubs = utils.NewBroker(pc.context, reflect.TypeOf(UnreachableNotification{}))
pc.unreachableMsgChan = pc.s.unreachableBroker.Subscribe()
go func() {
for {
select {
case <-pc.context.Done():
return
case msgIf := <-pc.unreachableMsgChan:
msg, ok := msgIf.(UnreachableMessage)
msg, ok := msgIf.(UnreachableNotification)
if !ok {
continue
}
FromNode := msg.FromNode
FromService := msg.FromService
if FromNode == pc.s.nodeID && FromService == pc.localService {
pc.unreachableSubs.Publish(msg)
_ = pc.unreachableSubs.Publish(msg)
}
}
}
}()
}

// SubscribeUnreachable subscribes for unreachable messages relevant to this PacketConn
func (pc *PacketConn) SubscribeUnreachable() chan UnreachableMessage {
func (pc *PacketConn) SubscribeUnreachable() chan UnreachableNotification {
iChan := pc.unreachableSubs.Subscribe()
uChan := make(chan UnreachableMessage)
uChan := make(chan UnreachableNotification)
go func() {
for {
select {
Expand All @@ -104,7 +105,7 @@ func (pc *PacketConn) SubscribeUnreachable() chan UnreachableMessage {
close(uChan)
return
}
msg, ok := msgIf.(UnreachableMessage)
msg, ok := msgIf.(UnreachableNotification)
if !ok {
continue
}
Expand Down

0 comments on commit bd84c8d

Please sign in to comment.