Skip to content

Commit

Permalink
Groupmsg (#2548)
Browse files Browse the repository at this point in the history
* feat: add EnableHistoryForNewMembers

* feat: change name

* refactor: move first create conversation

* fix: set min seq
  • Loading branch information
icey-yu authored Aug 23, 2024
1 parent d938754 commit 7deb45e
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 50 deletions.
3 changes: 3 additions & 0 deletions config/openim-rpc-group.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ prometheus:
enable: true
# List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
ports: [ 20103 ]


enableHistoryForNewMembers: true
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.69
github.com/openimsdk/protocol v0.0.72-alpha.2
github.com/openimsdk/tools v0.0.49-alpha.55
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
Expand Down Expand Up @@ -41,6 +41,7 @@ require (
github.com/spf13/viper v1.18.2
github.com/stathat/consistent v1.0.0
go.uber.org/automaxprocs v1.5.3
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
golang.org/x/sync v0.6.0
)

Expand Down Expand Up @@ -74,7 +75,6 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic v1.9.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chai2010/webp v1.1.1 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/clbanning/mxj v1.8.4 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
Expand Down Expand Up @@ -170,7 +170,6 @@ require (
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/image v0.15.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/oauth2 v0.17.0 // indirect
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZX
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chai2010/webp v1.1.1 h1:jTRmEccAJ4MGrhFOrPMpNGIJ/eybIgwKpcACsrTEapk=
github.com/chai2010/webp v1.1.1/go.mod h1:0XVwvZWdjjdxpUEIf7b9g9VkHFnInUSYujwqTLEuldU=
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk=
Expand Down Expand Up @@ -321,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.69 h1:dVi8meSg8kmUzSH1XQab4MjihqKkkcCAmt1BYXPJuXo=
github.com/openimsdk/protocol v0.0.69/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/protocol v0.0.72-alpha.2 h1:H5IcoAR4jTzJ7uvmFmc6/6rPOKEGRLWnu+qre8B5hk0=
github.com/openimsdk/protocol v0.0.72-alpha.2/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k=
github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
Expand Down
8 changes: 8 additions & 0 deletions internal/rpc/conversation/conversaion.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,14 @@ func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbc
return &pbconversation.SetConversationMaxSeqResp{}, nil
}

func (c *conversationServer) SetConversationMinSeq(ctx context.Context, req *pbconversation.SetConversationMinSeqReq) (*pbconversation.SetConversationMinSeqResp, error) {
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID,
map[string]any{"min_seq": req.MinSeq}); err != nil {
return nil, err
}
return &pbconversation.SetConversationMinSeqResp{}, nil
}

func (c *conversationServer) GetConversationIDs(ctx context.Context, req *pbconversation.GetConversationIDsReq) (*pbconversation.GetConversationIDsResp, error) {
conversationIDs, err := c.conversationDatabase.GetConversationIDs(ctx, req.UserID)
if err != nil {
Expand Down
35 changes: 20 additions & 15 deletions internal/rpc/group/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,20 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
database := controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs))
gs.db = database
gs.user = userRpcClient
gs.notification = NewGroupNotificationSender(database, &msgRpcClient, &userRpcClient, config, func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) {
users, err := userRpcClient.GetUsersInfo(ctx, userIDs)
if err != nil {
return nil, err
}
return datautil.Slice(users, func(e *sdkws.UserInfo) notification.CommonUser { return e }), nil
})
gs.notification = NewGroupNotificationSender(
database,
&msgRpcClient,
&userRpcClient,
&conversationRpcClient,
config,
func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) {
users, err := userRpcClient.GetUsersInfo(ctx, userIDs)
if err != nil {
return nil, err
}
return datautil.Slice(users, func(e *sdkws.UserInfo) notification.CommonUser { return e }), nil
},
)
localcache.InitLocalCache(&config.LocalCacheConfig)
gs.conversationRpcClient = conversationRpcClient
gs.msgRpcClient = msgRpcClient
Expand Down Expand Up @@ -450,10 +457,10 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
if err := s.db.CreateGroup(ctx, nil, groupMembers); err != nil {
return nil, err
}
if err := s.conversationRpcClient.GroupChatFirstCreateConversation(ctx, req.GroupID, req.InvitedUserIDs); err != nil {

if err = s.notification.MemberEnterNotification(ctx, req.GroupID, req.InvitedUserIDs...); err != nil {
return nil, err
}
s.notification.MemberInvitedNotification(ctx, req.GroupID, req.Reason, req.InvitedUserIDs)
return &pbgroup.InviteUserToGroupResp{}, nil
}

Expand Down Expand Up @@ -822,14 +829,13 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup
}
switch req.HandleResult {
case constant.GroupResponseAgree:
if err := s.conversationRpcClient.GroupChatFirstCreateConversation(ctx, req.GroupID, []string{req.FromUserID}); err != nil {
return nil, err
}
s.notification.GroupApplicationAcceptedNotification(ctx, req)
if member == nil {
log.ZDebug(ctx, "GroupApplicationResponse", "member is nil")
} else {
s.notification.MemberEnterNotification(ctx, req.GroupID, req.FromUserID)
if err = s.notification.MemberEnterNotification(ctx, req.GroupID, req.FromUserID); err != nil {
return nil, err
}
}
case constant.GroupResponseRefuse:
s.notification.GroupApplicationRejectedNotification(ctx, req)
Expand Down Expand Up @@ -889,10 +895,9 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq)
return nil, err
}

