Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leak in Redis by using auto memory management #4054

Merged
merged 3 commits into from
Feb 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 28 additions & 12 deletions src/ray/gcs/redis_module/ray_redis_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,20 @@ extern RedisChainModule module;
} \
}

// Wrap a Redis command with automatic memory management.
#define AUTO_MEMORY(FUNC) \
int FUNC(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { \
RedisModule_AutoMemory(ctx); \
return internal_redis_commands::FUNC(ctx, argv, argc); \
}

// Commands in this namespace should not be used directly. They should first be
// wrapped with AUTO_MEMORY in the global namespace to enable automatic memory
// management.
// TODO(swang): Ideally, we would make the commands that don't have auto memory
// management inaccessible instead of just using a separate namespace.
namespace internal_redis_commands {

/// Map from pub sub channel to clients that are waiting on that channel.
std::unordered_map<std::string, std::vector<std::string>> notification_map;

Expand Down Expand Up @@ -272,14 +286,12 @@ int TableAdd_DoPublish(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
/// \param data The data to insert at the key.
/// \return The current value at the key, or OK if there is no value.
int TableAdd_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx);
TableAdd_DoWrite(ctx, argv, argc, /*mutated_key_str=*/nullptr);
return TableAdd_DoPublish(ctx, argv, argc);
}

#if RAY_USE_NEW_GCS
int ChainTableAdd_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx);
return module.ChainReplicate(ctx, argv, argc, /*node_func=*/TableAdd_DoWrite,
/*tail_func=*/TableAdd_DoPublish);
}
Expand Down Expand Up @@ -388,7 +400,6 @@ int TableAppend_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int
#if RAY_USE_NEW_GCS
int ChainTableAppend_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc) {
RedisModule_AutoMemory(ctx);
return module.ChainReplicate(ctx, argv, argc,
/*node_func=*/TableAppend_DoWrite,
/*tail_func=*/TableAppend_DoPublish);
Expand Down Expand Up @@ -471,8 +482,6 @@ Status TableEntryToFlatbuf(RedisModuleCtx *ctx, RedisModuleKey *table_key,
/// \return nil if the key is empty, the current value if the key type is a
/// string, or an array of the current values if the key type is a set.
int TableLookup_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx);

if (argc < 4) {
return RedisModule_WrongArity(ctx);
}
Expand Down Expand Up @@ -515,8 +524,6 @@ int TableLookup_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int
/// string, or an array of the current values if the key type is a set.
int TableRequestNotifications_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc) {
RedisModule_AutoMemory(ctx);

if (argc != 5) {
return RedisModule_WrongArity(ctx);
}
Expand Down Expand Up @@ -569,8 +576,6 @@ int TableRequestNotifications_RedisCommand(RedisModuleCtx *ctx, RedisModuleStrin
/// \return OK.
int TableCancelNotifications_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc) {
RedisModule_AutoMemory(ctx);

if (argc < 5) {
return RedisModule_WrongArity(ctx);
}
Expand Down Expand Up @@ -620,8 +625,6 @@ Status is_nil(bool *out, const std::string &data) {
// Be careful, this only supports Task Table payloads.
int TableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc) {
RedisModule_AutoMemory(ctx);

if (argc != 5) {
return RedisModule_WrongArity(ctx);
}
Expand Down Expand Up @@ -675,14 +678,27 @@ std::string DebugString() {

int DebugString_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
RedisModule_AutoMemory(ctx);

if (argc != 1) {
return RedisModule_WrongArity(ctx);
}
std::string debug_string = DebugString();
return RedisModule_ReplyWithStringBuffer(ctx, debug_string.data(), debug_string.size());
}
};

// Wrap all Redis commands with Redis' auto memory management.
AUTO_MEMORY(TableAdd_RedisCommand);
AUTO_MEMORY(TableAppend_RedisCommand);
AUTO_MEMORY(TableLookup_RedisCommand);
AUTO_MEMORY(TableRequestNotifications_RedisCommand);
AUTO_MEMORY(TableCancelNotifications_RedisCommand);
AUTO_MEMORY(TableTestAndUpdate_RedisCommand);
AUTO_MEMORY(DebugString_RedisCommand);
#if RAY_USE_NEW_GCS
AUTO_MEMORY(ChainTableAdd_RedisCommand);
AUTO_MEMORY(ChainTableAppend_RedisCommand);
#endif

extern "C" {

Expand Down
3 changes: 3 additions & 0 deletions src/ray/gcs/tables.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ Status Log<ID, Data>::Append(const JobID &job_id, const ID &id,
std::shared_ptr<DataT> &dataT, const WriteCallback &done) {
num_appends_++;
auto callback = [this, id, dataT, done](const std::string &data) {
// If data is not empty, then Redis failed to append the entry.
RAY_CHECK(data.empty()) << "TABLE_APPEND command failed: " << data;

if (done != nullptr) {
(done)(client_, id, *dataT);
}
Expand Down