
ARROW-17306: [C++] Provide an optimized GetFileInfoGenerator specialization for LocalFileSystem #13796

Merged: 7 commits, Aug 18, 2022

Conversation

@ManManson (Contributor) commented Aug 4, 2022

Introduce a specialization of GetFileInfoGenerator in the LocalFileSystem class.

This implementation tries to improve performance by hiding latency at two levels:

  1. Child directories can be read ahead, so that listing directory entries from disk can proceed in parallel with other work;
  2. Directory entries can be stat'ed and yielded in chunks, so that the FileInfoGenerator consumer can start receiving entries before a large directory is fully processed.

Both mechanisms can be tuned using dedicated parameters in LocalFileSystemOptions.
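The chunked-yielding scheme in point 2 can be sketched independently of Arrow's actual types. Everything below (the `FileInfo` stand-in, `DiscoverChunked`, the consumer callback) is an illustrative sketch, not the PR's real code:

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for arrow::fs::FileInfo.
struct FileInfo {
  std::string path;
};
using FileInfoVector = std::vector<FileInfo>;

// Yield directory entries to `consumer` in chunks of at most `batch_size`,
// so the consumer can start working before the whole listing is finished.
void DiscoverChunked(const std::vector<std::string>& entries, size_t batch_size,
                     const std::function<void(FileInfoVector)>& consumer) {
  FileInfoVector chunk;
  chunk.reserve(batch_size);
  for (const auto& path : entries) {
    chunk.push_back(FileInfo{path});  // in the real code, each entry is stat'ed here
    if (chunk.size() == batch_size) {
      consumer(std::move(chunk));  // hand off a full batch
      chunk.clear();               // moved-from vector is reset to a known empty state
      chunk.reserve(batch_size);
    }
  }
  if (!chunk.empty()) consumer(std::move(chunk));  // flush the partial tail
}
```

In the actual implementation the consumer is an async generator rather than a callback, but the batching logic is the same idea.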

Signed-off-by: Pavel Solodovnikov [email protected]
Co-Authored-by: Igor Seliverstov [email protected]

@ManManson ManManson changed the title ARROW-17306: [C++] Provide an optimizedGetFileInfoGenerator specialization for LocalFileSystem ARROW-17306: [C++] Provide an optimized GetFileInfoGenerator specialization for LocalFileSystem Aug 4, 2022
@ManManson ManManson force-pushed the arrow-17306 branch 2 times, most recently from b671401 to 3f2b141 Compare August 4, 2022 11:55
github-actions bot commented Aug 4, 2022

@ManManson (Author):

Force-pushed the branch to fix some build issues on macOS and Windows.

Changelog can be found here: https://github.com/apache/arrow/compare/b6714018c4384cd00f0b9a4e92d804671d1d381b..3f2b141dea8ded351d41d687245917821295277a?diff=unified

@ManManson (Author):

I believe the AppVeyor failures don't have anything to do with my patches... Also, the "Dev / Source Merge Script" jobs seem to fail due to some issues related to the 10.0.0 freeze?

@westonpace (Member) left a comment:

Thanks for contributing this. I think the addition of an async file generator will be a nice asset for scanning, especially for datasets with lots of files. I've only taken a high-level scan through the implementation at the moment.

Did you find the async generator utilities useful? I have gotten some feedback that the merged async generator was a little confusing and was considering moving away from it in favor of something like a nestable async task group (see https://issues.apache.org/jira/browse/ARROW-16072). I can try and draft up an example of what this might look like on Monday. However, if you're content with the current implementation, we should proceed with it as it is and leave the merged-generator question for the future.

I think it might be good to add a stress test now that this is parallel. Maybe create 10 directories with 10 directories with 10k files each so that we can get some testing of both nested parallelism and DiscoveryImplIterator::kBatchSize.

I will try and look through this more closely on Monday.

Comment on lines 136 to 138
/// How many partitions should be processed in parallel. May not be supported by all
/// implementations of `GetFileSystemGenerator`.
util::optional<int32_t> partitions_readahead;
Member:

Hmm...this makes sense but we haven't used util::optional in public interfaces that I know of. Typically we do something like int32_t partitions_readahead = kDefaultPartitionsReadahead. Although, the downside is that we usually end up having to repeat the default in python. Curious if @pitrou has any opinion here.
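The convention described here, a plain field with a default member initializer rather than `util::optional`, looks roughly like this. This is a hypothetical sketch (the field names and the defaults of 1 and 1000 mirror where the PR eventually lands in LocalFileSystemOptions, but the struct itself is illustrative):

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical options struct: defaults are expressed as member initializers,
// so callers override only the knobs they care about and no optional is needed.
struct LocalFsOptionsSketch {
  static constexpr int32_t kDefaultDirectoryReadahead = 1;
  static constexpr int32_t kDefaultFileInfoBatchSize = 1000;

  int32_t directory_readahead = kDefaultDirectoryReadahead;
  int32_t file_info_batch_size = kDefaultFileInfoBatchSize;
};
```

The downside mentioned above is visible here: a Python binding exposing these options would have to restate the same default values.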

Also, I don't think "partitions" is the correct terminology to be using here. The FileSelector is to be understood in the context of "filesystems" which is a more generic abstraction than partitions. Perhaps directory_readahead?

Contributor (Author):

Are there any practical reasons why you avoid optionals in public APIs? Does Arrow try to maintain strict ABI stability across multiple releases? If not, then I guess it should be completely fine.

Regarding directory_readahead: I agree, the term partitions does not belong to this level of abstraction. I'll change it to directory_readahead, then.

Member:

We don't have any ABI guarantees. But when there is a well-known default value, it doesn't make sense to pass an optional, IMHO.

Member:

However, I also don't understand why this is an attribute of FileSelector. This sounds more like an implementation-specific knob that should probably be in LocalFileSystemOptions.

Member:

(I'm also curious why this needs to be exposed at all)

Contributor (Author):

> We don't have any ABI guarantees. But when there is a well-known default value, it doesn't make sense to pass an optional, IMHO.

Maybe you are right; this can be given a reasonable default value, so there's no need to make it optional.

> However, I also don't understand why this is an attribute of FileSelector. This sounds more like an implementation-specific knob that should probably be in LocalFileSystemOptions.

Well, I don't have a strong opinion on this one. I don't see anything wrong with it being part of FileSelector (which is designed precisely to specify the details of file selection). Though, since this option only applies to LocalFileSystem, maybe it does make sense to move it into LocalFileSystemOptions.

> (I'm also curious why this needs to be exposed at all)

To be able to fine-tune the behavior if the default doesn't work well in a particular case, or if better performance can be achieved with another value. For example, filesystems differ in their parallel-IO capabilities; XFS, for instance, is the only FS I know of that is capable of truly asynchronous IO.

@westonpace (Member) commented Aug 5, 2022

Agreed that the test failures are unrelated. The AppVeyor issue appears to be addressed already by #13795

/// in serial manner via `MakeConcatenatedGenerator` under the hood.
class AsyncStatSelector {
public:
using FileInfoGeneratorProducer = PushGenerator<FileInfoGenerator>::Producer;
Member:

Hmm... I'm curious; from an implementation POV, wouldn't it be simpler to have a PushGenerator<FileInfoVector>?
(that's what the S3 implementation of this does, for example)

Contributor (Author):

Surely this can be done, but I'll need to play with the code a little bit to figure out how this will work out.

I suggest we move forward with the current approach and, in case the code can be reshaped in a more optimal way, just provide the fix as a follow-up.

Member:

The reason I'm suggesting this is that it seems like the natural way to implement GetFileInfoGenerator, and it would also make the code easier to read and maintain. If you have some time to experiment it would be good to give it a try IMHO. Unless you have other PRs pending depending on this feature, merging this PR soon is not critical.

Contributor (Author):

Well, there aren't yet, but I plan to post some more PRs soon :)

@ManManson (Author):

Force-pushed the branch to address review comments. Diff can be found here: https://github.com/apache/arrow/compare/3f2b141dea8ded351d41d687245917821295277a..136ae80540f851219c8d9a920470d4863a428d30

Changelog:

  • Renamed partitions_readahead to directory_readahead, and renamed the methods in AsyncStatSelector to get rid of the "partitions" terminology.
  • Moved directory_readahead and batch_size from FileSelector to LocalFileSystemOptions.
  • Removed the use of util::optional from these options by providing sensible defaults (1 for directory_readahead and 1000 for batch_size).
  • Optimized memory allocation for internal chunking.

@pitrou (Member) left a comment:

Thanks for the update, I posted a couple comments.

Also, can you add some localfs-specific tests for this? Ideally you would stress both with and without directory readahead...

Comment on lines 37 to 38
static constexpr uint32_t kDefaultDirectoryReadahead = 1u;
static constexpr uint32_t kDefaultBatchSize = 1000u;
Member:

Style nit: let's avoid gratuitous use of unsigned integers. Can make these int or int32_t.

Contributor (Author):

Just curious: why are uint32_ts bad?

Member:

They are not bad per se, but the general tendency should be to use them only when necessary (for example, when you want to perform unsigned computations, or you are reading data provided by a third-party API); otherwise you inevitably end up mixing signed and unsigned, which is generally annoying.
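A classic instance of that annoyance: in a mixed comparison, C++'s usual arithmetic conversions turn the signed operand into an unsigned one, with surprising results. This is a generic illustration, not code from the PR:

```cpp
#include <cassert>
#include <cstdint>

// When int32_t meets uint32_t in a comparison, the signed operand is
// converted to unsigned, so -1 becomes 4294967295 before the comparison.
bool MixedLess(int32_t a, uint32_t b) {
  return a < b;  // compilers warn about this with -Wsign-compare
}

// Keeping both operands signed preserves the expected ordering.
bool SignedLess(int64_t a, int64_t b) {
  return a < b;
}
```

This is the kind of subtle bug that motivates keeping option fields like `directory_readahead` as plain `int32_t`.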

/// a single FileInfoVector chunk by the `GetFileSystemGenerator` impl, which
/// is the result of `stat`:ing individual dirents, obtained by the call to
/// `internal::ListDir`.
uint32_t batch_size = kDefaultBatchSize;
Member:

Rename this to file_info_batch_size for clarity?

Contributor (Author):

Will do.

if (!result.ok()) {
auto status = result.status();
if (selector_.allow_not_found && status.IsIOError()) {
auto exists = FileExists(dir_fn_);
Member:

I think here you can simply propagate the error which will simplify the code below.

Suggested change
auto exists = FileExists(dir_fn_);
ARROW_ASSIGN_OR_RAISE(bool exists, FileExists(dir_fn));

Contributor (Author):

Yes, you are right, I'll fix that.

if (exists.ok() && !*exists) {
return Status::OK();
} else {
return exists.ok() ? arrow::Status::UnknownError(
Member:

If exists is ok, you should simply propagate the original not-found error, not create another one.
This would make the final code look like this probably:

      if (!result.ok()) {
        if (selector_.allow_not_found && status.IsIOError()) {
          ARROW_ASSIGN_OR_RAISE(bool exists, FileExists(dir_fn));
          if (!exists) {
            return Status::OK();
          }
        }
        return status;
      }

(incidentally, this is similar to the snippet in StatSelector())

Comment on lines 466 to 468
FileInfoVector yield_vec;
std::swap(yield_vec, current_chunk_);
return yield_vec;
Member:

Why not simply:

Suggested change
FileInfoVector yield_vec;
std::swap(yield_vec, current_chunk_);
return yield_vec;
return std::move(current_chunk_);

Contributor (Author):

Thanks for the suggestion, missed that one.
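As an aside: when the member is reused across calls (as current_chunk_ is here), `std::exchange` expresses "yield and reset" in one line and guarantees the member is left empty, rather than merely in a moved-from state. A generic sketch under illustrative names, not the PR's actual code:

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Illustrative holder mimicking an iterator that accumulates a chunk.
struct ChunkHolder {
  std::vector<std::string> current_chunk_;

  // Hand the accumulated chunk to the caller and leave the member in a
  // well-defined empty state, ready to accumulate the next batch.
  std::vector<std::string> TakeChunk() {
    return std::exchange(current_chunk_, {});
  }
};
```

Both the swap idiom and `return std::move(...)` achieve the same effect here; `std::exchange` just makes the reset explicit.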

FileInfoGeneratorProducer file_gen_producer,
uint32_t batch_size) {
ARROW_RETURN_IF(file_gen_producer.is_closed(),
arrow::Status::Cancelled("Discovery cancelled"));
Member:

When does this occur?

Contributor (Author):

Well, this is more of a general failsafe than something motivated by a real possibility. Ideally it should not happen, since there's no way to interact with the producer directly from the user's point of view. But it's always better to be safe than sorry. :)

@ManManson (Author):

> Thanks for the update, I posted a couple comments.
>
> Also, can you add some localfs-specific tests for this? Ideally you would stress both with and without directory readahead...

Sure, I'll add some stress tests for the feature to cover both batching and processing parallelism options.

@ManManson ManManson force-pushed the arrow-17306 branch 2 times, most recently from ac2954d to affcca7 Compare August 12, 2022 08:07
@ManManson (Author) commented Aug 12, 2022

Force-pushed the branch to address review comments from @pitrou and @westonpace. The diff can be found here: https://github.com/apache/arrow/compare/136ae80540f851219c8d9a920470d4863a428d30..ac2954d39b5de052b3c05563d290f00c23b51b34. I also pushed a fixup to amend the commit message.

Changelog:

  • Changed the new option fields' types from uint32_t to int32_t
  • Renamed batch_size to file_info_batch_size
  • Incorporated some code simplifications proposed by @pitrou
  • Added a benchmark for the new implementation of LocalFileSystem::GetFileInfoGenerator
  • Discovered a bug with a prematurely closed producer while running the benchmark (somehow it escaped manual testing until now...); introduced AutoClosingProducer to solve the problem

@ManManson ManManson requested review from westonpace and pitrou August 12, 2022 08:14
@pitrou (Member) left a comment:

Thanks for the update @ManManson ! I have another bunch of comments below.

#include "arrow/util/make_unique.h"
#include "arrow/util/string_view.h"

#include "parquet/arrow/writer.h"
Member:

Can we make the benchmark not use Parquet? I think we want to minimize coupling here.

Just create a dummy filesystem structure and run your benchmark over that.

Contributor (Author):

Makes sense, thanks.

/// Whether OpenInputStream and OpenInputFile return a mmap'ed file,
/// or a regular one.
bool use_mmap = false;

/// Options related to `GetFileSystemGenerator` interface.
Member:

Suggested change
/// Options related to `GetFileSystemGenerator` interface.
/// Options related to `GetFileInfoGenerator` interface.

/// Options related to `GetFileSystemGenerator` interface.

/// How many directories should be processed in parallel
/// by the `GetFileSystemGenerator` impl.
Member:

Suggested change
/// by the `GetFileSystemGenerator` impl.
/// by the `GetFileInfoGenerator` impl.

/// by the `GetFileSystemGenerator` impl.
int32_t directory_readahead = kDefaultDirectoryReadahead;
/// Specifies how much entries shall be aggregated into
/// a single FileInfoVector chunk by the `GetFileSystemGenerator` impl, which
Member:

Suggested change
/// a single FileInfoVector chunk by the `GetFileSystemGenerator` impl, which
/// a single FileInfoVector chunk by the `GetFileInfoGenerator` impl, which

Comment on lines 490 to 491
FileInfoVector yield_vec;
std::swap(yield_vec, current_chunk_);
Member:

Can probably be shortened:

Suggested change
FileInfoVector yield_vec;
std::swap(yield_vec, current_chunk_);
auto yield_vec = std::move(current_chunk_);

Contributor (Author):

Good catch!

std::move(dir_fn), nesting_depth, std::move(selector),
file_gen_producer, file_info_batch_size)),
io::default_io_context().executor()));
gen = MakeTransferredGenerator(std::move(gen), arrow::internal::GetCpuThreadPool());
Member:

Hmm... why are we transferring to the CPU thread pool? That doesn't seem necessary.
cc @westonpace

Contributor (Author):

But, as far as I understand, if the generator is not transferred back, every continuation attached to every future produced by this generator will also run on the IO thread pool?

This is undesirable, because we would want to do arbitrary (possibly compute-intensive) stuff on each delivered chunk from GetFileInfoGenerator().

Member:

But you may also want to do IO on the GetFileInfoGenerator results, in which case you'll transfer back and forth between executors?

Contributor (Author):

Hmmm, that sounds reasonable. What we get from GetFileInfoGenerator() is only a raw file handle, so we would want to process it somehow later, e.g. read the file, inspect the metadata, etc. Given that, the generator output is going to trigger more IO operations either way, in most cases.

I think MakeTransferredGenerator() is not necessary, indeed.

Contributor (Author):

I'll leave a note about that in DoDiscovery(), so that future readers don't have to guess.

Contributor (Author):

Strange thing: after removing the call to MakeTransferredGenerator(), the benchmark hangs. Debugging that...

In the meantime, maybe you know of any quirks related to this problem when dealing with background generators?

Member:

@westonpace would be the best person to answer that question.

Contributor (Author):

Okay, it seems that I need to transfer away from the BackgroundGenerator anyway. There are a bunch of related issues about deadlocking background generators, e.g. https://issues.apache.org/jira/browse/ARROW-13109 and https://issues.apache.org/jira/browse/ARROW-13110, which added a corresponding note to MakeBackgroundGenerator:

> You MUST transfer away from this background generator. Otherwise there could be a race condition if a callback on the background thread deletes the last consumer reference to the background generator. You can transfer onto the same executor as the background thread; it is only necessary to create a new thread task, not to switch executors.

Though, it's legal to specify the same executor for a background generator, in order to start a new task without switching to another executor.

So, there should be MakeTransferredGenerator, but we can specify the same io_executor as a target.

Member:

Sorry I wasn't much help, I've been trying to wrap up my work on a better async task scheduler. I will give this PR a thorough look tomorrow. What you are describing sounds correct. Creating a new thread task doesn't seem ideal but it should work ok. I agree that, in the ideal case, no transfer should be necessary and we should just remain on the I/O thread.

/// and automatically calls `Close()` on it once the ref-count for the
/// state reaches zero (which is equivalent to finishing the file discovery
/// process).
class AutoClosingProducer {
Member:

PushGenerator is thread-safe, but unfortunately AutoClosingProducer is not.

It seems you can vastly simplify this and make it thread-safe by letting C++ do the work:

struct DiscoveryState {
  FileInfoGeneratorProducer producer;

  ~DiscoveryState() {
    producer.Close();
  }
};

... then pass the same std::shared_ptr<DiscoveryState> to every DiscoveryImplIterator:

  static Result<AsyncGenerator<FileInfoGenerator>> DiscoverDirectoryFiles(
      FileSelector selector, LocalFileSystemOptions fs_opts) {
    PushGenerator<FileInfoGenerator> file_gen;

    ARROW_ASSIGN_OR_RAISE(
        auto base_dir, arrow::internal::PlatformFilename::FromString(selector.base_dir));
    auto discovery_state = std::make_shared<DiscoveryState>(std::move(file_gen.producer()));
    ARROW_RETURN_NOT_OK(DoDiscovery(std::move(base_dir), 0, std::move(selector),
                                    std::move(discovery_state),
                                    fs_opts.file_info_batch_size));

    return file_gen;
  }

Contributor (Author):

Good point, thanks for catching it!
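The DiscoveryState trick generalizes: give every worker a copy of one shared_ptr whose destructor performs the close, and the close runs exactly once, when the last worker drops its reference, from whichever thread that happens to be. A minimal stand-alone sketch; the Producer below is a stub standing in for Arrow's PushGenerator producer, not the real class:

```cpp
#include <atomic>
#include <cassert>
#include <memory>

// Stub producer: in Arrow this would be PushGenerator<...>::Producer.
struct Producer {
  std::atomic<int>* close_count;
  void Close() { ++(*close_count); }
};

// Destructor-driven cleanup: whoever holds the last shared_ptr to this
// state triggers Close(); no manual reference counting is required, and
// shared_ptr's control block makes the "last reference" decision thread-safe.
struct DiscoveryState {
  explicit DiscoveryState(Producer p) : producer(p) {}
  ~DiscoveryState() { producer.Close(); }
  Producer producer;
};
```

This is why the shared_ptr version is both simpler and thread-safe compared to a hand-rolled AutoClosingProducer.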

Comment on lines 39 to 40
Boost::filesystem
Boost::system)
Member:

Looks like the Boost dependency isn't needed?

@ManManson (Author) commented Aug 16, 2022:

You are right, this should not be needed.

}
BENCHMARK_REGISTER_F(LocalFSFixture, AsyncFileDiscovery)
->ArgNames({"directory_readahead", "file_info_batch_size"})
->ArgsProduct({{1, 2, 4, 8, 16}, {1, 10, 100, 1000}})
Member:

Can we perhaps cut down on the number of generated benchmarks? For example:

Suggested change
->ArgsProduct({{1, 2, 4, 8, 16}, {1, 10, 100, 1000}})
->ArgsProduct({{1, 4, 16}, {100, 1000}})

(1 for file_info_batch_size seems so obviously pessimal that it needn't be tested, what do you think?)

Contributor (Author):

Makes sense.

@ManManson (Author):

Force-pushed the branch to address review comments from @pitrou.
The diff can be found here: https://github.com/apache/arrow/compare/affcca7fa308c897ff5a358d1668486abed6c5fa..09142855f750ea046fb081c17d23b5fa675f7e9f

Changelog:

  • Removed AutoClosingProducer in favor of a simpler and correct std::shared_ptr<DiscoveryState> approach
  • Scheduled sub-generators on the same IO executor, so that we don't do redundant transitions between CPU and IO threads
  • Passed io_executor from the LocalFileSystem down to the discovery algorithm, so that we don't have to use io::get_default_io_context()
  • Fixed comments
  • Minor code fixes (e.g. swapping current chunk contents in DiscoveryImplIterator) proposed by @pitrou
  • Removed unused Boost dependencies for the localfs benchmark
  • Removed the dependency on Parquet in the localfs benchmark
  • Reduced the number of argument ranges in the benchmark

@pitrou (Member) left a comment:

Looking at the benchmark only, a couple comments.

return Status::OK();
});
ASSERT_FINISHES_OK(visit_fut);
st.SetItemsProcessed(total_file_count);
Member:

SetItemsProcessed is per benchmark, not per iteration, so to get correct numbers when iterations > 1, you should instead do something like:

  size_t total_file_count = 0;
  for (auto _ : st) {
    ...
  }
  st.SetItemsProcessed(total_file_count);

Contributor (Author):

You are right, I will fix that.


const size_t nesting_depth_ = 2;
const size_t num_dirs_ = 10;
const size_t num_files_ = 10000;
Member:

This creates on the order of 1 million files total, and makes the benchmark rather slow to setup and run.

Contributor (Author):

The reason for creating so many files was to adequately test large batch/dir_readahead combinations, which I think show sensible results only if there are enough files to stress the target code.

But if you think we can reduce the overall number of files, I don't have any strong objections.

Member:

num_files_ = 1000 looks more reasonable here, but YMMV :-)

Member:

Yeah, there's a bit of a tension between producing interesting numbers and making the benchmark rather expensive to run (even on a fast machine... I'm afraid macOS or Windows might be dismal here :-)).

Contributor (Author):

I agree, Windows and macOS users will almost surely suffer in this case...
So, to keep benchmarking fast, let num_files_ be 1k, then. :)

@pitrou (Member) commented Aug 17, 2022

I'm running this benchmark locally on Ubuntu 20.04, 24-thread CPU, ext4 filesystem on a fast SSD.
Out of curiosity I added different dataset sizes into the mix as well as file_info_batch_size = 10.

  • With num_files_ = 10000 (total ~1 million files):
-------------------------------------------------------------------------------------------------------------------------------------------------------
Benchmark                                                                                             Time             CPU   Iterations UserCounters...
-------------------------------------------------------------------------------------------------------------------------------------------------------
LocalFSFixture/AsyncFileDiscovery/directory_readahead:1/file_info_batch_size:10/real_time          2668 ms        0.872 ms            1 items_per_second=416.072k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:4/file_info_batch_size:10/real_time          2301 ms        0.857 ms            1 items_per_second=482.503k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:16/file_info_batch_size:10/real_time         2344 ms        0.784 ms            1 items_per_second=473.564k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:1/file_info_batch_size:100/real_time         2312 ms        0.799 ms            1 items_per_second=480.162k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:4/file_info_batch_size:100/real_time         1555 ms        0.717 ms            1 items_per_second=713.993k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:16/file_info_batch_size:100/real_time        1509 ms        0.698 ms            1 items_per_second=735.592k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:1/file_info_batch_size:1000/real_time        2676 ms        0.880 ms            1 items_per_second=414.832k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:4/file_info_batch_size:1000/real_time         634 ms        0.792 ms            1 items_per_second=1.75228M/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:16/file_info_batch_size:1000/real_time        285 ms        0.764 ms            3 items_per_second=3.89016M/s
  • With num_files_ = 1000 (total ~100 thousand files):
-------------------------------------------------------------------------------------------------------------------------------------------------------
Benchmark                                                                                             Time             CPU   Iterations UserCounters...
-------------------------------------------------------------------------------------------------------------------------------------------------------
LocalFSFixture/AsyncFileDiscovery/directory_readahead:1/file_info_batch_size:10/real_time           261 ms        0.086 ms            3 items_per_second=425.581k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:4/file_info_batch_size:10/real_time           186 ms        0.080 ms            4 items_per_second=595.899k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:16/file_info_batch_size:10/real_time          185 ms        0.088 ms            4 items_per_second=601.398k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:1/file_info_batch_size:100/real_time          296 ms        0.096 ms            2 items_per_second=375.29k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:4/file_info_batch_size:100/real_time         66.4 ms        0.081 ms           10 items_per_second=1.6738M/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:16/file_info_batch_size:100/real_time        29.6 ms        0.067 ms           24 items_per_second=3.74895M/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:1/file_info_batch_size:1000/real_time         304 ms        0.091 ms            2 items_per_second=365.748k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:4/file_info_batch_size:1000/real_time        70.2 ms        0.085 ms           10 items_per_second=1.58328M/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:16/file_info_batch_size:1000/real_time       32.7 ms        0.051 ms           21 items_per_second=3.40023M/s
  • With num_files_ = 10 (total ~1000 files):
-------------------------------------------------------------------------------------------------------------------------------------------------------
Benchmark                                                                                             Time             CPU   Iterations UserCounters...
-------------------------------------------------------------------------------------------------------------------------------------------------------
LocalFSFixture/AsyncFileDiscovery/directory_readahead:1/file_info_batch_size:10/real_time          4.24 ms        0.021 ms          187 items_per_second=287.682k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:4/file_info_batch_size:10/real_time          1.87 ms        0.025 ms          352 items_per_second=650.697k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:16/file_info_batch_size:10/real_time         1.59 ms        0.024 ms          455 items_per_second=769.645k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:1/file_info_batch_size:100/real_time         6.21 ms        0.026 ms          106 items_per_second=196.586k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:4/file_info_batch_size:100/real_time         1.86 ms        0.024 ms          364 items_per_second=657.369k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:16/file_info_batch_size:100/real_time        1.60 ms        0.024 ms          435 items_per_second=762.434k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:1/file_info_batch_size:1000/real_time        6.18 ms        0.022 ms          110 items_per_second=197.411k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:4/file_info_batch_size:1000/real_time        1.86 ms        0.025 ms          378 items_per_second=657.252k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:16/file_info_batch_size:1000/real_time       1.58 ms        0.024 ms          463 items_per_second=774.176k/s
  • With num_files_ = 10, num_dirs_ = 1 (total ~30 files):
-------------------------------------------------------------------------------------------------------------------------------------------------------
Benchmark                                                                                             Time             CPU   Iterations UserCounters...
-------------------------------------------------------------------------------------------------------------------------------------------------------
LocalFSFixture/AsyncFileDiscovery/directory_readahead:1/file_info_batch_size:10/real_time         0.105 ms        0.015 ms         6795 items_per_second=304.24k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:4/file_info_batch_size:10/real_time         0.133 ms        0.021 ms         5239 items_per_second=240.955k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:16/file_info_batch_size:10/real_time        0.133 ms        0.021 ms         5241 items_per_second=240.891k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:1/file_info_batch_size:100/real_time        0.157 ms        0.018 ms         4352 items_per_second=203.893k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:4/file_info_batch_size:100/real_time        0.131 ms        0.021 ms         5290 items_per_second=244.757k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:16/file_info_batch_size:100/real_time       0.138 ms        0.022 ms         5471 items_per_second=231.11k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:1/file_info_batch_size:1000/real_time       0.158 ms        0.018 ms         4294 items_per_second=201.957k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:4/file_info_batch_size:1000/real_time       0.131 ms        0.022 ms         5323 items_per_second=243.422k/s
LocalFSFixture/AsyncFileDiscovery/directory_readahead:16/file_info_batch_size:1000/real_time      0.134 ms        0.021 ms         5188 items_per_second=239.693k/s

These numbers seem to support a default readahead of 16 and a default batch size of 1000.

@ManManson
Contributor Author

@pitrou Your numbers are in line with what I get on my machine. I agree we can tune the defaults to be kDefaultDirectoryReadahead = 16 and kDefaultFileInfoBatchSize = 1000.

@ManManson
Contributor Author

Force-pushed the branch to address some follow-up comments.
The diff can be found here: https://github.com/apache/arrow/compare/09142855f750ea046fb081c17d23b5fa675f7e9f..eb77fad86834a6c10244eb924197e949b7faba3f

Changelog:

  • Set kDefaultDirectoryReadahead to 16 instead of 1, based on benchmarking results
  • Fixed SetItemsProcessed handling in the localfs benchmark
  • Reduced num_files_ from 10k to 1k in the localfs benchmark so that it stays fast enough for a wider range of hardware configurations

Member

@westonpace westonpace left a comment

Ok, this looks good to me. Thanks for adding the benchmark. I'm curious if you've had a chance to benchmark any real world use cases (e.g. dataset discovery or something)?

The tradeoff seems the best we can do for now.

Comment on lines 49 to 52
/// Specifies how much entries shall be aggregated into
/// a single FileInfoVector chunk by the `GetFileInfoGenerator` impl, which
/// is the result of `stat`:ing individual dirents, obtained by the call to
/// `internal::ListDir`.
Member

This is going to be a rather heavy comment for your average Arrow user. Can we perhaps just give a brief description of the reasons you might want to change this parameter?

Member

I'll be improving those docstrings. I think it's nice that they are explanatory.

@pitrou
Copy link
Member

pitrou commented Aug 18, 2022

I'll take a last look here.

@pitrou pitrou self-requested a review August 18, 2022 10:18
ManManson and others added 6 commits August 18, 2022 17:39
This header makes use of `int8_t`, which is defined in the `<cstdint>`
system header.

Signed-off-by: Pavel Solodovnikov <[email protected]>
…h `directory_readahead` option

Introduce a `directory_readahead` option to `LocalFileSystemOptions`
to adjust how much directory readahead should happen in
`GetFileInfoGenerator()` (i.e. how many directories should
be processed in parallel).

Defaults to 16, based on the benchmarking results (the
benchmark itself is added a few patches later).

These changes will be used in a later patch to add a separate
optimized specialization for `GetFileInfoGenerator(selector)` for
`LocalFileSystem` class.

Signed-off-by: Pavel Solodovnikov <[email protected]>
…h `file_info_batch_size` option

Introduce a `file_info_batch_size` option to `LocalFileSystemOptions`
to adjust how much internal batching happens inside the
`GetFileInfoGenerator` implementation (i.e. how many elements
should be yielded in a single batch by the FileInfoGenerator).

Defaults to 1000 elements, based on benchmarking results.

Signed-off-by: Pavel Solodovnikov <[email protected]>
…sion of `GetFileInfoGenerator`

Introduce a helper class `AsyncStatSelector`, which contains
an optimized specialization for `GetFileInfoGenerator` in the
`LocalFileSystem` class.

There are two variants of async discovery functions supported:
1. `DiscoverDirectoryFiles`, which parallelizes traversal of
   individual directories so that each directory's results are
   yielded as a separate `FileInfoGenerator` via an underlying
   `DiscoveryImplIterator`, which delivers items in chunks
   (default size is `kDefaultFileInfoBatchSize == 1K` items).
2. `DiscoverDirectoriesFlattened`, which forwards execution to
   `DiscoverDirectoryFiles`, with the difference that the
   results from individual sub-directory iterators are merged
   into a single FileInfoGenerator stream.

The implementation makes use of additional attributes in
`LocalFileSystemOptions`, such as `directory_readahead`,
which can be used to tune algorithm behavior and adjust
how many directories can be processed in parallel.

This option is disabled by default, so that individual
directories are processed in a serial manner via
`MakeConcatenatedGenerator` under the hood.

Internal batching can also be configured via
`LocalFileSystemOptions::file_info_batch_size`, which specifies
how many `FileInfo` entries should be batched into a single
`FileInfoVector` to yield in a single `DiscoveryImplIterator::Next()`
invocation.

Each `DiscoveryImplIterator` maintains a reference to a shared
`DiscoveryState` struct, which:
1. Ensures that the producer is alive while there is at least
   one iterator still running.
2. Ensures that the producer is properly closed when the discovery
   process is finished (i.e. the ref-count for the shared
   `DiscoveryState` reaches 0).

Tests: unit(release)

Signed-off-by: Pavel Solodovnikov <[email protected]>
Co-Authored-by: Igor Seliverstov <[email protected]>
…tFileGenerator`

This patch adds a simple benchmark for testing
`LocalFileSystem::GetFileInfoGenerator()` performance.

The test function is executed for each combination (cartesian product)
of the input argument tuple (directory_readahead, file_info_batch_size)
to test both internal parallelism and batching.

Test arguments are represented by the range
`{{1, 4, 16}, {100, 1000}}`.
I.e. directory readahead is tested for values
1 through 16, mult. factor 4;
batch size is tested for values
100 through 1000, mult. factor 10.

Signed-off-by: Pavel Solodovnikov <[email protected]>
Member

@pitrou pitrou left a comment

I pushed some minor changes, but most of all added a test to stress the GetFileInfoGenerator implementation with several parameter values.

Comment on lines +46 to +58
/// EXPERIMENTAL: The maximum number of directories processed in parallel
/// by `GetFileInfoGenerator`.
int32_t directory_readahead = kDefaultDirectoryReadahead;

/// EXPERIMENTAL: The maximum number of entries aggregated into each
/// FileInfoVector chunk by `GetFileInfoGenerator`.
///
/// Since each FileInfo entry needs a separate `stat` system call, a
/// directory with a very large number of files may take a lot of time to
/// process entirely. By generating a FileInfoVector after this chunk
/// size is reached, we ensure FileInfo entries can start being consumed
/// from the FileInfoGenerator with less initial latency.
int32_t file_info_batch_size = kDefaultFileInfoBatchSize;
Member

@ManManson @westonpace Are the docstrings ok to you?

Contributor Author

LGTM, thanks.

Member

That helps a lot, thank you.

@pitrou pitrou merged commit a1c3d57 into apache:master Aug 18, 2022
@ursabot

ursabot commented Aug 18, 2022

Benchmark runs are scheduled for baseline = bc52f9f and contender = a1c3d57. a1c3d57 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Failed ⬇️2.72% ⬆️3.16%] test-mac-arm
[Failed ⬇️1.92% ⬆️1.37%] ursa-i9-9960x
[Finished ⬇️10.22% ⬆️8.31%] ursa-thinkcentre-m75q
Buildkite builds:
[Finished] a1c3d57a ec2-t3-xlarge-us-east-2
[Failed] a1c3d57a test-mac-arm
[Failed] a1c3d57a ursa-i9-9960x
[Finished] a1c3d57a ursa-thinkcentre-m75q
[Finished] bc52f9f0 ec2-t3-xlarge-us-east-2
[Finished] bc52f9f0 test-mac-arm
[Failed] bc52f9f0 ursa-i9-9960x
[Finished] bc52f9f0 ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@ursabot

ursabot commented Aug 18, 2022

['Python', 'R'] benchmarks have high level of regressions.
ursa-i9-9960x

zagto pushed a commit to zagto/arrow that referenced this pull request Oct 7, 2022
…lization for `LocalFileSystem` (apache#13796)

Introduce a specialization of `GetFileInfoGenerator` in the `LocalFileSystem` class.

This implementation tries to improve performance by hiding latencies at two levels:
1. Child directories can be read ahead so that listing directory entries from disk can proceed in parallel with other work;
2. Directory entries can be `stat`'ed and yielded in chunks so that the `FileInfoGenerator` consumer can start receiving entries before a large directory is fully processed.

Both mechanisms can be tuned using dedicated parameters in `LocalFileSystemOptions`.

Signed-off-by: Pavel Solodovnikov <[email protected]>
Co-Authored-by: Igor Seliverstov <[email protected]>

Lead-authored-by: Pavel Solodovnikov <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>