Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdserver: Range with min/max mod revision #6411

Merged
merged 3 commits into from
Sep 13, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Documentation/dev-guide/api_reference_v3.md
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,8 @@ Empty field.
| serializable | serializable sets the range request to use serializable member-local reads. Range requests are linearizable by default; linearizable requests have higher latency and lower throughput than serializable requests but reflect the current consensus of the cluster. For better performance, in exchange for possible stale reads, a serializable range request is served locally without needing to reach consensus with other nodes in the cluster. | bool |
| keys_only | keys_only when set returns only the keys and not the values. | bool |
| count_only | count_only when set returns only the count of the keys in the range. | bool |
| min_mod_revision | min_mod_revision is the lower bound for returned key mod revisions; all keys with lesser mod revisions will be filtered away. | int64 |
| max_mod_revision | max_mod_revision is the upper bound for returned key mod revisions; all keys with greater mod revisions will be filtered away. | int64 |



Expand Down
10 changes: 10 additions & 0 deletions Documentation/dev-guide/apispec/swagger/rpc.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1865,6 +1865,16 @@
"format": "int64",
"description": "limit is a limit on the number of keys returned for the request."
},
"max_mod_revision": {
"type": "string",
"format": "int64",
"description": "max_mod_revision is the upper bound for returned key mod revisions; all keys with\ngreater mod revisions will be filtered away."
},
"min_mod_revision": {
"type": "string",
"format": "int64",
"description": "min_mod_revision is the lower bound for returned key mod revisions; all keys with\nlesser mod revisions will be filtered away."
},
"range_end": {
"type": "string",
"format": "byte",
Expand Down
16 changes: 1 addition & 15 deletions clientv3/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,21 +141,7 @@ func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) {
// TODO: handle other ops
case tRange:
var resp *pb.RangeResponse
r := &pb.RangeRequest{
Key: op.key,
RangeEnd: op.end,
Limit: op.limit,
Revision: op.rev,
Serializable: op.serializable,
KeysOnly: op.keysOnly,
CountOnly: op.countOnly,
}
if op.sort != nil {
r.SortOrder = pb.RangeRequest_SortOrder(op.sort.Order)
r.SortTarget = pb.RangeRequest_SortTarget(op.sort.Target)
}

resp, err = kv.remote.Range(ctx, r, grpc.FailFast(false))
resp, err = kv.remote.Range(ctx, op.toRangeRequest(), grpc.FailFast(false))
if err == nil {
return OpResponse{get: (*GetResponse)(resp)}, nil
}
Expand Down
51 changes: 37 additions & 14 deletions clientv3/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type Op struct {
serializable bool
keysOnly bool
countOnly bool
minModRev int64
maxModRev int64

// for range, watch
rev int64
Expand All @@ -61,23 +63,32 @@ type Op struct {
leaseID LeaseID
}

func (op Op) toRangeRequest() *pb.RangeRequest {
if op.t != tRange {
panic("op.t != tRange")
}
r := &pb.RangeRequest{
Key: op.key,
RangeEnd: op.end,
Limit: op.limit,
Revision: op.rev,
Serializable: op.serializable,
KeysOnly: op.keysOnly,
CountOnly: op.countOnly,
MinModRevision: op.minModRev,
MaxModRevision: op.maxModRev,
}
if op.sort != nil {
r.SortOrder = pb.RangeRequest_SortOrder(op.sort.Order)
r.SortTarget = pb.RangeRequest_SortTarget(op.sort.Target)
}
return r
}

func (op Op) toRequestOp() *pb.RequestOp {
switch op.t {
case tRange:
r := &pb.RangeRequest{
Key: op.key,
RangeEnd: op.end,
Limit: op.limit,
Revision: op.rev,
Serializable: op.serializable,
KeysOnly: op.keysOnly,
CountOnly: op.countOnly,
}
if op.sort != nil {
r.SortOrder = pb.RangeRequest_SortOrder(op.sort.Order)
r.SortTarget = pb.RangeRequest_SortTarget(op.sort.Target)
}
return &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: r}}
return &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: op.toRangeRequest()}}
case tPut:
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV}
return &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: r}}
Expand Down Expand Up @@ -115,6 +126,8 @@ func OpDelete(key string, opts ...OpOption) Op {
panic("unexpected serializable in delete")
case ret.countOnly:
panic("unexpected countOnly in delete")
case ret.minModRev != 0, ret.maxModRev != 0:
panic("unexpected mod revision filter in delete")
case ret.filterDelete, ret.filterPut:
panic("unexpected filter in delete")
case ret.createdNotify:
Expand All @@ -139,6 +152,8 @@ func OpPut(key, val string, opts ...OpOption) Op {
panic("unexpected serializable in put")
case ret.countOnly:
panic("unexpected countOnly in put")
case ret.minModRev != 0, ret.maxModRev != 0:
panic("unexpected mod revision filter in delete")
case ret.filterDelete, ret.filterPut:
panic("unexpected filter in put")
case ret.createdNotify:
Expand All @@ -161,6 +176,8 @@ func opWatch(key string, opts ...OpOption) Op {
panic("unexpected serializable in watch")
case ret.countOnly:
panic("unexpected countOnly in watch")
case ret.minModRev != 0, ret.maxModRev != 0:
panic("unexpected mod revision filter in watch")
}
return ret
}
Expand Down Expand Up @@ -263,6 +280,12 @@ func WithCountOnly() OpOption {
return func(op *Op) { op.countOnly = true }
}

// WithMinModRev filters out keys for Get with modification revisions less than the given revision.
func WithMinModRev(rev int64) OpOption { return func(op *Op) { op.minModRev = rev } }

// WithMaxModRev filters out keys for Get with modification revisions greater than the given revision.
func WithMaxModRev(rev int64) OpOption { return func(op *Op) { op.maxModRev = rev } }

// WithFirstCreate gets the key with the oldest creation revision in the request range.
func WithFirstCreate() []OpOption { return withTop(SortByCreateRevision, SortAscend) }

Expand Down
22 changes: 21 additions & 1 deletion etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (a *applierV3backend) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResp
}

limit := r.Limit
if r.SortOrder != pb.RangeRequest_NONE {
if r.SortOrder != pb.RangeRequest_NONE || r.MaxModRevision != 0 || r.MinModRevision != 0 {
// fetch everything; sort and truncate afterwards
limit = 0
}
Expand All @@ -285,6 +285,15 @@ func (a *applierV3backend) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResp
}
}

if r.MaxModRevision != 0 {
f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision > r.MaxModRevision }
pruneKVs(rr, f)
}
if r.MinModRevision != 0 {
f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision < r.MinModRevision }
pruneKVs(rr, f)
}

if r.SortOrder != pb.RangeRequest_NONE {
var sorter sort.Interface
switch {
Expand Down Expand Up @@ -795,3 +804,14 @@ func removeNeedlessRangeReqs(txn *pb.TxnRequest) {
txn.Success = f(txn.Success)
txn.Failure = f(txn.Failure)
}

func pruneKVs(rr *mvcc.RangeResult, isPrunable func(*mvccpb.KeyValue) bool) {
j := 0
for i := range rr.KVs {
rr.KVs[j] = rr.KVs[i]
if !isPrunable(&rr.KVs[i]) {
j++
}
}
rr.KVs = rr.KVs[:j]
}
Loading