Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create embedding cache interface and impl RocksDB cache #2858

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/


#pragma once

namespace embedding_cache {

class EmbeddingCacheInterface {
public:
/**
* @brief getter for the embedding.
*
* @param indices The indices of the embeddings to be retrieved.
* @param weights The weights (placeholders) of the embeddings to update in
* place.
* @param count The number of embeddings to be retrieved.
*/
virtual void get(
const at::Tensor& indices,
const at::Tensor& weights,
const at::Tensor& count) = 0;

/**
* @brief setter for the embedding.
*
* @param indices The indices of the embeddings to be update.
* @param weights The weights of the embeddings to update.
* @param count The number of embeddings to update.
*/
virtual void set(
const at::Tensor& indices,
const at::Tensor& weights,
const at::Tensor& count) = 0;

virtual ~EmbeddingCacheInterface() {}
};

} // namespace embedding_cache
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/


#include "fbcode/deeplearning/fbgemm/fbgemm_gpu/src/ssd_split_embeddings_cache/rocksdb_embedding_cache.h"
#include <folly/logging/xlog.h>

namespace embedding_cache {

RocksDBEmbeddingCache::RocksDBEmbeddingCache(
std::unique_ptr<ssd::EmbeddingRocksDB> rocksdb) noexcept
: rocksdb_(std::move(rocksdb)) {
XLOG(INFO) << "Initializing RocksDBEmbeddingCache";
}

void RocksDBEmbeddingCache::get(
const at::Tensor& indices,
const at::Tensor& weights,
const at::Tensor& count) {
rocksdb_->get(indices, weights, count);
}

void RocksDBEmbeddingCache::set(
const at::Tensor& indices,
const at::Tensor& weights,
const at::Tensor& count) {
rocksdb_->set(indices, weights, count);
}

} // namespace embedding_cache
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/


#pragma once

#include <ATen/ATen.h>
#include <string>
#include "fbcode/deeplearning/fbgemm/fbgemm_gpu/src/ssd_split_embeddings_cache/embedding_cache_interface.h"
#include "fbcode/deeplearning/fbgemm/fbgemm_gpu/src/ssd_split_embeddings_cache/ssd_table_batched_embeddings.h"

namespace embedding_cache {

class RocksDBEmbeddingCache : public EmbeddingCacheInterface {
public:
explicit RocksDBEmbeddingCache(
std::unique_ptr<ssd::EmbeddingRocksDB> rocksdb) noexcept;

void get(
const at::Tensor& indices,
const at::Tensor& weights,
const at::Tensor& count) override;
void set(
const at::Tensor& indices,
const at::Tensor& weights,
const at::Tensor& count) override;

private:
std::unique_ptr<ssd::EmbeddingRocksDB> rocksdb_;
};

} // namespace embedding_cache
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/


#include <ATen/ATen.h>
#include <folly/logging/xlog.h>
#include <gtest/gtest.h>
#include <filesystem>
#include "deeplearning/fbgemm/fbgemm_gpu/src/ssd_split_embeddings_cache/rocksdb_embedding_cache.h"
#include "deeplearning/fbgemm/fbgemm_gpu/src/ssd_split_embeddings_cache/ssd_table_batched_embeddings.h"

namespace embedding_cache {

TEST(RocksDbEmbeddingCacheTest, TestPutAndGet) {
std::filesystem::path temp_dir = std::filesystem::temp_directory_path();
std::filesystem::path rocksdb_dir = temp_dir / "rocksdb";
std::filesystem::create_directories(rocksdb_dir);
auto rocks_db = std::make_unique<ssd::EmbeddingRocksDB>(
rocksdb_dir,
8, // num_shards,
8, // num_threads,
0, // memtable_flush_period,
0, // memtable_flush_offset,
4, // l0_files_per_compact,
128, // max embedding dimension,
0, // rate_limit_mbps,
1, // size_ratio,
8, // compaction_trigger,
536870912, // 512M write_buffer_size,
8, // max_write_buffer_num,
-0.01, // uniform_init_lower,
0.01, // uniform_init_upper,
32, // row_storage_bitwidth = 32,
0 // cache_size = 0
);

auto cache = std::make_unique<RocksDBEmbeddingCache>(std::move(rocks_db));
auto write_indices =
at::tensor({10, 2, 1}, at::TensorOptions().dtype(at::kLong));

auto EMBEDDING_DIMENSION = 3;
auto write_buffer = at::randn(
{write_indices.size(0), EMBEDDING_DIMENSION},
at::TensorOptions().dtype(at::kFloat));
XLOG(INFO) << "weights to write:\n" << write_buffer;
auto write_count = at::tensor({3}, at::TensorOptions().dtype(at::kLong));
cache->set(write_indices, write_buffer, write_count);

auto read_indices = at::tensor({1, 2}, at::TensorOptions().dtype(at::kLong));
auto read_buffer = at::empty(
{read_indices.size(0), EMBEDDING_DIMENSION},
at::TensorOptions().dtype(at::kFloat));
XLOG(INFO) << "read_indices:\n" << read_indices;
auto read_count = at::tensor({2}, at::TensorOptions().dtype(at::kLong));
cache->get(read_indices, read_buffer, read_count);
XLOG(INFO) << "weights loaded for index 1:\n" << read_buffer;

EXPECT_EQ(
write_buffer.index({2, 0}).item<float>(),
read_buffer.index({0, 0}).item<float>());

std::filesystem::remove_all(rocksdb_dir);
}
} // namespace embedding_cache
Loading