Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

APP-1086: Fix typing, use BiConsumer subclass for iteration #267

Open
wants to merge 16 commits into
base: csd-2.3
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,10 @@

package org.apache.spark.util.kvstore;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -45,7 +40,7 @@
public class InMemoryStore implements KVStore {

private Object metadata;
private ConcurrentMap<Class<?>, InstanceList> data = new ConcurrentHashMap<>();
private InMemoryLists inMemoryLists = new InMemoryLists();

@Override
public <T> T getMetadata(Class<T> klass) {
Expand All @@ -59,13 +54,13 @@ public void setMetadata(Object value) {

@Override
public long count(Class<?> type) {
InstanceList list = data.get(type);
InstanceList<?> list = inMemoryLists.get(type);
return list != null ? list.size() : 0;
}

@Override
public long count(Class<?> type, String index, Object indexedValue) throws Exception {
InstanceList list = data.get(type);
InstanceList<?> list = inMemoryLists.get(type);
int count = 0;
Object comparable = asKey(indexedValue);
KVTypeInfo.Accessor accessor = list.getIndexAccessor(index);
Expand All @@ -79,55 +74,49 @@ public long count(Class<?> type, String index, Object indexedValue) throws Excep

@Override
public <T> T read(Class<T> klass, Object naturalKey) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Class<T> klass vs. Class<?> type an intentional choice or an accidental inconsistency? Either way, I'm not sure that I am following the logic of the convention.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm... also not entirely consistent across related files.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did notice this, but I didn't follow up and look. Thinking about it before looking, I assume Class type doesn't work in scala, while Class[T] klass would. Let me see what's with that, though....

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking more about naming convention, klass vs. type -- more about understandability than functionality.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's 'type' because KVTypeInfo.
But KVStore seems to be mixed, and ElementTrackingStore uses klass always.

InstanceList list = data.get(klass);
Object value = list != null ? list.get(naturalKey) : null;
InstanceList<T> list = inMemoryLists.get(klass);
T value = list != null ? list.get(naturalKey) : null;
if (value == null) {
throw new NoSuchElementException();
}
return klass.cast(value);
return value;
}

@Override
public void write(Object value) throws Exception {
InstanceList list = data.computeIfAbsent(value.getClass(), key -> {
try {
return new InstanceList(key);
} catch (Exception e) {
throw Throwables.propagate(e);
}
});
list.put(value);
inMemoryLists.write(value);
}

@Override
public void delete(Class<?> type, Object naturalKey) {
InstanceList list = data.get(type);
InstanceList<?> list = inMemoryLists.get(type);
if (list != null) {
list.delete(naturalKey);
}
}

@Override
public <T> KVStoreView<T> view(Class<T> type){
InstanceList list = data.get(type);
return list != null ? list.view(type)
: new InMemoryView<>(type, Collections.<T>emptyList(), null);
InstanceList<T> list = inMemoryLists.get(type);
return list != null ? list.view() : emptyView();
}

@Override
public void close() {
metadata = null;
data.clear();
inMemoryLists.clear();
}

@SuppressWarnings("unchecked")
@Override
public <T> int countingRemoveIf(Class<T> type, Predicate<? super T> filter) {
InstanceList list = data.get(type);
public <T> boolean removeAllByKeys(Class<T> klass, String index, Collection keys) {
InstanceList<T> list = inMemoryLists.get(klass);

if (list != null) {
return list.countingRemoveIf(filter);
return list.countingRemoveIfByKeys(index, keys) > 0;
} else {
return false;
}
return 0;
}

@SuppressWarnings("unchecked")
Expand All @@ -138,14 +127,67 @@ private static Comparable<Object> asKey(Object in) {
return (Comparable<Object>) in;
}

private static class InstanceList {
/**
 * Returns the shared empty view, typed for the caller's element type.
 */
@SuppressWarnings("unchecked")
private static <T> KVStoreView<T> emptyView() {
  // EMPTY_VIEW is declared with a raw type, hence the unchecked cast.
  InMemoryView<T> empty = (InMemoryView<T>) InMemoryView.EMPTY_VIEW;
  return empty;
}

/**
 * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
 * class of type T to an InstanceList of type T.
 */
private static class InMemoryLists {
  // Assigned once and only mutated in place, so the reference itself is final.
  private final ConcurrentMap<Class<?>, InstanceList<?>> data = new ConcurrentHashMap<>();

  /**
   * Returns the list stored for {@code key}, or {@code null} when no list exists for that
   * class.
   */
  @SuppressWarnings("unchecked")
  public <T> InstanceList<T> get(Class<T> key) {
    // Entries are only added by write(), which keys each list by the class it was built for.
    return (InstanceList<T>) data.get(key);
  }

  /**
   * Stores {@code value} in the list for its runtime class, creating that list on first use.
   */
  @SuppressWarnings("unchecked")
  public <T> void write(T value) throws Exception {
    InstanceList<T> list =
        (InstanceList<T>) data.computeIfAbsent(value.getClass(), InstanceList::new);
    list.put(value);
  }

  /** Removes every list held by this container. */
  public void clear() {
    data.clear();
  }
}

private static class InstanceList<T> {

/**
 * Callback for {@code ConcurrentMap.forEach} that removes every entry whose value matches a
 * filter and counts how many removals actually succeeded.
 */
private static class CountingRemoveIfForEach<T> implements BiConsumer<Comparable<Object>, T> {
  private final ConcurrentMap<Comparable<Object>, T> data;
  private final Predicate<? super T> filter;
  // Read directly by the caller once the traversal has finished.
  int count = 0;

  /**
   * @param data the map being traversed; matching entries are removed from it
   * @param filter selects the values whose entries should be removed
   */
  CountingRemoveIfForEach(
      ConcurrentMap<Comparable<Object>, T> data,
      Predicate<? super T> filter) {
    this.data = data;
    this.filter = filter;
  }

  @Override
  public void accept(Comparable<Object> key, T value) {
    // To address https://bugs.openjdk.java.net/browse/JDK-8078645 which affects remove() on
    // all iterators of concurrent maps, and specifically makes countingRemoveIf difficult to
    // implement correctly against the values() iterator, we use forEach instead....
    if (filter.test(value)) {
      // Conditional remove: only counts when the key is still mapped to this exact value.
      if (data.remove(key, value)) {
        count++;
      }
    }
  }
}

private final KVTypeInfo ti;
private final KVTypeInfo.Accessor naturalKey;
private final ConcurrentMap<Comparable<Object>, Object> data;
private final ConcurrentMap<Comparable<Object>, T> data;

private InstanceList(Class<?> type) throws Exception {
this.ti = new KVTypeInfo(type);
private InstanceList(Class<T> klass) {
this.ti = new KVTypeInfo(klass);
this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
this.data = new ConcurrentHashMap<>();
}
Expand All @@ -154,34 +196,20 @@ KVTypeInfo.Accessor getIndexAccessor(String indexName) {
return ti.getAccessor(indexName);
}

<T> int countingRemoveIf(Predicate<? super T> filter) {
Iterator<Map.Entry<Comparable<Object>, Object>> each = data.entrySet().iterator();
int count = 0;
@SuppressWarnings("unchecked")
int countingRemoveIfByKeys(String index, Collection keys) {
Predicate<? super T> filter = getPredicate(ti.getAccessor(index), keys);

// To address https://bugs.openjdk.java.net/brows/JDK-8078645 which affects remove() on all
// iterators of concurrent maps, and specifically makes countingRemoveIf difficult to
// implement correctly, we iterate on the entries and call the conditional remove....
while (each.hasNext()) {
Map.Entry<Comparable<Object>, Object> entry = each.next();
Comparable<Object> key = entry.getKey();
T val = (T)entry.getValue();

if (filter.test(val)) {
if (data.remove(key, val)) {
count += 1;
}
}
}
return count;
CountingRemoveIfForEach<T> callback = new CountingRemoveIfForEach<>(data, filter);
data.forEach(callback);
return callback.count;
}

public Object get(Object key) {
public T get(Object key) {
return data.get(asKey(key));
}

public void put(Object value) throws Exception {
Preconditions.checkArgument(ti.type().equals(value.getClass()),
"Unexpected type: %s", value.getClass());
public void put(T value) throws Exception {
data.put(asKey(naturalKey.get(value)), value);
}

Expand All @@ -194,21 +222,44 @@ public int size() {
}

@SuppressWarnings("unchecked")
public <T> InMemoryView<T> view(Class<T> type) {
Preconditions.checkArgument(ti.type().equals(type), "Unexpected type: %s", type);
Collection<T> all = (Collection<T>) data.values();
return new InMemoryView<>(type, all, ti);
public InMemoryView<T> view() {
return new InMemoryView<>(data.values(), ti);
}

/**
 * Builds a filter that accepts a value when the index value read from it through
 * {@code getter} is contained in {@code keys}.
 *
 * @param getter accessor for the index being matched
 * @param keys index values to match against
 * @return a predicate over stored values
 */
private static <T> Predicate<? super T> getPredicate(
    KVTypeInfo.Accessor getter,
    Collection<?> keys) {
  if (Comparable.class.isAssignableFrom(getter.getType())) {
    // The index type is itself Comparable, so the raw index values are looked up directly.
    Set<Object> set = new HashSet<>(keys);

    return (value) -> set.contains(keyFromValue(getter, value));
  } else {
    // Otherwise both the requested keys and the stored index values are passed through
    // asKey before they are compared.
    Set<Comparable<Object>> set = new HashSet<>(keys.size());
    for (Object key : keys) {
      set.add(asKey(key));
    }
    return (value) -> set.contains(asKey(keyFromValue(getter, value)));
  }
}

/**
 * Reads a value through the given accessor, rethrowing any reflection failure as an
 * unchecked exception.
 */
private static Object keyFromValue(KVTypeInfo.Accessor getter, Object value) {
  final Object key;
  try {
    key = getter.get(value);
  } catch (ReflectiveOperationException e) {
    throw new RuntimeException(e);
  }
  return key;
}
}

private static class InMemoryView<T> extends KVStoreView<T> {
private static InMemoryView EMPTY_VIEW = new InMemoryView<>(Collections.emptyList(), null);

private final Collection<T> elements;
private final KVTypeInfo ti;
private final KVTypeInfo.Accessor natural;

InMemoryView(Class<T> type, Collection<T> elements, KVTypeInfo ti) {
super(type);
InMemoryView(Collection<T> elements, KVTypeInfo ti) {
this.elements = elements;
this.ti = ti;
this.natural = ti != null ? ti.getAccessor(KVIndex.NATURAL_INDEX_NAME) : null;
Expand All @@ -220,34 +271,30 @@ public Iterator<T> iterator() {
return new InMemoryIterator<>(elements.iterator());
}

try {
KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : null;
int modifier = ascending ? 1 : -1;

final List<T> sorted = copyElements();
Collections.sort(sorted, (e1, e2) -> modifier * compare(e1, e2, getter));
Stream<T> stream = sorted.stream();
KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : null;
int modifier = ascending ? 1 : -1;

if (first != null) {
stream = stream.filter(e -> modifier * compare(e, getter, first) >= 0);
}
final List<T> sorted = copyElements();
sorted.sort((e1, e2) -> modifier * compare(e1, e2, getter));
Stream<T> stream = sorted.stream();

if (last != null) {
stream = stream.filter(e -> modifier * compare(e, getter, last) <= 0);
}
if (first != null) {
stream = stream.filter(e -> modifier * compare(e, getter, first) >= 0);
}

if (skip > 0) {
stream = stream.skip(skip);
}
if (last != null) {
stream = stream.filter(e -> modifier * compare(e, getter, last) <= 0);
}

if (max < sorted.size()) {
stream = stream.limit((int) max);
}
if (skip > 0) {
stream = stream.skip(skip);
}

return new InMemoryIterator<>(stream.iterator());
} catch (Exception e) {
throw Throwables.propagate(e);
if (max < sorted.size()) {
stream = stream.limit((int) max);
}

return new InMemoryIterator<>(stream.iterator());
}

/**
Expand All @@ -273,16 +320,16 @@ private int compare(T e1, T e2, KVTypeInfo.Accessor getter) {
diff = compare(e1, natural, natural.get(e2));
}
return diff;
} catch (Exception e) {
throw Throwables.propagate(e);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}

private int compare(T e1, KVTypeInfo.Accessor getter, Object v2) {
try {
return asKey(getter.get(e1)).compareTo(asKey(v2));
} catch (Exception e) {
throw Throwables.propagate(e);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.util.kvstore;

import java.io.Closeable;
import java.util.Collection;
import java.util.function.Predicate;

import org.apache.spark.annotation.Private;
Expand Down Expand Up @@ -130,5 +131,5 @@ public interface KVStore extends Closeable {
/**
* A cheaper way to remove multiple items from the KVStore
*/
abstract <T> int countingRemoveIf(Class<T> type, Predicate<? super T> filter) throws Exception;
<T> boolean removeAllByKeys(Class<T> klass, String index, Collection keys) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
@Private
public abstract class KVStoreView<T> implements Iterable<T> {

final Class<T> type;

boolean ascending = true;
String index = KVIndex.NATURAL_INDEX_NAME;
Object first = null;
Expand All @@ -48,10 +46,6 @@ public abstract class KVStoreView<T> implements Iterable<T> {
long skip = 0L;
long max = Long.MAX_VALUE;

public KVStoreView(Class<T> type) {
this.type = type;
}

/**
* Reverses the order of iteration. By default, iterates in ascending order.
*/
Expand Down
Loading