diff --git a/datashare-api/src/test/java/org/icij/datashare/PipelineHelperTest.java b/datashare-api/src/test/java/org/icij/datashare/PipelineHelperTest.java index 59fa0cf51..bf4df7b91 100644 --- a/datashare-api/src/test/java/org/icij/datashare/PipelineHelperTest.java +++ b/datashare-api/src/test/java/org/icij/datashare/PipelineHelperTest.java @@ -52,7 +52,7 @@ public void test_get_queue_names_for_batch_nlp_pipeline() { @Test public void test_get_queue_names_for_batch_nlp_pipeline_from_index() { PipelineHelper pipelineHelper = new PipelineHelper(new PropertiesProvider(new HashMap<>() {{ - put("stages", "BATCHENQUEUEIDX,BATCHNLP"); + put("stages", "CREATENLPBATCHESFROMINDEX,BATCHNLP"); }})); assertThat(pipelineHelper.getQueueNameFor(Stage.BATCHNLP)).isEqualTo("extract:queue:batchnlp"); } diff --git a/datashare-app/src/main/java/org/icij/datashare/tasks/BatchNlpTask.java b/datashare-app/src/main/java/org/icij/datashare/tasks/BatchNlpTask.java index 6892bfd6c..f0199bbfd 100644 --- a/datashare-app/src/main/java/org/icij/datashare/tasks/BatchNlpTask.java +++ b/datashare-app/src/main/java/org/icij/datashare/tasks/BatchNlpTask.java @@ -6,6 +6,7 @@ import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; import java.util.List; +import java.util.Optional; import java.util.function.Function; import org.icij.datashare.asynctasks.CancellableTask; import org.icij.datashare.asynctasks.Task; @@ -66,7 +67,7 @@ public Long call() throws Exception { // TODO: for now None of the Java NER seems to support batch processing, we just iterate docs one by one // TODO: we could improve perfs by fetching docs and processing them concurrently... int nProcessed = 0; - this.progress.apply(0.0); + Optional.ofNullable(this.progress).ifPresent(p -> p.apply(0.0)); for (CreateNlpBatchesFromIndex.BatchDocument doc : this.docs) { String project = doc.project(); Document indexDoc = indexer.get(doc.id(), doc.rootDocument(), EXCLUDED_SOURCES); @@ -87,11 +88,12 @@ public Long call() throws Exception { } nProcessed += 1; if (nProcessed % updateRate == 0) { - this.progress.apply((double) nProcessed / (double) batchSize); + Double prog = (double) nProcessed / (double) batchSize; + Optional.ofNullable(this.progress).ifPresent(p -> p.apply(prog)); } } pipeline.terminate(language); - this.progress.apply(1.0); + Optional.ofNullable(this.progress).ifPresent(p -> p.apply(1.0)); return (long) batchSize; }