Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BS: Fast recovery #2672

Merged
merged 10 commits into from
May 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions doc/BeaconService.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ Core BSes
*(uses: Interfaces.All)*

Core BSes originate new beacons to child and neighboring core ASes.
The periodic task has an interval that is less than the OriginateTime.
Beacons are originated on all interfaces that do not have a origination in the last OriginateTime.

1. Create a new PCB using the local IA and the current timestamp
1. Propagate the PCB on all core and child interfaces.
Expand Down
1 change: 1 addition & 0 deletions go/beacon_srv/internal/beaconing/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"originator.go",
"propagator.go",
"registrar.go",
"tick.go",
"util.go",
],
importpath = "github.com/scionproto/scion/go/beacon_srv/internal/beaconing",
Expand Down
12 changes: 9 additions & 3 deletions go/beacon_srv/internal/beaconing/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,24 @@
//
// The originator should only be instantiated by core beacon servers. It
// periodically creates fresh beacons and propagates them on all core and child
// links.
// links. In case the task is run before a full period has passed, beacons are
// originated on all interfaces that have last been originated on more than one
// period ago.
//
// Registrar
//
// The registrar is a periodic task to register segments with the appropriate
// path server. Core and Up segments are registered with the local path server.
// Down segments are registered with the originating core AS.
// Down segments are registered with the originating core AS. In case the task
// is run before a full period has passed, segments are only registered, if
// there has not been a successful registration in the last period.
//
// Propagator
//
// The propagator is a periodic task to propagate beacons to the appropriate
// neighboring ASes. In a core AS, the beacons are propagated to the neighbors
// on all core link, unless they will create an AS loop. In a non-core AS, the
// beacons are propagated to the neighbors on all child links.
// beacons are propagated to the neighbors on all child links. In case the task
// is run before a full period has passed, beacons are propagated on all
// interfaces that have last been propagated on more than one period ago.
package beaconing
33 changes: 29 additions & 4 deletions go/beacon_srv/internal/beaconing/originator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,16 @@ var _ periodic.Task = (*Originator)(nil)
type OriginatorConf struct {
Config ExtenderConf
Sender *onehop.Sender
Period time.Duration
}

// Originator originates beacons. It should only be used by core ASes.
type Originator struct {
*segExtender
sender *onehop.Sender

// tick is mutable.
tick tick
}

// New creates a new originator.
Expand All @@ -54,26 +58,28 @@ func (cfg OriginatorConf) New() (*Originator, error) {
o := &Originator{
sender: cfg.Sender,
segExtender: extender,
tick: tick{period: cfg.Period},
}
return o, nil
}

// Run originates core and downstream beacons.
func (o *Originator) Run(_ context.Context) {
o.tick.now = time.Now()
o.originateBeacons(proto.LinkType_core)
o.originateBeacons(proto.LinkType_child)
o.tick.updateLast()
}

