diff --git a/.gitmodules b/.gitmodules
index 8636191c32..b13e74dc0f 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -73,3 +73,6 @@
 [submodule "vendor/github.com/j-keck/arping"]
 	path = vendor/github.com/j-keck/arping
 	url = https://github.com/j-keck/arping
+[submodule "vendor/github.com/aws/aws-sdk-go"]
+	path = vendor/github.com/aws/aws-sdk-go
+	url = https://github.com/aws/aws-sdk-go
diff --git a/TODO.md b/TODO.md
new file mode 100644
index 0000000000..c61dc80a6b
--- /dev/null
+++ b/TODO.md
@@ -0,0 +1,17 @@
+# AWSVPC
+
+# Urgent
+
+* Add the subnet checks / keep checksum offloading for the plugin
+* Test with the plugin
+
+* Test with multiple addresses from the default subnet
+
+* Create new SSH keys or reuse the same for each test run
+
+# Nice To Have
+
+* What happens if $RCIDR is not found
+* Do fuzz testing
+* Read up on VPC routing tables again.
+* Create a separate VPC for each test run
diff --git a/api/ipam.go b/api/ipam.go
index 3f15af7f2f..d346390686 100644
--- a/api/ipam.go
+++ b/api/ipam.go
@@ -53,6 +53,14 @@ func (client *Client) DefaultSubnet() (*net.IPNet, error) {
 	return ipnet, err
 }
 
+func (client *Client) LocalRangeTracker() (string, error) {
+	t, err := client.httpVerb("GET", fmt.Sprintf("/ipinfo/tracker"), nil)
+	if err != nil {
+		return "", err
+	}
+	return t, nil
+}
+
 func parseIP(body string) (*net.IPNet, error) {
 	ip, ipnet, err := net.ParseCIDR(string(body))
 	if err != nil {
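The new client call reads the `/ipinfo/tracker` endpoint that this diff adds to the IPAM HTTP handler (see ipam/http.go below). As a minimal sketch of how a caller might branch on the reported tracker — mirroring what `prog/weaveutil/attach.go` does later in this diff; the `api.NewClient` arguments are taken from that file:

package main

import (
	"fmt"
	"os"

	"github.com/weaveworks/weave/api"
	"github.com/weaveworks/weave/common"
)

func main() {
	client := api.NewClient(os.Getenv("WEAVE_HTTP_ADDR"), common.Log)
	if name, err := client.LocalRangeTracker(); err != nil {
		// The weave router may not be running; callers should degrade gracefully.
		fmt.Fprintf(os.Stderr, "unable to determine tracker: %s\n", err)
	} else if name == "awsvpc" {
		fmt.Println("running with the AWSVPC tracker")
	}
}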
Dockerfile build.sh && touch $SRCDIR/.build.uptodate" @@ -36,14 +39,22 @@ test: parallel: true - cd $SRCDIR; make testrunner: parallel: true - - cd $SRCDIR/test; ./gce.sh setup && eval $(./gce.sh hosts) && ./setup.sh: - parallel: true - - cd $SRCDIR/test; eval $(./gce.sh hosts); export COVERAGE=true; ./run_all.sh: - parallel: true + #- cd $SRCDIR/test; ./gce.sh setup && eval $(./gce.sh hosts) && ./setup.sh: + # parallel: true + #- cd $SRCDIR/test; eval $(./gce.sh hosts); export COVERAGE=true; ./run_all.sh: + # parallel: true + # timeout: 300 + - cd $SRCDIR/test; ./aws.sh setup && eval $(./aws.sh hosts) && ./setup.sh: + parallel: false + - cd $SRCDIR/test; eval $(./aws.sh hosts); export COVERAGE=true; AWS=1 ./run_all.sh: + parallel: false timeout: 300 + post: - - cd $SRCDIR/test; ./gce.sh destroy: - parallel: true + #- cd $SRCDIR/test; ./gce.sh destroy: + # parallel: true + - cd $SRCDIR/test; ./aws.sh destroy: + parallel: false - test "$CIRCLE_NODE_INDEX" != "0" || (cd $SRCDIR/test; ./gen_coverage_reports.sh): parallel: true diff --git a/ipam/allocator.go b/ipam/allocator.go index 50950ffe2a..c33fe920e6 100644 --- a/ipam/allocator.go +++ b/ipam/allocator.go @@ -14,6 +14,7 @@ import ( "github.com/weaveworks/weave/ipam/paxos" "github.com/weaveworks/weave/ipam/ring" "github.com/weaveworks/weave/ipam/space" + "github.com/weaveworks/weave/ipam/tracker" "github.com/weaveworks/weave/net/address" ) @@ -85,11 +86,14 @@ type Config struct { Quorum func() uint Db db.DB IsKnownPeer func(name mesh.PeerName) bool + Tracker tracker.LocalRangeTracker } // NewAllocator creates and initialises a new Allocator func NewAllocator(config Config) *Allocator { var participant paxos.Participant + var alloc *Allocator + var onUpdate ring.OnUpdate if config.IsObserver { participant = paxos.NewObserver() @@ -97,11 +101,19 @@ func NewAllocator(config Config) *Allocator { participant = paxos.NewNode(config.OurName, config.OurUID, 0) } - return &Allocator{ + if config.Tracker != nil { + onUpdate = func(prev []address.Range, curr []address.Range, local bool) { + if err := config.Tracker.HandleUpdate(prev, curr, local); err != nil { + alloc.errorf("HandleUpdate failed: %s", err) + } + } + } + + alloc = &Allocator{ ourName: config.OurName, seed: config.Seed, universe: config.Universe, - ring: ring.New(config.Universe.Range().Start, config.Universe.Range().End, config.OurName), + ring: ring.New(config.Universe.Range().Start, config.Universe.Range().End, config.OurName, onUpdate), owned: make(map[string]ownedData), db: config.Db, paxos: participant, @@ -111,6 +123,7 @@ func NewAllocator(config Config) *Allocator { dead: make(map[string]time.Time), now: time.Now, } + return alloc } func ParseCIDRSubnet(cidrStr string) (cidr address.CIDR, err error) { @@ -958,7 +971,7 @@ func (alloc *Allocator) loadPersistedData() bool { return false } - alloc.ring = persistedRing + alloc.ring.Restore(persistedRing) alloc.space.UpdateRanges(alloc.ring.OwnedRanges()) if ownedFound { @@ -1068,6 +1081,9 @@ func (alloc *Allocator) fatalf(fmt string, args ...interface{}) { func (alloc *Allocator) warnf(fmt string, args ...interface{}) { alloc.logf(common.Log.Warnf, fmt, args...) } +func (alloc *Allocator) errorf(fmt string, args ...interface{}) { + common.Log.Errorf("[allocator %s] "+fmt, append([]interface{}{alloc.ourName}, args...)...) +} func (alloc *Allocator) infof(fmt string, args ...interface{}) { alloc.logf(common.Log.Infof, fmt, args...) 
diff --git a/ipam/allocator_test.go b/ipam/allocator_test.go
index ce7c3759dc..51db7521c9 100644
--- a/ipam/allocator_test.go
+++ b/ipam/allocator_test.go
@@ -15,7 +15,7 @@ import (
 )
 
 const (
-	testStart1 = "10.0.1.0"
+	testStart1 = "10.0.0.0"
 	testStart2 = "10.0.2.0"
 	testStart3 = "10.0.3.0"
 )
@@ -644,3 +644,156 @@ func cidrRanges(s string) []address.Range {
 	c, _ := address.ParseCIDR(s)
 	return []address.Range{c.Range()}
 }
+
+func TestTracker(t *testing.T) {
+	const (
+		cidr       = "10.0.0.0/30"
+		container1 = "container-1"
+		container2 = "container-2"
+	)
+
+	trackChan := make(chan rangePair, 10)
+	allocs, router, _ := makeNetworkOfAllocatorsWithTracker(2, cidr, newTestTracker(trackChan))
+	defer stopNetworkOfAllocators(allocs, router)
+
+	cidr1, _ := address.ParseCIDR(cidr)
+	addr1, err := allocs[0].Allocate(container1, cidr1, true, returnFalse)
+	require.Equal(t, nil, err, "")
+	require.Equal(t, ip("10.0.0.1"), addr1, "")
+
+	// Check the HandleUpdate invocations; two of them should carry only the new ranges.
+	newPeer1 := false
+	newPeer2 := false
+	for i := 0; i < 3; i++ {
+		pair := <-trackChan
+		switch {
+		case !newPeer1 && pair.new[0].Equal(addrRange("10.0.0.0", "10.0.0.1")):
+			newPeer1 = true
+		case !newPeer2 && pair.new[0].Equal(addrRange("10.0.0.2", "10.0.0.3")):
+			newPeer2 = true
+		default:
+			continue
+		}
+	}
+	require.True(t, newPeer1 && newPeer2, "")
+
+	// The following allocation should trigger a request for donation, because
+	// peer1 ran out of space.
+	addr2, err := allocs[0].Allocate(container2, cidr1, true, returnFalse)
+	require.Equal(t, nil, err, "")
+	require.Equal(t, ip("10.0.0.2"), addr2, "")
+
+	// Check whether HandleUpdate is invoked after the donation.
+	newDonation1 := false
+	newDonation2 := false
+	for i := 0; i < 2; i++ {
+		pair := <-trackChan
+		switch {
+		case !newDonation1 &&
+			pair.old[0].Equal(addrRange("10.0.0.0", "10.0.0.1")) &&
+			pair.new[0].Equal(addrRange("10.0.0.0", "10.0.0.1")) &&
+			pair.new[1].Equal(addrRange("10.0.0.3", "10.0.0.3")):
+			// peer1
+			require.Equal(t, 1, len(pair.old), "")
+			require.Equal(t, 2, len(pair.new), "")
+			newDonation1 = true
+		case !newDonation2 &&
+			pair.old[0].Equal(addrRange("10.0.0.2", "10.0.0.3")) &&
+			pair.new[0].Equal(addrRange("10.0.0.2", "10.0.0.2")):
+			// peer2
+			require.Equal(t, 1, len(pair.old), "")
+			require.Equal(t, 1, len(pair.new), "")
+			newDonation2 = true
+		default:
+			continue
+		}
+	}
+	require.True(t, newDonation1 && newDonation2, "")
+}
+
+func TestShutdownWithTracker(t *testing.T) {
+	const (
+		cidr       = "10.0.0.0/30"
+		container1 = "container-1"
+	)
+
+	trackChan := make(chan rangePair, 10)
+	allocs, router, _ := makeNetworkOfAllocatorsWithTracker(2, cidr, newTestTracker(trackChan))
+	defer stopNetworkOfAllocators(allocs, router)
+
+	cidr1, _ := address.ParseCIDR(cidr)
+	_, err := allocs[0].Allocate(container1, cidr1, true, returnFalse)
+	require.NoError(t, err, "")
+
+	time.Sleep(500 * time.Millisecond)
+	flush(trackChan, 2)
+
+	// After allocation (and ring establishment):
+	// * peer1: [10.0.0.0-10.0.0.1]
+	// * peer2: [10.0.0.2-10.0.0.3]
+
+	// Shutdown peer1
+	allocs[0].Shutdown()
+
+	done := false
+	for i := 0; i < 5; i++ {
+		p := <-trackChan
+		switch {
+		// This should uniquely match the HandleUpdate invocation on peer2 which
+		// happens after peer1 notified peer2 about the donation due to its
+		// termination:
+		case !done && len(p.old) == 1 && len(p.new) == 2:
+			require.Equal(t, addrRange("10.0.0.2", "10.0.0.3"), p.old[0], "")
+			require.Equal(t, addrRange("10.0.0.0", "10.0.0.1"), p.new[0], "")
+			require.Equal(t, addrRange("10.0.0.2", "10.0.0.3"), p.new[1], "")
+			// peer2 has received peer1's previously owned ranges
+			done = true
+		default:
+			continue
+		}
+	}
+	require.True(t, done)
+
+	// Shutdown peer2
+	allocs[1].Shutdown()
+	p := <-trackChan
+	require.Equal(t,
+		[]address.Range{addrRange("10.0.0.0", "10.0.0.1"), addrRange("10.0.0.2", "10.0.0.3")},
+		p.old)
+	require.Len(t, p.new, 0)
+}
+
+func flush(c chan rangePair, count int) {
+	for i := 0; i < count; i++ {
+		<-c
+	}
+}
+
+type testTracker struct {
+	trackChan chan rangePair
+}
+
+type rangePair struct {
+	old, new []address.Range
+}
+
+func newTestTracker(trackChan chan rangePair) *testTracker {
+	return &testTracker{trackChan}
+}
+
+func (t *testTracker) HandleUpdate(old, new []address.Range, local bool) error {
+	t.trackChan <- rangePair{old, new}
+	return nil
+}
+
+// addrRange creates the [start;end] address.Range.
+func addrRange(start, end string) address.Range {
+	return address.NewRange(ip(start), address.Subtract(ip(end)+1, ip(start)))
+}
+
+func ip(addr string) address.Address {
+	retAddr, _ := address.ParseIP(addr)
+	return retAddr
+}
require.Equal(t, addrRange("10.0.0.2", "10.0.0.3"), p.new[1], "") + // peer2 has received peer1's previously owned ranges + done = true + default: + continue + } + } + require.True(t, done) + + // Shutdown peer2 + allocs[1].Shutdown() + p := <-trackChan + require.Equal(t, + []address.Range{addrRange("10.0.0.0", "10.0.0.1"), addrRange("10.0.0.2", "10.0.0.3")}, + p.old) + require.Len(t, p.new, 0) +} + +func flush(c chan rangePair, count int) { + for i := 0; i < count; i++ { + <-c + } +} + +type testTracker struct { + trackChan chan rangePair +} + +type rangePair struct { + old, new []address.Range +} + +func newTestTracker(trackChan chan rangePair) *testTracker { + return &testTracker{trackChan} +} + +func (t *testTracker) HandleUpdate(old, new []address.Range, local bool) error { + t.trackChan <- rangePair{old, new} + return nil +} + +// Creates [start;end] address.Range. +func addrRange(start, end string) address.Range { + return address.NewRange(ip(start), address.Subtract(ip(end)+1, ip(start))) +} + +func ip(addr string) address.Address { + retAddr, _ := address.ParseIP(addr) + return retAddr +} diff --git a/ipam/http.go b/ipam/http.go index 0d68052a1f..04eb2088aa 100644 --- a/ipam/http.go +++ b/ipam/http.go @@ -87,7 +87,7 @@ func (alloc *Allocator) handleHTTPClaim(dockerCli *docker.Client, w http.Respons } // HandleHTTP wires up ipams HTTP endpoints to the provided mux. -func (alloc *Allocator) HandleHTTP(router *mux.Router, defaultSubnet address.CIDR, dockerCli *docker.Client) { +func (alloc *Allocator) HandleHTTP(router *mux.Router, defaultSubnet address.CIDR, tracker string, dockerCli *docker.Client) { router.Methods("GET").Path("/ipinfo/defaultsubnet").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "%s", defaultSubnet) }) @@ -174,4 +174,8 @@ func (alloc *Allocator) HandleHTTP(router *mux.Router, defaultSubnet address.CID transferred := alloc.AdminTakeoverRanges(ident) fmt.Fprintf(w, "%d IPs taken over from %s\n", transferred, ident) }) + + router.Methods("GET").Path("/ipinfo/tracker").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, tracker) + }) } diff --git a/ipam/http_test.go b/ipam/http_test.go index 176da22219..c371949b40 100644 --- a/ipam/http_test.go +++ b/ipam/http_test.go @@ -43,7 +43,7 @@ func listenHTTP(alloc *Allocator, subnet address.CIDR) int { router.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) { io.WriteString(w, fmt.Sprintln(alloc)) }) - alloc.HandleHTTP(router, subnet, nil) + alloc.HandleHTTP(router, subnet, "", nil) httpListener, err := net.Listen("tcp", ":0") if err != nil { diff --git a/ipam/ring/ring.go b/ipam/ring/ring.go index a9cf60dd80..01010f2494 100644 --- a/ipam/ring/ring.go +++ b/ipam/ring/ring.go @@ -18,12 +18,15 @@ import ( "github.com/weaveworks/weave/net/address" ) +type OnUpdate func(prev, curr []address.Range, local bool) + // Ring represents the ring itself type Ring struct { Start, End address.Address // [min, max) tokens in this ring. 
diff --git a/ipam/ring/ring.go b/ipam/ring/ring.go
index a9cf60dd80..01010f2494 100644
--- a/ipam/ring/ring.go
+++ b/ipam/ring/ring.go
@@ -18,12 +18,15 @@ import (
 	"github.com/weaveworks/weave/net/address"
 )
 
+type OnUpdate func(prev, curr []address.Range, local bool)
+
 // Ring represents the ring itself
 type Ring struct {
 	Start, End address.Address // [min, max) tokens in this ring.  Due to wrapping, min == max (effectively)
 	Peer       mesh.PeerName   // name of peer owning this ring instance
 	Entries    entries         // list of entries sorted by token
 	Seeds      []mesh.PeerName // peers with which the ring was seeded
+	onUpdate   OnUpdate
 }
 
 func (r *Ring) assertInvariants() {
@@ -94,15 +97,33 @@ func (r *Ring) checkEntries(entries entries) error {
 	return nil
 }
 
+func (r *Ring) trackUpdates() func() {
+	return r.trackUpdatesOfPeer(r.Peer)
+}
+
+func (r *Ring) trackUpdatesOfPeer(peer mesh.PeerName) func() {
+	if r.onUpdate == nil {
+		return func() {}
+	}
+	ranges := r.OwnedRangesOfPeer(peer)
+	return func() { r.onUpdate(ranges, r.OwnedRangesOfPeer(peer), peer == r.Peer) }
+}
+
 // New creates an empty ring belonging to peer.
-func New(start, end address.Address, peer mesh.PeerName) *Ring {
+func New(start, end address.Address, peer mesh.PeerName, f OnUpdate) *Ring {
 	common.Assert(start < end)
-	ring := &Ring{Start: start, End: end, Peer: peer, Entries: make([]*entry, 0)}
+	ring := &Ring{Start: start, End: end, Peer: peer, Entries: make([]*entry, 0), onUpdate: f}
 	ring.updateExportedVariables()
 	return ring
 }
 
+// Restore replaces the ring with the given one while keeping the registered
+// onUpdate callback.
+func (r *Ring) Restore(other *Ring) {
+	onUpdate := r.onUpdate
+	*r = *other
+	r.onUpdate = onUpdate
+}
+
 func (r *Ring) Range() address.Range {
 	return address.Range{Start: r.Start, End: r.End}
 }
@@ -126,6 +147,7 @@ func (r *Ring) GrantRangeToHost(start, end address.Address, peer mesh.PeerName)
 	//fmt.Printf("%s GrantRangeToHost [%v,%v) -> %s\n", r.Peer, start, end, peer)
 
 	r.assertInvariants()
+	defer r.trackUpdates()()
 	defer r.assertInvariants()
 	defer r.updateExportedVariables()
 
@@ -193,6 +215,7 @@ func (r *Ring) GrantRangeToHost(start, end address.Address, peer mesh.PeerName)
 // got updated as a result.
 func (r *Ring) Merge(gossip Ring) (bool, error) {
 	r.assertInvariants()
+	defer r.trackUpdates()()
 	defer r.updateExportedVariables()
 
 	// Don't panic when checking the gossiped in ring.
@@ -230,6 +253,7 @@ func (r *Ring) Merge(gossip Ring) (bool, error) {
 		r.Seeds = gossip.Seeds
 	}
 	r.Entries = result
+
 	return updated, nil
 }
 
@@ -336,10 +360,14 @@ func (r *Ring) splitRangesOverZero(ranges []address.Range) []address.Range {
 // ranges are owned by this peer. Will split ranges which
 // span 0 in the ring.
 func (r *Ring) OwnedRanges() (result []address.Range) {
+	return r.OwnedRangesOfPeer(r.Peer)
+}
+
+func (r *Ring) OwnedRangesOfPeer(peer mesh.PeerName) (result []address.Range) {
 	r.assertInvariants()
 
 	for i, entry := range r.Entries {
-		if entry.Peer == r.Peer {
+		if entry.Peer == peer {
 			nextEntry := r.Entries.entry(i + 1)
 			result = append(result, address.Range{Start: entry.Token, End: nextEntry.Token})
 		}
@@ -368,32 +396,36 @@ func (r *Ring) AllRangeInfo() (result []RangeInfo) {
 }
 
 // ClaimForPeers claims the entire ring for the array of peers passed
-// in. Only works for empty rings.
+// in. Only works for empty rings. Each claimed range is CIDR-aligned.
 func (r *Ring) ClaimForPeers(peers []mesh.PeerName) {
 	common.Assert(r.Empty())
+
+	defer r.trackUpdates()()
 	defer r.assertInvariants()
 	defer r.updateExportedVariables()
+	defer func() {
+		e := r.Entries[len(r.Entries)-1]
+		common.Assert(address.Add(e.Token, address.Offset(e.Free)) == r.End)
+	}()
 
-	totalSize := r.distance(r.Start, r.End)
-	share := totalSize/address.Count(len(peers)) + 1
-	remainder := totalSize % address.Count(len(peers))
-	pos := r.Start
-
-	for i, peer := range peers {
-		if address.Count(i) == remainder {
-			share--
-			if share == 0 {
-				break
-			}
-		}
-
-		r.Entries.insert(entry{Token: pos, Peer: peer, Free: share})
-		pos += address.Address(share)
-	}
-
-	common.Assert(pos == r.End)
-
-	r.Seeds = peers
+	r.subdivide(r.Start, r.End, peers)
+	r.Seeds = peers
+}
+
+// subdivide recursively subdivides the [from,to) range among the given peers
+// into CIDR-aligned subranges.
+func (r *Ring) subdivide(from, to address.Address, peers []mesh.PeerName) {
+	share := address.Length(to, from)
+	if share == 0 {
+		return
+	}
+	if share == 1 || len(peers) == 1 {
+		r.Entries.insert(entry{Token: from, Peer: peers[0], Free: share})
+		return
+	}
+	mid := address.Add(from, address.Offset(share/2))
+	r.subdivide(from, mid, peers[:len(peers)/2])
+	r.subdivide(address.Add(mid, address.Offset(share%2)), to, peers[len(peers)/2:])
 }
 
 func (r *Ring) FprintWithNicknames(w io.Writer, m map[mesh.PeerName]string) {
@@ -525,6 +557,8 @@ func (r *Ring) PickPeerForTransfer(isValid func(mesh.PeerName) bool) mesh.PeerNa
 // and return ranges indicating the new space we picked up
 func (r *Ring) Transfer(from, to mesh.PeerName) []address.Range {
 	r.assertInvariants()
+	defer r.trackUpdates()()
+	defer r.trackUpdatesOfPeer(from)()
 	defer r.assertInvariants()
 	defer r.updateExportedVariables()
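For intuition about `subdivide`, here is a minimal standalone model of the same recursion over plain integers (a hypothetical helper, not part of the PR): the range is halved while the peer list is split between the halves, so each peer ends up with one contiguous, CIDR-aligned block when the range size is a power of two.

package main

import "fmt"

// subdivide mirrors the recursion in ring.go: split [from,to) in half,
// give the left half to the first half of the peers and the right half
// to the rest; a single peer (or a single address) ends the recursion.
func subdivide(from, to uint32, peers []string) {
	share := to - from
	if share == 0 {
		return
	}
	if share == 1 || len(peers) == 1 {
		fmt.Printf("%s owns [%d,%d)\n", peers[0], from, to)
		return
	}
	mid := from + share/2
	subdivide(from, mid, peers[:len(peers)/2])
	subdivide(mid+share%2, to, peers[len(peers)/2:])
}

func main() {
	subdivide(0, 8, []string{"A", "B", "C"})
	// Output:
	// A owns [0,4)
	// B owns [4,6)
	// C owns [6,8)
}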
diff --git a/ipam/ring/ring_test.go b/ipam/ring/ring_test.go
index 417c68b752..fa5e0c441f 100644
--- a/ipam/ring/ring_test.go
+++ b/ipam/ring/ring_test.go
@@ -36,8 +36,12 @@ func merge(r1, r2 *Ring) error {
 	return err
 }
 
+func NewRing(start, end address.Address, peer mesh.PeerName) *Ring {
+	return New(start, end, peer, nil)
+}
+
 func TestInvariants(t *testing.T) {
-	ring := New(start, end, peer1name)
+	ring := NewRing(start, end, peer1name)
 
 	// Check ring is sorted
 	ring.Entries = []*entry{{Token: dot245, Peer: peer1name}, {Token: dot10, Peer: peer2name}}
@@ -48,7 +52,7 @@ func TestInvariants(t *testing.T) {
 	require.True(t, ring.checkInvariants() == ErrTokenRepeated, "Expected error")
 
 	// Check tokens are in bounds
-	ring = New(dot10, dot245, peer1name)
+	ring = NewRing(dot10, dot245, peer1name)
 	ring.Entries = []*entry{{Token: start, Peer: peer1name}}
 	require.True(t, ring.checkInvariants() == ErrTokenOutOfRange, "Expected error")
 
@@ -57,7 +61,7 @@
 }
 
 func TestInsert(t *testing.T) {
-	ring := New(start, end, peer1name)
+	ring := NewRing(start, end, peer1name)
 	ring.Entries = []*entry{{Token: start, Peer: peer1name, Free: 255}}
 
 	require.Panics(t, func() {
@@ -82,7 +86,7 @@ func TestInsert(t *testing.T) {
 }
 
 func TestBetween(t *testing.T) {
-	ring1 := New(start, end, peer1name)
+	ring1 := NewRing(start, end, peer1name)
 	ring1.Entries = []*entry{{Token: start, Peer: peer1name, Free: 255}}
 
 	// First off, in a ring where everything is owned by the peer
@@ -124,8 +128,8 @@ func TestBetween(t *testing.T) {
 }
 
 func TestGrantSimple(t *testing.T) {
-	ring1 := New(start, end, peer1name)
-	ring2 := New(start, end, peer2name)
+	ring1 := NewRing(start, end, peer1name)
+	ring2 := NewRing(start, end, peer2name)
 
 	// Claim everything for peer1
 	ring1.ClaimItAll()
@@ -159,8 +163,8 @@ func TestGrantSimple(t *testing.T) {
 }
 
 func TestGrantSplit(t *testing.T) {
-	ring1 := New(start, end, peer1name)
-	ring2 := New(start, end, peer2name)
+	ring1 := NewRing(start, end, peer1name)
+	ring2 := NewRing(start, end, peer2name)
 
 	// Claim everything for peer1
 	ring1.Entries = []*entry{{Token: start, Peer: peer1name, Free: 256}}
@@ -188,8 +192,8 @@ func TestGrantSplit(t *testing.T) {
 }
 
 func TestMergeSimple(t *testing.T) {
-	ring1 := New(start, end, peer1name)
-	ring2 := New(start, end, peer2name)
+	ring1 := NewRing(start, end, peer1name)
+	ring2 := NewRing(start, end, peer2name)
 
 	// Claim everything for peer1
 	ring1.ClaimItAll()
@@ -217,18 +221,18 @@ func TestMergeSimple(t *testing.T) {
 
 func TestMergeErrors(t *testing.T) {
 	// Cannot Merge in an invalid ring
-	ring1 := New(start, end, peer1name)
-	ring2 := New(start, end, peer2name)
+	ring1 := NewRing(start, end, peer1name)
+	ring2 := NewRing(start, end, peer2name)
 	ring2.Entries = []*entry{{Token: middle, Peer: peer2name}, {Token: start, Peer: peer2name}}
 	require.True(t, merge(ring1, ring2) == ErrNotSorted, "Expected ErrNotSorted")
 
 	// Should Merge two rings for different ranges
-	ring2 = New(start, middle, peer2name)
+	ring2 = NewRing(start, middle, peer2name)
 	ring2.Entries = []*entry{}
 	require.True(t, merge(ring1, ring2) == ErrDifferentRange, "Expected ErrDifferentRange")
 
 	// Cannot Merge newer version of entry I own
-	ring2 = New(start, end, peer2name)
+	ring2 = NewRing(start, end, peer2name)
 	ring1.Entries = []*entry{{Token: start, Peer: peer1name}}
 	ring2.Entries = []*entry{{Token: start, Peer: peer1name, Version: 1}}
 	fmt.Println(merge(ring1, ring2))
@@ -246,8 +250,8 @@ func TestMergeErrors(t *testing.T) {
 }
 
 func TestMergeMore(t *testing.T) {
-	ring1 := New(start, end, peer1name)
-	ring2 := New(start, end, peer2name)
+	ring1 := NewRing(start, end, peer1name)
+	ring2 := NewRing(start, end, peer2name)
 
 	assertRing := func(ring *Ring, entries entries) {
 		require.Equal(t, entries, ring.Entries)
@@ -297,8 +301,8 @@ func TestMergeMore(t *testing.T) {
 }
 
 func TestMergeSplit(t *testing.T) {
-	ring1 := New(start, end, peer1name)
-	ring2 := New(start, end, peer2name)
+	ring1 := NewRing(start, end, peer1name)
+	ring2 := NewRing(start, end, peer2name)
 
 	// Claim everything for peer2
 	ring1.Entries = []*entry{{Token: start, Peer: peer2name, Free: 256}}
@@ -320,8 +324,8 @@ func TestMergeSplit(t *testing.T) {
 }
 
 func TestMergeSplit2(t *testing.T) {
-	ring1 := New(start, end, peer1name)
-	ring2 := New(start, end, peer2name)
+	ring1 := NewRing(start, end, peer1name)
+	ring2 := NewRing(start, end, peer2name)
 
 	// Claim everything for peer2
 	ring1.Entries = []*entry{{Token: start, Peer: peer2name, Free: 250}, {Token: dot250, Peer: peer2name, Free: 5}}
@@ -344,8 +348,8 @@ func TestMergeSplit2(t *testing.T) {
 
 // A simple test, very similar to above, but using the marshalling to byte[]s
 func TestGossip(t *testing.T) {
-	ring1 := New(start, end, peer1name)
-	ring2 := New(start, end, peer2name)
+	ring1 := NewRing(start, end, peer1name)
+	ring2 := NewRing(start, end, peer2name)
 
 	assertRing := func(ring *Ring, entries entries) {
 		require.Equal(t, entries, ring.Entries)
@@ -372,7 +376,7 @@ func assertPeersWithSpace(t *testing.T, ring *Ring, start, end address.Address,
 }
 
 func TestFindFree(t *testing.T) {
-	ring1 := New(start, end, peer1name)
+	ring1 := NewRing(start, end, peer1name)
 
 	assertPeersWithSpace(t, ring1, start, end, 0)
 
@@ -408,8 +412,8 @@ func TestFindFree(t *testing.T) {
 }
 
 func TestReportFree(t *testing.T) {
-	ring1 := New(start, end, peer1name)
-	ring2 := New(start, end, peer2name)
+	ring1 := NewRing(start, end, peer1name)
+	ring2 := NewRing(start, end, peer2name)
 
 	ring1.ClaimItAll()
 	ring1.GrantRangeToHost(middle, end, peer2name)
@@ -423,7 +427,7 @@ func TestReportFree(t *testing.T) {
 }
 
 func TestMisc(t *testing.T) {
-	ring := New(start, end, peer1name)
+	ring := NewRing(start, end, peer1name)
 
 	require.True(t, ring.Empty(), "empty")
 
@@ -432,8 +436,8 @@ func TestMisc(t *testing.T) {
 }
 
 func TestEmptyGossip(t *testing.T) {
-	ring1 := New(start, end, peer1name)
-	ring2 := New(start, end, peer2name)
+	ring1 := NewRing(start, end, peer1name)
+	ring2 := NewRing(start, end, peer2name)
 
 	ring1.ClaimItAll()
 	// This used to panic, and it shouldn't
@@ -441,8 +445,8 @@ func TestEmptyGossip(t *testing.T) {
 }
 
 func TestMergeOldMessage(t *testing.T) {
-	ring1 := New(start, end, peer1name)
-	ring2 := New(start, end, peer2name)
+	ring1 := NewRing(start, end, peer1name)
+	ring2 := NewRing(start, end, peer2name)
 
 	ring1.ClaimItAll()
 	require.NoError(t, merge(ring2, ring1))
@@ -452,8 +456,8 @@ func TestMergeOldMessage(t *testing.T) {
 }
 
 func TestSplitRangeAtBeginning(t *testing.T) {
-	ring1 := New(start, end, peer1name)
-	ring2 := New(start, end, peer2name)
+	ring1 := NewRing(start, end, peer1name)
+	ring2 := NewRing(start, end, peer2name)
 
 	ring1.ClaimItAll()
 	require.NoError(t, merge(ring2, ring1))
@@ -463,7 +467,7 @@ func TestSplitRangeAtBeginning(t *testing.T) {
 }
 
 func TestOwnedRange(t *testing.T) {
-	ring1 := New(start, end, peer1name)
+	ring1 := NewRing(start, end, peer1name)
 
 	ring1.ClaimItAll()
 	require.Equal(t, []address.Range{{Start: start, End: end}}, ring1.OwnedRanges())
@@ -471,7 +475,7 @@ func TestOwnedRange(t *testing.T) {
 	ring1.GrantRangeToHost(middle, end, peer2name)
 	require.Equal(t, []address.Range{{Start: start, End: middle}}, ring1.OwnedRanges())
 
-	ring2 := New(start, end, peer2name)
+	ring2 := NewRing(start, end, peer2name)
 	merge(ring2, ring1)
 	require.Equal(t, []address.Range{{Start: middle, End: end}}, ring2.OwnedRanges())
 
@@ -486,7 +490,7 @@ func TestOwnedRange(t *testing.T) {
 
 func TestTransfer(t *testing.T) {
 	// First test just checks if we can grant some range to a host, when we transfer it, we get it back
-	ring1 := New(start, end, peer1name)
+	ring1 := NewRing(start, end, peer1name)
 	ring1.ClaimItAll()
 	ring1.GrantRangeToHost(middle, end, peer2name)
 	ring1.Transfer(peer2name, peer1name)
@@ -494,7 +498,7 @@ func TestTransfer(t *testing.T) {
 
 	// Second test is what happens when a token exists at the end of a range but is transferred
 	// - does it get resurrected correctly?
-	ring1 = New(start, end, peer1name)
+	ring1 = NewRing(start, end, peer1name)
 	ring1.ClaimItAll()
 	ring1.GrantRangeToHost(middle, end, peer2name)
 	ring1.Transfer(peer2name, peer1name)
@@ -503,7 +507,7 @@ func TestTransfer(t *testing.T) {
 }
 
 func TestOwner(t *testing.T) {
-	ring1 := New(start, end, peer1name)
+	ring1 := NewRing(start, end, peer1name)
 	require.True(t, ring1.Contains(start), "start should be in ring")
 	require.False(t, ring1.Contains(end), "end should not be in ring")
 
@@ -540,11 +544,11 @@ func TestClaimForPeers(t *testing.T) {
 	// Different end to usual so we get a number of addresses that a)
 	// is smaller than the max number of peers, and b) is divisible by
 	// some number of peers. This maximises coverage of edge cases.
-	end := dot10
+	end := dot8
 	peers := makePeers(numPeers)
 
 	// Test for a range of peer counts
 	for i := 0; i < numPeers; i++ {
-		ring := New(start, end, peers[0])
+		ring := NewRing(start, end, peers[0])
 		ring.ClaimForPeers(peers[:i+1])
 	}
 }
@@ -582,7 +586,7 @@ func TestFuzzRing(t *testing.T) {
 		sort.Sort(addressSlice(tokens))
 
 		peer := peers[rand.Intn(len(peers))]
-		ring := New(start, end, peer)
+		ring := NewRing(start, end, peer)
 		for _, token := range tokens {
 			peer = peers[rand.Intn(len(peers))]
 			ring.Entries = append(ring.Entries, &entry{Token: start + token, Peer: peer})
@@ -616,7 +620,7 @@ func TestFuzzRing(t *testing.T) {
 		}
 
 		peer := peers[rand.Intn(len(peers))]
-		ring := New(start, end, peer)
+		ring := NewRing(start, end, peer)
 		for _, token := range tokens {
 			peer = peers[rand.Intn(len(peers))]
 			ring.Entries = append(ring.Entries, &entry{Token: start + token, Peer: peer})
@@ -654,7 +658,7 @@ func TestFuzzRingHard(t *testing.T) {
 		common.Log.Debugf("%s: Adding peer", peer)
 		nextPeerID++
 		peers = append(peers, peer)
-		rings = append(rings, New(start, end, peer))
+		rings = append(rings, NewRing(start, end, peer))
 	}
 
 	for i := 0; i < numPeers; i++ {
diff --git a/ipam/testutils_test.go b/ipam/testutils_test.go
index 13fa8171bd..9108eca5f8 100644
--- a/ipam/testutils_test.go
+++ b/ipam/testutils_test.go
@@ -10,6 +10,7 @@ import (
 	"github.com/weaveworks/mesh"
 
 	"github.com/stretchr/testify/require"
+	"github.com/weaveworks/weave/ipam/tracker"
 	"github.com/weaveworks/weave/net/address"
 	"github.com/weaveworks/weave/testing/gossip"
 )
@@ -129,6 +130,10 @@ func (d *mockDB) Load(_ string, _ interface{}) (bool, error) { return false, nil
 func (d *mockDB) Save(_ string, _ interface{}) error         { return nil }
 
 func makeAllocator(name string, cidrStr string, quorum uint) (*Allocator, address.CIDR) {
+	return makeAllocatorWithTracker(name, cidrStr, quorum, nil)
+}
+
+func makeAllocatorWithTracker(name string, cidrStr string, quorum uint, track tracker.LocalRangeTracker) (*Allocator, address.CIDR) {
 	peername, err := mesh.PeerNameFromString(name)
 	if err != nil {
 		panic(err)
@@ -148,11 +153,16 @@ func makeAllocator(name string, cidrStr string, quorum uint) (*Allocator, addres
 		Quorum:      func() uint { return quorum },
 		Db:          new(mockDB),
 		IsKnownPeer: func(mesh.PeerName) bool { return true },
+		Tracker:     track,
 	}), cidr
}
 
 func makeAllocatorWithMockGossip(t *testing.T, name string, universeCIDR string, quorum uint) (*Allocator, address.CIDR) {
-	alloc, subnet := makeAllocator(name, universeCIDR, quorum)
+	return makeAllocatorWithMockGossipAndTracker(t, name, universeCIDR, quorum, nil)
+}
+
+func makeAllocatorWithMockGossipAndTracker(t *testing.T, name string, universeCIDR string, quorum uint, track tracker.LocalRangeTracker) (*Allocator, address.CIDR) {
+	alloc, subnet := makeAllocatorWithTracker(name, universeCIDR, quorum, track)
 	gossip := &mockGossipComms{T: t, name: name}
 	alloc.SetInterfaces(gossip)
 	alloc.Start()
@@ -214,14 +224,17 @@ func AssertNothingSentErr(t *testing.T, ch <-chan error) {
 }
 
 func makeNetworkOfAllocators(size int, cidr string) ([]*Allocator, *gossip.TestRouter, address.CIDR) {
+	return makeNetworkOfAllocatorsWithTracker(size, cidr, nil)
+}
+
+func makeNetworkOfAllocatorsWithTracker(size int, cidr string, track tracker.LocalRangeTracker) ([]*Allocator, *gossip.TestRouter, address.CIDR) {
 	gossipRouter := gossip.NewTestRouter(0.0)
 	allocs := make([]*Allocator, size)
 	var subnet address.CIDR
 
 	for i := 0; i < size; i++ {
 		var alloc *Allocator
-		alloc, subnet = makeAllocator(fmt.Sprintf("%02d:00:00:02:00:00", i),
-			cidr, uint(size/2+1))
+		alloc, subnet = makeAllocatorWithTracker(fmt.Sprintf("%02d:00:00:02:00:00", i), cidr, uint(size/2+1), track)
 		alloc.SetInterfaces(gossipRouter.Connect(alloc.ourName, alloc))
 		alloc.Start()
 		allocs[i] = alloc
diff --git a/ipam/tracker/awsvpc.go b/ipam/tracker/awsvpc.go
new file mode 100644
index 0000000000..1089b61543
--- /dev/null
+++ b/ipam/tracker/awsvpc.go
@@ -0,0 +1,286 @@
+package tracker
+
+// The AWSVPC tracker tracks IPAM ring changes and updates the AWS VPC
+// route table accordingly; if the announced change is local to the host
+// (denoted by local=true), it updates the host route table as well.
+//
+// During initialization, the tracker detects the AWS VPC route table ID
+// associated with the host's subnet on the AWS network. If no such table
+// exists, the default VPC route table is used.
+//
+// When a host A donates a range to a host B, the necessary route table
+// updates happen on host A first (removal), and afterwards on host B
+// (installation).
+//
+// NB: there is a hard limit of 50 routes per VPC route table
+// (practically 48, because one route is used by the AWS Internet GW and
+// one by the AWS host subnet), so it is best to avoid excessive
+// fragmentation of the IPAM ring, which can happen due to claim
+// operations or an uneven distribution of containers across the hosts.
+
+import (
+	"fmt"
+	"net"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/ec2metadata"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/ec2"
+	"github.com/vishvananda/netlink"
+
+	"github.com/weaveworks/weave/common"
+	wnet "github.com/weaveworks/weave/net"
+	"github.com/weaveworks/weave/net/address"
+)
+
+type AWSVPCTracker struct {
+	ec2          *ec2.EC2
+	instanceID   string // EC2 Instance ID
+	routeTableID string // VPC Route Table ID
+	linkIndex    int    // The weave bridge link index
+}
+
+// NewAWSVPCTracker creates and initialises an AWS-VPC-based tracker.
+func NewAWSVPCTracker() (*AWSVPCTracker, error) {
+	var (
+		err     error
+		session = session.New()
+		t       = &AWSVPCTracker{}
+	)
+
+	// Detect the region and the instance ID
+	meta := ec2metadata.New(session)
+	t.instanceID, err = meta.GetMetadata("instance-id")
+	if err != nil {
+		return nil, fmt.Errorf("cannot detect instance-id: %s", err)
+	}
+	region, err := meta.Region()
+	if err != nil {
+		return nil, fmt.Errorf("cannot detect region: %s", err)
+	}
+
+	t.ec2 = ec2.New(session, aws.NewConfig().WithRegion(region))
+
+	routeTableID, err := t.detectRouteTableID()
+	if err != nil {
+		return nil, err
+	}
+	t.routeTableID = *routeTableID
+
+	// Detect the weave bridge link index
+	link, err := netlink.LinkByName(wnet.WeaveBridgeName)
+	if err != nil {
+		return nil, fmt.Errorf("cannot find \"%s\" interface: %s", wnet.WeaveBridgeName, err)
+	}
+	t.linkIndex = link.Attrs().Index
+
+	t.infof("AWSVPC has been initialized on instance %s with route table %s in region %s",
+		t.instanceID, t.routeTableID, region)
+
+	return t, nil
+}
+
+// HandleUpdate updates the AWS VPC and the host route tables.
+func (t *AWSVPCTracker) HandleUpdate(prevRanges, currRanges []address.Range, local bool) error {
+	t.debugf("replacing %q by %q; local(%t)", prevRanges, currRanges, local)
+
+	prev, curr := removeCommon(address.NewCIDRs(merge(prevRanges)), address.NewCIDRs(merge(currRanges)))
+
+	// It might make sense to do the removals first and only then add new
+	// entries, because of the 50-route limit. However, in that case a
+	// container might be unreachable for a short period of time, which is
+	// not desired behavior.
+
+	// Add new entries
+	for _, cidr := range curr {
+		cidrStr := cidr.String()
+		t.debugf("adding route %s to %s", cidrStr, t.instanceID)
+		_, err := t.createVPCRoute(cidrStr)
+		if err != nil {
+			return fmt.Errorf("createVPCRoute failed: %s", err)
+		}
+		if local {
+			err = t.createHostRoute(cidrStr)
+			if err != nil {
+				return fmt.Errorf("createHostRoute failed: %s", err)
+			}
+		}
+	}
+
+	// Remove obsolete entries
+	for _, cidr := range prev {
+		cidrStr := cidr.String()
+		t.debugf("removing %s route", cidrStr)
+		_, err := t.deleteVPCRoute(cidrStr)
+		if err != nil {
+			return fmt.Errorf("deleteVPCRoute failed: %s", err)
+		}
+		if local {
+			err = t.deleteHostRoute(cidrStr)
+			if err != nil {
+				return fmt.Errorf("deleteHostRoute failed: %s", err)
+			}
+		}
+	}
+
+	return nil
+}
+
+func (t *AWSVPCTracker) createVPCRoute(cidr string) (*ec2.CreateRouteOutput, error) {
+	route := &ec2.CreateRouteInput{
+		RouteTableId:         &t.routeTableID,
+		InstanceId:           &t.instanceID,
+		DestinationCidrBlock: &cidr,
+	}
+	return t.ec2.CreateRoute(route)
+}
+
+func (t *AWSVPCTracker) createHostRoute(cidr string) error {
+	dst, err := parseCIDR(cidr)
+	if err != nil {
+		return err
+	}
+	route := &netlink.Route{
+		LinkIndex: t.linkIndex,
+		Dst:       dst,
+		Scope:     netlink.SCOPE_LINK,
+	}
+	return netlink.RouteAdd(route)
+}
+
+func (t *AWSVPCTracker) deleteVPCRoute(cidr string) (*ec2.DeleteRouteOutput, error) {
+	route := &ec2.DeleteRouteInput{
+		RouteTableId:         &t.routeTableID,
+		DestinationCidrBlock: &cidr,
+	}
+	return t.ec2.DeleteRoute(route)
+}
+
+func (t *AWSVPCTracker) deleteHostRoute(cidr string) error {
+	dst, err := parseCIDR(cidr)
+	if err != nil {
+		return err
+	}
+	route := &netlink.Route{
+		LinkIndex: t.linkIndex,
+		Dst:       dst,
+		Scope:     netlink.SCOPE_LINK,
+	}
+	return netlink.RouteDel(route)
+}
+
+// detectRouteTableID detects the AWS VPC route table ID for the tracker's instance.
+func (t *AWSVPCTracker) detectRouteTableID() (*string, error) {
+	instancesParams := &ec2.DescribeInstancesInput{
+		InstanceIds: []*string{aws.String(t.instanceID)},
+	}
+	instancesResp, err := t.ec2.DescribeInstances(instancesParams)
+	if err != nil {
+		return nil, fmt.Errorf("DescribeInstances failed: %s", err)
+	}
+	if len(instancesResp.Reservations) == 0 ||
+		len(instancesResp.Reservations[0].Instances) == 0 {
+		return nil, fmt.Errorf("cannot find %s instance within reservations", t.instanceID)
+	}
+	vpcID := instancesResp.Reservations[0].Instances[0].VpcId
+	subnetID := instancesResp.Reservations[0].Instances[0].SubnetId
+
+	// First, try to find a route table associated with the instance's subnet
+	tablesParams := &ec2.DescribeRouteTablesInput{
+		Filters: []*ec2.Filter{
+			{
+				Name:   aws.String("association.subnet-id"),
+				Values: []*string{subnetID},
+			},
+		},
+	}
+	tablesResp, err := t.ec2.DescribeRouteTables(tablesParams)
+	if err != nil {
+		return nil, fmt.Errorf("DescribeRouteTables failed: %s", err)
+	}
+	if len(tablesResp.RouteTables) != 0 {
+		return tablesResp.RouteTables[0].RouteTableId, nil
+	}
+
+	// Fall back to the default route table of the VPC
+	tablesParams = &ec2.DescribeRouteTablesInput{
+		Filters: []*ec2.Filter{
+			{
+				Name:   aws.String("association.main"),
+				Values: []*string{aws.String("true")},
+			},
+			{
+				Name:   aws.String("vpc-id"),
+				Values: []*string{vpcID},
+			},
+		},
+	}
+	tablesResp, err = t.ec2.DescribeRouteTables(tablesParams)
+	if err != nil {
+		return nil, fmt.Errorf("DescribeRouteTables failed: %s", err)
+	}
+	if len(tablesResp.RouteTables) != 0 {
+		return tablesResp.RouteTables[0].RouteTableId, nil
+	}
+
+	return nil, fmt.Errorf("cannot find a route table for instance %s", t.instanceID)
+}
+
+func (t *AWSVPCTracker) debugf(fmt string, args ...interface{}) {
+	common.Log.Debugf("[tracker] "+fmt, args...)
+}
+
+func (t *AWSVPCTracker) infof(fmt string, args ...interface{}) {
+	common.Log.Infof("[tracker] "+fmt, args...)
+}
+
+// Helpers
+
+// merge merges adjacent range entries.
+// The given slice has to be sorted in increasing order.
+func merge(r []address.Range) []address.Range {
+	var merged []address.Range
+
+	for i := range r {
+		if prev := len(merged) - 1; prev >= 0 && merged[prev].End == r[i].Start {
+			merged[prev].End = r[i].End
+		} else {
+			merged = append(merged, r[i])
+		}
+	}
+
+	return merged
+}
+
+// removeCommon filters out the CIDR ranges which appear in both a and b.
+// Both slices have to be sorted in increasing order.
+func removeCommon(a, b []address.CIDR) (newA, newB []address.CIDR) {
+	i, j := 0, 0
+
+	for i < len(a) && j < len(b) {
+		switch {
+		case a[i].Start() < b[j].Start() || a[i].End() < b[j].End():
+			newA = append(newA, a[i])
+			i++
+		case a[i].Start() > b[j].Start() || a[i].End() > b[j].End():
+			newB = append(newB, b[j])
+			j++
+		default:
+			i++
+			j++
+		}
+
+	}
+	newA = append(newA, a[i:]...)
+	newB = append(newB, b[j:]...)
+
+	return
+}
+
+func parseCIDR(cidr string) (*net.IPNet, error) {
+	ip, ipnet, err := net.ParseCIDR(cidr)
+	if err != nil {
+		return nil, err
+	}
+	ipnet.IP = ip
+
+	return ipnet, nil
+}
diff --git a/ipam/tracker/awsvpc_test.go b/ipam/tracker/awsvpc_test.go
new file mode 100644
index 0000000000..b99500a1df
--- /dev/null
+++ b/ipam/tracker/awsvpc_test.go
@@ -0,0 +1,52 @@
+package tracker
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/weaveworks/weave/net/address"
+)
+
+var (
+	r0to127     = cidr("10.0.0.0", "10.0.0.127")
+	r128to255   = cidr("10.0.0.128", "10.0.0.255")
+	r0to255     = cidr("10.0.0.0", "10.0.0.255")
+	r1dot0to255 = cidr("10.0.1.0", "10.0.1.255")
+	r2dot0to255 = cidr("10.0.2.0", "10.0.2.255")
+)
+
+func TestRemoveCommon(t *testing.T) {
+	a := []address.CIDR{r0to127, r1dot0to255}
+	b := []address.CIDR{r1dot0to255, r2dot0to255}
+	newA, newB := removeCommon(a, b)
+	require.Equal(t, []address.CIDR{r0to127}, newA)
+	require.Equal(t, []address.CIDR{r2dot0to255}, newB)
+}
+
+func TestMerge(t *testing.T) {
+	ranges := []address.Range{
+		r0to127.Range(),
+		r128to255.Range(),
+		r2dot0to255.Range(),
+	}
+	require.Equal(t, []address.Range{r0to255.Range(), r2dot0to255.Range()}, merge(ranges))
+}
+
+// Helpers
+
+// TODO(mp) DRY with the helpers of other tests.
+
+func ip(s string) address.Address {
+	addr, _ := address.ParseIP(s)
+	return addr
+}
+
+// cidr returns the single CIDR covering [start; end].
+func cidr(start, end string) address.CIDR {
+	c := address.Range{Start: ip(start), End: ip(end) + 1}.CIDRs()
+	if len(c) != 1 {
+		panic("invalid cidr")
+	}
+	return c[0]
+}
diff --git a/ipam/tracker/tracker.go b/ipam/tracker/tracker.go
new file mode 100644
index 0000000000..6a9846dbb1
--- /dev/null
+++ b/ipam/tracker/tracker.go
@@ -0,0 +1,20 @@
+package tracker
+
+import (
+	"github.com/weaveworks/weave/net/address"
+)
+
+// LocalRangeTracker is an interface for tracking changes in the IPAM ring.
+type LocalRangeTracker interface {
+	// HandleUpdate is called whenever an address ring gets updated.
+	//
+	// prevRanges corresponds to the ranges which were owned by a peer before
+	// a change in the ring, while currRanges corresponds to the ones which
+	// are currently owned by the peer.
+	// Both slices have to be sorted in increasing order.
+	// Adjacent ranges within each slice might appear as separate ranges.
+	//
+	// The local parameter indicates whether the ranges belong to the peer
+	// by which the method is called.
+	HandleUpdate(prevRanges, currRanges []address.Range, local bool) error
+}
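Any implementation of this interface can be plugged into `ipam.Config.Tracker`; the AWSVPC tracker above is the real one this PR ships. As a minimal sketch (hypothetical type, not part of the PR), a tracker that merely logs ring changes would look like this:

package tracker

import (
	"github.com/weaveworks/weave/common"
	"github.com/weaveworks/weave/net/address"
)

// LoggingTracker is a hypothetical LocalRangeTracker that only logs updates.
type LoggingTracker struct{}

func (lt *LoggingTracker) HandleUpdate(prevRanges, currRanges []address.Range, local bool) error {
	common.Log.Infof("[tracker] ring update: %v -> %v (local=%t)", prevRanges, currRanges, local)
	return nil
}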
diff --git a/net/address/address.go b/net/address/address.go
index 42aa3cc2e4..2ea54fb233 100644
--- a/net/address/address.go
+++ b/net/address/address.go
@@ -23,6 +23,7 @@ func (r Range) Size() Count { return Length(r.End, r.Start) }
 func (r Range) String() string              { return fmt.Sprintf("%s-%s", r.Start, r.End-1) }
 func (r Range) Overlaps(or Range) bool      { return !(r.Start >= or.End || r.End <= or.Start) }
 func (r Range) Contains(addr Address) bool  { return addr >= r.Start && addr < r.End }
+func (r Range) Equal(or Range) bool         { return r.Start == or.Start && r.End == or.End }
 
 func (r Range) AsCIDRString() string {
 	prefixLen := 32
@@ -73,6 +74,40 @@ type CIDR struct {
 	PrefixLen int
 }
 
+// CIDRs returns a list of CIDR-aligned ranges which cover this range.
+func (r Range) CIDRs() []CIDR {
+	const (
+		fullMask         = ^Address(0)
+		cidrMaxPrefixLen = 32
+	)
+	var cidrs []CIDR
+
+	for start, end := r.Start, r.End-1; end >= start; {
+		mask, prefixLen := fullMask, cidrMaxPrefixLen
+		// Find the largest block which both starts at start (alignment)
+		// and does not run past end.
+		for mask > 0 {
+			tmpMask := mask << 1
+			// Check whether the mask is neither too short nor too long
+			if (start&tmpMask) != start || (start|^tmpMask) > end {
+				break
+			}
+			mask = tmpMask
+			prefixLen--
+		}
+		cidrs = append(cidrs, CIDR{start, prefixLen})
+		// Advance to the last address of the emitted block
+		start |= ^mask
+		// Check for overflow
+		if start+1 < start {
+			break
+		}
+		start++
+	}
+
+	return cidrs
+}
+
 func ParseIP(s string) (Address, error) {
 	if ip := net.ParseIP(s); ip != nil {
 		return FromIP4(ip), nil
@@ -80,6 +115,8 @@ func ParseIP(s string) (Address, error) {
 	return 0, &net.ParseError{Type: "IP Address", Text: s}
 }
 
+// ParseCIDR parses s as a CIDR-notation IP address and mask and returns
+// the host network address together with the prefix length.
 func ParseCIDR(s string) (CIDR, error) {
 	if ip, ipnet, err := net.ParseCIDR(s); err != nil {
 		return CIDR{}, err
@@ -91,6 +128,22 @@ func ParseCIDR(s string) (CIDR, error) {
 	}
 }
 
+// NewCIDRs converts the given ranges into a list of CIDR-aligned blocks.
+func NewCIDRs(ranges []Range) (cidrs []CIDR) {
+	for _, r := range ranges {
+		cidrs = append(cidrs, r.CIDRs()...)
+	}
+	return cidrs
+}
+
+func (cidr CIDR) Start() Address {
+	return cidr.Addr
+}
+
+// End returns the first address after the block, i.e. cidr = [Start; End)
+func (cidr CIDR) End() Address {
+	return cidr.Range().End
+}
+
 func (cidr CIDR) IsSubnet() bool {
 	mask := cidr.Size() - 1
 	return Offset(cidr.Addr)&mask == 0
@@ -101,6 +154,7 @@ func (cidr CIDR) Size() Offset { return 1 << uint(32-cidr.PrefixLen) }
 func (cidr CIDR) Range() Range {
 	return NewRange(cidr.Addr, cidr.Size())
 }
+
 func (cidr CIDR) HostRange() Range {
 	// Respect RFC1122 exclusions of first and last addresses
 	return NewRange(cidr.Addr+1, cidr.Size()-2)
diff --git a/net/address/address_test.go b/net/address/address_test.go
index 46f699b49e..8718fdb67e 100644
--- a/net/address/address_test.go
+++ b/net/address/address_test.go
@@ -39,3 +39,31 @@ func TestBiggestPow2AlignedRange(t *testing.T) {
 	}
 	require.NoError(t, quick.Check(prop, &quick.Config{MaxCount: 1000000}))
 }
+
+func ip(s string) Address {
+	addr, _ := ParseIP(s)
+	return addr
+}
+
+func cidr(s string) CIDR {
+	c, err := ParseCIDR(s)
+	if err != nil {
+		panic(err)
+	}
+	return c
+}
+
+func TestCIDRs(t *testing.T) {
+	start := ip("10.0.0.1")
+	end := ip("10.0.0.9")
+	r := NewRange(start, Subtract(end, start))
+	require.Equal(t,
+		[]CIDR{cidr("10.0.0.1/32"), cidr("10.0.0.2/31"), cidr("10.0.0.4/30"), cidr("10.0.0.8/32")},
+		r.CIDRs())
+}
+
+func TestCIDRStartAndEnd(t *testing.T) {
+	cidr, _ := ParseCIDR("10.0.0.0/24")
+	require.Equal(t, ip("10.0.0.0"), cidr.Start(), "")
+	require.Equal(t, ip("10.0.1.0"), cidr.End(), "")
+}
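The loop in `Range.CIDRs` is a greedy decomposition: at each step it emits the largest CIDR block that is aligned at the current address and fits within the remaining range, exactly as TestCIDRs above demonstrates. A standalone model of the same bit manipulation on uint32 (a hypothetical helper, just for intuition):

package main

import "fmt"

// cidrsOf greedily decomposes the inclusive range [start, end]
// into CIDR-aligned blocks, mirroring Range.CIDRs in address.go.
func cidrsOf(start, end uint32) {
	for end >= start {
		mask, prefixLen := ^uint32(0), 32
		// Grow the block while it stays aligned at start and within end.
		for mask > 0 {
			tmp := mask << 1
			if start&tmp != start || start|^tmp > end {
				break
			}
			mask = tmp
			prefixLen--
		}
		fmt.Printf("%d.%d.%d.%d/%d\n", start>>24, start>>16&0xff, start>>8&0xff, start&0xff, prefixLen)
		start |= ^mask // last address of the emitted block
		if start+1 < start {
			break // overflow: the range ended at 255.255.255.255
		}
		start++
	}
}

func main() {
	cidrsOf(0x0a000001, 0x0a000008) // 10.0.0.1 .. 10.0.0.8
	// Output:
	// 10.0.0.1/32
	// 10.0.0.2/31
	// 10.0.0.4/30
	// 10.0.0.8/32
}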
diff --git a/net/veth.go b/net/veth.go
index eea00485c8..fbca71c3a3 100644
--- a/net/veth.go
+++ b/net/veth.go
@@ -13,7 +13,7 @@ import (
 )
 
 // create and attach a veth to the Weave bridge
-func CreateAndAttachVeth(name, peerName, bridgeName string, mtu int, init func(peer netlink.Link) error) (*netlink.Veth, error) {
+func CreateAndAttachVeth(name, peerName, bridgeName string, mtu int, keepTXOn bool, init func(peer netlink.Link) error) (*netlink.Veth, error) {
 	bridge, err := netlink.LinkByName(bridgeName)
 	if err != nil {
 		return nil, fmt.Errorf(`bridge "%s" not present; did you launch weave?`, bridgeName)
@@ -42,7 +42,7 @@ func CreateAndAttachVeth(name, peerName, bridgeName string, mtu int, init func(p
 		if err := netlink.LinkSetMasterByIndex(veth, bridge.Attrs().Index); err != nil {
 			return cleanup(`unable to set master of %s: %s`, name, err)
 		}
-		if bridgeType == Bridge {
+		if bridgeType == Bridge && !keepTXOn {
 			if err := EthtoolTXOff(peerName); err != nil {
 				return cleanup(`unable to set tx off on %q: %s`, peerName, err)
 			}
@@ -111,14 +111,14 @@ func interfaceExistsInNamespace(ns netns.NsHandle, ifName string) bool {
 	return err == nil
 }
 
-func AttachContainer(ns netns.NsHandle, id, ifName, bridgeName string, mtu int, withMulticastRoute bool, cidrs []*net.IPNet) error {
+func AttachContainer(ns netns.NsHandle, id, ifName, bridgeName string, mtu int, withMulticastRoute bool, cidrs []*net.IPNet, keepTXOn bool) error {
 	if !interfaceExistsInNamespace(ns, ifName) {
 		maxIDLen := IFNAMSIZ - 1 - len(vethPrefix+"pl")
 		if len(id) > maxIDLen {
 			id = id[:maxIDLen] // trim passed ID if too long
 		}
 		name, peerName := vethPrefix+"pl"+id, vethPrefix+"pg"+id
-		_, err := CreateAndAttachVeth(name, peerName, bridgeName, mtu, func(veth netlink.Link) error {
+		_, err := CreateAndAttachVeth(name, peerName, bridgeName, mtu, keepTXOn, func(veth netlink.Link) error {
 			if err := netlink.LinkSetNsFd(veth, int(ns)); err != nil {
 				return fmt.Errorf("failed to move veth to container netns: %s", err)
 			}
diff --git a/plugin/net/cni.go b/plugin/net/cni.go
index f43ef96751..eddd7dc700 100644
--- a/plugin/net/cni.go
+++ b/plugin/net/cni.go
@@ -92,7 +92,7 @@ func (c *CNIPlugin) CmdAdd(args *skel.CmdArgs) error {
 		id = fmt.Sprintf("%x", data)
 	}
 
-	if err := weavenet.AttachContainer(ns, id, args.IfName, conf.BrName, conf.MTU, false, []*net.IPNet{&result.IP4.IP}); err != nil {
+	if err := weavenet.AttachContainer(ns, id, args.IfName, conf.BrName, conf.MTU, false, []*net.IPNet{&result.IP4.IP}, false); err != nil {
 		return err
 	}
 	if err := weavenet.WithNetNSLink(ns, args.IfName, func(link netlink.Link) error {
diff --git a/plugin/net/driver.go b/plugin/net/driver.go
index 675ca93d47..d813252911 100644
--- a/plugin/net/driver.go
+++ b/plugin/net/driver.go
@@ -134,8 +134,9 @@ func (driver *driver) JoinEndpoint(j *api.JoinRequest) (*api.JoinResponse, error
 	if err != nil {
 		return nil, driver.error("JoinEndpoint", "unable to find network info: %s", err)
 	}
+
 	name, peerName := vethPair(j.EndpointID)
-	if _, err := weavenet.CreateAndAttachVeth(name, peerName, weavenet.WeaveBridgeName, 0, nil); err != nil {
+	if _, err := weavenet.CreateAndAttachVeth(name, peerName, weavenet.WeaveBridgeName, 0, false, nil); err != nil {
 		return nil, driver.error("JoinEndpoint", "%s", err)
 	}
diff --git a/prog/weaver/main.go b/prog/weaver/main.go
index 2846fa29f1..8700f8d76e 100644
--- a/prog/weaver/main.go
+++ b/prog/weaver/main.go
@@ -22,6 +22,7 @@ import (
 	"github.com/weaveworks/weave/common/docker"
 	"github.com/weaveworks/weave/db"
 	"github.com/weaveworks/weave/ipam"
+	"github.com/weaveworks/weave/ipam/tracker"
 	"github.com/weaveworks/weave/nameserver"
 	weavenet "github.com/weaveworks/weave/net"
 	"github.com/weaveworks/weave/net/address"
@@ -170,6 +171,7 @@ func main() {
 		datapathName      string
 		trustedSubnetStr  string
 		dbPrefix          string
+		useAWSVPC         bool
 
 		defaultDockerHost = "unix:///var/run/docker.sock"
 	)
@@ -208,6 +210,7 @@ func main() {
 	mflag.StringVar(&datapathName, []string{"-datapath"}, "", "ODP datapath name")
 	mflag.StringVar(&trustedSubnetStr, []string{"-trusted-subnets"}, "", "comma-separated list of trusted subnets in CIDR notation")
"pathname/prefix of filename to store data") + mflag.BoolVar(&useAWSVPC, []string{"#awsvpc", "-awsvpc"}, false, "use AWS VPC for routing") // crude way of detecting that we probably have been started in a // container, with `weave launch` --> suppress misleading paths in @@ -250,7 +253,7 @@ func main() { networkConfig.PacketLogging = nopPacketLogging{} } - overlay, bridge := createOverlay(datapathName, ifaceName, config.Host, config.Port, bufSzMB) + overlay, bridge := createOverlay(datapathName, ifaceName, useAWSVPC, config.Host, config.Port, bufSzMB) networkConfig.Bridge = bridge name := peerName(routerName, bridge.Interface()) @@ -300,9 +303,19 @@ func main() { var ( allocator *ipam.Allocator defaultSubnet address.CIDR + trackerName string ) if ipamConfig.Enabled() { - allocator, defaultSubnet = createAllocator(router, ipamConfig, db, isKnownPeer) + var t tracker.LocalRangeTracker + if useAWSVPC { + Log.Infoln("Creating AWSVPC LocalRangeTracker") + t, err = tracker.NewAWSVPCTracker() + if err != nil { + Log.Fatalf("Cannot create AWSVPC LocalRangeTracker: %s", err) + } + trackerName = "awsvpc" + } + allocator, defaultSubnet = createAllocator(router, ipamConfig, db, t, isKnownPeer) observeContainers(allocator) ids, err := dockerCli.AllContainerIDs() checkFatal(err) @@ -333,7 +346,7 @@ func main() { if httpAddr != "" { muxRouter := mux.NewRouter() if allocator != nil { - allocator.HandleHTTP(muxRouter, defaultSubnet, dockerCli) + allocator.HandleHTTP(muxRouter, defaultSubnet, trackerName, dockerCli) } if ns != nil { ns.HandleHTTP(muxRouter, dockerCli) @@ -388,10 +401,18 @@ func (nopPacketLogging) LogPacket(string, weave.PacketKey) { func (nopPacketLogging) LogForwardPacket(string, weave.ForwardPacketKey) { } -func createOverlay(datapathName string, ifaceName string, host string, port int, bufSzMB int) (weave.NetworkOverlay, weave.Bridge) { +func createOverlay(datapathName string, ifaceName string, useAWSVPC bool, host string, port int, bufSzMB int) (weave.NetworkOverlay, weave.Bridge) { overlay := weave.NewOverlaySwitch() var bridge weave.Bridge + var ignoreSleeve bool + switch { + case useAWSVPC: + vpc := weave.NewAWSVPC() + overlay.Add("awsvpc", vpc) + bridge = weave.NullBridge{} + // Currently, we do not support any overlay with AWSVPC + ignoreSleeve = true case datapathName != "" && ifaceName != "": Log.Fatal("At most one of --datapath and --iface must be specified.") case datapathName != "": @@ -409,13 +430,17 @@ func createOverlay(datapathName string, ifaceName string, host string, port int, default: bridge = weave.NullBridge{} } - sleeve := weave.NewSleeveOverlay(host, port) - overlay.Add("sleeve", sleeve) - overlay.SetCompatOverlay(sleeve) + + if !ignoreSleeve { + sleeve := weave.NewSleeveOverlay(host, port) + overlay.Add("sleeve", sleeve) + overlay.SetCompatOverlay(sleeve) + } + return overlay, bridge } -func createAllocator(router *weave.NetworkRouter, config ipamConfig, db db.DB, isKnownPeer func(mesh.PeerName) bool) (*ipam.Allocator, address.CIDR) { +func createAllocator(router *weave.NetworkRouter, config ipamConfig, db db.DB, track tracker.LocalRangeTracker, isKnownPeer func(mesh.PeerName) bool) (*ipam.Allocator, address.CIDR) { ipRange, err := ipam.ParseCIDRSubnet(config.IPRangeCIDR) checkFatal(err) defaultSubnet := ipRange @@ -437,6 +462,7 @@ func createAllocator(router *weave.NetworkRouter, config ipamConfig, db db.DB, i Quorum: func() uint { return determineQuorum(config.PeerCount, router) }, Db: db, IsKnownPeer: isKnownPeer, + Tracker: track, } allocator := 
 	allocator := ipam.NewAllocator(c)
diff --git a/prog/weaveutil/attach.go b/prog/weaveutil/attach.go
index 2dbe037fa2..83a3085a99 100644
--- a/prog/weaveutil/attach.go
+++ b/prog/weaveutil/attach.go
@@ -3,12 +3,15 @@ package main
 import (
 	"fmt"
 	"net"
+	"os"
 	"strconv"
 	"syscall"
 
 	docker "github.com/fsouza/go-dockerclient"
 	"github.com/vishvananda/netns"
 
+	"github.com/weaveworks/weave/api"
+	"github.com/weaveworks/weave/common"
 	weavenet "github.com/weaveworks/weave/net"
 )
 
@@ -17,11 +20,26 @@ func attach(args []string) error {
 		cmdUsage("attach-container", "[--no-multicast-route] ...")
 	}
 
+	client := api.NewClient(os.Getenv("WEAVE_HTTP_ADDR"), common.Log)
+	keepTXOn := false
+	isAWSVPC := false
+	// In case of an error, we skip the AWSVPC-specific steps, because
+	// "attach" should work without the weave router running.
+	if t, err := client.LocalRangeTracker(); err != nil {
+		fmt.Fprintf(os.Stderr, "unable to determine tracker: %s; skipping AWSVPC initialization\n", err)
+	} else if t == "awsvpc" {
+		isAWSVPC = true
+		keepTXOn = true
+	}
+
 	withMulticastRoute := true
 	if args[0] == "--no-multicast-route" {
 		withMulticastRoute = false
 		args = args[1:]
 	}
+	if isAWSVPC {
+		withMulticastRoute = false
+	}
 
 	pid, nsContainer, err := containerPidAndNs(args[0])
 	if err != nil {
@@ -40,7 +58,22 @@ func attach(args []string) error {
 	if err != nil {
 		return err
 	}
-	err = weavenet.AttachContainer(nsContainer, fmt.Sprint(pid), weavenet.VethName, args[1], mtu, withMulticastRoute, cidrs)
+
+	if isAWSVPC {
+		// Currently, we allow only IP addresses from the default subnet
+		if defaultSubnet, err := client.DefaultSubnet(); err != nil {
+			fmt.Fprintf(os.Stderr, "unable to retrieve the default subnet: %s; skipping AWSVPC checks\n", err)
+		} else {
+			for _, cidr := range cidrs {
+				if !sameSubnet(cidr, defaultSubnet) {
+					format := "AWSVPC constraints violation: %s does not belong to the default subnet %s"
+					return fmt.Errorf(format, cidr, defaultSubnet)
+				}
+			}
+		}
+	}
+
+	err = weavenet.AttachContainer(nsContainer, fmt.Sprint(pid), weavenet.VethName, args[1], mtu, withMulticastRoute, cidrs, keepTXOn)
 	// If we detected an error but the container has died, tell the user that instead.
 	if err != nil && !processExists(pid) {
 		err = fmt.Errorf("Container %s died", args[0])
@@ -48,6 +81,16 @@ func attach(args []string) error {
 	return err
 }
 
+// sameSubnet checks whether ip belongs to the network's subnet and carries
+// the same mask.
+func sameSubnet(ip *net.IPNet, network *net.IPNet) bool {
+	if network.Contains(ip.IP) {
+		i1, i2 := ip.Mask.Size()
+		n1, n2 := network.Mask.Size()
+		return i1 == n1 && i2 == n2
+	}
+	return false
+}
+
 func containerPidAndNs(containerID string) (int, netns.NsHandle, error) {
 	c, err := docker.NewVersionedClientFromEnv("1.18")
 	if err != nil {
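Note that `sameSubnet` requires both containment and an identical prefix length: 10.32.42.1/12 passes against a 10.32.0.0/12 default subnet, while 10.32.42.1/24 does not, even though the IP itself lies inside 10.32.0.0/12. A quick standalone check (hypothetical values; the helper is repeated so the snippet runs on its own):

package main

import (
	"fmt"
	"net"
)

func sameSubnet(ip *net.IPNet, network *net.IPNet) bool {
	if network.Contains(ip.IP) {
		i1, i2 := ip.Mask.Size()
		n1, n2 := network.Mask.Size()
		return i1 == n1 && i2 == n2
	}
	return false
}

// hostCIDR keeps the host address in the IPNet, like weave's CIDR parsing does.
func hostCIDR(s string) *net.IPNet {
	ip, ipnet, _ := net.ParseCIDR(s)
	ipnet.IP = ip
	return ipnet
}

func main() {
	_, defaultSubnet, _ := net.ParseCIDR("10.32.0.0/12")
	fmt.Println(sameSubnet(hostCIDR("10.32.42.1/12"), defaultSubnet))  // true
	fmt.Println(sameSubnet(hostCIDR("10.32.42.1/24"), defaultSubnet))  // false: mask differs
	fmt.Println(sameSubnet(hostCIDR("192.168.0.1/12"), defaultSubnet)) // false: outside subnet
}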
diff --git a/router/awsvpc.go b/router/awsvpc.go
new file mode 100644
index 0000000000..59c64c979d
--- /dev/null
+++ b/router/awsvpc.go
@@ -0,0 +1,75 @@
+package router
+
+// A dummy overlay for the AWSVPC underlay, which makes `weave status`
+// return valid information about peer connections.
+
+import (
+	"github.com/weaveworks/mesh"
+)
+
+// mesh.OverlayConnection
+
+type AWSVPCConnection struct {
+	establishedChan chan struct{}
+	errorChan       chan error
+}
+
+func (conn *AWSVPCConnection) Confirm() {
+	// We close the channel to notify mesh that the connection has been established.
+	close(conn.establishedChan)
+}
+
+func (conn *AWSVPCConnection) EstablishedChannel() <-chan struct{} {
+	return conn.establishedChan
+}
+
+func (conn *AWSVPCConnection) ErrorChannel() <-chan error {
+	return conn.errorChan
+}
+
+func (conn *AWSVPCConnection) Stop() {}
+
+func (conn *AWSVPCConnection) ControlMessage(tag byte, msg []byte) {
+}
+
+func (conn *AWSVPCConnection) DisplayName() string {
+	return "awsvpc"
+}
+
+// OverlayForwarder
+
+func (conn *AWSVPCConnection) Forward(key ForwardPacketKey) FlowOp {
+	return DiscardingFlowOp{}
+}
+
+type AWSVPC struct{}
+
+func NewAWSVPC() AWSVPC {
+	return AWSVPC{}
+}
+
+// mesh.Overlay
+
+func (vpc AWSVPC) AddFeaturesTo(features map[string]string) {}
+
+func (vpc AWSVPC) PrepareConnection(params mesh.OverlayConnectionParams) (mesh.OverlayConnection, error) {
+	conn := &AWSVPCConnection{
+		establishedChan: make(chan struct{}),
+		errorChan:       make(chan error, 1),
+	}
+	return conn, nil
+}
+
+func (vpc AWSVPC) Diagnostics() interface{} {
+	return nil
+}
+
+// NetworkOverlay
+
+func (vpc AWSVPC) InvalidateRoutes() {}
+
+func (vpc AWSVPC) InvalidateShortIDs() {}
+
+func (vpc AWSVPC) StartConsumingPackets(localPeer *mesh.Peer, peers *mesh.Peers, consumer OverlayConsumer) error {
+	return nil
+}
diff --git a/router/bridge.go b/router/bridge.go
index 434669bd99..3f8a193f53 100644
--- a/router/bridge.go
+++ b/router/bridge.go
@@ -36,7 +36,7 @@ func (NullBridge) Interface() *net.Interface {
 }
 
 func (NullBridge) String() string {
-	return ""
+	return "no overlay bridge"
 }
 
 func (NullBridge) Stats() map[string]int {
+
+no_fastdp() {
+    weave_on $1 report -f "{{.Router.Interface}}" | grep -q -v "datapath"
+}
+
+start_suite "AWS VPC"
+
+INSTANCE1=$($SSH $HOST1 $INSTANCE_ID_CMD)
+INSTANCE2=$($SSH $HOST2 $INSTANCE_ID_CMD)
+INSTANCE3=$($SSH $HOST3 $INSTANCE_ID_CMD)
+
+VPC_ROUTE_TABLE_ID=$(routetableid $HOST1)
+cleanup_routetable $VPC_ROUTE_TABLE_ID
+
+echo "starting weave"
+
+weave_on $HOST1 launch --log-level=debug --ipalloc-range $UNIVERSE --awsvpc
+weave_on $HOST2 launch --log-level=debug --ipalloc-range $UNIVERSE --awsvpc $HOST1
+weave_on $HOST3 launch --log-level=debug --ipalloc-range $UNIVERSE --awsvpc $HOST1
+
+echo "starting containers"
+
+start_container $HOST1 --name=c1
+start_container $HOST2 --name=c4
+start_container $HOST1 --name=c2
+proxy_start_container $HOST1 -di --name=c3
+start_container $HOST3 --name=c5
+
+assert_raises "route_exists $VPC_ROUTE_TABLE_ID $CIDR1 $INSTANCE1"
+assert_raises "route_exists $VPC_ROUTE_TABLE_ID $CIDR2 $INSTANCE3"
+assert_raises "route_exists $VPC_ROUTE_TABLE_ID $CIDR3 $INSTANCE1"
+assert_raises "route_exists $VPC_ROUTE_TABLE_ID $CIDR4 $INSTANCE2"
+
+# Starting a container within a non-default subnet should fail
+assert_raises "proxy_start_container $HOST1 --name=c6 -e WEAVE_CIDR=net:$SUBNET" 1
+
+# Check that we do not use fastdp
+assert_raises "no_fastdp $HOST1"
+assert_raises "no_fastdp $HOST2"
+assert_raises "no_fastdp $HOST3"
+
+assert_raises "exec_on $HOST1 c1 $PING c2"
+assert_raises "exec_on $HOST1 c1 $PING c4"
+assert_raises "exec_on $HOST2 c4 $PING c1"
+assert_raises "exec_on $HOST2 c4 $PING c3"
+assert_raises "exec_on $HOST1 c1 $PING c5"
+assert_raises "exec_on $HOST2 c4 $PING c5"
+
+weave_on $HOST2 stop
+# Stopping weave should not remove the route entries
+assert_raises "route_exists $VPC_ROUTE_TABLE_ID $CIDR4 $INSTANCE2"
+
+weave_on $HOST2 launch --log-level=debug --ipalloc-range $UNIVERSE --awsvpc $HOST1
+
+weave_on $HOST1 reset
+PEER3=$(weave_on $HOST3 report -f '{{.Router.Name}}')
+weave_on $HOST3 stop
+weave_on $HOST2 rmpeer $PEER3
+
+## host1 transferred its previously owned ranges to host2, and host2 took over
+## host3's ranges via rmpeer
+assert_raises "route_not_exist $VPC_ROUTE_TABLE_ID $CIDR1"
+assert_raises "route_not_exist $VPC_ROUTE_TABLE_ID $CIDR2"
+assert_raises "route_not_exist $VPC_ROUTE_TABLE_ID $CIDR3"
+assert_raises "route_not_exist $VPC_ROUTE_TABLE_ID $CIDR4"
+assert_raises "route_exists $VPC_ROUTE_TABLE_ID $UNIVERSE $INSTANCE2"
+
+cleanup_routetable $VPC_ROUTE_TABLE_ID
+
+end_suite
diff --git a/test/aws.sh b/test/aws.sh
new file mode 100755
index 0000000000..bbbc1f77db
--- /dev/null
+++ b/test/aws.sh
@@ -0,0 +1,410 @@
+#!/bin/bash
+
+# TODO(mp) create a custom VPC to avoid potential clashes with other weave
+# participants.
+
+# This script sets up AWS EC2 machines for running the integration tests.
+#
+# Before running the script, make sure that the following has been done (once):
+#
+# 1) The "weavenet-circleci" IAM user has been created and its credentials are
+#    set in the CircleCI AWS profile page.
+# 2) The "AmazonEC2FullAccess" policy has been attached to the user.
+# 3) The "weavenet-vpc" policy has been created:
+#     {
+#         "Version": "2012-10-17",
+#         "Statement": [
+#             {
+#                 "Effect": "Allow",
+#                 "Action": [
+#                     "ec2:CreateRoute",
+#                     "ec2:DeleteRoute",
+#                     "ec2:ReplaceRoute",
+#                     "ec2:DescribeRouteTables",
+#                     "ec2:DescribeInstances"
+#                 ],
+#                 "Resource": [
+#                     "*"
+#                 ]
+#             }
+#         ]
+#     }
+# 4) The "weavenet-test_host" role has been created.
+# 5) The "weavenet-vpc" policy has been attached to the role.
+# 6) The "weavenet-pass_vpc" policy has been created:
+#     {
+#         "Version": "2012-10-17",
+#         "Statement": [
+#             {
+#                 "Effect": "Allow",
+#                 "Action": "iam:PassRole",
+#                 "Resource": "$ARN"
+#             }
+#         ]
+#     }
+#    (where $ARN is the ARN of the "weavenet-test_host" role; it can be found
+#    on the role's profile page).
+# 7) The policy has been attached to the "weavenet-circleci" user.
+#
+# NB: Each machine is scheduled to terminate after 30mins (via `shutdown -h`).
+# This is needed because, at the time of writing, CircleCI does not support
+# graceful teardown when a build is cancelled.
+
+set -e
+
+: ${ZONE:="us-east-1a"}
+
+: ${SRC_IMAGE_ID:="ami-fce3c696"} # Ubuntu 14.04 LTS (HVM) at us-east-1
+: ${IMAGE_NAME:="weavenet_ci"}
+
+: ${INSTANCE_TYPE:="t2.micro"}
+: ${SEC_GROUP_NAME:="weavenet-ci"}
+: ${TERMINATE_AFTER_MIN:=30}
+
+: ${KEY_NAME:="weavenet_ci"}
+: ${SSH_KEY_FILE:="$HOME/.ssh/$KEY_NAME"}
+
+: ${NUM_HOSTS:=3}
+: ${AWSCLI:="aws"}
+: ${SSHCMD:="ssh -o StrictHostKeyChecking=no -o CheckHostIp=no
+    -o UserKnownHostsFile=/dev/null -l ubuntu -i $SSH_KEY_FILE"}
+
+SUFFIX=""
+if [ -n "$CIRCLECI" ]; then
+    SUFFIX="-${CIRCLE_BUILD_NUM}-$CIRCLE_NODE_INDEX"
+fi
+
+# Creates and runs a set of VMs.
+# Each VM is named "host${ID}${SUFFIX}" and carries the "weavenet_ci" tag.
+# NOTE: each VM will be automatically terminated after $TERMINATE_AFTER_MIN minutes.
+function setup {
+    # Destroy previous machines (if any)
+    destroy
+
+    # Start instances
+    image_id=$(ami_id)
+    json=$(mktemp json.XXXXXXXXXX)
+    echo "Creating $NUM_HOSTS instances of $image_id image"
+    run_instances $NUM_HOSTS $image_id > $json
+
+    # Assign a name to each instance and
+    # disable src/dst checks (required by awsvpc)
+    i=1
+    for vm in `jq -r -e ".Instances[].InstanceId" $json`; do
+        $AWSCLI ec2 create-tags \
+            --resources "$vm" \
+            --tags "Key=Name,Value=\"$(vm_name $i)\"" \
+                   "Key=weavenet_ci,Value=\"true\""
+        $AWSCLI ec2 modify-instance-attribute \
+            --instance-id "$vm" \
+            --no-source-dest-check
+        $AWSCLI ec2 modify-instance-attribute \
+            --instance-id "$vm" \
+            --instance-initiated-shutdown-behavior terminate
+        ((i++))
+    done
+
+    # Populate /etc/hosts of the local host and of each instance
+    hosts=$(mktemp hosts.XXXXXXXXXX)
+    # wait_for_hosts will populate $json as well
+    wait_for_hosts $json
+    names=$(vm_names)
+    for vm in $names; do
+        echo "$(internal_ip $json $vm) $vm" >> $hosts
+    done
+    for vm in $names; do
+        sudo sed -i "/$vm/d" /etc/hosts
+        sudo sh -c "echo \"$(external_ip $json $vm) $vm\" >>/etc/hosts"
+        try_connect $vm
+        $SSHCMD $vm "nohup sudo shutdown -h +$TERMINATE_AFTER_MIN > /dev/null 2>&1 &"
+        copy_hosts $vm $hosts &
+    done
+
+    wait
+
+    rm $json $hosts
+}
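+
+# Example invocation (hypothetical; the subcommand names match the case
+# statement at the bottom of this script):
+#
+#   ./aws.sh make_template                # one-off: build the "weavenet_ci" AMI
+#   NUM_HOSTS=3 ./aws.sh setup && eval $(./aws.sh hosts)
+#   ./aws.sh destroy                      # terminate the instances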
+
+# Creates the AMI.
+function make_template {
+    # Exit early if the image already exists
+    [[ $(ami_id) == "null" ]] || exit 0
+
+    # Create an instance
+    json=$(mktemp json.XXXXXXXXXX)
+    echo "Creating an instance of the $SRC_IMAGE_ID image"
+    run_instances 1 "$SRC_IMAGE_ID" > $json
+
+    # Install docker and friends
+    instance_id=$(jq -r -e ".Instances[0].InstanceId" $json)
+    trap '$AWSCLI ec2 terminate-instances --instance-ids $instance_id > /dev/null' EXIT
+    list_instances_by_id "$instance_id" > $json
+    f=".Reservations[].Instances[].NetworkInterfaces[0].Association.PublicIp"
+    public_ip=$(jq -r -e "$f" $json)
+    try_connect "$public_ip"
+    install_docker_on "$public_ip"
+
+    # Create an image
+    echo "Creating $IMAGE_NAME image from $instance_id instance"
+    $AWSCLI ec2 create-image \
+        --instance-id "$instance_id" \
+        --name "$IMAGE_NAME"
+    image_id=$(ami_id)
+    wait_for_ami $image_id
+
+    # Delete artifacts
+    rm $json
+}
+
+# Destroys the VMs and removes the keys.
+function destroy {
+    delete_key_pair
+    json=$(mktemp json.XXXXXXXXXX)
+    list_instances > $json
+    instances=""
+    for i in `jq -r -e ".Reservations[].Instances[].InstanceId" $json`; do
+        instances="$i $instances"
+    done
+
+    if [[ ! -z "$instances" ]]; then
+        echo "Terminating $instances instances"
+        $AWSCLI ec2 terminate-instances --instance-ids $instances > /dev/null
+    fi
+
+    rm $json
+}
+
+# Helpers
+
+function run_instances {
+    count="$1"
+    image_id="$2"
+
+    # Create the keypair
+    create_key_pair > /dev/null
+
+    # Check whether the necessary security group exists
+    ensure_sec_group > /dev/null
+
+    $AWSCLI ec2 run-instances \
+        --image-id "$image_id" \
+        --key-name "$KEY_NAME" \
+        --placement "AvailabilityZone=$ZONE" \
+        --instance-type "$INSTANCE_TYPE" \
+        --security-groups "$SEC_GROUP_NAME" \
+        --iam-instance-profile Name="weavenet-test_host" \
+        --count $count
+}
+
+function list_instances {
+    $AWSCLI ec2 describe-instances \
+        --filters "Name=instance-state-name,Values=pending,running" \
+                  "Name=tag:weavenet_ci,Values=true" \
+                  "Name=tag:Name,Values=$(vm_names | sed 's/ $//' | sed 's/ /,/g')"
+}
+
+function list_instances_by_id {
+    ids="$1"
+    $AWSCLI ec2 describe-instances --instance-ids $ids
+}
+
+function ami_id {
+    $AWSCLI ec2 describe-images --filter "Name=name,Values=$IMAGE_NAME" |
+        jq -r ".Images[0].ImageId"
+}
+
+function ami_state {
+    image_id="$1"
+    $AWSCLI ec2 describe-images --image-ids "$image_id" |
+        jq -r -e ".Images[0].State"
+}
+
+# Blocks until the image becomes ready (i.e. its state is no longer "pending").
+function wait_for_ami {
+    image_id="$1"
+
+    echo "Waiting for $image_id image"
+    for i in {0..20}; do
+        state=$(ami_state "$image_id")
+        [[ "$state" != "pending" ]] && return
+        sleep 60
+    done
+    echo "Timed out waiting for $image_id image"
+}
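+
+# NB: recent versions of the AWS CLI offer `aws ec2 wait image-available
+# --image-ids <id>`, which could replace the polling loop above; the explicit
+# loop avoids depending on the CLI version.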
-z "$ip" ]] && return + sleep 2 + done +} + +function vm_names { + local names= + for i in $(seq 1 $NUM_HOSTS); do + names="$(vm_name $i) $names" + done + echo "$names" +} + +function vm_name { + id="$1" + echo "host$id$SUFFIX" +} + +function internal_ip { + jq -r ".Reservations[].Instances[] + | select (.Tags[].Value == \"$2\") + | .NetworkInterfaces[0].PrivateIpAddress" $1 +} + +function external_ip { + jq -r ".Reservations[].Instances[] + | select (.Tags[].Value == \"$2\") + | .NetworkInterfaces[0].Association.PublicIp" $1 +} + +function create_key_pair { + function _create { + $AWSCLI ec2 create-key-pair --key-name $KEY_NAME 2>&1 + } + + if ! RET=$(_create); then + if echo "$RET" | grep -q "InvalidKeyPair\.Duplicate"; then + delete_key_pair + RET=$(_create) + else + echo "$RET" + exit -1 + fi + fi + + echo "Created $KEY_FILE keypair" + echo "Writing $KEY_FILE into $SSH_KEY_FILE" + + echo "$RET" | jq -r .KeyMaterial > $SSH_KEY_FILE + chmod 400 $SSH_KEY_FILE +} + +function delete_key_pair { + echo "Deleting $KEY_NAME keypair" + $AWSCLI ec2 delete-key-pair --key-name $KEY_NAME + rm -f "$SSH_KEY_FILE" || true +} + +function ensure_sec_group { + $AWSCLI ec2 describe-security-groups | \ + jq -r -e ".SecurityGroups[] | + select (.GroupName == \"$SEC_GROUP_NAME\")" > /dev/null \ + || create_sec_group +} + +function create_sec_group { + echo "Creating $SEC_GROUP_NAME security group" + $AWSCLI ec2 create-security-group \ + --group-name "$SEC_GROUP_NAME" \ + --description "Weave CircleCI" > /dev/null + $AWSCLI ec2 authorize-security-group-ingress \ + --group-name "$SEC_GROUP_NAME" \ + --source-group "$SEC_GROUP_NAME" \ + --protocol all + $AWSCLI ec2 authorize-security-group-ingress \ + --group-name "$SEC_GROUP_NAME" \ + --protocol tcp --port 22 \ + --cidr "0.0.0.0/0" + $AWSCLI ec2 authorize-security-group-ingress \ + --group-name "$SEC_GROUP_NAME" \ + --protocol tcp --port 2375 \ + --cidr "0.0.0.0/0" + $AWSCLI ec2 authorize-security-group-ingress \ + --group-name "$SEC_GROUP_NAME" \ + --protocol tcp --port 12375 \ + --cidr "0.0.0.0/0" +} + +# Commons (taken from gce.sh, and slightly modified) + +# TODO(mp) DRY + +function hosts { + hosts= + json=$(mktemp json.XXXXXXXXXX) + list_instances > $json + for name in $(vm_names); do + hostname="$name" + hosts="$hostname $hosts" + done + echo export SSH=\"$SSHCMD\" + echo export NO_SCHEDULER=1 + echo export HOSTS=\"$hosts\" + rm $json +} + +function try_connect { + echo "Trying to connect to $1" + for i in {0..120}; do + $SSHCMD -t $1 true && return + sleep 2 + done + echo "Connected to $1" +} + +function copy_hosts { + hostname=$1 + hosts=$2 + + cat $hosts | $SSHCMD -t "$hostname" "sudo -- sh -c \"cat >>/etc/hosts\"" +} + +function install_docker_on { + # TODO(mp) bring back the `-s overlay` opt to DOCKER_OPTS. + + name=$1 + $SSHCMD -t $name sudo bash -x -s <> /etc/default/docker; +service docker restart +EOF + # It seems we need a short delay for docker to start up, so I put this in + # a separate ssh connection. This installs nsenter. 
+    $SSHCMD -t $name sudo docker run --rm -v /usr/local/bin:/target jpetazzo/nsenter
+}
+
+# Main
+
+case "$1" in
+setup)
+    setup
+    ;;
+hosts)
+    hosts
+    ;;
+destroy)
+    destroy
+    ;;
+make_template)
+    make_template
+    ;;
+esac
diff --git a/vendor/github.com/aws/aws-sdk-go b/vendor/github.com/aws/aws-sdk-go
new file mode 160000
index 0000000000..e4be72e584
--- /dev/null
+++ b/vendor/github.com/aws/aws-sdk-go
@@ -0,0 +1 @@
+Subproject commit e4be72e584418dc0ff2af22ae247095143a7f258
diff --git a/weave b/weave
index 53af6c0d51..1aab3d26f3 100755
--- a/weave
+++ b/weave
@@ -181,6 +181,7 @@ exec_remote() {
         -e PROXY_HOST="$PROXY_HOST" \
         -e COVERAGE \
         -e CHECKPOINT_DISABLE \
+        -e AWSVPC \
         $WEAVEEXEC_DOCKER_ARGS $EXEC_IMAGE --local "$@"
 }
 
@@ -532,9 +533,10 @@ random_mac() {
 
 util_op() {
     if command_exists weaveutil ; then
-        weaveutil "$@"
+        WEAVE_HTTP_ADDR=$WEAVE_HTTP_ADDR weaveutil "$@"
     else
         docker run --rm --privileged --net=host --pid=host $(docker_sock_options) \
+            -e WEAVE_HTTP_ADDR \
            --entrypoint=/usr/bin/weaveutil $EXEC_IMAGE "$@"
     fi
 }
 
@@ -645,7 +647,7 @@ EOF
         fi
     fi
 
-    [ "$1" = "--without-ethtool" ] || ethtool_tx_off_$BRIDGE_TYPE $BRIDGE
+    [ "$1" = "--without-ethtool" -o -n "$AWSVPC" ] || ethtool_tx_off_$BRIDGE_TYPE $BRIDGE
 
     ip link set dev $BRIDGE up
 
@@ -660,6 +662,12 @@ expose_ip() {
         if ! ip addr show dev $BRIDGE | grep -qF $CIDR ; then
             ip addr add dev $BRIDGE $CIDR
            arp_update $BRIDGE $CIDR || true
+            # Remove the subnet route installed by the kernel, because awsvpc
+            # has already installed an equivalent one
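+            # (Assigning an address with a prefix makes the kernel add a
+            # connected "proto kernel" route for that subnet; the query below
+            # picks out that route so it can be deleted.)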
+            if [ -n "$AWSVPC" ]; then
+                RCIDR=$(ip route list exact $CIDR proto kernel | head -n1 | cut -d' ' -f1)
+                [ -n "$RCIDR" ] && ip route del dev $BRIDGE proto kernel $RCIDR
+            fi
         fi
         [ -z "$FQDN" ] || when_weave_running put_dns_fqdn_no_check_alive weave:expose $FQDN $CIDR
     done
 
@@ -1501,22 +1509,19 @@ launch_router() {
     LAUNCHING_ROUTER=1
     check_forwarding_rules
     enforce_docker_bridge_addr_assign_type
-    create_bridge
-    docker_bridge_ip
-    # We set the router name to the bridge MAC, which in turn is
-    # derived from the system UUID (if available), and thus stable
-    # across reboots.
-    PEERNAME=$(cat /sys/class/net/$BRIDGE/address)
+
     # backward compatibility...
     if is_cidr "$1" ; then
         echo "WARNING: $1 parameter ignored; 'weave launch' no longer takes a CIDR as the first parameter" >&2
         shift 1
     fi
+
     CONTAINER_PORT=$PORT
     ARGS=
     IPRANGE=
     IPRANGE_SPECIFIED=
+    docker_bridge_ip
     DNS_ROUTER_OPTS="--dns-listen-address $DOCKER_BRIDGE_IP:53"
     NO_DNS_OPT=
 
@@ -1558,6 +1563,10 @@ launch_router() {
             --no-restart)
                 RESTART_POLICY=
                 ;;
+            --awsvpc)
+                AWSVPC_ARGS="--awsvpc"
+                AWSVPC=1
+                ;;
             *)
                 ARGS="$ARGS '$(echo "$1" | sed "s|'|'\"'\"'|g")'"
                 ;;
@@ -1565,6 +1574,13 @@ launch_router() {
         shift
     done
     eval "set -- $ARGS"
+
+    create_bridge
+    # We set the router name to the bridge MAC, which in turn is
+    # derived from the system UUID (if available), and thus stable
+    # across reboots.
+    PEERNAME=$(cat /sys/class/net/$BRIDGE/address)
+
     if [ -z "$IPRANGE_SPECIFIED" ] ; then
         IPRANGE="10.32.0.0/12"
         if ! check_overlap $IPRANGE ; then
@@ -1601,11 +1617,23 @@ launch_router() {
         --ipalloc-range "$IPRANGE" \
         --dns-effective-listen-address $DOCKER_BRIDGE_IP \
         $DNS_ROUTER_OPTS $NO_DNS_OPT \
+        $AWSVPC_ARGS \
         --http-addr $HTTP_ADDR \
         "$@")
     setup_router_iface_$BRIDGE_TYPE
     wait_for_status $CONTAINER_NAME http_call $HTTP_ADDR
     populate_router
+    if [ -n "$AWSVPC" ]; then
+        expose_ip
+        # Set proxy_arp on the bridge, so that it can accept packets destined
+        # for containers within the same subnet but running on remote hosts.
+        # Without it, exact routes would be required on each container.
+        echo 1 >/proc/sys/net/ipv4/conf/$BRIDGE/proxy_arp
+        # Avoid delaying the first ARP request. Also, setting it to 0 avoids
+        # placing the request into a bounded queue, as can be seen in:
+        # https://git.kernel.org/cgit/linux/kernel/git/stable/linux-stable.git/tree/net/ipv4/arp.c?id=refs/tags/v4.6.1#n819
+        echo 0 >/proc/sys/net/ipv4/neigh/$BRIDGE/proxy_delay
+    fi
 }
 
 # Recreate the parameter values that are set when the router is first launched