Skip to content

Commit

Permalink
Update conversation membership (#1540)
Browse files Browse the repository at this point in the history
* Implement updateConversationMemberships RPC

* updateConversationMembership now takes a list of qualified user ids

The event generated by adding users to a remote conversation should be
sent to all local users in the conversation, regardless of whether any
local users were added, and it should contain all users added, not just
the local ones.

* Add test for updateConversationMemberships

* Add notification test for remote join event

* Also notify just joined members
  • Loading branch information
pcapriotti authored May 31, 2021
1 parent a468fe9 commit f637703
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import Control.Monad.Except (MonadError (..))
import Data.Aeson (FromJSON, ToJSON)
import Data.Id (ConvId, UserId)
import Data.Qualified (Qualified)
import Data.Time.Clock (UTCTime)
import Imports
import Servant.API (JSON, Post, ReqBody, (:>))
import Servant.API.Generic ((:-))
import Servant.Client.Generic (AsClientT, genericClient)
import Wire.API.Arbitrary (Arbitrary, GenericUniform (..))
import Wire.API.Conversation (Conversation)
import Wire.API.Conversation.Role (RoleName)
import Wire.API.Federation.Client (FederationClientError, FederatorClient)
import qualified Wire.API.Federation.GRPC.Types as Proto
import Wire.API.Federation.Util.Aeson (CustomEncoded (CustomEncoded))
Expand Down Expand Up @@ -69,8 +71,11 @@ newtype GetConversationsResponse = GetConversationsResponse
deriving (ToJSON, FromJSON) via (CustomEncoded GetConversationsResponse)

data ConversationMemberUpdate = ConversationMemberUpdate
{ cmuConvId :: Qualified ConvId,
cmuUsersAdd :: [UserId],
{ cmuTime :: UTCTime,
cmuOrigUserId :: Qualified UserId,
cmuConvId :: Qualified ConvId,
cmuAlreadyPresentUsers :: [UserId], -- pre-existing users in the conversation from the receiving domain
cmuUsersAdd :: [(Qualified UserId, RoleName)],
cmuUsersRemove :: [UserId]
}
deriving stock (Eq, Show, Generic)
Expand Down
4 changes: 2 additions & 2 deletions libs/wire-api/src/Wire/API/Event/Conversation.hs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ import Wire.API.User (UserIdList (..))

data Event = Event
{ evtType :: EventType,
evtConv :: ConvId,
evtFrom :: UserId,
evtConv :: ConvId, -- FUTUREWORK: make this qualified
evtFrom :: UserId, -- FUTUREWORK: make this qualified
evtTime :: UTCTime,
evtData :: EventData
}
Expand Down
36 changes: 30 additions & 6 deletions services/galley/src/Galley/API/Federation.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,28 @@
-- with this program. If not, see <https://www.gnu.org/licenses/>.
module Galley.API.Federation where

import Data.Qualified (Qualified (Qualified))
import Control.Arrow (first)
import Data.Containers.ListUtils (nubOrd)
import Data.Qualified (Qualified (..))
import qualified Galley.API.Mapping as Mapping
import Galley.API.Util (viewFederationDomain)
import Galley.API.Util (pushConversationEvent, viewFederationDomain)
import Galley.App (Galley)
import qualified Galley.Data as Data
import Imports
import Servant (ServerT)
import Servant.API.Generic (ToServantApi)
import Servant.Server.Generic (genericServerT)
import Wire.API.Event.Conversation
import Wire.API.Federation.API.Galley (ConversationMemberUpdate (..), GetConversationsRequest (..), GetConversationsResponse (..))
import qualified Wire.API.Federation.API.Galley as FederationAPIGalley

federationSitemap :: ServerT (ToServantApi FederationAPIGalley.Api) Galley
federationSitemap =
genericServerT $
FederationAPIGalley.Api
getConversations
updateConversationMembership
{ FederationAPIGalley.getConversations = getConversations,
FederationAPIGalley.updateConversationMemberships = updateConversationMemberships
}

getConversations :: GetConversationsRequest -> Galley GetConversationsResponse
getConversations (GetConversationsRequest (Qualified uid domain) gcrConvIds) = do
Expand All @@ -43,5 +47,25 @@ getConversations (GetConversationsRequest (Qualified uid domain) gcrConvIds) = d
then GetConversationsResponse . catMaybes <$> for convs (Mapping.conversationViewMaybe uid)
else error "FUTUREWORK: implement & exstend integration test when schema ready"

updateConversationMembership :: ConversationMemberUpdate -> Galley ()
updateConversationMembership = error "FUTUREWORK: implement after schema change"
-- FUTUREWORK: also remove users from conversation
updateConversationMemberships :: ConversationMemberUpdate -> Galley ()
updateConversationMemberships cmu = do
localDomain <- viewFederationDomain
let localUsers = filter ((== localDomain) . qDomain . fst) (cmuUsersAdd cmu)
localUserIds = map (qUnqualified . fst) localUsers
when (not (null localUsers)) $ do
Data.addLocalMembersToRemoteConv localUserIds (cmuConvId cmu)
-- FUTUREWORK: the resulting event should have qualified users and conversations
let mems = SimpleMembers (map (uncurry SimpleMember . first qUnqualified) (cmuUsersAdd cmu))
let event =
Event
MemberJoin
(qUnqualified (cmuConvId cmu))
(qUnqualified (cmuOrigUserId cmu))
(cmuTime cmu)
(EdMembersJoin mems)

-- send notifications
let targets = nubOrd $ cmuAlreadyPresentUsers cmu <> localUserIds
-- FUTUREWORK: support bots?
pushConversationEvent event targets []
2 changes: 1 addition & 1 deletion services/galley/src/Galley/API/Update.hs
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ mapUpdateToServant Unchanged = Servant.respond NoContent
-- we need the following checks/implementation:
-- - (1) [DONE] Remote qualified users must exist before they can be added (a
-- call to the respective backend should be made): Avoid clients making up random
-- Ids, and increase the chances that the updateConversationMembership call
-- Ids, and increase the chances that the updateConversationMemberships call
-- suceeds
-- - (2) A call must be made to the remote backend informing it that this user is
-- now part of that conversation. Use and implement 'updateConversationMemberships'.
Expand Down
9 changes: 8 additions & 1 deletion services/galley/src/Galley/API/Util.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ import Galley.App
import qualified Galley.Data as Data
import Galley.Data.Services (BotMember, newBotMember)
import qualified Galley.Data.Types as DataTypes
import qualified Galley.External as External
import Galley.Intra.Push
import Galley.Intra.User
import Galley.Options (optSettings, setFederationDomain)
import Galley.Types
import Galley.Types.Conversations.Members (RemoteMember (rmId))
import Galley.Types.Conversations.Roles
import Galley.Types.Teams
import Galley.Types.Teams hiding (Event)
import Imports
import Network.HTTP.Types
import Network.Wai
Expand Down Expand Up @@ -293,6 +294,12 @@ canDeleteMember deleter deletee
-- here, so we pick a reasonable default.)
getRole mem = fromMaybe RoleMember $ permissionsRole $ mem ^. permissions

pushConversationEvent :: Event -> [UserId] -> [BotMember] -> Galley ()
pushConversationEvent e users bots = do
for_ (newPush ListComplete (evtFrom e) (ConvEvent e) (map userRecipient users)) $ \p ->
push1 $ p & pushConn .~ Nothing
void . forkIO $ void $ External.deliver (bots `zip` repeat e)

--------------------------------------------------------------------------------
-- Federation

Expand Down
16 changes: 16 additions & 0 deletions services/galley/src/Galley/Data.hs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ module Galley.Data
-- * Conversation Members
addMember,
addMembersWithRole,
addLocalMembersToRemoteConv,
member,
members,
removeMember,
Expand Down Expand Up @@ -851,6 +852,21 @@ addMembersUncheckedWithRole t conv (orig, _origRole) lusrs rusrs = do
toSimpleMembers :: [(UserId, RoleName)] -> [SimpleMember]
toSimpleMembers = fmap (uncurry SimpleMember)

-- | Set local users as belonging to a remote conversation. This is invoked by
-- a remote galley (using the RPC updateConversationMembership) when users from
-- the current backend are added to conversations on the remote end.
addLocalMembersToRemoteConv :: MonadClient m => [UserId] -> Qualified ConvId -> m ()
addLocalMembersToRemoteConv users qconv = do
-- FUTUREWORK: consider using pooledMapConcurrentlyN
for_ (List.chunksOf 32 users) $ \chunk ->
retry x5 . batch $ do
setType BatchLogged
setConsistency Quorum
for_ chunk $ \u ->
addPrepQuery
Cql.insertUserRemoteConv
(u, qDomain qconv, qUnqualified qconv)

updateMember :: MonadClient m => ConvId -> UserId -> MemberUpdate -> m MemberUpdateData
updateMember cid uid mup = do
retry x5 . batch $ do
Expand Down
8 changes: 4 additions & 4 deletions services/galley/src/Galley/Data/Queries.hs
Original file line number Diff line number Diff line change
Expand Up @@ -301,16 +301,16 @@ selectRemoteMembers = "select conv, user_remote_domain, user_remote_id, conversa

-- local user with remote conversations

-- FUTUREWORK: actually make use of these cql statements.
insertUserRemoteConv :: PrepQuery W (UserId, Domain, ConvId) ()
insertUserRemoteConv = "insert into user_remote_conv (user, conv_remote_domain, conv_remote_id) values (?, ?, ?)"

deleteUserRemoteConv :: PrepQuery W (UserId, Domain, ConvId) ()
deleteUserRemoteConv = "delete from user_remote_conv where user = ? and conv_remote_domain = ? and conv_remote_id = ?"

selectUserRemoteConvs :: PrepQuery R (Identity UserId) (Domain, ConvId)
selectUserRemoteConvs = "select conv_remote_domain, conv_remote_id from user_remote_conv where user = ? order by conv_remote_domain"

-- FUTUREWORK: actually make use of these cql statements.
deleteUserRemoteConv :: PrepQuery W (UserId, Domain, ConvId) ()
deleteUserRemoteConv = "delete from user_remote_conv where user = ? and conv_remote_domain = ? and conv_remote_id = ?"

-- Clients ------------------------------------------------------------------

selectClients :: PrepQuery R (Identity [UserId]) (UserId, C.Set ClientId)
Expand Down
86 changes: 82 additions & 4 deletions services/galley/test/integration/API/Federation.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,41 @@ module API.Federation where
import API.Util
import Bilge
import Bilge.Assert
import Control.Lens
import Data.Id (Id (..))
import qualified Cassandra as Cql
import Control.Lens hiding ((#))
import Data.Domain
import Data.Id (Id (..), randomId)
import Data.List1
import Data.Qualified (Qualified (..))
import Data.Time.Clock
import Data.Timeout (TimeoutUnit (..), (#))
import Data.UUID.V4 (nextRandom)
import qualified Galley.Data.Queries as Cql
import Galley.Types
import Imports
import Test.Tasty
import qualified Test.Tasty.Cannon as WS
import Test.Tasty.HUnit
import TestHelpers
import TestSetup
import Wire.API.Conversation.Role
import Wire.API.Federation.API.Galley (GetConversationsRequest (..), GetConversationsResponse (..))
import qualified Wire.API.Federation.API.Galley as FedGalley

tests :: IO TestSetup -> TestTree
tests s =
testGroup
"federation"
[ test s "GET /federation/get-conversations : All Found" getConversationsAllFound,
test s "GET /federation/get-conversations : Conversations user is not a part of are excluded from result" getConversationsNotPartOf
[ test s "POST /federation/get-conversations : All Found" getConversationsAllFound,
test s "POST /federation/get-conversations : Conversations user is not a part of are excluded from result" getConversationsNotPartOf,
test
s
"POST /federation/update-conversation-memberships : Add local user to remote conversation"
addLocalUser,
test
s
"POST /federation/update-conversation-memberships : Notify local user about other members joining"
notifyLocalUser
]

getConversationsAllFound :: TestM ()
Expand Down Expand Up @@ -98,3 +113,66 @@ getConversationsNotPartOf = do
let randoQualified = Qualified rando localDomain
GetConversationsResponse cs <- FedGalley.getConversations fedGalleyClient (GetConversationsRequest randoQualified [cnvId cnv1])
liftIO $ assertEqual "conversation list not empty" [] cs

addLocalUser :: TestM ()
addLocalUser = do
localDomain <- viewFederationDomain
c <- view tsCannon
alice <- randomUser
let qalice = Qualified alice localDomain
let dom = Domain "bobland.example.com"
bob <- randomId
let qbob = Qualified bob dom
conv <- randomId
let qconv = Qualified conv dom
fedGalleyClient <- view tsFedGalleyClient
now <- liftIO getCurrentTime
let cmu =
FedGalley.ConversationMemberUpdate
{ FedGalley.cmuTime = now,
FedGalley.cmuOrigUserId = qbob,
FedGalley.cmuConvId = qconv,
FedGalley.cmuAlreadyPresentUsers = [],
FedGalley.cmuUsersAdd = [(qalice, roleNameWireMember)],
FedGalley.cmuUsersRemove = []
}
WS.bracketR c alice $ \ws -> do
FedGalley.updateConversationMemberships fedGalleyClient cmu
void . liftIO $
WS.assertMatch (5 # Second) ws $
wsAssertMemberJoinWithRole conv bob [alice] roleNameWireMember
cassState <- view tsCass
convs <-
Cql.runClient cassState
. Cql.query Cql.selectUserRemoteConvs
$ Cql.params Cql.Quorum (Identity alice)
liftIO $ [(dom, conv)] @?= convs

notifyLocalUser :: TestM ()
notifyLocalUser = do
c <- view tsCannon
alice <- randomUser
bob <- randomId
charlie <- randomId
conv <- randomId
let bdom = Domain "bob.example.com"
cdom = Domain "charlie.example.com"
qbob = Qualified bob bdom
qconv = Qualified conv bdom
qcharlie = Qualified charlie cdom
fedGalleyClient <- view tsFedGalleyClient
now <- liftIO getCurrentTime
let cmu =
FedGalley.ConversationMemberUpdate
{ FedGalley.cmuTime = now,
FedGalley.cmuOrigUserId = qbob,
FedGalley.cmuConvId = qconv,
FedGalley.cmuAlreadyPresentUsers = [alice],
FedGalley.cmuUsersAdd = [(qcharlie, roleNameWireMember)],
FedGalley.cmuUsersRemove = []
}
WS.bracketR c alice $ \ws -> do
FedGalley.updateConversationMemberships fedGalleyClient cmu
void . liftIO $
WS.assertMatch (5 # Second) ws $
wsAssertMemberJoinWithRole conv bob [charlie] roleNameWireMember

0 comments on commit f637703

Please sign in to comment.