From 67f9eca7447268bce4dbd5a2946f804d875d44c7 Mon Sep 17 00:00:00 2001 From: Qi Yu Date: Fri, 12 Apr 2024 19:10:02 +0800 Subject: [PATCH] [#2888] fix(core): Fix the possible concurrency for `KvGarbageCollector` (#2890) ### What changes were proposed in this pull request? Add a check mechanism in `KvGarbageCollector` to avoid removing uncommitted data that was written to the kv storage only a few seconds or minutes ago. ### Why are the changes needed? There is a possibility that `KvGarbageCollector` will mistake normal data for uncommitted data and remove it. For more details, please see #2888. Fix: #2888 ### Does this PR introduce _any_ user-facing change? N/A. ### How was this patch tested? N/A. --- .../storage/kv/KvGarbageCollector.java | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/com/datastrato/gravitino/storage/kv/KvGarbageCollector.java b/core/src/main/java/com/datastrato/gravitino/storage/kv/KvGarbageCollector.java index a2f39c361d3..d505a541864 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/kv/KvGarbageCollector.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/kv/KvGarbageCollector.java @@ -11,6 +11,7 @@ import static com.datastrato.gravitino.storage.kv.TransactionalKvBackendImpl.generateCommitKey; import static com.datastrato.gravitino.storage.kv.TransactionalKvBackendImpl.generateKey; import static com.datastrato.gravitino.storage.kv.TransactionalKvBackendImpl.getBinaryTransactionId; +import static com.datastrato.gravitino.storage.kv.TransactionalKvBackendImpl.getTransactionId; import com.datastrato.gravitino.Config; import com.datastrato.gravitino.Entity.EntityType; @@ -43,6 +44,7 @@ public final class KvGarbageCollector implements Closeable { private final KvBackend kvBackend; private final Config config; private final EntityKeyEncoder entityKeyEncoder; + private long frequencyInMinutes; private static final String TIME_STAMP_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS"; @@ -69,8 +71,9 @@ public void 
start() { // We will collect garbage every 10 minutes at least. If the dateTimeLineMinute is larger than // 100 minutes, we would collect garbage every dateTimeLineMinute/10 minutes. - long frequency = Math.max(dateTimeLineMinute / 10, 10); - garbageCollectorPool.scheduleAtFixedRate(this::collectAndClean, 5, frequency, TimeUnit.MINUTES); + this.frequencyInMinutes = Math.max(dateTimeLineMinute / 10, 10); + garbageCollectorPool.scheduleAtFixedRate( + this::collectAndClean, 5, frequencyInMinutes, TimeUnit.MINUTES); } @VisibleForTesting @@ -98,6 +101,16 @@ private void collectAndRemoveUncommittedData() throws IOException { .predicate( (k, v) -> { byte[] transactionId = getBinaryTransactionId(k); + + // Only remove uncommitted data written more than 2 * frequencyInMinutes + // minutes ago. + // Newer data may still be mid-commit in TransactionalKvBackendImpl#commit. + long writeTime = getTransactionId(transactionId) >> 18; + if (writeTime + > (System.currentTimeMillis() - frequencyInMinutes * 60 * 1000 * 2)) { + return false; + } + return kvBackend.get(generateCommitKey(transactionId)) == null; }) .limit(10000) /* Each time we only collect 10000 entities at most*/ @@ -209,7 +222,7 @@ private void collectAndRemoveOldVersionData() throws IOException { // All keys in this transaction have been deleted, we can remove the commit mark. 
if (keysDeletedCount == keysInTheTransaction.size()) { - long timestamp = TransactionalKvBackendImpl.getTransactionId(transactionId) >> 18; + long timestamp = getTransactionId(transactionId) >> 18; LOG.info( "Physically delete commit mark: {}, createTime: '{}({})', key: '{}'", Bytes.wrap(kv.getKey()), @@ -261,7 +274,7 @@ LogHelper decodeKey(byte[] key, byte[] timestampArray) { LOG.warn("Unable to decode key: {}", Bytes.wrap(key), e); return LogHelper.NONE; } - long timestamp = TransactionalKvBackendImpl.getTransactionId(timestampArray) >> 18; + long timestamp = getTransactionId(timestampArray) >> 18; String ts = DateFormatUtils.format(timestamp, TIME_STAMP_FORMAT); return new LogHelper(entityTypePair.getKey(), entityTypePair.getValue(), timestamp, ts);