Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

[WIP] 2017: AWSVPC #2091

Closed
wants to merge 63 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
dfa2b36
Add support for AWSVPC
brb Mar 23, 2016
dfbcefa
Disable temporarily system tests on GCE
brb Apr 5, 2016
7dff3d4
Fix tests
brb May 9, 2016
66ca9e1
Get rid of the isCIDRAligned flag
brb May 16, 2016
29e9238
Cleanup
brb May 16, 2016
d423662
Address some comments
brb May 19, 2016
2cbea50
Get rid of WEAVE_NO_FASTDP=1
brb May 19, 2016
dde0371
Dead code elimination
brb May 19, 2016
b7bdcc3
Get rid of comment about deadlock
brb May 19, 2016
204832d
Add comments
brb May 19, 2016
ed9a6d5
Refactor
brb May 19, 2016
3dc6877
Keep docker_bridge_ip
brb May 26, 2016
9177e3d
Extend 900_awsvpc_2_test.sh
brb May 26, 2016
8ec3427
Cleanup
brb May 27, 2016
97d02da
Disable proxy_delay on the weave bridge
brb May 27, 2016
53c3a87
Cleanup
brb May 27, 2016
c739884
Cleanup weave script
brb May 27, 2016
ed89038
Cleanup
brb May 31, 2016
6df76d2
Cleanup
brb May 31, 2016
343d90d
Cleanup
brb May 31, 2016
3aca783
Cleanup
brb May 31, 2016
e82906e
Cleanup
brb May 31, 2016
d72874a
Cleanup
brb May 31, 2016
c7201dc
Use expose_ip for AWSVPC
brb May 31, 2016
6055df7
Add temp TODO
brb May 31, 2016
dd7f7bb
Add dummy overlay for AWSVPC
brb Jun 1, 2016
5581c84
Adjust bridge interface friendly name
brb Jun 1, 2016
5cea0bc
Update TODO
brb Jun 1, 2016
8280622
Cleanup
brb Jun 2, 2016
c49e648
Add /monitor endpoint
brb Jun 2, 2016
9163541
Check for deadlocks
brb Jun 2, 2016
90e1f46
Add subnet check
brb Jun 2, 2016
216e45c
Keep TX ON for awsvpc
brb Jun 2, 2016
d8b60ba
Cleanup
brb Jun 3, 2016
83ac36f
Cleanup
brb Jun 3, 2016
84577ea
Refactor Monitor -> LocalRangeTracker
brb Jun 3, 2016
699c3f7
Refactor removeCommon
brb Jun 3, 2016
d0cee65
Do merging inside HandleUpdate
brb Jun 3, 2016
5260771
Cleanup removeCommon
brb Jun 3, 2016
86f3e48
Move HandleUpdate to ring
brb Jun 3, 2016
7a3e356
Get rid of String method
brb Jun 3, 2016
784ac88
Add Matthias simplification
brb Jun 3, 2016
94d71e2
Cleanup
brb Jun 3, 2016
83c2ed3
Save a line
brb Jun 3, 2016
be49c0e
Address comments
brb Jun 3, 2016
cdaad55
Add logging
brb Jun 3, 2016
62f1b23
More logging
brb Jun 3, 2016
8694629
Fix ring update
brb Jun 4, 2016
2805547
Fix broken route checker routines
brb Jun 4, 2016
97b1f3a
Refactor restoration
brb Jun 4, 2016
dae3718
Update TODO.md
brb Jun 5, 2016
6b6a6ab
Set AWS=1 only for AWS
brb Jun 6, 2016
9f14a52
Fix rmpeer
brb Jun 6, 2016
5c7afd4
Bring 3rd host
brb Jun 6, 2016
05a049a
Add rmpeer tests
brb Jun 6, 2016
b26256f
Get rid of AdminTransfer
brb Jun 6, 2016
2b75f3c
WIP: add vpc code to the plugin
brb Jun 6, 2016
3265604
Cleanup tests
brb Jun 7, 2016
ce0a58c
Cleanup subdivide
brb Jun 7, 2016
607382e
Add docs
brb Jun 7, 2016
44eec32
Snapshot
brb Jun 7, 2016
1fcedae
Revert plugin changes
brb Jun 7, 2016
8dadc5e
Cleanup
brb Jun 7, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,6 @@
[submodule "vendor/github.com/j-keck/arping"]
path = vendor/github.com/j-keck/arping
url = https://github.com/j-keck/arping
[submodule "vendor/github.com/aws/aws-sdk-go"]
path = vendor/github.com/aws/aws-sdk-go
url = https://github.com/aws/aws-sdk-go
17 changes: 17 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# AWSVPC

