Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Don't delete completed tasks from RocksDbTaskQueue #1099

Merged
merged 4 commits into from
Mar 14, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,6 @@ private static CachingTaskCollection<NodeDataRequest> createWorldStateDownloader
"Pending request cache size for fast sync world state download",
taskCollection::cacheSize);

// We're using the CachingTaskCollection which isn't designed to reliably persist all
// added tasks. We therefore can't resume from previously added tasks.
// So for now, clear tasks when we start up.
taskCollection.clear();

return taskCollection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.nio.file.Path;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -38,7 +37,6 @@ public class RocksDbTaskQueue<T> implements TaskCollection<T> {

private long lastEnqueuedKey = 0;
private long lastDequeuedKey = 0;
private long oldestKey = 0;
private RocksIterator dequeueIterator;
private long lastValidKeyFromIterator;
private final Set<RocksDbTask<T>> outstandingTasks = new HashSet<>();
Expand All @@ -60,7 +58,9 @@ private RocksDbTaskQueue(
this.deserializer = deserializer;
try {
RocksDbUtil.loadNativeLibrary();
options = new Options().setCreateIfMissing(true);
// We don't support reloading data so ensure we're starting from a clean slate.
RocksDB.destroyDB(storageDirectory.toString(), new Options());
options = new Options().setCreateIfMissing(true).setErrorIfExists(true);
db = RocksDB.open(options, storageDirectory.toString());

enqueueLatency =
Expand All @@ -74,29 +74,11 @@ private RocksDbTaskQueue(
"dequeue_latency_seconds",
"Latency for dequeuing an item.");

// Initialize queue from existing db
initializeQueue();
} catch (final RocksDBException e) {
throw new StorageException(e);
}
}

private void initializeQueue() {
RocksIterator iter = db.newIterator();
iter.seekToFirst();
if (!iter.isValid()) {
// There is no data yet, nothing to do
return;
}
long firstKey = Longs.fromByteArray(iter.key());
iter.seekToLast();
long lastKey = Longs.fromByteArray(iter.key());

lastDequeuedKey = firstKey - 1;
oldestKey = firstKey;
lastEnqueuedKey = lastKey;
}

public static <T> RocksDbTaskQueue<T> create(
final Path storageDirectory,
final Function<T, BytesValue> serializer,
Expand Down Expand Up @@ -167,7 +149,7 @@ public synchronized boolean isEmpty() {
public synchronized void clear() {
assertNotClosed();
outstandingTasks.clear();
final byte[] from = Longs.toByteArray(oldestKey);
final byte[] from = Longs.toByteArray(0);
final byte[] to = Longs.toByteArray(lastEnqueuedKey + 1);
try {
db.deleteRange(from, to);
Expand All @@ -177,7 +159,6 @@ public synchronized void clear() {
}
lastDequeuedKey = 0;
lastEnqueuedKey = 0;
oldestKey = 0;
} catch (final RocksDBException e) {
throw new StorageException(e);
}
Expand All @@ -188,26 +169,6 @@ public synchronized boolean allTasksCompleted() {
return isEmpty() && outstandingTasks.isEmpty();
}

private synchronized void deleteCompletedTasks() {
final long oldestOutstandingKey =
outstandingTasks.stream()
.min(Comparator.comparingLong(RocksDbTask::getKey))
.map(RocksDbTask::getKey)
.orElse(lastDequeuedKey + 1);

if (oldestKey < oldestOutstandingKey) {
// Delete all contiguous completed tasks
final byte[] fromKey = Longs.toByteArray(oldestKey);
final byte[] toKey = Longs.toByteArray(oldestOutstandingKey);
try {
db.deleteRange(fromKey, toKey);
oldestKey = oldestOutstandingKey;
} catch (final RocksDBException e) {
throw new StorageException(e);
}
}
}

@Override
public synchronized void close() {
if (closed) {
Expand All @@ -228,11 +189,7 @@ private void assertNotClosed() {
}

private synchronized boolean markTaskCompleted(final RocksDbTask<T> task) {
if (outstandingTasks.remove(task)) {
deleteCompletedTasks();
return true;
}
return false;
return outstandingTasks.remove(task);
}

private synchronized void handleFailedTask(final RocksDbTask<T> task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
*/
package tech.pegasys.pantheon.services.tasks;

import static org.assertj.core.api.Assertions.assertThat;

import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.util.bytes.BytesValue;

Expand All @@ -22,7 +20,6 @@
import java.util.function.Function;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class RocksDbTaskQueueTest extends AbstractTaskQueueTest<RocksDbTaskQueue<BytesValue>> {
Expand All @@ -39,43 +36,4 @@ private RocksDbTaskQueue<BytesValue> createQueue(final Path dataDir) {
return RocksDbTaskQueue.create(
dataDir, Function.identity(), Function.identity(), new NoOpMetricsSystem());
}

@Test
public void shouldResumeFromExistingQueue() throws Exception {
testResumeFromExistingQueue(10);
}

@Test
public void shouldResumeFromExistingQueueWithOneElement() throws Exception {
testResumeFromExistingQueue(1);
}

@Test
public void shouldResumeFromExistingQueueWithNoElements() throws Exception {
testResumeFromExistingQueue(0);
}

private void testResumeFromExistingQueue(final int elementCount) throws Exception {
final Path dataDir = folder.newFolder().toPath();
try (final RocksDbTaskQueue<BytesValue> queue = createQueue(dataDir)) {
for (int i = 0; i < elementCount; i++) {
queue.add(BytesValue.of(i));
}
}

try (final RocksDbTaskQueue<BytesValue> resumedQueue = createQueue(dataDir)) {
assertThat(resumedQueue.size()).isEqualTo(elementCount);
// Queue an additional element
resumedQueue.add(BytesValue.of(99));
assertThat(resumedQueue.size()).isEqualTo(elementCount + 1);

// Check that everything dequeues in order as expected
for (int i = 0; i < elementCount; i++) {
assertThat(resumedQueue.remove().getData()).isEqualTo(BytesValue.of(i));
}
assertThat(resumedQueue.remove().getData()).isEqualTo(BytesValue.of(99));

assertThat(resumedQueue.size()).isEqualTo(0);
}
}
}