Skip to content

Commit

Permalink
implement tracking/locking for reindex
Browse files Browse the repository at this point in the history
  • Loading branch information
mdavis95 committed Oct 31, 2019
1 parent 99426e1 commit 4f44e41
Showing 1 changed file with 33 additions and 2 deletions.
35 changes: 33 additions & 2 deletions zulia-server/src/main/java/io/zulia/server/index/ZuliaShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import org.bson.Document;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.logging.Logger;

public class ZuliaShard {
Expand All @@ -38,6 +40,9 @@ public class ZuliaShard {

private final boolean primary;

private String trackingId;
private HashSet<String> trackedIds;

public ZuliaShard(ShardWriteManager shardWriteManager, boolean primary) throws Exception {

this.primary = primary;
Expand Down Expand Up @@ -95,16 +100,26 @@ public void tryIdleCommit() throws IOException {
}

public void reindex() throws IOException {

final String myTrackingId = UUID.randomUUID().toString();
synchronized (this) {
trackingId = myTrackingId;
trackedIds = new HashSet<>();
}

shardReaderManager.maybeRefreshBlocking();
ShardReader shardReader = shardReaderManager.acquire();

try {
shardReader.streamAllDocs(d -> {
if (!myTrackingId.equals(trackingId)) {
throw new RuntimeException("Reindex interrupted by another reindex");
}

try {
IndexableField f = d.getField(ZuliaConstants.TIMESTAMP_FIELD);
long timestamp = f.numericValue().longValue();

ZuliaQuery.ScoredResult.Builder srBuilder = ZuliaQuery.ScoredResult.newBuilder();
String uniqueId = d.get(ZuliaConstants.ID_FIELD);

Document mongoDocument = null;
Expand All @@ -120,12 +135,20 @@ public void reindex() throws IOException {
mongoDocument = ZuliaUtil.byteArrayToMongoDocument(docRef.bytes);
}

shardWriteManager.indexDocument(uniqueId, timestamp, mongoDocument, metadata);
if (!trackedIds.contains(uniqueId)) {
shardWriteManager.indexDocument(uniqueId, timestamp, mongoDocument, metadata);
}
}
catch (Exception e) {
throw new RuntimeException(e);
}
});
synchronized (this) {
if (myTrackingId.equals(trackingId)) {
trackingId = null;
trackedIds = new HashSet<>();
}
}
forceCommit();
}
finally {
Expand All @@ -142,6 +165,10 @@ public void index(String uniqueId, long timestamp, org.bson.Document mongoDocume
throw new IllegalStateException("Cannot index document <" + uniqueId + "> from replica: index <" + indexName + "> shard <" + shardNumber + ">");
}

if (trackingId != null) {
trackedIds.add(uniqueId);
}

shardWriteManager.indexDocument(uniqueId, timestamp, mongoDocument, metadata);
if (shardWriteManager.markedChangedCheckIfCommitNeeded()) {
forceCommit();
Expand All @@ -154,6 +181,10 @@ public void deleteDocument(String uniqueId) throws Exception {
throw new IllegalStateException("Cannot delete document <" + uniqueId + "> from replica: index <" + indexName + "> shard <" + shardNumber + ">");
}

if (trackingId != null) {
trackedIds.add(uniqueId);
}

shardWriteManager.deleteDocuments(uniqueId);
if (shardWriteManager.markedChangedCheckIfCommitNeeded()) {
forceCommit();
Expand Down

0 comments on commit 4f44e41

Please sign in to comment.