From 42e56565a0e87195cc5581a8729479a0c61b8ab4 Mon Sep 17 00:00:00 2001 From: billkalter Date: Wed, 20 Jun 2018 13:35:31 -0500 Subject: [PATCH 1/2] Added short-term caching of the first column in the channel manifest --- .../db/astyanax/AstyanaxEventReaderDAO.java | 47 ++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/event/src/main/java/com/bazaarvoice/emodb/event/db/astyanax/AstyanaxEventReaderDAO.java b/event/src/main/java/com/bazaarvoice/emodb/event/db/astyanax/AstyanaxEventReaderDAO.java index aa222eadd6..6b1d6c3642 100644 --- a/event/src/main/java/com/bazaarvoice/emodb/event/db/astyanax/AstyanaxEventReaderDAO.java +++ b/event/src/main/java/com/bazaarvoice/emodb/event/db/astyanax/AstyanaxEventReaderDAO.java @@ -14,6 +14,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; import com.google.common.base.Throwables; +import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -52,6 +53,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -72,6 +74,7 @@ public class AstyanaxEventReaderDAO implements EventReaderDAO { private final ExecutorService _cleanupExecutor; private final LoadingCache _openSlabCursors; private final LoadingCache _closedSlabCursors; + private final Cache _oldestSlab; private final Meter _staleSlabMeter; @Inject @@ -110,6 +113,10 @@ public SlabCursor load(ChannelSlab key) { maximumSize(100000). // 100k - bigger than the open slab cache since it's more important for performance. recordStats(). build(slabCursorFactory); + _oldestSlab = CacheBuilder.newBuilder(). + expireAfterWrite(10, TimeUnit.SECONDS). + maximumSize(10000). 
// 10k - as with open slabs, misses are inconvenient but tolerable + build(); InstrumentedCache.instrument(_openSlabCursors, metricRegistry, metricsGroup, "openSlabCursors", false); InstrumentedCache.instrument(_closedSlabCursors, metricRegistry, metricsGroup, "closedSlabCursors", false); @@ -283,16 +290,38 @@ public void readNewer(String channel, EventSink sink) { // benefit from using a cursor to start the read part-way through the slab. So the open slab cursor cache // has a very short TTL (250ms) to reduce memory requirements and minimize the latency between the time // data is written and first read. + // + // Additionally, a third cache is used to track the oldest known slab in the channel's manifest. If the events + // are written and acknowledged quickly and frequently then the head of the manifest row for the channel may + // accrue tombstones as older slabs are fully read and deleted. By caching the oldest slab in the manifest, + // reading the older tombstones can be minimized. Since slabs can be written out-of-order across the cluster + // we still occasionally (every 10 seconds) re-read all slabs to pick up any of these newer-older slabs we may + // have missed. 
+ + final ByteBuffer oldestSlab = _oldestSlab.getIfPresent(channel); + RangeBuilder range = new RangeBuilder().setLimit(50); + if (oldestSlab != null) { + range.setStart(oldestSlab); + } + boolean firstSlab = true; Iterable> manifestColumns = executePaginated( _keyspace.prepareQuery(ColumnFamilies.MANIFEST, ConsistencyLevel.CL_LOCAL_ONE) .getKey(channel) - .withColumnRange(new RangeBuilder().setLimit(50).build()) + .withColumnRange(range.build()) .autoPaginate(true)); for (Column manifestColumn : manifestColumns) { ByteBuffer slabId = manifestColumn.getName(); boolean open = manifestColumn.getBooleanValue(); + + if (firstSlab) { + if (oldestSlab == null) { + cacheOldestSlabForChannel(channel, slabId); + } + firstSlab = false; + } + ChannelSlab channelSlab = new ChannelSlab(channel, slabId); SlabCursor cursor = (open ? _openSlabCursors : _closedSlabCursors).getUnchecked(channelSlab); @@ -313,6 +342,22 @@ public void readNewer(String channel, EventSink sink) { } } } + + if (firstSlab && oldestSlab == null) { + // Channel was completely empty. Cache a TimeUUID which will only read at most 1 minute of tombstones + // until the cache expires 10 seconds later. + cacheOldestSlabForChannel(channel, TimeUUIDSerializer.get().toByteBuffer( + TimeUUIDs.uuidForTimeMillis(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(1)))); + } + } + + private void cacheOldestSlabForChannel(String channel, ByteBuffer slabId) { + // Functionally the same as ConcurrentMap.computeIfAbsent(...) + try { + _oldestSlab.get(channel, () -> slabId); + } catch (ExecutionException e) { + // Won't happen, the "execution" just returns a constant. + } } /** Returns true to keep searching for more events, false to stop searching for events. 
*/ From a9b8eccc5b2fba2489fc1ee33a09f758bf1b807d Mon Sep 17 00:00:00 2001 From: billkalter Date: Wed, 20 Jun 2018 14:44:25 -0500 Subject: [PATCH 2/2] Added 1 minute buffer to reduce impact of out-of-order writes --- .../db/astyanax/AstyanaxEventReaderDAO.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/event/src/main/java/com/bazaarvoice/emodb/event/db/astyanax/AstyanaxEventReaderDAO.java b/event/src/main/java/com/bazaarvoice/emodb/event/db/astyanax/AstyanaxEventReaderDAO.java index 6b1d6c3642..a119188539 100644 --- a/event/src/main/java/com/bazaarvoice/emodb/event/db/astyanax/AstyanaxEventReaderDAO.java +++ b/event/src/main/java/com/bazaarvoice/emodb/event/db/astyanax/AstyanaxEventReaderDAO.java @@ -317,7 +317,7 @@ public void readNewer(String channel, EventSink sink) { if (firstSlab) { if (oldestSlab == null) { - cacheOldestSlabForChannel(channel, slabId); + cacheOldestSlabForChannel(channel, TimeUUIDSerializer.get().fromByteBuffer(slabId)); } firstSlab = false; } @@ -344,17 +344,20 @@ public void readNewer(String channel, EventSink sink) { } if (firstSlab && oldestSlab == null) { - // Channel was completely empty. Cache a TimeUUID which will only read at most 1 minute of tombstones - // until the cache expires 10 seconds later. - cacheOldestSlabForChannel(channel, TimeUUIDSerializer.get().toByteBuffer( - TimeUUIDs.uuidForTimeMillis(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(1)))); + // Channel was completely empty. Cache a TimeUUID for the current time. This will cause future calls + // to read at most 1 minute of tombstones until the cache expires 10 seconds later. + cacheOldestSlabForChannel(channel, TimeUUIDs.newUUID()); } } - private void cacheOldestSlabForChannel(String channel, ByteBuffer slabId) { + private void cacheOldestSlabForChannel(String channel, UUID slabId) { // Functionally the same as ConcurrentMap.computeIfAbsent(...) 
try { - _oldestSlab.get(channel, () -> slabId); + // Subtract 1 minute from the slab ID to allow for a reasonable window of out-of-order writes while + // constraining the number of tombstones read to 1 minute's worth of rows. + _oldestSlab.get(channel, () -> + TimeUUIDSerializer.get().toByteBuffer( + TimeUUIDs.uuidForTimeMillis(TimeUUIDs.getTimeMillis(slabId) - TimeUnit.MINUTES.toMillis(1)))); } catch (ExecutionException e) { // Won't happen, the "execution" just returns a constant. }