Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upload static cohort csv #2932

Merged
merged 18 commits into from
Jan 15, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cypress/integration/cohorts.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ describe('Cohorts', () => {

// go to create a new cohort
cy.get('[data-attr="create-cohort"]').click()
cy.get('[data-attr="cohort-choice-definition"]').click()
cy.get('[data-attr="cohort-name"]').type('Test Cohort')

// select "add filter" and "property"
Expand Down
8 changes: 8 additions & 0 deletions ee/clickhouse/migrations/0007_static_cohorts_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from infi.clickhouse_orm import migrations

from ee.clickhouse.sql.person import KAFKA_PERSON_STATIC_COHORT_TABLE_SQL, PERSON_STATIC_COHORT_TABLE_SQL

# ClickHouse migration: create the static-cohort storage table and the
# Kafka-engine ingestion table that feeds it (SQL defined in
# ee/clickhouse/sql/person.py).
operations = [
    migrations.RunSQL(PERSON_STATIC_COHORT_TABLE_SQL),
    migrations.RunSQL(KAFKA_PERSON_STATIC_COHORT_TABLE_SQL),
]
34 changes: 32 additions & 2 deletions ee/clickhouse/models/cohort.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,33 @@
from typing import Any, Dict, Tuple
import uuid
from typing import Any, Dict, List, Optional, Tuple

from ee.clickhouse.client import sync_execute
from ee.clickhouse.models.action import format_action_filter
from ee.clickhouse.sql.cohort import CALCULATE_COHORT_PEOPLE_SQL
from ee.clickhouse.sql.person import GET_LATEST_PERSON_ID_SQL, GET_PERSON_IDS_BY_FILTER
from ee.clickhouse.sql.person import (
GET_LATEST_PERSON_ID_SQL,
GET_PERSON_IDS_BY_FILTER,
INSERT_PERSON_STATIC_COHORT,
PERSON_STATIC_COHORT_TABLE,
)
from ee.idl.gen import person_static_cohort_pb2
from ee.kafka_client.client import ClickhouseProducer
from ee.kafka_client.topics import KAFKA_PERSON_STATIC_COHORT
from posthog.models import Action, Cohort, Filter, Team


def format_person_query(cohort: Cohort) -> Tuple[str, Dict[str, Any]]:
filters = []
params: Dict[str, Any] = {}

if cohort.is_static:
return (
"person_id IN (SELECT person_id FROM {} WHERE cohort_id = %(cohort_id)s AND team_id = %(team_id)s)".format(
PERSON_STATIC_COHORT_TABLE
),
{"cohort_id": cohort.pk, "team_id": cohort.team_id},
)

for group_idx, group in enumerate(cohort.groups):
if group.get("action_id"):
action = Action.objects.get(pk=group["action_id"], team_id=cohort.team.pk)
Expand Down Expand Up @@ -52,3 +70,15 @@ def get_person_ids_by_cohort_id(team: Team, cohort_id: int):
results = sync_execute(GET_PERSON_IDS_BY_FILTER.format(distinct_query=filter_query, query=""), filter_params)

return [str(row[0]) for row in results]


def insert_static_cohort(person_uuids: List[Optional[uuid.UUID]], cohort_id: int, team: Team):
    """Write one static-cohort membership row per person to ClickHouse.

    Each row is produced to the KAFKA_PERSON_STATIC_COHORT topic (or inserted
    directly via INSERT_PERSON_STATIC_COHORT when Kafka is bypassed), from
    where it lands in the person_static_cohort table.

    Args:
        person_uuids: UUIDs of the persons to add. Note each value is passed
            through str(), so a None entry would be written literally as
            "None" — callers should filter those out.
        cohort_id: primary key of the Cohort these rows belong to.
        team: Team owning the cohort; team_id is denormalized onto every row.
    """
    # Create one producer for the whole batch instead of a new one per person.
    producer = ClickhouseProducer()
    for person_uuid in person_uuids:
        pb_event = person_static_cohort_pb2.PersonStaticCohort()
        pb_event.id = str(uuid.uuid4())  # fresh row id, independent of the person
        pb_event.person_id = str(person_uuid)
        pb_event.cohort_id = cohort_id
        pb_event.team_id = team.pk

        # NOTE(review): this produces protobuf, but per review the destination
        # table is JSON-encoded — confirm the encoding matches the table
        # definition before relying on this path.
        producer.produce_proto(sql=INSERT_PERSON_STATIC_COHORT, topic=KAFKA_PERSON_STATIC_COHORT, data=pb_event)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is perfect for a proto encoded table, but the table you have defined is json encoded.

Check out producing for persons to see what producing to a json table looks like
https://github.com/PostHog/posthog/blob/master/ee/clickhouse/models/person.py#L74

26 changes: 26 additions & 0 deletions ee/clickhouse/models/test/test_cohort.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from posthog.models.filters import Filter
from posthog.models.organization import Organization
from posthog.models.person import Person
from posthog.models.team import Team
from posthog.models.utils import UUIDT
from posthog.test.base import BaseTest

Expand Down Expand Up @@ -192,3 +193,28 @@ def test_cohort_get_person_ids_by_cohort_id(self):
self.assertEqual(len(results), 2)
self.assertIn(user1.uuid, results)
self.assertIn(user3.uuid, results)

def test_insert_by_distinct_id_or_email(self):
    """Static cohorts: insert_users_by_list matches by distinct_id or email,
    never leaks across teams, survives calculate_people, and is idempotent."""
    Person.objects.create(team_id=self.team.pk, properties={"email": "[email protected]"}, distinct_ids=["1"])
    Person.objects.create(team_id=self.team.pk, distinct_ids=["123"])
    Person.objects.create(team_id=self.team.pk, distinct_ids=["2"])
    # A person on a different team with a matching email must not be picked up.
    team2 = Team.objects.create(organization=self.organization)
    Person.objects.create(team=team2, properties={"email": "[email protected]"})

    cohort = Cohort.objects.create(team=self.team, groups=[], is_static=True)
    cohort.insert_users_by_list(["[email protected]", "123"])
    cohort = Cohort.objects.get()

    self.assertEqual(len(get_person_ids_by_cohort_id(self.team, cohort.id)), 2)
    self.assertEqual(cohort.is_calculating, False)

    # An accidental calculate_people() must not erase static membership.
    cohort.calculate_people()
    self.assertEqual(len(get_person_ids_by_cohort_id(self.team, cohort.id)), 2)

    # Re-inserting an existing member must not grow the cohort.
    cohort.insert_users_by_list(["123"])
    self.assertEqual(len(get_person_ids_by_cohort_id(self.team, cohort.id)), 2)
51 changes: 50 additions & 1 deletion ee/clickhouse/sql/person.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from ee.kafka_client.topics import KAFKA_PERSON, KAFKA_PERSON_UNIQUE_ID
from ee.kafka_client.topics import KAFKA_PERSON, KAFKA_PERSON_STATIC_COHORT, KAFKA_PERSON_UNIQUE_ID

from .clickhouse import KAFKA_COLUMNS, STORAGE_POLICY, kafka_engine, table_engine

Expand All @@ -10,6 +10,7 @@
DROP TABLE person_distinct_id
"""


PERSONS_TABLE = "person"

PERSONS_TABLE_BASE_SQL = """
Expand Down Expand Up @@ -122,6 +123,54 @@
table_name=PERSONS_DISTINCT_ID_TABLE
)

#
# Static Cohort
#

PERSON_STATIC_COHORT_TABLE = "person_static_cohort"
PERSON_STATIC_COHORT_BASE_SQL = """
CREATE TABLE {table_name}
(
id UUID,
person_id UUID,
cohort_id Int64,
team_id Int64
{extra_fields}
) ENGINE = {engine}
"""

PERSON_STATIC_COHORT_TABLE_SQL = (
PERSON_STATIC_COHORT_BASE_SQL
+ """Order By (team_id, cohort_id, person_id, id)
{storage_policy}
"""
).format(
table_name=PERSON_STATIC_COHORT_TABLE,
engine=table_engine(PERSON_STATIC_COHORT_TABLE, "_timestamp"),
storage_policy=STORAGE_POLICY,
extra_fields=KAFKA_COLUMNS,
)

KAFKA_PERSON_STATIC_COHORT_TABLE_SQL = PERSON_STATIC_COHORT_BASE_SQL.format(
table_name="kafka_" + PERSON_STATIC_COHORT_TABLE, engine=kafka_engine(KAFKA_PERSON_STATIC_COHORT), extra_fields="",
)

DROP_PERSON_STATIC_COHORT_TABLE_SQL = """
DROP TABLE {}
""".format(
PERSON_STATIC_COHORT_TABLE
)

INSERT_PERSON_STATIC_COHORT = """
INSERT INTO {} SELECT %(id)s, %(person_id)s, %(cohort_id)s, %(team_id)s, now(), 0 VALUES
""".format(
PERSON_STATIC_COHORT_TABLE
)

#
# Other queries
#

# All distinct_id -> person mappings for a team.
GET_DISTINCT_IDS_SQL = """
SELECT * FROM person_distinct_id WHERE team_id = %(team_id)s
"""
Expand Down
4 changes: 4 additions & 0 deletions ee/clickhouse/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
)
from ee.clickhouse.sql.person import (
DROP_PERSON_DISTINCT_ID_TABLE_SQL,
DROP_PERSON_STATIC_COHORT_TABLE_SQL,
DROP_PERSON_TABLE_SQL,
PERSON_STATIC_COHORT_TABLE_SQL,
PERSONS_DISTINCT_ID_TABLE_SQL,
PERSONS_TABLE_SQL,
)
Expand All @@ -39,10 +41,12 @@ def tearDown(self):
def _destroy_person_tables(self):
    # Drop every person-related table: base persons, distinct-id mapping,
    # and static-cohort membership — in that order, same as setup creates them.
    for drop_sql in (
        DROP_PERSON_TABLE_SQL,
        DROP_PERSON_DISTINCT_ID_TABLE_SQL,
        DROP_PERSON_STATIC_COHORT_TABLE_SQL,
    ):
        sync_execute(drop_sql)

