diff --git a/src/main/java/com/clevertap/stormdb/CompactionState.java b/src/main/java/com/clevertap/stormdb/CompactionState.java index 74f873b..ef4284e 100644 --- a/src/main/java/com/clevertap/stormdb/CompactionState.java +++ b/src/main/java/com/clevertap/stormdb/CompactionState.java @@ -14,6 +14,7 @@ class CompactionState { File nextWalFile; File nextDataFile; + File nextDeletedKeysFile; boolean runningForTooLong() { return System.currentTimeMillis() - start > 30 * 60 * 1000; diff --git a/src/main/java/com/clevertap/stormdb/Config.java b/src/main/java/com/clevertap/stormdb/Config.java index 79a9316..ba69e92 100644 --- a/src/main/java/com/clevertap/stormdb/Config.java +++ b/src/main/java/com/clevertap/stormdb/Config.java @@ -39,6 +39,8 @@ public class Config { static final int MIN_OPEN_FD_COUNT = 1; static final int MAX_OPEN_FD_COUNT = 100; + static final int MAX_KEYS_IN_SET_FOR_DELETION = 20; // every 20 keys write to deletion files + // Must have parameters boolean autoCompact = true; int valueSize; diff --git a/src/main/java/com/clevertap/stormdb/StormDB.java b/src/main/java/com/clevertap/stormdb/StormDB.java index 483e781..ba39493 100644 --- a/src/main/java/com/clevertap/stormdb/StormDB.java +++ b/src/main/java/com/clevertap/stormdb/StormDB.java @@ -10,6 +10,8 @@ import com.clevertap.stormdb.internal.RandomAccessFilePool; import com.clevertap.stormdb.utils.ByteUtil; import com.clevertap.stormdb.utils.RecordUtil; +import gnu.trove.procedure.TIntProcedure; +import gnu.trove.set.hash.TIntHashSet; import java.io.BufferedOutputStream; import java.io.DataOutputStream; import java.io.File; @@ -50,6 +52,7 @@ public class StormDB { private static final String FILE_NAME_DATA = "data"; private static final String FILE_NAME_WAL = "wal"; private static final String FILE_TYPE_NEXT = ".next"; + private static final String FILE_NAME_DELETED_KEYS = "keysDeleted"; /** * Key: The actual key within this KV store. @@ -58,6 +61,11 @@ public class StormDB { */ private final IndexMap index; + /** + * Set to hold keys that have been deleted so far + */ + private final TIntHashSet deletedKeysSet = new TIntHashSet(); + private BitSet dataInWalFile = new BitSet(); private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); @@ -76,8 +84,10 @@ public class StormDB { private File dataFile; private File walFile; + private File deletedKeysFile; private DataOutputStream walOut; + private DataOutputStream deleteKeysOut; private Thread tWorker; private final Object compactionSync; @@ -116,6 +126,7 @@ public class StormDB { dataFile = new File(dbDirFile.getAbsolutePath() + File.separator + FILE_NAME_DATA); walFile = new File(dbDirFile.getAbsolutePath() + File.separator + FILE_NAME_WAL); + deletedKeysFile = new File(dbDirFile.getAbsoluteFile() + File.separator + FILE_NAME_DELETED_KEYS); // Open DB. final File metaFile = new File(conf.getDbDir() + "/meta"); @@ -138,6 +149,7 @@ public class StormDB { } initWalOut(); + initDeletedKeyOut(); recover(); buildIndex(); @@ -231,6 +243,10 @@ private void initWalOut() throws FileNotFoundException { bytesInWalFile = walFile.length(); } + private void initDeletedKeyOut() throws FileNotFoundException { + deleteKeysOut = new DataOutputStream(new FileOutputStream(deletedKeysFile, true)); + } + private boolean shouldFlushBuffer() { return System.currentTimeMillis() - lastBufferFlushTimeMs > conf .getBufferFlushTimeoutMs(); @@ -387,6 +403,7 @@ public void compact() throws IOException { // This is because we will be resetting bytesInWalFile below and we need to get all // buffer to file so that their offsets are honoured. flush(); + flushDeletedKeys(); LOG.info("Beginning compaction with bytesInWalFile={}", bytesInWalFile); // Check whether there was any data coming in. If not simply bail out. @@ -399,8 +416,11 @@ public void compact() throws IOException { compactionState.nextWalFile = new File(dbDirFile.getAbsolutePath() + File.separator + FILE_NAME_WAL + FILE_TYPE_NEXT); + compactionState.nextDeletedKeysFile = new File(dbDirFile.getAbsoluteFile() + File.separator + FILE_NAME_DELETED_KEYS + FILE_TYPE_NEXT); + // Create new walOut File walOut = new DataOutputStream(new FileOutputStream(compactionState.nextWalFile)); + deleteKeysOut = new DataOutputStream(new FileOutputStream(compactionState.nextDeletedKeysFile)); bytesInWalFile = 0; compactionState.nextFileRecordIndex = 0; @@ -413,6 +433,8 @@ public void compact() throws IOException { compactionState.nextDataFile = new File(dbDirFile.getAbsolutePath() + File.separator + FILE_NAME_DATA + FILE_TYPE_NEXT); + // 1. read delete file and create a set for deleted keys + // 2. move the new file to previous delete file try (final BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(compactionState.nextDataFile), buffer.getWriteBufferSize())) { @@ -437,6 +459,7 @@ public void compact() throws IOException { // First rename prevWalFile and prevDataFile so that .next can be renamed walFile = move(compactionState.nextWalFile, walFile).toFile(); dataFile = move(compactionState.nextDataFile, dataFile).toFile(); + deletedKeysFile = move(compactionState.nextDeletedKeysFile, deletedKeysFile).toFile(); // Now make bitsets point right. dataInWalFile = compactionState.dataInNextWalFile; @@ -571,6 +594,63 @@ public void flush() throws IOException { } } + private void readFromDeletedKeysFile(RandomAccessFile file, Consumer entryConsumer) throws IOException{ + ByteBuffer bufferDeletedKeys = ByteBuffer.allocate(4); // will hold 10 keys + while(true) { + bufferDeletedKeys.clear(); + final int bytesRead = file.read(bufferDeletedKeys.array()); + if (bytesRead == -1) { + break; + } + entryConsumer.accept(bufferDeletedKeys.getInt()); + } + } + + private TIntHashSet getKeysToBeDeletedFromFile(final boolean useLatestWalFile, final boolean readInMemoryBuffer) throws IOException{ + TIntHashSet keysToBeDeleted = new TIntHashSet(); + + Consumer entryConsumer = keyToBeDeleted -> { + int address = index.get(keyToBeDeleted); + if (address == RESERVED_KEY_MARKER) { + // delete only when key is not set in index + // because a set value will itself overwrite previous value so no need to delete + keysToBeDeleted.add(keyToBeDeleted); + } + }; + + final ArrayList filesToRead = new ArrayList<>(2); + + if (isCompactionInProgress() && useLatestWalFile && compactionState.nextDeletedKeysFile.exists()) { + final RandomAccessFileWrapper reader = filePool.borrowObject(compactionState.nextDeletedKeysFile); + reader.seek(0); + filesToRead.add(reader); + } + + if (deletedKeysFile.exists()) { + final RandomAccessFileWrapper reader = filePool.borrowObject(deletedKeysFile); + reader.seek(0); + filesToRead.add(reader); + } + + if (readInMemoryBuffer) { + deletedKeysSet.forEach(new TIntProcedure() { + @Override + public boolean execute(int i) { + entryConsumer.accept(i); + return true; + } + }); + } + // read from the deletedKeysFile + for (RandomAccessFile file: filesToRead) { + readFromDeletedKeysFile(file, entryConsumer); + filePool.returnObject(((RandomAccessFileWrapper) file).getFile(), + (RandomAccessFileWrapper) file); + } + + return keysToBeDeleted; + } + public void iterate(final EntryConsumer consumer) throws IOException { iterate(true, true, consumer); } @@ -609,11 +689,14 @@ private void iterate(final boolean useLatestWalFile, final boolean readInMemoryB rwLock.readLock().unlock(); } + // first iterate over the deleteKeys file and create a set out of that and then use that set with index to check if a key should be deleted or not + TIntHashSet keysToBeDeleted = getKeysToBeDeletedFromFile(useLatestWalFile, readInMemoryBuffer); + final BitSet keysRead = new BitSet(index.size()); final Consumer entryConsumer = entry -> { final int key = entry.getInt(); - final boolean b = keysRead.get(key); + final boolean b = keysRead.get(key) || keysToBeDeleted.contains(key); if (!b) { try { consumer.accept(key, entry.array(), entry.position()); @@ -718,6 +801,49 @@ public byte[] randomGet(final int key) throws IOException, StormDBException { } } + private void flushDeletedKeys() throws IOException { + // flush the deleteSet keys to file + rwLock.writeLock().lock(); + try { + if (deleteKeysOut == null || deletedKeysSet.isEmpty()) { + return; + } + + ByteBuffer deletedKeysBuffer = ByteBuffer.allocate(4 * deletedKeysSet.size()); + + deletedKeysSet.forEach(new TIntProcedure() { + @Override + public boolean execute(int key) { + deletedKeysBuffer.putInt(key); + return true; + } + }); + + deleteKeysOut.write(deletedKeysBuffer.array(), 0, deletedKeysBuffer.position()); + deleteKeysOut.flush(); + } finally { + rwLock.writeLock().unlock(); + } + deletedKeysSet.clear(); + } + + + public void remove(int key) throws IOException, StormDBException { + + final int recordIndexForKey = index.get(key); + if (recordIndexForKey == RESERVED_KEY_MARKER) { + return; // no deletion + } + + deletedKeysSet.add(key); + index.remove(key); + + if (deletedKeysSet.size() >= Config.MAX_KEYS_IN_SET_FOR_DELETION) { + // write to file + flushDeletedKeys(); + } + } + public void close() throws IOException, InterruptedException { flush(); shutDown = true; diff --git a/src/main/java/com/clevertap/stormdb/maps/DefaultIndexMap.java b/src/main/java/com/clevertap/stormdb/maps/DefaultIndexMap.java index d108def..0e5d7a3 100644 --- a/src/main/java/com/clevertap/stormdb/maps/DefaultIndexMap.java +++ b/src/main/java/com/clevertap/stormdb/maps/DefaultIndexMap.java @@ -31,4 +31,7 @@ public int get(int key) { public int size() { return indexMap.size(); } + + @Override + public int remove(int key) { return indexMap.remove(key); } } diff --git a/src/main/java/com/clevertap/stormdb/maps/IndexMap.java b/src/main/java/com/clevertap/stormdb/maps/IndexMap.java index 038e45f..02c1d05 100644 --- a/src/main/java/com/clevertap/stormdb/maps/IndexMap.java +++ b/src/main/java/com/clevertap/stormdb/maps/IndexMap.java @@ -26,4 +26,11 @@ public interface IndexMap { * @return Size of the index. */ int size(); + + /** + * API to support key deletion from the map + * @param key The key to be deleted + * @return + */ + int remove(int key); } diff --git a/src/test/java/com/clevertap/stormdb/CustomIndexMapTest.java b/src/test/java/com/clevertap/stormdb/CustomIndexMapTest.java index 93ce5ba..3836bcb 100644 --- a/src/test/java/com/clevertap/stormdb/CustomIndexMapTest.java +++ b/src/test/java/com/clevertap/stormdb/CustomIndexMapTest.java @@ -43,6 +43,11 @@ public int get(int key) { public int size() { return kvCache.size(); } + + @Override + public int remove(int key) { + return kvCache.remove(key); + } }) .build(); diff --git a/src/test/java/com/clevertap/stormdb/StormDBTest.java b/src/test/java/com/clevertap/stormdb/StormDBTest.java index babd241..988e6f9 100644 --- a/src/test/java/com/clevertap/stormdb/StormDBTest.java +++ b/src/test/java/com/clevertap/stormdb/StormDBTest.java @@ -716,17 +716,24 @@ void testMidWayFileDelete() throws IOException { } } - @Test - void testInMemoryUpdate() throws IOException, StormDBException { + StormDB getStormDBInstance(final int valueSize) throws IOException{ final Path path = Files.createTempDirectory("stormdb"); - final int valueSize = 28; final StormDB db = new StormDBBuilder() .withDbDir(path.toString()) .withValueSize(valueSize) .withAutoCompactDisabled() .build(); + return db; + } + + @Test + void testInMemoryUpdate() throws IOException, StormDBException { + final int valueSize = 28; + + StormDB db = getStormDBInstance(valueSize); + assertEquals(0, db.size()); int[] keysToInsert = {1, 2, 3, 1, 2}; @@ -753,4 +760,58 @@ void testInMemoryUpdate() throws IOException, StormDBException { }); assertEquals(3, db.size()); } + + @Test + public void testKeysDeletion() throws IOException, StormDBException{ + final int valueSize = 28; + StormDB db = getStormDBInstance(valueSize); + // insert some keys + int totalEntries = 115; + ByteBuffer value = ByteBuffer.allocate(valueSize); + for (int i = 1; i <= totalEntries; i++) { + value.clear(); + value.putInt(i); + db.put(i, value.array()); + } + + int[] total = {0}; + + db.iterate((key, data, offset) -> { + ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize); + assertEquals(key, dataValue.getInt()); + total[0]++; + }); + + assertEquals(totalEntries, total[0]); + // now delete every even number + total[0] = 0; + + // simple deletion + for (int i = 1;i <= totalEntries; i++) { + if (i%2 == 0) { + db.remove(i); + } + if (i % 3 == 0) { + value.clear(); + value.putInt(i); + db.put(i, value.array()); + } + } + db.iterate((key, data, offset) -> { + ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize); + assertEquals(key, dataValue.getInt()); + total[0]++; + }); + assertEquals(77, total[0]); + + // compaction based deletion + db.compact(); + total[0] = 0; + db.iterate((key, data, offset) -> { + ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize); + assertEquals(key, dataValue.getInt()); + total[0]++; + }); + assertEquals(77, total[0]); + } } \ No newline at end of file