This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Add support for AWSVPC
The AWSVPC feature allows weave-net to run on AWS
by using the AWS VPC routing facilities, without
relying on overlay networks.
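In this mode, weave programs the VPC route table so that the address range owned by each peer is routed to that peer's EC2 instance. A minimal sketch of the route manipulation involved, using the aws-sdk-go dependency this commit vendors (this is an illustration, not the commit's actual ipam/monitor code, which is not shown in this excerpt; the route table ID, instance ID, and CIDR below are placeholders):

package main

import (
    "log"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/ec2"
)

func main() {
    // Region comes from the environment (cf. AWS_DEFAULT_REGION in circle.yml).
    svc := ec2.New(session.New())
    // Route a peer-owned container range to the instance hosting that peer.
    _, err := svc.CreateRoute(&ec2.CreateRouteInput{
        RouteTableId:         aws.String("rtb-11111111"), // placeholder
        DestinationCidrBlock: aws.String("10.32.0.0/18"), // placeholder
        InstanceId:           aws.String("i-22222222"),   // placeholder
    })
    if err != nil {
        log.Fatal(err)
    }
}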
brb committed Mar 23, 2016
1 parent b080224 commit a344dd6
Showing 24 changed files with 1,866 additions and 104 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -63,3 +63,6 @@ test/tls/tls
 test/tls/*.pem
 test/coverage
 test/coverage.*
+
+# Ctags
+tags
3 changes: 3 additions & 0 deletions .gitmodules
@@ -70,3 +70,6 @@
 [submodule "vendor/github.com/coreos/go-iptables"]
 	path = vendor/github.com/coreos/go-iptables
 	url = https://github.com/coreos/go-iptables
+[submodule "vendor/github.com/aws/aws-sdk-go"]
+	path = vendor/github.com/aws/aws-sdk-go
+	url = https://github.com/aws/aws-sdk-go
11 changes: 11 additions & 0 deletions circle.yml
@@ -12,6 +12,7 @@ machine:
     SRCDIR: /home/ubuntu/src/github.com/weaveworks/weave
     PATH: $PATH:$HOME/.local/bin:$HOME/google-cloud-sdk/bin/
     CLOUDSDK_CORE_DISABLE_PROMPTS: 1
+    AWS_DEFAULT_REGION: us-east-1
 
 dependencies:
   cache_directories:
@@ -24,6 +25,8 @@ dependencies:
     - "cd $SRCDIR; git submodule update --init"
     - cd $SRCDIR/test; ./gce.sh make_template:
        parallel: false
+    - cd $SRCDIR/test; ./aws.sh make_template:
+       parallel: false
     - "cd $SRCDIR/build; ../tools/rebuild-image weaveworks/weavebuild . Dockerfile build.sh && touch $SRCDIR/.build.uptodate"
 
 test:
@@ -41,9 +44,17 @@ test:
     - cd $SRCDIR/test; eval $(./gce.sh hosts); export COVERAGE=true; ./run_all.sh:
        parallel: true
        timeout: 300
+    - cd $SRCDIR/test; ./aws.sh setup && eval $(./aws.sh hosts) && ./setup.sh:
+       parallel: false
+    - cd $SRCDIR/test; eval $(./aws.sh hosts); export COVERAGE=true; ./run_all.sh:
+       parallel: false
+       timeout: 300
 
   post:
     - cd $SRCDIR/test; ./gce.sh destroy:
        parallel: true
+    - cd $SRCDIR/test; ./aws.sh destroy:
+       parallel: false
     - test "$CIRCLE_NODE_INDEX" != "0" || (cd $SRCDIR/test; ./gen_coverage_reports.sh):
        parallel: true
7 changes: 7 additions & 0 deletions common/utils.go
@@ -17,6 +17,13 @@ func Assert(test bool) {
 	}
 }
 
+// AssertWithMsg panics with the given msg if test is false
+func AssertWithMsg(test bool, msg string) {
+	if !test {
+		panic(msg)
+	}
+}
+
 func ErrorMessages(errors []error) string {
 	var result []string
 	for _, err := range errors {
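A hypothetical call site (not from this commit), showing the intended use next to the existing bare common.Assert:

common.AssertWithMsg(free == 0, "failed to donate, yet free addresses remain in range")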
82 changes: 60 additions & 22 deletions ipam/allocator.go
@@ -11,6 +11,7 @@ import (
 
 	"github.com/weaveworks/weave/common"
 	"github.com/weaveworks/weave/db"
+	"github.com/weaveworks/weave/ipam/monitor"
 	"github.com/weaveworks/weave/ipam/paxos"
 	"github.com/weaveworks/weave/ipam/ring"
 	"github.com/weaveworks/weave/ipam/space"
@@ -65,33 +66,39 @@ type Allocator struct {
 	shuttingDown bool // to avoid doing any requests while trying to shut down
 	isKnownPeer  func(mesh.PeerName) bool
 	now          func() time.Time
+	isCIDRAligned bool            // should donate only CIDR aligned ranges
+	monitor       monitor.Monitor // monitor tracks changes in address ranges owned by us
 }
 
 type Config struct {
-	OurName     mesh.PeerName
-	OurUID      mesh.PeerUID
-	OurNickname string
-	Seed        []mesh.PeerName
-	Universe    address.Range
-	Quorum      uint
-	Db          db.DB
-	IsKnownPeer func(name mesh.PeerName) bool
+	OurName       mesh.PeerName
+	OurUID        mesh.PeerUID
+	OurNickname   string
+	Seed          []mesh.PeerName
+	Universe      address.Range
+	Quorum        uint
+	Db            db.DB
+	IsKnownPeer   func(name mesh.PeerName) bool
+	IsCIDRAligned bool
+	Monitor       monitor.Monitor
 }
 
 // NewAllocator creates and initialises a new Allocator
 func NewAllocator(config Config) *Allocator {
 	return &Allocator{
-		ourName:     config.OurName,
-		seed:        config.Seed,
-		universe:    config.Universe,
-		ring:        ring.New(config.Universe.Start, config.Universe.End, config.OurName),
-		owned:       make(map[string][]address.CIDR),
-		db:          config.Db,
-		paxos:       paxos.NewParticipant(config.OurName, config.OurUID, config.Quorum),
-		nicknames:   map[mesh.PeerName]string{config.OurName: config.OurNickname},
-		isKnownPeer: config.IsKnownPeer,
-		dead:        make(map[string]time.Time),
-		now:         time.Now,
+		ourName:       config.OurName,
+		seed:          config.Seed,
+		universe:      config.Universe,
+		ring:          ring.New(config.Universe.Start, config.Universe.End, config.OurName),
+		owned:         make(map[string][]address.CIDR),
+		db:            config.Db,
+		paxos:         paxos.NewParticipant(config.OurName, config.OurUID, config.Quorum),
+		nicknames:     map[mesh.PeerName]string{config.OurName: config.OurNickname},
+		isKnownPeer:   config.IsKnownPeer,
+		dead:          make(map[string]time.Time),
+		now:           time.Now,
+		isCIDRAligned: config.IsCIDRAligned,
+		monitor:       config.Monitor,
 	}
 }
 
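The monitor.Monitor interface itself is not part of this excerpt. Inferred from the HandleUpdate call sites in the hunks below (previously owned ranges first, current ranges second, nil meaning "none"), a minimal sketch of what an implementation has to provide:

// Sketch inferred from this diff's call sites, not the actual ipam/monitor
// source. HandleUpdate reconciles external state (e.g. VPC routes in the
// AWSVPC implementation) when the set of ranges owned by this peer changes;
// Shutdown passes nil as the new ranges to withdraw everything.
type Monitor interface {
    HandleUpdate(oldRanges, newRanges []address.Range) error
}

// A do-nothing implementation (hypothetical name) for setups without AWSVPC:
type nopMonitor struct{}

func (nopMonitor) HandleUpdate(oldRanges, newRanges []address.Range) error { return nil }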
@@ -368,6 +375,10 @@ func (alloc *Allocator) Shutdown() {
 		alloc.cancelOps(&alloc.pendingClaims)
 		alloc.cancelOps(&alloc.pendingAllocates)
 		alloc.cancelOps(&alloc.pendingConsenses)
+		err := alloc.monitor.HandleUpdate(alloc.ring.OwnedAndMergedRanges(), nil)
+		if err != nil {
+			alloc.errorf("HandleUpdate failed: %s", err)
+		}
 		if heir := alloc.pickPeerForTransfer(); heir != mesh.UnknownPeerName {
 			alloc.ring.Transfer(alloc.ourName, heir)
 			alloc.space.Clear()
@@ -381,7 +392,7 @@ func (alloc *Allocator) Shutdown() {
 
 // AdminTakeoverRanges (Sync) - take over the ranges owned by a given
 // peer, and return how much space was transferred in the process.
-// Only done on adminstrator command.
+// Only done on administrator command.
 func (alloc *Allocator) AdminTakeoverRanges(peerNameOrNickname string) address.Count {
 	resultChan := make(chan address.Count)
 	alloc.actionChan <- func() {
@@ -399,6 +410,7 @@ func (alloc *Allocator) AdminTakeoverRanges(peerNameOrNickname string) address.C
 			return
 		}
 
+		oldRanges := alloc.ring.OwnedAndMergedRanges()
 		newRanges := alloc.ring.Transfer(peername, alloc.ourName)
 
 		if len(newRanges) == 0 {
@@ -410,6 +422,12 @@
 		alloc.space.AddRanges(newRanges)
 		after := alloc.space.NumFreeAddresses()
 
+		// TODO(mp): Return error over resultChan
+		err = alloc.monitor.HandleUpdate(oldRanges, alloc.ring.OwnedAndMergedRanges())
+		if err != nil {
+			alloc.errorf("HandleUpdate: %s", err)
+		}
+
 		alloc.gossip.GossipBroadcast(alloc.Gossip())
 
 		resultChan <- after - before
@@ -613,7 +631,12 @@ func (alloc *Allocator) establishRing() {
 
 func (alloc *Allocator) createRing(peers []mesh.PeerName) {
 	alloc.debugln("Paxos consensus:", peers)
-	alloc.ring.ClaimForPeers(normalizeConsensus(peers))
+	alloc.ring.ClaimForPeers(normalizeConsensus(peers), alloc.isCIDRAligned)
+	// We assume that the peer has not possessed any address ranges before
+	err := alloc.monitor.HandleUpdate(nil, alloc.ring.OwnedAndMergedRanges())
+	if err != nil {
+		alloc.errorf("HandleUpdate failed: %s", err)
+	}
 	alloc.gossip.GossipBroadcast(alloc.Gossip())
 	alloc.ringUpdated()
 }
@@ -705,14 +728,20 @@ func (alloc *Allocator) update(sender mesh.PeerName, msg []byte) error {
 		alloc.nicknames[peer] = nickname
 	}
 
+	var oldRanges []address.Range
 	switch {
 	// If someone sent us a ring, merge it into ours. Note this will move us
 	// out of the awaiting-consensus state if we didn't have a ring already.
 	case data.Ring != nil:
+		oldRanges = alloc.ring.OwnedAndMergedRanges()
 		switch err = alloc.ring.Merge(*data.Ring); err {
 		case nil:
 			if !alloc.ring.Empty() {
 				alloc.pruneNicknames()
+				err := alloc.monitor.HandleUpdate(oldRanges, alloc.ring.OwnedAndMergedRanges())
+				if err != nil {
+					return err
+				}
 				alloc.ringUpdated()
 			}
 		case ring.ErrDifferentSeeds:
@@ -774,7 +803,8 @@ func (alloc *Allocator) donateSpace(r address.Range, to mesh.PeerName) {
 	defer alloc.sendRingUpdate(to)
 
 	alloc.debugln("Peer", to, "asked me for space")
-	chunk, ok := alloc.space.Donate(r)
+	chunk, ok := alloc.space.Donate(r, alloc.isCIDRAligned,
+		func() []address.CIDR { return alloc.ring.OwnedCIDRRanges(r) })
 	if !ok {
 		free := alloc.space.NumFreeAddressesInRange(r)
 		common.Assert(free == 0)
@@ -785,8 +815,13 @@
 		return
 	}
 	alloc.debugln("Giving range", chunk, "to", to)
+	oldRanges := alloc.ring.OwnedAndMergedRanges()
 	alloc.ring.GrantRangeToHost(chunk.Start, chunk.End, to)
 	alloc.persistRing()
+	err := alloc.monitor.HandleUpdate(oldRanges, alloc.ring.OwnedAndMergedRanges())
+	if err != nil {
+		alloc.errorf("HandleUpdate failed: %s", err)
+	}
 }
 
 func (alloc *Allocator) assertInvariants() {
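Donation becomes CIDR-aligned when isCIDRAligned is set because a VPC route table entry can only describe a whole CIDR block. A minimal sketch of the alignment condition (an assumed helper for illustration, not this commit's space.Donate logic):

// Illustrative only: a block of `size` addresses starting at `start` forms
// a single CIDR block iff size is a power of two and start is a multiple
// of size. E.g. 10.0.1.128/25 qualifies; the 101-address span
// 10.0.1.100-10.0.1.200 does not, so it cannot be donated in this mode.
func isCIDRAlignedRange(start, size uint32) bool {
    return size != 0 && size&(size-1) == 0 && start%size == 0
}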
@@ -971,6 +1006,9 @@ func (alloc *Allocator) fatalf(fmt string, args ...interface{}) {
 func (alloc *Allocator) warnf(fmt string, args ...interface{}) {
 	alloc.logf(common.Log.Warnf, fmt, args...)
 }
+func (alloc *Allocator) errorf(fmt string, args ...interface{}) {
+	common.Log.Errorf("[allocator %s] "+fmt, append([]interface{}{alloc.ourName}, args...)...)
+}
 func (alloc *Allocator) infof(fmt string, args ...interface{}) {
 	alloc.logf(common.Log.Infof, fmt, args...)
 }