// originateBeacons creates and sends a beacon for each active interface of
// the specified link type.
func (o *Originator) originateBeacons(linkType proto.LinkType) {

active, nonActive := sortedIntfs(o.cfg.Intfs, linkType)
if len(nonActive) > 0 {
if len(nonActive) > 0 && o.tick.passed() {
log.Debug("[Originator] Ignore non-active interfaces", "intfs", nonActive)
}
infoF := o.createInfoF(time.Now())
for _, ifid := range active {
infoF := o.createInfoF(o.tick.now)
for _, ifid := range o.needBeacon(active) {
if err := o.originateBeacon(ifid, infoF); err != nil {
log.Error("[Originator] Unable to originate on interface", "ifid", ifid, "err", err)
}
Expand All @@ -90,6 +96,24 @@ func (o *Originator) createInfoF(now time.Time) spath.InfoField {
return infoF
}

// needBeacon returns a list of interfaces that need a beacon.
func (o *Originator) needBeacon(active []common.IFIDType) []common.IFIDType {
if o.tick.passed() {
return active
}
stale := make([]common.IFIDType, 0, len(active))
for _, ifid := range active {
intf := o.cfg.Intfs.Get(ifid)
if intf == nil {
continue
}
if o.tick.now.Sub(intf.LastOriginate()) > o.tick.period {
stale = append(stale, ifid)
}
}
return stale
}

// originateBeacon originates a beacon on the given ifid.
func (o *Originator) originateBeacon(ifid common.IFIDType, infoF spath.InfoField) error {
intf := o.cfg.Intfs.Get(ifid)
Expand All @@ -105,6 +129,7 @@ func (o *Originator) originateBeacon(ifid common.IFIDType, infoF spath.InfoField
if err := o.sender.Send(msg, ov); err != nil {
return common.NewBasicError("Unable to send packet", err)
}
intf.Originate(o.tick.now)
return nil
}

Expand Down
48 changes: 37 additions & 11 deletions go/beacon_srv/internal/beaconing/propagator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package beaconing
import (
"context"
"sync"
"time"

"github.com/scionproto/scion/go/beacon_srv/internal/beacon"
"github.com/scionproto/scion/go/beacon_srv/internal/onehop"
Expand All @@ -39,6 +40,7 @@ type PropagatorConf struct {
Config ExtenderConf
BeaconProvider BeaconProvider
Sender *onehop.Sender
Period time.Duration
Core bool
AllowIsdLoop bool
}
Expand All @@ -53,6 +55,9 @@ type Propagator struct {
provider BeaconProvider
allowIsdLoop bool
core bool

// tick is mutable.
tick tick
}

// New creates a new beacon propagation task.
Expand All @@ -68,6 +73,7 @@ func (cfg PropagatorConf) New() (*Propagator, error) {
core: cfg.Core,
allowIsdLoop: cfg.AllowIsdLoop,
segExtender: extender,
tick: tick{period: cfg.Period},
}
return p, nil
}
Expand All @@ -77,47 +83,62 @@ func (cfg PropagatorConf) New() (*Propagator, error) {
// interfaces. In a non-core beacon server, child interfaces are the target
// interfaces.
func (p *Propagator) Run(ctx context.Context) {
p.tick.now = time.Now()
if err := p.run(ctx); err != nil {
log.Error("[Propagator] Unable to propagate beacons", "err", err)
}
p.tick.updateLast()
}

func (p *Propagator) run(ctx context.Context) error {
beacons, err := p.provider.BeaconsToPropagate(ctx)
if err != nil {
return err
intfs := p.needsBeacons()
if len(intfs) == 0 {
return nil
}
activeIntfs := p.activeIntfs()
peers, nonActivePeers := sortedIntfs(p.cfg.Intfs, proto.LinkType_peer)
if len(nonActivePeers) > 0 {
if len(nonActivePeers) > 0 && p.tick.passed() {
log.Debug("[Propagator] Ignore inactive peer links", "ifids", nonActivePeers)
}
beacons, err := p.provider.BeaconsToPropagate(ctx)
if err != nil {
return err
}
wg := &sync.WaitGroup{}
for bOrErr := range beacons {
if bOrErr.Err != nil {
log.Error("[Propagator] Unable to get beacon", "err", err)
continue
}
p.startPropagate(bOrErr.Beacon, activeIntfs, peers, wg)
p.startPropagate(bOrErr.Beacon, intfs, peers, wg)
}
wg.Wait()
return nil
}

// activeIntfs returns a list of active interface ids that beacons should be
// needsBeacons returns a list of active interface ids that beacons should be
// propagated to. In a core AS, these are all active core links. In a non-core
// AS, these are all active child links.
func (p *Propagator) activeIntfs() []common.IFIDType {
func (p *Propagator) needsBeacons() []common.IFIDType {
var activeIntfs, nonActiveIntfs []common.IFIDType
if p.core {
activeIntfs, nonActiveIntfs = sortedIntfs(p.cfg.Intfs, proto.LinkType_core)
} else {
activeIntfs, nonActiveIntfs = sortedIntfs(p.cfg.Intfs, proto.LinkType_child)
}
if len(nonActiveIntfs) > 0 {
if len(nonActiveIntfs) > 0 && p.tick.passed() {
log.Debug("[Propagator] Ignore inactive links", "ifids", nonActiveIntfs)
}
return activeIntfs
stale := make([]common.IFIDType, 0, len(activeIntfs))
for _, ifid := range activeIntfs {
intf := p.cfg.Intfs.Get(ifid)
if intf == nil {
continue
}
if p.tick.now.Sub(intf.LastPropagate()) > p.tick.period {
stale = append(stale, ifid)
}
}
return stale
}

// startPropagate adds to the wait group and starts propagation of the beacon on
Expand Down Expand Up @@ -179,7 +200,11 @@ func (p *Propagator) extendAndSend(bseg beacon.Beacon, egIfid common.IFIDType,
log.Error("[Propagator] Unable to extend beacon", "beacon", bseg, "err", err)
return
}
topoInfo := p.cfg.Intfs.Get(egIfid).TopoInfo()
intf := p.cfg.Intfs.Get(egIfid)
if intf == nil {
log.Error("[Propagator] Interface removed", "egIfid", egIfid)
}
topoInfo := intf.TopoInfo()
msg, err := packBeaconMsg(&seg.Beacon{Segment: bseg.Segment}, topoInfo.ISD_AS,
egIfid, p.cfg.Signer)
if err != nil {
Expand All @@ -191,6 +216,7 @@ func (p *Propagator) extendAndSend(bseg beacon.Beacon, egIfid common.IFIDType,
log.Error("[Propagator] Unable to send packet", "ifid", "err", err)
return
}
intf.Propagate(p.tick.now)
success.Inc()
}()
}
Expand Down
2 changes: 1 addition & 1 deletion go/beacon_srv/internal/beaconing/propagator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func TestPropagatorRun(t *testing.T) {
cfg.Config.Intfs.Get(ifid).Activate(remote)
}
g := graph.NewDefaultGraph(mctrl)
provider.EXPECT().BeaconsToPropagate(gomock.Any()).DoAndReturn(
provider.EXPECT().BeaconsToPropagate(gomock.Any()).MaxTimes(1).DoAndReturn(
func(_ interface{}) (<-chan beacon.BeaconOrErr, error) {
res := make(chan beacon.BeaconOrErr, len(beacons[test.core]))
for _, desc := range beacons[test.core] {
Expand Down
13 changes: 13 additions & 0 deletions go/beacon_srv/internal/beaconing/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"net"
"sync"
"time"

"github.com/scionproto/scion/go/beacon_srv/internal/beacon"
"github.com/scionproto/scion/go/lib/addr"
Expand Down Expand Up @@ -48,6 +49,7 @@ type RegistrarConf struct {
SegProvider SegmentProvider
TopoProvider topology.Provider
Msgr infra.Messenger
Period time.Duration
SegType proto.PathSegType
}

Expand All @@ -60,6 +62,10 @@ type Registrar struct {
segProvider SegmentProvider
topoProvider topology.Provider
segType proto.PathSegType

// mutable fields
lastSucc time.Time
tick tick
}

// New creates a new segment regsitration task.
Expand All @@ -74,19 +80,25 @@ func (cfg RegistrarConf) New() (*Registrar, error) {
topoProvider: cfg.TopoProvider,
segType: cfg.SegType,
msgr: cfg.Msgr,
tick: tick{period: cfg.Period},
segExtender: extender,
}
return r, nil
}

// Run registers path segments for the specified type to path servers.
func (r *Registrar) Run(ctx context.Context) {
r.tick.now = time.Now()
if err := r.run(ctx); err != nil {
log.Error("[Registrar] Unable to register", "type", r.segType, "err", err)
}
r.tick.updateLast()
}

func (r *Registrar) run(ctx context.Context) error {
if r.tick.now.Sub(r.lastSucc) < r.tick.period {
return nil
}
segments, err := r.segProvider.SegmentsToRegister(ctx, r.segType)
if err != nil {
return err
Expand Down Expand Up @@ -115,6 +127,7 @@ func (r *Registrar) run(ctx context.Context) error {
}
log.Info("[Registrar] Successfully registered segments", "success", success.c,
"candidates", total, "segCreationErrs", segErr.c, "sendErrs", sendErr.c)
r.lastSucc = r.tick.now
return nil
}

Expand Down
39 changes: 39 additions & 0 deletions go/beacon_srv/internal/beaconing/tick.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2019 Anapaya Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package beaconing

import (
"time"
)

// tick keeps track whether the period has passed compared to the last time.
type tick struct {
now time.Time
last time.Time
period time.Duration
}

// updateLast updates the last time to the current time, if the period has
// passed since last.
func (t *tick) updateLast() {
if t.passed() {
t.last = t.now
}
}

// passed returns whether the last timestamp is further away from now than the period
func (t *tick) passed() bool {
return t.now.Sub(t.last) >= t.period
}
2 changes: 2 additions & 0 deletions go/beacon_srv/internal/ifstate/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
"handler.go",
"ifstate.go",
"metrics.go",
"pusher.go",
"revoker.go",
],
importpath = "github.com/scionproto/scion/go/beacon_srv/internal/ifstate",
Expand All @@ -31,6 +32,7 @@ go_test(
srcs = [
"handler_test.go",
"ifstate_test.go",
"pusher_test.go",
"revoker_test.go",
],
data = glob(["testdata/**"]),
Expand Down
Loading