Skip to content

Commit

Permalink
Fix issue with memory pools and add mutexes
Browse files Browse the repository at this point in the history
  • Loading branch information
wdeconinck committed Oct 29, 2024
1 parent f0c7a26 commit 6920ce1
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 48 deletions.
93 changes: 62 additions & 31 deletions pluto/src/pluto/memory_resource/MemoryPoolResource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,20 @@ static std::size_t to_bytes(const std::string& str) {
return std::stoull(str);
}

static std::string bytes_to_string(std::size_t bytes) {
std::string str;
constexpr std::size_t MB = 1024*1024;
constexpr std::size_t GB = 1024*MB;
if( bytes >= GB ) {
str = std::to_string(bytes/GB) + "GB";
}
else {
str = std::to_string(bytes/MB) + "MB";
}
return str;
}


static pool_options default_pool_options_;
static bool default_pool_options_setup_ = false;

Expand All @@ -60,6 +74,51 @@ void set_default_pool_options(pool_options options) {
default_pool_options_setup_ = true;
}

static GatorMemoryResource* to_gator_resource(memory_resource* pool) {
GatorMemoryResource* gator;
if (TraceMemoryResource* traced = dynamic_cast<TraceMemoryResource*>(pool)) {
gator = dynamic_cast<GatorMemoryResource*>(traced->upstream_resource());
}
else {
gator = dynamic_cast<GatorMemoryResource*>(pool);
}
return gator;
}

void* MemoryPoolResource::do_allocate(std::size_t bytes, std::size_t alignment) {
std::lock_guard lock(mtx_);
return resource(bytes)->allocate(bytes, alignment);
}

void MemoryPoolResource::do_deallocate(void* ptr, std::size_t bytes, std::size_t alignment) {
std::lock_guard lock(mtx_);
if (pools_.size() == 1) {
pools_[0]->deallocate(ptr, bytes, alignment);
}
else {
for (int i=0; i<pool_block_sizes_.size(); ++i) {
if (pools_[i]) {
auto& gator = to_gator_resource(pools_[i].get())->gator();
if (gator.thisIsMyPointer(ptr)) {
pools_[i]->deallocate(ptr, bytes, alignment);
}

// Cleanup empty gator when a larger gator exists
if (gator.get_bytes_currently_allocated() == 0 && pool_block_size_ > pool_block_sizes_[i]) {
#if PLUTO_DEBUGGING
std::cout << " - Releasing memory_pool["<<i<<"] with block_size " << bytes_to_string(pool_block_sizes_[i]) << " and capacity " << bytes_to_string(gator.get_pool_capacity()) << std::endl;
#endif
pools_[i].reset();
}
}
}
}
}

bool MemoryPoolResource::do_is_equal(const memory_resource& other) const noexcept {
if (this == &other) { return true; }
return false;
}

