-
Notifications
You must be signed in to change notification settings - Fork 9.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
api: hash by rev #8263
api: hash by rev #8263
Conversation
mvcc/backend/backend.go
Outdated
@@ -52,6 +52,7 @@ type Backend interface { | |||
|
|||
Snapshot() Snapshot | |||
Hash(ignores map[IgnoreKey]struct{}) (uint32, error) | |||
HashByRev(bucket []byte, f func(rev []byte) bool) (uint32, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HashBucket? Backend should know nothing about revision.
mvcc/kv.go
Outdated
@@ -111,6 +111,9 @@ type KV interface { | |||
// This method is designed for consistency checking purposes. | |||
Hash() (hash uint32, revision int64, err error) | |||
|
|||
// HashByRev retrieves the hash of KV state given a revision. | |||
HashByRev(rev int64) (hash uint32, compactRev int64, err error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hash includes all state (including leases for example). HashByRev only hashes the kv state for good reason (lease does not understand rev yet...). we need to make this clear though.
mvcc/kvstore.go
Outdated
return 0, 0, ErrFutureRev | ||
} | ||
|
||
compactc := s.compactc |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid holding the lock while waiting on compaction?
for {
s.mu.RLock()
compactMainRev := s.compactMainRev
currentRev := s.currentRev
compactc := s.compactc
s.mu.RUnlock()
if rev < s.compactMainRev {
return 0, compactMainRev, ErrCompacted
} else if rev > s.currentRev {
return 0, compactMainRev, ErrFutureRev
}
select {
case <-compactc:
case <-s.stopc:
return 0, 0, ErrClosed
}
s.mu.Lock()
if s.compactc == compactc {
defer s.mu.Unlock()
break
}
s.mu.Unlock()
}
s.b.ForceCommit()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good.
7e852b3
to
614b3b1
Compare
all fixed. PTAL |
mvcc/backend/backend.go
Outdated
@@ -240,6 +241,37 @@ func (b *backend) Hash(ignores map[IgnoreKey]struct{}) (uint32, error) { | |||
return h.Sum32(), nil | |||
} | |||
|
|||
func (b *backend) HashBucket(bucket []byte, f func(rev []byte) bool) (hash uint32, err error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
explain what the pass in func does.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do.
we need some tests for this. |
mvcc/kvstore.go
Outdated
@@ -42,8 +42,12 @@ var ( | |||
ErrFutureRev = errors.New("mvcc: required revision is a future revision") | |||
ErrCanceled = errors.New("mvcc: watcher is canceled") | |||
ErrClosed = errors.New("mvcc: closed") | |||
ErrTimeout = errors.New("mvcc: req hash timeout") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused?
mvcc/kvstore.go
Outdated
|
||
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc") | ||
|
||
// hashTimeout is the max waiting time on the ongoing compaction. | ||
hashTimeout = time.Duration(10) * time.Second |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused?
mvcc/kv.go
Outdated
@@ -107,10 +107,14 @@ type KV interface { | |||
// Write creates a write transaction. | |||
Write() TxnWrite | |||
|
|||
// Hash retrieves the hash of KV state and revision. | |||
// This method is designed for consistency checking purposes. | |||
// Hash retrieves the hash of all states including leases. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can stay the same?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is KV state here means the entire Key-Value space of db which includes leases?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe Hash computes the hash of the KV's backend
, then?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds better.
mvcc/kv.go
Outdated
Hash() (hash uint32, revision int64, err error) | ||
|
||
// HashByRev retrieves the hash of the KV state at a revision. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// HashByRev computes the hash of all MVCC keys up to a given revision.
mvcc/kv.go
Outdated
Hash() (hash uint32, revision int64, err error) | ||
|
||
// HashByRev retrieves the hash of the KV state at a revision. | ||
// This method is designed to check consistency of the KV space. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably don't need to explain what the hash is being used for
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
got it
etcdserver/api/v3rpc/maintenance.go
Outdated
@@ -128,11 +128,11 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance | |||
} | |||
|
|||
func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) { | |||
h, rev, err := ms.kg.KV().Hash() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
still need to call Hash()
if r.Revision == 0
, otherwise the functional tester won't check all the buckets
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good.
etcdserver/api/v3rpc/maintenance.go
Outdated
@@ -128,11 +128,11 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance | |||
} | |||
|
|||
func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure if we should change this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the full hash is still useful.
we might want to add a quickHash which only cares about kv space and can do this incrementally.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking it could be rev == 0 => full hash?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is quick hash a flag in HashRequest
or another api called quickHash
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went with @heyitsanthony suggestion. one issue with current full hash operation it ignores compaction. However, I can probably merge Hash()
into HashByRev ()
in to one function so both s.b.Hash
and s.b.HashBucket
will wait for compaction.
something like following
func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
for {
s.mu.RLock()
compactMainRev := s.compactMainRev
currentRev = s.currentRev
compactc := s.compactc
s.mu.RUnlock()
if rev > 0 && rev < compactMainRev {
return 0, 0, compactMainRev, ErrCompacted
} else if rev > 0 && rev > currentRev {
return 0, currentRev, 0, ErrFutureRev
}
select {
case <-compactc:
case <-s.stopc:
return 0, 0, 0, ErrClosed
}
s.mu.Lock()
if s.compactc == compactc {
defer s.mu.Unlock()
break
}
s.mu.Unlock()
}
s.b.ForceCommit()
if rev == 0 { // full hash if rev == 0
hash, err = s.b.Hash(DefaultIgnores)
} else {
upper := revision{main: rev + 1}
f := func(r []byte) bool {
return upper.GreaterThan(bytesToRev(r))
}
hash, err = s.b.HashBucket(keyBucketName, f)
}
return hash, s.currentRev, s.compactMainRev, err
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that makes sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe there should be a separate RPC (HashRev
? HashKV
?), then; there might be cases where hashing the keys bucket with the current revision is desirable and there's no way to do that when reusing the current Hash RPC without having a flag like quickHash
, which seems kind of hacky.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HashKV
sounds good to me.
Should HashKV has its own req/resp type? or should it just use HashRequest
and HashResponse
defined in this pr?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should take HashKVRequest and return a HashKVResponse
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably a different request type/resp makes more sense since Maintenance Hash API shouldn't have reason about rev
in HashRequest
nor compact_revision
in HashResponse
If this is going to be used for background consistency checking, it should probably use ReadTx. Right now it seems this will block all mvcc operations while scanning the keys bucket? The current approach is OK as a first cut, though. |
mvcc/kvstore.go
Outdated
for { | ||
s.mu.RLock() | ||
compactMainRev := s.compactMainRev | ||
currentRev := s.currentRev |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@heyitsanthony should HashByRev also return currentRev
to maintenance api where it will be used as the rev in the header?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should the rev be the kv state when hash finishes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
e.g return hash, s.currentRev, s.compactMainRev, err
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The revision at the time of starting the hash. It'll matter more once it's nonblocking. Consider:
Start Hash(rev=10); currentRev=100
Compact(rev=50); currentRev=100
Put; currentRev=101
Finish Hash; currentRev=101
If Hash returns 101 as its header revision instead of 100, then etcd is saying the Hash would have worked at rev=101. This cannot be true since the keys for the hash at rev=10 have been compacted away by rev=101.
This is similar to the reasoning for Range's header revisions. If I request a Get with rev=0, then I'd expect the same KV values (or a compaction error) if I make a request later using WithRev(getResp.Header.Revision). This is guaranteed if the header is populated with the revision at the time of starting the read transaction, but not if it's using the revision after the transaction completes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's right.
mvcc/kvstore.go
Outdated
} | ||
s.mu.Lock() | ||
if s.compactc == compactc { | ||
defer s.mu.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably need to retrieve currentRev = s.currentRev
here because it is under the s.mu.Lock()
which marks the beginning KV state of HashBucket
operation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
anything greater than the rev argument will be ignored; I don't think it necessarily needs the most up to date current revision, just one that is >= rev
If we use ReadTx, it can only see the top layer of KV, but not the historical versions, right? I am not sure how are we going to calculate the hash incrementally with that approach. Or we need to make sure ReadTx is really cheap and will not block anything, and calculating the hash of a few mill keys is also cheap enough. |
@xiang90 ReadTx can efficiently scan historical data; it's keyed by revision on the mvcc bucket just like batchtx. Maybe you're thinking of mvcc.TxnRead? It also has the advantage that it won't need to force commit like when scanning with db.View. What do you mean by computing the hash incrementally? |
yea. i was thinking of the readView thing. sorry for the confusion. |
HashKV rpc hash of all MVCC keys up to a given revision for a given node.
add HashKV and use backend.ReadTx to traverse Key-Value space. PTAL |
Codecov Report
@@ Coverage Diff @@
## master #8263 +/- ##
========================================
- Coverage 76.2% 76.1% -0.1%
========================================
Files 346 346
Lines 27055 27110 +55
========================================
+ Hits 20616 20632 +16
- Misses 4957 4992 +35
- Partials 1482 1486 +4
Continue to review full report at Codecov.
|
mvcc/kv.go
Outdated
Hash() (hash uint32, revision int64, err error) | ||
|
||
// HashByRev computes the hash of all MVCC keys up to a given revision. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the hash of only mvcc keys up to a given revision (excluding KV backend metadata, leases, etc.).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i wonder if there is a keyword represents just key-pair values of the "key" bucket?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
computes the hash of all MVCC revisions up to a given revision
? that would cover deleted keys as well
mvcc/kvstore.go
Outdated
@@ -160,6 +166,56 @@ func (s *store) Hash() (hash uint32, revision int64, err error) { | |||
return h, s.currentRev, err | |||
} | |||
|
|||
func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) { | |||
readTx := s.b.ReadTx() | |||
for { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this isn't needed with readtx, it should be enough to have TxnRead:
kvstore_txn.go:
func (s *store) Read() TxnRead { return newMetricsTxnRead(s.read()) }
func (s *store) read() *storeTxnRead {
s.mu.RLock()
tx := s.b.ReadTx()
s.revMu.RLock()
tx.Lock()
firstRev, rev := s.compactMainRev, s.currentRev
s.revMu.RUnlock()
return &storeTxnRead{s, tx, firstRev, rev}
}
func (s *store) HashByRev(rev int64) (int32, int64, int64) {
txnRead := s.read()
defer txnRead.End()
...
txnRead.tx.UnsafeForEach(keyBucketName, func(k, v []byte) error {
...
}
...
return hash, readTxn.Rev(), readTxn.FirstRev()
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder is waiting for compaction even necessary?
The moment i obtain the txnRead
I have the view of the world; i know the compaction rev, current rev.
Then could I just hash from all the ways between [compaction rev, current rev]?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there's no need to wait on compaction with txnread; I should have made that more clear...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
one more thought. if I want to hash all the ways between [compaction rev, current rev], I can't simply just hash all the key that has rev > compaction rev and <= current rev because compaction deletes historic revisions for a key if it is not tombstone. In that case, I need to skip key that's historic and has rev < compaction rev. hence, I need a way to determine if a key is historic. How would that be achieve easily just by looping through kv space?
It seems to me that waiting for the compaction to finish can be a simpler approach where i don't have to worry about checking tombstone and historic keys that has rev < compaction rev.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it shouldn't skip historic keys; can you write a test where the hashing fails because of the problem you're describing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@heyitsanthony I wrote a test to illustrate my concern.
func TestHashBeforeCompactionFinishes(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
cli.Put(context.TODO(), "foo", "bar2") // rev = 2
cli.Put(context.TODO(), "foo", "bar3") // rev = 3
cli.Put(context.TODO(), "bla", "bar4") // rev = 4
cli := clus.Client(0)
cli1 := clus.Client(1)
// compact starting
cli.Compact(context.TODO(), 3)
// suppose that ongoing compaction on node 0 hasn't delete anything yet.
// if HashKV doesn't skip historic keys of "foo" that's before the compact rev 3,, then it will return a hash
// that's composed of ["foo", "bar2"], ["foo", "bar3"], ["bla", "bar4"].
// If hashKV skips historic keys of "foo", then it needs to figure out which
// key rev will be compacted; skip ["foo", "bar2"] since it will be compacted and
// return hash ["foo", "bar3"] and ["bla", "bar4"].
resp, _ = toGRPC(cli).Maintainance.HashKV(context.TODO(), &HashKVRequest{Revision: 4})
// suppose that ongoing compaction on node 1 completes before this Hash begins,
// then it will return a hash consists only ["foo", "bar3"] and ["bla", "bar4"]
// regardless skipping historic keys or not.
resp2, _ = toGRPC(cli2).Maintainance.HashKV(context.TODO(), &HashKVRequest{Revision: 4})
// expect resp.CompactRevision == resp2.CompactRevision
// and resp.Hash != resp2.Hash in the scenario no skipping historic keys.
if resp.CompactRevisions == resp2.CompactRevision {
if resp.Hash != resp2.CompactRevision {
t.Fatalf("hash mismatch on same CompactRevision")
}
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two possible solutions:
- open the read txn when there's no compaction scheduled (pricey since it will have to forcecommit, but simpler)
- hash all revisions >=compactRev, then scan the key index to get keys at the current rev with modrev < compactRev.
There should be a test to trigger this inflight behavior in the mvcc package. It doesn't look like that integration test will hit the problem very often...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, the test is just to illustrate the issue; a better test is need.
I'll give it a try on the second approach.
test case? |
will add tests. |
@heyitsanthony you are right. |
a800586
to
72366b8
Compare
add test and implement compaction wait free HashKV. PTAL. |
mvcc/kvstore.go
Outdated
@@ -44,6 +45,8 @@ var ( | |||
ErrClosed = errors.New("mvcc: closed") | |||
|
|||
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc") | |||
|
|||
EmptyKeep = make(map[revision]struct{}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably can be now used for make(struct map[rev]struct{}
in your comment below
"If there are no scheduled compactions, won't this always force a commit to disk? Isn't that something keep is trying to avoid? a possible workaround is to set s.keep = make(struct map[rev]struct{}) after this ForceCommit (and have len(keep) > 0 instead of keep != nil in UnsafeForEach) so it won't commit on every call."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
emptyKeep
; shouldn't be exported
mvcc/kvstore.go
Outdated
if keep == nil { | ||
// ForceCommit ensures that txnRead begins after backend | ||
// has committed all the changes from the prev completed compaction. | ||
s.b.ForceCommit() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there are no scheduled compactions, won't this always force a commit to disk? Isn't that something keep
is trying to avoid? a possible workaround is to set s.keep = make(struct map[rev]struct{})
after this ForceCommit (and have len(keep) > 0
instead of keep != nil
in UnsafeForEach
) so it won't commit on every call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
having an empty keep make sense.
mvcc/kvstore.go
Outdated
tx backend.ReadTx | ||
) | ||
s.mu.Lock() | ||
s.revMu.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rlock
mvcc/kvstore.go
Outdated
compactRev, currentRev = s.compactMainRev, s.currentRev | ||
s.revMu.Unlock() | ||
|
||
s.keepMu.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rlock
mvcc/kvstore.go
Outdated
s.revMu.Unlock() | ||
|
||
s.keepMu.Lock() | ||
keep = s.keep |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep := s.keep
integration/v3_grpc_inflight_test.go
Outdated
cctx, cancel := context.WithCancel(context.Background()) | ||
var rev int64 | ||
for i := 0; i < 100; i++ { | ||
resp, err := cli1.Put(cctx, "foo", "bar"+strconv.Itoa(i)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
grpc integration tests should use raw RPCs instead of going through the client RPC wrappers
integration/v3_grpc_inflight_test.go
Outdated
@@ -54,6 +55,101 @@ func TestV3MaintenanceDefragmentInflightRange(t *testing.T) { | |||
<-donec | |||
} | |||
|
|||
// TestV3CompactInflightHashKV ensures that HashKV returns | |||
// correct hash while compacting. | |||
func TestV3CompactInflightHashKV(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sort of stress test may better belong in the mvcc package since if there's a concurrency problem it should turn up with a single kv instance. The grpc test could instead only test functionality instead of stressing it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I saw that mvcc pkg only have unit tests. In that case, should I just move the exact same test to some new test.go file in mvcc? or should I transform my test to some sort of unit test on kvstore?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test in mvcc would use a kvstore to test/stress HashByRev; the test in integrations/ would test that the server responds to the RPC
mvcc/kvstore.go
Outdated
func (s *store) Compact(rev int64) (<-chan struct{}, error) { | ||
s.mu.Lock() | ||
defer s.mu.Unlock() | ||
s.revMu.Lock() | ||
defer s.revMu.Unlock() | ||
|
||
s.keepMu.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
only hold lock when updating s.keep?
integration/v3_grpc_inflight_test.go
Outdated
go func() { | ||
defer close(donec) | ||
revHash := make(map[int64]uint32) | ||
for i := 0; i < 12; i++ { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if using the Compact loop goroutine, this should probably loop until at least there is one hit in revHash so there's always some hash comparison on the same compact rev
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's a good idea.
integration/v3_grpc_inflight_test.go
Outdated
|
||
select { | ||
case hashesc <- resp: | ||
case <-donec: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
case <-cctx.Done():
?
897a346
to
2062074
Compare
All fixed. PTAL. |
the tests still aren't in the mvcc package? |
one quick question. Shall I have my test in mvcc/integration pkg in some test.go file? |
@fanminshi there would be a new test in mvcc/kv_test.go that only tests the backend instead of issuing RPCs to stress the feature. The grpc integration test would only test that there's a reasonable response / the RPC is wired up right. |
add a test in mvcc to test HashKV in kvstore only and a integration test to Integration pkg to test HashKV rpc. |
6543444
to
767a217
Compare
mvcc/kvstore_test.go
Outdated
if err != nil { | ||
t.Fatal(err) | ||
} | ||
time.Sleep(10) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use n * time.Millisecond
so the time resolution is clear
mvcc/kvstore_test.go
Outdated
case r := <-hashCompactc: | ||
if revHash[r.compactRev] == 0 { | ||
revHash[r.compactRev] = r.hash | ||
break |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need to break
mvcc/kvstore_test.go
Outdated
s.Put([]byte("foo"), []byte("bar"+strconv.Itoa(i)), lease.NoLease) | ||
} | ||
|
||
hashCompactc := make(chan struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
declare this struct above the func instead of inlining
type hashKVResult struct {
hash uint32
compactRev int64
}
mvcc/kvstore_test.go
Outdated
defer close(donec) | ||
revHash := make(map[int64]uint32) | ||
for round := 0; round < 1000; round++ { | ||
select { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there's only one case in the select, so pull from the channel directly:
r := <-hashCompactc
if revHash[r.compactRev] == 0 {
...
integration/v3_grpc_test.go
Outdated
} | ||
|
||
// ensure appliedIdx is sync with committedIdx | ||
_, err := kvc.Range(context.Background(), &pb.RangeRequest{Key: []byte("foo")}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this isn't necessary for a single member cluster; there won't be an ack for the put until it's synced
integration/v3_grpc_test.go
Outdated
if err != nil { | ||
t.Fatal(err) | ||
} | ||
rev = resp.Header.Revision |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not HashKV after every put and check that there's no match?
integration/v3_grpc_test.go
Outdated
} | ||
|
||
var ( | ||
prevHash uint32 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can set these when looping on the puts and fetching the hash; avoids the need for the if prevHash == 0
check
integration/v3_grpc_test.go
Outdated
prevCompactRev int64 | ||
) | ||
for i := 0; i < 10; i++ { | ||
resp, err := mvc.HashKV(context.Background(), &pb.HashKVRequest{rev}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use rev=0 => current rev with &pb.HashKVRequest{}
?
mvcc/kvstore_test.go
Outdated
|
||
rev := 1000 | ||
for i := 2; i <= rev; i++ { | ||
s.Put([]byte("foo"), []byte("bar"+strconv.Itoa(i)), lease.NoLease) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fmt.Sprintf("bar%d", i)
to avoid the strconv import
mvcc/kvstore.go
Outdated
func (s *store) Compact(rev int64) (<-chan struct{}, error) { | ||
s.mu.Lock() | ||
defer s.mu.Unlock() | ||
s.revMu.Lock() | ||
defer s.revMu.Unlock() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep old spacing?
HashByRev computes the hash of all MVCC keys up to a given revision.
All fixed. PTAL |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm thanks
fixes #8016