[feat] Add redis_hash_tags_hypodispersion to redis backend config.
Distribution of storage_slice hash tags will be uniform (hypodispersion) across the 16384 hash slots, regardless of cluster slot assignment,
but it still depends on redis_hash_tags_import/runtime if they aren't empty.
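
As a concrete illustration, here is a minimal sketch (the helper names and the brute-force tag search are assumptions, not the actual TFRA code) of how hash tags could be pinned to evenly spaced slots out of the 16384 available:

```cpp
#include <cstdint>
#include <string>
#include <vector>

// CRC16-CCITT (XModem) -- the checksum Redis Cluster uses for key slots:
// slot = crc16(hash_tag_contents) % 16384.
uint16_t Crc16XModem(const std::string &s) {
  uint16_t crc = 0;
  for (unsigned char c : s) {
    crc ^= static_cast<uint16_t>(c) << 8;
    for (int b = 0; b < 8; ++b)
      crc = (crc & 0x8000) ? static_cast<uint16_t>((crc << 1) ^ 0x1021)
                           : static_cast<uint16_t>(crc << 1);
  }
  return crc;
}

// Hypothetical helper (not the TFRA function): pick one numeric hash tag
// per bucket so the buckets land on evenly spaced slots, independent of
// how the cluster assigns slots to nodes.
std::vector<std::string> EvenlySpacedHashTags(unsigned storage_slice) {
  std::vector<std::string> tags;
  tags.reserve(storage_slice);
  for (unsigned i = 0; i < storage_slice; ++i) {
    const unsigned target = static_cast<unsigned>(
        static_cast<uint64_t>(i) * 16384u / storage_slice);
    unsigned n = 0;  // brute-force the smallest integer hashing to `target`
    while (Crc16XModem(std::to_string(n)) % 16384u != target) ++n;
    tags.push_back("{" + std::to_string(n) + "}");  // Redis hashes only the {} contents
  }
  return tags;
}
```

Since Redis Cluster routes a key by the CRC16 of the text inside {} modulo 16384, tags chosen this way spread the buckets evenly over the slot space no matter how slots are mapped to nodes; explicit redis_hash_tags_import/runtime values bypass this.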

[feat] Add using_hash_storage_slice in redis config.
If True, each ID's bucket number is computed by hashing the ID (CRC32) and then taking the hash modulo the bucket count.
If False, the bucket number is simply the ID's remainder modulo the bucket count.
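
A rough sketch of the two modes (the function is hypothetical, and zlib's crc32 stands in for the op's CRC32):

```cpp
#include <cstdint>
#include <zlib.h>  // link with -lz for crc32()

// Hypothetical illustration of the two bucket-selection modes; the real
// op's hash may differ in detail.
unsigned BucketFor(int64_t id, unsigned storage_slice,
                   bool using_hash_storage_slice) {
  if (using_hash_storage_slice) {
    // Hash mode: CRC32 over the raw ID bytes, then MOD by the bucket count.
    const uint32_t h = static_cast<uint32_t>(
        crc32(0L, reinterpret_cast<const Bytef *>(&id), sizeof(id)));
    return h % storage_slice;
  }
  // Plain mode: the remainder of the ID itself. Cheap, but skewed IDs
  // (e.g. all even) land in a skewed set of buckets.
  return static_cast<unsigned>(static_cast<uint64_t>(id) % storage_slice);
}
```

Hash mode costs one CRC per key but evens out bucket load when the raw IDs themselves are not uniformly distributed.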

[feat] Change redis_connection_mode config
redis_connection_mode = 2 now selects standalone mode.
MoFHeka committed May 26, 2022
1 parent b02d738 commit cfa891d
Showing 11 changed files with 373 additions and 220 deletions.
6 changes: 5 additions & 1 deletion docs/api_docs/tfra/dynamic_embedding/RedisBackend.md
@@ -24,7 +24,7 @@ Below is an example of a JSON file, along with comments on the corresponding pro
**Attention! JSON files cannot contain comments when actually used!**
```json
{
"redis_connection_mode": 1, // ClusterMode = 0, SentinelMode = 1, StreamMode = 2
"redis_connection_mode": 2, // ClusterMode = 0, SentinelMode = 1, StandaloneMode = 2
"redis_master_name": "master",

// connection_options
@@ -50,8 +50,12 @@ Below is an example of a JSON file, along with comments on the corresponding pro
// Below are user-defined parameters of this custom op, not Redis settings
"storage_slice_import": 2, // If storage_slice_import is not equal to storage_slice, rehash will happen. Equaling -1 means same as storage_slice.
"storage_slice": 2, // For deciding bucket number, which usually is how many Redis instance may be used in the trainning.
"using_hash_storage_slice":
False, // If True, IDs will be calculated hash(CRC32) value and then MOD to decide which bucket number they belong to. If False, only calculate the remainder.
"keys_sending_size": 1024, // Determines how many keys to send at a time for performance tuning
"using_md5_prefix_name": False, // 1=true, 0=false
"redis_hash_tags_hypodispersion":
True, // distribution of storag_slice will be hypodispersion in 16354 regardless cluster slot, but still depends on redis_hash_tags_import/runtime if they aren't empty.
"model_tag_import": "test", // model_tag_import for version and any other information from last time.
"redis_hash_tags_import": ["{6379}","{26379}"], // Deciding hash tag for every bucket from last time, Note that the hash tag must be wrapped in curly braces {}.
"model_tag_runtime": "test", // model_tag_runtime for version and any other information for now.
4 changes: 3 additions & 1 deletion docs/api_docs/tfra/dynamic_embedding/RedisTableConfig.md
@@ -34,7 +34,7 @@ assign the embedding table storage properties.
An example of a configuration file is shown below:
```python
{
"redis_connection_mode": 1,
"redis_connection_mode": 2,
"redis_master_name": "master",
"redis_host_ip": [
"127.0.0.1"
@@ -54,8 +54,10 @@ An example of a configuration file is shown below:
"redis_sentinel_socket_timeout": 1000,
"storage_slice_import": 1,
"storage_slice": 1,
"using_hash_storage_slice": False,
"keys_sending_size": 1024,
"using_md5_prefix_name": False,
"redis_hash_tags_hypodispersion": False,
"model_tag_import": "test",
"redis_hash_tags_import": [],
"model_tag_runtime": "test",
@@ -24,7 +24,7 @@ Below is an example of a JSON file, along with comments on the corresponding pro
**Attention! JSON files cannot contain comments when actually used!**
```json
{
"redis_connection_mode": 1, // ClusterMode = 0, SentinelMode = 1, StreamMode = 2
"redis_connection_mode": 2, // ClusterMode = 0, SentinelMode = 1, StandaloneMode = 2
"redis_master_name": "master",

// connection_options
@@ -50,8 +50,12 @@ Below is an example of a JSON file, along with comments on the corresponding pro
// Below are user-defined parameters of this custom op, not Redis settings
"storage_slice_import": 2, // If storage_slice_import is not equal to storage_slice, rehash will happen. Equaling -1 means same as storage_slice.
"storage_slice": 2, // For deciding bucket number, which usually is how many Redis instance may be used in the trainning.
"using_hash_storage_slice":
False, // If True, IDs will be calculated hash(CRC32) value and then MOD to decide which bucket number they belong to. If False, only calculate the remainder.
"keys_sending_size": 1024, // Determines how many keys to send at a time for performance tuning
"using_md5_prefix_name": False, // 1=true, 0=false
"redis_hash_tags_hypodispersion":
False, // distribution of storag_slice will be hypodispersion in 16354 regardless cluster slot, but still depends on redis_hash_tags_import/runtime if they aren't empty.
"model_tag_import": "test", // model_tag_import for version and any other information from last time.
"redis_hash_tags_import": ["{6379}","{26379}"], // Deciding hash tag for every bucket from last time, Note that the hash tag must be wrapped in curly braces {}.
"model_tag_runtime": "test", // model_tag_runtime for version and any other information for now.
@@ -497,7 +497,7 @@ class RedisWrapper<RedisInstance, K, V,
}

virtual std::unique_ptr<redisReply, ::sw::redis::ReplyDeleter> MgetInBucket(
const Tensor &keys, const int64 begin, const int64 max_i,
const Tensor &keys, const int64_t begin, const int64_t max_i,
const std::string &keys_prefix_name_slice) override {
std::unique_ptr<BucketContext> bucket_context_temp(new BucketContext());
const static char *redis_command = "HMGET";
@@ -937,8 +937,8 @@ every bucket has its own BucketContext for sending data---for locating reply-
*/
virtual std::vector<std::unique_ptr<redisReply, ::sw::redis::ReplyDeleter>>
MgetCommand(
const Tensor &keys, ThreadContext *thread_context, const int64 begin,
const int64 max_i,
const Tensor &keys, ThreadContext *thread_context, const int64_t begin,
const int64_t max_i,
const std::vector<std::string> &keys_prefix_name_slices) override {
const int &&total = max_i - begin;
const int &&argc = total + 2;
@@ -953,7 +953,7 @@ every bucket has its own BucketContext for sending data---for locating reply-

const unsigned &storage_slice = redis_connection_params.storage_slice;
const unsigned &&vector_len =
(static_cast<int64>(reinterpret_cast<int>(argc)) /
(static_cast<int64_t>(reinterpret_cast<int>(argc)) /
redis_connection_params.storage_slice) +
2;

@@ -968,7 +968,8 @@ every bucket has its own BucketContext for sending data---for locating reply-
unsigned *pbucket_loc = thread_context->bucket_locs->data();
unsigned key_bucket_locs = 0;
for (; pk_raw != pk_raw_end; ++pk_raw) {
key_bucket_locs = KBucketNum<K>(pk_raw, storage_slice);
key_bucket_locs =
KBucketNum<K>(this->K_bucket_num_handle, pk_raw, storage_slice);
// The bucket to which the key belongs is recorded to facilitate future
// memory writes that do not recompute the redis hash
*pbucket_loc = key_bucket_locs;
@@ -1019,7 +1020,7 @@ every bucket has its own BucketContext for sending data---for locating reply-
inline void CopyDefaultToTensor(const bool is_full_default, const V *pv_raw,
const V *dft_raw,
const V *const dft_raw_begin,
const int64 Velems_per_dim0) {
const int64_t Velems_per_dim0) {
if (is_full_default) {
DefaultMemcpyToTensor<V>(
pv_raw, dft_raw,
@@ -1036,8 +1037,8 @@ every bucket has its own BucketContext for sending data---for locating reply-
ThreadContext *thread_context,
std::vector<std::unique_ptr<redisReply, ::sw::redis::ReplyDeleter>>
&reply,
const int64 begin, const int64 max_i,
const int64 Velems_per_dim0) override {
const int64_t begin, const int64_t max_i,
const int64_t Velems_per_dim0) override {
const V *pv_raw =
reinterpret_cast<const V *>(values->tensor_data().data()) +
begin * Velems_per_dim0;
@@ -1096,8 +1097,8 @@ every bucket has its own BucketContext for sending data---for locating reply-
const bool is_full_default, ThreadContext *thread_context,
std::vector<std::unique_ptr<redisReply, ::sw::redis::ReplyDeleter>>
&reply,
const int64 begin, const int64 max_i,
const int64 Velems_per_dim0) override {
const int64_t begin, const int64_t max_i,
const int64_t Velems_per_dim0) override {
const V *pv_raw =
reinterpret_cast<const V *>(values->tensor_data().data()) +
begin * Velems_per_dim0;
@@ -1117,7 +1118,7 @@ every bucket has its own BucketContext for sending data---for locating reply-
redisReply *temp_reply;
bool print_once[storage_slice];
memset(print_once, false, sizeof(print_once));
for (int64 i = 0, j = begin; i < (max_i - begin);
for (int64_t i = 0, j = begin; i < (max_i - begin);
++i, ++j, pv_raw += Velems_per_dim0, dft_raw += Velems_per_dim0) {
bucket_loc = (*bucket_locs)[i];
if (reply[bucket_loc] != nullptr) {
@@ -1157,7 +1158,7 @@ every bucket has its own BucketContext for sending data---for locating reply-

virtual Status MsetCommand(
const Tensor &keys, const Tensor &values, ThreadContext *thread_context,
const int64 begin, const int64 max_i, const int64 Velems_per_dim0,
const int64_t begin, const int64_t max_i, const int64_t Velems_per_dim0,
const std::vector<std::string> &keys_prefix_name_slices) override {
const int &&total = max_i - begin;
const int &&argc = total * 2 + 2;
@@ -1177,7 +1178,7 @@ every bucket has its own BucketContext for sending data---for locating reply-

const unsigned &storage_slice = redis_connection_params.storage_slice;
const unsigned &&vector_len =
(static_cast<int64>(reinterpret_cast<int>(argc)) /
(static_cast<int64_t>(reinterpret_cast<int>(argc)) /
redis_connection_params.storage_slice) +
2;

@@ -1198,7 +1199,8 @@ every bucket has its own BucketContext for sending data---for locating reply-
VCATS_temp = VContentAndTypeSize<V>(VCATS_temp, Velems_per_dim0,
V_byte_size, pv_raw, buff_temp[i]);
key_bucket_locs =
KBucketNum<K>(pk_raw, storage_slice); // TODO: change it to AVX512
KBucketNum<K>(this->K_bucket_num_handle, pk_raw,
storage_slice); // TODO: change it to AVX512

// Direct access to Tensor data in TensorFlow
thread_context->HandlePushBack(
@@ -1246,8 +1248,8 @@ every bucket has its own BucketContext for sending data---for locating reply-

virtual Status MaccumCommand(
const Tensor &keys, const Tensor &values_or_delta, const Tensor &exists,
ThreadContext *thread_context, const int64 begin, const int64 max_i,
const int64 Velems_per_dim0,
ThreadContext *thread_context, const int64_t begin, const int64_t max_i,
const int64_t Velems_per_dim0,
const std::vector<std::string> &keys_prefix_name_slices) override {
const int &&total = max_i - begin;
const int &&argc = total * 2 + 4;
@@ -1270,7 +1272,7 @@ every bucket has its own BucketContext for sending data---for locating reply-

const unsigned &storage_slice = redis_connection_params.storage_slice;
const unsigned &&vector_len =
(static_cast<int64>(reinterpret_cast<int>(argc)) /
(static_cast<int64_t>(reinterpret_cast<int>(argc)) /
redis_connection_params.storage_slice) +
4;

@@ -1292,7 +1294,8 @@ every bucket has its own BucketContext for sending data---for locating reply-
VCATS_temp = VContentAndTypeSize<V>(VCATS_temp, Velems_per_dim0,
V_byte_size, pv_raw, buff_temp[i]);
key_bucket_locs =
KBucketNum<K>(pk_raw, storage_slice); // TODO: change it to AVX512
KBucketNum<K>(this->K_bucket_num_handle, pk_raw,
storage_slice); // TODO: change it to AVX512

// Direct access to Tensor data in TensorFlow
thread_context->HandlePushBack(
@@ -1346,8 +1349,8 @@ every bucket has its own BucketContext for sending data---for locating reply-
}

virtual Status DelCommand(
const Tensor &keys, ThreadContext *thread_context, const int64 begin,
const int64 max_i,
const Tensor &keys, ThreadContext *thread_context, const int64_t begin,
const int64_t max_i,
const std::vector<std::string> &keys_prefix_name_slices) override {
const int &&total = max_i - begin;
const int &&argc = total + 2;
@@ -1362,7 +1365,7 @@ every bucket has its own BucketContext for sending data---for locating reply-

const unsigned &storage_slice = redis_connection_params.storage_slice;
const unsigned &&vector_len =
(static_cast<int64>(reinterpret_cast<int>(argc)) /
(static_cast<int64_t>(reinterpret_cast<int>(argc)) /
redis_connection_params.storage_slice) +
2;

@@ -1377,7 +1380,8 @@ every bucket has its own BucketContext for sending data---for locating reply-
unsigned *pbucket_loc = thread_context->bucket_locs->data();
unsigned key_bucket_locs = 0;
for (; pk_raw != pk_raw_end; ++pk_raw) {
key_bucket_locs = KBucketNum<K>(pk_raw, storage_slice);
key_bucket_locs =
KBucketNum<K>(this->K_bucket_num_handle, pk_raw, storage_slice);
// The bucket to which the key belongs is recorded to facilitate future
// memory writes that do not recompute the redis hash
*pbucket_loc = key_bucket_locs;
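
Note that every call site above now passes `this->K_bucket_num_handle` into `KBucketNum`. One plausible reading of that signature change (the handle type and both helpers below are assumptions, not the actual TFRA code) is a function pointer chosen once from `using_hash_storage_slice`, keeping the hot per-key loops free of mode branches:

```cpp
#include <cstdint>
#include <zlib.h>  // link with -lz for crc32()

// Assumed handle shape: one function pointer chosen at table-construction
// time, so the per-key loops never re-test the config flag.
template <typename K>
using KBucketNumHandle = unsigned (*)(const K *, unsigned);

template <typename K>
unsigned KBucketNumByCrc32(const K *pk, unsigned storage_slice) {
  const uint32_t h = static_cast<uint32_t>(
      crc32(0L, reinterpret_cast<const Bytef *>(pk), sizeof(K)));
  return h % storage_slice;
}

template <typename K>  // assumes an integral key type
unsigned KBucketNumByMod(const K *pk, unsigned storage_slice) {
  return static_cast<unsigned>(static_cast<uint64_t>(*pk) % storage_slice);
}

template <typename K>
inline unsigned KBucketNum(KBucketNumHandle<K> handle, const K *pk,
                           unsigned storage_slice) {
  return handle(pk, storage_slice);
}

// Selection would then happen once, e.g.:
//   K_bucket_num_handle = params.using_hash_storage_slice
//                             ? &KBucketNumByCrc32<K>
//                             : &KBucketNumByMod<K>;
```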