[BugFix] fix CachingFileIO returning a wrong length and support pin/unpin for disk cache #18892
Conversation
run starrocks_fe_unittest
fe/fe-core/src/main/java/com/starrocks/connector/iceberg/io/IcebergCachingFileIO.java
@@ -142,10 +147,12 @@ private CacheEntry(long length, List<ByteBuffer> buffers) {
    private static class DiskCacheEntry {
        private final long length;
        private final InputFile inputFile;
        private int useCount;
atomic int?
I think there is no need for that. We use cache.asMap().computeIfPresent(), which is thread-safe for the cache, so pin() and unpin() are both safe. As discussed in ben-manes/caffeine#513, compute() replaces the entry with the compute() result, and weights are measured and recorded when entries are inserted into or updated in the cache, and are thus effectively static during the lifetime of a cache entry. Since the weight is computed at insert time, using the entry in the weigher is also safe.
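For reference, a minimal sketch of that pin/unpin pattern; the class and field names here are illustrative, not the exact PR code:

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

class PinnableDiskCache {
    static final class DiskCacheEntry {
        final long length;
        int useCount; // plain int: mutated only inside computeIfPresent
        DiskCacheEntry(long length) { this.length = length; }
    }

    private final Cache<String, DiskCacheEntry> cache = Caffeine.newBuilder()
            .maximumWeight(1024L * 1024 * 1024) // assumed 1 GiB budget
            // The weigher runs when an entry is inserted or replaced, so it
            // reads a stable length and never observes a concurrent mutation.
            .weigher((String k, DiskCacheEntry e) ->
                    (int) Math.min(e.length, Integer.MAX_VALUE))
            .build();

    void pin(String key) {
        // computeIfPresent executes under the backing ConcurrentHashMap's
        // per-entry lock, so the unsynchronized increment is safe.
        cache.asMap().computeIfPresent(key, (k, e) -> {
            e.useCount++;
            return e;
        });
    }

    void unpin(String key) {
        cache.asMap().computeIfPresent(key, (k, e) -> {
            e.useCount--;
            return e;
        });
    }
}
```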
fe/fe-core/src/main/java/com/starrocks/connector/iceberg/io/IcebergCachingFileIO.java
} catch (Exception e) {
    LOG.warn("failed on deleting file :" + hadoopOutputFile.getPath());
// use sync CacheWriter to avoid delete file newly generated by another thread
.writer(new CacheWriter<String, DiskCacheEntry>() {
Can you please upgrade and use .evictionListener((key, value, cause) -> ...)? This handles evictions only; you'd need to handle the explicit removals via a Map compute. CacheWriter was deprecated in v2 and removed in v3. The write method was not as useful and had a lot of confusing details. That interface approach just didn't work well with Map or AsyncCache, where computations and an eviction listener were more explicit and clear. Plus, upgrading means improvements and bug fixes :)
Hi @ben-manes, that's so nice of you.
In my case, I just use CacheWriter's delete() to delete the file on disk when a DiskCacheEntry is evicted, and I don't want to delete the disk file when I use computeIfPresent() to update useCount, so I think removalListener is not suitable.
You've given me another choice, .evictionListener; I will check whether I can upgrade the version and use it.
I also have another question: do .evictionListener((key, value, cause) -> ...) and .removalListener run synchronously with the eviction? Is there any possibility that the evictionListener deletes a resource that a new entry is using? That is, if I evict an entry, delete its file, and insert a new entry (same key and same file name) with a new file, I want those to execute one by one; but if the delete happens after the insert, my new file gets deleted. Is that possible? Thank you.
Caffeine.evictionListener runs within the atomic scope of the ConcurrentHashMap.compute method, so other writes to that key are blocked (just like CacheWriter.delete). This way the deletion cannot run concurrently with the insertion, because the entry lock must be held, so you get key-ordered operations. If an eviction selects a victim that is no longer eligible by the time it acquires the entry lock (pinned), then it was "resurrected" and the attempt no-ops (likely finding another victim). Similarly, if the client tries to pin while the entry is being evicted, then when the client acquires the lock it will be on an entry reservation (à la computeIfAbsent). So it should give you the atomicity that you need.
As you said, Caffeine.removalListener runs after the atomic operation completes, so you lose key-ordered actions and may suffer races. Of course, the benefit is that you are not making writes more expensive to do some other work, so it's ideal when ordering does not matter.
With AsyncCache, a synchronous listener for removals only makes sense for evictions, as a Map.remove could be of an in-flight future. Obviously we don't want to force Map.remove to perform a future.join in order to notify the listener under the entry's lock. That's why evictionListener only applies to the expired / size / collected causes and not the explicit / replaced ones. Therefore, if you have an evictionListener to delete the file, an explicit Map.remove won't call the delete for you, and you need to use a compute to both delete the file and remove the entry. That is why CacheWriter was replaced by Map.compute and evictionListener.
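A minimal sketch of this replacement pattern, assuming Caffeine 2.9+; DiskCacheEntry and the file-deletion code are illustrative, not the exact PR implementation:

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import java.io.File;

class EvictingDiskCache {
    static final class DiskCacheEntry {
        final String path;
        final long length;
        DiskCacheEntry(String path, long length) { this.path = path; this.length = length; }
    }

    private final Cache<String, DiskCacheEntry> cache = Caffeine.newBuilder()
            .maximumWeight(1024L * 1024 * 1024) // assumed budget
            .weigher((String k, DiskCacheEntry e) ->
                    (int) Math.min(e.length, Integer.MAX_VALUE))
            // Called only for size/expiry/GC evictions, under the entry's
            // lock, so the delete cannot race an insert of the same key.
            .evictionListener((String key, DiskCacheEntry entry, RemovalCause cause) ->
                    new File(entry.path).delete())
            .build();

    // evictionListener is NOT invoked for explicit removals, so invalidate()
    // must delete the file itself and return null to drop the mapping.
    void invalidate(String key) {
        cache.asMap().computeIfPresent(key, (k, entry) -> {
            new File(entry.path).delete();
            return null; // removes the entry atomically with the delete
        });
    }
}
```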
@ben-manes Thank you so much! I think it is OK to use CacheWriter in my case, because my Map.compute never produces a null entry, so Map.compute only does a replace, never a removal, and evictionListener is also enough.
If Map.compute could produce a null entry, then the Map.compute would need to handle the removal action itself, am I right?
And for invalidate, CacheWriter can handle that, but we need to handle it explicitly if we use evictionListener, right?
Thanks again.
You're right. My concern is that CacheWriter was removed in v3 (2/2021), so you'd have to stay on v2. I just don't want that to be an annoying surprise, so I chimed in.
@ben-manes You are so kind. I will consider using the new version; upgrading requires checking all the use cases in our system. I will take it, thank you again.
Hi @ben-manes, I upgraded to 2.9.3 because up to now we work on JDK 8. I use evictionListener to handle the evicted resource, and for invalidation I use Map.computeIfPresent to handle the invalidated resource and return null to remove the entry from the cache.
fix CachingFileIO returning a wrong length(); make diskCache support pin and unpin to avoid deleting a file that is in use. Signed-off-by: zombee0 <[email protected]>
force-pushed from b840018 to 879bc5b
SonarCloud Quality Gate failed.
[FE PR Coverage Check] 😞 fail : 8 / 133 (06.02%) file detail
run starrocks_admit_test
@Mergifyio backport branch-3.0
✅ Backports have been created
… for disk cache (#18892) Signed-off-by: zombee0 <[email protected]> (cherry picked from commit 4a3be4c)
… for disk cache (StarRocks#18892) Signed-off-by: zombee0 <[email protected]>
What type of PR is this:
Which issues does this PR fix:
Fixes #
Problem Summary (Required):
fix CachingFileIO returning a wrong length(); make diskCache support pin and unpin to avoid deleting a file that is in use.
Checklist:
Bugfix cherry-pick branch check: