Skip to content

Commit

Permalink
clean up GenericTypeahead index roller code
Browse files Browse the repository at this point in the history
  • Loading branch information
jingwei committed Sep 17, 2012
1 parent baaca5b commit 6c9d7f3
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 204 deletions.
5 changes: 3 additions & 2 deletions src/main/java/cleo/search/store/KratiBufferedInts.java
Original file line number Diff line number Diff line change
Expand Up @@ -251,13 +251,14 @@ public boolean put(String key, int[] elemIds, long scn) throws Exception {
return delete(key, scn);
}

bufInts.put(key, new int[0], scn);
int index = getExtensionIndex(key);
ReentrantLock l = getLock(key);

l.lock();
try {
bufInts.put(key, new int[0], scn);
int index = getExtensionIndex(key);
extInts.set(index, elemIds, scn);

return true;
} finally {
l.unlock();
Expand Down
178 changes: 55 additions & 123 deletions src/main/java/cleo/search/typeahead/GenericTypeahead.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand All @@ -30,7 +31,6 @@
import org.apache.log4j.Logger;

import cleo.search.Element;
import cleo.search.IndexRoller;
import cleo.search.Indexer;
import cleo.search.Score;
import cleo.search.collector.Collector;
Expand All @@ -45,7 +45,6 @@
import cleo.search.store.StaticFloatArrayPartition;
import cleo.search.store.StaticLongArrayPartition;
import cleo.search.store.Stores;
import cleo.search.typeahead.GenericTypeaheadIndexRoller.ElementScoreCmpDsc;
import cleo.search.util.ElementScoreHandler;
import cleo.search.util.ScoreScanner;

Expand All @@ -57,6 +56,7 @@
*
* <p>
* 05/16, 2011 - Added field maxElementScore <br/>
* 09/16, 2012 - Used buffering connections store instead of roller to enhance indexing performance <br/>
*/
public class GenericTypeahead<E extends Element> extends AbstractTypeahead<E> implements Indexer<E>, Persistable {
private final static Logger logger = Logger.getLogger(GenericTypeahead.class);
Expand All @@ -68,8 +68,6 @@ public class GenericTypeahead<E extends Element> extends AbstractTypeahead<E> im
protected final int maxKeyLength;

protected volatile float maxElementScore;
protected volatile boolean rollingEnabled = true;
protected final RollingTypeahead<E> rollingTypeahead;

public GenericTypeahead(String name,
ArrayStoreElement<E> elementStore,
Expand All @@ -78,24 +76,6 @@ public GenericTypeahead(String name,
BloomFilter<Long> bloomFilter,
ScoreScanner scoreScanner,
int maxKeyLength) {
this(name,
elementStore,
connectionsStore,
selectorFactory,
bloomFilter,
scoreScanner,
maxKeyLength,
RollingTypeahead.DEFAULT_ROLLING_SIZE);
}

public GenericTypeahead(String name,
ArrayStoreElement<E> elementStore,
ConnectionsStore<String> connectionsStore,
SelectorFactory<E> selectorFactory,
BloomFilter<Long> bloomFilter,
ScoreScanner scoreScanner,
int maxKeyLength,
int rollingSize) {
super(name, elementStore, selectorFactory, bloomFilter);
logger.info(name + " start...");

Expand All @@ -110,10 +90,6 @@ public GenericTypeahead(String name,
// Initialize filterStore
this.filterStore = initFilterStore();

// Initialize RollingTypeahead
this.rollingTypeahead = initRollingTypeahead(rollingSize);
this.setRollingEnabled(false);

logger.info(name + " started.");
}

Expand Down Expand Up @@ -169,19 +145,6 @@ protected LongArrayPartition initFilterStore() {
return p;
}

protected RollingTypeahead<E> initRollingTypeahead(int rollingSize) {
IndexRoller<E> indexRoller =
new GenericTypeaheadIndexRoller<E>(this);
RollingTypeahead<E> rollingTA =
new RollingTypeahead<E>(getName(),
rollingSize,
indexRoller,
getElementStore(),
getSelectorFactory(),
getBloomFilter());
return rollingTA;
}

public final int getMaxKeyLength() {
return maxKeyLength;
}
Expand All @@ -204,11 +167,6 @@ public Collector<E> search(int uid, String[] terms, Collector<E> collector, long
Selector<E> selector = getSelectorFactory().createSelector(terms);
searchInternal(uid, terms, collector, selector, hitStats, timeoutMillis);

if(rollingEnabled) {
long timeout = Math.max(1, timeoutMillis - hitStats.tick());
rollingTypeahead.search(uid, terms, collector, timeout, hitStats);
}

hitStats.stop();
log(logger, uid, terms, hitStats);
return collector;
Expand Down Expand Up @@ -340,103 +298,69 @@ public synchronized void flush() throws IOException {
* @throws Exception
*/
protected void updateConnectionStore(E oldElement, E newElement) throws Exception {
if(rollingEnabled && oldElement == null) {
rollingTypeahead.offer(newElement);
} else {
long scn = newElement.getTimestamp();
int elemId = newElement.getElementId();

if(oldElement == null) {
// Insert operation
for(String term : newElement.getTerms()) {
int len = Math.min(term.length(), maxKeyLength);
for(int i = 1; i <= len; i++) {
String source = term.substring(0, i);
long scn = newElement.getTimestamp();
int elemId = newElement.getElementId();

if(oldElement == null) {
// Insert operation
Set<String> prefixes = new HashSet<String>();
for(String term : newElement.getTerms()) {
int len = Math.min(term.length(), maxKeyLength);
for(int i = 1; i <= len; i++) {
String source = term.substring(0, i);
if(prefixes.add(source)) {
connectionsStore.addConnection(source, elemId, scn);
}
}
} else if(newElement.getTimestamp() >= getHWMark()) {
// Update operation
Set<String> oldPrefixes = new HashSet<String>();
Set<String> newPrefixes = new HashSet<String>();

for(String term : oldElement.getTerms()) {
int len = Math.min(term.length(), maxKeyLength);
for(int i = 1; i <= len; i++) {
String source = term.substring(0, i);
oldPrefixes.add(source);
}
}

for(String term : newElement.getTerms()) {
int len = Math.min(term.length(), maxKeyLength);
for(int i = 1; i <= len; i++) {
String source = term.substring(0, i);
newPrefixes.add(source);
}
}

// Calculate intersection
Set<String> commonPrefixes = new HashSet<String>();
commonPrefixes.addAll(oldPrefixes);
commonPrefixes.retainAll(newPrefixes);

newPrefixes.removeAll(commonPrefixes);
for(String source : newPrefixes) {
connectionsStore.addConnection(source, elemId, scn);
}
} else if(newElement.getTimestamp() >= getHWMark()) {
// Update operation
Set<String> oldPrefixes = new HashSet<String>();
Set<String> newPrefixes = new HashSet<String>();

for(String term : oldElement.getTerms()) {
int len = Math.min(term.length(), maxKeyLength);
for(int i = 1; i <= len; i++) {
String source = term.substring(0, i);
oldPrefixes.add(source);
}

oldPrefixes.removeAll(commonPrefixes);
for(String source : oldPrefixes) {
connectionsStore.removeConnection(source, elemId, scn);
}

for(String term : newElement.getTerms()) {
int len = Math.min(term.length(), maxKeyLength);
for(int i = 1; i <= len; i++) {
String source = term.substring(0, i);
newPrefixes.add(source);
}
} else {
logger.info("ignored element: " + newElement);
}
}
}

public synchronized void setRollingEnabled(boolean rollingEnabled) {
if(this.rollingEnabled != rollingEnabled) {
if(rollingEnabled) {
rollingTypeahead.open();
logger.info("rolling enabled");
} else {
rollingTypeahead.close();
logger.info("rolling disabled");

// Calculate intersection
Set<String> commonPrefixes = new HashSet<String>();
commonPrefixes.addAll(oldPrefixes);
commonPrefixes.retainAll(newPrefixes);

newPrefixes.removeAll(commonPrefixes);
for(String source : newPrefixes) {
connectionsStore.addConnection(source, elemId, scn);
}
this.rollingEnabled = rollingEnabled;

try {
flush();
} catch (IOException e) {
logger.error("failed to flush indexes", e);
oldPrefixes.removeAll(commonPrefixes);
for(String source : oldPrefixes) {
connectionsStore.removeConnection(source, elemId, scn);
}
} else {
logger.info("ignored element: " + newElement);
}
}

public boolean isRollingEnabled() {
return rollingEnabled;
}

@Override
public synchronized void sync() throws IOException {
// Calls are ordered
if(rollingEnabled) {
rollingTypeahead.drain();
}

elementStore.sync();
connectionsStore.sync();
}

@Override
public synchronized void persist() throws IOException {
// Calls are ordered
if(rollingEnabled) {
rollingTypeahead.drain();
}

elementStore.persist();
connectionsStore.persist();
}
Expand All @@ -445,12 +369,20 @@ public synchronized void refresh() throws IOException {
int counter = 0;
long startTime = System.currentTimeMillis();

// Sync with roller first
// Sync first
sync();

// Re-score all elements for every connection source
List<E> list = new ArrayList<E>(10000);
ElementScoreCmpDsc scoreCmpDsc = new ElementScoreCmpDsc();
Comparator<E> scoreCmpDsc = new Comparator<E>() {
@Override
public int compare(E e0, E e1) {
float score0 = e0.getScore();
float score1 = e1.getScore();
// Descending order
return score0 < score1 ? 1 : (score0 == score1 ? (e0.getElementId() - e1.getElementId()) : -1);
}
};

Iterator<String> iter = connectionsStore.sourceIterator();
while(iter.hasNext()) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/cleo/search/typeahead/MultiTypeahead.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public Collector<E> search(int uid, String[] terms, Collector<E> collector) {
return search(uid, terms, collector, Long.MAX_VALUE);
}

@Override
@Override @SuppressWarnings("unchecked")
public Collector<E> search(int uid, String[] terms, Collector<E> collector, long timeoutMillis) {
List<TypeaheadTask<E>> taskList = new ArrayList<TypeaheadTask<E>>(typeaheads.size());
MultiSourceCollector<E> multiCollector = null;
Expand Down

This file was deleted.

This file was deleted.

0 comments on commit 6c9d7f3

Please sign in to comment.