def _create_person_tables(self):
    # (Re)create the person-related tables torn down by _destroy_person_tables.
    for create_sql in (
        PERSONS_TABLE_SQL,
        PERSONS_DISTINCT_ID_TABLE_SQL,
        PERSON_STATIC_COHORT_TABLE_SQL,
    ):
        sync_execute(create_sql)

def _destroy_session_recording_tables(self):
sync_execute(DROP_SESSION_RECORDING_EVENTS_TABLE_SQL)
Expand Down
137 changes: 137 additions & 0 deletions ee/idl/gen/person_static_cohort_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions ee/idl/person_static_cohort.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
syntax = "proto3";

// One static-cohort membership row as produced to Kafka.
// NOTE(review): the ClickHouse table declares cohort_id/team_id as Int64
// (signed) while these fields are uint64 — confirm the mapping is safe.
message PersonStaticCohort {
    string id = 1;         // row UUID, stringified
    string person_id = 2;  // person UUID, stringified
    uint64 cohort_id = 3;  // Cohort primary key
    uint64 team_id = 4;    // Team primary key
}
1 change: 1 addition & 0 deletions ee/kafka_client/topics.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Kafka topic names consumed by the corresponding ClickHouse Kafka-engine tables.
KAFKA_EVENTS = "clickhouse_events_proto"
KAFKA_PERSON = "clickhouse_person"
KAFKA_PERSON_UNIQUE_ID = "clickhouse_person_unique_id"
KAFKA_PERSON_STATIC_COHORT = "clickhouse_static_cohort"
KAFKA_SESSION_RECORDING_EVENTS = "clickhouse_session_recording_events"
KAFKA_EVENTS_WAL = "events_write_ahead_log"
1 change: 1 addition & 0 deletions ee/management/commands/migrate_clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ def handle(self, *args, **options):
password=CLICKHOUSE_PASSWORD,
verify_ssl_cert=False,
).migrate("ee.clickhouse.migrations")
print("migration successful")
except Exception as e:
print(e)
7 changes: 7 additions & 0 deletions frontend/src/global.scss
Original file line number Diff line number Diff line change
Expand Up @@ -414,3 +414,10 @@ style files without adding already imported styles. */
);
}
}

// Makes a whole card act like a button: pointer cursor, and a
// primary-colored border on hover to signal interactivity.
.clickable-card {
    cursor: pointer;
    &:hover {
        border-color: $primary;
    }
}
2 changes: 1 addition & 1 deletion frontend/src/lib/api.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
function getCookie(name) {
export function getCookie(name) {
var cookieValue = null
if (document.cookie && document.cookie !== '') {
var cookies = document.cookie.split(';')
Expand Down
Loading