Skip to content

Commit

Permalink
Rework interfaces for the geoip processor (elastic#113045)
Browse files Browse the repository at this point in the history
  • Loading branch information
joegallo authored and javanna committed Sep 18, 2024
1 parent df3ada8 commit 0daa0b6
Show file tree
Hide file tree
Showing 13 changed files with 203 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

package org.elasticsearch.ingest.geoip;

import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.db.Reader;

import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
Expand Down Expand Up @@ -738,8 +738,8 @@ private void deleteDatabasesInConfigDirectory() throws Exception {

@SuppressForbidden(reason = "Maxmind API requires java.io.File")
private void parseDatabase(Path tempFile) throws IOException {
try (DatabaseReader databaseReader = new DatabaseReader.Builder(tempFile.toFile()).build()) {
assertNotNull(databaseReader.getMetadata());
try (Reader reader = new Reader(tempFile.toFile())) {
assertNotNull(reader.getMetadata());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.VersionType;
Expand Down Expand Up @@ -206,10 +205,10 @@ private static DatabaseNodeService createRegistry(Path geoIpConfigDir, Path geoI
private static void lazyLoadReaders(DatabaseNodeService databaseNodeService) throws IOException {
if (databaseNodeService.get("GeoLite2-City.mmdb") != null) {
databaseNodeService.get("GeoLite2-City.mmdb").getDatabaseType();
databaseNodeService.get("GeoLite2-City.mmdb").getCity(InetAddresses.forString("2.125.160.216"));
databaseNodeService.get("GeoLite2-City.mmdb").getCity("2.125.160.216");
}
databaseNodeService.get("GeoLite2-City-Test.mmdb").getDatabaseType();
databaseNodeService.get("GeoLite2-City-Test.mmdb").getCity(InetAddresses.forString("2.125.160.216"));
databaseNodeService.get("GeoLite2-City-Test.mmdb").getCity("2.125.160.216");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
* if there is an old instance of this database then that is closed.
* 4) Cleanup locally loaded databases that are no longer mentioned in {@link GeoIpTaskState}.
*/
public final class DatabaseNodeService implements GeoIpDatabaseProvider, Closeable {
public final class DatabaseNodeService implements IpDatabaseProvider, Closeable {

private static final Logger logger = LogManager.getLogger(DatabaseNodeService.class);

Expand Down Expand Up @@ -221,7 +221,7 @@ DatabaseReaderLazyLoader getDatabaseReaderLazyLoader(String name) {
}

@Override
public GeoIpDatabase getDatabase(String name) {
public IpDatabase getDatabase(String name) {
return getDatabaseReaderLazyLoader(name);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@

package org.elasticsearch.ingest.geoip;

import com.maxmind.db.DatabaseRecord;
import com.maxmind.db.Network;
import com.maxmind.db.NoCache;
import com.maxmind.db.Reader;
import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.model.AbstractResponse;
import com.maxmind.geoip2.model.AnonymousIpResponse;
import com.maxmind.geoip2.model.AsnResponse;
Expand All @@ -25,18 +26,23 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -45,7 +51,7 @@
* Facilitates lazy loading of the database reader, so that when the geoip plugin is installed, but not used,
* no memory is being wasted on the database reader.
*/
class DatabaseReaderLazyLoader implements GeoIpDatabase, Closeable {
class DatabaseReaderLazyLoader implements IpDatabase, Closeable {

private static final boolean LOAD_DATABASE_ON_HEAP = Booleans.parseBoolean(System.getProperty("es.geoip.load_db_on_heap", "false"));

Expand All @@ -54,8 +60,8 @@ class DatabaseReaderLazyLoader implements GeoIpDatabase, Closeable {
private final String md5;
private final GeoIpCache cache;
private final Path databasePath;
private final CheckedSupplier<DatabaseReader, IOException> loader;
final SetOnce<DatabaseReader> databaseReader;
private final CheckedSupplier<Reader, IOException> loader;
final SetOnce<Reader> databaseReader;

// cache the database type so that we do not re-read it on every pipeline execution
final SetOnce<String> databaseType;
Expand Down Expand Up @@ -92,50 +98,90 @@ public final String getDatabaseType() throws IOException {

@Nullable
@Override
public CityResponse getCity(InetAddress ipAddress) {
return getResponse(ipAddress, DatabaseReader::tryCity);
public CityResponse getCity(String ipAddress) {
return getResponse(ipAddress, (reader, ip) -> lookup(reader, ip, CityResponse.class, CityResponse::new));
}

@Nullable
@Override
public CountryResponse getCountry(InetAddress ipAddress) {
return getResponse(ipAddress, DatabaseReader::tryCountry);
public CountryResponse getCountry(String ipAddress) {
return getResponse(ipAddress, (reader, ip) -> lookup(reader, ip, CountryResponse.class, CountryResponse::new));
}

@Nullable
@Override
public AsnResponse getAsn(InetAddress ipAddress) {
return getResponse(ipAddress, DatabaseReader::tryAsn);
public AsnResponse getAsn(String ipAddress) {
return getResponse(
ipAddress,
(reader, ip) -> lookup(
reader,
ip,
AsnResponse.class,
(response, responseIp, network, locales) -> new AsnResponse(response, responseIp, network)
)
);
}

@Nullable
@Override
public AnonymousIpResponse getAnonymousIp(InetAddress ipAddress) {
return getResponse(ipAddress, DatabaseReader::tryAnonymousIp);
public AnonymousIpResponse getAnonymousIp(String ipAddress) {
return getResponse(
ipAddress,
(reader, ip) -> lookup(
reader,
ip,
AnonymousIpResponse.class,
(response, responseIp, network, locales) -> new AnonymousIpResponse(response, responseIp, network)
)
);
}

@Nullable
@Override
public ConnectionTypeResponse getConnectionType(InetAddress ipAddress) {
return getResponse(ipAddress, DatabaseReader::tryConnectionType);
public ConnectionTypeResponse getConnectionType(String ipAddress) {
return getResponse(
ipAddress,
(reader, ip) -> lookup(
reader,
ip,
ConnectionTypeResponse.class,
(response, responseIp, network, locales) -> new ConnectionTypeResponse(response, responseIp, network)
)
);
}

@Nullable
@Override
public DomainResponse getDomain(InetAddress ipAddress) {
return getResponse(ipAddress, DatabaseReader::tryDomain);
public DomainResponse getDomain(String ipAddress) {
return getResponse(
ipAddress,
(reader, ip) -> lookup(
reader,
ip,
DomainResponse.class,
(response, responseIp, network, locales) -> new DomainResponse(response, responseIp, network)
)
);
}

@Nullable
@Override
public EnterpriseResponse getEnterprise(InetAddress ipAddress) {
return getResponse(ipAddress, DatabaseReader::tryEnterprise);
public EnterpriseResponse getEnterprise(String ipAddress) {
return getResponse(ipAddress, (reader, ip) -> lookup(reader, ip, EnterpriseResponse.class, EnterpriseResponse::new));
}

@Nullable
@Override
public IspResponse getIsp(InetAddress ipAddress) {
return getResponse(ipAddress, DatabaseReader::tryIsp);
public IspResponse getIsp(String ipAddress) {
return getResponse(
ipAddress,
(reader, ip) -> lookup(
reader,
ip,
IspResponse.class,
(response, responseIp, network, locales) -> new IspResponse(response, responseIp, network)
)
);
}

boolean preLookup() {
Expand All @@ -155,19 +201,19 @@ int current() {

@Nullable
private <T extends AbstractResponse> T getResponse(
InetAddress ipAddress,
CheckedBiFunction<DatabaseReader, InetAddress, Optional<T>, Exception> responseProvider
String ipAddress,
CheckedBiFunction<Reader, String, Optional<T>, Exception> responseProvider
) {
return cache.putIfAbsent(ipAddress, databasePath.toString(), ip -> {
try {
return responseProvider.apply(get(), ipAddress).orElse(null);
} catch (Exception e) {
throw new RuntimeException(e);
throw ExceptionsHelper.convertToRuntime(e);
}
});
}

DatabaseReader get() throws IOException {
Reader get() throws IOException {
if (databaseReader.get() == null) {
synchronized (databaseReader) {
if (databaseReader.get() == null) {
Expand Down Expand Up @@ -206,21 +252,32 @@ protected void doClose() throws IOException {
}
}

private static CheckedSupplier<DatabaseReader, IOException> createDatabaseLoader(Path databasePath) {
private static CheckedSupplier<Reader, IOException> createDatabaseLoader(Path databasePath) {
return () -> {
DatabaseReader.Builder builder = createDatabaseBuilder(databasePath).withCache(NoCache.getInstance());
if (LOAD_DATABASE_ON_HEAP) {
builder.fileMode(Reader.FileMode.MEMORY);
} else {
builder.fileMode(Reader.FileMode.MEMORY_MAPPED);
}
return builder.build();
Reader.FileMode mode = LOAD_DATABASE_ON_HEAP ? Reader.FileMode.MEMORY : Reader.FileMode.MEMORY_MAPPED;
return new Reader(pathToFile(databasePath), mode, NoCache.getInstance());
};
}

@SuppressForbidden(reason = "Maxmind API requires java.io.File")
private static DatabaseReader.Builder createDatabaseBuilder(Path databasePath) {
return new DatabaseReader.Builder(databasePath.toFile());
private static File pathToFile(Path databasePath) {
return databasePath.toFile();
}

@FunctionalInterface
private interface ResponseBuilder<RESPONSE> {
RESPONSE build(RESPONSE response, String responseIp, Network network, List<String> locales);
}

private <RESPONSE> Optional<RESPONSE> lookup(Reader reader, String ip, Class<RESPONSE> clazz, ResponseBuilder<RESPONSE> builder)
throws IOException {
InetAddress inetAddress = InetAddresses.forString(ip);
DatabaseRecord<RESPONSE> record = reader.getRecord(inetAddress, clazz);
RESPONSE result = record.getData();
if (result == null) {
return Optional.empty();
} else {
return Optional.of(builder.build(result, NetworkAddress.format(inetAddress), record.getNetwork(), List.of("en")));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.ingest.geoip.stats.CacheStats;

import java.net.InetAddress;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
Expand Down Expand Up @@ -62,11 +61,7 @@ public String toString() {
}

@SuppressWarnings("unchecked")
<T extends AbstractResponse> T putIfAbsent(
InetAddress ip,
String databasePath,
Function<InetAddress, AbstractResponse> retrieveFunction
) {
<T extends AbstractResponse> T putIfAbsent(String ip, String databasePath, Function<String, AbstractResponse> retrieveFunction) {
// can't use cache.computeIfAbsent due to the elevated permissions for the jackson (run via the cache loader)
CacheKey cacheKey = new CacheKey(ip, databasePath);
long cacheStart = relativeNanoTimeProvider.getAsLong();
Expand Down Expand Up @@ -98,7 +93,7 @@ <T extends AbstractResponse> T putIfAbsent(
}

// only useful for testing
AbstractResponse get(InetAddress ip, String databasePath) {
AbstractResponse get(String ip, String databasePath) {
CacheKey cacheKey = new CacheKey(ip, databasePath);
return cache.get(cacheKey);
}
Expand Down Expand Up @@ -141,5 +136,5 @@ public CacheStats getCacheStats() {
* path is needed to be included in the cache key. For example, if we only used the IP address as the key the City and ASN the same
* IP may be in both with different values and we need to cache both.
*/
private record CacheKey(InetAddress ip, String databasePath) {}
private record CacheKey(String ip, String databasePath) {}
}
Loading

0 comments on commit 0daa0b6

Please sign in to comment.