diff --git a/conda/environments/all_cuda-118_arch-aarch64.yaml b/conda/environments/all_cuda-118_arch-aarch64.yaml index c6e5314b17..82790a443d 100644 --- a/conda/environments/all_cuda-118_arch-aarch64.yaml +++ b/conda/environments/all_cuda-118_arch-aarch64.yaml @@ -6,6 +6,8 @@ channels: - conda-forge - nvidia dependencies: +- aws-sdk-cpp +- boto3>=1.21.21 - c-compiler - cmake>=3.26.4,!=3.30.0 - cuda-python>=11.7.1,<12.0a0 @@ -17,6 +19,7 @@ dependencies: - dask>=2022.05.2 - doxygen=1.9.1 - gcc_linux-aarch64=11.* +- moto>=4.0.8 - ninja - numcodecs <0.12.0 - numpy>=1.23,<2.0a0 diff --git a/conda/environments/all_cuda-118_arch-x86_64.yaml b/conda/environments/all_cuda-118_arch-x86_64.yaml index 526e7155ef..cce77c120f 100644 --- a/conda/environments/all_cuda-118_arch-x86_64.yaml +++ b/conda/environments/all_cuda-118_arch-x86_64.yaml @@ -6,6 +6,8 @@ channels: - conda-forge - nvidia dependencies: +- aws-sdk-cpp +- boto3>=1.21.21 - c-compiler - cmake>=3.26.4,!=3.30.0 - cuda-python>=11.7.1,<12.0a0 @@ -19,6 +21,7 @@ dependencies: - gcc_linux-64=11.* - libcufile-dev=1.4.0.31 - libcufile=1.4.0.31 +- moto>=4.0.8 - ninja - numcodecs <0.12.0 - numpy>=1.23,<2.0a0 diff --git a/conda/environments/all_cuda-125_arch-aarch64.yaml b/conda/environments/all_cuda-125_arch-aarch64.yaml index 8a9b368aaa..6500ccfea3 100644 --- a/conda/environments/all_cuda-125_arch-aarch64.yaml +++ b/conda/environments/all_cuda-125_arch-aarch64.yaml @@ -6,6 +6,8 @@ channels: - conda-forge - nvidia dependencies: +- aws-sdk-cpp +- boto3>=1.21.21 - c-compiler - cmake>=3.26.4,!=3.30.0 - cuda-nvcc @@ -18,6 +20,7 @@ dependencies: - doxygen=1.9.1 - gcc_linux-aarch64=11.* - libcufile-dev +- moto>=4.0.8 - ninja - numcodecs <0.12.0 - numpy>=1.23,<2.0a0 diff --git a/conda/environments/all_cuda-125_arch-x86_64.yaml b/conda/environments/all_cuda-125_arch-x86_64.yaml index 3a1ed63b16..f5942cc6f6 100644 --- a/conda/environments/all_cuda-125_arch-x86_64.yaml +++ b/conda/environments/all_cuda-125_arch-x86_64.yaml @@ -6,6 +6,8 @@ channels: - conda-forge - nvidia dependencies: +- aws-sdk-cpp +- boto3>=1.21.21 - c-compiler - cmake>=3.26.4,!=3.30.0 - cuda-nvcc @@ -18,6 +20,7 @@ dependencies: - doxygen=1.9.1 - gcc_linux-64=11.* - libcufile-dev +- moto>=4.0.8 - ninja - numcodecs <0.12.0 - numpy>=1.23,<2.0a0 diff --git a/conda/recipes/kvikio/meta.yaml b/conda/recipes/kvikio/meta.yaml index 247be31cf7..504a38b0cf 100644 --- a/conda/recipes/kvikio/meta.yaml +++ b/conda/recipes/kvikio/meta.yaml @@ -52,6 +52,7 @@ requirements: - {{ compiler('cuda') }} {% endif %} - {{ stdlib("c") }} + - aws-sdk-cpp host: - python - pip @@ -64,11 +65,13 @@ requirements: - rapids-build-backend >=0.3.0,<0.4.0.dev0 - scikit-build-core >=0.10.0 - libkvikio ={{ version }} + - aws-sdk-cpp run: - python - numpy >=1.23,<2.0a0 - cupy >=12.0.0 - zarr + - aws-sdk-cpp # See https://github.com/zarr-developers/numcodecs/pull/475 - numcodecs <0.12.0 - packaging diff --git a/conda/recipes/libkvikio/meta.yaml b/conda/recipes/libkvikio/meta.yaml index 186c373f56..0c97e01abe 100644 --- a/conda/recipes/libkvikio/meta.yaml +++ b/conda/recipes/libkvikio/meta.yaml @@ -43,6 +43,7 @@ requirements: {% endif %} - ninja - {{ stdlib("c") }} + - aws-sdk-cpp host: - cuda-version ={{ cuda_version }} {% if cuda_major == "11" %} @@ -52,6 +53,7 @@ requirements: {% else %} - libcufile-dev # [linux] {% endif %} + - aws-sdk-cpp outputs: - name: libkvikio @@ -72,8 +74,10 @@ outputs: requirements: build: - cmake {{ cmake_version }} + - aws-sdk-cpp host: - cuda-version ={{ cuda_version }} + - aws-sdk-cpp run: - {{ pin_compatible('cuda-version', max_pin='x', min_pin='x') }} {% if cuda_major == "11" %} @@ -83,6 +87,7 @@ outputs: {% else %} - libcufile-dev # [linux] {% endif %} + - aws-sdk-cpp test: commands: - test -f $PREFIX/include/kvikio/file_handle.hpp @@ -106,6 +111,7 @@ outputs: - cuda-cudart-dev - libcufile-dev # [linux] {% endif %} + - aws-sdk-cpp requirements: build: - cmake {{ cmake_version }} @@ -118,6 +124,7 @@ outputs: - cuda-cudart-dev - libcufile-dev # [linux] {% endif %} + - aws-sdk-cpp run: - {{ pin_compatible('cuda-version', max_pin='x', min_pin='x') }} {% if cuda_major == "11" %} @@ -127,6 +134,7 @@ outputs: - cuda-cudart - libcufile # [linux] {% endif %} + - aws-sdk-cpp about: home: https://rapids.ai license: Apache-2.0 diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index f4f3f13109..646e676297 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -55,6 +55,12 @@ rapids_find_package( INSTALL_EXPORT_SET kvikio-exports ) +rapids_find_package( + AWSSDK COMPONENTS s3 + BUILD_EXPORT_SET kvikio-exports + INSTALL_EXPORT_SET kvikio-exports +) + rapids_find_package( cuFile BUILD_EXPORT_SET kvikio-exports @@ -130,6 +136,7 @@ target_include_directories( ) target_link_libraries( kvikio INTERFACE Threads::Threads ${CMAKE_DL_LIBS} nvtx3::nvtx3-cpp BS::thread_pool + ${AWSSDK_LINK_LIBRARIES} ) target_compile_features(kvikio INTERFACE cxx_std_17) diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt index c12ddb2e52..284590e943 100644 --- a/cpp/examples/CMakeLists.txt +++ b/cpp/examples/CMakeLists.txt @@ -14,6 +14,8 @@ set(TEST_INSTALL_PATH bin/tests/libkvikio) +# Example: basic_io + if(CUDAToolkit_FOUND) add_executable(BASIC_IO_TEST basic_io.cpp) set_target_properties(BASIC_IO_TEST PROPERTIES INSTALL_RPATH "\$ORIGIN/../../lib") @@ -35,6 +37,8 @@ else() message(STATUS "Cannot build the basic_io example when CUDA is not found") endif() +# Example: basic_no_cuda + add_executable(BASIC_NO_CUDA_TEST basic_no_cuda.cpp) set_target_properties(BASIC_NO_CUDA_TEST PROPERTIES INSTALL_RPATH "\$ORIGIN/../../lib") target_include_directories(BASIC_NO_CUDA_TEST PRIVATE ../include) diff --git a/cpp/include/kvikio/file_handle.hpp b/cpp/include/kvikio/file_handle.hpp index 80779c5282..cef5f4ed1f 100644 --- a/cpp/include/kvikio/file_handle.hpp +++ b/cpp/include/kvikio/file_handle.hpp @@ -144,7 +144,7 @@ class FileHandle { bool _initialized{false}; bool _compat_mode{false}; mutable std::size_t _nbytes{0}; // The size of the underlying file, zero means unknown. - CUfileHandle_t _handle{}; + CUfileHandle_t _handle{nullptr}; public: static constexpr mode_t m644 = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH; @@ -208,7 +208,7 @@ class FileHandle { _initialized{std::exchange(o._initialized, false)}, _compat_mode{std::exchange(o._compat_mode, false)}, _nbytes{std::exchange(o._nbytes, 0)}, - _handle{std::exchange(o._handle, CUfileHandle_t{})} + _handle{std::exchange(o._handle, CUfileHandle_t{nullptr})} { } FileHandle& operator=(FileHandle&& o) noexcept @@ -218,7 +218,7 @@ class FileHandle { _initialized = std::exchange(o._initialized, false); _compat_mode = std::exchange(o._compat_mode, false); _nbytes = std::exchange(o._nbytes, 0); - _handle = std::exchange(o._handle, CUfileHandle_t{}); + _handle = std::exchange(o._handle, CUfileHandle_t{nullptr}); return *this; } ~FileHandle() noexcept { close(); } @@ -232,8 +232,8 @@ class FileHandle { { if (closed()) { return; } - if (!_compat_mode) { cuFileAPI::instance().HandleDeregister(_handle); } - ::close(_fd_direct_off); + if (_handle != nullptr) { cuFileAPI::instance().HandleDeregister(_handle); } + if (_fd_direct_off != -1) { ::close(_fd_direct_off); } if (_fd_direct_on != -1) { ::close(_fd_direct_on); } _fd_direct_on = -1; _fd_direct_off = -1; diff --git a/cpp/include/kvikio/remote_handle.hpp b/cpp/include/kvikio/remote_handle.hpp new file mode 100644 index 0000000000..78f21d3024 --- /dev/null +++ b/cpp/include/kvikio/remote_handle.hpp @@ -0,0 +1,264 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +using namespace std::chrono; + +namespace kvikio { +namespace detail { + +/** + * Stream implementation of a fixed size buffer + */ +class BufferAsStream : public Aws::IOStream { + public: + using Base = Aws::IOStream; + explicit BufferAsStream(std::streambuf* buf) : Base(buf) {} + + ~BufferAsStream() override = default; +}; + +class S3Context { + public: + S3Context() : _client{S3Context::create_client()} {} + + Aws::S3::S3Client& client() { return *_client; } + + static S3Context& default_context() + { + static S3Context _default_context; + return _default_context; + } + + S3Context(S3Context const&) = delete; + void operator=(S3Context const&) = delete; + + private: + static void ensure_aws_s3_api_init() + { + static bool not_initalized{true}; + if (not_initalized) { + std::cout << "ensure_aws_s3_api_initalized INIT" << std::endl; + not_initalized = false; + + Aws::SDKOptions options; + // options.loggingOptions.logLevel = Aws::Utils::Logging::LogLevel::Error; + Aws::InitAPI(options); // Should only be called once. + } + } + + static std::shared_ptr create_client() + { + S3Context::ensure_aws_s3_api_init(); + + Aws::Client::ClientConfiguration clientConfig; + // Optional: Set to the AWS Region (overrides config file). + // clientConfig.region = "us-east-1"; + + const char* endpointOverride = getenv("AWS_ENDPOINT_URL"); + if (endpointOverride != nullptr) { clientConfig.endpointOverride = endpointOverride; } + + // You don't normally have to test that you are authenticated. But the S3 service permits + // anonymous requests, thus the s3Client will return "success" even if you are + // unauthenticated, which can be confusing to a new user. + auto provider = Aws::MakeShared("alloc-tag"); + auto creds = provider->GetAWSCredentials(); + if (creds.IsEmpty()) { + throw std::runtime_error(std::string("Failed authentication to ") + endpointOverride); + } + auto ret = std::make_shared(Aws::S3::S3Client(clientConfig)); + + // Try the connection + auto outcome = ret->ListBuckets(); + if (!outcome.IsSuccess()) { + throw std::runtime_error(std::string("S3 error: ") + outcome.GetError().GetMessage()); + } + return ret; + } + + std::shared_ptr _client; +}; + +inline std::size_t get_s3_file_size(const std::string& bucket_name, const std::string& object_name) +{ + KVIKIO_NVTX_FUNC_RANGE(); + Aws::S3::Model::HeadObjectRequest req; + req.SetBucket(bucket_name.c_str()); + req.SetKey(object_name.c_str()); + Aws::S3::Model::HeadObjectOutcome outcome = S3Context::default_context().client().HeadObject(req); + if (!outcome.IsSuccess()) { + const Aws::S3::S3Error& err = outcome.GetError(); + throw std::invalid_argument("get_s3_file_size(): " + err.GetExceptionName() + ": " + + err.GetMessage()); + } + return outcome.GetResult().GetContentLength(); +} + +inline std::pair parse_s3_path(const std::string& path) +{ + if (path.empty()) { throw std::invalid_argument("The remote path cannot be an empty string."); } + if (path.size() < 5 || path.substr(0, 5) != "s3://") { + throw std::invalid_argument("The remote path must start with the S3 scheme (\"s3://\")."); + } + std::string p = path.substr(5); + if (p.empty()) { throw std::invalid_argument("The remote path cannot be an empty string."); } + size_t pos = p.find_first_of('/'); + if (pos == 0) { throw std::invalid_argument("The remote path does not contain a bucket name."); } + return std::make_pair(p.substr(0, pos), (pos == std::string::npos) ? "" : p.substr(pos + 1)); +} + +} // namespace detail + +/** + * @brief Handle of + * + * At the moment, only AWS S3 is the supported + */ +class RemoteHandle { + private: + std::string _bucket_name{}; + std::string _object_name{}; + std::size_t _nbytes{}; + + public: + RemoteHandle() noexcept = default; + + RemoteHandle(std::string bucket_name, std::string object_name) + : _bucket_name(std::move(bucket_name)), + _object_name(std::move(object_name)), + _nbytes(detail::get_s3_file_size(_bucket_name, _object_name)) + { + std::cout << "RemoteHandle() - bucket_name: " << _bucket_name + << ", object_name: " << _object_name << ", nbytes: " << _nbytes << std::endl; + } + + RemoteHandle(const std::string& remote_path) + { + auto [bucket_name, object_name] = detail::parse_s3_path(remote_path); + _bucket_name = std::move(bucket_name); + _object_name = std::move(object_name); + _nbytes = detail::get_s3_file_size(_bucket_name, _object_name); + + std::cout << "RemoteHandle() - remote_path: " << remote_path + << ", bucket_name: " << _bucket_name << ", object_name: " << _object_name + << ", nbytes: " << _nbytes << std::endl; + } + + /** + * @brief Get the file size + * + * @return The number of bytes + */ + [[nodiscard]] inline std::size_t nbytes() const { return _nbytes; } + + std::size_t read_to_host(void* buf, std::size_t size, std::size_t file_offset = 0) + { + KVIKIO_NVTX_FUNC_RANGE("AWS S3 receive", size); + auto t0 = high_resolution_clock::now(); + + auto& default_context = detail::S3Context::default_context(); + Aws::S3::Model::GetObjectRequest req; + req.SetBucket(_bucket_name.c_str()); + req.SetKey(_object_name.c_str()); + const std::string byte_range = + "bytes=" + std::to_string(file_offset) + "-" + std::to_string(file_offset + size - 1); + req.SetRange(byte_range.c_str()); + + // To write directly to `buf`, we register a "factory" that wraps a buffer as a output stream. + Aws::Utils::Stream::PreallocatedStreamBuf buf_stream(static_cast(buf), size); + req.SetResponseStreamFactory( + [&]() { return Aws::New("BufferAsStream", &buf_stream); }); + + Aws::S3::Model::GetObjectOutcome outcome = default_context.client().GetObject(req); + if (!outcome.IsSuccess()) { + const Aws::S3::S3Error& err = outcome.GetError(); + throw std::runtime_error(err.GetExceptionName() + ": " + err.GetMessage()); + } + const std::size_t n = outcome.GetResult().GetContentLength(); + if (n != size) { + throw std::runtime_error("S3 read of " + std::to_string(size) + " bytes failed, received " + + std::to_string(n) + " bytes"); + } + auto t1 = high_resolution_clock::now(); + float duration = size / (duration_cast(t1 - t0).count() / 1000000.0); + + std::cout << "RemoteHandle::read_to_host() - buf: " << buf << ", size: " << size + << ", file_offset: " << file_offset << ", bw: " << duration / (2 << 20) << " MiB/s" + << std::endl; + return n; + } + + std::size_t read(void* buf, std::size_t size, std::size_t file_offset = 0) + { + KVIKIO_NVTX_FUNC_RANGE("RemoteHandle::read()", size); + if (is_host_memory(buf)) { return read_to_host(buf, size, file_offset); } + + CUcontext ctx = get_context_from_pointer(buf); + PushAndPopContext c(ctx); + + auto alloc = detail::AllocRetain::instance().get(); // Host memory allocation + CUdeviceptr devPtr = convert_void2deviceptr(buf); + CUstream stream = detail::StreamsByThread::get(); + + std::size_t cur_file_offset = convert_size2off(file_offset); + std::size_t byte_remaining = convert_size2off(size); + + while (byte_remaining > 0) { + const std::size_t nbytes_requested = std::min(posix_bounce_buffer_size, byte_remaining); + std::size_t nbytes_got = nbytes_requested; + nbytes_got = read_to_host(alloc.get(), nbytes_requested, cur_file_offset); + CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(devPtr, alloc.get(), nbytes_got, stream)); + CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); + cur_file_offset += nbytes_got; + devPtr += nbytes_got; + byte_remaining -= nbytes_got; + } + return size; + } + + std::future pread(void* buf, std::size_t size, std::size_t file_offset = 0) + { + KVIKIO_NVTX_FUNC_RANGE("RemoteHandle::pread()", size); + std::cout << "RemoteHandle::pread()" << std::endl; + auto task = [this](void* devPtr_base, + std::size_t size, + std::size_t file_offset, + std::size_t devPtr_offset) -> std::size_t { + return read(static_cast(devPtr_base) + devPtr_offset, size, file_offset); + }; + return parallel_io(task, buf, size, file_offset, posix_bounce_buffer_size, 0); + } +}; + +} // namespace kvikio diff --git a/dependencies.yaml b/dependencies.yaml index b3617388ce..2dfd954af1 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -106,6 +106,7 @@ dependencies: packages: - c-compiler - cxx-compiler + - aws-sdk-cpp specific: - output_types: conda matrices: @@ -319,6 +320,8 @@ dependencies: - &dask dask>=2022.05.2 - pytest - pytest-cov + - boto3>=1.21.21 + - moto>=4.0.8 specific: - output_types: [conda, requirements, pyproject] matrices: @@ -329,9 +332,3 @@ dependencies: - matrix: # All CUDA 11 versions packages: - cuda-python>=11.7.1,<12.0a0 - test_python_legate: - common: - - output_types: [conda, requirements, pyproject] - packages: - - *dask - - distributed>=2022.05.2 diff --git a/python/kvikio/kvikio/__init__.py b/python/kvikio/kvikio/__init__.py index d31d308916..e3bb0ffd37 100644 --- a/python/kvikio/kvikio/__init__.py +++ b/python/kvikio/kvikio/__init__.py @@ -4,6 +4,7 @@ from kvikio._lib import buffer, driver_properties # type: ignore from kvikio._version import __git_commit__, __version__ from kvikio.cufile import CuFile +from kvikio.remote_file import RemoteFile, is_remote_file_available def memory_register(buf) -> None: @@ -18,4 +19,10 @@ def memory_deregister(buf) -> None: DriverProperties = driver_properties.DriverProperties -__all__ = ["__git_commit__", "__version__", "CuFile"] +__all__ = [ + "__git_commit__", + "__version__", + "CuFile", + "RemoteFile", + "is_remote_file_available", +] diff --git a/python/kvikio/kvikio/_lib/CMakeLists.txt b/python/kvikio/kvikio/_lib/CMakeLists.txt index c77d8e3df1..2eec2d2668 100644 --- a/python/kvikio/kvikio/_lib/CMakeLists.txt +++ b/python/kvikio/kvikio/_lib/CMakeLists.txt @@ -17,8 +17,15 @@ set(cython_modules arr.pyx buffer.pyx defaults.pyx driver_properties.pyx file_ha libnvcomp.pyx libnvcomp_ll.pyx ) +if(AWSSDK_FOUND) + message(STATUS "Building remote_handle.pyx (aws-cpp-sdk-s3 found)") + list(APPEND cython_modules remote_handle.pyx) +else() + message(WARNING "Skipping remote_handle.pyx (aws-cpp-sdk-s3 not found)") +endif() + rapids_cython_create_modules( CXX SOURCE_FILES "${cython_modules}" - LINKED_LIBRARIES kvikio::kvikio nvcomp::nvcomp + LINKED_LIBRARIES kvikio::kvikio nvcomp::nvcomp ${AWSSDK_LINK_LIBRARIES} ) diff --git a/python/kvikio/kvikio/_lib/remote_handle.pyx b/python/kvikio/kvikio/_lib/remote_handle.pyx new file mode 100644 index 0000000000..1fa436b01c --- /dev/null +++ b/python/kvikio/kvikio/_lib/remote_handle.pyx @@ -0,0 +1,78 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +# distutils: language = c++ +# cython: language_level=3 + +from typing import Optional + +from libc.stdint cimport uintptr_t +from libcpp.string cimport string +from libcpp.utility cimport pair + +from kvikio._lib.arr cimport parse_buffer_argument +from kvikio._lib.future cimport IOFuture, _wrap_io_future, future + + +cdef extern from "" namespace "kvikio" nogil: + cdef cppclass RemoteHandle: + RemoteHandle() except + + RemoteHandle( + string bucket_name, + string object_name, + ) except + + RemoteHandle( + string remote_path, + ) except + + int nbytes() + size_t read( + void* buf, + size_t size, + size_t file_offset + ) except + + future[size_t] pread( + void* buf, + size_t size, + size_t file_offset + ) except + + + +cdef class RemoteFile: + """ Remote file handle""" + cdef RemoteHandle _handle + + @classmethod + def from_bucket_and_object(cls, bucket_name: str, object_name: str): + cdef RemoteFile ret = RemoteFile() + ret._handle = RemoteHandle( + str.encode(str(bucket_name)), + str.encode(str(object_name)), + ) + return ret + + @classmethod + def from_url(cls, url: str): + cdef RemoteFile ret = RemoteFile() + ret._handle = RemoteHandle(str.encode(str(url))) + return ret + + def nbytes(self) -> int: + return self._handle.nbytes() + + def read(self, buf, size: Optional[int], file_offset: int) -> int: + cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True) + return self._handle.read( + info.first, + info.second, + file_offset, + ) + + def pread(self, buf, size: Optional[int], file_offset: int) -> IOFuture: + cdef pair[uintptr_t, size_t] info = parse_buffer_argument(buf, size, True) + return _wrap_io_future( + self._handle.pread( + info.first, + info.second, + file_offset, + ) + ) diff --git a/python/kvikio/kvikio/benchmarks/aws_s3_io.py b/python/kvikio/kvikio/benchmarks/aws_s3_io.py new file mode 100644 index 0000000000..3c9d32e6c7 --- /dev/null +++ b/python/kvikio/kvikio/benchmarks/aws_s3_io.py @@ -0,0 +1,230 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +import argparse +import contextlib +import multiprocessing +import os +import socket +import statistics +import sys +import time +from functools import partial +from typing import ContextManager +from urllib.parse import urlparse + +import boto3 +import cupy +import numpy +from dask.utils import format_bytes + +import kvikio +import kvikio.defaults + + +def get_local_port() -> int: + """Return an available port""" + sock = socket.socket() + sock.bind(("127.0.0.1", 0)) + port = sock.getsockname()[1] + sock.close() + return port + + +def start_s3_server(lifetime=3600): + from moto.server import ThreadedMotoServer + + # Silence the activity info from ThreadedMotoServer + sys.stderr = open("/dev/null", "w") + url = urlparse(os.environ["AWS_ENDPOINT_URL"]) + server = ThreadedMotoServer(ip_address=url.hostname, port=url.port) + server.start() + time.sleep(lifetime) + + +@contextlib.contextmanager +def local_s3_server(): + # Use fake aws credentials + os.environ["AWS_ACCESS_KEY_ID"] = "foobar_key" + os.environ["AWS_SECRET_ACCESS_KEY"] = "foobar_secret" + os.environ["AWS_DEFAULT_REGION"] = "us-east-1" + p = multiprocessing.Process(target=start_s3_server) + p.start() + yield + p.kill() + + +def create_client_and_bucket(): + client = boto3.client("s3", endpoint_url=os.getenv("AWS_ENDPOINT_URL", None)) + try: + client.create_bucket(Bucket=args.bucket, ACL="public-read-write") + except ( + client.exceptions.BucketAlreadyOwnedByYou, + client.exceptions.BucketAlreadyExists, + ): + pass + except Exception: + print( + "Problem accessing the S3 server? using wrong credentials? Try setting " + "AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and/or AWS_ENDPOINT_URL. " + "Alternatively, use the bundled server `--use-bundled-server`\n", + file=sys.stderr, + flush=True, + ) + raise + return client + + +def run_numpy_like(args, xp): + # Upload data to S3 server + data = numpy.arange(args.nelem, dtype=args.dtype) + recv = xp.empty_like(data) + + client = create_client_and_bucket() + client.put_object(Bucket=args.bucket, Key="data1", Body=bytes(data)) + + def run() -> float: + t0 = time.perf_counter() + with kvikio.RemoteFile(bucket_name=args.bucket, object_name="data1") as f: + res = f.read(recv) + t1 = time.perf_counter() + assert res == args.nbytes, f"IO mismatch, expected {args.nbytes} got {res}" + xp.testing.assert_array_equal(data, recv) + return t1 - t0 + + for _ in range(args.nruns): + yield run() + + +def run_cudf(args, use_kvikio_s3): + import cudf + + # Upload data to S3 server + create_client_and_bucket() + data = cupy.random.rand(args.nelem).astype(args.dtype) + df = cudf.DataFrame({"a": data}) + df.to_parquet(f"s3://{args.bucket}/data1") + + def run() -> float: + t0 = time.perf_counter() + cudf.read_parquet(f"s3://{args.bucket}/data1", use_kvikio_s3=use_kvikio_s3) + t1 = time.perf_counter() + return t1 - t0 + + for _ in range(args.nruns): + yield run() + + +API = { + "cupy-kvikio": partial(run_numpy_like, xp=cupy), + "numpy-kvikio": partial(run_numpy_like, xp=numpy), + "cudf-kvikio": partial(run_cudf, use_kvikio_s3=True), + "cudf-fsspec": partial(run_cudf, use_kvikio_s3=False), +} + + +def main(args): + cupy.cuda.set_allocator(None) # Disable CuPy's default memory pool + cupy.arange(10) # Make sure CUDA is initialized + + kvikio.defaults.num_threads_reset(args.nthreads) + print("Roundtrip benchmark") + print("--------------------------------------") + print(f"nelem | {args.nelem} ({format_bytes(args.nbytes)})") + print(f"dtype | {args.dtype}") + print(f"nthreads | {args.nthreads}") + print(f"nruns | {args.nruns}") + print(f"server | {os.getenv('AWS_ENDPOINT_URL', 'http://*.amazonaws.com')}") + if args.use_bundled_server: + print("--------------------------------------") + print("Using the bundled local server is slow") + print("and can be misleading. Consider using") + print("a local MinIO or officel S3 server.") + print("======================================") + + # Run each benchmark using the requested APIs + for api in args.api: + res = [] + for elapsed in API[api](args): + res.append(elapsed) + + def pprint_api_res(name, samples): + samples = [args.nbytes / s for s in samples] # Convert to throughput + mean = statistics.mean(samples) if len(samples) > 1 else samples[0] + ret = f"{api}-{name}".ljust(18) + ret += f"| {format_bytes(mean).rjust(10)}/s".ljust(14) + if len(samples) > 1: + stdev = statistics.stdev(samples) / mean * 100 + ret += " ± %5.2f %%" % stdev + ret += " (" + for sample in samples: + ret += f"{format_bytes(sample)}/s, " + ret = ret[:-2] + ")" # Replace trailing comma + return ret + + print(pprint_api_res("read", res)) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Roundtrip benchmark") + parser.add_argument( + "-n", + "--nelem", + metavar="NELEM", + default="1024", + type=int, + help="Number of elements (default: %(default)s).", + ) + parser.add_argument( + "--dtype", + metavar="DATATYPE", + default="float32", + type=numpy.dtype, + help="The data type of each element (default: %(default)s).", + ) + parser.add_argument( + "--nruns", + metavar="RUNS", + default=1, + type=int, + help="Number of runs per API (default: %(default)s).", + ) + parser.add_argument( + "-t", + "--nthreads", + metavar="THREADS", + default=1, + type=int, + help="Number of threads to use (default: %(default)s).", + ) + parser.add_argument( + "--use-bundled-server", + action="store_true", + help="Launch and use a local slow S3 server (ThreadedMotoServer).", + ) + parser.add_argument( + "--bucket", + metavar="NAME", + default="kvikio-s3-benchmark", + type=str, + help="Name of the AWS S3 bucket to use (default: %(default)s).", + ) + parser.add_argument( + "--api", + metavar="API", + default=list(API.keys())[0], # defaults to the first API + nargs="+", + choices=tuple(API.keys()) + ("all",), + help="List of APIs to use {%(choices)s} (default: %(default)s).", + ) + args = parser.parse_args() + args.nbytes = args.nelem * args.dtype.itemsize + if "all" in args.api: + args.api = tuple(API.keys()) + + ctx: ContextManager = contextlib.nullcontext() + if args.use_bundled_server: + os.environ["AWS_ENDPOINT_URL"] = f"http://127.0.0.1:{get_local_port()}" + ctx = local_s3_server() + with ctx: + main(args) diff --git a/python/kvikio/kvikio/remote_file.py b/python/kvikio/kvikio/remote_file.py new file mode 100644 index 0000000000..00c788b6b0 --- /dev/null +++ b/python/kvikio/kvikio/remote_file.py @@ -0,0 +1,57 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +from __future__ import annotations + +from typing import Optional + +from kvikio.cufile import IOFuture + + +def is_remote_file_available() -> bool: + try: + import kvikio._lib.remote_handle # noqa: F401 + except ImportError: + return False + else: + return True + + +def _get_remote_remote_file_class(): + if not is_remote_file_available(): + raise RuntimeError( + "RemoteFile not available, please build KvikIO with AWS S3 support" + ) + import kvikio._lib.remote_handle + + return kvikio._lib.remote_handle.RemoteFile + + +class RemoteFile: + """File handle of a remote file""" + + def __init__(self, bucket_name: str, object_name: str): + self._handle = _get_remote_remote_file_class().from_bucket_and_object( + bucket_name, object_name + ) + + @classmethod + def from_url(cls, url: str) -> RemoteFile: + ret = object.__new__(cls) + ret._handle = _get_remote_remote_file_class().from_url(url) + return ret + + def __enter__(self) -> RemoteFile: + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + pass + + def nbytes(self) -> int: + return self._handle.nbytes() + + def pread(self, buf, size: Optional[int] = None, file_offset: int = 0) -> IOFuture: + return IOFuture(self._handle.pread(buf, size, file_offset)) + + def read(self, buf, size: Optional[int] = None, file_offset: int = 0) -> int: + return self.pread(buf, size, file_offset).get() diff --git a/python/kvikio/pyproject.toml b/python/kvikio/pyproject.toml index 3f68177280..9ceb8be8f6 100644 --- a/python/kvikio/pyproject.toml +++ b/python/kvikio/pyproject.toml @@ -38,8 +38,10 @@ classifiers = [ [project.optional-dependencies] test = [ + "boto3>=1.21.21", "cuda-python>=11.7.1,<12.0a0", "dask>=2022.05.2", + "moto>=4.0.8", "pytest", "pytest-cov", ] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`. diff --git a/python/kvikio/tests/test_aws_s3.py b/python/kvikio/tests/test_aws_s3.py new file mode 100644 index 0000000000..04e0d08c0a --- /dev/null +++ b/python/kvikio/tests/test_aws_s3.py @@ -0,0 +1,137 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +import multiprocessing as mp +import os +import socket +import time +from contextlib import contextmanager + +import pytest + +import kvikio + +# TODO: remove before PR merge. Trigger CI error if the remote module wasn't built +import kvikio._lib.remote_handle # isort: skip + +moto = pytest.importorskip("moto", minversion="3.1.6") +boto3 = pytest.importorskip("boto3") + +if not kvikio.is_remote_file_available(): + pytest.skip( + "cannot test remote IO, please build KvikIO with with AWS S3 support", + allow_module_level=True, + ) + +ThreadedMotoServer = pytest.importorskip("moto.server").ThreadedMotoServer + + +@pytest.fixture(scope="session") +def endpoint_ip(): + return "127.0.0.1" + + +@pytest.fixture(scope="session") +def endpoint_port(): + # Return a free port per worker session. + sock = socket.socket() + sock.bind(("127.0.0.1", 0)) + port = sock.getsockname()[1] + sock.close() + return port + + +@contextmanager +def ensure_safe_environment_variables(): + """ + Get a context manager to safely set environment variables + All changes will be undone on close, hence environment variables set + within this contextmanager will neither persist nor change global state. + """ + saved_environ = dict(os.environ) + try: + yield + finally: + os.environ.clear() + os.environ.update(saved_environ) + + +def start_s3_server(ip_address, port): + server = ThreadedMotoServer(ip_address=ip_address, port=port) + server.start() + time.sleep(120) + print("ThreadedMotoServer shutting down because of timeout (120s)") + + +@pytest.fixture(scope="session") +def s3_base(endpoint_ip, endpoint_port): + """ + Fixture to set up moto server in separate process + """ + with ensure_safe_environment_variables(): + # Use fake aws credentials + os.environ["AWS_ACCESS_KEY_ID"] = "foobar_key" + os.environ["AWS_SECRET_ACCESS_KEY"] = "foobar_secret" + os.environ["AWS_SECURITY_TOKEN"] = "foobar_security_token" + os.environ["AWS_SESSION_TOKEN"] = "foobar_session_token" + os.environ["AWS_DEFAULT_REGION"] = "us-east-1" + os.environ["AWS_ENDPOINT_URL"] = f"http://{endpoint_ip}:{endpoint_port}" + + p = mp.Process(target=start_s3_server, args=(endpoint_ip, endpoint_port)) + p.start() + yield os.environ["AWS_ENDPOINT_URL"] + p.kill() + + +@contextmanager +def s3_context(s3_base, bucket, files=None): + if files is None: + files = {} + with ensure_safe_environment_variables(): + client = boto3.client("s3", endpoint_url=s3_base) + client.create_bucket(Bucket=bucket, ACL="public-read-write") + for f, data in files.items(): + client.put_object(Bucket=bucket, Key=f, Body=data) + yield + for f, data in files.items(): + try: + client.delete_object(Bucket=bucket, Key=f) + except Exception: + pass + + +def test_read(s3_base, xp): + bucket_name = "test_read" + object_name = "a1" + a = xp.arange(10_000_000) + with s3_context(s3_base=s3_base, bucket=bucket_name, files={object_name: bytes(a)}): + with kvikio.RemoteFile(bucket_name, object_name) as f: + assert f.nbytes() == a.nbytes + b = xp.empty_like(a) + assert f.read(buf=b) == a.nbytes + xp.testing.assert_array_equal(a, b) + + +@pytest.mark.parametrize( + "start,end", + [ + (0, 10 * 4096), + (1, int(1.3 * 4096)), + (int(2.1 * 4096), int(5.6 * 4096)), + (42, int(2**23)), + ], +) +def test_read_with_file_offset(s3_base, xp, start, end): + bucket_name = "test_read" + object_name = "a1" + a = xp.arange(end, dtype=xp.int64) + with s3_context(s3_base=s3_base, bucket=bucket_name, files={object_name: bytes(a)}): + with kvikio.RemoteFile(bucket_name, object_name) as f: + b = xp.zeros(shape=(end - start,), dtype=xp.int64) + assert f.read(b, file_offset=start * a.itemsize) == b.nbytes + xp.testing.assert_array_equal(a[start:end], b) + + with kvikio.RemoteFile.from_url(f"s3://{bucket_name}/{object_name}") as f: + b = xp.zeros(shape=(end - start,), dtype=xp.int64) + assert f.read(b, file_offset=start * a.itemsize) == b.nbytes + xp.testing.assert_array_equal(a[start:end], b) diff --git a/python/kvikio/tests/test_benchmarks.py b/python/kvikio/tests/test_benchmarks.py index 3bdaf6613e..30cee08be9 100644 --- a/python/kvikio/tests/test_benchmarks.py +++ b/python/kvikio/tests/test_benchmarks.py @@ -8,6 +8,8 @@ import pytest +import kvikio + benchmarks_path = ( Path(os.path.realpath(__file__)).parent.parent / "kvikio" / "benchmarks" ) @@ -78,3 +80,47 @@ def test_zarr_io(run_cmd, tmp_path, api): cwd=benchmarks_path, ) assert retcode == 0 + + +@pytest.mark.parametrize( + "api", + [ + "cupy-kvikio", + "numpy-kvikio", + "cudf-kvikio", + "cudf-fsspec", + ], +) +def test_aws_s3_io(run_cmd, api): + """Test benchmarks/aws_s3_io.py""" + + if not kvikio.is_remote_file_available(): + pytest.skip( + "cannot test remote IO, please build KvikIO with with AWS S3 support", + allow_module_level=True, + ) + pytest.importorskip("boto3") + pytest.importorskip("moto") + if "cudf" in api: + pytest.importorskip("cudf") + + if api == "cudf-kvikio": + pytest.skip( + "Enable when has been merged" + ) + + retcode = run_cmd( + cmd=[ + sys.executable or "python", + "aws_s3_io.py", + "--use-bundled-server", + "-n", + "1000", + "-t", + "4", + "--api", + api, + ], + cwd=benchmarks_path, + ) + assert retcode == 0