Skip to content

Commit

Permalink
Added short-term caching of the first column in the channel manifest (#…
Browse files Browse the repository at this point in the history
…180)

Added short-term caching of the first column in the channel manifest
  • Loading branch information
billkalter authored Jun 20, 2018
1 parent 3fc7881 commit 98d6195
Showing 1 changed file with 49 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
Expand Down Expand Up @@ -52,6 +53,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -72,6 +74,7 @@ public class AstyanaxEventReaderDAO implements EventReaderDAO {
private final ExecutorService _cleanupExecutor;
private final LoadingCache<ChannelSlab, SlabCursor> _openSlabCursors;
private final LoadingCache<ChannelSlab, SlabCursor> _closedSlabCursors;
private final Cache<String, ByteBuffer> _oldestSlab;
private final Meter _staleSlabMeter;

@Inject
Expand Down Expand Up @@ -110,6 +113,10 @@ public SlabCursor load(ChannelSlab key) {
maximumSize(100000). // 100k - bigger than the open slab cache since it's more important for performance.
recordStats().
build(slabCursorFactory);
_oldestSlab = CacheBuilder.newBuilder().
expireAfterWrite(10, TimeUnit.SECONDS).
maximumSize(10000). // 10k - as with open slabs, misses are inconvenient but tolerable
build();
InstrumentedCache.instrument(_openSlabCursors, metricRegistry, metricsGroup, "openSlabCursors", false);
InstrumentedCache.instrument(_closedSlabCursors, metricRegistry, metricsGroup, "closedSlabCursors", false);

Expand Down Expand Up @@ -283,16 +290,38 @@ public void readNewer(String channel, EventSink sink) {
// benefit from using a cursor to start the read part-way through the slab. So the open slab cursor cache
// has a very short TTL (250ms) to reduce memory requirements and minimize the latency between the time
// data is written and first read.
//
// Additionally a third cache is used to track the oldest known slab in channel's manifest. If the events
// are written and acknowledged quickly and frequently then the head of the manifest row for the channel may
// accrue tombstones as older slabs are fully read and deleted. By caching the oldest slab in the manifest
// reading the older tombstones can be minimized. Since slabs can be written out-of-order across the cluster
// we still occasionally (10 seconds) re-reads all slabs to pick up any of these newer-older slabs we may
// have missed.

final ByteBuffer oldestSlab = _oldestSlab.getIfPresent(channel);
RangeBuilder range = new RangeBuilder().setLimit(50);
if (oldestSlab != null) {
range.setStart(oldestSlab);
}
boolean firstSlab = true;

Iterable<Column<ByteBuffer>> manifestColumns = executePaginated(
_keyspace.prepareQuery(ColumnFamilies.MANIFEST, ConsistencyLevel.CL_LOCAL_ONE)
.getKey(channel)
.withColumnRange(new RangeBuilder().setLimit(50).build())
.withColumnRange(range.build())
.autoPaginate(true));

for (Column<ByteBuffer> manifestColumn : manifestColumns) {
ByteBuffer slabId = manifestColumn.getName();
boolean open = manifestColumn.getBooleanValue();

if (firstSlab) {
if (oldestSlab == null) {
cacheOldestSlabForChannel(channel, TimeUUIDSerializer.get().fromByteBuffer(slabId));
}
firstSlab = false;
}

ChannelSlab channelSlab = new ChannelSlab(channel, slabId);
SlabCursor cursor = (open ? _openSlabCursors : _closedSlabCursors).getUnchecked(channelSlab);

Expand All @@ -313,6 +342,25 @@ public void readNewer(String channel, EventSink sink) {
}
}
}

if (firstSlab && oldestSlab == null) {
// Channel was completely empty. Cache a TimeUUID for the current time. This will cause future calls
// to read at most 1 minute of tombstones until the cache expires 10 seconds later.
cacheOldestSlabForChannel(channel, TimeUUIDs.newUUID());
}
}

private void cacheOldestSlabForChannel(String channel, UUID slabId) {
// Functionally the same as ConcurrentMap.computeIfAbsent(...)
try {
// Subtract 1 minute from the slab ID to allow for a reasonable window of out-of-order writes while
// constraining the number of tombstones read to 1 minute's worth of rows.
_oldestSlab.get(channel, () ->
TimeUUIDSerializer.get().toByteBuffer(
TimeUUIDs.uuidForTimeMillis(TimeUUIDs.getTimeMillis(slabId) - TimeUnit.MINUTES.toMillis(1))));
} catch (ExecutionException e) {
// Won't happen, the "execution" just returns a constant.
}
}

/** Returns true to keep searching for more events, false to stop searching for events. */
Expand Down

0 comments on commit 98d6195

Please sign in to comment.