stability: implement node liveness; first step towards new range leases
This change adds a node liveness table as a global system table.
Nodes periodically write updates to their liveness record by doing
a conditional put to the liveness table. The leader of the range
containing the node liveness table gossips the latest information
to the rest of the system.

Each node has a `NodeLiveness` object which can be used to query the
status of any other node and find out whether it is live or non-live,
judged by the liveness threshold duration compared to the last time
that node successfully heartbeat its liveness record.

The as-yet-unused `IncrementEpoch` mechanism is also added in this
PR, for eventual use with the planned epoch-based range leader leases.

Updated the range leader lease RFC to reflect current thinking.
spencerkimball committed Sep 28, 2016
1 parent aec8167 commit e8dd5da
Showing 36 changed files with 1,610 additions and 254 deletions.
178 changes: 122 additions & 56 deletions docs/RFCS/range_leases.md
@@ -45,53 +45,76 @@ its replicas holding range leases at once?

# Detailed design

We introduce a new node lease table at the beginning of the keyspace
We introduce a new node liveness table at the beginning of the keyspace
(not an actual SQL table; it will need to be accessed with lower-level
APIs). This table is special in several respects: it is gossipped, and
APIs). This table is special in several respects: it is gossiped, and
leases within its keyspace (and all ranges that precede it, including
meta1 and meta2) use the current, per-range lease mechanism to avoid
circular dependencies. This table maps node IDs to an epoch counter
and a lease expiration timestamp.

The range lease is moved from a special raft command (which writes to
a range-local, non-transactional range lease key) to a transactional
range-local key (similar to the range descriptor). The range lease
identifies the node that holds the lease and its epoch counter. It has
a start timestamp but does not include an expiration time. The range
lease record is always updated in a distributed transaction with the
node lease record to ensure that the epoch counter is consistent and
the start time is greater than the prior range lease holder’s node
lease expiration (plus the maximum clock offset).

Each node periodically performs a conditional put to its node lease to
increase the expiration timestamp and ensure that the epoch has not
changed. If the epoch does change, *all* of the range leases held by
this node are revoked. A node *must not* propose any commands with a
timestamp greater than the latest expiration timestamp it has written
to the node lease table.
circular dependencies. This table maps node IDs to an epoch counter,
and an expiration timestamp.

## Liveness table

Column     | Description
-----------|-----------------------------------------------
NodeID     | node identifier
Epoch      | monotonically increasing liveness epoch
Expiration | timestamp at which the liveness record expires

Each node periodically performs a conditional put to its node liveness
record to increase the expiration timestamp and ensure that the epoch
has not changed. If the epoch does change, *all* of the range leases
held by this node are revoked. A node *must not* propose any commands
or serve any reads with a timestamp greater than its expiration
timestamp minus the maximum clock offset.
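
As a rough illustration of the heartbeat described above, here is a
minimal Go sketch. The `Liveness` struct, the `conditionalPut` helper,
and every other name in it are hypothetical placeholders for this RFC,
not the actual CockroachDB APIs.

```go
package liveness

import "time"

// Liveness is an illustrative stand-in for a row in the node liveness table.
type Liveness struct {
	NodeID     int32
	Epoch      int64
	Expiration time.Time
}

// conditionalPut is a placeholder for a KV conditional put: it writes
// proposed only if the stored record still equals expected, and returns
// the actual stored record when the condition fails.
func conditionalPut(expected, proposed Liveness) (ok bool, actual Liveness) {
	// KV access elided in this sketch.
	return true, proposed
}

// heartbeat periodically extends this node's liveness expiration. If the
// conditional put fails because the epoch changed, all range leases held
// by this node must be treated as revoked.
func heartbeat(self Liveness, livenessDuration, interval time.Duration) {
	for range time.Tick(interval) {
		updated := self
		updated.Expiration = time.Now().Add(livenessDuration)
		if ok, actual := conditionalPut(self, updated); ok {
			self = updated
		} else if actual.Epoch != self.Epoch {
			// Another node incremented our epoch: our leases are revoked.
			self = actual
		}
	}
}
```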

The node liveness table is gossiped by the range lease holder for the
range which contains it. Gossip is used in order to minimize fanout
and make distribution efficient. The best-effort nature of gossip is
acceptable here because timely delivery of node liveness updates is
not required for system correctness. Any node which fails to receive
liveness updates will simply resort to a conditional put to increment
a seemingly not-live node's liveness epoch. The conditional put will
fail because the expected value is out of date and the correct liveness
info is returned to the caller.

A range lease is valid for as long as the node’s lease has the same
epoch. If a node is down (and its node lease has expired), another
node may revoke its lease(s) by incrementing the node lease
epoch. If a node is down (and its node liveness has expired), another
node may revoke its lease(s) by incrementing the node liveness
epoch. Once this is done the old range lease is invalidated and a new
node may claim the range lease.
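
Continuing the sketch above (same hypothetical package), the epoch
increment could look roughly like this; note how a failed conditional
put hands back the authoritative record, which is exactly the behavior
relied on when gossip is stale.

```go
// incrementEpoch attempts to revoke all range leases held by a seemingly
// dead node by bumping its liveness epoch. expected is the caller's view
// of the record, typically taken from gossip.
func incrementEpoch(expected Liveness) Liveness {
	for {
		updated := expected
		updated.Epoch++
		ok, actual := conditionalPut(expected, updated)
		if ok {
			return updated
		}
		if actual.Expiration.After(time.Now()) {
			// The node heartbeat in the meantime; it is live after all.
			return actual
		}
		// Our expected value was stale; retry against the returned record.
		expected = actual
	}
}
```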

A node can transfer a range lease it owns without incrementing the
epoch counter by means of a conditional put to the range lease record
to set the new leaseholder. This is necessary in the case of
rebalancing when the node that holds the range lease is being
removed. `AdminTransferLease` will be enhanced to perform transfers
correctly using node lease style range leases.

A replica claims the range lease by executing a transaction which
reads the replica’s node lease epoch and then does a conditional put
on the range-local range lease record. The transaction record will be
local to the range lease record, so intents will always be cleaned on
commit or abort. There are never intents on the node lease because
they’re only updated via a conditional put. Nodes either renew based
on their last read value, or revoke another node’s lease based on the
last gossiped value. The conditional put either succeeds or fails, but
is never written as part of a transaction.
to set the new leaseholder or else set the leaseholder to 0. This is
necessary in the case of rebalancing when the node that holds the
range lease is being removed. `AdminTransferLease` will be enhanced to
perform transfers correctly using epoch-based range leases.
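
A sketch of such a transfer, continuing the hypothetical package above;
`Lease` and `conditionalPutLease` are illustrative placeholders, not the
actual range lease record or API.

```go
// Lease is an illustrative range lease record: either expiration-based
// (Epoch == 0, Expiration set) or epoch-based (Epoch set).
type Lease struct {
	Holder     int32 // node ID of the leaseholder; 0 means none
	Epoch      int64 // 0 for expiration-based leases
	Expiration time.Time
}

// conditionalPutLease is a placeholder for a conditional put on the
// range-local range lease record.
func conditionalPutLease(expected, proposed Lease) (ok bool, actual Lease) {
	return true, proposed
}

// transferLease hands the lease to newHolder (or relinquishes it with 0)
// without incrementing the liveness epoch.
func transferLease(prev Lease, newHolder int32) (Lease, bool) {
	next := prev
	next.Holder = newHolder
	ok, _ := conditionalPutLease(prev, next)
	return next, ok
}
```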

Nodes which propose or transfer an epoch-based range lease must
themselves be live according to the liveness table. Keep in mind that a
node considers itself live if it has successfully written a liveness
record whose expiration, minus the maximum clock offset, is still ahead
of the current time.
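
That check reduces to a single clock comparison; in terms of the
illustrative sketch above:

```go
// isLive reports whether a node may be considered live at time now: the
// last written expiration, shifted back by the maximum clock offset,
// must still be in the future.
func isLive(l Liveness, now time.Time, maxOffset time.Duration) bool {
	return now.Before(l.Expiration.Add(-maxOffset))
}
```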

To propose an epoch-based range lease, the existing lease must either
be a traditional, expiration-based lease where the proposer is the
leaseholder or the lease has expired, -or- be an epoch-based lease where
the proposer is the leaseholder, the leaseholder is 0, or the lease
carries an old epoch. Other replicas in the same range will always
accept a range lease request in which the epoch is being incremented --
that is, they defer to the proposer's view of the liveness table. They
do not consult their own view of the liveness table and can even be
disconnected from gossip.
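
Expressed as a predicate over the hypothetical types above (with
`currentEpoch` standing in for a liveness-table lookup, e.g. via
gossip):

```go
// canProposeEpochLease mirrors the conditions above: the proposer must be
// live, and the existing lease must be held by the proposer, be expired,
// be unowned, or carry an old epoch.
func canProposeEpochLease(prev Lease, proposer Liveness, now time.Time, maxOffset time.Duration) bool {
	if !isLive(proposer, now, maxOffset) {
		return false
	}
	if prev.Epoch == 0 { // traditional expiration-based lease
		return prev.Holder == proposer.NodeID || now.After(prev.Expiration)
	}
	// Epoch-based lease.
	return prev.Holder == proposer.NodeID || prev.Holder == 0 ||
		prev.Epoch < currentEpoch(prev.Holder)
}

// currentEpoch is a placeholder for reading the holder's current epoch
// from the liveness table.
func currentEpoch(nodeID int32) int64 { return 0 }
```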

[NB: previously this RFC recommended a distributed transaction to
update the range lease record. See note in "Alternatives" below for
details on why that's unnecessary.]

In addition to nodes updating their own liveness entries via
conditional puts, non-leaseholder nodes may increment the epoch of a
node which has failed to heartbeat its liveness record before its
expiration time.

At the raft level, each command currently contains the node ID that
held the lease at the time the command was proposed. This will be
@@ -101,25 +124,48 @@ node ID and epoch match the last committed lease, the command will be
applied; otherwise it will be rejected.
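
The below-raft check, sketched in terms of the hypothetical `Lease`
above:

```go
// checkProposerLease is evaluated when applying a committed command: the
// command is applied only if it was proposed under the lease still in
// effect, identified by holder node ID and epoch.
func checkProposerLease(proposed, committed Lease) bool {
	return proposed.Holder == committed.Holder && proposed.Epoch == committed.Epoch
}
```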


# Performance implications

We expect traffic proportional to the number of nodes in the system.
With 1,000 nodes and a 3s liveness duration threshold, we expect every
node to do a conditional put to update the heartbeat timestamp every
2.4s. That would correspond to ~417 reqs/second, a not-unreasonable
load for this function. By contrast, using expiration-based leases in
a cluster with 1,000 nodes and 10,000 ranges / node, we'd expect to
see (10,000 ranges * 1,000 nodes / 3 replicas-per-range / 2.4s)
~= 1.39M reqs / second.
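
For reference, these estimates are just the following arithmetic
(illustrative numbers only):

```go
// Rough reproduction of the traffic estimates above.
const (
	nodes            = 1000
	rangesPerNode    = 10000
	replicasPerRange = 3
	heartbeatSecs    = 2.4 // renew a 3s liveness duration every 2.4s
)

func estimates() (livenessQPS, expirationLeaseQPS float64) {
	livenessQPS = float64(nodes) / heartbeatSecs                                           // ~417 conditional puts/sec
	expirationLeaseQPS = float64(rangesPerNode) * nodes / replicasPerRange / heartbeatSecs // ~1.39M lease renewals/sec
	return
}
```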

We still require the traditional expiration-based range leases for any
ranges located at or before the liveness table's range. This might be
problematic in the case of meta2 address record ranges, which are
expected to proliferate in a large cluster. This lease traffic could
be obviated if we moved the node liveness table to the very start of
the keyspace, but the historical apportionment of that keyspace makes
such a change difficult. A rough calculation puts the number of meta2
ranges at between 10 and 50 for a 10M range cluster, so this seems
safe to ignore for the conceivable future.


# Drawbacks

The greatest drawback is relying on the availability of the node lease
table. This presents a single point of failure which is not as severe
in the current system. Even though the first range is crucial to
addressing data in the system, those reads can be inconsistent and
meta1 records change slowly, so availability is likely to be good even
in the event the first range can’t reach consensus. A reasonable
solution is to increase the number of replicas in the zones including
the node lease table - something that is generally considered sound
practice in any case. [NB: we also rely on the availability of various
system tables. For example, if the `system.lease` table is unavailable
we won't be able to serve any SQL traffic].
The greatest drawback is relying on the availability of the node
liveness table. This presents a single point of failure which is not
as severe in the current system. Even though the first range is
crucial to addressing data in the system, those reads can be
inconsistent and meta1 records change slowly, so availability is
likely to be good even in the event the first range can’t reach
consensus. A reasonable solution is to increase the number of replicas
in the zones including the node liveness table - something that is
generally considered sound practice in any case. [NB: we also rely on
the availability of various system tables. For example, if the
`system.lease` table is unavailable we won't be able to serve any SQL
traffic].

Another drawback is the concentration of write traffic to the node
lease table. This could be mitigated by splitting the node lease table
at arbitrary resolutions, perhaps even so there’s a single node lease
per range. This is unlikely to be much of a problem unless the number
of nodes in the system is significant.
liveness table. This could be mitigated by splitting the node liveness
table at arbitrary resolutions, perhaps even so there’s a single node
liveness record per range. This is unlikely to be much of a problem
unless the number of nodes in the system is significant.


# Alternatives
@@ -133,6 +179,30 @@ their time on lease updates.
If we used copysets, there may be an opportunity to maintain lease holder
leases at the granularity of copysets.

## Use of distributed txn for updating liveness records

The original proposal mentioned: "The range lease record is always
updated in a distributed transaction with the node liveness record to
ensure that the epoch counter is consistent and the start time is
greater than the prior range lease holder’s node liveness expiration
(plus the maximum clock offset)."

This has been abandoned mostly out of a desire to avoid changing the
nature of the range lease record and the range lease raft command. To
see why it's not necessary, consider a range lease being updated out
of sync with the node liveness table. That would mean either that the
epoch being incremented is older than the epoch in the liveness table,
or that the lease is being proposed at a timestamp which has already
expired. It's not possible to
update to a later epoch or newer timestamp than what's in the liveness
table because epochs are taken directly from the liveness table and
are incremented monotonically; timestamps are proposed only within the
bounds by which a node has successfully heartbeat the liveness table.

In the event of an earlier timestamp or epoch, the proposer would
succeed in acquiring the range lease, but would then fail immediately
on attempting to use it, as it could not possibly still have an HLC
clock time corresponding to the now-old epoch at which it acquired the
lease.

# Unresolved questions

@@ -142,7 +212,3 @@ range leases using the proposed system.

How does this mechanism inform future designs to incorporate quorum
leases?

TODO(peter): What is the motivation for gossipping the node lease
table? Gossipping means the node's will have out of date info for the
table.
8 changes: 8 additions & 0 deletions gossip/keys.go
@@ -53,6 +53,9 @@ const (
// string address of the node. E.g. node:1 => 127.0.0.1:24001
KeyNodeIDPrefix = "node"

// KeyNodeLivenessPrefix is the key prefix for gossiping node liveness info.
KeyNodeLivenessPrefix = "liveness"

// KeySentinel is a key for gossip which must not expire or
// else the node considers itself partitioned and will retry with
// bootstrap hosts. The sentinel is gossiped by the node that holds
@@ -91,6 +94,11 @@ func MakeNodeIDKey(nodeID roachpb.NodeID) string {
return MakeKey(KeyNodeIDPrefix, nodeID.String())
}

// MakeNodeLivenessKey returns the gossip key for node liveness info.
func MakeNodeLivenessKey(nodeID roachpb.NodeID) string {
return MakeKey(KeyNodeLivenessPrefix, nodeID.String())
}

// MakeStoreKey returns the gossip key for the given store.
func MakeStoreKey(storeID roachpb.StoreID) string {
return MakeKey(KeyStorePrefix, storeID.String())
11 changes: 11 additions & 0 deletions keys/constants.go
@@ -189,6 +189,17 @@ var (
SystemPrefix = roachpb.Key{systemPrefixByte}
SystemMax = roachpb.Key{systemMaxByte}

// NodeLivenessPrefix specifies the key prefix for the node liveness
// table. Note that this should sort before the rest of the system
// keyspace in order to limit the number of ranges which must use
// expiration-based range leases instead of the more efficient
// node-liveness epoch-based range leases (see
// https://github.com/cockroachdb/cockroach/blob/develop/docs/RFCS/range_leases.md)
NodeLivenessPrefix = roachpb.Key(makeKey(SystemPrefix, roachpb.RKey("\x00liveness-")))

// NodeLivenessKeyMax is the maximum value for any node liveness key.
NodeLivenessKeyMax = roachpb.Key(roachpb.RKey(NodeLivenessPrefix).PrefixEnd())

// DescIDGenerator is the global descriptor ID generator sequence used for
// table and namespace IDs.
DescIDGenerator = roachpb.Key(makeKey(SystemPrefix, roachpb.RKey("desc-idgen")))
12 changes: 10 additions & 2 deletions keys/keys.go
@@ -52,9 +52,17 @@ func StoreGossipKey() roachpb.Key {
return MakeStoreKey(localStoreGossipSuffix, nil)
}

// NodeLivenessKey returns the key for the node liveness record.
func NodeLivenessKey(nodeID roachpb.NodeID) roachpb.Key {
key := make(roachpb.Key, 0, len(NodeLivenessPrefix)+9)
key = append(key, NodeLivenessPrefix...)
key = encoding.EncodeUvarintAscending(key, uint64(nodeID))
return key
}

// NodeStatusKey returns the key for accessing the node status for the
// specified node ID.
func NodeStatusKey(nodeID int32) roachpb.Key {
func NodeStatusKey(nodeID roachpb.NodeID) roachpb.Key {
key := make(roachpb.Key, 0, len(StatusNodePrefix)+9)
key = append(key, StatusNodePrefix...)
key = encoding.EncodeUvarintAscending(key, uint64(nodeID))
@@ -63,7 +71,7 @@ func NodeStatusKey(nodeID int32) roachpb.Key {

// NodeLastUsageReportKey returns the key for accessing the node last update check
// time (when version check or usage reporting was done).
func NodeLastUsageReportKey(nodeID int32) roachpb.Key {
func NodeLastUsageReportKey(nodeID roachpb.NodeID) roachpb.Key {
prefix := append([]byte(nil), UpdateCheckPrefix...)
return encoding.EncodeUvarintAscending(prefix, uint64(nodeID))
}
9 changes: 9 additions & 0 deletions keys/printer.go
@@ -105,6 +105,14 @@ var (
}},
},
{name: "/System", start: SystemPrefix, end: SystemMax, entries: []dictEntry{
{name: "/NodeLiveness", prefix: NodeLivenessPrefix,
ppFunc: decodeKeyPrint,
psFunc: parseUnsupported,
},
{name: "/NodeLivenessMax", prefix: NodeLivenessKeyMax,
ppFunc: decodeKeyPrint,
psFunc: parseUnsupported,
},
{name: "/StatusNode", prefix: StatusNodePrefix,
ppFunc: decodeKeyPrint,
psFunc: parseUnsupported,
@@ -481,6 +489,7 @@ func prettyPrintInternal(key roachpb.Key) (string, bool) {
// /Meta1/[key] "\x02"+[key]
// /Meta2/[key] "\x03"+[key]
// /System/... "\x04"
// /NodeLiveness/[key] "\x04\x00liveness-"+[key]
// /StatusNode/[key] "\x04status-node-"+[key]
// /System/Max "\x05"
//
1 change: 1 addition & 0 deletions keys/printer_test.go
@@ -78,6 +78,7 @@ func TestPrettyPrint(t *testing.T) {
{makeKey(Meta1Prefix, roachpb.Key("foo")), `/Meta1/"foo"`},
{RangeMetaKey(roachpb.RKey("f")), `/Meta2/"f"`},

{NodeLivenessKey(10033), "/System/NodeLiveness/10033"},
{NodeStatusKey(1111), "/System/StatusNode/1111"},

{SystemMax, "/System/Max"},
14 changes: 11 additions & 3 deletions keys/spans.go
@@ -22,14 +22,22 @@ var (
// Meta1Span holds all first level addressing.
Meta1Span = roachpb.Span{Key: roachpb.KeyMin, EndKey: Meta2Prefix}

// UserDataSpan is the non-meta and non-structured portion of the key space.
UserDataSpan = roachpb.Span{Key: SystemMax, EndKey: TableDataMin}
// NodeLivenessSpan holds the liveness records for nodes in the cluster.
NodeLivenessSpan = roachpb.Span{Key: NodeLivenessPrefix, EndKey: NodeLivenessKeyMax}

// SystemConfigSpan is the range of system objects which will be gossiped.
SystemConfigSpan = roachpb.Span{Key: TableDataMin, EndKey: SystemConfigTableDataMax}

// UserDataSpan is the non-meta and non-structured portion of the key space.
UserDataSpan = roachpb.Span{Key: SystemMax, EndKey: TableDataMin}

// GossipedSystemSpans are spans which contain system data which needs to be
// shared with other nodes in the system via gossip.
GossipedSystemSpans = []roachpb.Span{NodeLivenessSpan, SystemConfigSpan}

// NoSplitSpans describes the ranges that should never be split.
// Meta1Span: needed to find other ranges.
// NodeLivenessSpan: liveness information on nodes in the cluster.
// SystemConfigSpan: system objects which will be gossiped.
NoSplitSpans = []roachpb.Span{Meta1Span, SystemConfigSpan}
NoSplitSpans = []roachpb.Span{Meta1Span, NodeLivenessSpan, SystemConfigSpan}
)
16 changes: 4 additions & 12 deletions kv/dist_sender.go
@@ -64,15 +64,6 @@ func (f firstRangeMissingError) Error() string {
return "the descriptor for the first range is not available via gossip"
}

// A noNodesAvailError specifies that no node addresses in a replica set
// were available via the gossip network.
type noNodeAddrsAvailError struct{}

// Error implements the error interface.
func (n noNodeAddrsAvailError) Error() string {
return "no replica node addresses available via gossip"
}

// A DistSender provides methods to access Cockroach's monolithic,
// distributed key value store. Each method invocation triggers a
// lookup or lookups to find replica metadata for implicated key
@@ -213,10 +204,10 @@ func NewDistSender(cfg *DistSenderConfig, g *gossip.Gossip) *DistSender {
if log.V(1) {
var desc roachpb.RangeDescriptor
if err := value.GetProto(&desc); err != nil {
log.Errorf(ds.Ctx, "unable to parse gossipped first range descriptor: %s", err)
log.Errorf(ds.Ctx, "unable to parse gossiped first range descriptor: %s", err)
} else {
log.Infof(ds.Ctx,
"gossipped first range descriptor: %+v", desc.Replicas)
"gossiped first range descriptor: %+v", desc.Replicas)
}
}
err := ds.rangeCache.EvictCachedRangeDescriptor(roachpb.RKeyMin, nil, false)
@@ -353,7 +344,8 @@ func (ds *DistSender) sendRPC(
ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
if len(replicas) == 0 {
return nil, noNodeAddrsAvailError{}
return nil, roachpb.NewSendError(
fmt.Sprintf("no replica node addresses available via gossip for range %d", rangeID))
}

// TODO(pmattis): This needs to be tested. If it isn't set we'll
2 changes: 1 addition & 1 deletion kv/replica_slice.go
@@ -43,7 +43,7 @@ type ReplicaSlice []ReplicaInfo

// newReplicaSlice creates a ReplicaSlice from the replicas listed in the range
// descriptor and using gossip to lookup node descriptors. Replicas on nodes
// that are not gossipped are omitted from the result.
// that are not gossiped are omitted from the result.
func newReplicaSlice(gossip *gossip.Gossip, desc *roachpb.RangeDescriptor) ReplicaSlice {
if gossip == nil {
return nil
2 changes: 1 addition & 1 deletion roachpb/api.pb.go

Some generated files are not rendered by default.