
Es7 10 0 (#348)
* update gradle props

* fix for #340

* wip part2, gradlew test is failing for reworked LoggingFetchSubPhase

* Fix testConventions error with gradle clean check

* Fix doLog test

* Fix other failing test

* Update note around sketchy code

* Remove commented code

* Add support for FetchSubPhaseProcessor and inject IndexNameExpressionResolver

Co-authored-by: Nathan Day <[email protected]>
Co-authored-by: David Causse <[email protected]>
3 people authored Dec 14, 2020
1 parent fdfd024 commit 81f1e2d
Showing 22 changed files with 92 additions and 88 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -44,3 +44,5 @@ eclipse-build
!/.settings/org.eclipse.core.resources.prefs
!/.settings/org.eclipse.jdt.core.prefs
!/.settings/org.eclipse.jdt.ui.prefs

bin/*
4 changes: 2 additions & 2 deletions CONTRIBUTING.md
@@ -73,7 +73,7 @@ As a practice, we maintain a branch per Elasticsearch version, prefixed by ES. S

### Upgrading to latest Elasticsearch major/minor version (ie `es_6_7`)

Upgrading to the latest ES version is a great way to help us out, and get your feet wet. The first step is to open build.gradle, and change the ES version numbers in the dependencies to the version you wish to upgrade to. We recommend trying to build, (it'll likely fail) making a branch name with the es version number (ie `es_6_7`), and then sending us a "Work in Progress" PR to master. This will let us rally around the (sometimes annoyingly painful) upgrade as a team.
Upgrading to the latest ES version is a great way to help us out, and get your feet wet. The first step is to open `gradle.properties`, and change the ES/Lucene version numbers in the dependencies to the version you wish to upgrade to. We recommend trying to build (it'll likely fail), making a branch name with the es version number (ie `es_6_7`), and then sending us a "Work in Progress" PR to master. This will let us rally around the (sometimes annoyingly painful) upgrade as a team.

#### Gradlew Wrapper and Java Upgrade

@@ -132,4 +132,4 @@ Most people come to this plugin, unfamiliar with the basic Learning to Rank work

# Other questions? Get in touch!

Please [email Doug Turnbull](mailto:[email protected]) to ask any questions about contributing not covered by this document. If there's a "bug" in this document, please feel free to file an issue.
Please open an issue to ask any questions about contributing not covered by this document. If there's a "bug" in this document, please feel free to file a PR; typos are the worst.
12 changes: 12 additions & 0 deletions build.gradle
@@ -29,6 +29,7 @@ allprojects {

apply plugin: 'idea'
apply plugin: 'elasticsearch.esplugin'
apply plugin: 'elasticsearch.java-rest-test'
apply plugin: 'elasticsearch.rest-resources'

// license of this project
@@ -69,6 +70,13 @@ dependencyLicenses {
mapping from: /compiler-.*/, to: 'lucene'
}

sourceSets {
javaRestTest {
compileClasspath += sourceSets["main"].output + sourceSets["test"].output + configurations["testRuntimeClasspath"]
runtimeClasspath += output + compileClasspath
}
}

// Set to false to not use elasticsearch checkstyle rules
checkstyleMain.enabled = true
checkstyleTest.enabled = true
@@ -84,3 +92,7 @@ licenseHeaders.enabled = false

// No need to validate POM, as we do not upload to sonatype
validateNebulaPom.enabled = false

// Elastic tried to remove the logging requirement for plugins, but didn't get it quite right, so this is a short-term fix until 7.11
// https://github.com/elastic/elasticsearch/issues/65247
loggerUsageCheck.enabled = false
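
For context, the `elasticsearch.java-rest-test` plugin and the `javaRestTest` source set configured above pick up REST-level tests from `src/javaRestTest/java`. Below is a minimal sketch of such a test, assuming the standard `ESRestTestCase` base class from the Elasticsearch test framework; the class name and test body here are hypothetical illustrations, not part of this commit.

// Hypothetical file: src/javaRestTest/java/com/o19s/es/ltr/ExampleLtrRestIT.java
package com.o19s.es.ltr;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.test.rest.ESRestTestCase;

public class ExampleLtrRestIT extends ESRestTestCase {
    public void testFeatureStoreInit() throws Exception {
        // PUT /_ltr initializes the plugin's default feature store index
        Response response = client().performRequest(new Request("PUT", "/_ltr"));
        assertEquals(200, response.getStatusLine().getStatusCode());
    }
}
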
8 changes: 4 additions & 4 deletions gradle.properties
@@ -1,5 +1,5 @@
ltrVersion = 1.5.3
elasticsearchVersion = 7.9.3
luceneVersion = 8.6.2
ow2Version = 7.2
ltrVersion = 1.5.4
elasticsearchVersion = 7.10.0
luceneVersion = 8.7.0
ow2Version = 8.0.1
antlrVersion=4.5.1-1
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.5-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.6.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
8 changes: 4 additions & 4 deletions src/main/java/com/o19s/es/ltr/LtrQueryParserPlugin.java
@@ -256,17 +256,17 @@ public Collection<Object> createComponents(Client client,
}
}
});
return asList(caches, parserFactory, getStats(client, clusterService));
return asList(caches, parserFactory, getStats(client, clusterService, indexNameExpressionResolver));
}

private LTRStats getStats(Client client, ClusterService clusterService) {
private LTRStats getStats(Client client, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver) {
Map<String, LTRStat> stats = new HashMap<>();
stats.put(StatName.CACHE.getName(),
new LTRStat(false, new CacheStatsOnNodeSupplier(caches)));
stats.put(StatName.STORES.getName(),
new LTRStat(true, new StoreStatsSupplier(client, clusterService)));
new LTRStat(true, new StoreStatsSupplier(client, clusterService, indexNameExpressionResolver)));
stats.put(StatName.PLUGIN_STATUS.getName(),
new LTRStat(true, new PluginHealthStatusSupplier(clusterService)));
new LTRStat(true, new PluginHealthStatusSupplier(clusterService, indexNameExpressionResolver)));
return new LTRStats(unmodifiableMap(stats));
}

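The `IndexNameExpressionResolver` threaded through `getStats` above is handed to the plugin by the node rather than constructed locally. A sketch of where it originates, assuming the ES 7.10 `Plugin#createComponents` signature (abridged; the body elides the rest of this file's setup):

@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService,
                                           ThreadPool threadPool, ResourceWatcherService resourceWatcherService,
                                           ScriptService scriptService, NamedXContentRegistry xContentRegistry,
                                           Environment environment, NodeEnvironment nodeEnvironment,
                                           NamedWriteableRegistry namedWriteableRegistry,
                                           IndexNameExpressionResolver indexNameExpressionResolver,
                                           Supplier<RepositoriesService> repositoriesServiceSupplier) {
    // ... cache and parser setup elided ...
    return asList(caches, parserFactory, getStats(client, clusterService, indexNameExpressionResolver));
}
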
@@ -37,7 +37,6 @@
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@@ -48,6 +47,7 @@
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.Streams;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
101 changes: 40 additions & 61 deletions src/main/java/com/o19s/es/ltr/logging/LoggingFetchSubPhase.java
@@ -23,44 +23,41 @@
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.BoostQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.fetch.FetchContext;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.fetch.subphase.InnerHitsContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.fetch.FetchSubPhaseProcessor;
import org.elasticsearch.search.rescore.QueryRescorer;
import org.elasticsearch.search.rescore.RescoreContext;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class LoggingFetchSubPhase implements FetchSubPhase {
@Override
public void hitsExecute(SearchContext context, SearchHit[] hits) throws IOException {
public FetchSubPhaseProcessor getProcessor(FetchContext context) throws IOException {
LoggingSearchExtBuilder ext = (LoggingSearchExtBuilder) context.getSearchExt(LoggingSearchExtBuilder.NAME);
if (ext == null) {
return;
return null;
}

// Use a boolean query with all the models to log
// This way we reuse existing code to advance through multiple scorers/iterators
BooleanQuery.Builder builder = new BooleanQuery.Builder();
List<HitLogConsumer> loggers = new ArrayList<>();
Map<String, Query> namedQueries = context.parsedQuery().namedFilters();
if (!(context instanceof InnerHitsContext.InnerHitSubContext)) {


if (namedQueries.size() > 0) {
ext.logSpecsStream().filter((l) -> l.getNamedQuery() != null).forEach((l) -> {
Tuple<RankerQuery, HitLogConsumer> query = extractQuery(l, namedQueries);
builder.add(new BooleanClause(query.v1(), BooleanClause.Occur.MUST));
@@ -74,58 +71,15 @@ public void hitsExecute(SearchContext context, SearchHit[] hits) throws IOException
});
}

doLog(builder.build(), loggers, context.searcher(), hits);

}

void doLog(Query query, List<HitLogConsumer> loggers, IndexSearcher searcher, SearchHit[] hits) throws IOException {
// Reorder hits by id so we can scan all the docs belonging to the same
// segment by reusing the same scorer.
SearchHit[] reordered = new SearchHit[hits.length];
System.arraycopy(hits, 0, reordered, 0, hits.length);
Arrays.sort(reordered, Comparator.comparingInt(SearchHit::docId));

int hitUpto = 0;
int readerUpto = -1;
int endDoc = 0;
int docBase = 0;
Scorer scorer = null;
Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE, 1F);
// Loop logic borrowed from lucene QueryRescorer
while (hitUpto < reordered.length) {
SearchHit hit = reordered[hitUpto];
int docID = hit.docId();
loggers.forEach((l) -> l.nextDoc(hit));
LeafReaderContext readerContext = null;
while (docID >= endDoc) {
readerUpto++;
readerContext = searcher.getTopReaderContext().leaves().get(readerUpto);
endDoc = readerContext.docBase + readerContext.reader().maxDoc();
}

if (readerContext != null) {
// We advanced to another segment:
docBase = readerContext.docBase;
scorer = weight.scorer(readerContext);
}

if (scorer != null) {
int targetDoc = docID - docBase;
int actualDoc = scorer.docID();
if (actualDoc < targetDoc) {
actualDoc = scorer.iterator().advance(targetDoc);
}
if (actualDoc == targetDoc) {
// Scoring will trigger log collection
scorer.score();
}
}
Weight w = context.searcher().rewrite(builder.build()).createWeight(context.searcher(), ScoreMode.COMPLETE, 1.0F);

hitUpto++;
}
return new LoggingFetchSubPhaseProcessor(w, loggers);
}

private Tuple<RankerQuery, HitLogConsumer> extractQuery(LoggingSearchExtBuilder.LogSpec logSpec, Map<String, Query> namedQueries) {
private Tuple<RankerQuery, HitLogConsumer> extractQuery(LoggingSearchExtBuilder.LogSpec
logSpec, Map<String, Query> namedQueries) {
Query q = namedQueries.get(logSpec.getNamedQuery());
if (q == null) {
throw new IllegalArgumentException("No query named [" + logSpec.getNamedQuery() + "] found");
@@ -172,6 +126,31 @@ private Tuple<RankerQuery, HitLogConsumer> toLogger(LoggingSearchExtBuilder.LogSpec
query = query.toLoggerQuery(consumer);
return new Tuple<>(query, consumer);
}
static class LoggingFetchSubPhaseProcessor implements FetchSubPhaseProcessor {
private final Weight weight;
private final List<HitLogConsumer> loggers;
private Scorer scorer;

LoggingFetchSubPhaseProcessor(Weight weight, List<HitLogConsumer> loggers) {
this.weight = weight;
this.loggers = loggers;
}


@Override
public void setNextReader(LeafReaderContext readerContext) throws IOException {
scorer = weight.scorer(readerContext);
}

@Override
public void process(HitContext hitContext) throws IOException {
if (scorer != null && scorer.iterator().advance(hitContext.docId()) == hitContext.docId()) {
loggers.forEach((l) -> l.nextDoc(hitContext.hit()));
// Scoring will trigger log collection
scorer.score();
}
}
}

static class HitLogConsumer implements LogLtrRanker.LogConsumer {
private static final String FIELD_NAME = "_ltrlog";
Expand All @@ -191,7 +170,7 @@ static class HitLogConsumer implements LogLtrRanker.LogConsumer {
// ]
private List<Map<String, Object>> currentLog;
private SearchHit currentHit;
private Map<String,Object> extraLogging;
private Map<String, Object> extraLogging;


HitLogConsumer(String name, FeatureSet set, boolean missingAsZero) {
Expand Down Expand Up @@ -226,14 +205,14 @@ public void accept(int featureOrdinal, float score) {

/**
* Return Map to store additional logging information returned with the feature values.
*
* <p>
* The Map is created on first access.
*/
@Override
public Map<String,Object> getExtraLoggingMap() {
public Map<String, Object> getExtraLoggingMap() {
if (extraLogging == null) {
extraLogging = new HashMap<>();
Map<String,Object> logEntry = new HashMap<>();
Map<String, Object> logEntry = new HashMap<>();
logEntry.put("name", EXTRA_LOGGING_NAME);
logEntry.put("value", extraLogging);
currentLog.add(logEntry);
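To make the rework above concrete: in the ES 7.10 fetch phase, a `FetchSubPhaseProcessor` is driven segment by segment, with one `setNextReader` call per leaf and then `process` for each hit in that leaf in increasing doc-id order. That ordering is why `LoggingFetchSubPhaseProcessor` can hold a single forward-only `Scorer` per segment and simply `advance()` it, where the old `doLog` had to sort hits and track segment boundaries by hand. A simplified driver sketch of that assumed contract (illustration only, not Elasticsearch internals; the helper name and collections are hypothetical):

// Illustration of the assumed fetch-phase contract, not actual ES code.
static void driveProcessor(FetchSubPhaseProcessor processor,
                           List<LeafReaderContext> leavesInOrder,
                           Map<LeafReaderContext, List<FetchSubPhase.HitContext>> hitsByLeaf)
        throws IOException {
    for (LeafReaderContext leaf : leavesInOrder) {
        processor.setNextReader(leaf);      // new segment: processor grabs a fresh Scorer
        for (FetchSubPhase.HitContext hit : hitsByLeaf.get(leaf)) {
            processor.process(hit);         // scorer advances forward to hit.docId()
        }
    }
}
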
@@ -5,6 +5,7 @@
import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.ThreadContext;

import java.util.Arrays;
import java.util.Locale;
@@ -21,9 +22,10 @@ public class PluginHealthStatusSupplier implements Supplier<String> {
private final ClusterService clusterService;
private final IndexNameExpressionResolver indexNameExpressionResolver;

public PluginHealthStatusSupplier(ClusterService clusterService) {
public PluginHealthStatusSupplier(ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver) {
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver();
ThreadContext threadContext = new ThreadContext(clusterService.getSettings());
this.indexNameExpressionResolver = indexNameExpressionResolver;
}

// currently it combines the store statuses to get the overall health
@@ -58,10 +58,10 @@ public String getName() {
}
}

public StoreStatsSupplier(Client client, ClusterService clusterService) {
public StoreStatsSupplier(Client client, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver) {
this.client = client;
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver();
this.indexNameExpressionResolver = indexNameExpressionResolver;
}

@Override
