Reduce heap-memory usage of ingest-geoip plugin #28963

Merged
8 changes: 4 additions & 4 deletions docs/plugins/ingest-geoip.asciidoc
@@ -9,8 +9,8 @@ The ingest-geoip plugin ships by default with the GeoLite2 City, GeoLite2 Countr
under the CCA-ShareAlike 4.0 license. For more details see, http://dev.maxmind.com/geoip/geoip2/geolite2/

The GeoIP processor can run with other geoip2 databases from Maxmind. The files must be copied into the geoip config directory,
and the `database_file` option should be used to specify the filename of the custom database. Custom database files must be compressed
with gzip. The geoip config directory is located at `$ES_HOME/config/ingest-geoip` and holds the shipped databases too.
and the `database_file` option should be used to specify the filename of the custom database. Custom database files must be stored
uncompressed. The geoip config directory is located at `$ES_HOME/config/ingest-geoip` and holds the shipped databases too.

:plugin_name: ingest-geoip
include::install_remove.asciidoc[]
@@ -25,7 +25,7 @@ include::install_remove.asciidoc[]
| Name | Required | Default | Description
| `field` | yes | - | The field to get the ip address from for the geographical lookup.
| `target_field` | no | geoip | The field that will hold the geographical information looked up from the Maxmind database.
| `database_file` | no | GeoLite2-City.mmdb | The database filename in the geoip config directory. The ingest-geoip plugin ships with the GeoLite2-City.mmdb.gz and GeoLite2-Country.mmdb.gz files.
| `database_file` | no | GeoLite2-City.mmdb | The database filename in the geoip config directory. The ingest-geoip plugin ships with the GeoLite2-City.mmdb, GeoLite2-Country.mmdb and GeoLite2-ASN.mmdb files.
| `properties` | no | [`continent_name`, `country_iso_code`, `region_name`, `city_name`, `location`] * | Controls what properties are added to the `target_field` based on the geoip lookup.
| `ignore_missing` | no | `false` | If `true` and `field` does not exist, the processor quietly exits without modifying the document
|======
@@ -101,7 +101,7 @@ PUT _ingest/pipeline/geoip
"geoip" : {
"field" : "ip",
"target_field" : "geo",
"database_file" : "GeoLite2-Country.mmdb.gz"
"database_file" : "GeoLite2-Country.mmdb"
}
}
]
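Because custom databases must now be stored uncompressed, an existing `.mmdb.gz` file has to be decompressed before it is placed in the geoip config directory. A minimal sketch of that step using only the JDK follows; the class `MmdbGunzip` and its methods are hypothetical helpers for illustration and are not part of the plugin.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.zip.GZIPInputStream;

// Hypothetical helper (not part of ingest-geoip): turns "custom.mmdb.gz" into "custom.mmdb".
final class MmdbGunzip {

    /** Drops the trailing ".gz" from a "*.mmdb.gz" file name. */
    static String uncompressedName(String fileName) {
        if (!fileName.endsWith(".mmdb.gz")) {
            throw new IllegalArgumentException("expected a .mmdb.gz file but got [" + fileName + "]");
        }
        return fileName.substring(0, fileName.length() - ".gz".length());
    }

    /** Decompresses the source next to itself and returns the path of the uncompressed file. */
    static Path gunzip(Path source) {
        Path target = source.resolveSibling(uncompressedName(source.getFileName().toString()));
        try (InputStream in = new GZIPInputStream(Files.newInputStream(source))) {
            // Files.copy fails if the target already exists, so an existing database is never overwritten.
            Files.copy(in, target);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return target;
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("usage: MmdbGunzip <path-to-file.mmdb.gz>");
            return;
        }
        System.out.println("wrote " + gunzip(Paths.get(args[0])));
    }
}
```

After decompression, the `database_file` option would refer to the new name, for example `custom.mmdb` rather than `custom.mmdb.gz`.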
4 changes: 2 additions & 2 deletions plugins/ingest-geoip/build.gradle
@@ -30,13 +30,13 @@ dependencies {
compile("com.fasterxml.jackson.core:jackson-databind:${versions.jackson}")
compile('com.maxmind.db:maxmind-db:1.2.2')

testCompile 'org.elasticsearch:geolite2-databases:20171206'
testCompile 'org.elasticsearch:geolite2-databases:20180303'
}

task copyDefaultGeoIp2DatabaseFiles(type: Copy) {
from { zipTree(configurations.testCompile.files.find { it.name.contains('geolite2-databases')}) }
into "${project.buildDir}/ingest-geoip"
include "*.mmdb.gz"
include "*.mmdb"
}

project.bundlePlugin.dependsOn(copyDefaultGeoIp2DatabaseFiles)
@@ -37,7 +37,6 @@
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

import java.io.IOException;
import java.net.InetAddress;
import java.security.AccessController;
import java.security.PrivilegedAction;
@@ -68,8 +67,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
private final Set<Property> properties;
private final boolean ignoreMissing;

GeoIpProcessor(String tag, String field, DatabaseReader dbReader, String targetField, Set<Property> properties,
boolean ignoreMissing) throws IOException {
Review comment (Member):
Thanks for removing the unused exception.

GeoIpProcessor(String tag, String field, DatabaseReader dbReader, String targetField, Set<Property> properties, boolean ignoreMissing) {
super(tag);
this.field = field;
this.targetField = targetField;
@@ -323,7 +321,7 @@ public GeoIpProcessor create(Map<String, Processor.Factory> registry, String pro
Map<String, Object> config) throws Exception {
String ipField = readStringProperty(TYPE, processorTag, config, "field");
String targetField = readStringProperty(TYPE, processorTag, config, "target_field", "geoip");
String databaseFile = readStringProperty(TYPE, processorTag, config, "database_file", "GeoLite2-City.mmdb.gz");
String databaseFile = readStringProperty(TYPE, processorTag, config, "database_file", "GeoLite2-City.mmdb");
List<String> propertyNames = readOptionalList(TYPE, processorTag, config, "properties");
boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);

@@ -21,28 +21,28 @@

import com.maxmind.db.NoCache;
import com.maxmind.db.NodeCache;
import com.maxmind.db.Reader;
import com.maxmind.geoip2.DatabaseReader;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;

public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable {
public static final Setting<Long> CACHE_SIZE =
@@ -80,28 +80,38 @@ static Map<String, DatabaseReaderLazyLoader> loadDatabaseReaders(Path geoIpConfi
if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) {
throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist");
}

boolean loadDatabaseOnHeap = Booleans.parseBoolean(System.getProperty("es.geoip.load_db_on_heap", "false"));
Review comment (Member):
👍 to this escape hatch

Review comment (Member):
Do we really need this escape hatch? If this is to remain undocumented (which it should if it is kept as a sysprop), what would eventually allow us to remove this untested behavior?

Review comment (Member):
This is needed until we have a better understanding of the implications on nodes with small amounts of native memory.

Map<String, DatabaseReaderLazyLoader> databaseReaders = new HashMap<>();
try (Stream<Path> databaseFiles = Files.list(geoIpConfigDirectory)) {
PathMatcher pathMatcher = geoIpConfigDirectory.getFileSystem().getPathMatcher("glob:**.mmdb.gz");
PathMatcher pathMatcher = geoIpConfigDirectory.getFileSystem().getPathMatcher("glob:**.mmdb");
// Use iterator instead of forEach otherwise IOException needs to be caught twice...
Iterator<Path> iterator = databaseFiles.iterator();
while (iterator.hasNext()) {
Path databasePath = iterator.next();
if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) {
String databaseFileName = databasePath.getFileName().toString();
DatabaseReaderLazyLoader holder = new DatabaseReaderLazyLoader(databaseFileName, () -> {
try (InputStream inputStream = new GZIPInputStream(Files.newInputStream(databasePath, StandardOpenOption.READ))) {
return new DatabaseReader.Builder(inputStream).withCache(cache).build();
}
});
DatabaseReaderLazyLoader holder = new DatabaseReaderLazyLoader(databaseFileName,
() -> {
DatabaseReader.Builder builder = createDatabaseBuilder(databasePath).withCache(cache);
if (loadDatabaseOnHeap) {
builder.fileMode(Reader.FileMode.MEMORY);
} else {
builder.fileMode(Reader.FileMode.MEMORY_MAPPED);
}
return builder.build();
});
databaseReaders.put(databaseFileName, holder);
}
}
}
return Collections.unmodifiableMap(databaseReaders);
}
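The change of the glob from `**.mmdb.gz` to `**.mmdb` decides which files in the config directory are picked up as databases. The standalone sketch below exercises the same pattern against a few file names. The class `MmdbGlobSketch` is hypothetical, and it omits the `Files.isRegularFile` check that the plugin code also applies.

```java
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

// Hypothetical demo class: shows which paths the new glob accepts.
final class MmdbGlobSketch {

    // Same glob as in the diff; "**" also crosses directory boundaries.
    private static final PathMatcher MMDB_MATCHER =
        FileSystems.getDefault().getPathMatcher("glob:**.mmdb");

    /** Returns true if the path ends in ".mmdb" (a gzipped ".mmdb.gz" file no longer qualifies). */
    static boolean isDatabaseFile(Path path) {
        return MMDB_MATCHER.matches(path);
    }

    public static void main(String[] args) {
        String[] names = {"GeoLite2-City.mmdb", "GeoLite2-City.mmdb.gz", "README.txt"};
        for (String name : names) {
            Path path = Paths.get("config", "ingest-geoip", name);
            System.out.println(path + " -> " + isDatabaseFile(path));
        }
    }
}
```

One practical consequence is that a `.mmdb.gz` file left over in the directory is skipped by this matcher instead of being loaded.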

@SuppressForbidden(reason = "Maxmind API requires java.io.File")
private static DatabaseReader.Builder createDatabaseBuilder(Path databasePath) {
return new DatabaseReader.Builder(databasePath.toFile());
}

@Override
public void close() throws IOException {
if (databaseReaders != null) {
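The heap saving in this change comes from opening each database with `Reader.FileMode.MEMORY_MAPPED` instead of reading it fully into memory, with the `es.geoip.load_db_on_heap` system property as the escape hatch. The sketch below illustrates the general difference between the two strategies with plain JDK calls. It is not the MaxMind implementation; the class `FileModeSketch` is hypothetical, and it uses `Boolean.parseBoolean` where the plugin uses Elasticsearch's `Booleans.parseBoolean`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical illustration of "load on heap" versus "memory-map the file".
final class FileModeSketch {

    /** Reads the escape-hatch system property named in the diff; defaults to false. */
    static boolean loadOnHeapRequested() {
        return Boolean.parseBoolean(System.getProperty("es.geoip.load_db_on_heap", "false"));
    }

    /** Opens the file either as a heap copy or as a read-only memory mapping. */
    static ByteBuffer open(Path file, boolean loadOnHeap) {
        try {
            if (loadOnHeap) {
                // The whole file is copied into a byte[] that lives on the Java heap.
                return ByteBuffer.wrap(Files.readAllBytes(file)).asReadOnlyBuffer();
            }
            try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
                // The mapping is backed by native memory and the OS page cache, not the Java heap.
                // It stays valid after the channel is closed.
                return channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A caller would write `FileModeSketch.open(path, FileModeSketch.loadOnHeapRequested())`. The mapped variant moves the memory cost from the heap to native memory, which is the concern raised in the review thread about nodes with little native memory.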
@@ -54,12 +54,12 @@ public static void loadDatabaseReaders() throws IOException {
Path configDir = createTempDir();
Path geoIpConfigDir = configDir.resolve("ingest-geoip");
Files.createDirectories(geoIpConfigDir);
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb.gz")),
geoIpConfigDir.resolve("GeoLite2-City.mmdb.gz"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb.gz")),
geoIpConfigDir.resolve("GeoLite2-Country.mmdb.gz"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb.gz")),
geoIpConfigDir.resolve("GeoLite2-ASN.mmdb.gz"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")),
geoIpConfigDir.resolve("GeoLite2-City.mmdb"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")),
geoIpConfigDir.resolve("GeoLite2-Country.mmdb"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb")),
geoIpConfigDir.resolve("GeoLite2-ASN.mmdb"));

NodeCache cache = randomFrom(NoCache.getInstance(), new GeoIpCache(randomNonNegativeLong()));
databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(geoIpConfigDir, cache);
@@ -111,7 +111,7 @@ public void testCountryBuildDefaults() throws Exception {

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-Country.mmdb.gz");
config.put("database_file", "GeoLite2-Country.mmdb");
String processorTag = randomAlphaOfLength(10);

GeoIpProcessor processor = factory.create(null, processorTag, config);
@@ -129,7 +129,7 @@ public void testAsnBuildDefaults() throws Exception {

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-ASN.mmdb.gz");
config.put("database_file", "GeoLite2-ASN.mmdb");
String processorTag = randomAlphaOfLength(10);

GeoIpProcessor processor = factory.create(null, processorTag, config);
@@ -157,7 +157,7 @@ public void testBuildDbFile() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-Country.mmdb.gz");
config.put("database_file", "GeoLite2-Country.mmdb");
GeoIpProcessor processor = factory.create(null, null, config);
assertThat(processor.getField(), equalTo("_field"));
assertThat(processor.getTargetField(), equalTo("geoip"));
@@ -170,7 +170,7 @@ public void testBuildWithCountryDbAndAsnFields() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-Country.mmdb.gz");
config.put("database_file", "GeoLite2-Country.mmdb");
EnumSet<GeoIpProcessor.Property> asnOnlyProperties = EnumSet.copyOf(GeoIpProcessor.Property.ALL_ASN_PROPERTIES);
asnOnlyProperties.remove(GeoIpProcessor.Property.IP);
String asnProperty = RandomPicks.randomFrom(Randomness.get(), asnOnlyProperties).toString();
@@ -184,7 +184,7 @@ public void testBuildWithAsnDbAndCityFields() throws Exception {
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-ASN.mmdb.gz");
config.put("database_file", "GeoLite2-ASN.mmdb");
EnumSet<GeoIpProcessor.Property> cityOnlyProperties = EnumSet.copyOf(GeoIpProcessor.Property.ALL_CITY_PROPERTIES);
cityOnlyProperties.remove(GeoIpProcessor.Property.IP);
String cityProperty = RandomPicks.randomFrom(Randomness.get(), cityOnlyProperties).toString();
@@ -199,9 +199,9 @@ public void testBuildNonExistingDbFile() throws Exception {

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "does-not-exist.mmdb.gz");
config.put("database_file", "does-not-exist.mmdb");
Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config));
assertThat(e.getMessage(), equalTo("[database_file] database file [does-not-exist.mmdb.gz] doesn't exist"));
assertThat(e.getMessage(), equalTo("[database_file] database file [does-not-exist.mmdb] doesn't exist"));
}

public void testBuildFields() throws Exception {
@@ -249,12 +249,12 @@ public void testLazyLoading() throws Exception {
Path configDir = createTempDir();
Path geoIpConfigDir = configDir.resolve("ingest-geoip");
Files.createDirectories(geoIpConfigDir);
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb.gz")),
geoIpConfigDir.resolve("GeoLite2-City.mmdb.gz"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb.gz")),
geoIpConfigDir.resolve("GeoLite2-Country.mmdb.gz"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb.gz")),
geoIpConfigDir.resolve("GeoLite2-ASN.mmdb.gz"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")),
geoIpConfigDir.resolve("GeoLite2-City.mmdb"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")),
geoIpConfigDir.resolve("GeoLite2-Country.mmdb"));
Files.copy(new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb")),
geoIpConfigDir.resolve("GeoLite2-ASN.mmdb"));

// Loading another database reader instances, because otherwise we can't test lazy loading as the
// database readers used at class level are reused between tests. (we want to keep that otherwise running this
@@ -268,15 +268,15 @@ public void testLazyLoading() throws Exception {

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-City.mmdb.gz");
config.put("database_file", "GeoLite2-City.mmdb");
factory.create(null, "_tag", config);
config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-Country.mmdb.gz");
config.put("database_file", "GeoLite2-Country.mmdb");
factory.create(null, "_tag", config);
config = new HashMap<>();
config.put("field", "_field");
config.put("database_file", "GeoLite2-ASN.mmdb.gz");
config.put("database_file", "GeoLite2-ASN.mmdb");
factory.create(null, "_tag", config);

for (DatabaseReaderLazyLoader lazyLoader : databaseReaders.values()) {
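The lazy-loading test relies on each database reader being built only when it is first needed, via the supplier passed to `DatabaseReaderLazyLoader`. The internals of that class are not shown in this diff, so the following is only a generic sketch of the pattern under that assumption; `LazyLoaderSketch` and its methods are hypothetical names.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical stand-in for a lazy holder: the loader runs at most once, on first access.
final class LazyLoaderSketch<T> {

    private final Supplier<T> loader;
    private T value; // null until the first call to get()

    LazyLoaderSketch(Supplier<T> loader) {
        this.loader = Objects.requireNonNull(loader);
    }

    /** Builds the value on first use and returns the cached instance afterwards. */
    synchronized T get() {
        if (value == null) {
            value = Objects.requireNonNull(loader.get(), "loader returned null");
        }
        return value;
    }

    /** Lets a test verify that nothing has been loaded yet. */
    synchronized boolean isLoaded() {
        return value != null;
    }
}
```

With a holder like this, creating the map of loaders costs no memory for the databases themselves; the cost is paid only for databases that a pipeline actually uses.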