Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update all users when getting their data [AR-5057] #2200

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import com.wire.kalium.logic.data.team.Team
import com.wire.kalium.logic.data.team.TeamMapper
import com.wire.kalium.logic.data.user.type.DomainUserTypeMapper
import com.wire.kalium.logic.data.user.type.UserEntityTypeMapper
import com.wire.kalium.logic.data.user.type.isFederated
import com.wire.kalium.logic.di.MapperProvider
import com.wire.kalium.logic.failure.SelfUserDeleted
import com.wire.kalium.logic.feature.SelfTeamIdProvider
Expand Down Expand Up @@ -116,6 +115,7 @@ internal interface UserRepository {
* when backends stops federating.
*/
suspend fun defederateUser(userId: UserId): Either<CoreFailure, Unit>

// TODO: move to migration repo
suspend fun insertUsersIfUnknown(users: List<User>): Either<StorageFailure, Unit>
suspend fun fetchUserInfo(userId: UserId): Either<CoreFailure, Unit>
Expand Down Expand Up @@ -153,12 +153,12 @@ internal class UserDataSource internal constructor(
) : UserRepository {

/**
* In case of federated users, we need to refresh their info every time.
* Since the current backend implementation at wire does not emit user events across backends.
* Stores the last time a user's details were fetched from remote.
*
* This is an in-memory cache, to help avoid unnecessary requests in a time window.
* @see Event.User.Update
* @see USER_DETAILS_MAX_AGE
*/
private val federatedUsersExpirationCache = ConcurrentMap<UserId, Instant>()
private val userDetailsRefreshInstantCache = ConcurrentMap<UserId, Instant>()

override suspend fun fetchSelfUser(): Either<CoreFailure, Unit> = wrapApiRequest { selfApi.getSelfInfo() }
.flatMap { userDTO ->
Expand All @@ -184,20 +184,25 @@ internal class UserDataSource internal constructor(
.map { userEntity ->
userEntity?.let { publicUserMapper.fromUserEntityToOtherUser(userEntity) }
}.onEach { otherUser ->
processFederatedUserRefresh(userId, otherUser)
if (otherUser != null) {
refreshUserDetailsIfNeeded(userId)
}
}

/**
* Only in case of federated users and if it's expired or not cached, we fetch and refresh the user info.
* Only refresh user profiles if it wasn't fetched recently.
*
* @see userDetailsRefreshInstantCache
* @see USER_DETAILS_MAX_AGE
*/
private suspend fun processFederatedUserRefresh(userId: UserId, otherUser: OtherUser?) {
if (otherUser != null && otherUser.userType.isFederated()
&& federatedUsersExpirationCache[userId]?.let { DateTimeUtil.currentInstant() > it } != false
) {
private suspend fun refreshUserDetailsIfNeeded(userId: UserId) {
val now = DateTimeUtil.currentInstant()
val wasFetchedRecently = userDetailsRefreshInstantCache[userId]?.let { now < it + USER_DETAILS_MAX_AGE } ?: false
if (!wasFetchedRecently) {
fetchUserInfo(userId).also {
kaliumLogger.d("Federated user, refreshing user info from API after $FEDERATED_USER_TTL")
kaliumLogger.d("Federated user, refreshing user info from API after $USER_DETAILS_MAX_AGE")
}
federatedUsersExpirationCache[userId] = DateTimeUtil.currentInstant().plus(FEDERATED_USER_TTL)
userDetailsRefreshInstantCache[userId] = now
}
}

Expand Down Expand Up @@ -472,7 +477,16 @@ internal class UserDataSource internal constructor(

companion object {
internal const val SELF_USER_ID_KEY = "selfUserID"
internal val FEDERATED_USER_TTL = 5.minutes

/**
* Maximum age for user details.
*
* The USER_DETAILS_MAX_AGE constant represents the maximum age in minutes that user details can be considered valid. After
* this duration, the user details should be refreshed.
*
* This is needed because some users don't get `user.update` events, so we need to refresh their details every so often.
*/
internal val USER_DETAILS_MAX_AGE = 5.minutes
internal const val BATCH_SIZE = 500
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,9 @@ class UserRepositoryTest {
}

@Test
fun givenAKnownNOTFederatedUser_whenGettingFromDb_thenShouldNotRefreshItsDataFromAPI() = runTest {
fun givenAKnownUser_whenGettingFromDb_thenShouldRefreshItsDataFromAPI() = runTest {
val (arrangement, userRepository) = Arrangement()
.withUserDaoReturning(TestUser.ENTITY.copy(userType = UserTypeEntity.STANDARD))
.withUserDaoReturning(TestUser.ENTITY)
.withSuccessfulGetUsersInfo()
.arrange()

Expand All @@ -365,22 +365,22 @@ class UserRepositoryTest {
verify(arrangement.userDetailsApi)
.suspendFunction(arrangement.userDetailsApi::getUserInfo)
.with(any())
.wasNotInvoked()
.wasInvoked(exactly = once)
verify(arrangement.userDAO)
.suspendFunction(arrangement.userDAO::upsertTeamMembers)
.with(any())
.wasNotInvoked()
.wasInvoked(exactly = once)
verify(arrangement.userDAO)
.suspendFunction(arrangement.userDAO::upsertUsers)
.with(any())
.wasNotInvoked()
.wasInvoked(exactly = once)
}
}

@Test
fun givenAKnownFederatedUser_whenGettingFromDbAndCacheValid_thenShouldNOTRefreshItsDataFromAPI() = runTest {
fun givenAKnownUser_whenGettingFromDbAndCacheValid_thenShouldNOTRefreshItsDataFromAPI() = runTest {
val (arrangement, userRepository) = Arrangement()
.withUserDaoReturning(TestUser.ENTITY.copy(userType = UserTypeEntity.FEDERATED))
.withUserDaoReturning(TestUser.ENTITY)
.withSuccessfulGetUsersInfo()
.arrange()

Expand Down
Loading