Skip to content

Commit

Permalink
index all can now be load balanced #1757
Browse files Browse the repository at this point in the history
  • Loading branch information
pdurbin committed Mar 26, 2015
1 parent 10b1ef7 commit 317bcbf
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 103 deletions.
2 changes: 2 additions & 0 deletions scripts/search/index
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/bin/bash
# Calls the local admin index API, passing the three positional arguments
# through as query parameters:
#   $1 -> numPartitions
#   $2 -> partitionIdToProcess
#   $3 -> previewOnly
num_partitions=$1
partition_id=$2
preview_only=$3
curl -s "http://localhost:8080/api/admin/index?numPartitions=${num_partitions}&partitionIdToProcess=${partition_id}&previewOnly=${preview_only}"
14 changes: 14 additions & 0 deletions src/main/java/edu/harvard/iq/dataverse/DatasetServiceBean.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,20 @@ public List<Dataset> findAll() {
return em.createQuery("select object(o) from Dataset as o order by o.id").getResultList();
}

/**
* Returns the datasets whose id satisfies
* {@code MOD(id, numPartitions) = partitionId}, ordered by id.
*
* @param numPartitions The number of partitions the ids are divided into.
* Any value below 1 is replaced with 1.
*
* @param partitionId The remainder that selects which partition to return.
*
* @return All datasets if you say numPartitions=1 and partitionId=0.
* Otherwise, a subset of datasets.
*/
public List<Dataset> findAllOrSubset(long numPartitions, long partitionId) {
// Clamp so the query never divides by zero or a negative partition count.
long effectiveNumPartitions = Math.max(numPartitions, 1L);
String jpql = "SELECT OBJECT(o) FROM Dataset AS o WHERE MOD( o.id, :numPartitions) = :partitionId ORDER BY o.id";
return em.createQuery(jpql, Dataset.class)
.setParameter("numPartitions", effectiveNumPartitions)
.setParameter("partitionId", partitionId)
.getResultList();
}

/**
* @todo write this method for real. Don't just iterate through every single
* dataset! See https://redmine.hmdc.harvard.edu/issues/3988
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/edu/harvard/iq/dataverse/DataverseServiceBean.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,30 @@ public List<Dataverse> findAll() {
return em.createQuery("select object(o) from Dataverse as o order by o.name").getResultList();
}

/**
* Returns the dataverses whose id satisfies
* {@code MOD(id, numPartitions) = partitionId}, ordered by id.
*
* @param numPartitions The number of partitions you intend to split the
* indexing job into. Perhaps you have three Glassfish servers and you'd
* like each one to operate on a subset of dataverses. Any value below 1 is
* replaced with 1.
*
* @param partitionId Maybe "partitionId" is the wrong term but it's what we
* call it in the (text) UI. If you've specified three partitions, the three
* partitionIds are 0, 1, and 2. We do `dataverseId % numPartitions =
* partitionId` to figure out which partition the dataverseId falls into.
*
* @return All dataverses if you say numPartitions=1 and partitionId=0.
* Otherwise, a subset of dataverses.
*/
public List<Dataverse> findAllOrSubset(long numPartitions, long partitionId) {
// A partition count below 1 is replaced with 1.
long partitionCount = (numPartitions < 1) ? 1 : numPartitions;
TypedQuery<Dataverse> subsetQuery = em.createQuery("SELECT OBJECT(o) FROM Dataverse AS o WHERE MOD( o.id, :numPartitions) = :partitionId ORDER BY o.id", Dataverse.class);
subsetQuery.setParameter("numPartitions", partitionCount);
subsetQuery.setParameter("partitionId", partitionId);
List<Dataverse> matches = subsetQuery.getResultList();
return matches;
}

