Skip to content

Commit

Permalink
[#2888] fix(core): Fix the possible concurrency for `KvGarbageCollect…
Browse files Browse the repository at this point in the history
…or` (#2890)

### What changes were proposed in this pull request?

Add a check mechanism in `KvGarbageCollector` to avoid removing
uncommitted data wrote into kv storage a few seconds or minutes ago.

### Why are the changes needed?

There is a potential that `KvGarbageCollector` will take normal data as
uncommitted data and remove it. For more please see #2888

Fix: #2888 

### Does this PR introduce _any_ user-facing change?

N/A.

### How was this patch tested?

N/A.
  • Loading branch information
yuqi1129 authored Apr 12, 2024
1 parent 1e7759c commit 67f9eca
Showing 1 changed file with 17 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import static com.datastrato.gravitino.storage.kv.TransactionalKvBackendImpl.generateCommitKey;
import static com.datastrato.gravitino.storage.kv.TransactionalKvBackendImpl.generateKey;
import static com.datastrato.gravitino.storage.kv.TransactionalKvBackendImpl.getBinaryTransactionId;
import static com.datastrato.gravitino.storage.kv.TransactionalKvBackendImpl.getTransactionId;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.Entity.EntityType;
Expand Down Expand Up @@ -43,6 +44,7 @@ public final class KvGarbageCollector implements Closeable {
private final KvBackend kvBackend;
private final Config config;
private final EntityKeyEncoder<byte[]> entityKeyEncoder;
private long frequencyInMinutes;

private static final String TIME_STAMP_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";

Expand All @@ -69,8 +71,9 @@ public void start() {

// We will collect garbage every 10 minutes at least. If the dateTimeLineMinute is larger than
// 100 minutes, we would collect garbage every dateTimeLineMinute/10 minutes.
long frequency = Math.max(dateTimeLineMinute / 10, 10);
garbageCollectorPool.scheduleAtFixedRate(this::collectAndClean, 5, frequency, TimeUnit.MINUTES);
this.frequencyInMinutes = Math.max(dateTimeLineMinute / 10, 10);
garbageCollectorPool.scheduleAtFixedRate(
this::collectAndClean, 5, frequencyInMinutes, TimeUnit.MINUTES);
}

@VisibleForTesting
Expand Down Expand Up @@ -98,6 +101,16 @@ private void collectAndRemoveUncommittedData() throws IOException {
.predicate(
(k, v) -> {
byte[] transactionId = getBinaryTransactionId(k);

// Only remove the uncommitted data that were written frequencyInMinutes
// minutes ago.
// It may have concurrency issues with TransactionalKvBackendImpl#commit.
long writeTime = getTransactionId(transactionId) >> 18;
if (writeTime
< (System.currentTimeMillis() - frequencyInMinutes * 60 * 1000 * 2)) {
return false;
}

return kvBackend.get(generateCommitKey(transactionId)) == null;
})
.limit(10000) /* Each time we only collect 10000 entities at most*/
Expand Down Expand Up @@ -209,7 +222,7 @@ private void collectAndRemoveOldVersionData() throws IOException {

// All keys in this transaction have been deleted, we can remove the commit mark.
if (keysDeletedCount == keysInTheTransaction.size()) {
long timestamp = TransactionalKvBackendImpl.getTransactionId(transactionId) >> 18;
long timestamp = getTransactionId(transactionId) >> 18;
LOG.info(
"Physically delete commit mark: {}, createTime: '{}({})', key: '{}'",
Bytes.wrap(kv.getKey()),
Expand Down Expand Up @@ -261,7 +274,7 @@ LogHelper decodeKey(byte[] key, byte[] timestampArray) {
LOG.warn("Unable to decode key: {}", Bytes.wrap(key), e);
return LogHelper.NONE;
}
long timestamp = TransactionalKvBackendImpl.getTransactionId(timestampArray) >> 18;
long timestamp = getTransactionId(timestampArray) >> 18;
String ts = DateFormatUtils.format(timestamp, TIME_STAMP_FORMAT);

return new LogHelper(entityTypePair.getKey(), entityTypePair.getValue(), timestamp, ts);
Expand Down

0 comments on commit 67f9eca

Please sign in to comment.