Skip to content

Commit

Permalink
Fix memory leak in Redis by using auto memory management (#4054)
Browse files Browse the repository at this point in the history
* Table appends should always succeed

* Use Redis auto memory management

* Remove unneeded namespace
  • Loading branch information
stephanie-wang authored and ericl committed Feb 15, 2019
1 parent 0c0bd4d commit 3684e5b
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 12 deletions.
40 changes: 28 additions & 12 deletions src/ray/gcs/redis_module/ray_redis_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,20 @@ extern RedisChainModule module;
} \
}

// Wrap a Redis command with automatic memory management.
#define AUTO_MEMORY(FUNC) \
int FUNC(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { \
RedisModule_AutoMemory(ctx); \
return internal_redis_commands::FUNC(ctx, argv, argc); \
}

// Commands in this namespace should not be used directly. They should first be
// wrapped with AUTO_MEMORY in the global namespace to enable automatic memory
// management.
// TODO(swang): Ideally, we would make the commands that don't have auto memory
// management inaccessible instead of just using a separate namespace.
namespace internal_redis_commands {

/// Map from pub sub channel to clients that are waiting on that channel.
std::unordered_map<std::string, std::vector<std::string>> notification_map;

Expand Down Expand Up @@ -272,14 +286,12 @@ int TableAdd_DoPublish(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
/// \param data The data to insert at the key.
/// \return The current value at the key, or OK if there is no value.
int TableAdd_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx);
TableAdd_DoWrite(ctx, argv, argc, /*mutated_key_str=*/nullptr);
return TableAdd_DoPublish(ctx, argv, argc);
}

#if RAY_USE_NEW_GCS
int ChainTableAdd_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx);
return module.ChainReplicate(ctx, argv, argc, /*node_func=*/TableAdd_DoWrite,
/*tail_func=*/TableAdd_DoPublish);
}
Expand Down Expand Up @@ -388,7 +400,6 @@ int TableAppend_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int
#if RAY_USE_NEW_GCS
int ChainTableAppend_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc) {
RedisModule_AutoMemory(ctx);
return module.ChainReplicate(ctx, argv, argc,
/*node_func=*/TableAppend_DoWrite,
/*tail_func=*/TableAppend_DoPublish);
Expand Down Expand Up @@ -471,8 +482,6 @@ Status TableEntryToFlatbuf(RedisModuleCtx *ctx, RedisModuleKey *table_key,
/// \return nil if the key is empty, the current value if the key type is a
/// string, or an array of the current values if the key type is a set.
int TableLookup_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx);

if (argc < 4) {
return RedisModule_WrongArity(ctx);
}
Expand Down Expand Up @@ -515,8 +524,6 @@ int TableLookup_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int
/// string, or an array of the current values if the key type is a set.
int TableRequestNotifications_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc) {
RedisModule_AutoMemory(ctx);

if (argc != 5) {
return RedisModule_WrongArity(ctx);
}
Expand Down Expand Up @@ -569,8 +576,6 @@ int TableRequestNotifications_RedisCommand(RedisModuleCtx *ctx, RedisModuleStrin
/// \return OK.
int TableCancelNotifications_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc) {
RedisModule_AutoMemory(ctx);

if (argc < 5) {
return RedisModule_WrongArity(ctx);
}
Expand Down Expand Up @@ -620,8 +625,6 @@ Status is_nil(bool *out, const std::string &data) {
// Be careful, this only supports Task Table payloads.
int TableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc) {
RedisModule_AutoMemory(ctx);

if (argc != 5) {
return RedisModule_WrongArity(ctx);
}
Expand Down Expand Up @@ -675,14 +678,27 @@ std::string DebugString() {

int DebugString_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
RedisModule_AutoMemory(ctx);

if (argc != 1) {
return RedisModule_WrongArity(ctx);
}
std::string debug_string = DebugString();
return RedisModule_ReplyWithStringBuffer(ctx, debug_string.data(), debug_string.size());
}
};

// Wrap all Redis commands with Redis' auto memory management.
AUTO_MEMORY(TableAdd_RedisCommand);
AUTO_MEMORY(TableAppend_RedisCommand);
AUTO_MEMORY(TableLookup_RedisCommand);
AUTO_MEMORY(TableRequestNotifications_RedisCommand);
AUTO_MEMORY(TableCancelNotifications_RedisCommand);
AUTO_MEMORY(TableTestAndUpdate_RedisCommand);
AUTO_MEMORY(DebugString_RedisCommand);
#if RAY_USE_NEW_GCS
AUTO_MEMORY(ChainTableAdd_RedisCommand);
AUTO_MEMORY(ChainTableAppend_RedisCommand);
#endif

extern "C" {

Expand Down
3 changes: 3 additions & 0 deletions src/ray/gcs/tables.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ Status Log<ID, Data>::Append(const JobID &job_id, const ID &id,
std::shared_ptr<DataT> &dataT, const WriteCallback &done) {
num_appends_++;
auto callback = [this, id, dataT, done](const std::string &data) {
// If data is not empty, then Redis failed to append the entry.
RAY_CHECK(data.empty()) << "TABLE_APPEND command failed: " << data;

if (done != nullptr) {
(done)(client_, id, *dataT);
}
Expand Down

0 comments on commit 3684e5b

Please sign in to comment.