memory_resource* MemoryPoolResource::resource(std::size_t bytes) {
constexpr std::size_t MB = 1024*1024;
Expand Down Expand Up @@ -91,15 +150,8 @@ memory_resource* MemoryPoolResource::resource(std::size_t bytes) {
options.initial_size = blocks_per_chunk*pool_block_size_;
options.grow_size = blocks_per_chunk*pool_block_size_;
if (TraceOptions::instance().enabled) {
std::string size_str;
if( pool_block_size_ >= GB ) {
size_str = std::to_string(pool_block_size_/GB) +"GB";
}
else {
size_str = std::to_string(pool_block_size_/MB) +"MB";
}
pools_[pool_index] =
std::make_unique<TraceMemoryResource>("gator["+size_str+"]",
std::make_unique<TraceMemoryResource>("gator["+bytes_to_string(pool_block_size_)+"]",
std::make_unique<GatorMemoryResource>(options, upstream_) );
}
else {
Expand All @@ -110,30 +162,8 @@ memory_resource* MemoryPoolResource::resource(std::size_t bytes) {
return pool_;
}

static GatorMemoryResource* to_gator_resource(memory_resource* pool) {
GatorMemoryResource* gator;
if (TraceMemoryResource* traced = dynamic_cast<TraceMemoryResource*>(pool)) {
gator = dynamic_cast<GatorMemoryResource*>(traced->upstream_resource());
}
else {
gator = dynamic_cast<GatorMemoryResource*>(pool);
}
return gator;
}

void MemoryPoolResource::cleanup_unused_gators() {
for (int i=0; i<pool_block_sizes_.size(); ++i) {
if (pools_[i] && pool_block_size_ > pool_block_sizes_[i]) {
#if PLUTO_DEBUGGING
auto& gator = to_gator_resource(pools_[i].get())->gator();
std::cout << " - Releasing memory_pool["<<i<<"] with block_size " << pool_block_sizes_[i] << " and capacity " << gator.get_pool_capacity() << std::endl;
#endif
pools_[i].reset();
}
}
}

std::size_t MemoryPoolResource::size() const {
std::lock_guard lock(mtx_);
std::size_t _size{0};
for (const auto& pool: pools_) {
if (pool) {
Expand All @@ -147,6 +177,7 @@ std::size_t MemoryPoolResource::size() const {
}

std::size_t MemoryPoolResource::capacity() const {
std::lock_guard lock(mtx_);
std::size_t _capacity{0};
for (const auto& pool: pools_) {
if (pool) {
Expand Down
24 changes: 7 additions & 17 deletions pluto/src/pluto/memory_resource/MemoryPoolResource.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include <memory>
#include <vector>
#include <mutex>

#include "pluto/memory_resource/memory_resource.h"

Expand Down Expand Up @@ -44,6 +45,7 @@ class MemoryPoolResource : public memory_pool_resource {
}

void release() override {
std::lock_guard lock(mtx_);
pools_.clear();
}

Expand All @@ -60,26 +62,13 @@ class MemoryPoolResource : public memory_pool_resource {
}

protected:
void* do_allocate(std::size_t bytes, std::size_t alignment) override {
return resource(bytes)->allocate(bytes, alignment);
}
void* do_allocate(std::size_t bytes, std::size_t alignment) override;
void do_deallocate(void* ptr, std::size_t bytes, std::size_t alignment) override;
bool do_is_equal(const memory_resource& other) const noexcept override;

void do_deallocate(void* ptr, std::size_t bytes, std::size_t alignment) override {
resource(bytes)->deallocate(ptr, bytes, alignment);
// possible to cleanup no longer used gators
cleanup_unused_gators();
}

bool do_is_equal(const memory_resource& other) const noexcept override {
if (this == &other) { return true; }
return false;
}

// A suitable pool or upstream resource to allocate and deallocate given bytes
// A suitable pool or upstream resource to allocate given bytes
memory_resource* resource(std::size_t bytes);

void cleanup_unused_gators();

private:
pool_options options_;
std::unique_ptr<memory_resource> owned_upstream_;
Expand All @@ -88,6 +77,7 @@ class MemoryPoolResource : public memory_pool_resource {
std::vector<std::size_t> pool_block_sizes_;
memory_resource* pool_;
std::size_t pool_block_size_;
mutable std::mutex mtx_;
};

}
11 changes: 11 additions & 0 deletions pluto/src/pluto/memory_resource/yakl/YAKL_Gator.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,17 @@ namespace yakl {
};


bool thisIsMyPointer(void *ptr) {
std::lock_guard lock(mtx1);
// Go through each pool.
for (auto it = pools.rbegin() ; it != pools.rend() ; it++) {
if (it->thisIsMyPointer(ptr)) {
return true;
}
}
return false;
}

/** @brief Free the passed pointer, and return the pointer to allocated space.
* @details Attempting to free a pointer not found in the list of pools will result in a thrown exception */
void free(void *ptr , char const * label = "" ) {
Expand Down
1 change: 1 addition & 0 deletions pluto/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ add_subdirectory(sandbox)

ecbuild_add_test( TARGET pluto_test_pluto_f SOURCES test_pluto_f.F90 LIBS pluto_f )

ecbuild_add_test( TARGET pluto_test_memory_pool SOURCES pluto_test_memory_pool.cc LIBS pluto )
55 changes: 55 additions & 0 deletions pluto/tests/pluto_test_memory_pool.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@

/*
* (C) Copyright 2024- ECMWF.
*
* This software is licensed under the terms of the Apache Licence Version 2.0
* which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
* In applying this licence, ECMWF does not waive the privileges and immunities
* granted to it by virtue of its status as an intergovernmental organisation
* nor does it submit to any jurisdiction.
*/

#include <iostream>
#include <cstddef>

#include "pluto/pluto.h"

[[maybe_unused]] constexpr std::size_t kb = 1024;
[[maybe_unused]] constexpr std::size_t mb = 1024*kb;
[[maybe_unused]] constexpr std::size_t gb = 1024*mb;

class vector {
public:
vector(std::size_t n, const pluto::allocator<std::byte>& alloc) :
size_{n},
alloc_{alloc} {
if (size_) {
data_ = alloc_.allocate(size_);
}
}
~vector() {
if(size_) {
alloc_.deallocate(data_,size_);
}
}
std::byte* data_ = nullptr;
std::size_t size_ = 0;
pluto::allocator<std::byte> alloc_;
};


int main(int argc, char* argv[]) {
std::cout << "BEGIN" << std::endl;

pluto::TraceOptions::instance().enabled = true;

pluto::allocator<std::byte> allocator(pluto::pool_resource());
// using vector = pluto::host::vector<std::byte>;

vector array1(200*mb,allocator);
vector array2(200*mb,allocator);
vector array3(1.2*gb,allocator);


std::cout << "END" << std::endl;
}

0 comments on commit 6920ce1

Please sign in to comment.