# Urgent

* Add the subnet checks / keep checksum offloading for the plugin
* Test with the plugin

* Test with multiple addr from the default subnet

* Create new SSH keys or reuse the same for each test run

# Nice To Have

* What happens if $RCIDR is not found
* Do fuzz testing
* Read on VPC routing tables again.
* create a separate VPC for each test run
8 changes: 8 additions & 0 deletions api/ipam.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ func (client *Client) DefaultSubnet() (*net.IPNet, error) {
return ipnet, err
}

func (client *Client) LocalRangeTracker() (string, error) {
t, err := client.httpVerb("GET", fmt.Sprintf("/ipinfo/tracker"), nil)
if err != nil {
return "", err
}
return t, nil
}

func parseIP(body string) (*net.IPNet, error) {
ip, ipnet, err := net.ParseCIDR(string(body))
if err != nil {
Expand Down
25 changes: 18 additions & 7 deletions circle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ machine:
SRCDIR: /home/ubuntu/src/github.com/weaveworks/weave
PATH: $PATH:$HOME/.local/bin:$HOME/google-cloud-sdk/bin/
CLOUDSDK_CORE_DISABLE_PROMPTS: 1
AWS_DEFAULT_REGION: us-east-1

dependencies:
cache_directories:
Expand All @@ -22,7 +23,9 @@ dependencies:
- bin/setup-circleci-secrets "$SECRET_PASSWORD"
- "mkdir -p $(dirname $SRCDIR) && cp -r $(pwd)/ $SRCDIR"
- "cd $SRCDIR; git submodule update --init"
- cd $SRCDIR/test; ./gce.sh make_template:
#- cd $SRCDIR/test; ./gce.sh make_template:

This comment was marked as abuse.

This comment was marked as abuse.

# parallel: false
- cd $SRCDIR/test; ./aws.sh make_template:
parallel: false
- "cd $SRCDIR/build; ../tools/rebuild-image weaveworks/weavebuild . Dockerfile build.sh && touch $SRCDIR/.build.uptodate"

Expand All @@ -36,14 +39,22 @@ test:
parallel: true
- cd $SRCDIR; make testrunner:
parallel: true
- cd $SRCDIR/test; ./gce.sh setup && eval $(./gce.sh hosts) && ./setup.sh:
parallel: true
- cd $SRCDIR/test; eval $(./gce.sh hosts); export COVERAGE=true; ./run_all.sh:
parallel: true
#- cd $SRCDIR/test; ./gce.sh setup && eval $(./gce.sh hosts) && ./setup.sh:
# parallel: true
#- cd $SRCDIR/test; eval $(./gce.sh hosts); export COVERAGE=true; ./run_all.sh:
# parallel: true
# timeout: 300
- cd $SRCDIR/test; ./aws.sh setup && eval $(./aws.sh hosts) && ./setup.sh:
parallel: false

This comment was marked as abuse.

- cd $SRCDIR/test; eval $(./aws.sh hosts); export COVERAGE=true; AWS=1 ./run_all.sh:
parallel: false
timeout: 300

post:
- cd $SRCDIR/test; ./gce.sh destroy:
parallel: true
#- cd $SRCDIR/test; ./gce.sh destroy:
# parallel: true
- cd $SRCDIR/test; ./aws.sh destroy:
parallel: false
- test "$CIRCLE_NODE_INDEX" != "0" || (cd $SRCDIR/test; ./gen_coverage_reports.sh):
parallel: true

Expand Down
22 changes: 19 additions & 3 deletions ipam/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/weaveworks/weave/ipam/paxos"
"github.com/weaveworks/weave/ipam/ring"
"github.com/weaveworks/weave/ipam/space"
"github.com/weaveworks/weave/ipam/tracker"
"github.com/weaveworks/weave/net/address"
)

Expand Down Expand Up @@ -85,23 +86,34 @@ type Config struct {
Quorum func() uint
Db db.DB
IsKnownPeer func(name mesh.PeerName) bool
Tracker tracker.LocalRangeTracker
}

// NewAllocator creates and initialises a new Allocator
func NewAllocator(config Config) *Allocator {
var participant paxos.Participant
var alloc *Allocator
var onUpdate ring.OnUpdate

if config.IsObserver {
participant = paxos.NewObserver()
} else {
participant = paxos.NewNode(config.OurName, config.OurUID, 0)
}

return &Allocator{
if config.Tracker != nil {
onUpdate = func(prev []address.Range, curr []address.Range, local bool) {
if err := config.Tracker.HandleUpdate(prev, curr, local); err != nil {
alloc.errorf("HandleUpdate failed: %s", err)
}
}
}

alloc = &Allocator{
ourName: config.OurName,
seed: config.Seed,
universe: config.Universe,
ring: ring.New(config.Universe.Range().Start, config.Universe.Range().End, config.OurName),
ring: ring.New(config.Universe.Range().Start, config.Universe.Range().End, config.OurName, onUpdate),
owned: make(map[string]ownedData),
db: config.Db,
paxos: participant,
Expand All @@ -111,6 +123,7 @@ func NewAllocator(config Config) *Allocator {
dead: make(map[string]time.Time),
now: time.Now,
}
return alloc
}

func ParseCIDRSubnet(cidrStr string) (cidr address.CIDR, err error) {
Expand Down Expand Up @@ -958,7 +971,7 @@ func (alloc *Allocator) loadPersistedData() bool {
return false
}

alloc.ring = persistedRing
alloc.ring.Restore(persistedRing)
alloc.space.UpdateRanges(alloc.ring.OwnedRanges())

if ownedFound {
Expand Down Expand Up @@ -1068,6 +1081,9 @@ func (alloc *Allocator) fatalf(fmt string, args ...interface{}) {
func (alloc *Allocator) warnf(fmt string, args ...interface{}) {
alloc.logf(common.Log.Warnf, fmt, args...)
}
func (alloc *Allocator) errorf(fmt string, args ...interface{}) {
common.Log.Errorf("[allocator %s] "+fmt, append([]interface{}{alloc.ourName}, args...)...)
}
func (alloc *Allocator) infof(fmt string, args ...interface{}) {
alloc.logf(common.Log.Infof, fmt, args...)
}
Expand Down
155 changes: 154 additions & 1 deletion ipam/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

const (
testStart1 = "10.0.1.0"
testStart1 = "10.0.0.0"
testStart2 = "10.0.2.0"
testStart3 = "10.0.3.0"
)
Expand Down Expand Up @@ -644,3 +644,156 @@ func cidrRanges(s string) []address.Range {
c, _ := address.ParseCIDR(s)
return []address.Range{c.Range()}
}

func TestTracker(t *testing.T) {
const (
cidr = "10.0.0.0/30"
container1 = "container-1"
container2 = "container-2"
)

trackChan := make(chan rangePair, 10)
allocs, router, _ := makeNetworkOfAllocatorsWithTracker(2, cidr, newTestTracker(trackChan))
defer stopNetworkOfAllocators(allocs, router)

cidr1, _ := address.ParseCIDR(cidr)
addr1, err := allocs[0].Allocate(container1, cidr1, true, returnFalse)
require.Equal(t, nil, err, "")
require.Equal(t, ip("10.0.0.1"), addr1, "")

// Check HandleUpdate invocations. 2 of them should be invoked only with new ranges.
newPeer1 := false
newPeer2 := false
for i := 0; i < 3; i++ {
pair := <-trackChan
switch {
case !newPeer1 && pair.new[0].Equal(addrRange("10.0.0.0", "10.0.0.1")):
newPeer1 = true
case !newPeer2 && pair.new[0].Equal(addrRange("10.0.0.2", "10.0.0.3")):
newPeer2 = true
default:
continue
}
}
require.True(t, newPeer1 && newPeer2, "")

// The following allocation should trigger request for donation, because
// peer1 ran out of space.
addr2, err := allocs[0].Allocate(container2, cidr1, true, returnFalse)
require.Equal(t, nil, err, "")
require.Equal(t, ip("10.0.0.2"), addr2, "")

// Check whether HandleUpdate is invoked after donation.
newDonation1 := false
newDonation2 := false
for i := 0; i < 2; i++ {
pair := <-trackChan
switch {
case !newDonation1 &&
pair.old[0].Equal(addrRange("10.0.0.0", "10.0.0.1")) &&
pair.new[0].Equal(addrRange("10.0.0.0", "10.0.0.1")) &&
pair.new[1].Equal(addrRange("10.0.0.3", "10.0.0.3")):
// peer1

require.Equal(t, 1, len(pair.old), "")
require.Equal(t, 2, len(pair.new), "")
newDonation1 = true
case !newDonation2 &&
pair.old[0].Equal(addrRange("10.0.0.2", "10.0.0.3")) &&
pair.new[0].Equal(addrRange("10.0.0.2", "10.0.0.2")):
// peer2

require.Equal(t, 1, len(pair.old), "")
require.Equal(t, 1, len(pair.new), "")
newDonation2 = true
default:
continue
}
}
require.True(t, newDonation1 && newDonation2, "")
}

func TestShutdownWithTracker(t *testing.T) {
const (
cidr = "10.0.0.0/30"
container1 = "container-1"
)

trackChan := make(chan rangePair, 10)
allocs, router, _ := makeNetworkOfAllocatorsWithTracker(2, cidr, newTestTracker(trackChan))
defer stopNetworkOfAllocators(allocs, router)

cidr1, _ := address.ParseCIDR(cidr)
_, err := allocs[0].Allocate(container1, cidr1, true, returnFalse)
require.NoError(t, err, "")

time.Sleep(500 * time.Millisecond)
flush(trackChan, 2)

// After allocation (and ring establishment):
// * peer1: [10.0.0.0-10.0.0.1]
// * peer2: [10.0.0.2-10.0.0.3]

// Shutdown peer1
allocs[0].Shutdown()

done := false
for i := 0; i < 5; i++ {
p := <-trackChan
switch {
// This should uniquely match HandleUpdate invocation on peer2 which
// happens after peer1 notified peer2 about the donation due to its
// termination:
case !done && len(p.old) == 1 && len(p.new) == 2:
require.Equal(t, addrRange("10.0.0.2", "10.0.0.3"), p.old[0], "")
require.Equal(t, addrRange("10.0.0.0", "10.0.0.1"), p.new[0], "")
require.Equal(t, addrRange("10.0.0.2", "10.0.0.3"), p.new[1], "")
// peer2 has received peer1's previously owned ranges
done = true
default:
continue
}
}
require.True(t, done)

// Shutdown peer2
allocs[1].Shutdown()
p := <-trackChan
require.Equal(t,
[]address.Range{addrRange("10.0.0.0", "10.0.0.1"), addrRange("10.0.0.2", "10.0.0.3")},
p.old)
require.Len(t, p.new, 0)
}

func flush(c chan rangePair, count int) {
for i := 0; i < count; i++ {
<-c
}
}

type testTracker struct {
trackChan chan rangePair
}

type rangePair struct {
old, new []address.Range
}

func newTestTracker(trackChan chan rangePair) *testTracker {
return &testTracker{trackChan}
}

func (t *testTracker) HandleUpdate(old, new []address.Range, local bool) error {
t.trackChan <- rangePair{old, new}
return nil
}

// Creates [start;end] address.Range.
func addrRange(start, end string) address.Range {
return address.NewRange(ip(start), address.Subtract(ip(end)+1, ip(start)))
}

func ip(addr string) address.Address {
retAddr, _ := address.ParseIP(addr)
return retAddr
}
6 changes: 5 additions & 1 deletion ipam/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (alloc *Allocator) handleHTTPClaim(dockerCli *docker.Client, w http.Respons
}

// HandleHTTP wires up ipams HTTP endpoints to the provided mux.
func (alloc *Allocator) HandleHTTP(router *mux.Router, defaultSubnet address.CIDR, dockerCli *docker.Client) {
func (alloc *Allocator) HandleHTTP(router *mux.Router, defaultSubnet address.CIDR, tracker string, dockerCli *docker.Client) {
router.Methods("GET").Path("/ipinfo/defaultsubnet").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "%s", defaultSubnet)
})
Expand Down Expand Up @@ -174,4 +174,8 @@ func (alloc *Allocator) HandleHTTP(router *mux.Router, defaultSubnet address.CID
transferred := alloc.AdminTakeoverRanges(ident)
fmt.Fprintf(w, "%d IPs taken over from %s\n", transferred, ident)
})

router.Methods("GET").Path("/ipinfo/tracker").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, tracker)
})
}
2 changes: 1 addition & 1 deletion ipam/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func listenHTTP(alloc *Allocator, subnet address.CIDR) int {
router.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
io.WriteString(w, fmt.Sprintln(alloc))
})
alloc.HandleHTTP(router, subnet, nil)
alloc.HandleHTTP(router, subnet, "", nil)

httpListener, err := net.Listen("tcp", ":0")
if err != nil {
Expand Down
Loading