From 0cf4eabef7f0fd483de93cd7c8a9a46e87403746 Mon Sep 17 00:00:00 2001 From: Dongxu Yang Date: Fri, 15 Dec 2023 20:46:29 +0800 Subject: [PATCH] added direct io support --- cpp/src/wholememory/file_io.cpp | 434 +++++++++++++++++++++++++------- 1 file changed, 337 insertions(+), 97 deletions(-) diff --git a/cpp/src/wholememory/file_io.cpp b/cpp/src/wholememory/file_io.cpp index 3274811e1..dbe6af002 100644 --- a/cpp/src/wholememory/file_io.cpp +++ b/cpp/src/wholememory/file_io.cpp @@ -15,14 +15,17 @@ */ #include "file_io.h" +#include #include #include +#include #include #include #include "communicator.hpp" #include "error.hpp" +#include "integer_utils.hpp" #include "logger.hpp" namespace wholememory { @@ -38,6 +41,15 @@ static size_t StatFileSize(const char* filename) return filesize; } +static size_t StatFileBlockSize(const char* filename) +{ + auto blocksize = static_cast(-1); + struct stat statbuf {}; + if (stat(filename, &statbuf) < 0) { return blocksize; } + blocksize = statbuf.st_blksize; + return blocksize; +} + static size_t get_handle_partial_size(size_t handle_size, size_t memory_offset, size_t memory_entry_stride, @@ -62,6 +74,282 @@ static size_t get_handle_partial_size(size_t handle_size, return partial_size; } +static void read_file_list_to_local_memory(char* local_ptr, + size_t local_size, + size_t local_offset, + size_t entry_size, + size_t memory_entry_stride, + size_t memory_offset, + int file_count, + const char** file_names, + const std::vector& file_sizes, + size_t suggested_buffer_size, + int wm_rank) +{ + size_t buffer_size; + size_t buffer_entry_count = 1; + if (suggested_buffer_size < entry_size) { + buffer_size = entry_size; + } else { + buffer_entry_count = suggested_buffer_size / entry_size; + buffer_size = buffer_entry_count * entry_size; + } + std::vector file_read_buffer(buffer_size); + + size_t local_entry_memory_start_index = local_offset / memory_entry_stride; + size_t local_entry_file_start_index = + local_entry_memory_start_index - memory_offset / memory_entry_stride; + size_t local_entry_count = local_size / memory_entry_stride; + char* local_write_ptr = local_ptr + memory_offset % memory_entry_stride; + if (wm_rank == 0) { + local_entry_count -= memory_offset / memory_entry_stride; + local_write_ptr += (memory_offset / memory_entry_stride) * memory_entry_stride; + } + size_t local_entry_idx = 0; + + size_t file_entry_offset = 0; + size_t total_read_bytes = 0; + for (int i = 0; i < file_count; i++) { + size_t file_entry_count = file_sizes[i] / entry_size; + // already outside reading window + if (file_entry_offset >= local_entry_file_start_index + local_entry_count) break; + // in reading window + if (file_entry_offset + file_entry_count > local_entry_file_start_index) { + size_t file_read_start_offset = 0; + FILE* fp = fopen(file_names[i], "rb"); + if (fp == nullptr) { WHOLEMEMORY_ERROR("Open file %s for read failed.", file_names[i]); } + // maybe in window end, remove possible tailing data that don't belong to current rank. + size_t to_read_file_entry_count = std::min( + file_entry_count, local_entry_file_start_index + local_entry_count - file_entry_offset); + // if in window begin, remove possible data that belongs to previous rank and skip disk + // data. + if (file_entry_offset < local_entry_file_start_index) { + size_t skip_entry_count = local_entry_file_start_index - file_entry_offset; + + file_read_start_offset = skip_entry_count * entry_size; + + if (fseeko(fp, file_read_start_offset, SEEK_SET) != 0) { + WHOLEMEMORY_ERROR( + "File %s seek to %ld failed.", file_names[i], skip_entry_count * entry_size); + } + to_read_file_entry_count -= skip_entry_count; + } + // now all data in file_entry_count need to be read. + size_t bytes_to_read = to_read_file_entry_count * entry_size; + size_t left_entry_count = to_read_file_entry_count; + while (left_entry_count > 0) { + size_t read_entry_count = std::min(left_entry_count, buffer_entry_count); + + int ret = fread(file_read_buffer.data(), entry_size, read_entry_count, fp); + if (ret != read_entry_count) { + WHOLEMEMORY_ERROR( + "File %s line %d: reading from file %s, read_entry_count=%ld, entry_size=%ld, " + "returned %d, error=%s\n", + __FILE__, + __LINE__, + file_names[i], + read_entry_count, + entry_size, + ret, + strerror(errno)); + } + + if (entry_size != memory_entry_stride) { + WM_CUDA_CHECK(cudaMemcpy2D(local_write_ptr, + memory_entry_stride, + file_read_buffer.data(), + entry_size, + entry_size, + read_entry_count, + cudaMemcpyDefault)); + } else { + WM_CUDA_CHECK(cudaMemcpy(local_write_ptr, + file_read_buffer.data(), + read_entry_count * entry_size, + cudaMemcpyDefault)); + } + local_write_ptr += read_entry_count * memory_entry_stride; + + left_entry_count -= read_entry_count; + } + fclose(fp); + WHOLEMEMORY_INFO( + "Rank=%d done Reading %ld bytes from file %s size=%ld, starting from offset=%ld.", + wm_rank, + bytes_to_read, + file_names[i], + file_sizes[i], + file_read_start_offset); + total_read_bytes += bytes_to_read; + } + file_entry_offset += file_entry_count; + } + WHOLEMEMORY_INFO( + "Rank=%d done reading total %ld bytes from needed files.", wm_rank, total_read_bytes); +} + +static void read_file_list_to_local_memory_directio(char* local_ptr, + size_t local_size, + size_t local_offset, + size_t entry_size, + size_t memory_entry_stride, + size_t memory_offset, + int file_count, + const char** file_names, + const std::vector& file_sizes, + size_t suggested_buffer_size, + int wm_rank) +{ + if (memory_offset + entry_size > memory_entry_stride) { + WHOLEMEMORY_FAIL_NOTHROW("Direct io mode only support reading all entries."); + } + size_t local_entry_start_index = local_offset / memory_entry_stride; + size_t local_entry_count = local_size / memory_entry_stride; + char* local_write_ptr = local_ptr + memory_offset % memory_entry_stride; + + static size_t kAlignSize = 16 * 1024 * 1024; + suggested_buffer_size = round_up_unsafe(suggested_buffer_size, kAlignSize); + + char* block_buffer; + WHOLEMEMORY_CHECK_NOTHROW(posix_memalign(reinterpret_cast(&block_buffer), + kAlignSize, + suggested_buffer_size) == 0); + + size_t file_entry_offset = 0; + size_t read_entry_count = 0; + for (int i = 0; i < file_count; i++) { + size_t file_entry_count = file_sizes[i] / entry_size; + // already outside reading window + if (file_entry_offset >= local_entry_start_index + local_entry_count) break; + // reading window not reached + if (file_entry_offset + file_entry_count <= local_entry_start_index) { + file_entry_offset += file_entry_count; + continue; + } + // in reading window + auto block_size = StatFileBlockSize(file_names[i]); + if (block_size == 0 || block_size == (size_t)-1 || kAlignSize % block_size != 0) { + WHOLEMEMORY_FAIL_NOTHROW( + "block_size=%ld for file %s, but alignment is %ld", block_size, file_names[i], kAlignSize); + } + size_t buffer_block_count = suggested_buffer_size / block_size; + int fd = open(file_names[i], O_DIRECT | O_RDONLY); + if (fd < 0) { WHOLEMEMORY_FAIL_NOTHROW("Open file %s with direct io failed.", file_names[i]); } + + // maybe in window end, remove possible tailing data that don't belong to current rank. + size_t to_read_file_entry_count = + std::min(file_entry_count, local_entry_start_index + local_entry_count - file_entry_offset); + + size_t file_read_end = to_read_file_entry_count * entry_size; + // if in window begin, remove possible data that belongs to previous rank and skip disk + // data. + size_t file_read_start = 0; + if (file_entry_offset < local_entry_start_index) { + size_t skip_entry_count = local_entry_start_index - file_entry_offset; + to_read_file_entry_count -= skip_entry_count; + file_read_start = skip_entry_count * entry_size; + } + + size_t file_block_read_offset = file_read_start / block_size * block_size; + size_t skip_head_size = file_read_start - file_block_read_offset; + + char* local_mem_write_entry_for_file = local_write_ptr + read_entry_count * memory_entry_stride; + size_t first_mem_entry_offset = 0; + size_t useful_data_bytes_read = 0; + size_t physical_data_bytes_read = 0; + while (file_block_read_offset < file_read_end) { + size_t left_size = file_read_end - file_block_read_offset; + size_t left_block_count = div_rounding_up_unsafe(left_size, block_size); + size_t read_block_count = std::min(left_block_count, buffer_block_count); + size_t physical_read_size = read_block_count * block_size; + physical_data_bytes_read += physical_read_size; + + ssize_t pread_size = pread64(fd, block_buffer, physical_read_size, file_block_read_offset); + if (pread_size != physical_read_size && + file_block_read_offset + pread_size != file_sizes[i]) { + WHOLEMEMORY_FAIL_NOTHROW( + "rank=%d, pread_size=%ld, physical_read_size=%ld, file_block_read_offset=%ld, " + "file_sizes[i]=%ld, file=%s", + wm_rank, + pread_size, + physical_read_size, + file_block_read_offset, + file_sizes[i], + file_names[i]); + } + + size_t drop_tail_size = 0; + if (file_block_read_offset + physical_read_size > file_read_end) { + drop_tail_size = file_block_read_offset + physical_read_size - file_read_end; + } + + char* useful_data_ptr = block_buffer + skip_head_size; + size_t useful_data_size = physical_read_size - skip_head_size - drop_tail_size; + + useful_data_bytes_read += useful_data_size; + + if (first_mem_entry_offset != 0) { + // process head + size_t entry_left_size = entry_size - first_mem_entry_offset; + WM_CUDA_CHECK_NO_THROW(cudaMemcpy(local_mem_write_entry_for_file + first_mem_entry_offset, + useful_data_ptr, + entry_left_size, + cudaMemcpyDefault)); + local_mem_write_entry_for_file += memory_entry_stride; + useful_data_ptr += entry_left_size; + useful_data_size -= entry_left_size; + entry_left_size = 0; + } + + size_t full_entry_count = useful_data_size / entry_size; + size_t full_entry_size = full_entry_count * entry_size; + + if (full_entry_size > 0) { + if (entry_size != memory_entry_stride) { + WM_CUDA_CHECK(cudaMemcpy2D(local_mem_write_entry_for_file, + memory_entry_stride, + useful_data_ptr, + entry_size, + entry_size, + full_entry_count, + cudaMemcpyDefault)); + } else { + WM_CUDA_CHECK(cudaMemcpy( + local_mem_write_entry_for_file, useful_data_ptr, full_entry_size, cudaMemcpyDefault)); + } + local_mem_write_entry_for_file += memory_entry_stride * full_entry_count; + useful_data_ptr += full_entry_size; + useful_data_size -= full_entry_size; + } + + size_t tail_entry_size = useful_data_size % entry_size; + if (tail_entry_size != 0) { + // process tail + WM_CUDA_CHECK_NO_THROW(cudaMemcpy( + local_mem_write_entry_for_file, useful_data_ptr, tail_entry_size, cudaMemcpyDefault)); + first_mem_entry_offset = tail_entry_size; + } + + file_block_read_offset += physical_read_size; + skip_head_size = 0; + } + + WHOLEMEMORY_INFO( + "Rank=%d done Reading %ld useful bytes by reading %ld block bytes using DirectIO from file " + "%s size=%ld.", + wm_rank, + useful_data_bytes_read, + physical_data_bytes_read, + file_names[i], + file_sizes[i]); + + close(fd); + file_entry_offset += file_entry_count; + read_entry_count += to_read_file_entry_count; + } + free(block_buffer); +} + wholememory_error_code_t load_file_to_handle(wholememory_handle_t wholememory_handle, size_t memory_offset, size_t memory_entry_stride, @@ -153,107 +441,59 @@ wholememory_error_code_t load_file_to_handle(wholememory_handle_t wholememory_ha (void**)(&local_ptr), &local_size, &local_offset, wholememory_handle) == WHOLEMEMORY_SUCCESS); - constexpr int kSuggestedBufferSize = 16 * 1024 * 1024; - size_t buffer_size; - size_t buffer_entry_count = 1; - if (kSuggestedBufferSize < entry_size) { - buffer_size = entry_size; - } else { - buffer_entry_count = kSuggestedBufferSize / entry_size; - buffer_size = buffer_entry_count * entry_size; + int suggested_buffer_size_mb = 16; + const char* buffer_size_env_var = std::getenv("WG_LOAD_BUFFER_SIZE_MB"); + if (buffer_size_env_var != nullptr) { + try { + suggested_buffer_size_mb = std::stoi(buffer_size_env_var); + } catch (const std::invalid_argument& e) { + suggested_buffer_size_mb = 16; + WHOLEMEMORY_WARN( + "Environment variable WG_LOAD_BUFFER_SIZE_MB value %s is not valid, using default %d", + buffer_size_env_var, + suggested_buffer_size_mb); + } + if (suggested_buffer_size_mb < 1) { + suggested_buffer_size_mb = 16; + WHOLEMEMORY_WARN( + "Environment variable WG_LOAD_BUFFER_SIZE_MB value %s is not valid, using default %d", + buffer_size_env_var, + suggested_buffer_size_mb); + } } - std::vector file_read_buffer(buffer_size); + size_t suggested_buffer_size = static_cast(suggested_buffer_size_mb) * 1024 * 1024; - size_t local_entry_memory_start_index = local_offset / memory_entry_stride; - size_t local_entry_file_start_index = - local_entry_memory_start_index - memory_offset / memory_entry_stride; - size_t local_entry_count = local_size / memory_entry_stride; - char* local_write_ptr = local_ptr + memory_offset % memory_entry_stride; - if (wm_rank == 0) { - local_entry_count -= memory_offset / memory_entry_stride; - local_write_ptr += (memory_offset / memory_entry_stride) * memory_entry_stride; + const char* directio_env_var = std::getenv("WG_LOAD_USE_DIRECTIO"); + bool use_direct_io = false; + if (directio_env_var != nullptr && directio_env_var[0] == '1' && directio_env_var[1] == '\0') { + use_direct_io = true; } - size_t local_entry_idx = 0; - - size_t file_entry_offset = 0; - size_t total_read_bytes = 0; - for (int i = 0; i < file_count; i++) { - size_t file_entry_count = file_sizes[i] / entry_size; - // already outside reading window - if (file_entry_offset >= local_entry_file_start_index + local_entry_count) break; - // in reading window - if (file_entry_offset + file_entry_count > local_entry_file_start_index) { - size_t file_read_start_offset = 0; - FILE* fp = fopen(file_names[i], "rb"); - if (fp == nullptr) { WHOLEMEMORY_ERROR("Open file %s for read failed.", file_names[i]); } - // maybe in window end, remove possible tailing data that don't belong to current rank. - size_t to_read_file_entry_count = std::min( - file_entry_count, local_entry_file_start_index + local_entry_count - file_entry_offset); - // if in window begin, remove possible data that belongs to previous rank and skip disk - // data. - if (file_entry_offset < local_entry_file_start_index) { - size_t skip_entry_count = local_entry_file_start_index - file_entry_offset; - - file_read_start_offset = skip_entry_count * entry_size; - - if (fseeko(fp, file_read_start_offset, SEEK_SET) != 0) { - WHOLEMEMORY_ERROR( - "File %s seek to %ld failed.", file_names[i], skip_entry_count * entry_size); - } - to_read_file_entry_count -= skip_entry_count; - } - // now all data in file_entry_count need to be read. - size_t bytes_to_read = to_read_file_entry_count * entry_size; - size_t left_entry_count = to_read_file_entry_count; - while (left_entry_count > 0) { - size_t read_entry_count = std::min(left_entry_count, buffer_entry_count); - - int ret = fread(file_read_buffer.data(), entry_size, read_entry_count, fp); - if (ret != read_entry_count) { - WHOLEMEMORY_ERROR( - "File %s line %d: reading from file %s, read_entry_count=%ld, entry_size=%ld, " - "returned %d, error=%s\n", - __FILE__, - __LINE__, - file_names[i], - read_entry_count, - entry_size, - ret, - strerror(errno)); - } - - if (entry_size != memory_entry_stride) { - WM_CUDA_CHECK(cudaMemcpy2D(local_write_ptr, - memory_entry_stride, - file_read_buffer.data(), - entry_size, - entry_size, - read_entry_count, - cudaMemcpyDefault)); - } else { - WM_CUDA_CHECK(cudaMemcpy(local_write_ptr, - file_read_buffer.data(), - read_entry_count * entry_size, - cudaMemcpyDefault)); - } - local_write_ptr += read_entry_count * memory_entry_stride; - - left_entry_count -= read_entry_count; - } - fclose(fp); - WHOLEMEMORY_INFO( - "Rank=%d done Reading %ld bytes from file %s size=%ld, starting from offset=%ld.", - wm_rank, - bytes_to_read, - file_names[i], - file_sizes[i], - file_read_start_offset); - total_read_bytes += bytes_to_read; - } - file_entry_offset += file_entry_count; + if (!use_direct_io) { + read_file_list_to_local_memory(local_ptr, + local_size, + local_offset, + entry_size, + memory_entry_stride, + memory_offset, + file_count, + file_names, + file_sizes, + suggested_buffer_size, + wm_rank); + } else { + read_file_list_to_local_memory_directio(local_ptr, + local_size, + local_offset, + entry_size, + memory_entry_stride, + memory_offset, + file_count, + file_names, + file_sizes, + suggested_buffer_size, + wm_rank); } - WHOLEMEMORY_INFO( - "Rank=%d done reading total %ld bytes from needed files.", wm_rank, total_read_bytes); + wm_comm->barrier(); } catch (wholememory::logic_error& wle) { WHOLEMEMORY_ERROR("Logic error: %s", wle.what());