Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(funnels): add filter on pdi.team_id to speed up query #5997

Merged
merged 4 commits into from
Sep 20, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 49 additions & 27 deletions posthog/queries/funnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ def _gen_lateral_bodies(self, within_time: Optional[str] = None):
# not after `ON pdi.distinct_id = posthog_event.distinct_id`
r'FROM "posthog_event"( [A-Z][0-9])?',
r"FROM posthog_event\1 JOIN posthog_persondistinctid pdi "
r"ON pdi.distinct_id = posthog_event.distinct_id",
#  NOTE: here we are joining on the unique identifier of the
#  persondistinctid table, i.e. (team_id, distinct_id)
r"ON pdi.distinct_id = posthog_event.distinct_id AND pdi.team_id = posthog_event.team_id",
event_string,
)
query = sql.SQL(event_string)
Expand All @@ -104,40 +106,23 @@ def _serialize_step(self, step: Entity, count: int, people: Optional[List[uuid.U
"type": step.type,
}

def _build_query(self, query_bodies: dict):
def _build_query(self, within_time: Optional[str] = None):
"""Build query using lateral joins using a combination of Django generated SQL
and sql built using psycopg2
and sql built using psycopg2
"""
query_bodies = self._gen_lateral_bodies(within_time=within_time)

ON_TRUE = "ON TRUE"
LEFT_JOIN_LATERAL = "LEFT JOIN LATERAL"
QUERY_HEADER = "SELECT {people}, {fields} FROM "
LAT_JOIN_BODY = (
"""({query}) {step} {on_true} {join}""" if len(query_bodies) > 1 else """({query}) {step} {on_true} """
)
PERSON_FIELDS = [
[sql.Identifier("posthog_person"), sql.Identifier("uuid")],
[sql.Identifier("posthog_person"), sql.Identifier("created_at")],
[sql.Identifier("posthog_person"), sql.Identifier("team_id")],
[sql.Identifier("posthog_person"), sql.Identifier("properties")],
[sql.Identifier("posthog_person"), sql.Identifier("is_user_id")],
]
QUERY_FOOTER = sql.SQL(
"""
JOIN posthog_person ON posthog_person.id = {step0}.person_id
WHERE {step0}.person_id IS NOT NULL
GROUP BY {group_by}"""
)

person_fields = sql.SQL(",").join([sql.SQL(".").join(col) for col in PERSON_FIELDS])

steps = [sql.Identifier(step) for step, query in query_bodies.items()]
steps = [sql.Identifier(step) for step, _ in query_bodies.items()]
select_steps = [
sql.Composed([sql.SQL("MIN("), step, sql.SQL("."), sql.Identifier("step_ts"), sql.SQL(") as "), step,])
for step in steps
sql.Composed([step, sql.SQL("."), sql.Identifier("step_ts"), sql.SQL(" as "), step,]) for step in steps
]
lateral_joins = []
query = sql.SQL(QUERY_HEADER).format(fields=sql.SQL(",").join(select_steps), people=person_fields)
i = 0
for step, qb in query_bodies.items():
if i > 0:
Expand Down Expand Up @@ -170,8 +155,29 @@ def _build_query(self, query_bodies: dict):
)
lateral_joins.append(base_body)
i += 1
query_footer = QUERY_FOOTER.format(step0=steps[0], group_by=person_fields)
query = query + sql.SQL(" ").join(lateral_joins) + query_footer

event_chain_query = sql.SQL(" ").join(lateral_joins).as_string(connection.connection)

query = f"""
SELECT
DISTINCT ON (person.id)
person.uuid,
person.created_at,
person.team_id,
person.properties,
person.is_user_id,
{sql.SQL(",").join(select_steps).as_string(connection.connection)}
FROM posthog_person person
JOIN posthog_persondistinctid pdi ON pdi.person_id = person.id
JOIN {event_chain_query}
-- join on person_id for the first event.
-- NOTE: there is some implicit coupling here in that I am
-- assuming the name of the first event select is "step_0".
-- Maybe worth cleaning up in the future
ON person.id = step_0.person_id
WHERE person.team_id = {self._team.pk} AND person.id IS NOT NULL
ORDER BY person.id, step_0.step_ts ASC
"""
return query

def _build_trends_query(self, filter: Filter) -> sql.SQL:
Expand All @@ -191,7 +197,7 @@ def _build_trends_query(self, filter: Filter) -> sql.SQL:
).format(
interval=sql.Literal(filter.interval),
particular_steps=sql.SQL(",\n").join(particular_steps),
steps_query=self._build_query(self._gen_lateral_bodies(within_time="'1 day'")),
steps_query=self._build_query(within_time="'1 day'"),
interval_field=sql.SQL("step_0")
if filter.interval != "week"
else sql.SQL("(\"step_0\" + interval '1 day') AT TIME ZONE 'UTC'"),
Expand Down Expand Up @@ -282,14 +288,30 @@ def data_to_return(self, results: List[Person]) -> List[Dict[str, Any]]:
return steps

def run(self, *args, **kwargs) -> List[Dict[str, Any]]:
"""
Builds and runs a query to get all persons that have been in the funnel
steps defined by `self._filter.entities`. For example, entities may be
defined as:

1. event with event name "user signed up"
2. event with event name "user looked at report"

For a person to match, they must have gone through all `entities` in
order. For each matching person we return only one such chain of
entities: the earliest one we find.
"""

# If no steps are defined, then there's no point in querying the database
if len(self._filter.entities) == 0:
return []

if self._filter.display == TRENDS_LINEAR:
return self._get_trends()

with connection.cursor() as cursor:
qstring = self._build_query(self._gen_lateral_bodies()).as_string(cursor.connection)
# Build the query that looks for the funnel steps in order
qstring = self._build_query(within_time=None)

cursor.execute(qstring)
results = namedtuplefetchall(cursor)
return self.data_to_return(results)
Expand Down