diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index ed378d32afe..55897922cb0 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -1390,7 +1390,31 @@ func Test3NodeServer(t *testing.T) { runTestsData(t, testName, nodes, "mydb", "myrp", len(nodes)) runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes)) +} + +func Test3NodeServerFailover(t *testing.T) { + testName := "3-node server integration" + if testing.Short() { + t.Skip(fmt.Sprintf("skipping '%s'", testName)) + } + dir := tempfile() + defer func() { + os.RemoveAll(dir) + }() + + nodes := createCombinedNodeCluster(t, testName, dir, 3, nil) + + // allow cluster to stabilize + time.Sleep(time.Second) + + // kill the last node, cluster should expect it to be there + nodes[2].node.Close() + nodes = nodes[:len(nodes)-1] + + runTestsData(t, testName, nodes, "mydb", "myrp", len(nodes)) + runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes)) + nodes.Close() } // ensure that all queries work if there are more nodes in a cluster than the replication factor diff --git a/remote_mapper.go b/remote_mapper.go index 8fcce413831..227030dfd49 100644 --- a/remote_mapper.go +++ b/remote_mapper.go @@ -79,20 +79,25 @@ func (m *RemoteMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int return err } - node := m.dataNodes.Next() - if node == nil { - // no data nodes are available to service this query - return ErrShardNotFound - } + var resp *http.Response + for { + node := m.dataNodes.Next() + if node == nil { + // no data nodes are available to service this query + return ErrShardNotFound + } - // request to start streaming results - resp, err := http.Post(node.URL.String()+"/data/run_mapper", "application/json", bytes.NewReader(b)) - if err != nil { - node.Down() - return err + // request to start streaming results + resp, err = http.Post(node.URL.String()+"/data/run_mapper", "application/json", bytes.NewReader(b)) + if err != nil { + node.Down() + continue + } + // Mark the node as up + node.Up() + break } - // Mark it as up - node.Up() + m.resp = resp lr := io.LimitReader(m.resp.Body, MAX_MAP_RESPONSE_SIZE) m.decoder = json.NewDecoder(lr) diff --git a/server.go b/server.go index 1dbf298993c..e694491cab2 100644 --- a/server.go +++ b/server.go @@ -3548,7 +3548,7 @@ func (d *DataNode) Down() { log.Printf("data node %s is offline for %ds", d.URL.String(), t) } -// Up marks this DataNode as online if was currently down +// Up marks this DataNode as online if it was currently down func (d *DataNode) Up() { d.mu.RLock() if d.downCount != 0 {