Skip to content

Commit

Permalink
Funnel persons path (#5699)
Browse files Browse the repository at this point in the history
* initial implementation

* add basic test

* move timestamp select

* fix tests and types

* fix test
  • Loading branch information
EDsCODE authored Aug 25, 2021
1 parent c76cf20 commit bf51de1
Show file tree
Hide file tree
Showing 17 changed files with 176 additions and 35 deletions.
6 changes: 4 additions & 2 deletions ee/clickhouse/materialized_columns/columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ def get_materialized_columns(table: str) -> Dict[PropertyName, ColumnName]:
""",
{"database": CLICKHOUSE_DATABASE, "table": table},
)

return {extract_property(comment): column_name for comment, column_name in rows}
if rows:
return {extract_property(comment): column_name for comment, column_name in rows}
else:
return {}


def materialize(table: str, property: str, distributed: bool = False) -> None:
Expand Down
4 changes: 2 additions & 2 deletions ee/clickhouse/queries/funnels/funnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ def get_step_counts_query(self):
breakdown_clause = self._get_breakdown_prop()

return f"""
SELECT person_id, steps {self._get_step_time_avgs(max_steps, inner_query=True)} {self._get_step_time_median(max_steps, inner_query=True)} {breakdown_clause} FROM (
SELECT person_id, steps, max(steps) over (PARTITION BY person_id {breakdown_clause}) as max_steps {self._get_step_time_names(max_steps)} {breakdown_clause} FROM (
SELECT person_id, steps {self._get_step_time_avgs(max_steps, inner_query=True)} {self._get_step_time_median(max_steps, inner_query=True)} {breakdown_clause}, argMax(timestamp, steps) as timestamp FROM (
SELECT person_id, steps, max(steps) over (PARTITION BY person_id {breakdown_clause}) as max_steps {self._get_step_time_names(max_steps)} {breakdown_clause}, timestamp FROM (
{steps_per_person_query}
)
) GROUP BY person_id, steps {breakdown_clause}
Expand Down
1 change: 1 addition & 0 deletions ee/clickhouse/queries/funnels/funnel_persons.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ def get_query(self):
offset=self._filter.offset,
steps_per_person_query=self.get_step_counts_query(),
persons_steps=self._get_funnel_person_step_condition(),
timestamp=", timestamp",
)

def _format_results(self, results):
Expand Down
4 changes: 2 additions & 2 deletions ee/clickhouse/queries/funnels/funnel_strict.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ def get_step_counts_query(self):
breakdown_clause = self._get_breakdown_prop()

return f"""
SELECT person_id, steps {self._get_step_time_avgs(max_steps, inner_query=True)} {self._get_step_time_median(max_steps, inner_query=True)} {breakdown_clause} FROM (
SELECT person_id, steps, max(steps) over (PARTITION BY person_id {breakdown_clause}) as max_steps {self._get_step_time_names(max_steps)} {breakdown_clause} FROM (
SELECT person_id, steps {self._get_step_time_avgs(max_steps, inner_query=True)} {self._get_step_time_median(max_steps, inner_query=True)} {breakdown_clause}, argMax(timestamp, steps) as timestamp FROM (
SELECT person_id, steps, max(steps) over (PARTITION BY person_id {breakdown_clause}) as max_steps {self._get_step_time_names(max_steps)} {breakdown_clause}, timestamp FROM (
{steps_per_person_query}
)
) GROUP BY person_id, steps {breakdown_clause}
Expand Down
1 change: 1 addition & 0 deletions ee/clickhouse/queries/funnels/funnel_strict_persons.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ def get_query(self):
offset=self._filter.offset,
steps_per_person_query=self.get_step_counts_query(),
persons_steps=self._get_funnel_person_step_condition(),
timestamp=", timestamp",
)

def _format_results(self, results):
Expand Down
1 change: 1 addition & 0 deletions ee/clickhouse/queries/funnels/funnel_trends_persons.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def get_query(self) -> str:
offset=self._filter.offset,
steps_per_person_query=step_counts_query,
persons_steps=did_not_reach_to_step_count_condition if drop_off else reached_to_step_count_condition,
timestamp="",
)

def _summarize_data(self, results):
Expand Down
4 changes: 2 additions & 2 deletions ee/clickhouse/queries/funnels/funnel_unordered.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ def get_step_counts_query(self):
breakdown_clause = self._get_breakdown_prop()

return f"""
SELECT person_id, steps {self._get_step_time_avgs(max_steps, inner_query=True)} {self._get_step_time_median(max_steps, inner_query=True)} {breakdown_clause} FROM (
SELECT person_id, steps, max(steps) over (PARTITION BY person_id {breakdown_clause}) as max_steps {self._get_step_time_names(max_steps)} {breakdown_clause} FROM (
SELECT person_id, steps {self._get_step_time_avgs(max_steps, inner_query=True)} {self._get_step_time_median(max_steps, inner_query=True)} {breakdown_clause}, argMax(timestamp, steps) as timestamp FROM (
SELECT person_id, steps, max(steps) over (PARTITION BY person_id {breakdown_clause}) as max_steps {self._get_step_time_names(max_steps)} {breakdown_clause}, timestamp FROM (
{union_query}
)
) GROUP BY person_id, steps {breakdown_clause}
Expand Down
1 change: 1 addition & 0 deletions ee/clickhouse/queries/funnels/funnel_unordered_persons.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ def get_query(self):
offset=self._filter.offset,
steps_per_person_query=self.get_step_counts_query(),
persons_steps=self._get_funnel_person_step_condition(),
timestamp=", timestamp",
)

def _format_results(self, results):
Expand Down
28 changes: 28 additions & 0 deletions ee/clickhouse/queries/paths/path_event_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,39 @@
from ee.clickhouse.models.property import get_property_string_expr
from ee.clickhouse.queries.event_query import ClickhouseEventQuery
from posthog.constants import AUTOCAPTURE_EVENT, PAGEVIEW_EVENT, SCREEN_EVENT
from posthog.models.filters.path_filter import PathFilter


class PathEventQuery(ClickhouseEventQuery):
FUNNEL_PERSONS_ALIAS = "funnel_persons"
_filter: PathFilter

def __init__(
self,
filter: PathFilter,
team_id: int,
round_interval=False,
should_join_distinct_ids=False,
should_join_persons=False,
**kwargs,
) -> None:
super().__init__(filter, team_id, round_interval, should_join_distinct_ids, should_join_persons, **kwargs)

def get_query(self) -> Tuple[str, Dict[str, Any]]:

funnel_paths_timestamp = ""
funnel_paths_join = ""
funnel_paths_filter = ""

if self._filter.funnel_paths:
funnel_paths_timestamp = f", {self.FUNNEL_PERSONS_ALIAS}.timestamp as min_timestamp"
funnel_paths_join = f"JOIN {self.FUNNEL_PERSONS_ALIAS} ON {self.FUNNEL_PERSONS_ALIAS}.person_id = {self.DISTINCT_ID_TABLE_ALIAS}.person_id"
funnel_paths_filter = f"AND {self.EVENT_TABLE_ALIAS}.timestamp >= min_timestamp"

_fields = (
f"{self.EVENT_TABLE_ALIAS}.timestamp AS timestamp, if(event = %(screen)s, {self._get_screen_name_parsing()}, if({self.EVENT_TABLE_ALIAS}.event = %(pageview)s, {self._get_current_url_parsing()}, if({self.EVENT_TABLE_ALIAS}.event = %(autocapture)s, concat('autocapture:', {self.EVENT_TABLE_ALIAS}.elements_chain), {self.EVENT_TABLE_ALIAS}.event))) AS path_item"
+ (f", {self.DISTINCT_ID_TABLE_ALIAS}.person_id as person_id" if self._should_join_distinct_ids else "")
+ funnel_paths_timestamp
)

date_query, date_params = self._get_date_filter()
Expand All @@ -23,10 +49,12 @@ def get_query(self) -> Tuple[str, Dict[str, Any]]:
SELECT {_fields} FROM events {self.EVENT_TABLE_ALIAS}
{self._get_disintct_id_query()}
{self._get_person_query()}
{funnel_paths_join}
WHERE team_id = %(team_id)s
AND (event = %(pageview)s OR event = %(screen)s OR event = %(autocapture)s OR NOT event LIKE %(custom_event_match)s)
{date_query}
{prop_query}
{funnel_paths_filter}
ORDER BY {self.DISTINCT_ID_TABLE_ALIAS}.person_id, {self.EVENT_TABLE_ALIAS}.timestamp
"""
self.params.update(
Expand Down
38 changes: 31 additions & 7 deletions ee/clickhouse/queries/paths/paths.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from abc import ABC
from typing import Dict, List, Tuple
from typing import Dict, List, Optional, Tuple

import sqlparse

from ee.clickhouse.client import sync_execute
from ee.clickhouse.queries.funnels.funnel_persons import ClickhouseFunnelPersons
from ee.clickhouse.queries.paths.path_event_query import PathEventQuery
from ee.clickhouse.sql.paths.path import PATH_ARRAY_QUERY
from posthog.constants import LIMIT
Expand All @@ -15,9 +18,10 @@

class ClickhousePathsNew:
_filter: PathFilter
_funnel_filter: Optional[Filter]
_team: Team

def __init__(self, filter: PathFilter, team: Team) -> None:
def __init__(self, filter: PathFilter, team: Team, funnel_filter: Optional[Filter] = None) -> None:
self._filter = filter
self._team = team
self.params = {
Expand All @@ -27,6 +31,7 @@ def __init__(self, filter: PathFilter, team: Team) -> None:
"session_time_threshold": SESSION_TIME_THRESHOLD_DEFAULT,
"autocapture_match": "%autocapture:%",
}
self._funnel_filter = funnel_filter

def run(self, *args, **kwargs):

Expand All @@ -43,19 +48,38 @@ def _format_results(self, results):
return resp

def _exec_query(self) -> List[Tuple]:
query, _ = self.get_query()
query = self.get_query()
return sync_execute(query, self.params)

def get_query(self) -> Tuple[str, dict]:
def get_query(self) -> str:

if self._filter.funnel_paths and self._funnel_filter:
return self.get_path_query_by_funnel(funnel_filter=self._funnel_filter)
else:
return self.get_path_query()

def get_path_query(self) -> str:
path_event_query, params = PathEventQuery(filter=self._filter, team_id=self._team.pk).get_query()
self.params.update(params)

boundary_event_filter, start_params = self.get_start_point_filter()
self.params.update(start_params)
return (
PATH_ARRAY_QUERY.format(path_event_query=path_event_query, boundary_event_filter=boundary_event_filter),
self.params,
return PATH_ARRAY_QUERY.format(path_event_query=path_event_query, boundary_event_filter=boundary_event_filter)

def get_path_query_by_funnel(self, funnel_filter: Filter):
path_query = self.get_path_query()
funnel_persons_generator = ClickhouseFunnelPersons(funnel_filter, self._team)
funnel_persons_query = funnel_persons_generator.get_query()
funnel_persons_query_new_params = funnel_persons_query.replace("%(", "%(funnel_")
funnel_persons_param = funnel_persons_generator.params
new_funnel_params = {"funnel_" + str(key): val for key, val in funnel_persons_param.items()}
self.params.update(new_funnel_params)
return f"""
WITH {PathEventQuery.FUNNEL_PERSONS_ALIAS} AS (
{funnel_persons_query_new_params}
)
{path_query}
"""

def get_start_point_filter(self) -> Tuple[str, Dict]:

Expand Down
77 changes: 71 additions & 6 deletions ee/clickhouse/queries/test/test_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from ee.clickhouse.queries.clickhouse_paths import ClickhousePaths
from ee.clickhouse.queries.paths.paths import ClickhousePathsNew
from ee.clickhouse.util import ClickhouseTestMixin
from posthog.constants import PAGEVIEW_EVENT, SCREEN_EVENT
from posthog.models.filters.path_filter import PathFilter
from posthog.constants import INSIGHT_FUNNELS, PAGEVIEW_EVENT, SCREEN_EVENT
from posthog.models.filters import Filter, PathFilter
from posthog.models.person import Person
from posthog.queries.test.test_paths import paths_test_factory

Expand All @@ -27,10 +27,11 @@ def test_denormalized_properties(self):
materialize("events", "$current_url")
materialize("events", "$screen_name")

query, _ = ClickhousePathsNew(team=self.team, filter=PathFilter(data={"path_type": PAGEVIEW_EVENT})).get_query()
filter = PathFilter(data={"path_type": PAGEVIEW_EVENT})
query, _ = ClickhousePaths(team=self.team, filter=filter).get_query(team=self.team, filter=filter)
self.assertNotIn("json", query.lower())

query, _ = ClickhousePathsNew(team=self.team, filter=PathFilter(data={"path_type": SCREEN_EVENT})).get_query()
query, _ = ClickhousePaths(team=self.team, filter=filter).get_query(team=self.team, filter=filter)
self.assertNotIn("json", query.lower())

self.test_current_url_paths_and_logic()
Expand All @@ -41,10 +42,10 @@ def test_denormalized_properties(self):
materialize("events", "$current_url")
materialize("events", "$screen_name")

query, _ = ClickhousePathsNew(team=self.team, filter=PathFilter(data={"path_type": PAGEVIEW_EVENT})).get_query()
query = ClickhousePathsNew(team=self.team, filter=PathFilter(data={"path_type": PAGEVIEW_EVENT})).get_query()
self.assertNotIn("json", query.lower())

query, _ = ClickhousePathsNew(team=self.team, filter=PathFilter(data={"path_type": SCREEN_EVENT})).get_query()
query = ClickhousePathsNew(team=self.team, filter=PathFilter(data={"path_type": SCREEN_EVENT})).get_query()
self.assertNotIn("json", query.lower())

self.test_current_url_paths_and_logic()
Expand Down Expand Up @@ -193,3 +194,67 @@ def test_path_event_ordering(self):
{"source": "3_step three", "target": "4_step branch", "value": 25, "average_conversion_time": 60000.0},
],
)

def _create_sample_data_multiple_dropoffs(self):
for i in range(5):
Person.objects.create(distinct_ids=[f"user_{i}"], team=self.team)
_create_event(event="step one", distinct_id=f"user_{i}", team=self.team, timestamp="2021-05-01 00:00:00")
_create_event(event="step two", distinct_id=f"user_{i}", team=self.team, timestamp="2021-05-03 00:00:00")
_create_event(event="step three", distinct_id=f"user_{i}", team=self.team, timestamp="2021-05-05 00:00:00")

for i in range(5, 15):
Person.objects.create(distinct_ids=[f"user_{i}"], team=self.team)
_create_event(event="step one", distinct_id=f"user_{i}", team=self.team, timestamp="2021-05-01 00:00:00")
_create_event(event="step two", distinct_id=f"user_{i}", team=self.team, timestamp="2021-05-03 00:00:00")

for i in range(15, 35):
Person.objects.create(distinct_ids=[f"user_{i}"], team=self.team)
_create_event(event="step one", distinct_id=f"user_{i}", team=self.team, timestamp="2021-05-01 00:00:00")
_create_event(
event="step dropoff1", distinct_id=f"user_{i}", team=self.team, timestamp="2021-05-01 00:01:00"
)
_create_event(
event="step dropoff2", distinct_id=f"user_{i}", team=self.team, timestamp="2021-05-01 00:02:00"
)
if i % 2 == 0:
_create_event(
event="step branch", distinct_id=f"user_{i}", team=self.team, timestamp="2021-05-01 00:03:00"
)

def test_path_by_funnel(self):
self._create_sample_data_multiple_dropoffs()
data = {
"insight": INSIGHT_FUNNELS,
"funnel_paths": True,
"interval": "day",
"date_from": "2021-05-01 00:00:00",
"date_to": "2021-05-07 00:00:00",
"funnel_window_days": 7,
"funnel_step": -2,
"events": [
{"id": "step one", "order": 0},
{"id": "step two", "order": 1},
{"id": "step three", "order": 2},
],
}
funnel_filter = Filter(data=data)
path_filter = PathFilter(data=data)
response = ClickhousePathsNew(team=self.team, filter=path_filter, funnel_filter=funnel_filter).run()
self.assertEqual(
response,
[
{"source": "1_step one", "target": "2_step dropoff1", "value": 20, "average_conversion_time": 60000.0},
{
"source": "2_step dropoff1",
"target": "3_step dropoff2",
"value": 20,
"average_conversion_time": 60000.0,
},
{
"source": "3_step dropoff2",
"target": "4_step branch",
"value": 10,
"average_conversion_time": 60000.0,
},
],
)
2 changes: 1 addition & 1 deletion ee/clickhouse/sql/funnels/funnel.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
FUNNEL_PERSONS_BY_STEP_SQL = """
SELECT person_id
SELECT person_id {timestamp}
FROM (
{steps_per_person_query}
)
Expand Down
1 change: 1 addition & 0 deletions posthog/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
BIN_COUNT = "bin_count"
ENTRANCE_PERIOD_START = "entrance_period_start"
DROP_OFF = "drop_off"
FUNNEL_PATHS = "funnel_paths"


class FunnelOrderType(str, Enum):
Expand Down
14 changes: 3 additions & 11 deletions posthog/models/filters/mixins/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
)
from posthog.models.entity import Entity, ExclusionEntity
from posthog.models.filters.mixins.base import BaseParamMixin, BreakdownType, IntervalType
from posthog.models.filters.mixins.utils import cached_property, include_dict
from posthog.utils import relative_date_parse, str_to_bool
from posthog.models.filters.mixins.utils import cached_property, include_dict, process_bool
from posthog.utils import relative_date_parse

ALLOWED_FORMULA_CHARACTERS = r"([a-zA-Z \-\*\^0-9\+\/\(\)]+)"

Expand Down Expand Up @@ -226,18 +226,10 @@ def limit_to_dict(self):


class CompareMixin(BaseParamMixin):
def _process_compare(self, compare: Optional[Union[str, bool]]) -> bool:
if isinstance(compare, bool):
return compare
elif isinstance(compare, str):
return str_to_bool(compare)
else:
return False

@cached_property
def compare(self) -> bool:
_compare = self._data.get(COMPARE, None)
return self._process_compare(_compare)
return process_bool(_compare)

@include_dict
def compare_to_dict(self):
Expand Down
14 changes: 13 additions & 1 deletion posthog/models/filters/mixins/paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
from posthog.constants import (
AUTOCAPTURE_EVENT,
CUSTOM_EVENT,
FUNNEL_PATHS,
PAGEVIEW_EVENT,
PATH_TYPE,
SCREEN_EVENT,
START_POINT,
STEP_LIMIT,
)
from posthog.models.filters.mixins.common import BaseParamMixin
from posthog.models.filters.mixins.utils import cached_property, include_dict
from posthog.models.filters.mixins.utils import cached_property, include_dict, process_bool

PathType = Literal["$pageview", "$autocapture", "$screen", "custom_event"]

Expand Down Expand Up @@ -82,3 +83,14 @@ def step_limit(self) -> Optional[str]:
@include_dict
def step_limit_to_dict(self):
return {"step_limit": self.step_limit} if self.step_limit else {}


class FunnelPathsMixin(BaseParamMixin):
@cached_property
def funnel_paths(self) -> bool:
_funnel_paths = self._data.get(FUNNEL_PATHS, None)
return process_bool(_funnel_paths)

@include_dict
def funnel_paths_to_dict(self):
return {"funnel_paths": self.funnel_paths} if self.funnel_paths else {}
Loading

0 comments on commit bf51de1

Please sign in to comment.