if err := s.conversationRpcClient.GroupChatFirstCreateConversation(ctx, req.GroupID, []string{req.InviterUserID}); err != nil {
if err = s.notification.MemberEnterNotification(ctx, req.GroupID, req.InviterUserID); err != nil {
return nil, err
}
s.notification.MemberEnterNotification(ctx, req.GroupID, req.InviterUserID)
s.webhookAfterJoinGroup(ctx, &s.config.WebhooksConfig.AfterJoinGroup, req)

return &pbgroup.JoinGroupResp{}, nil
Expand Down
61 changes: 34 additions & 27 deletions internal/rpc/group/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"

"github.com/openimsdk/open-im-server/v3/pkg/authverify"
Expand All @@ -43,12 +44,22 @@ const (
adminReceiver
)

func NewGroupNotificationSender(db controller.GroupDatabase, msgRpcClient *rpcclient.MessageRpcClient, userRpcClient *rpcclient.UserRpcClient, config *Config, fn func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error)) *GroupNotificationSender {
func NewGroupNotificationSender(
db controller.GroupDatabase,
msgRpcClient *rpcclient.MessageRpcClient,
userRpcClient *rpcclient.UserRpcClient,
conversationRpcClient *rpcclient.ConversationRpcClient,
config *Config,
fn func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error),
) *GroupNotificationSender {
return &GroupNotificationSender{
NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithRpcClient(msgRpcClient), rpcclient.WithUserRpcClient(userRpcClient)),
getUsersInfo: fn,
db: db,
config: config,

conversationRpcClient: conversationRpcClient,
msgRpcClient: msgRpcClient,
}
}

Expand All @@ -57,6 +68,9 @@ type GroupNotificationSender struct {
getUsersInfo func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error)
db controller.GroupDatabase
config *Config

conversationRpcClient *rpcclient.ConversationRpcClient
msgRpcClient *rpcclient.MessageRpcClient
}

func (g *GroupNotificationSender) PopulateGroupMember(ctx context.Context, members ...*model.GroupMember) error {
Expand Down Expand Up @@ -494,50 +508,43 @@ func (g *GroupNotificationSender) MemberKickedNotification(ctx context.Context,
g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.MemberKickedNotification, tips)
}

func (g *GroupNotificationSender) MemberInvitedNotification(ctx context.Context, groupID, reason string, invitedUserIDList []string) {
func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, groupID string, entrantUserID ...string) error {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, groupID)
if err != nil {
return

if !g.config.RpcConfig.EnableHistoryForNewMembers {
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID)
maxSeq, err := g.msgRpcClient.GetConversationMaxSeq(ctx, conversationID)
if err != nil {
return err
}
err = g.conversationRpcClient.SetConversationMinSeq(ctx, entrantUserID, conversationID, maxSeq)
if err != nil {
return err
}
}

var users []*sdkws.GroupMemberFullInfo
users, err = g.getGroupMembers(ctx, groupID, invitedUserIDList)
if err != nil {
return
if err := g.conversationRpcClient.GroupChatFirstCreateConversation(ctx, groupID, entrantUserID); err != nil {
return err
}
tips := &sdkws.MemberInvitedTips{Group: group, InvitedUserList: users}
err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID)
g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberInvitedNotification, tips)
}

func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, groupID string, entrantUserID string) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, groupID)
if err != nil {
return
return err
}
var user *sdkws.GroupMemberFullInfo
user, err = g.getGroupMember(ctx, groupID, entrantUserID)
users, err := g.getGroupMembers(ctx, groupID, entrantUserID)
if err != nil {
return
return err
}
tips := &sdkws.MemberEnterTips{Group: group, EntrantUser: user}
tips := &sdkws.MemberEnterTips{Group: group, EntrantUsers: users}
g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberEnterNotification, tips)
return nil
}

func (g *GroupNotificationSender) GroupDismissedNotification(ctx context.Context, tips *sdkws.GroupDismissedTips) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,8 @@ type Group struct {
ListenIP string `mapstructure:"listenIP"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"`
Prometheus Prometheus `mapstructure:"prometheus"`
EnableHistoryForNewMembers bool `mapstructure:"enableHistoryForNewMembers"`
}

type Msg struct {
Expand Down
5 changes: 5 additions & 0 deletions pkg/rpcclient/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ func (c *ConversationRpcClient) SetConversationMaxSeq(ctx context.Context, owner
return err
}

func (c *ConversationRpcClient) SetConversationMinSeq(ctx context.Context, ownerUserIDs []string, conversationID string, minSeq int64) error {
_, err := c.Client.SetConversationMinSeq(ctx, &pbconversation.SetConversationMinSeqReq{OwnerUserID: ownerUserIDs, ConversationID: conversationID, MinSeq: minSeq})
return err
}

func (c *ConversationRpcClient) SetConversations(ctx context.Context, userIDs []string, conversation *pbconversation.ConversationReq) error {
_, err := c.Client.SetConversations(ctx, &pbconversation.SetConversationsReq{UserIDs: userIDs, Conversation: conversation})
return err
Expand Down

0 comments on commit 7deb45e

Please sign in to comment.