Skip to content

Commit

Permalink
Add failover to other data nodes for distributed queries
Browse files Browse the repository at this point in the history
Fixes #2190
  • Loading branch information
jwilder committed Apr 15, 2015
1 parent 14a6c8c commit 131486a
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 13 deletions.
24 changes: 24 additions & 0 deletions cmd/influxd/server_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1390,7 +1390,31 @@ func Test3NodeServer(t *testing.T) {

runTestsData(t, testName, nodes, "mydb", "myrp", len(nodes))
runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes))
}

func Test3NodeServerFailover(t *testing.T) {
testName := "3-node server integration"

if testing.Short() {
t.Skip(fmt.Sprintf("skipping '%s'", testName))
}
dir := tempfile()
defer func() {
os.RemoveAll(dir)
}()

nodes := createCombinedNodeCluster(t, testName, dir, 3, nil)

// allow cluster to stabilize
time.Sleep(time.Second)

// kill the last node, cluster should expect it to be there
nodes[2].node.Close()
nodes = nodes[:len(nodes)-1]

runTestsData(t, testName, nodes, "mydb", "myrp", len(nodes))
runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes))
nodes.Close()
}

// ensure that all queries work if there are more nodes in a cluster than the replication factor
Expand Down
29 changes: 17 additions & 12 deletions remote_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,25 @@ func (m *RemoteMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int
return err
}

node := m.dataNodes.Next()
if node == nil {
// no data nodes are available to service this query
return ErrShardNotFound
}
var resp *http.Response
for {
node := m.dataNodes.Next()
if node == nil {
// no data nodes are available to service this query
return ErrShardNotFound
}

// request to start streaming results
resp, err := http.Post(node.URL.String()+"/data/run_mapper", "application/json", bytes.NewReader(b))
if err != nil {
node.Down()
return err
// request to start streaming results
resp, err = http.Post(node.URL.String()+"/data/run_mapper", "application/json", bytes.NewReader(b))
if err != nil {
node.Down()
continue
}
// Mark the node as up
node.Up()
break
}
// Mark it as up
node.Up()

m.resp = resp
lr := io.LimitReader(m.resp.Body, MAX_MAP_RESPONSE_SIZE)
m.decoder = json.NewDecoder(lr)
Expand Down
2 changes: 1 addition & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3548,7 +3548,7 @@ func (d *DataNode) Down() {
log.Printf("data node %s is offline for %ds", d.URL.String(), t)
}

// Up marks this DataNode as online if was currently down
// Up marks this DataNode as online if it was currently down
func (d *DataNode) Up() {
d.mu.RLock()
if d.downCount != 0 {
Expand Down

0 comments on commit 131486a

Please sign in to comment.