Skip to content

Commit

Permalink
Revert "perf(funnels): add filter on pdi.team_id to speed up query
Browse files Browse the repository at this point in the history
(#5997)"

This merged caused an issue with the funnels endpoint when display was
set to `ActionsLineGraph`

I'm reverting so we can add in a snapshot test that will fail when this
revert is reverted.

This reverts commit 2fb7cf8.
  • Loading branch information
Harry Waye committed Oct 19, 2021
1 parent f0e6776 commit 49761a3
Showing 1 changed file with 27 additions and 49 deletions.
76 changes: 27 additions & 49 deletions posthog/queries/funnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ def _gen_lateral_bodies(self, within_time: Optional[str] = None):
# not after `ON pdi.distinct_id = posthog_event.distinct_id`
r'FROM "posthog_event"( [A-Z][0-9])?',
r"FROM posthog_event\1 JOIN posthog_persondistinctid pdi "
#  NOTE: here we are joining on the unique identifier of the
#  persondistinctid table, i.e. (team_id, distinct_id)
r"ON pdi.distinct_id = posthog_event.distinct_id AND pdi.team_id = posthog_event.team_id",
r"ON pdi.distinct_id = posthog_event.distinct_id",
event_string,
)
query = sql.SQL(event_string)
Expand All @@ -101,23 +99,40 @@ def _serialize_step(self, step: Entity, count: int, people: Optional[List[uuid.U
"type": step.type,
}

def _build_query(self, within_time: Optional[str] = None):
def _build_query(self, query_bodies: dict):
"""Build query using lateral joins using a combination of Django generated SQL
and sql built using psycopg2
and sql built using psycopg2
"""
query_bodies = self._gen_lateral_bodies(within_time=within_time)

ON_TRUE = "ON TRUE"
LEFT_JOIN_LATERAL = "LEFT JOIN LATERAL"
QUERY_HEADER = "SELECT {people}, {fields} FROM "
LAT_JOIN_BODY = (
"""({query}) {step} {on_true} {join}""" if len(query_bodies) > 1 else """({query}) {step} {on_true} """
)
PERSON_FIELDS = [
[sql.Identifier("posthog_person"), sql.Identifier("uuid")],
[sql.Identifier("posthog_person"), sql.Identifier("created_at")],
[sql.Identifier("posthog_person"), sql.Identifier("team_id")],
[sql.Identifier("posthog_person"), sql.Identifier("properties")],
[sql.Identifier("posthog_person"), sql.Identifier("is_user_id")],
]
QUERY_FOOTER = sql.SQL(
"""
JOIN posthog_person ON posthog_person.id = {step0}.person_id
WHERE {step0}.person_id IS NOT NULL
GROUP BY {group_by}"""
)

person_fields = sql.SQL(",").join([sql.SQL(".").join(col) for col in PERSON_FIELDS])

steps = [sql.Identifier(step) for step, _ in query_bodies.items()]
steps = [sql.Identifier(step) for step, query in query_bodies.items()]
select_steps = [
sql.Composed([step, sql.SQL("."), sql.Identifier("step_ts"), sql.SQL(" as "), step,]) for step in steps
sql.Composed([sql.SQL("MIN("), step, sql.SQL("."), sql.Identifier("step_ts"), sql.SQL(") as "), step,])
for step in steps
]
lateral_joins = []
query = sql.SQL(QUERY_HEADER).format(fields=sql.SQL(",").join(select_steps), people=person_fields)
i = 0
for step, qb in query_bodies.items():
if i > 0:
Expand Down Expand Up @@ -150,29 +165,8 @@ def _build_query(self, within_time: Optional[str] = None):
)
lateral_joins.append(base_body)
i += 1

event_chain_query = sql.SQL(" ").join(lateral_joins).as_string(connection.connection)

query = f"""
SELECT
DISTINCT ON (person.id)
person.uuid,
person.created_at,
person.team_id,
person.properties,
person.is_user_id,
{sql.SQL(",").join(select_steps).as_string(connection.connection)}
FROM posthog_person person
JOIN posthog_persondistinctid pdi ON pdi.person_id = person.id
JOIN {event_chain_query}
-- join on person_id for the first event.
-- NOTE: there is some implicit coupling here in that I am
-- assuming the name of the first event select is "step_0".
-- Maybe worth cleaning up in the future
ON person.id = step_0.person_id
WHERE person.team_id = {self._team.pk} AND person.id IS NOT NULL
ORDER BY person.id, step_0.step_ts ASC
"""
query_footer = QUERY_FOOTER.format(step0=steps[0], group_by=person_fields)
query = query + sql.SQL(" ").join(lateral_joins) + query_footer
return query

def _build_trends_query(self, filter: Filter) -> sql.SQL:
Expand All @@ -192,7 +186,7 @@ def _build_trends_query(self, filter: Filter) -> sql.SQL:
).format(
interval=sql.Literal(filter.interval),
particular_steps=sql.SQL(",\n").join(particular_steps),
steps_query=self._build_query(within_time="'1 day'"),
steps_query=self._build_query(self._gen_lateral_bodies(within_time="'1 day'")),
interval_field=sql.SQL("step_0")
if filter.interval != "week"
else sql.SQL("(\"step_0\" + interval '1 day') AT TIME ZONE 'UTC'"),
Expand Down Expand Up @@ -283,30 +277,14 @@ def data_to_return(self, results: List[Person]) -> List[Dict[str, Any]]:
return steps

def run(self, *args, **kwargs) -> List[Dict[str, Any]]:
"""
Builds and runs a query to get all persons that have been in the funnel
steps defined by `self._filter.entities`. For example, entities may be
defined as:
1. event with event name "user signed up"
2. event with event name "user looked at report"
For a person to match they have to have gone through all `entities` in
order. We also only return one such chain of entities, the earliest one
we find.
"""

# If no steps are defined, then there's no point in querying the database
if len(self._filter.entities) == 0:
return []

if self._filter.display == TRENDS_LINEAR:
return self._get_trends()

with connection.cursor() as cursor:
# Then we build a query to query for them in order
qstring = self._build_query(within_time=None)

qstring = self._build_query(self._gen_lateral_bodies()).as_string(cursor.connection)
cursor.execute(qstring)
results = namedtuplefetchall(cursor)
return self.data_to_return(results)
Expand Down

0 comments on commit 49761a3

Please sign in to comment.