public List<Dataverse> findByOwnerId(Long ownerId) {
Query query = em.createQuery("select object(o) from Dataverse as o where o.owner.id =:ownerId order by o.name");
query.setParameter("ownerId", ownerId);
Expand Down
11 changes: 8 additions & 3 deletions src/main/java/edu/harvard/iq/dataverse/SuperUserPage.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import javax.enterprise.context.SessionScoped;
import javax.inject.Inject;
import javax.inject.Named;
import javax.json.JsonObjectBuilder;

@SessionScoped
@Named("SuperUserPage")
Expand All @@ -24,14 +25,15 @@ public class SuperUserPage implements java.io.Serializable {

private String indexAllStatus = "No status available";

private Future<String> indexAllFuture;
private Future<JsonObjectBuilder> indexAllFuture;

// modeled off http://docs.oracle.com/javaee/7/tutorial/ejb-async002.htm
public String getIndexAllStatus() {
if (indexAllFuture != null) {
if (indexAllFuture.isDone()) {
try {
indexAllStatus = indexAllFuture.get();
JsonObjectBuilder status = indexAllFuture.get();
indexAllStatus = status.build().toString();
} catch (ExecutionException | CancellationException | InterruptedException ex) {
indexAllStatus = ex.getCause().toString();
}
Expand All @@ -45,7 +47,10 @@ public String getIndexAllStatus() {
public void startIndexAll() {
User user = session.getUser();
if (user.isSuperuser()) {
indexAllFuture = indexAllService.indexAll();
long numPartitions = 1;
long partitionId = 0;
boolean previewOnly = false;
indexAllFuture = indexAllService.indexAllOrSubset(numPartitions, partitionId, previewOnly);
indexAllStatus = "Index all started...";
} else {
indexAllStatus = "Only a superuser can run index all";
Expand Down
81 changes: 61 additions & 20 deletions src/main/java/edu/harvard/iq/dataverse/api/Index.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import javax.ejb.EJBException;
import javax.json.Json;
import javax.json.JsonArrayBuilder;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.validation.ConstraintViolation;
import javax.validation.ConstraintViolationException;
Expand Down Expand Up @@ -46,14 +47,67 @@ public class Index extends AbstractApiBean {
DataFileServiceBean dataFileService;

@GET
public Response indexAll() {
public Response indexAllOrSubset(@QueryParam("numPartitions") Long numPartitionsSelected, @QueryParam("partitionIdToProcess") Long partitionIdToProcess, @QueryParam("previewOnly") boolean previewOnly) {
try {
long numPartitions = 1;
if (numPartitionsSelected != null) {
if (numPartitionsSelected < 1) {
return errorResponse(Status.BAD_REQUEST, "numPartitions must be 1 or higher but was " + numPartitionsSelected);
} else {
numPartitions = numPartitionsSelected;
}
}
List<Long> availablePartitionIds = new ArrayList<>();
for (long i = 0; i < numPartitions; i++) {
availablePartitionIds.add(i);
}

Response invalidParitionIdSelection = errorResponse(Status.BAD_REQUEST, "You specified " + numPartitions + " partition(s) and your selected partitionId was " + partitionIdToProcess + " but you must select from these availableParitionIds: " + availablePartitionIds);
if (partitionIdToProcess != null) {
long selected = partitionIdToProcess;
if (!availablePartitionIds.contains(selected)) {
return invalidParitionIdSelection;
}
} else if (numPartitionsSelected == null) {
/**
* The user has not specified a partitionId and hasn't specified
* the number of partitions. Run "index all", the whole thing.
*/
partitionIdToProcess = 0l;
} else {
return invalidParitionIdSelection;

}

JsonObjectBuilder args = Json.createObjectBuilder();
args.add("numPartitions", numPartitions);
args.add("partitionIdToProcess", partitionIdToProcess);
JsonArrayBuilder availablePartitionIdsBuilder = Json.createArrayBuilder();
for (long i : availablePartitionIds) {
availablePartitionIdsBuilder.add(i);
}

JsonObjectBuilder preview = indexAllService.indexAllOrSubsetPreview(numPartitions, partitionIdToProcess);
if (previewOnly) {
preview.add("args", args);
preview.add("availablePartitionIds", availablePartitionIdsBuilder);
return okResponse(preview);
}

JsonObjectBuilder response = Json.createObjectBuilder();
response.add("availablePartitionIds", availablePartitionIdsBuilder);
response.add("args", args);
/**
* @todo How can we expose the String returned from "index all" via
* the API?
*/
Future<String> indexAllFuture = indexAllService.indexAll();
return okResponse("A complete re-indexing has begun.");
Future<JsonObjectBuilder> indexAllFuture = indexAllService.indexAllOrSubset(numPartitions, partitionIdToProcess, previewOnly);
JsonObject workloadPreview = preview.build().getJsonObject("previewOfPartitionWorkload");
int dataverseCount = workloadPreview.getInt("dataverseCount");
int datasetCount = workloadPreview.getInt("datasetCount");
String status = "indexAllOrSubset has begun of " + dataverseCount + " dataverses and " + datasetCount + " datasets.";
response.add("message", status);
return okResponse(response);
} catch (EJBException ex) {
Throwable cause = ex;
StringBuilder sb = new StringBuilder();
Expand Down Expand Up @@ -176,25 +230,12 @@ public Response indexMissing() {
return okResponse("index missing started, Solr doc cleanup operations will be skipped");
}

/**
* Demo endpoint: builds the ids 1 through 100, asks
* IndexUtil.findDvObjectIdsToProcessEqualParts which of them fall to the
* caller for the given startingPoint and offset, and echoes the arguments
* and the selected ids back as JSON.
*/
@GET
@Path("partial")
public Response indexOffset(@QueryParam("startingPoint") int startingPoint, @QueryParam("offset") int offset) {
final long highestDemoId = 100;
List<Long> demoIds = new ArrayList<>();
long nextId = 1;
while (nextId <= highestDemoId) {
demoIds.add(nextId);
nextId++;
}
List<Long> selectedIds = IndexUtil.findDvObjectIdsToProcessEqualParts(demoIds, startingPoint, offset);
// Keys are added in the same order as before so the JSON layout is unchanged.
JsonObjectBuilder payload = Json.createObjectBuilder()
.add("startingPoint", startingPoint)
.add("offset", offset)
.add("mine", selectedIds.toString());
return okResponse(payload);
}

/**
* This is just a demo of the modular math logic we use for indexAll.
*/
@GET
@Path("mod")
public Response indexOffset(@QueryParam("partitions") long partitions, @QueryParam("which") long which) {
public Response indexMod(@QueryParam("partitions") long partitions, @QueryParam("which") long which) {
long numObjectToConsider = 100;
List<Long> dvObjectsIds = new ArrayList<>();
for (long i = 1; i <= numObjectToConsider; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.inject.Named;
import javax.json.Json;
import javax.json.JsonArrayBuilder;
import javax.json.JsonObjectBuilder;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
Expand All @@ -38,30 +41,68 @@ public class IndexAllServiceBean {
SystemConfig systemConfig;

@Asynchronous
public Future<String> indexAll() {
long indexAllTimeBegin = System.currentTimeMillis();
String status;
SolrServer server = new HttpSolrServer("http://" + systemConfig.getSolrHostColonPort() + "/solr");
logger.info("attempting to delete all Solr documents before a complete re-index");
try {
server.deleteByQuery("*:*");// CAUTION: deletes everything!
} catch (SolrServerException | IOException ex) {
status = ex.toString();
logger.info(status);
return new AsyncResult<>(status);
/**
* Calls the two-argument indexAllOrSubset for the given partition and then
* returns a JSON builder holding a fixed status message. The Future
* returned by that call is not inspected, and previewOnly is not read by
* this method.
*/
public Future<JsonObjectBuilder> indexAllOrSubset(long numPartitions, long partitionId, boolean previewOnly) {
indexAllOrSubset(numPartitions, partitionId);
JsonObjectBuilder payload = Json.createObjectBuilder()
.add("responseFromIndexAllOrSubset", "indexAllOrSubset has begun");
return new AsyncResult<>(payload);
}

public JsonObjectBuilder indexAllOrSubsetPreview(long numPartitions, long partitionId) {
JsonObjectBuilder response = Json.createObjectBuilder();
JsonObjectBuilder previewOfWorkload = Json.createObjectBuilder();
JsonObjectBuilder dvContainerIds = Json.createObjectBuilder();
JsonArrayBuilder dataverseIds = Json.createArrayBuilder();
List<Dataverse> dataverses = dataverseService.findAllOrSubset(numPartitions, partitionId);
for (Dataverse dataverse : dataverses) {
dataverseIds.add(dataverse.getId());
}
try {
server.commit();
} catch (SolrServerException | IOException ex) {
status = ex.toString();
logger.info(status);
return new AsyncResult<>(status);
JsonArrayBuilder datasetIds = Json.createArrayBuilder();
List<Dataset> datasets = datasetService.findAllOrSubset(numPartitions, partitionId);
for (Dataset dataset : datasets) {
datasetIds.add(dataset.getId());
}
dvContainerIds.add("dataverses", dataverseIds);
dvContainerIds.add("datasets", datasetIds);
previewOfWorkload.add("dvContainerIds", dvContainerIds);
previewOfWorkload.add("dataverseCount", dataverses.size());
previewOfWorkload.add("datasetCount", datasets.size());
previewOfWorkload.add("partitionId", partitionId);
response.add("previewOfPartitionWorkload", previewOfWorkload);
return response;
}

public Future<String> indexAllOrSubset(long numPartitions, long partitionId) {
long indexAllTimeBegin = System.currentTimeMillis();
String status;

String resultOfClearingIndexTimes;
if (numPartitions == 1) {
SolrServer server = new HttpSolrServer("http://" + systemConfig.getSolrHostColonPort() + "/solr");
logger.info("attempting to delete all Solr documents before a complete re-index");
try {
server.deleteByQuery("*:*");// CAUTION: deletes everything!
} catch (SolrServerException | IOException ex) {
status = ex.toString();
logger.info(status);
return new AsyncResult<>(status);
}
try {
server.commit();
} catch (SolrServerException | IOException ex) {
status = ex.toString();
logger.info(status);
return new AsyncResult<>(status);
}

int numRowsAffected = dvObjectService.clearAllIndexTimes();
String resultOfClearingIndexTimes = "Number of rows affected by clearAllIndexTimes: " + numRowsAffected + ".";
int numRowsAffected = dvObjectService.clearAllIndexTimes();
resultOfClearingIndexTimes = "Number of rows affected by clearAllIndexTimes: " + numRowsAffected + ".";
} else {
resultOfClearingIndexTimes = "Solr index was not cleared before indexing.";
}

List<Dataverse> dataverses = dataverseService.findAll();
List<Dataverse> dataverses = dataverseService.findAllOrSubset(numPartitions, partitionId);
int dataverseIndexCount = 0;
for (Dataverse dataverse : dataverses) {
dataverseIndexCount++;
Expand All @@ -70,7 +111,7 @@ public Future<String> indexAll() {
}

int datasetIndexCount = 0;
List<Dataset> datasets = datasetService.findAll();
List<Dataset> datasets = datasetService.findAllOrSubset(numPartitions, partitionId);
for (Dataset dataset : datasets) {
datasetIndexCount++;
logger.info("indexing dataset " + datasetIndexCount + " of " + datasets.size());
Expand Down
19 changes: 0 additions & 19 deletions src/main/java/edu/harvard/iq/dataverse/search/IndexUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,6 @@

public class IndexUtil {

/**
* Walks the given list by position, starting at the 1-based startingPoint
* and stepping by offset, and collects the ids found at those positions.
* Ids that are not greater than zero are left out of the result.
*
* @param dvObjectIds the ids to choose from, in the order they should be walked
* @param startingPoint 1-based position of the first id to consider; values
* below 1 are treated as 1
* @param offset the step between considered positions; values below 1 are
* treated as 1
* @return the selected ids, in list order
*/
public static List<Long> findDvObjectIdsToProcessEqualParts(List<Long> dvObjectIds, int startingPoint, int offset) {
int firstPosition = Math.max(startingPoint, 1);
int stride = Math.max(offset, 1);
List<Long> selected = new ArrayList<>();
// Convert the 1-based starting position to a 0-based list index.
int index = firstPosition - 1;
while (index < dvObjectIds.size()) {
Long candidate = dvObjectIds.get(index);
if (candidate > 0) {
selected.add(candidate);
}
index += stride;
}
return selected;
}

public static List<Long> findDvObjectIdsToProcessMod(List<Long> dvObjectIds, long mod, long which) {
List<Long> subsetToProcess = new ArrayList<>();
for (Long dvObjectId : dvObjectIds) {
Expand Down
40 changes: 0 additions & 40 deletions src/test/java/edu/harvard/iq/dataverse/search/IndexUtilTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,46 +36,6 @@ public void tearDown() {
@Test
public void testFindDvObjectIdsToProcess() {
// System.out.println("findDvObjectIdsToProcess");
// crazy input for starting point
assertEquals(Arrays.asList(1l, 2l, 3l), IndexUtil.findDvObjectIdsToProcessEqualParts(Arrays.asList(1l, 2l, 3l), 0, 1));
assertEquals(Arrays.asList(1l, 2l, 3l), IndexUtil.findDvObjectIdsToProcessEqualParts(Arrays.asList(1l, 2l, 3l), -1, 1));

// crazy input for offset
assertEquals(Arrays.asList(1l, 2l, 3l), IndexUtil.findDvObjectIdsToProcessEqualParts(Arrays.asList(1l, 2l, 3l), 1, 0));
assertEquals(Arrays.asList(1l, 2l, 3l), IndexUtil.findDvObjectIdsToProcessEqualParts(Arrays.asList(1l, 2l, 3l), 1, -1));

// crazy input for DvObjectIds
assertEquals(Arrays.asList(1l, 2l, 3l), IndexUtil.findDvObjectIdsToProcessEqualParts(Arrays.asList(0l, 1l, 2l, 3l), 1, 1));

// all
assertEquals(Arrays.asList(1l, 2l, 3l), IndexUtil.findDvObjectIdsToProcessEqualParts(Arrays.asList(1l, 2l, 3l), 1, 1));
// all, out of order
assertEquals(Arrays.asList(1l, 3l, 2l), IndexUtil.findDvObjectIdsToProcessEqualParts(Arrays.asList(1l, 3l, 2l), 1, 1));

// odd
assertEquals(Arrays.asList(1l, 3l), IndexUtil.findDvObjectIdsToProcessEqualParts(Arrays.asList(1l, 2l, 3l), 1, 2));
// even
assertEquals(Arrays.asList(2l), IndexUtil.findDvObjectIdsToProcessEqualParts(Arrays.asList(1l, 2l, 3l), 2, 2));

// offset 3
assertEquals(Arrays.asList(1l, 4l, 7l), IndexUtil.findDvObjectIdsToProcessEqualParts(Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l), 1, 3));
assertEquals(Arrays.asList(2l, 5l), IndexUtil.findDvObjectIdsToProcessEqualParts(Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l), 2, 3));
assertEquals(Arrays.asList(3l, 6l), IndexUtil.findDvObjectIdsToProcessEqualParts(Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l), 3, 3));

// offset 3, start at 3
assertEquals(Arrays.asList(3l, 6l), IndexUtil.findDvObjectIdsToProcessEqualParts(Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l, 8l), 3, 3));
assertEquals(Arrays.asList(4l, 7l), IndexUtil.findDvObjectIdsToProcessEqualParts(Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l, 8l), 4, 3));
assertEquals(Arrays.asList(5l, 8l), IndexUtil.findDvObjectIdsToProcessEqualParts(Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l, 8l), 5, 3));

// offset 4
assertEquals(Arrays.asList(1l, 5l), IndexUtil.findDvObjectIdsToProcessEqualParts(Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l), 1, 4));

// gaps
assertEquals(Arrays.asList(1l, 3l, 6l), IndexUtil.findDvObjectIdsToProcessEqualParts(Arrays.asList(1l, 2l, 3l, 5l, 6l, 7l), 1, 2));
assertEquals(Arrays.asList(1l, 5l), IndexUtil.findDvObjectIdsToProcessEqualParts(Arrays.asList(1l, 3l, 4l, 5l, 6l, 7l), 1, 3));
assertEquals(Arrays.asList(7l, 13l), IndexUtil.findDvObjectIdsToProcessEqualParts(Arrays.asList(2l, 3l, 4l, 5l, 7l, 8l, 10l, 13l), 5, 3));

// MOD VERSION
assertEquals(Arrays.asList(1l, 3l, 5l, 7l), IndexUtil.findDvObjectIdsToProcessMod(Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l, 8l), 2, 1));
assertEquals(Arrays.asList(2l, 4l, 6l, 8l), IndexUtil.findDvObjectIdsToProcessMod(Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l, 8l), 2, 0));

Expand Down

0 comments on commit 317bcbf

Please sign in to comment.