Skip to content

Commit

Permalink
up
Browse files Browse the repository at this point in the history
  • Loading branch information
beats-dh committed Jan 1, 2025
1 parent 211a288 commit 7ef9013
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 45 deletions.
7 changes: 5 additions & 2 deletions src/server/network/message/outputmessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,19 @@ void OutputMessagePool::removeProtocolFromAutosend(const Protocol_ptr &protocol)
OutputMessage_ptr OutputMessagePool::getOutputMessage() {
OutputMessage* rawPtr = outputMessageAllocator.allocate(1);

if (!rawPtr) {
throw std::runtime_error("Failed to allocate OutputMessage");
}

try {
new (rawPtr) OutputMessage();
rawPtr->reset(); // Reutiliza o objeto
} catch (...) {
outputMessageAllocator.deallocate(rawPtr, 1);
throw;
}

return { rawPtr, [](OutputMessage* ptr) {
if (ptr != nullptr) {
ptr->~OutputMessage();
outputMessageAllocator.deallocate(ptr, 1);
}
} };
Expand Down
6 changes: 6 additions & 0 deletions src/server/network/message/outputmessage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ class OutputMessage : public NetworkMessage {
OutputMessage(const OutputMessage &) = delete;
OutputMessage &operator=(const OutputMessage &) = delete;

void reset() {
NetworkMessage::reset();
outputBufferStart = INITIAL_BUFFER_POSITION;
}


uint8_t* getOutputBuffer() {
return buffer.data() + outputBufferStart;
}
Expand Down
109 changes: 66 additions & 43 deletions src/utils/lockfree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,18 @@
#endif

constexpr size_t STATIC_PREALLOCATION_SIZE = 500;
constexpr size_t NUM_SHARDS = 4;

template <typename T, size_t CAPACITY>
struct LockfreeFreeList {
using FreeList = atomic_queue::AtomicQueue2<T*, CAPACITY>;
using FreeList = atomic_queue::AtomicQueue2<T*, CAPACITY / NUM_SHARDS>;

// Increased for better cache utilization
static constexpr size_t DEFAULT_BATCH_SIZE = 128;
static constexpr size_t PREFETCH_DISTANCE = 4;

// Aligned structure to avoid false sharing
struct alignas(CACHE_LINE_SIZE) AlignedCounters {
std::atomic<size_t> count;
char padding[CACHE_LINE_SIZE - sizeof(std::atomic<size_t>)];
char padding[CACHE_LINE_SIZE - sizeof(std::atomic<size_t>)] {};

AlignedCounters() :
count(0) { }
Expand All @@ -146,61 +145,80 @@ struct LockfreeFreeList {
static AlignedCounters allocated_count;
static AlignedCounters failed_allocations;

// Thread-local memory pool
static thread_local std::array<T*, DEFAULT_BATCH_SIZE> local_cache;
static thread_local std::vector<T*> local_cache;
static thread_local size_t local_cache_size;

[[nodiscard]] static FreeList &get() noexcept {
static FreeList freeList;
return freeList;
struct MemoryBlock {
T* start;
size_t used;
size_t capacity;

MemoryBlock(size_t block_size, std::pmr::memory_resource* resource) :
start(static_cast<T*>(resource->allocate(block_size * sizeof(T), alignof(T)))),
used(0), capacity(block_size) { }

~MemoryBlock() = default;

T* allocate() {
if (used < capacity) {
return &start[used++];
}
return nullptr;
}

[[nodiscard]] bool has_space() const {
return used < capacity;
}
};

static std::vector<MemoryBlock> memory_blocks;
static std::pmr::memory_resource* default_resource;

static FreeList &get_sharded_list() noexcept {
static thread_local size_t shard_id = std::hash<std::thread::id> {}(std::this_thread::get_id()) % NUM_SHARDS;
static std::array<FreeList, NUM_SHARDS> freeLists;
return freeLists[shard_id];
}

static void preallocate(const size_t count, std::pmr::memory_resource* resource = std::pmr::get_default_resource()) noexcept {
auto &freeList = get();
size_t batchSize = DEFAULT_BATCH_SIZE;
memory_blocks.emplace_back(batchSize, resource);

T* batch[DEFAULT_BATCH_SIZE];
size_t successful = 0;

for (size_t i = 0; i < count; i += DEFAULT_BATCH_SIZE) {
const size_t batchSize = std::min(DEFAULT_BATCH_SIZE, count - i);

// Pre-allocate with prefetch
for (size_t j = 0; j < batchSize; ++j) {
if (j + PREFETCH_DISTANCE < batchSize) {
PREFETCH(&batch[j + PREFETCH_DISTANCE]);
}
batch[j] = static_cast<T*>(resource->allocate(sizeof(T), alignof(T)));
for (size_t i = 0; i < batchSize; ++i) {
batch[i] = memory_blocks.back().allocate();
if (!batch[i]) {
break;
}
}

// Optimized insertion
for (size_t j = 0; j < batchSize; ++j) {
if (UNLIKELY(!freeList.try_push(batch[j]))) {
for (size_t k = j; k < batchSize; ++k) {
resource->deallocate(batch[k], sizeof(T), alignof(T));
}
failed_allocations.count.fetch_add(1, std::memory_order_relaxed);
return;
}
++successful;
for (size_t i = 0; i < batchSize; ++i) {
if (UNLIKELY(!get_sharded_list().try_push(batch[i]))) {
resource->deallocate(batch[i], sizeof(T), alignof(T));
failed_allocations.count.fetch_add(1, std::memory_order_relaxed);
return;
}
++successful;
}

allocated_count.count.fetch_add(successful, std::memory_order_release);
}

// Thread-local cache
[[nodiscard]] static T* fast_allocate() noexcept {
if (LIKELY(local_cache_size > 0)) {
return local_cache[--local_cache_size];
}

// Refill local cache
auto &freeList = get();
size_t fetched = 0;
auto &freeList = get_sharded_list();
while (fetched < DEFAULT_BATCH_SIZE) {
T* ptr;
if (!freeList.try_pop(ptr)) {
T* item;
if (!freeList.try_pop(item)) {
break;
}
local_cache[fetched++] = ptr;
local_cache[fetched++] = item;
}

local_cache_size = fetched;
Expand All @@ -213,16 +231,13 @@ struct LockfreeFreeList {
return true;
}

// Local cache full, try to empty half
auto &freeList = get();
const size_t half = DEFAULT_BATCH_SIZE / 2;
size_t half = DEFAULT_BATCH_SIZE / 2;
for (size_t i = 0; i < half; ++i) {
if (!freeList.try_push(local_cache[i])) {
if (!get_sharded_list().try_push(local_cache[i])) {
return false;
}
}

// Move the other half to the beginning
std::move(local_cache.begin() + half, local_cache.begin() + DEFAULT_BATCH_SIZE, local_cache.begin());
local_cache_size = half;
local_cache[local_cache_size++] = ptr;
Expand All @@ -235,8 +250,10 @@ struct LockfreeFreeList {

static void try_grow() noexcept {
const size_t current = get_allocated_count();
if (LIKELY(CAPACITY - current >= DEFAULT_BATCH_SIZE)) {
preallocate(DEFAULT_BATCH_SIZE);
const size_t target = current + DEFAULT_BATCH_SIZE;
if (CAPACITY - current < DEFAULT_BATCH_SIZE) {
const size_t growth = std::min(CAPACITY, target * 2);
preallocate(growth - current);
}
}
};
Expand All @@ -248,11 +265,17 @@ template <typename T, size_t CAPACITY>
typename LockfreeFreeList<T, CAPACITY>::AlignedCounters LockfreeFreeList<T, CAPACITY>::failed_allocations;

template <typename T, size_t CAPACITY>
thread_local std::array<T*, LockfreeFreeList<T, CAPACITY>::DEFAULT_BATCH_SIZE> LockfreeFreeList<T, CAPACITY>::local_cache;
thread_local std::vector<T*> LockfreeFreeList<T, CAPACITY>::local_cache(DEFAULT_BATCH_SIZE);

template <typename T, size_t CAPACITY>
thread_local size_t LockfreeFreeList<T, CAPACITY>::local_cache_size = 0;

template <typename T, size_t CAPACITY>
std::vector<typename LockfreeFreeList<T, CAPACITY>::MemoryBlock> LockfreeFreeList<T, CAPACITY>::memory_blocks;

template <typename T, size_t CAPACITY>
std::pmr::memory_resource* LockfreeFreeList<T, CAPACITY>::default_resource = std::pmr::get_default_resource();

template <typename T, size_t CAPACITY>
class LockfreePoolingAllocator {
public:
Expand Down

0 comments on commit 7ef9013

Please sign in to comment.