Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Cohort analytics #3163

Merged
merged 13 commits into from
May 16, 2018
Merged
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,4 @@ env/
*.config

.vscode/
.ropeproject/
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated to this PR just want to ignore Atom python ide

12 changes: 12 additions & 0 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import logging
import os
import sys
import datetime

import synapse
import synapse.config.logger
Expand Down Expand Up @@ -475,6 +476,17 @@ def performance_stats_init():
" changes across releases."
)

def generate_user_daily_visit_stats():
hs.get_datastore().generate_user_daily_visits()

def recurring_user_daily_visit_stats():
clock.looping_call(generate_user_daily_visit_stats, 60 * 60 * 1000)

# Rather than update on per session basis, batch up the requests.
# If you increase the loop period, the accuracy of user_daily_visits
# table will decrease
clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)

if hs.config.report_stats:
logger.info("Scheduling stats reporting for 3 hour intervals")
clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000)
Expand Down
68 changes: 64 additions & 4 deletions synapse/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
from dateutil import tz
import time
import logging

from synapse.storage.devices import DeviceStore
from .appservice import (
ApplicationServiceStore, ApplicationServiceTransactionStore
Expand Down Expand Up @@ -55,10 +60,6 @@
from synapse.api.constants import PresenceState
from synapse.util.caches.stream_change_cache import StreamChangeCache


import logging


logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -213,6 +214,9 @@ def __init__(self, db_conn, hs):
self._stream_order_on_start = self.get_room_max_stream_ordering()
self._min_stream_order_on_start = self.get_room_min_stream_ordering()

# Used in _generate_user_daily_visits to keep track of progress
self._last_user_visit_update = self._get_start_of_day()

super(DataStore, self).__init__(db_conn, hs)

def take_presence_startup_info(self):
Expand Down Expand Up @@ -347,6 +351,62 @@ def _count_r30_users(txn):

return self.runInteraction("count_r30_users", _count_r30_users)

def _get_start_of_day(self):
"""
Returns millisecond unixtime for start of UTC day.
"""
now = datetime.datetime.utcnow()
today_start = datetime.datetime(now.year, now.month,
now.day, tzinfo=tz.tzutc())
return int(time.mktime(today_start.timetuple())) * 1000

def generate_user_daily_visits(self):
"""
Generates daily visit data for use in cohort/ retention analysis
"""
def _generate_user_daily_visits(txn):
logger.info("Calling _generate_user_daily_visits")
today_start = self._get_start_of_day()
a_day_in_milliseconds = 24 * 60 * 60 * 1000

sql = """
INSERT INTO user_daily_visits (user_id, device_id, timestamp)
SELECT u.user_id, u.device_id, ?
FROM user_ips AS u
LEFT JOIN (
SELECT user_id, device_id, timestamp FROM user_daily_visits
WHERE timestamp IS ?
) udv
ON u.user_id = udv.user_id AND u.device_id=udv.device_id
WHERE last_seen > ? AND last_seen <= ? AND udv.timestamp IS NULL
"""

# This means that the day has rolled over but there could still
# be entries from the previous day. There is an edge case
# where if the user logs in at 23:59 and overwrites their
# last_seen at 00:01 then they will not be counted in the
# previous day's stats - it is important that the query is run
# to minimise this case.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean:

it is important that the query is run often...

if today_start > self._last_user_visit_update:
yesterday_start = today_start - a_day_in_milliseconds
txn.execute(sql, (yesterday_start, yesterday_start,
self._last_user_visit_update, today_start))
self._last_user_visit_update = today_start

txn.execute(sql, (today_start, today_start,
self._last_user_visit_update,
today_start + a_day_in_milliseconds))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(The identing style we use tends to not use this continuation indent. See other comment for how it normally looks)

# Update _last_user_visit_update to now. The reason to do this
# rather just clamping to the beginning of the day is to limit
# the size of the join - meaning that the query can be run more
# frequently

now = datetime.datetime.utcnow()
self._last_user_visit_update = int(time.mktime(now.timetuple())) * 1000
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just use self.clock.time_msec()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, You probably want to get this before the query, so you do:

now = self.clock.time_msec()
txn.execute(sql, (
    today_start, today_start,
    self._last_user_visit_update,
    now,
))
self._last_user_visit_update = now

This ensures there isn't going to be any overlaps or missed updates.


return self.runInteraction("generate_user_daily_visits",
_generate_user_daily_visits)

def get_users(self):
"""Function to reterive a list of users in users table.

Expand Down
7 changes: 7 additions & 0 deletions synapse/storage/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ def __init__(self, db_conn, hs):
columns=["user_id", "last_seen"],
)

self.register_background_index_update(
"user_ips_last_seen_only_index",
index_name="user_ips_last_seen_only",
table="user_ips",
columns=["last_seen"],
)

# (user_id, access_token, ip) -> (user_agent, device_id, last_seen)
self._batch_row_update = {}

Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/prepare_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 48
SCHEMA_VERSION = 49

dir_path = os.path.abspath(os.path.dirname(__file__))

Expand Down
21 changes: 21 additions & 0 deletions synapse/storage/schema/delta/49/add_user_daily_visits.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


CREATE TABLE user_daily_visits ( user_id TEXT NOT NULL,
device_id TEXT,
timestamp BIGINT NOT NULL );
CREATE INDEX user_daily_visits_uts_idx ON user_daily_visits(user_id, timestamp);
CREATE INDEX user_daily_visits_ts_idx ON user_daily_visits(timestamp);
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

INSERT into background_updates (update_name, progress_json)
VALUES ('user_ips_last_seen_only_index', '{}');