Skip to content

Commit

Permalink
Add new MPUBLISH command (apache#1657)
Browse files Browse the repository at this point in the history
The new `MPUBLISH` command allows publishing one or more messages to a channel. 
Syntax: `MPUBLISH channel-name message1 message2 ... messageN`
  • Loading branch information
torwig authored and p1u3o committed Aug 15, 2023
1 parent 7743cf7 commit 527a1d2
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 5 deletions.
39 changes: 34 additions & 5 deletions src/commands/cmd_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,48 @@ namespace redis {

class CommandPublish : public Commander {
public:
// mark is_write as false here because slave should be able to execute publish command
Status Execute(Server *svr, Connection *conn, std::string *output) override {
if (!svr->IsSlave()) {
// Compromise: can't replicate message to sub-replicas in a cascading-like structure.
// Replication relies on WAL seq, increase the seq on slave will break the replication, hence the compromise
// Compromise: can't replicate a message to sub-replicas in a cascading-like structure.
// Replication relies on WAL seq; increasing the seq on a replica will break the replication process,
// hence the compromise solution
redis::PubSub pubsub_db(svr->storage);

auto s = pubsub_db.Publish(args_[1], args_[2]);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
}

int receivers = svr->PublishMessage(args_[1], args_[2]);

*output = redis::Integer(receivers);

return Status::OK();
}
};

class CommandMPublish : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
int total_receivers = 0;

for (size_t i = 2; i < args_.size(); i++) {
if (!svr->IsSlave()) {
redis::PubSub pubsub_db(svr->storage);

auto s = pubsub_db.Publish(args_[1], args_[i]);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
}

int receivers = svr->PublishMessage(args_[1], args_[i]);
total_receivers += receivers;
}

*output = redis::Integer(total_receivers);

return Status::OK();
}
};
Expand Down Expand Up @@ -132,7 +160,7 @@ class CommandPubSub : public Commander {
return Status::OK();
}

return {Status::RedisInvalidCmd, "Unknown subcommand or wrong number of arguments"};
return {Status::RedisInvalidCmd, errUnknownSubcommandOrWrongArguments};
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
Expand Down Expand Up @@ -161,7 +189,7 @@ class CommandPubSub : public Commander {
return Status::OK();
}

return {Status::RedisInvalidCmd, "Unknown subcommand or wrong number of arguments"};
return {Status::RedisInvalidCmd, errUnknownSubcommandOrWrongArguments};
}

private:
Expand All @@ -172,6 +200,7 @@ class CommandPubSub : public Commander {

REDIS_REGISTER_COMMANDS(
MakeCmdAttr<CommandPublish>("publish", 3, "read-only pub-sub", 0, 0, 0),
MakeCmdAttr<CommandMPublish>("mpublish", -3, "read-only pub-sub", 0, 0, 0),
MakeCmdAttr<CommandSubscribe>("subscribe", -2, "read-only pub-sub no-multi no-script", 0, 0, 0),
MakeCmdAttr<CommandUnSubscribe>("unsubscribe", -1, "read-only pub-sub no-multi no-script", 0, 0, 0),
MakeCmdAttr<CommandPSubscribe>("psubscribe", -2, "read-only pub-sub no-multi no-script", 0, 0, 0),
Expand Down
1 change: 1 addition & 0 deletions src/commands/error_constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,6 @@ inline constexpr const char *errScoreIsNotValidFloat = "score is not a valid flo
inline constexpr const char *errValueIsNotFloat = "value is not a valid float";
inline constexpr const char *errNoMatchingScript = "NOSCRIPT No matching script. Please use EVAL";
inline constexpr const char *errUnknownOption = "unknown option";
inline constexpr const char *errUnknownSubcommandOrWrongArguments = "Unknown subcommand or wrong number of arguments";

} // namespace redis
57 changes: 57 additions & 0 deletions tests/gocase/unit/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,63 @@ func TestPubSub(t *testing.T) {
require.Equal(t, "hello", msg.Payload)
})

t.Run("MPUBLISH basics", func(t *testing.T) {
var (
channelName = "channel1"
msg1 = "hello"
msg2 = "world"
msg3 = "!"
msg4 = "foo-bar"
)

c1 := srv.NewClient()
defer func() { require.NoError(t, c1.Close()) }()
c2 := srv.NewClient()
defer func() { require.NoError(t, c2.Close()) }()

pubsub1 := c1.Subscribe(ctx, channelName)
pubsub2 := c2.Subscribe(ctx, channelName)

require.EqualValues(t, 1, receiveType(t, pubsub1, &redis.Subscription{}).Count)
require.EqualValues(t, 1, receiveType(t, pubsub2, &redis.Subscription{}).Count)

require.EqualValues(t, 6, rdb.Do(ctx, "MPUBLISH", channelName, msg1, msg2, msg3).Val())

msg := receiveType(t, pubsub1, &redis.Message{})
require.Equal(t, channelName, msg.Channel)
require.Equal(t, msg1, msg.Payload)

msg = receiveType(t, pubsub2, &redis.Message{})
require.Equal(t, channelName, msg.Channel)
require.Equal(t, msg1, msg.Payload)

msg = receiveType(t, pubsub1, &redis.Message{})
require.Equal(t, channelName, msg.Channel)
require.Equal(t, msg2, msg.Payload)

msg = receiveType(t, pubsub2, &redis.Message{})
require.Equal(t, channelName, msg.Channel)
require.Equal(t, msg2, msg.Payload)

msg = receiveType(t, pubsub1, &redis.Message{})
require.Equal(t, channelName, msg.Channel)
require.Equal(t, msg3, msg.Payload)

msg = receiveType(t, pubsub2, &redis.Message{})
require.Equal(t, channelName, msg.Channel)
require.Equal(t, msg3, msg.Payload)

require.EqualValues(t, 2, rdb.Do(ctx, "MPUBLISH", channelName, msg4).Val())

msg = receiveType(t, pubsub1, &redis.Message{})
require.Equal(t, channelName, msg.Channel)
require.Equal(t, msg4, msg.Payload)

msg = receiveType(t, pubsub2, &redis.Message{})
require.Equal(t, channelName, msg.Channel)
require.Equal(t, msg4, msg.Payload)
})

t.Run("PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments", func(t *testing.T) {
pubsub := rdb.Subscribe(ctx, "chan1", "chan2", "chan3")
require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
Expand Down

0 comments on commit 527a1d2

Please sign in to comment.