Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic peer SIG discovery #2155

Merged
merged 1 commit into from
Dec 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go/lib/addr/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ func HostSVCFromString(str string) HostSVC {
return SvcCS | m
case "SB":
return SvcSB | m
case "SIG":
return SvcSIG | m
default:
return SvcNone
}
Expand Down Expand Up @@ -260,6 +262,8 @@ func (h HostSVC) String() string {
name = "CS"
case SvcSB:
name = "SB"
case SvcSIG:
name = "SIG"
default:
name = "UNKNOWN"
}
Expand Down
125 changes: 2 additions & 123 deletions go/sig/base/core/as.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,19 @@ import (
"github.com/scionproto/scion/go/sig/egress/router"
"github.com/scionproto/scion/go/sig/egress/session"
"github.com/scionproto/scion/go/sig/egress/worker"
"github.com/scionproto/scion/go/sig/internal/sigconfig"
"github.com/scionproto/scion/go/sig/sigcmn"
"github.com/scionproto/scion/go/sig/siginfo"
)

const (
sigMgrTick = 10 * time.Second
healthMonitorTick = 5 * time.Second
)

// ASEntry contains all of the information required to interact with a remote AS.
type ASEntry struct {
sync.RWMutex
Nets map[string]*net.IPNet
Sigs *siginfo.SigMap
IA addr.IA
IAString string
egressRing *ringbuf.Ring
sigMgrStop chan struct{}
healthMonitorStop chan struct{}
version uint64 // used to track certain changes made to ASEntry
log.Logger
Expand All @@ -65,16 +59,14 @@ func newASEntry(ia addr.IA) (*ASEntry, error) {
IA: ia,
IAString: ia.String(),
Nets: make(map[string]*net.IPNet),
Sigs: &siginfo.SigMap{},
sigMgrStop: make(chan struct{}),
healthMonitorStop: make(chan struct{}),
}
var err error
pool, err := session.NewPathPool(ia)
if err != nil {
return nil, err
}
ae.Session, err = session.NewSession(ia, 0, ae.Sigs, ae.Logger, pool, worker.DefaultFactory)
ae.Session, err = session.NewSession(ia, 0, ae.Logger, pool, worker.DefaultFactory)
if err != nil {
return nil, err
}
Expand All @@ -85,9 +77,7 @@ func (ae *ASEntry) ReloadConfig(cfg *config.ASEntry) bool {
ae.Lock()
defer ae.Unlock()
// Method calls first to prevent skips due to logical short-circuit
s := ae.addNewSIGS(cfg.Sigs)
s = ae.delOldSIGS(cfg.Sigs) && s
s = ae.addNewNets(cfg.Nets) && s
s := ae.addNewNets(cfg.Nets)
return ae.delOldNets(cfg.Nets) && s
}

Expand Down Expand Up @@ -185,111 +175,6 @@ func (ae *ASEntry) delNet(ipnet *net.IPNet) error {
return nil
}

// addNewSIGS adds the SIGs in sigs that are not currently configured.
func (ae *ASEntry) addNewSIGS(sigs config.SIGSet) bool {
s := true
for _, sig := range sigs {
ctrlPort := int(sig.CtrlPort)
if ctrlPort == 0 {
ctrlPort = sigconfig.DefaultCtrlPort
}
encapPort := int(sig.EncapPort)
if encapPort == 0 {
encapPort = sigconfig.DefaultEncapPort
}
err := ae.AddSig(sig.Id, sig.Addr, ctrlPort, encapPort, true)
if err != nil {
ae.Error("Unable to add SIG", "sig", sig, "err", err)
s = false
}
}
return s
}

// delOldSIGS deletes the currently configured SIGs that are not in sigs.
func (ae *ASEntry) delOldSIGS(sigs config.SIGSet) bool {
s := true
ae.Sigs.Range(func(id siginfo.SigIdType, sig *siginfo.Sig) bool {
if !sig.Static {
return true
}
if _, ok := sigs[sig.Id]; !ok {
err := ae.DelSig(sig.Id)
if err != nil {
ae.Error("Unable to delete SIG", "err", err)
s = false
}
}
return true
})
return s
}

// AddSig idempotently adds a SIG for the remote IA.
func (ae *ASEntry) AddSig(id siginfo.SigIdType, ip net.IP, ctrlPort, encapPort int,
static bool) error {
// ae.Sigs is thread safe, no master lock needed
if len(id) == 0 {
return common.NewBasicError("AddSig: SIG id empty", nil, "ia", ae.IA)
}
if ip == nil {
return common.NewBasicError("AddSig: SIG address empty", nil, "ia", ae.IA)
}
if err := sigcmn.ValidatePort("remote ctrl", ctrlPort); err != nil {
return common.NewBasicError("Remote ctrl port validation failed", err,
"ia", ae.IA, "id", id)
}
if err := sigcmn.ValidatePort("remote encap", encapPort); err != nil {
return common.NewBasicError("Remote encap port validation failed", err,
"ia", ae.IA, "id", id)
}
if sig, ok := ae.Sigs.Load(id); ok {
sig.Host = addr.HostFromIP(ip)
sig.CtrlL4Port = ctrlPort
sig.EncapL4Port = encapPort
ae.Info("Updated SIG", "sig", sig)
} else {
sig := siginfo.NewSig(ae.IA, id, addr.HostFromIP(ip), ctrlPort, encapPort, static)
ae.Sigs.Store(id, sig)
ae.Info("Added SIG", "sig", sig)
}
return nil
}

// DelSIG removes an SIG for the remote IA.
func (ae *ASEntry) DelSig(id siginfo.SigIdType) error {
// ae.Sigs is thread safe, no master lock needed
se, ok := ae.Sigs.Load(id)
if !ok {
return common.NewBasicError("DelSig: no SIG found", nil, "ia", ae.IA, "id", id)
}
ae.Sigs.Delete(id)
ae.Info("Removed SIG", "id", id)
return se.Cleanup()
}

// manage the Sig map
func (ae *ASEntry) sigMgr() {
ticker := time.NewTicker(sigMgrTick)
defer ticker.Stop()
ae.Info("sigMgr starting")
Top:
for {
// TODO(kormat): handle adding new SIGs from discovery, and updating existing ones.
select {
case <-ae.sigMgrStop:
break Top
case <-ticker.C:
ae.Sigs.Range(func(id siginfo.SigIdType, sig *siginfo.Sig) bool {
sig.ExpireFails()
return true
})
}
}
close(ae.sigMgrStop)
ae.Info("sigMgr stopping")
}

func (ae *ASEntry) monitorHealth() {
ticker := time.NewTicker(healthMonitorTick)
defer ticker.Stop()
Expand Down Expand Up @@ -340,8 +225,6 @@ func (ae *ASEntry) checkHealth() bool {
func (ae *ASEntry) Cleanup() error {
ae.Lock()
defer ae.Unlock()
// Clean up sigMgr goroutine.
ae.sigMgrStop <- struct{}{}
// Clean up health monitor
ae.healthMonitorStop <- struct{}{}
// Clean up NetMap entries
Expand Down Expand Up @@ -370,10 +253,6 @@ func (ae *ASEntry) setupNet() {
dispatcher.NewDispatcher(ae.IA, ae.egressRing,
&base.SingleSession{Session: ae.Session}).Run()
}()
go func() {
defer log.LogPanicAndExit()
ae.sigMgr()
}()
go func() {
defer log.LogPanicAndExit()
ae.monitorHealth()
Expand Down
34 changes: 0 additions & 34 deletions go/sig/base/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ import (
"sync"

"github.com/scionproto/scion/go/lib/addr"
"github.com/scionproto/scion/go/sig/siginfo"
)

type (
NetworkChangedCb func(NetworkChangedParams)
RemoteHealthChangedCb func(RemoteHealthChangedParams)
SigChangedCb func(SigChangedParams)
)

// NetworkChangedParams contains the parameters that are passed along with a NetworkChanged event.
Expand All @@ -40,25 +38,6 @@ type NetworkChangedParams struct {
Added bool
}

// SigChangedParams contains the parameters that are passed along with a SigChanged event.
type SigChangedParams struct {
// RemoteIA is the IA for which SIG information changed.
RemoteIA addr.IA
// Id is the id of the SIG.
Id siginfo.SigIdType
// Host is the host address of the SIG.
Host addr.HostAddr
// CtrlPort is the port on which the SIG accepts control traffic.
CtrlPort int
// EncapPort is the port on which the SIG accepts encapsulated SIG-SIG frames.
EncapPort int
// Static is true, if this SIG was statically configured and false if it was discovered
// dynamically.
Static bool
// Added is true if the SIG was added, false otherwise.
Added bool
}

// RemoteHealthChangedParams contains the parameters that are passed along with a
// RemoteHealthChanged event.
type RemoteHealthChangedParams struct {
Expand All @@ -76,9 +55,6 @@ type RemoteHealthChangedParams struct {
type EventCallbacks struct {
// NetworkChanged is called when a remote network was added or removed from the configuration.
NetworkChanged NetworkChangedCb
// SigChanged is called when a remote SIG is was added or removed from the configuration.
// TODO(shitz): This event does not get generated yet.
SigChanged SigChangedCb
// RemoteHealthChanged is called when the reachability status of a remote AS changed.
RemoteHealthChanged RemoteHealthChangedCb
}
Expand Down Expand Up @@ -117,13 +93,3 @@ func RemoteHealthChanged(params RemoteHealthChangedParams) {
}
}
}

func SigChanged(params SigChangedParams) {
lock.RLock()
defer lock.RUnlock()
for _, cbs := range listeners {
if cbs.SigChanged != nil {
cbs.SigChanged(params)
}
}
}
3 changes: 2 additions & 1 deletion go/sig/base/pollhdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func PollReqHdlr() {
"src", rpld.Addr, "type", common.TypeOf(rpld.P), "Id", rpld.Id, "pld", rpld.P)
continue
}
//log.Debug("PollReqHdlr: got PollReq", "src", rpld.Addr, "pld", req)
//log.Debug("PollReqHdlr: got PollReq", "src", rpld.Addr, "pld", req,
// "replyAddr", sigcmn.MgmtAddr, "replySession", req.Session)
spld, err := mgmt.NewPld(rpld.Id, mgmt.NewPollRep(sigcmn.MgmtAddr, req.Session))
if err != nil {
log.Error("PollReqHdlr: Error creating SIGCtrl payload", "err", err)
Expand Down
32 changes: 0 additions & 32 deletions go/sig/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ package config

import (
"encoding/json"
"fmt"
"io/ioutil"
"net"

"github.com/scionproto/scion/go/lib/addr"
"github.com/scionproto/scion/go/lib/common"
"github.com/scionproto/scion/go/sig/siginfo"
)

// Cfg is a direct Go representation of the JSON file format.
Expand All @@ -43,32 +41,12 @@ func LoadFromFile(path string) (*Cfg, error) {
if err := json.Unmarshal(b, cfg); err != nil {
return nil, common.NewBasicError("Unable to parse SIG config", err)
}
for ia, asCfg := range cfg.ASes {
if asCfg == nil {
return nil, common.NewBasicError(
fmt.Sprintf("Remote AS config for %s is nil", ia), nil)
}
}
cfg.postprocess()
return cfg, nil
}

// postprocess sets the SIG IDs of the SIG objects in cfg according the keys in
// SIGSet.
func (cfg *Cfg) postprocess() {
// Populate IDs
for _, as := range cfg.ASes {
for id := range as.Sigs {
sig := as.Sigs[id]
sig.Id = id
}
}
}

type ASEntry struct {
Name string
Nets []*IPNet
Sigs SIGSet
}

// IPNet is custom type of net.IPNet, to allow custom unmarshalling.
Expand Down Expand Up @@ -102,13 +80,3 @@ func (in *IPNet) IPNet() *net.IPNet {
func (in *IPNet) String() string {
return (*net.IPNet)(in).String()
}

// SIG represents a SIG in a remote IA.
type SIG struct {
Id siginfo.SigIdType `json:"-"`
Addr net.IP
CtrlPort uint16
EncapPort uint16
}

type SIGSet map[siginfo.SigIdType]*SIG
22 changes: 0 additions & 22 deletions go/sig/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,6 @@ func TestLoadFromFile(t *testing.T) {
Mask: net.CIDRMask(48, 8*net.IPv6len),
},
},
Sigs: SIGSet{
"remote-1": &SIG{
Id: "remote-1",
Addr: net.ParseIP("192.0.2.1"),
CtrlPort: 1234,
EncapPort: 5678,
},
"remote-2": &SIG{
Id: "remote-2",
Addr: net.ParseIP("192.0.2.2"),
CtrlPort: 65535,
EncapPort: 0,
},
},
},
xtest.MustParseIA("1-ff00:0:2"): {
Nets: []*IPNet{
Expand All @@ -75,21 +61,13 @@ func TestLoadFromFile(t *testing.T) {
Mask: net.CIDRMask(24, 8*net.IPv4len),
},
},
Sigs: SIGSet{},
},
xtest.MustParseIA("1-ff00:0:3"): {
Nets: []*IPNet{},
Sigs: SIGSet{},
},
xtest.MustParseIA("1-ff00:0:4"): {
Name: "AS 4",
Nets: []*IPNet{},
Sigs: SIGSet{
"remote-3": &SIG{
Id: "remote-3",
Addr: net.ParseIP("2001:DB8::4"),
},
},
},
},
ConfigVersion: 9001,
Expand Down
Loading