sql,distsqlrun,gossip: don't plan on incompatible nodes
If we attempt to schedule a flow on a node whose DistSQL version is
incompatible, the scheduling will return an error.
This patch attempts to minimize the occurrence of these errors by not
planning flows on incompatible nodes. This is done by having each node
gossip its range of accepted versions, and having planning consult
gossip before deciding to map key spans onto a node. Spans owned by
incompatible nodes are mapped to the gateway.

The planning version check is done in distSQLPlanner.partitionSpans().
This may not sound like the right place for it, but there's currently
no better place. That's the layer that's already similarly concerned
with node health, because everything planned above TableReaders
currently mechanically follows the set of nodes decided in
partitionSpans(). An alternative would be to lift the remapping onto
the gateway, into a step done after the plan has been built, but that
would lead to worse plans.
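
The gossiping side lives in one of the other changed files, not expanded
on this page. For orientation, here is a minimal sketch of what it
plausibly looks like, using the key helper and proto introduced in this
diff (the helper name, the call site, the MinAcceptedVersion constant,
and the TTL semantics are assumptions, not the actual implementation):

// advertiseDistSQLVersion is a hypothetical helper sketching how a node
// might gossip its accepted DistSQL version range at startup.
func advertiseDistSQLVersion(g *gossip.Gossip, nodeID roachpb.NodeID) error {
	info := distsqlrun.DistSQLVersionGossipInfo{
		Version:            distsqlrun.Version,
		MinAcceptedVersion: distsqlrun.MinAcceptedVersion,
	}
	// A ttl of 0 is assumed to mean the info never expires.
	return g.AddInfoProto(gossip.MakeDistSQLNodeVersionKey(nodeID), &info, 0)
}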
andreimatei committed Aug 17, 2017
1 parent caa0565 commit 110558c
Showing 13 changed files with 600 additions and 145 deletions.
22 changes: 19 additions & 3 deletions pkg/gossip/gossip.go
@@ -151,6 +151,22 @@ var (
defaultGossipStoresInterval)
)

// KeyNotPresentError is returned by gossip when queried for a key that doesn't
// exist or has expired.
type KeyNotPresentError struct {
key string
}

// Error implements the error interface.
func (err KeyNotPresentError) Error() string {
return fmt.Sprintf("KeyNotPresentError: gossip key %q does not exist or has expired", err.key)
}

// NewKeyNotPresentError creates a new KeyNotPresentError.
func NewKeyNotPresentError(key string) error {
return KeyNotPresentError{key: key}
}
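
A small sketch of the pattern this concrete type enables for callers
(hypothetical helper; callers in this commit simply treat any error as
"version unknown"):

// isKeyNotPresent reports whether err indicates a gossip key that was never
// gossiped or has expired, as opposed to some other gossip failure.
func isKeyNotPresent(err error) bool {
	_, ok := err.(KeyNotPresentError)
	return ok
}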

// Storage is an interface which allows the gossip instance
// to read and write bootstrapping data to persistent storage
// between instantiations.
@@ -788,7 +804,7 @@ func (g *Gossip) AddInfoProto(key string, msg proto.Message, ttl time.Duration)
return g.AddInfo(key, bytes, ttl)
}

// GetInfo returns an info value by key or an error if specified
// GetInfo returns an info value by key, or a KeyNotPresentError if the specified
// key does not exist or has expired.
func (g *Gossip) GetInfo(key string) ([]byte, error) {
g.mu.Lock()
@@ -801,10 +817,10 @@ func (g *Gossip) GetInfo(key string) ([]byte, error) {
}
return i.Value.GetBytes()
}
return nil, errors.Errorf("key %q does not exist or has expired", key)
return nil, NewKeyNotPresentError(key)
}

// GetInfoProto returns an info value by key or an error if specified
// GetInfoProto returns an info value by key, or a KeyNotPresentError if the specified
// key does not exist or has expired.
func (g *Gossip) GetInfoProto(key string, msg proto.Message) error {
bytes, err := g.GetInfo(key)
9 changes: 9 additions & 0 deletions pkg/gossip/keys.go
@@ -72,6 +72,10 @@ const (
// The value is a config.SystemConfig which holds all key/value
// pairs in the system DB span.
KeySystemConfig = "system-db"

// KeyDistSQLNodeVersionKeyPrefix is the key prefix for each node's DistSQL
// version.
KeyDistSQLNodeVersionKeyPrefix = "distsql-version"
)

// MakeKey creates a canonical key under which to gossip a piece of
@@ -128,3 +132,8 @@ func MakeStoreKey(storeID roachpb.StoreID) string {
func MakeDeadReplicasKey(storeID roachpb.StoreID) string {
return MakeKey(KeyDeadReplicasPrefix, storeID.String())
}

// MakeDistSQLNodeVersionKey returns the gossip key under which the given
// node's DistSQL version is advertised.
func MakeDistSQLNodeVersionKey(nodeID roachpb.NodeID) string {
return MakeKey(KeyDistSQLNodeVersionKeyPrefix, nodeID.String())
}
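
For illustration, the resulting key simply composes the prefix with the
node ID (assuming MakeKey joins its parts with a ":" separator, matching
the other Make*Key helpers in this file):

key := MakeDistSQLNodeVersionKey(roachpb.NodeID(5))
// key == "distsql-version:5"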
1 change: 1 addition & 0 deletions pkg/server/server.go
@@ -359,6 +359,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
Metrics: &distSQLMetrics,

JobRegistry: s.jobRegistry,
Gossip: s.gossip,
}
if distSQLTestingKnobs := s.cfg.TestingKnobs.DistSQL; distSQLTestingKnobs != nil {
distSQLCfg.TestingKnobs = *distSQLTestingKnobs.(*distsqlrun.TestingKnobs)
57 changes: 54 additions & 3 deletions pkg/sql/distsql_physical_planner.go
@@ -60,6 +60,11 @@ import (
// and add processing stages (connected to the result routers of the children
// node).
type distSQLPlanner struct {
// planVersion is the version of DistSQL targeted by the plan we're building.
// It is currently always set to the node's current DistSQL version, and is
// used to skip incompatible nodes when mapping spans.
planVersion distsqlrun.DistSQLVersion

st *cluster.Settings
// The node descriptor for the gateway node that initiated this query.
nodeDesc roachpb.NodeDescriptor
@@ -72,6 +77,9 @@
// runnerChan is used to send out requests (for running SetupFlow RPCs) to a
// pool of workers.
runnerChan chan runnerRequest

// gossip handle used to check node version compatibility.
gossip *gossip.Gossip
}

const resolverPolicy = distsqlplan.BinPackingLeaseHolderChoice
@@ -81,6 +89,7 @@
var logPlanDiagram = envutil.EnvOrDefaultBool("COCKROACH_DISTSQL_LOG_PLAN", false)

func newDistSQLPlanner(
planVersion distsqlrun.DistSQLVersion,
st *cluster.Settings,
nodeDesc roachpb.NodeDescriptor,
rpcCtx *rpc.Context,
@@ -91,11 +100,13 @@
testingKnobs DistSQLPlannerTestingKnobs,
) *distSQLPlanner {
dsp := &distSQLPlanner{
planVersion: planVersion,
st: st,
nodeDesc: nodeDesc,
rpcContext: rpcCtx,
stopper: stopper,
distSQLSrv: distSQLSrv,
gossip: gossip,
spanResolver: distsqlplan.NewSpanResolver(distSender, gossip, nodeDesc, resolverPolicy),
testingKnobs: testingKnobs,
}
@@ -412,6 +423,10 @@ type spanPartition struct {
// spanPartitions (one for each relevant node), which form a partitioning of the
// spans (i.e. they are non-overlapping and their union is exactly the original
// set of spans).
//
// partitionSpans does its best to not assign ranges to nodes that are known
// to be either unhealthy or running an incompatible DistSQL version. The
// ranges owned by such nodes are assigned to the gateway.
func (dsp *distSQLPlanner) partitionSpans(
planCtx *planningCtx, spans roachpb.Spans,
) ([]spanPartition, error) {
@@ -422,6 +437,9 @@
partitions := make([]spanPartition, 0, 1)
// nodeMap maps a nodeID to an index inside the partitions array.
nodeMap := make(map[roachpb.NodeID]int)
// nodeVerCompatMap maintains info about which nodes advertise DistSQL
// versions compatible with this plan and which ones don't.
nodeVerCompatMap := make(map[roachpb.NodeID]bool)
it := planCtx.spanIter
for _, span := range spans {
var rspan roachpb.RSpan
@@ -489,12 +507,26 @@
}
planCtx.nodeAddresses[nodeID] = addr
}
if addr == "" {
// An empty address indicates an unhealthy host. Use the gateway to
// process this span instead of the unhealthy host.
var compat bool
if addr != "" {
// Check if the node's DistSQL version is compatible with this plan.
// If it isn't, we'll use the gateway.
var ok bool
if compat, ok = nodeVerCompatMap[nodeID]; !ok {
compat = dsp.nodeVersionIsCompatible(nodeID, dsp.planVersion)
nodeVerCompatMap[nodeID] = compat
}
}
// If the node is unhealthy or its DistSQL version is incompatible, use
// the gateway to process this span instead of the unhealthy host.
// An empty address indicates an unhealthy host.
if addr == "" || !compat {
log.Eventf(ctx, "not planning on node %d. unhealthy: %t, incompatible version: %t",
nodeID, addr == "", !compat)
nodeID = dsp.nodeDesc.NodeID
partitionIdx, inNodeMap = nodeMap[nodeID]
}

if !inNodeMap {
partitionIdx = len(partitions)
partitions = append(partitions, spanPartition{node: nodeID})
@@ -525,6 +557,25 @@
return partitions, nil
}

// nodeVersionIsCompatible decides whether a particular node's DistSQL version
// is compatible with planVer. It uses gossip to find out the node's version
// range.
func (dsp *distSQLPlanner) nodeVersionIsCompatible(
nodeID roachpb.NodeID, planVer distsqlrun.DistSQLVersion,
) bool {
var v distsqlrun.DistSQLVersionGossipInfo
if hook := dsp.testingKnobs.OverrideDistSQLVersionCheck; hook != nil {
if err := hook(nodeID, &v); err != nil {
return false
}
} else {
if err := dsp.gossip.GetInfoProto(gossip.MakeDistSQLNodeVersionKey(nodeID), &v); err != nil {
return false
}
}
return distsqlrun.FlowVerIsCompatible(planVer, v.MinAcceptedVersion, v.Version)
}
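
FlowVerIsCompatible itself is defined in distsqlrun and is not part of this
diff. Given how it is called above, a plausible sketch of the predicate
(the body is an assumption, not the actual implementation):

// flowVerIsCompatible sketches the assumed semantics: a plan's version is
// compatible with a node iff it falls within the node's advertised range
// [minAcceptedVersion, serverVersion].
func flowVerIsCompatible(
	flowVer, minAcceptedVersion, serverVersion distsqlrun.DistSQLVersion,
) bool {
	return flowVer >= minAcceptedVersion && flowVer <= serverVersion
}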

// initTableReaderSpec initializes a TableReaderSpec/PostProcessSpec that
// corresponds to a scanNode, except for the Spans and OutputColumns.
func initTableReaderSpec(
167 changes: 166 additions & 1 deletion pkg/sql/distsql_physical_planner_test.go
@@ -29,6 +29,7 @@ import (
"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -636,7 +637,7 @@ func TestPartitionSpans(t *testing.T) {
// spans to be passed to partitionSpans
spans [][2]string

// expected result: a list of spans, one for each node.
// expected result: a map of node to list of spans.
partitions map[int][][2]string
}{
{
@@ -736,6 +737,7 @@
tsp.ranges = tc.ranges

dsp := distSQLPlanner{
planVersion: distsqlrun.Version,
st: cluster.MakeTestingClusterSettings(),
nodeDesc: *tsp.nodes[tc.gatewayNode-1],
stopper: stopper,
@@ -749,6 +751,14 @@
}
return nil
},
OverrideDistSQLVersionCheck: func(
node roachpb.NodeID, res *distsqlrun.DistSQLVersionGossipInfo,
) error {
// Accept any version.
res.MinAcceptedVersion = 0
res.Version = 1000000
return nil
},
},
}

@@ -781,3 +791,158 @@
})
}
}

// Test that span partitioning takes into account the advertised acceptable
// versions of each node. Spans for which the owner node doesn't support our
// plan's version will be planned on the gateway.
func TestPartitionSpansSkipsIncompatibleNodes(t *testing.T) {
defer leaktest.AfterTest(t)()

// The spans that we're going to plan for.
span := roachpb.Span{Key: roachpb.Key("A"), EndKey: roachpb.Key("Z")}
gatewayNode := roachpb.NodeID(2)
ranges := []testSpanResolverRange{{"A", 1}, {"B", 2}, {"C", 1}}

testCases := []struct {
// the test's name
name string

// planVersion is the DistSQL version that this plan is targeting.
// We'll play with this version and expect nodes to be skipped because of
// this.
planVersion distsqlrun.DistSQLVersion

// The versions accepted by each node.
nodeVersions map[roachpb.NodeID]distsqlrun.DistSQLVersionGossipInfo

// nodesNotInGossip is the set of nodes for which we're going to pretend
// that gossip returns an error when queried about the DistSQL version.
nodesNotInGossip map[roachpb.NodeID]struct{}

// expected result: a map of node to list of spans.
partitions map[roachpb.NodeID][][2]string
}{
{
// In the first test, all nodes are compatible.
name: "current_version",
planVersion: 2,
nodeVersions: map[roachpb.NodeID]distsqlrun.DistSQLVersionGossipInfo{
1: {
MinAcceptedVersion: 1,
Version: 2,
},
2: {
MinAcceptedVersion: 1,
Version: 2,
},
},
partitions: map[roachpb.NodeID][][2]string{
1: {{"A", "B"}, {"C", "Z"}},
2: {{"B", "C"}},
},
},
{
// Plan version is incompatible with node 1. We expect everything to be
// assigned to the gateway.
// Remember that the gateway is node 2.
name: "next_version",
planVersion: 3,
nodeVersions: map[roachpb.NodeID]distsqlrun.DistSQLVersionGossipInfo{
1: {
MinAcceptedVersion: 1,
Version: 2,
},
2: {
MinAcceptedVersion: 3,
Version: 3,
},
},
partitions: map[roachpb.NodeID][][2]string{
2: {{"A", "Z"}},
},
},
{
// Like the above, except node 1 is not gossiping its version (simulating
// a crdb 1.0 node).
name: "crdb_1.0",
planVersion: 3,
nodeVersions: map[roachpb.NodeID]distsqlrun.DistSQLVersionGossipInfo{
2: {
MinAcceptedVersion: 3,
Version: 3,
},
},
nodesNotInGossip: map[roachpb.NodeID]struct{}{
1: {},
},
partitions: map[roachpb.NodeID][][2]string{
2: {{"A", "Z"}},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {

stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())

tsp := &testSpanResolver{}
for i := 1; i <= 2; i++ {
tsp.nodes = append(tsp.nodes, &roachpb.NodeDescriptor{
NodeID: roachpb.NodeID(i),
Address: util.UnresolvedAddr{
AddressField: fmt.Sprintf("addr%d", i),
},
})
}
tsp.ranges = ranges

dsp := distSQLPlanner{
planVersion: tc.planVersion,
st: cluster.MakeTestingClusterSettings(),
nodeDesc: *tsp.nodes[gatewayNode-1],
stopper: stopper,
spanResolver: tsp,
testingKnobs: DistSQLPlannerTestingKnobs{
OverrideHealthCheck: func(node roachpb.NodeID, addr string) error {
// All the nodes are healthy.
return nil
},
OverrideDistSQLVersionCheck: func(
nodeID roachpb.NodeID, res *distsqlrun.DistSQLVersionGossipInfo,
) error {
if _, ok := tc.nodesNotInGossip[nodeID]; ok {
return gossip.NewKeyNotPresentError(
gossip.MakeDistSQLNodeVersionKey(nodeID),
)
}
*res = tc.nodeVersions[nodeID]
return nil
},
},
}

planCtx := dsp.NewPlanningCtx(context.Background(), nil /* txn */)
partitions, err := dsp.partitionSpans(&planCtx, roachpb.Spans{span})
if err != nil {
t.Fatal(err)
}

resMap := make(map[roachpb.NodeID][][2]string)
for _, p := range partitions {
if _, ok := resMap[p.node]; ok {
t.Fatalf("node %d shows up in multiple partitions", p)
}
var spans [][2]string
for _, s := range p.spans {
spans = append(spans, [2]string{string(s.Key), string(s.EndKey)})
}
resMap[p.node] = spans
}

if !reflect.DeepEqual(resMap, tc.partitions) {
t.Errorf("expected partitions:\n %v\ngot:\n %v", tc.partitions, resMap)
}
})
}
}