
Alluxio cache #18719

Merged: 6 commits from alluxio-cache into trinodb:master on Feb 2, 2024

Conversation

@Pluies (Contributor) commented Aug 17, 2023

Description

👋

This PR integrates Alluxio caching into Trino. It is a rework of #16375 with:

  • Fully rebased on current code
  • Fixed guice bindings / protobuf definitions / other small issues
  • Added Delta Lake support to Alluxio cache
  • Using optimized-local-scheduling rather than introducing a new concept of node affinity
  • Passing the lastModifiedTime from the coordinator to the workers via ConnectorSplit to allow for immutable caching without workers having to maintain their own file status cache
  • Full smoke tests for the caching file system by extending TestDeltaLakeMinioAndHmsConnectorSmokeTest into TestCachingDeltaLakeMinioAndHmsConnectorSmokeTest

Additional context and related issues

This is very much #16375 from @beinan reworked after helpful feedback from @raunaqmorarka (thank you! 🙏 ).

I'm putting it out there so we can discuss next steps on integrating this into Trino 🥳 with the following notes:

  • This PR only works with HDFS so far, and we'll have to make it compatible with the new S3-native implementation in a subsequent PR
  • Using optimized-local-scheduling means each connector has to implement split scheduling by specifying an address on the splits it generates. Otherwise, each split will randomly be assigned to a worker node, and the cache won't be distributed.
  • Connectors can now optionally pass a lastModifiedTime argument when creating a TrinoInputFile. This argument can be used by Alluxio to bypass checking the lastModifiedTime from the underlying file system and use the cached file directly if available (see the sketch after this list)
  • Both of the above are enabled for trino-delta-lake, the connector I'm developing this for, but will need to be implemented separately for other connectors to take full advantage of Alluxio
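
For illustration, here is a minimal sketch of the lastModifiedTime idea (hypothetical types and method names, not this PR's actual SPI changes): if the coordinator attaches the timestamp to the split, the worker can key its cache without doing its own remote metadata lookup.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical sketch only: the PR threads lastModifiedTime through
// ConnectorSplit and TrinoInputFile; this just shows why carrying the
// timestamp lets workers avoid maintaining their own file status cache.
record CachedFileAccess(String path, Optional<Instant> lastModifiedFromSplit)
{
    interface RemoteMetadata
    {
        Instant lastModified(String path); // e.g. a HEAD request against object storage
    }

    Instant lastModifiedForCaching(RemoteMetadata remote)
    {
        // Prefer the timestamp the coordinator already knows; only fall back
        // to a remote lookup when the connector could not provide one.
        return lastModifiedFromSplit.orElseGet(() -> remote.lastModified(path));
    }
}
```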

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text:

# Delta Lake
* Improve performance of scans by adding the ability to cache data files on local SSDs ({issue}`18719`)

cla-bot added the cla-signed label on Aug 17, 2023
github-actions added the tests:hive, delta-lake (Delta Lake connector), and hive (Hive connector) labels on Aug 17, 2023
@Pluies force-pushed the alluxio-cache branch 4 times, most recently from 3325fd5 to 659298d on August 29, 2023 13:23
@Pluies (Contributor, Author) commented Aug 29, 2023

@electrum @beinan hey folks 👋 I'd love to get this merged, let me know if you'd like more context to review this PR!

@rwilliams-r7 commented:

Would also love to see this or #16375 merged.

String cacheIdentifier = hashFunction.hashString(path + lastModifiedTime, UTF_8).toString();
URIStatus uriStatus = new URIStatus(info, CacheContext.defaults().setCacheIdentifier(cacheIdentifier));
return new FSDataInputStream(new HdfsFileInputStream(
new LocalCacheFileInStream(uriStatus, (uri) -> new AlluxioHdfsInputStream(fileSystem.open(file)), cacheManager, alluxioConf),
Contributor:

Since we use soft scheduling, this might end up unnecessarily caching data on nodes whenever soft scheduling fails. That is, if a piece of data A is supposed to be cached on node 1, but node 1 is too busy, it might be scheduled to run on node 2. Then node 2 will read A from cloud storage, and add it to its cache. However, the next time data A is supposed to be read, and node 1 is still too busy, we are just as likely to schedule the read to happen on node 3. So we will just end up consuming write capacity and cache capacity from node 2, without really getting any benefit of caching. This will especially be a problem in a cluster with a lot of nodes.

The nodes need a way to know if they are supposed to cache a piece of data or not, and fall back to not caching.

Member:

That might be possible to mitigate by populating a list of preferred host addresses in the connector split instead of just one host

Contributor:

Would this potentially reduce the cache hit rate? For instance, let's say we have 100 nodes, each with a 1 TB cache, 100 TB of data we want to query, and all data is only cached on a single node. If soft scheduling never fails we will eventually have a 100% cache hit rate. If soft scheduling fails 10% of the time, 10% of the data will be scheduled on a random node not supposed to cache this data, and store "garbage" in its cache. Eventually 10% of all cached data is not cached on a node with the correct cache key. This should give roughly an 80% cache hit rate (about 90% of reads land on the right node, and only about 90% of cache capacity holds correctly keyed data, so roughly 0.9 × 0.9 ≈ 81%).

If on the other hand we cache all data on two nodes, then only 50 TB of the data we want to query can be cached. This should give a cache hit rate of 50%, even if soft scheduling never fails.

If nodes only cache data they are supposed to cache, we would get a 90% cache hit rate if soft scheduling fails 10% of the time.

Member:

I don't think this is a problem as long as you deterministically provide the same list of preferred host addresses for a given split and the scheduler attempts to schedule splits on these hosts in the provided preferred order. I'm assuming that the probability that all the preferred hosts are too busy to schedule splits is low with 3 preferred hosts. You might cache some data on multiple nodes, but that would be useful when the primary preferred node is too busy.
A couple of alternatives to this are:

  1. The worker node checks whether it is a preferred host for the split and uses the non-cached file system implementation when it is not (sketched below). This gives up on caching if the preferred host is too busy to schedule splits on.
  2. The embedded cache implementation itself has the ability to remotely read cached data from any worker. This way, even if a split gets scheduled on a node which didn't cache the data, it can still read the cached data from another node. This is the approach that Rubix takes. However, this is probably not worth the added complexity.
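
A minimal sketch of option (1), assuming the worker can see both its own addresses and the split's preferred hosts (illustrative names, not code from this PR):

```java
import java.util.List;
import java.util.Set;

// Illustrative only: route reads through the caching file system solely when
// this worker is one of the split's preferred hosts; otherwise use the plain
// file system so a busy preferred host does not make other nodes pollute
// their caches.
final class CacheRouting
{
    private CacheRouting() {}

    static boolean shouldCacheLocally(Set<String> localNodeAddresses, List<String> preferredHosts)
    {
        return preferredHosts.stream().anyMatch(localNodeAddresses::contains);
    }
}
```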

.setPath(path)
.setFolder(false)
.setLength(length());
String cacheIdentifier = hashFunction.hashString(path + lastModifiedTime, UTF_8).toString();
Contributor:

It's a bit unfortunate that we have to do the caching on the file level, and not on the part-of-file level. With the current caching scheme a single large file from cloud storage will be cached on a single node. For a large and hot file this could potentially lead to problems. But it might be hard to solve, and might not be worth it.

Member:

@beinan could you please weigh in on whether this is possible to improve ?

@Pluies (Contributor, Author) commented Sep 20, 2023

An update on this here - based on the feedback above, we're reworking the PR to:

  • Make it compatible with fs.native-s3
  • Remove the inheritance on Hadoop internals like HdfsInputFile
  • Add proper tests that confirm the cache is properly used when enabled
  • Generalise the caching scaffolding so that other cache engines can be plugged in besides Alluxio
  • In general, make it much nicer 😅

We're planning to polish it a bit internally, then bring it up here for discussion probably next week.

@rwilliams-r7 commented:

Amazing, thank you for this work.

@jkylling (Contributor) commented Sep 29, 2023

Hey, a small update here. We have just pushed our latest changes with the promised refactor. There is likely still some work to do, especially around testing. Still, we hope this code structure can be a good starting point for continuing the review.

To summarize some of the changes and open questions:

  • The code is refactored to work with any TrinoFileSystem.
  • We have added some test coverage, but it's still a bit rough. We plan to polish this more and take into account changes to tests which have happened since we started on the refactor.
  • The AlluxioFileSystemCache is part of the trino-hdfs module, but it is really an independent component and could be extracted.
  • We've added a working ConsistentHashingNodeProvider.
  • The ConsistentHashingNodeProvider uses com.github.ishugaliy.allgood-consistent-hash through the com.qubole.rubix.rubix-presto-shaded dependency. This is a bit unfortunate, but we are unsure of the best approach here.
  • In the current implementation, the node provider (determines where a split goes) and the caching (determines what gets cached, and caches it) are completely unrelated components. This makes the first suggestion in Alluxio cache #18719 (comment) difficult to implement.
  • We no longer set lastModifiedTime on the splits from the ConnectorSplitManager, as discussed in Alluxio cache #18719 (comment). The reasoning for this is that the connector typically knows which files are immutable and which are not, but the connector need not always know the lastModifiedTime of the files. This is for instance the case when reading checkpoint files from the Delta log of a Delta table. These files are immutable (from the point of view of a Delta Lake reader which first reads the _last_checkpoint file), but the code paths which read the checkpoint files do not all have access to the lastModifiedTime. It seems simpler to have the connector define which files are cacheable. This will enable caching of checkpoint files. It also makes it simpler to define custom caching strategies in plugins.
  • The current implementation of DeltaLakeAlluxioFileSystemCache does not cache checkpoints, since the CheckpointEntryIterator is not always closed (first steps towards a fix were started in a6711fa).
  • The way we handle configuration can likely be improved. Currently we pass most of the configuration through a Guice MapBinder (to pick the correct TrinoFileSystemCache) and conditional modules (to set up TrinoFileSystemCache specific bindings). Are there other patterns which can be used here?

If anyone wants to test this PR with other connectors, please be aware that the ConnectorSplit must be modified to include a non-trivial getAddresses method, similar to what is done in ae2f822.
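
For anyone experimenting with this, a simplified stand-in for the consistent-hashing idea (not the allgood-consistent-hash library and not the code in ae2f822): pick a deterministic, ordered list of preferred hosts per file path via rendezvous hashing, and return it from the split's getAddresses().

```java
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.List;

// Illustrative only: rendezvous (highest-random-weight) hashing gives every
// file path a stable ranking of workers, so the same path is always routed to
// the same preferred nodes no matter which coordinator computes the split.
final class PreferredHosts
{
    private PreferredHosts() {}

    static List<String> forPath(String path, List<String> workers, int preferredHostCount)
    {
        return workers.stream()
                .sorted(Comparator.comparingLong((String worker) ->
                        Hashing.murmur3_128().hashString(path + "#" + worker, StandardCharsets.UTF_8).asLong()).reversed())
                .limit(preferredHostCount)
                .toList();
    }
}
```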

@electrum (Member) commented Oct 4, 2023

Thanks for your work on this. Can you move AlluxioFileSystemCache to a new module trino-filesystem-alluxio and remove the dependencies on Hadoop? We don't want to introduce new Hadoop dependencies or require users to have to enable Hadoop.

Is the Alluxio cache code fundamentally tied to Hadoop? I see alluxio.hadoop.HdfsFileInputStream, which seems to wrap a Hadoop stream. My understanding based on discussion with @beinan is that they have a lower-level cache API that doesn't depend on Hadoop.

@jkylling (Contributor) commented Oct 5, 2023

> Thanks for your work on this. Can you move AlluxioFileSystemCache to a new module trino-filesystem-alluxio and remove the dependencies on Hadoop? We don't want to introduce new Hadoop dependencies or require users to have to enable Hadoop.
>
> Is the Alluxio cache code fundamentally tied to Hadoop? I see alluxio.hadoop.HdfsFileInputStream, which seems to wrap a Hadoop stream. My understanding based on discussion with @beinan is that they have a lower-level cache API that doesn't depend on Hadoop.

I've moved it to a new module trino-filesystem-alluxio and removed the dependencies on Hadoop. We now only rely on the Alluxio FileInStream and LocalCacheFileInStream classes, which do not seem to rely on Hadoop within Alluxio. It was necessary to duplicate some of the functionality of HdfsInput to have the new AlluxioInput class implement TrinoInput. I copy-pasted most of io.trino.hdfs.FSDataInputStreamTail to implement readTail, and some internal Hadoop code to implement readFully.
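
For context, the readFully being duplicated is essentially the classic loop below (a sketch, not the PR's AlluxioInput code): keep reading until the requested range is filled, because a single InputStream.read() call may return fewer bytes than asked for.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

final class StreamUtils
{
    private StreamUtils() {}

    // Sketch of a readFully helper: loop until 'length' bytes are copied into
    // buffer[offset..offset+length), failing if the stream ends early.
    static void readFully(InputStream in, byte[] buffer, int offset, int length)
            throws IOException
    {
        int read = 0;
        while (read < length) {
            int n = in.read(buffer, offset + read, length - read);
            if (n < 0) {
                throw new EOFException("Stream ended after " + read + " of " + length + " bytes");
            }
            read += n;
        }
    }
}
```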

@wendigo (Contributor) commented Oct 5, 2023

@jkylling can you rebase to resolve conflict?

@jkylling (Contributor) commented Oct 5, 2023

> @jkylling can you rebase to resolve conflict?

Yes, I'm looking at it. It's a bit tricky as the alluxio-shaded client contains a lot of classes which can overlap with other dependencies, and rebasing on master has brought about a new set of duplicate resources. I'm trying to switch to the non-shaded Alluxio libraries instead.

@wendigo (Contributor) commented Oct 5, 2023

@jkylling we should use the shaded version if possible.

@jkylling force-pushed the alluxio-cache branch 2 times, most recently from 555645a to d66252e on January 31, 2024 22:22
@raunaqmorarka (Member) left a comment:

You can squash the last two commits.

@Override
public Optional<String> getCacheKey(TrinoInputFile delegate)
{
// TODO: Enable caching of parquet checkpoint files once the CheckpointEntryIterator is always closed
@raunaqmorarka (Member) commented Feb 1, 2024:

Is the TODO "CheckpointEntryIterator is always closed" still pending? If it is, please include a link to an open GH issue.

Contributor:

It should be resolved by #20054 now. I'll enable the caching of checkpoint files.

Member:

Please make sure we include some test with checkpoint files in delta if we're doing that in this PR.

Contributor:

The TestDeltaLakeAlluxioCacheFileOperations does have coverage for the cache operations for checkpoint files, but it's unclear how much coverage on checkpoints we get through the TestDeltaLakeAlluxioCacheMinioAndHmsConnectorSmokeTest test. It might be prudent to just leave it as is, and consider it as a future optimization. I'll keep the code as is, and leave a TODO to enable caching of checkpoint files.

Enabling caching of checkpoint files could make the delta.checkpoint-filtering.enabled=true feature work better. When we tried enabling this feature earlier we saw a major slow down of the planning and analysis phases. The in-memory cache of checkpoints seemed to no longer be used, and lots of queries were made by the coordinator to object storage to fetch checkpoint files. However, if we only need to read the checkpoint files from disk it could work without the in-memory checkpoint cache.

@raunaqmorarka (Member) commented Feb 1, 2024

Ran SF1000 TPC benchmark on 6 r6gd.8xlarge workers
Alluxio delta partitioned TPC.pdf
Alluxio delta unpartitioned TPC.pdf

Partitioned TPC summary: [benchmark result screenshots attached, Feb 1, 2024]

Results on TPC look pretty good: there is a significant reduction in wall time and some reduction in CPU time.

As a follow-up we should look at exposing bytes read from cache as a connector metric; this will make it easy to see cache usage for each table scan in a query in the output of EXPLAIN ANALYZE VERBOSE, the queryinfo JSON, event listener metrics, etc.

@wendigo (Contributor) commented Feb 1, 2024

I'm going to merge the Rubix removal in a while (#20102), so we can rebase and drop the conflict-resolution commit from this PR.

@jkylling force-pushed the alluxio-cache branch 3 times, most recently from db4944b to 51bff46 on February 1, 2024 15:37
@wendigo (Contributor) commented Feb 1, 2024

@jkylling please rephrase the Rubix commit. We've decided internally to remove Rubix when we implement Hive caching with Alluxio. We will merge this PR and add Hive support in a separate change.

jkylling and others added 2 commits February 1, 2024 19:20
@electrum (Member) left a comment:

See comments, otherwise looks good

<groupId>org.alluxio</groupId>
<artifactId>alluxio-core-common</artifactId>
<version>${dep.alluxio.version}</version>
<exclusions>
Member:

Can we file an issue to fix these in Alluxio?

DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session)
.build();
try {
File metastoreDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("delta_lake_metastore").toFile().getAbsoluteFile();
Member:

We're trying to migrate the testing code so that

  1. we create connectors using properties rather than hand wiring
  2. we use OpenTelemetry tracing rather than custom tracking code

For example, 458bfd7 replaced a custom metastore wrapper with a getSpans() method on DistributedQueryRunner (take a look at the assertMetastoreInvocationsForQuery() utility method). If we can do a similar thing here, the test construction becomes simpler and easier to maintain, and we know that we're testing the same code that runs in production.

I'd like to remove TrackingFileSystemFactory, so it's best if we don't introduce more usages of it.

@@ -249,12 +249,24 @@
<scope>runtime</scope>
</dependency>

<dependency>
Member:

If we can convert TestDeltaLakeAlluxioCacheFileOperations to construct the connector with properties and use tracing (per my other comment), then we should be able to remove these runtime dependencies.


import java.util.List;

public class NoneCachingHostAddressProvider
Member:

I think NoCachingHostAddressProvider would sound better

@@ -91,6 +94,8 @@ protected void setup(Binder binder)
install(new GcsFileSystemModule());
factories.addBinding("gs").to(GcsFileSystemFactory.class);
}

newOptionalBinder(binder, CachingHostAddressProvider.class).setDefault().to(NoneCachingHostAddressProvider.class).in(Scopes.SINGLETON);
Member:

Rather than using an optional binder here, the configured cache implementation should bind the implementation. So the NONE cache should install NoCachingHostAddressProvider and ALLUXIO should install ConsistentHashingHostAddressProvider.
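
A rough sketch of that binding pattern (the provider class names come from this PR's discussion; the module layout and stand-in types below are assumptions, not the final wiring):

```java
import com.google.inject.AbstractModule;
import com.google.inject.Scopes;

// Stand-in types so the snippet is self-contained; in the PR these are the
// real CachingHostAddressProvider implementations.
interface CachingHostAddressProvider {}

class NoCachingHostAddressProvider
        implements CachingHostAddressProvider {}

class ConsistentHashingHostAddressProvider
        implements CachingHostAddressProvider {}

// Installed when the cache type is NONE.
class NoCachingModule
        extends AbstractModule
{
    @Override
    protected void configure()
    {
        bind(CachingHostAddressProvider.class).to(NoCachingHostAddressProvider.class).in(Scopes.SINGLETON);
    }
}

// Installed when the cache type is ALLUXIO, so choosing the cache also chooses
// the host address provider, with no optional-binder default needed.
class AlluxioCacheModule
        extends AbstractModule
{
    @Override
    protected void configure()
    {
        bind(CachingHostAddressProvider.class).to(ConsistentHashingHostAddressProvider.class).in(Scopes.SINGLETON);
    }
}
```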

public Optional<String> getCacheKey(TrinoInputFile delegate)
throws IOException
{
return Optional.of(delegate.location().path() + delegate.lastModified());
Member:

This should use a separator that won't appear in file names. Otherwise, we could have a collision with a filename ending in a number.
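
A tiny hypothetical example of that suggestion (not the final fix in the PR): with a separator that cannot appear in a path, keys such as "file1" + 23 and "file12" + 3 can no longer collide.

```java
final class CacheKeyExample
{
    private CacheKeyExample() {}

    // Hypothetical helper: the NUL character is not a legal character in file
    // paths on common file systems, so it makes an unambiguous separator.
    static String cacheKey(String path, long lastModifiedMillis)
    {
        return path + '\0' + lastModifiedMillis;
    }
}
```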

public Optional<String> getCacheKey(TrinoInputFile delegate)
{
// TODO: Consider caching of the Parquet checkpoint files within _delta_log
if (!delegate.location().path().contains("/_delta_log/")) {
Member:

Why do we skip caching this directory?

Contributor:

The _delta_log directory contains the files _last_checkpoint and _trino_meta/extended_stats.json. These are not immutable, so they are tricky to cache. Also, the commit files of the form 000...123.json might not be immutable on ABFS. The checkpoint files should be immutable when accessed by Trino. We decided to leave this as a future optimization in #18719 (comment).

@wendigo merged commit 273b535 into trinodb:master on Feb 2, 2024 (96 checks passed)
@mosabua (Member) commented Feb 6, 2024

@colebow .. the release notes entry should link to the docs and maybe just say:

Add support for filesystem caching

Same for the other incoming PRs for Iceberg, Hive, and Hudi

Labels: cla-signed, delta-lake (Delta Lake connector), hive (Hive connector), iceberg (Iceberg connector), performance