Distributed query load balancing and failover #2301

Merged · 6 commits · Apr 17, 2015
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,9 +1,15 @@
 ## v0.9.0-rc26 [unreleased]
 
+### Features
+- [#2301](https://github.com/influxdb/influxdb/pull/2301): Distributed query load balancing and failover
+
 ### Bugfixes
 - [#2297](https://github.com/influxdb/influxdb/pull/2297): create /var/run during startup. Thanks @neonstalwart.
 - [#2312](https://github.com/influxdb/influxdb/pull/2312): Re-use httpclient for continuous queries
 - [#2318](https://github.com/influxdb/influxdb/pull/2318): Remove pointless use of 'done' channel for collectd.
+- [#2242](https://github.com/influxdb/influxdb/pull/2242): Distributed Query should balance requests
+- [#2243](https://github.com/influxdb/influxdb/pull/2243): Use Limit Reader instead of fixed 1MB/1GB slice for DQ
+- [#2190](https://github.com/influxdb/influxdb/pull/2190): Implement failover to other data nodes for distributed queries
 
 ## v0.9.0-rc25 [2015-04-15]

77 changes: 77 additions & 0 deletions balancer.go
@@ -0,0 +1,77 @@
package influxdb

import (
	"math/rand"
	"time"
)

// Balancer represents a load-balancing algorithm for a set of DataNodes
type Balancer interface {
	// Next returns the next DataNode according to the balancing method
	// or nil if there are no nodes available
	Next() *DataNode
}

type dataNodeBalancer struct {
	dataNodes []*DataNode // data nodes to balance between
	p         int         // current node index
}

// NewDataNodeBalancer creates a shuffled, round-robin balancer so that
// multiple instances will return nodes in a randomized order and each
// returned DataNode will be repeated in a cycle
func NewDataNodeBalancer(dataNodes []*DataNode) Balancer {
	// make a copy of the dataNode slice so we can randomize it
	// without affecting the original instance as well as ensure
	// that each Balancer returns nodes in a different order
	nodes := make([]*DataNode, len(dataNodes))
	copy(nodes, dataNodes)

	b := &dataNodeBalancer{
		dataNodes: nodes,
	}
	b.shuffle()
	return b
}

// shuffle randomizes the ordering of the balancer's available DataNodes
func (b *dataNodeBalancer) shuffle() {
	for i := range b.dataNodes {
		j := rand.Intn(i + 1)
		b.dataNodes[i], b.dataNodes[j] = b.dataNodes[j], b.dataNodes[i]
	}
}

// online returns a slice of the DataNodes that are online
func (b *dataNodeBalancer) online() []*DataNode {
	now := time.Now().UTC()
	up := []*DataNode{}
	for _, n := range b.dataNodes {
		if n.OfflineUntil.After(now) {
			continue
		}
		up = append(up, n)
	}
	return up
}

// Next returns the next available DataNode
func (b *dataNodeBalancer) Next() *DataNode {
	// only use online nodes
	up := b.online()

	// no nodes online
	if len(up) == 0 {
		return nil
	}

	// rollover back to the beginning
	if b.p >= len(up) {
		b.p = 0
	}

	d := up[b.p]
	b.p += 1

	return d
}
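Aside: the balancer's online() check leans on DataNode health state that this diff does not show. A minimal sketch of the assumed shape follows; only OfflineUntil, Down(), and Up() are actually referenced by this PR, and the one-minute backoff below is an invented placeholder, not InfluxDB's real value.

```go
package influxdb

import "time"

// dataNodeHealth sketches the health fields the balancer depends on.
// Only OfflineUntil, Down, and Up appear in this PR; the backoff
// duration below is an assumption for illustration.
type dataNodeHealth struct {
	OfflineUntil time.Time // zero value: the node is treated as online
}

// Down records a failure and backs the node off, so online() skips it
// until the deadline passes or Up is called.
func (h *dataNodeHealth) Down() {
	h.OfflineUntil = time.Now().UTC().Add(time.Minute)
}

// Up clears the backoff so the node is immediately eligible again.
func (h *dataNodeHealth) Up() {
	h.OfflineUntil = time.Time{}
}
```

Under a model like this, a request failure takes a node out of rotation temporarily rather than permanently, which is why TestBalancerBackUp below can bring node 1 back with a single Up() call.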
109 changes: 109 additions & 0 deletions balancer_test.go
@@ -0,0 +1,109 @@
package influxdb_test

import (
	"fmt"
	"net/url"
	"testing"

	"github.com/influxdb/influxdb"
)

func newDataNodes() []*influxdb.DataNode {
	nodes := []*influxdb.DataNode{}
	for i := 1; i <= 2; i++ {
		u, _ := url.Parse(fmt.Sprintf("http://localhost:999%d", i))
		nodes = append(nodes, &influxdb.DataNode{ID: uint64(i), URL: u})
	}
	return nodes
}

func TestBalancerEmptyNodes(t *testing.T) {
	b := influxdb.NewDataNodeBalancer([]*influxdb.DataNode{})
	got := b.Next()
	if got != nil {
		t.Errorf("expected nil, got %v", got)
	}
}

func TestBalancerUp(t *testing.T) {
	nodes := newDataNodes()
	b := influxdb.NewDataNodeBalancer(nodes)

	// First node in randomized round-robin order
	first := b.Next()
	if first == nil {
		t.Errorf("expected datanode, got %v", first)
	}

	// Second node in randomized round-robin order
	second := b.Next()
	if second == nil {
		t.Errorf("expected datanode, got %v", second)
	}

	// Should never get the same node in order twice
	if first.ID == second.ID {
		t.Errorf("expected first != second. got %v = %v", first.ID, second.ID)
	}
}

func TestBalancerDown(t *testing.T) {
	nodes := newDataNodes()
	b := influxdb.NewDataNodeBalancer(nodes)

	nodes[0].Down()

	// First node in randomized round-robin order
	first := b.Next()
	if first == nil {
		t.Errorf("expected datanode, got %v", first)
	}

	// Second node should rollover to the first up node
	second := b.Next()
	if second == nil {
		t.Errorf("expected datanode, got %v", second)
	}

	// Healthy node should be returned each time
	if first.ID != 2 && first.ID != second.ID {
		t.Errorf("expected the healthy node each time. got %v, %v", first.ID, second.ID)
	}
}

func TestBalancerBackUp(t *testing.T) {
	nodes := newDataNodes()
	b := influxdb.NewDataNodeBalancer(nodes)

	nodes[0].Down()

	for i := 0; i < 3; i++ {
		got := b.Next()
		if got == nil {
			t.Errorf("expected datanode, got %v", got)
		}

		if exp := uint64(2); got.ID != exp {
			t.Errorf("wrong node id: exp %v, got %v", exp, got.ID)
		}
	}

	nodes[0].Up()

	// First node in randomized round-robin order
	first := b.Next()
	if first == nil {
		t.Errorf("expected datanode, got %v", first)
	}

	// Second node should rollover to the first up node
	second := b.Next()
	if second == nil {
		t.Errorf("expected datanode, got %v", second)
	}

	// Should get both nodes returned
	if first.ID == second.ID {
		t.Errorf("expected first != second. got %v = %v", first.ID, second.ID)
	}
}
2 changes: 1 addition & 1 deletion cmd/influxd/run.go
@@ -279,6 +279,7 @@ func (cmd *RunCommand) Open(config *Config, join string) *Node {

 	//FIXME: Need to also pass in dataURLs to bootstrap a data node
 	s = cmd.openServer(joinURLs)
+	cmd.node.DataNode = s
 	s.SetAuthenticationEnabled(cmd.config.Authentication.Enabled)
 	log.Printf("authentication enabled: %v\n", cmd.config.Authentication.Enabled)
 
@@ -568,7 +569,6 @@ func (cmd *RunCommand) openServer(joinURLs []url.URL) *influxdb.Server {
 	s.ComputeNoMoreThan = time.Duration(cmd.config.ContinuousQuery.ComputeNoMoreThan)
 	s.Version = version
 	s.CommitHash = commit
-	cmd.node.DataNode = s
 
 	// Open server with data directory and broker client.
 	if err := s.Open(cmd.config.Data.Dir, c); err != nil {
22 changes: 22 additions & 0 deletions cmd/influxd/server_integration_test.go
@@ -176,6 +176,7 @@ func write(t *testing.T, node *TestNode, data string) {
 	if err != nil {
 		t.Fatalf("Couldn't write data: %s", err)
 	}
+	defer resp.Body.Close()
 	body, _ := ioutil.ReadAll(resp.Body)
 	fmt.Println("BODY: ", string(body))
 	if resp.StatusCode != http.StatusOK {
@@ -1399,7 +1400,28 @@ func Test3NodeServer(t *testing.T) {

 	runTestsData(t, testName, nodes, "mydb", "myrp", len(nodes))
 	runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes))
+}
+
+func Test3NodeServerFailover(t *testing.T) {
+	testName := "3-node server integration"
+
+	if testing.Short() {
+		t.Skip(fmt.Sprintf("skipping '%s'", testName))
+	}
+	dir := tempfile()
+	defer func() {
+		os.RemoveAll(dir)
+	}()
+
+	nodes := createCombinedNodeCluster(t, testName, dir, 3, nil)
+
+	// kill the last node; the cluster will still expect it to be there
+	nodes[2].node.Close()
+	nodes = nodes[:len(nodes)-1]
+
+	runTestsData(t, testName, nodes, "mydb", "myrp", len(nodes))
+	runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes))
 	nodes.Close()
 }

// ensure that all queries work if there are more nodes in a cluster than the replication factor
3 changes: 3 additions & 0 deletions influxdb.go
@@ -33,6 +33,9 @@ var (
 	// ErrDataNodeURLRequired is returned when creating a data node without a URL.
 	ErrDataNodeURLRequired = errors.New("data node url required")
 
+	// ErrNoDataNodeAvailable is returned when there are no data nodes available.
+	ErrNoDataNodeAvailable = errors.New("data node not available")
+
 	// ErrDataNodeExists is returned when creating a duplicate data node.
 	ErrDataNodeExists = errors.New("data node exists")
 
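Aside: sentinel errors like these let callers distinguish "no node available" from transport failures with a plain equality check. A hedged sketch of the caller-side pattern; runMapper here is a hypothetical stand-in for RemoteMapper.Begin, which returns this sentinel when the balancer runs out of nodes.

```go
package main

import (
	"errors"
	"fmt"
)

// Mirrors the sentinel declared in influxdb.go; callers compare against
// the variable itself instead of parsing the message string.
var ErrNoDataNodeAvailable = errors.New("data node not available")

// runMapper is a hypothetical stand-in for RemoteMapper.Begin.
func runMapper(nodesUp int) error {
	if nodesUp == 0 {
		return ErrNoDataNodeAvailable
	}
	return nil
}

func main() {
	if err := runMapper(0); err == ErrNoDataNodeAvailable {
		fmt.Println("failing query: no data node available")
	}
}
```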
43 changes: 25 additions & 18 deletions remote_mapper.go
@@ -4,23 +4,25 @@ import (
"bytes"
"encoding/json"
"errors"
"io"
"net/http"

"github.com/influxdb/influxdb/influxql"
)

const (
MAX_MAP_RESPONSE_SIZE = 1024 * 1024
MAX_MAP_RESPONSE_SIZE = 1024 * 1024 * 1024
)

// RemoteMapper implements the influxql.Mapper interface. The engine uses the remote mapper
// to pull map results from shards that only exist on other servers in the cluster.
type RemoteMapper struct {
dataNodes []*DataNode
dataNodes Balancer
resp *http.Response
results chan interface{}
unmarshal influxql.UnmarshalFunc
complete bool
decoder *json.Decoder

Call string `json:",omitempty"`
Database string `json:",omitempty"`
@@ -77,12 +79,28 @@ func (m *RemoteMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int
 		return err
 	}
 
-	// request to start streaming results
-	resp, err := http.Post(m.dataNodes[0].URL.String()+"/data/run_mapper", "application/json", bytes.NewReader(b))
-	if err != nil {
-		return err
+	var resp *http.Response
+	for {
+		node := m.dataNodes.Next()
+		if node == nil {
+			// no data nodes are available to service this query
+			return ErrNoDataNodeAvailable
+		}
+
+		// request to start streaming results
+		resp, err = http.Post(node.URL.String()+"/data/run_mapper", "application/json", bytes.NewReader(b))
+		if err != nil {
+			node.Down()
+			continue
+		}
+		// Mark the node as up
+		node.Up()
+		break
 	}
 
 	m.resp = resp
+	lr := io.LimitReader(m.resp.Body, MAX_MAP_RESPONSE_SIZE)
+	m.decoder = json.NewDecoder(lr)
 
 	return nil
 }
@@ -94,19 +112,8 @@ func (m *RemoteMapper) NextInterval() (interface{}, error) {
 		return nil, nil
 	}
 
-	// read the chunk
-	chunk := make([]byte, MAX_MAP_RESPONSE_SIZE, MAX_MAP_RESPONSE_SIZE)
-	n, err := m.resp.Body.Read(chunk)
-	if err != nil {
-		return nil, err
-	}
-	if n == 0 {
-		return nil, nil
-	}
-
 	// marshal the response
 	mr := &MapResponse{}
-	err = json.Unmarshal(chunk[:n], mr)
+	err := m.decoder.Decode(&mr)
 	if err != nil {
 		return nil, err
 	}
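Aside: the switch from a one-shot Read into a fixed-size slice to io.LimitReader plus json.Decoder is the core of #2243. A self-contained sketch of the pattern with illustrative names (chunk stands in for MapResponse): the decoder buffers across reads, so a JSON object split over several network reads still decodes correctly, while the LimitReader caps the total bytes consumed.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// chunk is an illustrative stand-in for MapResponse.
type chunk struct {
	Values []int `json:"values"`
}

func main() {
	// Stand-in for resp.Body: a stream of concatenated JSON objects.
	body := strings.NewReader(`{"values":[1,2]}{"values":[3]}`)

	// Cap total bytes read, then decode object by object. The decoder
	// buffers internally, so an object split across reads is handled
	// correctly, unlike a single fixed-size Read call.
	const maxMapResponseSize = 1024 * 1024 * 1024
	dec := json.NewDecoder(io.LimitReader(body, maxMapResponseSize))

	for {
		var c chunk
		if err := dec.Decode(&c); err == io.EOF {
			break
		} else if err != nil {
			fmt.Println("decode error:", err)
			return
		}
		fmt.Println("chunk:", c.Values)
	}
}
```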