Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: batch ner #1541

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ public Pair(T1 fst, T2 snd) {
second = snd;
}

/**
 * Static factory building a pair from its two components.
 *
 * @param fst value handed to the constructor as first component
 * @param snd value handed to the constructor as second component
 * @return a newly allocated {@code Pair} built from {@code fst} and {@code snd}
 */
public static <T1, T2> Pair<T1, T2> of(T1 fst, T2 snd) {
    final Pair<T1, T2> pair = new Pair<>(fst, snd);
    return pair;
}

/** @return the first component of this pair */
public T1 _1() {
    return this.first;
}
/** @return the second component of this pair */
public T2 _2() {
    return this.second;
}

Expand All @@ -27,12 +31,10 @@ public int hashCode() {

@Override
public boolean equals(Object o) {
if ( ! (o instanceof Pair) ) {
if ( ! (o instanceof Pair<?, ?> objPair) ) {
return false;
}
Pair objPair = (Pair) o;
return first .equals(objPair._1()) &&
second.equals(objPair._2());
return first .equals(objPair._1()) && second.equals(objPair._2());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ public static List<NamedEntity> allFrom(String text, Annotations annotations) {
}

public static NamedEntity from(String text, NlpTag tag, Annotations annotations) {
String mention = ThrowingFunctions.removeNewLines.apply(text.substring(tag.getBegin(), tag.getEnd()));
return NamedEntity.create(tag.getCategory(), mention, List.of((long) tag.getBegin()),
String mention = ThrowingFunctions.removeNewLines.apply(text.substring(tag.begin(), tag.end()));
return NamedEntity.create(tag.category(), mention, List.of((long) tag.begin()),
annotations.documentId, annotations.rootId, annotations.pipelineType, annotations.language
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public interface Indexer extends Closeable {
<T extends Entity> boolean bulkUpdate(String indexName, List<T> entities) throws IOException;
<T extends Entity> void add(String indexName, T obj) throws IOException;
<T extends Entity> void update(String indexName, T obj) throws IOException;
<T extends Entity> boolean exists(String indexName, String id) throws IOException;
boolean exists(String indexName, String id) throws IOException;

<T extends Entity> T get(String indexName, String id);
<T extends Entity> T get(String indexName, String id, List<String> sourceExcludes);
Expand Down Expand Up @@ -61,9 +61,11 @@ interface Searcher {
Searcher withoutSource(String... fields);
Searcher withSource(boolean source);
Searcher limit(int maxCount);
/**
 * Adds a sort criterion to this search.
 * NOTE(review): how implementations apply it is not visible here — confirm.
 *
 * @param field name of the field to sort on
 * @param order direction of the sort
 * @return a {@code Searcher}, so the call can be chained like the other builder-style methods
 */
Searcher sort(String field, SortOrder order);
void clearScroll() throws IOException;
long totalHits();
Searcher with(int fuzziness, boolean phraseMatches);
/** Direction accepted by {@code sort(String, SortOrder)}. */
enum SortOrder {
    ASC,
    DESC
}
}

interface QueryBuilderSearcher extends Searcher {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.icij.datashare.text.nlp;

import org.icij.datashare.PropertiesProvider;
import org.icij.datashare.text.Document;
import org.icij.datashare.text.Language;
import org.icij.datashare.text.NamedEntity;
import org.slf4j.Logger;
Expand Down Expand Up @@ -65,11 +64,6 @@ public boolean initialize(Language language) throws InterruptedException {
return true;
}

/**
* Apply all specified stages/annotators on input
* @param doc is the document source to process */
public abstract List<NamedEntity> process(Document doc) throws InterruptedException;

/**
* Post-processing operations
*/
Expand Down
30 changes: 10 additions & 20 deletions datashare-api/src/main/java/org/icij/datashare/text/nlp/NlpTag.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,24 @@

import static java.util.Comparator.comparingInt;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Comparator;
import org.icij.datashare.text.NamedEntity;

public class NlpTag {
public record NlpTag(int begin, int end, NamedEntity.Category category) {

public static final Comparator<NlpTag> comparator = comparingInt(NlpTag::getBegin);
public static final Comparator<NlpTag> comparator = comparingInt(NlpTag::begin);

private final int begin;
private final int end;
private final NamedEntity.Category category;


NlpTag(int begin, int end, NamedEntity.Category category) {
/**
 * Canonical constructor, annotated so Jackson can rebuild a tag from its JSON form.
 *
 * @param begin    offset at which the tagged span begins
 * @param end      offset at which the tagged span ends
 * @param category category assigned to the span
 */
@JsonCreator
public NlpTag(
    @JsonProperty("begin") int begin,
    // Was "start": that name matches no component of the record, so a tag written out with
    // its "end" component could not be read back through this creator.
    @JsonProperty("end") int end,
    @JsonProperty("category") NamedEntity.Category category
) {
    this.begin = begin;
    this.end = end;
    this.category = category;
}

public int getBegin() {
return begin;
}

public int getEnd() {
return end;
}

public NamedEntity.Category getCategory() {
return category;
}

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
package org.icij.datashare.text.nlp;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Stream;
import org.icij.datashare.reflect.EnumTypeToken;
import org.icij.datashare.text.Document;
import org.icij.datashare.text.Language;
import org.icij.datashare.text.NamedEntity;

import java.nio.charset.Charset;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -67,6 +73,10 @@ public static Set<Pipeline.Type> parseAll(final String comaSeparatedTypes) {
return comaSeparatedTypes == null || comaSeparatedTypes.isEmpty() ? new HashSet<>():
stream(comaSeparatedTypes.split(",")).map(Type::valueOf).collect(Collectors.toSet());
}

/**
 * Tells whether this pipeline type is {@code EMAIL}.
 *
 * @return {@code true} for {@code EMAIL} only, {@code false} for every other type
 */
public boolean extractFromDoc() {
    return Type.EMAIL.equals(this);
}
}

enum Property {
Expand Down Expand Up @@ -96,8 +106,24 @@ public String getName() {

boolean initialize(Language language) throws InterruptedException;

List<NamedEntity> process(Document doc) throws InterruptedException;
List<NamedEntity> process(Document doc, int contentLength, int contentOffset) throws InterruptedException;
/**
 * Extracts named entities from a whole document.
 * Delegates to the three-argument variant with an offset of 0 and
 * {@code doc.getContentTextLength()} as the length.
 *
 * @param doc document to process
 * @return the entities returned by the three-argument variant
 * @throws InterruptedException propagated from the three-argument variant
 */
default List<NamedEntity> processDoc(Document doc) throws InterruptedException {
    final int wholeLength = doc.getContentTextLength();
    final int fromStart = 0;
    return processDoc(doc, wholeLength, fromStart);
}

/**
 * Extracts named entities from a window of a document's content.
 * The window is submitted to {@code processText} as a single-element batch; the returned tag
 * offsets are shifted by {@code contentOffset} so they refer to the full content.
 *
 * @param doc           document to process
 * @param contentLength requested number of characters in the window
 * @param contentOffset index in the content at which the window starts
 * @return the entities built by {@code NamedEntity.allFrom} from the full content and the shifted tags
 * @throws InterruptedException propagated from {@code processText}
 */
default List<NamedEntity> processDoc(Document doc, int contentLength, int contentOffset) throws InterruptedException {
    Annotations annotations = new Annotations(doc.getId(), this.getType(), doc.getLanguage());
    String docContent = doc.getContent();
    // Clamp the window end: a last chunk reaching past the content must not make substring throw.
    int chunkEnd = Math.min(contentOffset + contentLength, docContent.length());
    String chunk = docContent.substring(contentOffset, chunkEnd);
    // Single-element batch, hence the single result list at index 0.
    List<NlpTag> tags = this.processText(Stream.of(chunk), doc.getLanguage()).get(0);
    for (NlpTag tag : tags) {
        // Tag offsets are relative to the chunk; shift them back to document coordinates.
        annotations.add(tag.begin() + contentOffset, tag.end() + contentOffset, tag.category());
    }
    return NamedEntity.allFrom(docContent, annotations);
}

/**
 * Tags a batch of texts, all submitted with a single language.
 *
 * @param batch    texts to analyse
 * @param language language passed along with the batch
 * @return lists of tags for the batch; the default {@code processDoc} reads index 0 for the
 *         single text it submits and treats the tag offsets as relative to that text
 * @throws InterruptedException NOTE(review): conditions depend on the implementation — confirm
 */
List<List<NlpTag>> processText(Stream<String> batch, Language language) throws InterruptedException;

void terminate(Language language) throws InterruptedException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,61 +73,62 @@ public void setUp() {
loader = new ExtensionLoader(folder.getRoot().toPath());
}

String EXTENSION_PIPELINE_SOURCE = "package org.icij.datashare.text.nlp.test;\n" +
"\n" +
"import org.icij.datashare.PropertiesProvider;\n" +
"import org.icij.datashare.text.Document;\n" +
"import org.icij.datashare.text.Language;\n" +
"import org.icij.datashare.text.NamedEntity;\n" +
"import org.icij.datashare.text.nlp.Annotations;\n" +
"import org.icij.datashare.text.nlp.Pipeline;\n" +
"\n" +
"import java.nio.charset.Charset;\n" +
"import java.util.List;\n" +
"import java.util.Optional;\n" +
"\n" +
"public class ExtensionPipeline implements Pipeline {\n" +
" @Override\n" +
" public Type getType() {\n" +
" return Type.TEST;\n" +
" }\n" +
"\n" +
" public ExtensionPipeline(PropertiesProvider provider) {}\n" +
" @Override\n" +
" public boolean initialize(Language language) throws InterruptedException {\n" +
" return false;\n" +
" }\n" +
" @Override\n" +
" public List<NamedEntity> process(Document doc) throws InterruptedException {\n" +
" return null;\n" +
" }\n" +
" @Override\n" +
" public List<NamedEntity> process(Document doc, int contentLength, int offset) throws InterruptedException {\n" +
" return null;\n" +
" }\n" +
" @Override\n" +
" public void terminate(Language language) throws InterruptedException {\n" +
"\n" +
" }\n" +
"\n" +
" @Override\n" +
" public boolean supports(Language language) {\n" +
" return false;\n" +
" }\n" +
"\n" +
" @Override\n" +
" public List<NamedEntity.Category> getTargetEntities() {\n" +
" return null;\n" +
" }\n" +
"\n" +
" @Override\n" +
" public boolean isCaching() {\n" +
" return false;\n" +
" }\n" +
"\n" +
" @Override\n" +
" public Charset getEncoding() {\n" +
" return null;\n" +
" }\n" +
"}\n";
// Java source, kept as text, of a Pipeline implementation of type TEST whose other methods
// return false/null or do nothing.
// NOTE(review): presumably compiled by the tests to exercise the ExtensionLoader — confirm.
String EXTENSION_PIPELINE_SOURCE = """
package org.icij.datashare.text.nlp.test;

import org.icij.datashare.PropertiesProvider;
import org.icij.datashare.text.Document;
import org.icij.datashare.text.Language;
import org.icij.datashare.text.NamedEntity;
import org.icij.datashare.text.nlp.Annotations;
import org.icij.datashare.text.nlp.Pipeline;
import org.icij.datashare.text.nlp.NlpTag;

import java.nio.charset.Charset;
import java.util.List;
import java.util.stream.Stream;
import java.util.Optional;

public class ExtensionPipeline implements Pipeline {
@Override
public Type getType() {
return Type.TEST;
}

public ExtensionPipeline(PropertiesProvider provider) {}
@Override
public boolean initialize(Language language) throws InterruptedException {
return false;
}
@Override
public List<List<NlpTag>> processText(Stream<String> batch, Language language) {
return null;
}

@Override
public void terminate(Language language) throws InterruptedException {

}

@Override
public boolean supports(Language language) {
return false;
}

@Override
public List<NamedEntity.Category> getTargetEntities() {
return null;
}

@Override
public boolean isCaching() {
return false;
}

@Override
public Charset getEncoding() {
return null;
}
}
""";
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package org.icij.datashare.text.nlp.test;

import java.util.stream.Stream;
import org.icij.datashare.PropertiesProvider;
import org.icij.datashare.text.Document;
import org.icij.datashare.text.Language;
import org.icij.datashare.text.NamedEntity;
import org.icij.datashare.text.nlp.NlpTag;
import org.icij.datashare.text.nlp.Pipeline;

import java.nio.charset.Charset;
Expand All @@ -22,15 +23,9 @@ public boolean initialize(Language language) {
}

@Override
public List<NamedEntity> process(Document doc) {
// Stub: ignores both the batch and the language and always returns null.
public List<List<NlpTag>> processText(Stream<String> batch, Language language) {
return null;
}

@Override
public List<NamedEntity> process(Document doc, int contentLength, int contentOffset) {
return null;
}

@Override
public void terminate(Language language) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Set;
import java.util.Collection;
import java.util.Collections;
import java.util.stream.Stream;
import org.icij.datashare.PropertiesProvider;
import org.icij.datashare.text.Document;
import org.icij.datashare.text.Language;
Expand All @@ -19,6 +20,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.icij.datashare.text.nlp.NlpTag;

import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableSet;
Expand Down Expand Up @@ -87,19 +89,26 @@ public EmailPipeline(final PropertiesProvider propertiesProvider) {
}

@Override
public List<NamedEntity> process(Document doc) {
return process(doc, doc.getContentTextLength(), 0);
// Finds e-mail addresses in every text of the batch.
// Returns, for each text in encounter order, the list of EMAIL tags whose begin/end are the
// bounds of each match of `pattern` within that text. The language argument is not used.
public List<List<NlpTag>> processText(Stream<String> batch, Language ignored) {
    return batch
        .map(text -> pattern.matcher(text).results()
            // Read the bounds from each MatchResult snapshot rather than from the live Matcher,
            // whose state is being advanced by results() while the stream is traversed.
            .map(match -> new NlpTag(match.start(), match.end(), NamedEntity.Category.EMAIL))
            .toList())
        .toList();
}

@Override
public List<NamedEntity> process(Document doc, int contentLength, int contentOffset) {
Matcher matcher = pattern.matcher(doc.getContent().substring(contentOffset, Math.min(contentLength + contentOffset, doc.getContentTextLength())));
public List<NamedEntity> processDoc(Document doc, int contentLength, int contentOffset) {
String docContent = doc.getContent();
NamedEntitiesBuilder namedEntitiesBuilder = new NamedEntitiesBuilder(EMAIL, doc.getId(), doc.getLanguage()).withRoot(doc.getRootDocument());
while (matcher.find()) {
String email = matcher.group(0);
int start = matcher.start();
namedEntitiesBuilder.add(NamedEntity.Category.EMAIL, email, start + contentOffset);
}
String chunkContent = docContent.substring(contentOffset, Math.min(contentLength + contentOffset, doc.getContentTextLength()));
this.processText(Stream.of(chunkContent), doc.getLanguage())
.get(0)
.forEach(t -> {
String mention = chunkContent.substring(t.begin(), t.end());
namedEntitiesBuilder.add(NamedEntity.Category.EMAIL, mention, t.begin() + contentOffset);
});
List<NamedEntity> entities = namedEntitiesBuilder.build();
if ("message/rfc822".equals(doc.getContentType())) {
entities.addAll(processMetadata(doc));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public EnqueueFromIndexTask(final DocumentCollectionFactory<String> factory, fin
public Long call() throws Exception {
super.call();
Indexer.Searcher searcher = indexer.search(singletonList(projectName), Document.class)
.without(nlpPipeline).withSource("rootDocument").limit(scrollSize);
.without(nlpPipeline).withSource("rootDocument").limit(scrollSize)
.sort("language", Indexer.Searcher.SortOrder.ASC);
logger.info("resuming NLP name finding for index {} and {} with {} scroll and size of {} : {} documents found", projectName, nlpPipeline,
scrollDuration, scrollSize, searcher.totalHits());
List<? extends Entity> docsToProcess = searcher.scroll(scrollDuration).collect(toList());
Expand Down
Loading