Skip to content
This repository has been archived by the owner on Feb 10, 2025. It is now read-only.

Added short-term caching of the first column in the channel manifest #180

Merged
merged 2 commits into from
Jun 20, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
Expand Down Expand Up @@ -52,6 +53,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -72,6 +74,7 @@ public class AstyanaxEventReaderDAO implements EventReaderDAO {
private final ExecutorService _cleanupExecutor;
private final LoadingCache<ChannelSlab, SlabCursor> _openSlabCursors;
private final LoadingCache<ChannelSlab, SlabCursor> _closedSlabCursors;
private final Cache<String, ByteBuffer> _oldestSlab;
private final Meter _staleSlabMeter;

@Inject
Expand Down Expand Up @@ -110,6 +113,10 @@ public SlabCursor load(ChannelSlab key) {
maximumSize(100000). // 100k - bigger than the open slab cache since it's more important for performance.
recordStats().
build(slabCursorFactory);
_oldestSlab = CacheBuilder.newBuilder().
expireAfterWrite(10, TimeUnit.SECONDS).
maximumSize(10000). // 10k - as with open slabs, misses are inconvenient but tolerable
build();
InstrumentedCache.instrument(_openSlabCursors, metricRegistry, metricsGroup, "openSlabCursors", false);
InstrumentedCache.instrument(_closedSlabCursors, metricRegistry, metricsGroup, "closedSlabCursors", false);

Expand Down Expand Up @@ -283,16 +290,38 @@ public void readNewer(String channel, EventSink sink) {
// benefit from using a cursor to start the read part-way through the slab. So the open slab cursor cache
// has a very short TTL (250ms) to reduce memory requirements and minimize the latency between the time
// data is written and first read.
//
// Additionally, a third cache is used to track the oldest known slab in a channel's manifest. If events
// are written and acknowledged quickly and frequently then the head of the manifest row for the channel may
// accrue tombstones as older slabs are fully read and deleted. By caching the oldest slab in the manifest,
// reading of those older tombstones can be minimized. Since slabs can be written out-of-order across the
// cluster we still periodically (every 10 seconds) re-read all slabs to pick up any of these newer-but-older
// slabs we may have missed.

final ByteBuffer oldestSlab = _oldestSlab.getIfPresent(channel);
RangeBuilder range = new RangeBuilder().setLimit(50);
if (oldestSlab != null) {
range.setStart(oldestSlab);
}
boolean firstSlab = true;

Iterable<Column<ByteBuffer>> manifestColumns = executePaginated(
_keyspace.prepareQuery(ColumnFamilies.MANIFEST, ConsistencyLevel.CL_LOCAL_ONE)
.getKey(channel)
.withColumnRange(new RangeBuilder().setLimit(50).build())
.withColumnRange(range.build())
.autoPaginate(true));

for (Column<ByteBuffer> manifestColumn : manifestColumns) {
ByteBuffer slabId = manifestColumn.getName();
boolean open = manifestColumn.getBooleanValue();

if (firstSlab) {
if (oldestSlab == null) {
cacheOldestSlabForChannel(channel, TimeUUIDSerializer.get().fromByteBuffer(slabId));
}
firstSlab = false;
}

ChannelSlab channelSlab = new ChannelSlab(channel, slabId);
SlabCursor cursor = (open ? _openSlabCursors : _closedSlabCursors).getUnchecked(channelSlab);

Expand All @@ -313,6 +342,25 @@ public void readNewer(String channel, EventSink sink) {
}
}
}

if (firstSlab && oldestSlab == null) {
// Channel was completely empty. Cache a TimeUUID for the current time. This will cause future calls
// to read at most 1 minute of tombstones until the cache expires 10 seconds later.
cacheOldestSlabForChannel(channel, TimeUUIDs.newUUID());
}
}

private void cacheOldestSlabForChannel(String channel, UUID slabId) {
// Functionally the same as ConcurrentMap.computeIfAbsent(...)
try {
// Subtract 1 minute from the slab ID to allow for a reasonable window of out-of-order writes while
// constraining the number of tombstones read to 1 minute's worth of rows.
_oldestSlab.get(channel, () ->
Copy link
Contributor

@sujithvaddi sujithvaddi Jun 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@billkalter should it be put() here instead of get() :?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, this is very confusing and I tried to put a comment to clarify. With the newer Java interfaces like ConcurrentMap you can use computeIfAbsent() or putIfAbsent(). The Guava cache interface doesn't have a similar method, but if you do a get() this way it does the same thing: caches the new value only if there is no current un-expired version in the cache. From their docs:

This method provides a simple substitute for the conventional "if cached, return; otherwise create, cache and return" pattern.

TimeUUIDSerializer.get().toByteBuffer(
TimeUUIDs.uuidForTimeMillis(TimeUUIDs.getTimeMillis(slabId) - TimeUnit.MINUTES.toMillis(1))));
} catch (ExecutionException e) {
// Won't happen, the "execution" just returns a constant.
}
}

/** Returns true to keep searching for more events, false to stop searching for events. */
Expand Down