Ensure read-lock is not continuously held on a section while iterating over concurrent maps #9787

Merged: 2 commits, Mar 4, 2021
Changes from 1 commit
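Background for the diffs below: each of these collections is split into Section inner classes, and each Section extends StampedLock (the unqualified tryOptimisticRead() / validate() / readLock() / unlockRead() calls in the hunks are StampedLock methods). Before this change, forEach fell back to the section's read lock on the first failed optimistic validation and then kept holding it for the rest of the iteration, blocking writers until the whole scan finished. After this change, the read lock is taken only long enough to re-read the single bucket whose validation failed, then released. What follows is a minimal, self-contained sketch of the new pattern, written directly against java.util.concurrent.locks.StampedLock; SectionSketch and forEachValue are illustrative names, not Pulsar APIs, and, like this first commit, it unlocks without a try/finally (a review comment further down picks that up).

import java.util.concurrent.locks.StampedLock;
import java.util.function.LongConsumer;

// Illustrative stand-in for one Section of the concurrent collections in this PR.
class SectionSketch extends StampedLock {
    private final long[] buckets = new long[16];

    void forEachValue(LongConsumer consumer) {
        // stamp == 0 means "acquire a fresh optimistic stamp on the next bucket"
        long stamp = 0;
        for (int i = 0; i < buckets.length; i++) {
            if (stamp == 0) {
                stamp = tryOptimisticRead();
            }

            long value = buckets[i]; // optimistic read, may race with a writer

            if (!validate(stamp)) {
                // A write happened since the stamp was issued: re-read just this
                // bucket under the read lock, then release the lock immediately
                // instead of keeping it for the rest of the scan.
                stamp = readLock();
                value = buckets[i];
                unlockRead(stamp);
                stamp = 0; // force a new optimistic stamp on the next bucket
            }

            consumer.accept(value);
        }
    }
}

A stamp of 0 doubles as the "need a fresh stamp" marker, which is safe because tryOptimisticRead() never returns 0 for a successful optimistic read and validate(0) always returns false.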
@@ -153,14 +153,14 @@ private Section<V> getSection(long hash) {
 }
 
 public void clear() {
-    for (Section<V> s : sections) {
-        s.clear();
+    for (int i = 0; i < sections.length; i++) {
+        sections[i].clear();
     }
 }
 
 public void forEach(EntryProcessor<V> processor) {
-    for (Section<V> s : sections) {
-        s.forEach(processor);
+    for (int i = 0; i < sections.length; i++) {
+        sections[i].forEach(processor);
     }
 }

@@ -393,46 +393,44 @@ void clear() {
 public void forEach(EntryProcessor<V> processor) {
     long stamp = tryOptimisticRead();
 
+    // We need to make sure that we read these 3 variables in a consistent way
     int capacity = this.capacity;
     long[] keys = this.keys;
     V[] values = this.values;
 
-    boolean acquiredReadLock = false;
-
-    try {
-
-        // Validate no rehashing
-        if (!validate(stamp)) {
-            // Fallback to read lock
-            stamp = readLock();
-            acquiredReadLock = true;
-
-            capacity = this.capacity;
-            keys = this.keys;
-            values = this.values;
-        }
+    // Validate no rehashing
+    if (!validate(stamp)) {
+        // Fallback to read lock
+        stamp = readLock();
+        capacity = this.capacity;
+        keys = this.keys;
+        values = this.values;
+        unlockRead(stamp);
+    }
 
-        // Go through all the buckets for this section
-        for (int bucket = 0; bucket < capacity; bucket++) {
-            long storedKey = keys[bucket];
-            V storedValue = values[bucket];
+    // Go through all the buckets for this section. We try to renew the stamp only after a validation
+    // error, otherwise we keep going with the same.
+    for (int bucket = 0; bucket < capacity; bucket++) {
+        if (stamp == 0) {
+            stamp = tryOptimisticRead();
+        }
 
-            if (!acquiredReadLock && !validate(stamp)) {
-                // Fallback to acquiring read lock
-                stamp = readLock();
-                acquiredReadLock = true;
+        long storedKey = keys[bucket];
+        V storedValue = values[bucket];
 
-                storedKey = keys[bucket];
-                storedValue = values[bucket];
-            }
+        if (!validate(stamp)) {
+            // Fallback to acquiring read lock
+            stamp = readLock();
 
-            if (storedValue != DeletedValue && storedValue != EmptyValue) {
-                processor.accept(storedKey, storedValue);
-            }
-        }
-    } finally {
-        if (acquiredReadLock) {
-            unlockRead(stamp);
-        }
-    }
+            storedKey = keys[bucket];
+            storedValue = values[bucket];
+            unlockRead(stamp);
+            stamp = 0;
+        }
+
+        if (storedValue != DeletedValue && storedValue != EmptyValue) {
+            processor.accept(storedKey, storedValue);
+        }
+    }
 }
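To illustrate what this hunk buys (not part of the PR): under the old code, a single failed validation left the section read-locked for the remainder of the scan, so a slow EntryProcessor could block every writer hashing to that section until forEach returned. The harness below is a hedged sketch; it assumes the ConcurrentLongHashMap(expectedItems, concurrencyLevel) constructor, put(long, V), and the usual org.apache.pulsar.common.util.collections location, none of which are shown in this hunk.

import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;

public class ForEachDemo {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(1024, 8);
        for (long i = 0; i < 1_000; i++) {
            map.put(i, "v" + i);
        }

        // A writer running while another thread iterates: after this change it
        // can interleave between buckets instead of waiting for the scan to end.
        Thread writer = new Thread(() -> {
            for (long i = 1_000; i < 2_000; i++) {
                map.put(i, "w" + i);
            }
        });
        writer.start();

        // Deliberately slow consumer; under the old code this could pin the
        // section's read lock once a single validation failed.
        map.forEach((key, value) -> Thread.yield());

        writer.join();
    }
}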
@@ -86,23 +86,23 @@ public ConcurrentLongPairSet(int expectedItems, int concurrencyLevel) {
 
 public long size() {
     long size = 0;
-    for (Section s : sections) {
-        size += s.size;
+    for (int i = 0; i < sections.length; i++) {
+        size += sections[i].size;
     }
     return size;
 }
 
 public long capacity() {
     long capacity = 0;
-    for (Section s : sections) {
-        capacity += s.capacity;
+    for (int i = 0; i < sections.length; i++) {
+        capacity += sections[i].capacity;
     }
     return capacity;
 }
 
 public boolean isEmpty() {
-    for (Section s : sections) {
-        if (s.size != 0) {
+    for (int i = 0; i < sections.length; i++) {
+        if (sections[i].size != 0) {
             return false;
         }
     }
@@ -111,8 +111,8 @@ public boolean isEmpty() {
 
 long getUsedBucketCount() {
     long usedBucketCount = 0;
-    for (Section s : sections) {
-        usedBucketCount += s.usedBuckets;
+    for (int i = 0; i < sections.length; i++) {
+        usedBucketCount += sections[i].usedBuckets;
     }
     return usedBucketCount;
 }
@@ -154,8 +154,8 @@ public void clear() {
 }
 
 public void forEach(LongPairConsumer processor) {
-    for (Section s : sections) {
-        s.forEach(processor);
+    for (int i = 0; i < sections.length; i++) {
+        sections[i].forEach(processor);
     }
 }

@@ -169,8 +169,8 @@ public void forEach(LongPairConsumer processor) {
  */
 public int removeIf(LongPairPredicate filter) {
     int removedValues = 0;
-    for (Section s : sections) {
-        removedValues += s.removeIf(filter);
+    for (int i = 0; i < sections.length; i++) {
+        removedValues += sections[i].removeIf(filter);
     }
     return removedValues;
 }
Expand All @@ -194,8 +194,8 @@ public Set<LongPair> items(int numberOfItems) {
@Override
public <T> Set<T> items(int numberOfItems, LongPairFunction<T> longPairConverter) {
Set<T> items = new HashSet<>();
for (Section s : sections) {
s.forEach((item1, item2) -> {
for (int i = 0; i < sections.length; i++) {
sections[i].forEach((item1, item2) -> {
if (items.size() < numberOfItems) {
items.add(longPairConverter.apply(item1, item2));
}
@@ -398,42 +398,31 @@ void clear() {
 public void forEach(LongPairConsumer processor) {
-    long stamp = tryOptimisticRead();
-
     long[] table = this.table;
-    boolean acquiredReadLock = false;
 
-    try {
-
-        // Validate no rehashing
-        if (!validate(stamp)) {
-            // Fallback to read lock
-            stamp = readLock();
-            acquiredReadLock = true;
-            table = this.table;
-        }
-
-        // Go through all the buckets for this section
-        for (int bucket = 0; bucket < table.length; bucket += 2) {
-            long storedItem1 = table[bucket];
-            long storedItem2 = table[bucket + 1];
+    // Go through all the buckets for this section. We try to renew the stamp only after a validation
+    // error, otherwise we keep going with the same.
+    long stamp = 0;
+    for (int bucket = 0; bucket < table.length; bucket += 2) {
+        if (stamp == 0) {
+            stamp = tryOptimisticRead();
+        }
 
-            if (!acquiredReadLock && !validate(stamp)) {
-                // Fallback to acquiring read lock
-                stamp = readLock();
-                acquiredReadLock = true;
+        long storedItem1 = table[bucket];
+        long storedItem2 = table[bucket + 1];
 
-                storedItem1 = table[bucket];
-                storedItem2 = table[bucket + 1];
-            }
+        if (!validate(stamp)) {
+            // Fallback to acquiring read lock
+            stamp = readLock();
 
-            if (storedItem1 != DeletedItem && storedItem1 != EmptyItem) {
-                processor.accept(storedItem1, storedItem2);
-            }
-        }
-    } finally {
-        if (acquiredReadLock) {
-            unlockRead(stamp);
-        }
-    }
+            storedItem1 = table[bucket];
+            storedItem2 = table[bucket + 1];
+            unlockRead(stamp);
+            stamp = 0;
+        }
+
+        if (storedItem1 != DeletedItem && storedItem1 != EmptyItem) {
+            processor.accept(storedItem1, storedItem2);
+        }
    }
 }
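One detail specific to ConcurrentLongPairSet: a logical entry spans two adjacent array slots (bucket and bucket + 1), so both reads must be covered by the same validation, and on failure both slots are re-read while the read lock is held; otherwise a write landing between the two optimistic reads could pair item1 of one entry with item2 of another. Below is a standalone sketch of that consistency rule, not from the PR; PairTable and readPair are illustrative names, and it uses try/finally even though this commit does not yet (see the review note further down).

import java.util.concurrent.locks.StampedLock;

class PairTable extends StampedLock {
    private final long[] table = new long[32]; // item1/item2 stored in adjacent slots

    // Reads one pair consistently: a single validate() covers both slot reads.
    long[] readPair(int bucket) {
        long stamp = tryOptimisticRead();
        long item1 = table[bucket];
        long item2 = table[bucket + 1];

        if (!validate(stamp)) {
            // A writer may have replaced the entry between the two reads above;
            // re-read both slots under the read lock to get a coherent pair.
            long rs = readLock();
            try {
                item1 = table[bucket];
                item2 = table[bucket + 1];
            } finally {
                unlockRead(rs);
            }
        }
        return new long[] {item1, item2};
    }
}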
@@ -20,7 +20,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -149,28 +149,28 @@ private Section<K, V> getSection(long hash) {
 }
 
 public void clear() {
-    for (Section<K, V> s : sections) {
-        s.clear();
+    for (int i = 0; i < sections.length; i++) {
+        sections[i].clear();
     }
 }
 
 public void forEach(BiConsumer<? super K, ? super V> processor) {
-    for (Section<K, V> s : sections) {
-        s.forEach(processor);
+    for (int i = 0; i < sections.length; i++) {
+        sections[i].forEach(processor);
     }
 }
 
 /**
  * @return a new list of all keys (makes a copy)
  */
 public List<K> keys() {
-    List<K> keys = Lists.newArrayList();
+    List<K> keys = new ArrayList<>((int) size());
     forEach((key, value) -> keys.add(key));
     return keys;
 }
 
 public List<V> values() {
-    List<V> values = Lists.newArrayList();
+    List<V> values = new ArrayList<>((int) size());
     forEach((key, value) -> values.add(value));
     return values;
 }
@@ -354,42 +354,33 @@ void clear() {
 }
 
 public void forEach(BiConsumer<? super K, ? super V> processor) {
-    long stamp = tryOptimisticRead();
-
+    // Take a reference to the data table, if there is a rehashing event, we'll be
+    // simply iterating over a snapshot of the data.
     Object[] table = this.table;
-    boolean acquiredReadLock = false;
 
-    try {
-
-        // Validate no rehashing
-        if (!validate(stamp)) {
-            // Fallback to read lock
-            stamp = readLock();
-            acquiredReadLock = true;
-            table = this.table;
-        }
-
-        // Go through all the buckets for this section
-        for (int bucket = 0; bucket < table.length; bucket += 2) {
-            K storedKey = (K) table[bucket];
-            V storedValue = (V) table[bucket + 1];
+    // Go through all the buckets for this section. We try to renew the stamp only after a validation
+    // error, otherwise we keep going with the same.
+    long stamp = 0;
+    for (int bucket = 0; bucket < table.length; bucket += 2) {
+        if (stamp == 0) {
+            stamp = tryOptimisticRead();
+        }
 
-            if (!acquiredReadLock && !validate(stamp)) {
-                // Fallback to acquiring read lock
-                stamp = readLock();
-                acquiredReadLock = true;
+        K storedKey = (K) table[bucket];
+        V storedValue = (V) table[bucket + 1];
 
-                storedKey = (K) table[bucket];
-                storedValue = (V) table[bucket + 1];
-            }
-
-            if (storedKey != DeletedKey && storedKey != EmptyKey) {
-                processor.accept(storedKey, storedValue);
-            }
-        }
-    } finally {
-        if (acquiredReadLock) {
-            unlockRead(stamp);
-        }
-    }
+        if (!validate(stamp)) {
+            // Fallback to acquiring read lock
+            stamp = readLock();
+
+            storedKey = (K) table[bucket];
+            storedValue = (V) table[bucket + 1];
+            unlockRead(stamp);
Member (on the unlockRead(stamp) line above):

try-finally for unlock? (Same comment to all unlock locations)

Contributor Author:

Sure, I think in all places here it should be guaranteed to not throw, but makes sense to do it as general practice.

+            stamp = 0;
+        }
 
+        if (storedKey != DeletedKey && storedKey != EmptyKey) {
+            processor.accept(storedKey, storedValue);
+        }
+    }
 }
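Following up on the review exchange above: the second commit of this PR is not shown in this view, but applying the reviewer's suggestion to the fallback block in this hunk would presumably give each short lock window the shape sketched below, so the read lock is released even if the re-read throws unexpectedly. This is a hypothetical fragment mirroring the hunk's own code, not the actual follow-up commit.

// Hypothetical try/finally form of the fallback in the hunk above:
if (!validate(stamp)) {
    // Fallback to acquiring read lock
    stamp = readLock();
    try {
        storedKey = (K) table[bucket];
        storedValue = (V) table[bucket + 1];
    } finally {
        unlockRead(stamp); // released even if the re-read throws
    }
    stamp = 0;
}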