Skip to content

Commit

Permalink
Fix invalidation race in MongoSession
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed Jan 12, 2022
1 parent 52220aa commit a10b1e5
Showing 1 changed file with 9 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,14 @@
package io.trino.plugin.mongodb;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.Primitives;
import com.google.common.primitives.Shorts;
import com.google.common.primitives.SignedBytes;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.mongodb.DBRef;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoClient;
Expand All @@ -34,6 +32,7 @@
import com.mongodb.client.result.DeleteResult;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.trino.plugin.base.cache.EvictableCache;
import io.trino.spi.HostAddress;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
Expand Down Expand Up @@ -64,6 +63,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -86,7 +86,6 @@
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
Expand Down Expand Up @@ -123,7 +122,7 @@ public class MongoSession
private final boolean caseInsensitiveNameMatching;
private final int cursorBatchSize;

private final LoadingCache<SchemaTableName, MongoTable> tableCache;
private final Cache<SchemaTableName, MongoTable> tableCache;
private final String implicitPrefix;

public MongoSession(TypeManager typeManager, MongoClient client, MongoClientConfig config)
Expand All @@ -135,10 +134,8 @@ public MongoSession(TypeManager typeManager, MongoClient client, MongoClientConf
this.cursorBatchSize = config.getCursorBatchSize();
this.implicitPrefix = requireNonNull(config.getImplicitRowFieldPrefix(), "config.getImplicitRowFieldPrefix() is null");

this.tableCache = CacheBuilder.newBuilder()
.expireAfterWrite(1, HOURS) // TODO: Configure
.refreshAfterWrite(1, MINUTES)
.build(CacheLoader.from(this::loadTableSchema));
this.tableCache = EvictableCache.buildWith(CacheBuilder.newBuilder()
.expireAfterWrite(1, MINUTES)); // TODO: Configure
}

public void shutdown()
Expand Down Expand Up @@ -179,11 +176,11 @@ public MongoTable getTable(SchemaTableName tableName)
throws TableNotFoundException
{
try {
return tableCache.getUnchecked(tableName);
return tableCache.get(tableName, () -> loadTableSchema(tableName));
}
catch (UncheckedExecutionException e) {
catch (ExecutionException e) {
throwIfInstanceOf(e.getCause(), TrinoException.class);
throw e;
throw new RuntimeException(e);
}
}

Expand Down

0 comments on commit a10b1e5

Please sign in to comment.