Skip to content

Commit

Permalink
#20 keys deletion issue
Browse files Browse the repository at this point in the history
  • Loading branch information
Akshay committed Sep 30, 2020
1 parent 53edb09 commit 816d86c
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/main/java/com/clevertap/stormdb/CompactionState.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class CompactionState {

File nextWalFile;
File nextDataFile;
File nextDeletedKeysFile;

boolean runningForTooLong() {
return System.currentTimeMillis() - start > 30 * 60 * 1000;
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/clevertap/stormdb/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class Config {
static final int MIN_OPEN_FD_COUNT = 1;
static final int MAX_OPEN_FD_COUNT = 100;

static final int MAX_KEYS_IN_SET_FOR_DELETION = 20; // every 20 keys write to deletion files

// Must have parameters
boolean autoCompact = true;
int valueSize;
Expand Down
128 changes: 127 additions & 1 deletion src/main/java/com/clevertap/stormdb/StormDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import com.clevertap.stormdb.internal.RandomAccessFilePool;
import com.clevertap.stormdb.utils.ByteUtil;
import com.clevertap.stormdb.utils.RecordUtil;
import gnu.trove.procedure.TIntProcedure;
import gnu.trove.set.hash.TIntHashSet;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
Expand Down Expand Up @@ -50,6 +52,7 @@ public class StormDB {
private static final String FILE_NAME_DATA = "data";
private static final String FILE_NAME_WAL = "wal";
private static final String FILE_TYPE_NEXT = ".next";
private static final String FILE_NAME_DELETED_KEYS = "keysDeleted";

/**
* Key: The actual key within this KV store.
Expand All @@ -58,6 +61,11 @@ public class StormDB {
*/
private final IndexMap index;

/**
* Set to hold keys that have been deleted so far
*/
private final TIntHashSet deletedKeysSet = new TIntHashSet();

private BitSet dataInWalFile = new BitSet();

private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
Expand All @@ -76,8 +84,10 @@ public class StormDB {

private File dataFile;
private File walFile;
private File deletedKeysFile;

private DataOutputStream walOut;
private DataOutputStream deleteKeysOut;

private Thread tWorker;
private final Object compactionSync;
Expand Down Expand Up @@ -116,6 +126,7 @@ public class StormDB {

dataFile = new File(dbDirFile.getAbsolutePath() + File.separator + FILE_NAME_DATA);
walFile = new File(dbDirFile.getAbsolutePath() + File.separator + FILE_NAME_WAL);
deletedKeysFile = new File(dbDirFile.getAbsoluteFile() + File.separator + FILE_NAME_DELETED_KEYS);

// Open DB.
final File metaFile = new File(conf.getDbDir() + "/meta");
Expand All @@ -138,6 +149,7 @@ public class StormDB {
}

initWalOut();
initDeletedKeyOut();

recover();
buildIndex();
Expand Down Expand Up @@ -231,6 +243,10 @@ private void initWalOut() throws FileNotFoundException {
bytesInWalFile = walFile.length();
}

private void initDeletedKeyOut() throws FileNotFoundException {
deleteKeysOut = new DataOutputStream(new FileOutputStream(deletedKeysFile, true));
}

private boolean shouldFlushBuffer() {
return System.currentTimeMillis() - lastBufferFlushTimeMs > conf
.getBufferFlushTimeoutMs();
Expand Down Expand Up @@ -387,6 +403,7 @@ public void compact() throws IOException {
// This is because we will be resetting bytesInWalFile below and we need to get all
// buffer to file so that their offsets are honoured.
flush();
flushDeletedKeys();

LOG.info("Beginning compaction with bytesInWalFile={}", bytesInWalFile);
// Check whether there was any data coming in. If not simply bail out.
Expand All @@ -399,8 +416,11 @@ public void compact() throws IOException {
compactionState.nextWalFile = new File(dbDirFile.getAbsolutePath()
+ File.separator + FILE_NAME_WAL + FILE_TYPE_NEXT);

compactionState.nextDeletedKeysFile = new File(dbDirFile.getAbsoluteFile() + File.separator + FILE_NAME_DELETED_KEYS + FILE_TYPE_NEXT);

// Create new walOut File
walOut = new DataOutputStream(new FileOutputStream(compactionState.nextWalFile));
deleteKeysOut = new DataOutputStream(new FileOutputStream(compactionState.nextDeletedKeysFile));
bytesInWalFile = 0;
compactionState.nextFileRecordIndex = 0;

Expand All @@ -413,6 +433,8 @@ public void compact() throws IOException {
compactionState.nextDataFile = new File(dbDirFile.getAbsolutePath() + File.separator +
FILE_NAME_DATA + FILE_TYPE_NEXT);

// 1. read delete file and create a set for deleted keys
// 2. move the new file to previous delete file
try (final BufferedOutputStream out =
new BufferedOutputStream(new FileOutputStream(compactionState.nextDataFile),
buffer.getWriteBufferSize())) {
Expand All @@ -437,6 +459,7 @@ public void compact() throws IOException {
// First rename prevWalFile and prevDataFile so that .next can be renamed
walFile = move(compactionState.nextWalFile, walFile).toFile();
dataFile = move(compactionState.nextDataFile, dataFile).toFile();
deletedKeysFile = move(compactionState.nextDeletedKeysFile, deletedKeysFile).toFile();

// Now make bitsets point right.
dataInWalFile = compactionState.dataInNextWalFile;
Expand Down Expand Up @@ -571,6 +594,63 @@ public void flush() throws IOException {
}
}

private void readFromDeletedKeysFile(RandomAccessFile file, Consumer<Integer> entryConsumer) throws IOException{
ByteBuffer bufferDeletedKeys = ByteBuffer.allocate(4); // will hold 10 keys
while(true) {
bufferDeletedKeys.clear();
final int bytesRead = file.read(bufferDeletedKeys.array());
if (bytesRead == -1) {
break;
}
entryConsumer.accept(bufferDeletedKeys.getInt());
}
}

private TIntHashSet getKeysToBeDeletedFromFile(final boolean useLatestWalFile, final boolean readInMemoryBuffer) throws IOException{
TIntHashSet keysToBeDeleted = new TIntHashSet();

Consumer<Integer> entryConsumer = keyToBeDeleted -> {
int address = index.get(keyToBeDeleted);
if (address == RESERVED_KEY_MARKER) {
// delete only when key is not set in index
// because a set value will itself overwrite previous value so no need to delete
keysToBeDeleted.add(keyToBeDeleted);
}
};

final ArrayList<RandomAccessFile> filesToRead = new ArrayList<>(2);

if (isCompactionInProgress() && useLatestWalFile && compactionState.nextDeletedKeysFile.exists()) {
final RandomAccessFileWrapper reader = filePool.borrowObject(compactionState.nextDeletedKeysFile);
reader.seek(0);
filesToRead.add(reader);
}

if (deletedKeysFile.exists()) {
final RandomAccessFileWrapper reader = filePool.borrowObject(deletedKeysFile);
reader.seek(0);
filesToRead.add(reader);
}

if (readInMemoryBuffer) {
deletedKeysSet.forEach(new TIntProcedure() {
@Override
public boolean execute(int i) {
entryConsumer.accept(i);
return true;
}
});
}
// read from the deletedKeysFile
for (RandomAccessFile file: filesToRead) {
readFromDeletedKeysFile(file, entryConsumer);
filePool.returnObject(((RandomAccessFileWrapper) file).getFile(),
(RandomAccessFileWrapper) file);
}

return keysToBeDeleted;
}

public void iterate(final EntryConsumer consumer) throws IOException {
iterate(true, true, consumer);
}
Expand Down Expand Up @@ -609,11 +689,14 @@ private void iterate(final boolean useLatestWalFile, final boolean readInMemoryB
rwLock.readLock().unlock();
}

// first iterate over the deleteKeys file and create a set out of that and then use that set with index to check if a key should be deleted or not
TIntHashSet keysToBeDeleted = getKeysToBeDeletedFromFile(useLatestWalFile, readInMemoryBuffer);

final BitSet keysRead = new BitSet(index.size());

final Consumer<ByteBuffer> entryConsumer = entry -> {
final int key = entry.getInt();
final boolean b = keysRead.get(key);
final boolean b = keysRead.get(key) || keysToBeDeleted.contains(key);
if (!b) {
try {
consumer.accept(key, entry.array(), entry.position());
Expand Down Expand Up @@ -718,6 +801,49 @@ public byte[] randomGet(final int key) throws IOException, StormDBException {
}
}

private void flushDeletedKeys() throws IOException {
// flush the deleteSet keys to file
rwLock.writeLock().lock();
try {
if (deleteKeysOut == null || deletedKeysSet.isEmpty()) {
return;
}

ByteBuffer deletedKeysBuffer = ByteBuffer.allocate(4 * deletedKeysSet.size());

deletedKeysSet.forEach(new TIntProcedure() {
@Override
public boolean execute(int key) {
deletedKeysBuffer.putInt(key);
return true;
}
});

deleteKeysOut.write(deletedKeysBuffer.array(), 0, deletedKeysBuffer.position());
deleteKeysOut.flush();
} finally {
rwLock.writeLock().unlock();
}
deletedKeysSet.clear();
}


public void remove(int key) throws IOException, StormDBException {

final int recordIndexForKey = index.get(key);
if (recordIndexForKey == RESERVED_KEY_MARKER) {
return; // no deletion
}

deletedKeysSet.add(key);
index.remove(key);

if (deletedKeysSet.size() >= Config.MAX_KEYS_IN_SET_FOR_DELETION) {
// write to file
flushDeletedKeys();
}
}

public void close() throws IOException, InterruptedException {
flush();
shutDown = true;
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/clevertap/stormdb/maps/DefaultIndexMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,7 @@ public int get(int key) {
public int size() {
return indexMap.size();
}

@Override
public int remove(int key) { return indexMap.remove(key); }
}
7 changes: 7 additions & 0 deletions src/main/java/com/clevertap/stormdb/maps/IndexMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,11 @@ public interface IndexMap {
* @return Size of the index.
*/
int size();

/**
* API to support key deletion from the map
* @param key The key to be deleted
* @return
*/
int remove(int key);
}
5 changes: 5 additions & 0 deletions src/test/java/com/clevertap/stormdb/CustomIndexMapTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public int get(int key) {
public int size() {
return kvCache.size();
}

@Override
public int remove(int key) {
return kvCache.remove(key);
}
})
.build();

Expand Down
67 changes: 64 additions & 3 deletions src/test/java/com/clevertap/stormdb/StormDBTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -716,17 +716,24 @@ void testMidWayFileDelete() throws IOException {
}
}

@Test
void testInMemoryUpdate() throws IOException, StormDBException {
StormDB getStormDBInstance(final int valueSize) throws IOException{
final Path path = Files.createTempDirectory("stormdb");

final int valueSize = 28;
final StormDB db = new StormDBBuilder()
.withDbDir(path.toString())
.withValueSize(valueSize)
.withAutoCompactDisabled()
.build();

return db;
}

@Test
void testInMemoryUpdate() throws IOException, StormDBException {
final int valueSize = 28;

StormDB db = getStormDBInstance(valueSize);

assertEquals(0, db.size());

int[] keysToInsert = {1, 2, 3, 1, 2};
Expand All @@ -753,4 +760,58 @@ void testInMemoryUpdate() throws IOException, StormDBException {
});
assertEquals(3, db.size());
}

@Test
public void testKeysDeletion() throws IOException, StormDBException{
final int valueSize = 28;
StormDB db = getStormDBInstance(valueSize);
// insert some keys
int totalEntries = 115;
ByteBuffer value = ByteBuffer.allocate(valueSize);
for (int i = 1; i <= totalEntries; i++) {
value.clear();
value.putInt(i);
db.put(i, value.array());
}

int[] total = {0};

db.iterate((key, data, offset) -> {
ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize);
assertEquals(key, dataValue.getInt());
total[0]++;
});

assertEquals(totalEntries, total[0]);
// now delete every even number
total[0] = 0;

// simple deletion
for (int i = 1;i <= totalEntries; i++) {
if (i%2 == 0) {
db.remove(i);
}
if (i % 3 == 0) {
value.clear();
value.putInt(i);
db.put(i, value.array());
}
}
db.iterate((key, data, offset) -> {
ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize);
assertEquals(key, dataValue.getInt());
total[0]++;
});
assertEquals(77, total[0]);

// compaction based deletion
db.compact();
total[0] = 0;
db.iterate((key, data, offset) -> {
ByteBuffer dataValue = ByteBuffer.wrap(data, offset, valueSize);
assertEquals(key, dataValue.getInt());
total[0]++;
});
assertEquals(77, total[0]);
}
}

0 comments on commit 816d86c

Please sign in to comment.