Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add initial support for getting ChannelMember info of all bridges #678

Merged
merged 2 commits into from
Jan 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/42wim/matterbridge/bridge/config"
"github.com/sirupsen/logrus"
"sync"
)

type Bridger interface {
Expand All @@ -16,14 +17,16 @@ type Bridger interface {

type Bridge struct {
Bridger
Name string
Account string
Protocol string
Channels map[string]config.ChannelInfo
Joined map[string]bool
Log *logrus.Entry
Config config.Config
General *config.Protocol
Name string
Account string
Protocol string
Channels map[string]config.ChannelInfo
Joined map[string]bool
ChannelMembers *config.ChannelMembers
Log *logrus.Entry
Config config.Config
General *config.Protocol
*sync.RWMutex
}

type Config struct {
Expand All @@ -37,15 +40,17 @@ type Config struct {
type Factory func(*Config) Bridger

func New(bridge *config.Bridge) *Bridge {
b := new(Bridge)
b.Channels = make(map[string]config.ChannelInfo)
b := &Bridge{
Channels: make(map[string]config.ChannelInfo),
RWMutex: new(sync.RWMutex),
Joined: make(map[string]bool),
}
accInfo := strings.Split(bridge.Account, ".")
protocol := accInfo[0]
name := accInfo[1]
b.Name = name
b.Protocol = protocol
b.Account = bridge.Account
b.Joined = make(map[string]bool)
return b
}

Expand All @@ -54,6 +59,13 @@ func (b *Bridge) JoinChannels() error {
return err
}

// SetChannelMembers sets the newMembers to the bridge ChannelMembers
func (b *Bridge) SetChannelMembers(newMembers *config.ChannelMembers) {
b.Lock()
b.ChannelMembers = newMembers
b.Unlock()
}

func (b *Bridge) joinChannels(channels map[string]config.ChannelInfo, exists map[string]bool) error {
for ID, channel := range channels {
if !exists[ID] {
Expand Down
31 changes: 21 additions & 10 deletions bridge/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@ import (
)

const (
EventJoinLeave = "join_leave"
EventTopicChange = "topic_change"
EventFailure = "failure"
EventFileFailureSize = "file_failure_size"
EventAvatarDownload = "avatar_download"
EventRejoinChannels = "rejoin_channels"
EventUserAction = "user_action"
EventMsgDelete = "msg_delete"
EventAPIConnected = "api_connected"
EventUserTyping = "user_typing"
EventJoinLeave = "join_leave"
EventTopicChange = "topic_change"
EventFailure = "failure"
EventFileFailureSize = "file_failure_size"
EventAvatarDownload = "avatar_download"
EventRejoinChannels = "rejoin_channels"
EventUserAction = "user_action"
EventMsgDelete = "msg_delete"
EventAPIConnected = "api_connected"
EventUserTyping = "user_typing"
EventGetChannelMembers = "get_channel_members"
)

type Message struct {
Expand Down Expand Up @@ -61,6 +62,16 @@ type ChannelInfo struct {
Options ChannelOptions
}

type ChannelMember struct {
42wim marked this conversation as resolved.
Show resolved Hide resolved
Username string
Nick string
UserID string
ChannelID string
ChannelName string
}

type ChannelMembers []ChannelMember
42wim marked this conversation as resolved.
Show resolved Hide resolved

type Protocol struct {
AuthCode string // steam
BindAddress string // mattermost, slack // DEPRECATED
Expand Down
52 changes: 52 additions & 0 deletions bridge/slack/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,58 @@ func (b *Bslack) handleDownloadFile(rmsg *config.Message, file *slack.File, retr
return nil
}

// handleGetChannelMembers handles messages containing the GetChannelMembers event
// Sends a message to the router containing *config.ChannelMembers
func (b *Bslack) handleGetChannelMembers(rmsg *config.Message) bool {
if rmsg.Event != config.EventGetChannelMembers {
return false
}

cMembers := config.ChannelMembers{}

b.channelMembersMutex.RLock()

for channelID, members := range b.channelMembers {
for _, member := range members {
channelName := ""
userName := ""
userNick := ""
user := b.getUser(member)
if user != nil {
userName = user.Name
userNick = user.Profile.DisplayName
}
channel, _ := b.getChannelByID(channelID)
if channel != nil {
channelName = channel.Name
}
cMember := config.ChannelMember{
Username: userName,
Nick: userNick,
UserID: member,
ChannelID: channelID,
ChannelName: channelName,
}
cMembers = append(cMembers, cMember)
}
}

b.channelMembersMutex.RUnlock()

extra := make(map[string][]interface{})
extra[config.EventGetChannelMembers] = append(extra[config.EventGetChannelMembers], cMembers)
msg := config.Message{
Extra: extra,
Event: config.EventGetChannelMembers,
Account: b.Account,
}

b.Log.Debugf("sending msg to remote %#v", msg)
b.Remote <- msg

return true
}

// fileCached implements Matterbridge's caching logic for files
// shared via Slack.
//
Expand Down
46 changes: 46 additions & 0 deletions bridge/slack/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ func (b *Bslack) populateUsers(wait bool) {
return
}
for b.refreshInProgress {
b.refreshMutex.Unlock()
time.Sleep(time.Second)
b.refreshMutex.Lock()
}
b.refreshInProgress = true
b.refreshMutex.Unlock()
Expand Down Expand Up @@ -139,13 +141,16 @@ func (b *Bslack) populateChannels(wait bool) {
return
}
for b.refreshInProgress {
b.refreshMutex.Unlock()
time.Sleep(time.Second)
b.refreshMutex.Lock()
}
b.refreshInProgress = true
b.refreshMutex.Unlock()

newChannelsByID := map[string]*slack.Channel{}
newChannelsByName := map[string]*slack.Channel{}
newChannelMembers := make(map[string][]string)

// We only retrieve public and private channels, not IMs
// and MPIMs as those do not have a channel name.
Expand All @@ -166,7 +171,18 @@ func (b *Bslack) populateChannels(wait bool) {
for i := range channels {
newChannelsByID[channels[i].ID] = &channels[i]
newChannelsByName[channels[i].Name] = &channels[i]
// also find all the members in every channel
members, err := b.getUsersInConversation(channels[i].ID)
if err != nil {
if err = b.handleRateLimit(err); err != nil {
b.Log.Errorf("Could not retrieve channel members: %#v", err)
return
}
continue
}
newChannelMembers[channels[i].ID] = members
}

if nextCursor == "" {
break
}
Expand All @@ -178,6 +194,10 @@ func (b *Bslack) populateChannels(wait bool) {
b.channelsByID = newChannelsByID
b.channelsByName = newChannelsByName

b.channelMembersMutex.Lock()
defer b.channelMembersMutex.Unlock()
b.channelMembers = newChannelMembers

b.refreshMutex.Lock()
defer b.refreshMutex.Unlock()
b.earliestChannelRefresh = time.Now().Add(minimumRefreshInterval)
Expand Down Expand Up @@ -362,3 +382,29 @@ func (b *Bslack) handleRateLimit(err error) error {
time.Sleep(rateLimit.RetryAfter)
return nil
}

// getUsersInConversation returns an array of userIDs that are members of channelID
func (b *Bslack) getUsersInConversation(channelID string) ([]string, error) {
channelMembers := []string{}
for {
queryParams := &slack.GetUsersInConversationParameters{
ChannelID: channelID,
}

members, nextCursor, err := b.sc.GetUsersInConversation(queryParams)
if err != nil {
if err = b.handleRateLimit(err); err != nil {
return channelMembers, fmt.Errorf("Could not retrieve users in channels: %#v", err)
}
continue
}

channelMembers = append(channelMembers, members...)

if nextCursor == "" {
break
}
queryParams.Cursor = nextCursor
}
return channelMembers, nil
}
8 changes: 8 additions & 0 deletions bridge/slack/slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type Bslack struct {
channelsByName map[string]*slack.Channel
channelsMutex sync.RWMutex

channelMembers map[string][]string
channelMembersMutex sync.RWMutex

refreshInProgress bool
earliestChannelRefresh time.Time
earliestUserRefresh time.Time
Expand Down Expand Up @@ -265,6 +268,11 @@ func (b *Bslack) sendWebhook(msg config.Message) error {
}

func (b *Bslack) sendRTM(msg config.Message) (string, error) {
// Handle channelmember messages.
if handled := b.handleGetChannelMembers(&msg); handled {
return "", nil
}

channelInfo, err := b.getChannel(msg.Channel)
if err != nil {
return "", fmt.Errorf("could not send message: %v", err)
Expand Down
17 changes: 17 additions & 0 deletions gateway/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,23 @@ func (r *Router) handleEventFailure(msg *config.Message) {
}
}

// handleEventGetChannelMembers handles channel members
func (r *Router) handleEventGetChannelMembers(msg *config.Message) {
if msg.Event != config.EventGetChannelMembers {
return
}
for _, gw := range r.Gateways {
for _, br := range gw.Bridges {
if msg.Account == br.Account {
cMembers := msg.Extra[config.EventGetChannelMembers][0].(config.ChannelMembers)
flog.Debugf("Syncing channelmembers from %s", msg.Account)
br.SetChannelMembers(&cMembers)
return
}
}
}
}

// handleEventRejoinChannels handles rejoining of channels.
func (r *Router) handleEventRejoinChannels(msg *config.Message) {
if msg.Event != config.EventRejoinChannels {
Expand Down
22 changes: 22 additions & 0 deletions gateway/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gateway

import (
"fmt"
"sync"
"time"

"github.com/42wim/matterbridge/bridge"
Expand All @@ -16,6 +17,7 @@ type Router struct {
Gateways map[string]*Gateway
Message chan config.Message
MattermostPlugin chan config.Message
sync.RWMutex
}

func NewRouter(cfg config.Config, bridgeMap map[string]bridge.Factory) (*Router, error) {
Expand Down Expand Up @@ -81,6 +83,7 @@ func (r *Router) Start() error {
}
}
go r.handleReceive()
go r.updateChannelMembers()
return nil
}

Expand Down Expand Up @@ -108,6 +111,7 @@ func (r *Router) getBridge(account string) *bridge.Bridge {
func (r *Router) handleReceive() {
for msg := range r.Message {
msg := msg // scopelint
r.handleEventGetChannelMembers(&msg)
r.handleEventFailure(&msg)
r.handleEventRejoinChannels(&msg)
for _, gw := range r.Gateways {
Expand All @@ -129,3 +133,21 @@ func (r *Router) handleReceive() {
}
}
}

// updateChannelMembers sends every minute an GetChannelMembers event to all bridges.
func (r *Router) updateChannelMembers() {
// TODO sleep a minute because slack can take a while
// fix this by having actually connectionDone events send to the router
time.Sleep(time.Minute)
for {
for _, gw := range r.Gateways {
for _, br := range gw.Bridges {
flog.Debugf("sending %s to %s", config.EventGetChannelMembers, br.Account)
if _, err := br.Send(config.Message{Event: config.EventGetChannelMembers}); err != nil {
flog.Errorf("updateChannelMembers: %s", err)
}
}
}
time.Sleep(time.Minute)
}
}