From 6fab57b44c6cda7fecc1c1cbaaf6d44c3620f64b Mon Sep 17 00:00:00 2001 From: Alex Zhu Date: Thu, 9 Mar 2023 21:35:37 +0800 Subject: [PATCH] [Enhancement] Reduce S3 headObject requests in lake table's GC (#19148) Add a new FileSystem interface iterate_dir2() for getting the file modification time while traversing the directory to avoid additional `FileSystem::get_file_modified_time()` calls. In fact, this PR will not have any optimization, because the new interface is not really implemented in the StarletFilesystem, another PR is needed. Signed-off-by: sduzh (cherry picked from commit c301d2d3ba1b868ad8537dce6d1bb64485b919ad) --- be/src/fs/fs.h | 64 +++++++++++++++++++++++++----- be/src/fs/fs_broker.cpp | 6 +++ be/src/fs/fs_broker.h | 3 ++ be/src/fs/fs_hdfs.cpp | 55 +++++++++++++++++++++---- be/src/fs/fs_memory.cpp | 42 ++++++++++++++++++++ be/src/fs/fs_memory.h | 3 ++ be/src/fs/fs_posix.cpp | 32 +++++++++++++++ be/src/fs/fs_s3.cpp | 33 ++++++++++----- be/src/fs/fs_starlet.cpp | 12 ++++++ be/src/runtime/snapshot_loader.cpp | 55 +++++++++++++------------ be/src/storage/lake/gc.cpp | 51 ++++++++++++++---------- be/test/fs/fs_memory_test.cpp | 26 ++++++++++++ be/test/fs/fs_posix_test.cpp | 28 +++++++++++++ be/test/fs/fs_s3_test.cpp | 58 +++++++++++++++++++++++---- 14 files changed, 386 insertions(+), 82 deletions(-) diff --git a/be/src/fs/fs.h b/be/src/fs/fs.h index 92b084ded87d0f..28f4b849c25301 100644 --- a/be/src/fs/fs.h +++ b/be/src/fs/fs.h @@ -80,13 +80,6 @@ struct FSOptions { const TCloudConfiguration* cloud_configuration; }; -struct FileStatus { - FileStatus(std::string_view name, bool is_dir, int64_t size) : name(name), is_dir(is_dir), size(size) {} - std::string name; - bool is_dir; - int64_t size; -}; - struct SequentialFileOptions { SequentialFileOptions() = default; @@ -103,6 +96,51 @@ struct RandomAccessFileOptions { bool skip_fill_local_cache = false; }; +class FileMeta { +public: + // REQUIRE: size >= 0 + FileMeta& with_size(int64_t size) { + _size = 
size; + return *this; + } + + // REQUIRE: mtime > 0 + FileMeta& with_modify_time(int64_t mtime) { + _mtime = mtime; + return *this; + } + + FileMeta& with_is_dir(bool is_dir) { + _is_dir = is_dir; + return *this; + } + + bool has_size() const { return _size >= 0; } + + // REQUIRE: size >= 0 + void set_size(int64_t size) { _size = size; } + + int64_t size() const { return _size; } + + bool has_modify_time() const { return _mtime > 0; } + + // REQUIRE: mtime > 0 + void set_modify_time(int64_t mtime) { _mtime = mtime; } + + int64_t modify_time() const { return _mtime; } + + bool has_is_dir() const { return _is_dir >= 0; } + + void set_is_dir(bool is_dir) { _is_dir = is_dir; } + + bool is_dir() const { return _is_dir > 0; } + +private: + int64_t _size = -1; + int64_t _mtime = -1; + int _is_dir = -1; +}; + class FileSystem { public: enum Type { POSIX, S3, HDFS, BROKER, MEMORY, STARLET }; @@ -182,8 +220,6 @@ class FileSystem { // IOError if an IO Error was encountered virtual Status get_children(const std::string& dir, std::vector* result) = 0; - virtual Status list_path(const std::string& dir, std::vector* result) { return Status::OK(); } - // Iterate the specified directory and call given callback function with child's // name. This function continues execution until all children have been iterated // or callback function return false. @@ -199,6 +235,15 @@ class FileSystem { // IOError if an IO Error was encountered virtual Status iterate_dir(const std::string& dir, const std::function& cb) = 0; + // `iterate_dir2` is similar to `iterate_dir` but in addition to returning the directory entry name, it + // also returns some file statistics. + // + // For performance reason, some implementations may leave some fields in FileMeta unfilled, the caller + // should always check the value of `FileMeta::has_xxx()` before accessing the corresponding field of + // FileMeta. 
+ virtual Status iterate_dir2(const std::string& dir, + const std::function& cb) = 0; + // Delete the named file. // FIXME: If the named file does not exist, OK or NOT_FOUND is returned, depend on the implementation. virtual Status delete_file(const std::string& fname) = 0; @@ -242,6 +287,7 @@ class FileSystem { // Get the last modification time by given 'fname'. virtual StatusOr get_file_modified_time(const std::string& fname) = 0; + // Rename file src to target. virtual Status rename_file(const std::string& src, const std::string& target) = 0; diff --git a/be/src/fs/fs_broker.cpp b/be/src/fs/fs_broker.cpp index 49cb6966299cbd..58213f838f56b8 100644 --- a/be/src/fs/fs_broker.cpp +++ b/be/src/fs/fs_broker.cpp @@ -405,6 +405,12 @@ Status BrokerFileSystem::iterate_dir(const std::string& dir, const std::function return Status::OK(); } +Status BrokerFileSystem::iterate_dir2(const std::string& dir, + const std::function& cb) { + FileMeta meta; + return iterate_dir(dir, [&](std::string_view name) { return cb(name, meta); }); +} + Status BrokerFileSystem::delete_file(const std::string& path) { return _delete_file(path); } diff --git a/be/src/fs/fs_broker.h b/be/src/fs/fs_broker.h index c44fc976bc239d..2a2c45da4b54d0 100644 --- a/be/src/fs/fs_broker.h +++ b/be/src/fs/fs_broker.h @@ -56,6 +56,9 @@ class BrokerFileSystem : public FileSystem { Status iterate_dir(const std::string& dir, const std::function& cb) override; + Status iterate_dir2(const std::string& dir, + const std::function& cb) override; + Status delete_file(const std::string& path) override; Status create_dir(const std::string& dirname) override; diff --git a/be/src/fs/fs_hdfs.cpp b/be/src/fs/fs_hdfs.cpp index 914f47e8d94ca1..443f1ac07e50ad 100644 --- a/be/src/fs/fs_hdfs.cpp +++ b/be/src/fs/fs_hdfs.cpp @@ -254,11 +254,10 @@ class HdfsFileSystem : public FileSystem { return Status::NotSupported("HdfsFileSystem::get_children"); } - Status list_path(const std::string& dir, std::vector* result) override; + Status 
iterate_dir(const std::string& dir, const std::function& cb) override; - Status iterate_dir(const std::string& dir, const std::function& cb) override { - return Status::NotSupported("HdfsFileSystem::iterate_dir"); - } + Status iterate_dir2(const std::string& dir, + const std::function& cb) override; Status delete_file(const std::string& path) override { return Status::NotSupported("HdfsFileSystem::delete_file"); } @@ -320,7 +319,43 @@ Status HdfsFileSystem::path_exists(const std::string& path) { return _path_exists(handle.hdfs_fs, path); } -Status HdfsFileSystem::list_path(const std::string& dir, std::vector* result) { +Status HdfsFileSystem::iterate_dir(const std::string& dir, const std::function& cb) { + std::string namenode; + RETURN_IF_ERROR(get_namenode_from_path(dir, &namenode)); + HdfsFsHandle handle; + RETURN_IF_ERROR(HdfsFsCache::instance()->get_connection(namenode, &handle, _options)); + Status status = _path_exists(handle.hdfs_fs, dir); + if (!status.ok()) { + return status; + } + + hdfsFileInfo* fileinfo; + int numEntries; + fileinfo = hdfsListDirectory(handle.hdfs_fs, dir.data(), &numEntries); + if (fileinfo == nullptr) { + return Status::InvalidArgument("hdfs list directory error {}"_format(dir)); + } + for (int i = 0; i < numEntries && fileinfo; ++i) { + // obj_key.data() + uri.key().size(), obj_key.size() - uri.key().size() + int32_t dir_size; + if (dir[dir.size() - 1] == '/') { + dir_size = dir.size(); + } else { + dir_size = dir.size() + 1; + } + std::string_view name(fileinfo[i].mName + dir_size); + if (!cb(name)) { + break; + } + } + if (fileinfo) { + hdfsFreeFileInfo(fileinfo, numEntries); + } + return Status::OK(); +} + +Status HdfsFileSystem::iterate_dir2(const std::string& dir, + const std::function& cb) { std::string namenode; RETURN_IF_ERROR(get_namenode_from_path(dir, &namenode)); HdfsFsHandle handle; @@ -345,9 +380,13 @@ Status HdfsFileSystem::list_path(const std::string& dir, std::vector dir_size = dir.size() + 1; } std::string_view 
name(fileinfo[i].mName + dir_size); - bool is_dir = fileinfo[i].mKind == tObjectKind::kObjectKindDirectory; - int64_t file_size = fileinfo[i].mSize; - result->emplace_back(name, is_dir, file_size); + FileMeta meta; + meta.with_is_dir(fileinfo[i].mKind == tObjectKind::kObjectKindDirectory) + .with_size(fileinfo[i].mSize) + .with_modify_time(fileinfo[i].mLastMod); + if (!cb(name, meta)) { + break; + } } if (fileinfo) { hdfsFreeFileInfo(fileinfo, numEntries); diff --git a/be/src/fs/fs_memory.cpp b/be/src/fs/fs_memory.cpp index b75009610067b8..701da5ade461ac 100644 --- a/be/src/fs/fs_memory.cpp +++ b/be/src/fs/fs_memory.cpp @@ -182,6 +182,41 @@ class EnvMemoryImpl { return Status::OK(); } + Status iterate_dir2(const butil::FilePath& path, const std::function& cb) { + auto inode = get_inode(path); + if (inode == nullptr || inode->type != kDir) { + return Status::NotFound(path.value()); + } + DCHECK(path.value().back() != '/' || path.value() == "/"); + std::string s = (path.value() == "/") ? path.value() : path.value() + "/"; + for (auto iter = _namespace.lower_bound(s); iter != _namespace.end(); ++iter) { + Slice child(iter->first); + if (!child.starts_with(s)) { + break; + } + FileMeta meta; + ASSIGN_OR_RETURN(bool is_dir, is_directory(butil::FilePath(child.to_string()))); + meta.set_is_dir(is_dir); + if (!is_dir) { + ASSIGN_OR_RETURN(int64_t size, get_file_size(butil::FilePath(child.to_string()))); + meta.set_size(size); + } + // Get the relative path. 
+ child.remove_prefix(s.size()); + if (child.empty()) { + continue; + } + auto slash = (const char*)memchr(child.data, '/', child.size); + if (slash != nullptr) { + continue; + } + if (!cb(child.data, meta)) { + break; + } + } + return Status::OK(); + } + Status delete_file(const butil::FilePath& path) { auto iter = _namespace.find(path.value()); if (iter == _namespace.end() || iter->second->type != kNormal) { @@ -420,6 +455,13 @@ Status MemoryFileSystem::iterate_dir(const std::string& dir, const std::function return _impl->iterate_dir(butil::FilePath(new_path), cb); } +Status MemoryFileSystem::iterate_dir2(const std::string& dir, + const std::function& cb) { + std::string new_path; + RETURN_IF_ERROR(canonicalize(dir, &new_path)); + return _impl->iterate_dir2(butil::FilePath(new_path), cb); +} + Status MemoryFileSystem::delete_file(const std::string& path) { std::string new_path; RETURN_IF_ERROR(canonicalize(path, &new_path)); diff --git a/be/src/fs/fs_memory.h b/be/src/fs/fs_memory.h index 2d1feae5c0bf85..dee8e0d8dd93f9 100644 --- a/be/src/fs/fs_memory.h +++ b/be/src/fs/fs_memory.h @@ -53,6 +53,9 @@ class MemoryFileSystem : public FileSystem { Status iterate_dir(const std::string& dir, const std::function& cb) override; + Status iterate_dir2(const std::string& dir, + const std::function& cb) override; + Status delete_file(const std::string& url) override; Status create_dir(const std::string& dirname) override; diff --git a/be/src/fs/fs_posix.cpp b/be/src/fs/fs_posix.cpp index 35c2520e5914b1..1ff16c5c0312c9 100644 --- a/be/src/fs/fs_posix.cpp +++ b/be/src/fs/fs_posix.cpp @@ -407,6 +407,38 @@ class PosixFileSystem : public FileSystem { return Status::OK(); } + Status iterate_dir2(const std::string& dir, + const std::function& cb) override { + DIR* d = opendir(dir.c_str()); + if (d == nullptr) { + return io_error(dir, errno); + } + errno = 0; + struct dirent* entry; + while ((entry = readdir(d)) != nullptr) { + std::string_view name(entry->d_name); + if (name == "." 
|| name == "..") { + continue; + } + FileMeta meta; + struct stat child_stat; + std::string child_path = fmt::format("{}/{}", dir, name); + if (stat(child_path.c_str(), &child_stat) != 0) { + break; + } + meta.set_is_dir(S_ISDIR(child_stat.st_mode)); + meta.set_modify_time(static_cast(child_stat.st_mtime)); + meta.set_size(child_stat.st_size); + // callback returning false means to terminate iteration + if (!cb(name, meta)) { + break; + } + } + closedir(d); + if (errno != 0) return io_error(dir, errno); + return Status::OK(); + } + Status delete_file(const std::string& fname) override { if (config::file_descriptor_cache_capacity > 0 && enable_fd_cache(fname)) { FdCache::Instance()->erase(fname); diff --git a/be/src/fs/fs_s3.cpp b/be/src/fs/fs_s3.cpp index b4c31a4e5e6474..467f860ee4a9f1 100644 --- a/be/src/fs/fs_s3.cpp +++ b/be/src/fs/fs_s3.cpp @@ -334,14 +334,15 @@ class S3FileSystem : public FileSystem { Status path_exists(const std::string& path) override { return Status::NotSupported("S3FileSystem::path_exists"); } - Status list_path(const std::string& dir, std::vector* result) override; - Status get_children(const std::string& dir, std::vector* file) override { return Status::NotSupported("S3FileSystem::get_children"); } Status iterate_dir(const std::string& dir, const std::function& cb) override; + Status iterate_dir2(const std::string& dir, + const std::function& cb) override; + Status delete_file(const std::string& path) override; Status create_dir(const std::string& dirname) override; @@ -528,7 +529,8 @@ Status S3FileSystem::iterate_dir(const std::string& dir, const std::function* file_status) { +Status S3FileSystem::iterate_dir2(const std::string& dir, + const std::function& cb) { S3URI uri; if (!uri.parse(dir)) { return Status::InvalidArgument(fmt::format("Invalid S3 URI {}", dir)); @@ -559,9 +561,11 @@ Status S3FileSystem::list_path(const std::string& dir, std::vector* const auto& full_name = cp.GetPrefix(); std::string_view name(full_name.data() + 
uri.key().size(), full_name.size() - uri.key().size() - 1); - bool is_dir = true; - int64_t file_size = 0; - file_status->emplace_back(name, is_dir, file_size); + FileMeta stat; + stat.set_is_dir(true); + if (!cb(name, stat)) { + return Status::OK(); + } } for (auto&& obj : result.GetContents()) { if (obj.GetKey() == uri.key()) { @@ -570,19 +574,26 @@ Status S3FileSystem::list_path(const std::string& dir, std::vector* DCHECK(HasPrefixString(obj.GetKey(), uri.key())); std::string_view obj_key(obj.GetKey()); - bool is_dir = true; - int64_t file_size = 0; + FileMeta stat; + if (obj.LastModifiedHasBeenSet()) { + stat.set_modify_time(obj.GetLastModified().Seconds()); + } if (obj_key.back() == '/') { obj_key = std::string_view(obj_key.data(), obj_key.size() - 1); + stat.set_is_dir(true); } else { DCHECK(obj.SizeHasBeenSet()); - is_dir = false; - file_size = obj.GetSize(); + stat.set_is_dir(false); + if (obj.SizeHasBeenSet()) { + stat.set_size(obj.GetSize()); + } } std::string_view name(obj_key.data() + uri.key().size(), obj_key.size() - uri.key().size()); - file_status->emplace_back(name, is_dir, file_size); + if (!cb(name, stat)) { + return Status::OK(); + } } } while (result.GetIsTruncated()); return directory_exist ? 
Status::OK() : Status::NotFound(dir); diff --git a/be/src/fs/fs_starlet.cpp b/be/src/fs/fs_starlet.cpp index eabca0d2f60914..adea3944410c6e 100644 --- a/be/src/fs/fs_starlet.cpp +++ b/be/src/fs/fs_starlet.cpp @@ -304,6 +304,18 @@ class StarletFileSystem : public FileSystem { return to_status(st); } + Status iterate_dir2(const std::string& dir, + const std::function& cb) override { + ASSIGN_OR_RETURN(auto pair, parse_starlet_uri(dir)); + auto fs_st = get_shard_filesystem(pair.second); + if (!fs_st.ok()) { + return to_status(fs_st.status()); + } + FileMeta meta; // Left unfilled: the list_dir() callback below only receives entry names. + auto st = (*fs_st)->list_dir(pair.first, false, [&](std::string_view name) { return cb(name, meta); }); + return to_status(st); + } + Status create_dir(const std::string& dirname) override { auto st = is_directory(dirname); if (st.ok() && st.value()) { diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index d7547a230093e9..dc180c392ed2ef 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -801,37 +801,40 @@ Status SnapshotLoader::_get_existing_files_from_remote(BrokerServiceConnection& Status SnapshotLoader::_get_existing_files_from_remote_without_broker(const std::unique_ptr& fs, const std::string& remote_path, std::map* files) { - std::vector file_status; - Status status = fs->list_path(remote_path, &file_status); - if (!status.ok() && !status.is_not_found()) { - std::stringstream ss; - ss << "failed to list files in remote path: " << remote_path << ", msg: " << status.message(); - LOG(WARNING) << ss.str(); - return status; - } - LOG(INFO) << "finished to list files from remote path. 
file num: " << file_status.size(); - - // split file name and checksum - for (const auto& file : file_status) { - if (file.is_dir) { - // this is not a file - continue; + int64_t file_num = 0; + Status st; + st.update(fs->iterate_dir2(remote_path, [&](std::string_view name, const FileMeta& meta) { + if (UNLIKELY(!meta.has_is_dir())) { + st.update(Status::InternalError("Unable to recognize the file type")); + return false; } - - const std::string& file_name = file.name; - size_t pos = file_name.find_last_of('.'); - if (pos == std::string::npos || pos == file_name.size() - 1) { + file_num++; + if (meta.is_dir()) { + return true; + } + if (UNLIKELY(!meta.has_size())) { + st.update(Status::InternalError("Unable to get file size")); + return false; + } + size_t pos = name.find_last_of('.'); + if (pos == std::string::npos || pos == name.size() - 1) { // Not found checksum separator, ignore this file - continue; + return true; } - std::string name = std::string(file_name, 0, pos); - std::string md5 = std::string(file_name, pos + 1); - FileStat stat = {name, md5, file.size}; - files->emplace(name, stat); - VLOG(2) << "split remote file: " << name << ", checksum: " << md5; + std::string name_part(name.data(), name.data() + pos); + std::string md5_part(name.data() + pos + 1, name.data() + name.size()); + FileStat stat = {name_part, md5_part, meta.size()}; + files->emplace(name_part, stat); + VLOG(2) << "split remote file: " << name_part << ", checksum: " << md5_part; + return true; + })); + + if (!st.ok() && !st.is_not_found()) { + LOG(WARNING) << "failed to list files in remote path: " << remote_path << ", msg: " << st; + return st; } - LOG(INFO) << "finished to split files. valid file num: " << files->size(); + LOG(INFO) << "finished to split files. 
total file num: " << file_num << " valid file num: " << files->size(); return Status::OK(); } diff --git a/be/src/storage/lake/gc.cpp b/be/src/storage/lake/gc.cpp index f8de857abbda81..357bef9e2fbf96 100644 --- a/be/src/storage/lake/gc.cpp +++ b/be/src/storage/lake/gc.cpp @@ -213,24 +213,36 @@ static StatusOr> find_orphan_datafiles(TabletManager* tabl const std::vector& tablet_metadatas, const std::vector& txn_logs) { ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(root_location)); + const auto now = std::time(nullptr); + const auto expire_seconds = config::lake_gc_segment_expire_seconds; const auto metadata_root_location = join_path(root_location, kMetadataDirectoryName); const auto txn_log_root_location = join_path(root_location, kTxnLogDirectoryName); const auto segment_root_location = join_path(root_location, kSegmentDirectoryName); std::set datafiles; + bool need_check_modify_time = false; + int64_t total_files = 0; // List segment - auto iter_st = fs->iterate_dir(segment_root_location, [&](std::string_view name) { - if (LIKELY(is_segment(name) || is_del(name) || is_delvec(name))) { - datafiles.emplace(name); + auto iter_st = fs->iterate_dir2(segment_root_location, [&](std::string_view name, const FileMeta& meta) { + total_files++; + if (!is_segment(name) && !is_del(name) && !is_delvec(name)) { + LOG_EVERY_N(WARNING, 100) << "Unrecognized data file " << name; + return true; + } + if (meta.has_modify_time() && now < meta.modify_time() + expire_seconds) { + return true; } + need_check_modify_time |= !meta.has_modify_time(); // No listed mtime: fall back to get_file_modified_time(). + datafiles.emplace(name); return true; }); if (!iter_st.ok() && !iter_st.is_not_found()) { return iter_st; } - LOG(INFO) << "find_orphan_datafiles datafile cnt: " << datafiles.size(); + VLOG(4) << "Listed all data files. 
total files=" << total_files << " possible orphan files=" << datafiles.size(); + if (datafiles.empty()) { return datafiles; } @@ -264,7 +276,6 @@ static StatusOr> find_orphan_datafiles(TabletManager* tabl auto location = join_path(metadata_root_location, filename); auto res = tablet_mgr->get_tablet_metadata(location, false); if (res.status().is_not_found()) { - LOG(WARNING) << fmt::format("find_orphan_datafiles tablet meta {} not found", location); continue; } else if (!res.ok()) { return res.status(); @@ -281,7 +292,6 @@ static StatusOr> find_orphan_datafiles(TabletManager* tabl auto location = join_path(txn_log_root_location, filename); auto res = tablet_mgr->get_txn_log(location, false); if (res.status().is_not_found()) { - LOG(WARNING) << fmt::format("find_orphan_datafiles txnlog {} not found", location); continue; } else if (!res.ok()) { return res.status(); @@ -307,22 +317,23 @@ static StatusOr> find_orphan_datafiles(TabletManager* tabl } } - auto now = std::time(nullptr); - - for (auto it = datafiles.begin(); it != datafiles.end(); /**/) { - auto location = join_path(segment_root_location, *it); - auto res = fs->get_file_modified_time(location); - if (!res.ok()) { - LOG_IF(WARNING, !res.status().is_not_found()) - << "Fail to get modified time of " << location << ": " << res.status(); - it = datafiles.erase(it); - } else if (now < *res + config::lake_gc_segment_expire_seconds) { - it = datafiles.erase(it); - } else { - ++it; + if (need_check_modify_time && !datafiles.empty()) { + LOG(INFO) << "Checking modify time of " << datafiles.size() << " data files"; + for (auto it = datafiles.begin(); it != datafiles.end(); /**/) { + auto location = join_path(segment_root_location, *it); + auto res = fs->get_file_modified_time(location); + if (!res.ok()) { + LOG_IF(WARNING, !res.status().is_not_found()) + << "Fail to get modified time of " << location << ": " << res.status(); + it = datafiles.erase(it); + } else if (now < *res + expire_seconds) { + it = 
datafiles.erase(it); + } else { + ++it; + } } } - + VLOG(4) << "Found " << datafiles.size() << " orphan files"; return datafiles; } diff --git a/be/test/fs/fs_memory_test.cpp b/be/test/fs/fs_memory_test.cpp index 3a4ad9dd9dc74e..bb958ac53ffe0d 100644 --- a/be/test/fs/fs_memory_test.cpp +++ b/be/test/fs/fs_memory_test.cpp @@ -455,4 +455,30 @@ TEST_F(MemoryFileSystemTest, test_random_access_file) { EXPECT_ERROR(f->read_at_fully(22, slice.data, slice.size)); } +TEST_F(MemoryFileSystemTest, test_iterate_dir2) { + ASSERT_OK(_fs->create_dir("/home")); + ASSERT_OK(_fs->create_dir("/home/code")); + ASSIGN_OR_ABORT(auto f, _fs->new_writable_file("/home/gcc")); + ASSERT_OK(f->append("test")); + ASSERT_OK(f->close()); + + ASSERT_OK(_fs->iterate_dir2("/home", [](std::string_view name, const FileMeta& meta) -> bool { + if (name == "code") { + CHECK(meta.has_is_dir()); + CHECK(meta.is_dir()); + CHECK(!meta.has_modify_time()); + CHECK(!meta.has_size()); + } else if (name == "gcc") { + CHECK(meta.has_is_dir()); + CHECK(!meta.is_dir()); + CHECK(!meta.has_modify_time()); + CHECK(meta.has_size()); + CHECK_EQ(4, meta.size()); + } else { + CHECK(false) << "Unexpected file " << name; + } + return true; + })); +} + } // namespace starrocks diff --git a/be/test/fs/fs_posix_test.cpp b/be/test/fs/fs_posix_test.cpp index ba028235fbb60c..0d7121353ee2bc 100644 --- a/be/test/fs/fs_posix_test.cpp +++ b/be/test/fs/fs_posix_test.cpp @@ -199,4 +199,32 @@ TEST_F(PosixFileSystemTest, create_dir_recursive) { ASSERT_TRUE(FileSystem::Default()->path_exists(dir_path).is_not_found()); } +TEST_F(PosixFileSystemTest, iterate_dir2) { + auto fs = FileSystem::Default(); + auto now = ::time(nullptr); + ASSERT_OK(fs->create_dir_recursive("./ut_dir/fs_posix/iterate_dir2.d")); + ASSIGN_OR_ABORT(auto f, fs->new_writable_file("./ut_dir/fs_posix/iterate_dir2")); + ASSERT_OK(f->append("test")); + ASSERT_OK(f->close()); + + ASSERT_OK(fs->iterate_dir2("./ut_dir/fs_posix/", [&](std::string_view name, const FileMeta& 
meta) -> bool { + if (name == "iterate_dir2.d") { + CHECK(meta.has_is_dir()); + CHECK(meta.is_dir()); + CHECK(meta.has_modify_time()); + CHECK_GE(meta.modify_time(), now); + } else if (name == "iterate_dir2") { + CHECK(meta.has_is_dir()); + CHECK(!meta.is_dir()); + CHECK(meta.has_size()); + CHECK_EQ(4, meta.size()); + CHECK(meta.has_modify_time()); + CHECK_GE(meta.modify_time(), now); + } else { + CHECK(false) << "Unexpected file " << name; + } + return true; + })); +} + } // namespace starrocks diff --git a/be/test/fs/fs_s3_test.cpp b/be/test/fs/fs_s3_test.cpp index 7c6e6541fb80a3..402a7022a28324 100644 --- a/be/test/fs/fs_s3_test.cpp +++ b/be/test/fs/fs_s3_test.cpp @@ -27,19 +27,34 @@ namespace starrocks { // NOTE: The bucket must be created before running this test. -constexpr static const char* kBucketName = "starrocks-fs-s3-unit-test"; +constexpr static const char* kBucketName = "starrocks-fs-s3-ut"; class S3FileSystemTest : public testing::Test { public: S3FileSystemTest() = default; ~S3FileSystemTest() override = default; - void SetUp() override { Aws::InitAPI(_options); } - void TearDown() override { Aws::ShutdownAPI(_options); } - std::string S3Path(std::string_view path) { - return fmt::format("s3://{}.{}{}", kBucketName, config::object_storage_endpoint, path); + static void SetUpTestCase() { + CHECK(!config::object_storage_access_key_id.empty()) << "Need set object_storage_access_key_id in be_test.conf"; + CHECK(!config::object_storage_secret_access_key.empty()) + << "Need set object_storage_secret_access_key in be_test.conf"; + CHECK(!config::object_storage_endpoint.empty()) << "Need set object_storage_endpoint in be_test.conf"; + + Aws::InitAPI(_s_options); + + ASSIGN_OR_ABORT(auto fs, FileSystem::CreateUniqueFromString("s3://")); + (void)fs->delete_dir_recursive(S3Path("/")); + } + + static void TearDownTestCase() { + ASSIGN_OR_ABORT(auto fs, FileSystem::CreateUniqueFromString("s3://")); + (void)fs->delete_dir_recursive(S3Path("/")); + + 
Aws::ShutdownAPI(_s_options); } + static std::string S3Path(std::string_view path) { return fmt::format("s3://{}{}", kBucketName, path); } + void CheckIsDirectory(FileSystem* fs, const std::string& dir_name, bool expected_success, bool expected_is_dir = true) { const StatusOr status_or = fs->is_directory(dir_name); @@ -50,11 +65,11 @@ class S3FileSystemTest : public testing::Test { } private: - Aws::SDKOptions _options; + static inline Aws::SDKOptions _s_options; }; TEST_F(S3FileSystemTest, test_write_and_read) { - auto uri = fmt::format("s3://{}.{}/dir/test-object.png", kBucketName, config::object_storage_endpoint); + auto uri = fmt::format("s3://{}/dir/test-object.png", kBucketName); ASSIGN_OR_ABORT(auto fs, FileSystem::CreateUniqueFromString(uri)); ASSIGN_OR_ABORT(auto wf, fs->new_writable_file(uri)); EXPECT_OK(wf->append("hello")); @@ -77,6 +92,7 @@ TEST_F(S3FileSystemTest, test_write_and_read) { } TEST_F(S3FileSystemTest, test_directory) { + auto now = ::time(nullptr); ASSIGN_OR_ABORT(auto fs, FileSystem::CreateUniqueFromString("s3://")); bool created = false; @@ -151,7 +167,7 @@ TEST_F(S3FileSystemTest, test_directory) { // { ASSIGN_OR_ABORT(auto of, fs->new_writable_file(S3Path("/dirname2/1.dat"))); - EXPECT_OK(of->append("hello")); + EXPECT_OK(of->append("starrocks")); EXPECT_OK(of->close()); CheckIsDirectory(fs.get(), S3Path("/dirname2/1.dat"), true, false); } @@ -168,6 +184,32 @@ TEST_F(S3FileSystemTest, test_directory) { EXPECT_OK(fs->create_dir(S3Path("/dirname2/subdir0"))); CheckIsDirectory(fs.get(), S3Path("/dirname2/subdir0"), true, true); + EXPECT_OK(fs->iterate_dir2(S3Path("/dirname2/"), [&](std::string_view name, const FileMeta& meta) { + if (name == "0.dat") { + CHECK(meta.has_is_dir()); + CHECK(!meta.is_dir()); + CHECK(meta.has_size()); + CHECK_EQ(/* length of "hello" = */ 5, meta.size()); + CHECK(meta.has_modify_time()); + CHECK_GE(meta.modify_time(), now); + } else if (name == "1.dat") { + CHECK(meta.has_is_dir()); + CHECK(!meta.is_dir()); + 
CHECK(meta.has_size()); + CHECK_EQ(/* length of "starrocks" = */ 9, meta.size()); + CHECK(meta.has_modify_time()); + CHECK_GE(meta.modify_time(), now); + } else if (name == "subdir0") { + CHECK(meta.has_is_dir()); + CHECK(meta.is_dir()); + CHECK(!meta.has_size()); + CHECK(!meta.has_modify_time()); + } else { + CHECK(false) << "Unexpected file " << name; + } + return true; + })); + std::vector entries; auto cb = [&](std::string_view name) -> bool { entries.emplace_back(name);