Skip to content

Commit

Permalink
[Enhancement] Reduce S3 headObject requests in lake table's GC (#19148)
Browse files Browse the repository at this point in the history
Add a new FileSystem interface iterate_dir2() for getting
the file modification time while traversing the directory
to avoid additional `FileSystem::get_file_modified_time()`
calls.

In fact, this PR will not have any optimization, because the
new interface is not really implemented in the StarletFilesystem,
another PR is needed.

Signed-off-by: sduzh <[email protected]>
(cherry picked from commit c301d2d)
  • Loading branch information
sduzh authored and wanpengfei-git committed Mar 10, 2023
1 parent c14cf77 commit 7d85b43
Show file tree
Hide file tree
Showing 15 changed files with 387 additions and 83 deletions.
64 changes: 55 additions & 9 deletions be/src/fs/fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,6 @@ struct FSOptions {
const TCloudConfiguration* cloud_configuration;
};

struct FileStatus {
FileStatus(std::string_view name, bool is_dir, int64_t size) : name(name), is_dir(is_dir), size(size) {}
std::string name;
bool is_dir;
int64_t size;
};

struct SequentialFileOptions {
SequentialFileOptions() = default;

Expand All @@ -103,6 +96,51 @@ struct RandomAccessFileOptions {
bool skip_fill_local_cache = false;
};

class FileMeta {
public:
// REQUIRE: size >= 0
FileMeta& with_size(int64_t size) {
_size = size;
return *this;
}

// REQUIRE: mtime > 0
FileMeta& with_modify_time(int64_t mtime) {
_mtime = mtime;
return *this;
}

FileMeta& with_is_dir(bool is_dir) {
_is_dir = is_dir;
return *this;
}

bool has_size() const { return _size >= 0; }

// REQUIRE: size >= 0
void set_size(int64_t size) { _size = size; }

int64_t size() const { return _size; }

bool has_modify_time() const { return _mtime > 0; }

// REQUIRE: mtime > 0
void set_modify_time(int64_t mtime) { _mtime = mtime; }

int64_t modify_time() const { return _mtime; }

bool has_is_dir() const { return _is_dir >= 0; }

void set_is_dir(bool is_dir) { _is_dir = is_dir; }

bool is_dir() const { return _is_dir > 0; }

private:
int64_t _size = -1;
int64_t _mtime = -1;
int _is_dir = -1;
};

class FileSystem {
public:
enum Type { POSIX, S3, HDFS, BROKER, MEMORY, STARLET };
Expand Down Expand Up @@ -182,8 +220,6 @@ class FileSystem {
// IOError if an IO Error was encountered
virtual Status get_children(const std::string& dir, std::vector<std::string>* result) = 0;

virtual Status list_path(const std::string& dir, std::vector<FileStatus>* result) { return Status::OK(); }

// Iterate the specified directory and call given callback function with child's
// name. This function continues execution until all children have been iterated
// or callback function return false.
Expand All @@ -199,6 +235,15 @@ class FileSystem {
// IOError if an IO Error was encountered
virtual Status iterate_dir(const std::string& dir, const std::function<bool(std::string_view)>& cb) = 0;

// `iterate_dir2` is similar to `iterate_dir` but in addition to returning the directory entry name, it
// also returns some file statistics.
//
// For performance reason, some implementations may leave some fields in FileMeta unfilled, the caller
// should always check the value of `FileMeta::has_xxx()` before accessing the corresponding field of
// FileMeta.
virtual Status iterate_dir2(const std::string& dir,
const std::function<bool(std::string_view, const FileMeta&)>& cb) = 0;

// Delete the named file.
// FIXME: If the named file does not exist, OK or NOT_FOUND is returned, depend on the implementation.
virtual Status delete_file(const std::string& fname) = 0;
Expand Down Expand Up @@ -242,6 +287,7 @@ class FileSystem {

// Get the last modification time by given 'fname'.
virtual StatusOr<uint64_t> get_file_modified_time(const std::string& fname) = 0;

// Rename file src to target.
virtual Status rename_file(const std::string& src, const std::string& target) = 0;

Expand Down
6 changes: 6 additions & 0 deletions be/src/fs/fs_broker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,12 @@ Status BrokerFileSystem::iterate_dir(const std::string& dir, const std::function
return Status::OK();
}

Status BrokerFileSystem::iterate_dir2(const std::string& dir,
const std::function<bool(std::string_view, const FileMeta&)>& cb) {
FileMeta meta;
return iterate_dir(dir, [&](std::string_view name) { return cb(name, meta); });
}

Status BrokerFileSystem::delete_file(const std::string& path) {
return _delete_file(path);
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/fs/fs_broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ class BrokerFileSystem : public FileSystem {

Status iterate_dir(const std::string& dir, const std::function<bool(std::string_view)>& cb) override;

Status iterate_dir2(const std::string& dir,
const std::function<bool(std::string_view, const FileMeta&)>& cb) override;

Status delete_file(const std::string& path) override;

Status create_dir(const std::string& dirname) override;
Expand Down
55 changes: 47 additions & 8 deletions be/src/fs/fs_hdfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,10 @@ class HdfsFileSystem : public FileSystem {
return Status::NotSupported("HdfsFileSystem::get_children");
}

Status list_path(const std::string& dir, std::vector<FileStatus>* result) override;
Status iterate_dir(const std::string& dir, const std::function<bool(std::string_view)>& cb) override;

Status iterate_dir(const std::string& dir, const std::function<bool(std::string_view)>& cb) override {
return Status::NotSupported("HdfsFileSystem::iterate_dir");
}
Status iterate_dir2(const std::string& dir,
const std::function<bool(std::string_view, const FileMeta&)>& cb) override;

Status delete_file(const std::string& path) override { return Status::NotSupported("HdfsFileSystem::delete_file"); }

Expand Down Expand Up @@ -320,7 +319,43 @@ Status HdfsFileSystem::path_exists(const std::string& path) {
return _path_exists(handle.hdfs_fs, path);
}

Status HdfsFileSystem::list_path(const std::string& dir, std::vector<FileStatus>* result) {
Status HdfsFileSystem::iterate_dir(const std::string& dir, const std::function<bool(std::string_view)>& cb) {
std::string namenode;
RETURN_IF_ERROR(get_namenode_from_path(dir, &namenode));
HdfsFsHandle handle;
RETURN_IF_ERROR(HdfsFsCache::instance()->get_connection(namenode, &handle, _options));
Status status = _path_exists(handle.hdfs_fs, dir);
if (!status.ok()) {
return status;
}

hdfsFileInfo* fileinfo;
int numEntries;
fileinfo = hdfsListDirectory(handle.hdfs_fs, dir.data(), &numEntries);
if (fileinfo == nullptr) {
return Status::InvalidArgument("hdfs list directory error {}"_format(dir));
}
for (int i = 0; i < numEntries && fileinfo; ++i) {
// obj_key.data() + uri.key().size(), obj_key.size() - uri.key().size()
int32_t dir_size;
if (dir[dir.size() - 1] == '/') {
dir_size = dir.size();
} else {
dir_size = dir.size() + 1;
}
std::string_view name(fileinfo[i].mName + dir_size);
if (!cb(name)) {
break;
}
}
if (fileinfo) {
hdfsFreeFileInfo(fileinfo, numEntries);
}
return Status::OK();
}

Status HdfsFileSystem::iterate_dir2(const std::string& dir,
const std::function<bool(std::string_view, const FileMeta&)>& cb) {
std::string namenode;
RETURN_IF_ERROR(get_namenode_from_path(dir, &namenode));
HdfsFsHandle handle;
Expand All @@ -345,9 +380,13 @@ Status HdfsFileSystem::list_path(const std::string& dir, std::vector<FileStatus>
dir_size = dir.size() + 1;
}
std::string_view name(fileinfo[i].mName + dir_size);
bool is_dir = fileinfo[i].mKind == tObjectKind::kObjectKindDirectory;
int64_t file_size = fileinfo[i].mSize;
result->emplace_back(name, is_dir, file_size);
FileMeta meta;
meta.with_is_dir(fileinfo[i].mKind == tObjectKind::kObjectKindDirectory)
.with_size(fileinfo[i].mSize)
.with_modify_time(fileinfo[i].mLastMod);
if (!cb(name, meta)) {
break;
}
}
if (fileinfo) {
hdfsFreeFileInfo(fileinfo, numEntries);
Expand Down
42 changes: 42 additions & 0 deletions be/src/fs/fs_memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,41 @@ class EnvMemoryImpl {
return Status::OK();
}

Status iterate_dir2(const butil::FilePath& path, const std::function<bool(std::string_view, const FileMeta&)>& cb) {
auto inode = get_inode(path);
if (inode == nullptr || inode->type != kDir) {
return Status::NotFound(path.value());
}
DCHECK(path.value().back() != '/' || path.value() == "/");
std::string s = (path.value() == "/") ? path.value() : path.value() + "/";
for (auto iter = _namespace.lower_bound(s); iter != _namespace.end(); ++iter) {
Slice child(iter->first);
if (!child.starts_with(s)) {
break;
}
FileMeta meta;
ASSIGN_OR_RETURN(bool is_dir, is_directory(butil::FilePath(child.to_string())));
meta.set_is_dir(is_dir);
if (!is_dir) {
ASSIGN_OR_RETURN(int64_t size, get_file_size(butil::FilePath(child.to_string())));
meta.set_size(size);
}
// Get the relative path.
child.remove_prefix(s.size());
if (child.empty()) {
continue;
}
auto slash = (const char*)memchr(child.data, '/', child.size);
if (slash != nullptr) {
continue;
}
if (!cb(child.data, meta)) {
break;
}
}
return Status::OK();
}

Status delete_file(const butil::FilePath& path) {
auto iter = _namespace.find(path.value());
if (iter == _namespace.end() || iter->second->type != kNormal) {
Expand Down Expand Up @@ -420,6 +455,13 @@ Status MemoryFileSystem::iterate_dir(const std::string& dir, const std::function
return _impl->iterate_dir(butil::FilePath(new_path), cb);
}

Status MemoryFileSystem::iterate_dir2(const std::string& dir,
const std::function<bool(std::string_view, const FileMeta&)>& cb) {
std::string new_path;
RETURN_IF_ERROR(canonicalize(dir, &new_path));
return _impl->iterate_dir2(butil::FilePath(new_path), cb);
}

Status MemoryFileSystem::delete_file(const std::string& path) {
std::string new_path;
RETURN_IF_ERROR(canonicalize(path, &new_path));
Expand Down
3 changes: 3 additions & 0 deletions be/src/fs/fs_memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class MemoryFileSystem : public FileSystem {

Status iterate_dir(const std::string& dir, const std::function<bool(std::string_view)>& cb) override;

Status iterate_dir2(const std::string& dir,
const std::function<bool(std::string_view, const FileMeta&)>& cb) override;

Status delete_file(const std::string& url) override;

Status create_dir(const std::string& dirname) override;
Expand Down
32 changes: 32 additions & 0 deletions be/src/fs/fs_posix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,38 @@ class PosixFileSystem : public FileSystem {
return Status::OK();
}

Status iterate_dir2(const std::string& dir,
const std::function<bool(std::string_view, const FileMeta&)>& cb) override {
DIR* d = opendir(dir.c_str());
if (d == nullptr) {
return io_error(dir, errno);
}
errno = 0;
struct dirent* entry;
while ((entry = readdir(d)) != nullptr) {
std::string_view name(entry->d_name);
if (name == "." || name == "..") {
continue;
}
FileMeta meta;
struct stat child_stat;
std::string child_path = fmt::format("{}/{}", dir, name);
if (stat(child_path.c_str(), &child_stat) != 0) {
break;
}
meta.set_is_dir(S_ISDIR(child_stat.st_mode));
meta.set_modify_time(static_cast<int64_t>(child_stat.st_mtime));
meta.set_size(child_stat.st_size);
// callback returning false means to terminate iteration
if (!cb(name, meta)) {
break;
}
}
closedir(d);
if (errno != 0) return io_error(dir, errno);
return Status::OK();
}

Status delete_file(const std::string& fname) override {
if (config::file_descriptor_cache_capacity > 0 && enable_fd_cache(fname)) {
FdCache::Instance()->erase(fname);
Expand Down
33 changes: 22 additions & 11 deletions be/src/fs/fs_s3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,14 +334,15 @@ class S3FileSystem : public FileSystem {

Status path_exists(const std::string& path) override { return Status::NotSupported("S3FileSystem::path_exists"); }

Status list_path(const std::string& dir, std::vector<FileStatus>* result) override;

Status get_children(const std::string& dir, std::vector<std::string>* file) override {
return Status::NotSupported("S3FileSystem::get_children");
}

Status iterate_dir(const std::string& dir, const std::function<bool(std::string_view)>& cb) override;

Status iterate_dir2(const std::string& dir,
const std::function<bool(std::string_view, const FileMeta&)>& cb) override;

Status delete_file(const std::string& path) override;

Status create_dir(const std::string& dirname) override;
Expand Down Expand Up @@ -528,7 +529,8 @@ Status S3FileSystem::iterate_dir(const std::string& dir, const std::function<boo
return directory_exist ? Status::OK() : Status::NotFound(dir);
}

Status S3FileSystem::list_path(const std::string& dir, std::vector<FileStatus>* file_status) {
Status S3FileSystem::iterate_dir2(const std::string& dir,
const std::function<bool(std::string_view, const FileMeta&)>& cb) {
S3URI uri;
if (!uri.parse(dir)) {
return Status::InvalidArgument(fmt::format("Invalid S3 URI {}", dir));
Expand Down Expand Up @@ -559,9 +561,11 @@ Status S3FileSystem::list_path(const std::string& dir, std::vector<FileStatus>*
const auto& full_name = cp.GetPrefix();

std::string_view name(full_name.data() + uri.key().size(), full_name.size() - uri.key().size() - 1);
bool is_dir = true;
int64_t file_size = 0;
file_status->emplace_back(name, is_dir, file_size);
FileMeta stat;
stat.set_is_dir(true);
if (!cb(name, stat)) {
return Status::OK();
}
}
for (auto&& obj : result.GetContents()) {
if (obj.GetKey() == uri.key()) {
Expand All @@ -570,19 +574,26 @@ Status S3FileSystem::list_path(const std::string& dir, std::vector<FileStatus>*
DCHECK(HasPrefixString(obj.GetKey(), uri.key()));

std::string_view obj_key(obj.GetKey());
bool is_dir = true;
int64_t file_size = 0;
FileMeta stat;
if (obj.LastModifiedHasBeenSet()) {
stat.set_modify_time(obj.GetLastModified().Seconds());
}
if (obj_key.back() == '/') {
obj_key = std::string_view(obj_key.data(), obj_key.size() - 1);
stat.set_is_dir(true);
} else {
DCHECK(obj.SizeHasBeenSet());
is_dir = false;
file_size = obj.GetSize();
stat.set_is_dir(false);
if (obj.SizeHasBeenSet()) {
stat.set_size(obj.GetSize());
}
}

std::string_view name(obj_key.data() + uri.key().size(), obj_key.size() - uri.key().size());

file_status->emplace_back(name, is_dir, file_size);
if (!cb(name, stat)) {
return Status::OK();
}
}
} while (result.GetIsTruncated());
return directory_exist ? Status::OK() : Status::NotFound(dir);
Expand Down
12 changes: 12 additions & 0 deletions be/src/fs/fs_starlet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,18 @@ class StarletFileSystem : public FileSystem {
return to_status(st);
}

Status iterate_dir2(const std::string& dir,
const std::function<bool(std::string_view, const FileMeta&)>& cb) override {
ASSIGN_OR_RETURN(auto pair, parse_starlet_uri(dir));
auto fs_st = get_shard_filesystem(pair.second);
if (!fs_st.ok()) {
return to_status(fs_st.status());
}
FileMeta meta;
auto st = (*fs_st)->list_dir(pair.first, false, [&](std::string_view name) { return cb(name, meta); });
return to_status(st);
}

Status create_dir(const std::string& dirname) override {
auto st = is_directory(dirname);
if (st.ok() && st.value()) {
Expand Down
Loading

0 comments on commit 7d85b43

Please sign in to comment.