From ff56ded859cecbe316c0c9869618b9b3565a469d Mon Sep 17 00:00:00 2001 From: Tim Glaser Date: Wed, 13 Jan 2021 14:20:27 +0100 Subject: [PATCH 01/16] Upload static cohort using CSV, closes #2868 --- .../migrations/0007_static_cohorts_table.py | 8 + ee/clickhouse/models/cohort.py | 34 +++- ee/clickhouse/models/test/test_cohort.py | 20 ++ ee/clickhouse/sql/person.py | 51 ++++- ee/clickhouse/util.py | 4 + ee/idl/gen/person_static_cohort_pb2.py | 137 ++++++++++++++ ee/idl/person_static_cohort.proto | 8 + ee/kafka_client/topics.py | 1 + ee/management/commands/migrate_clickhouse.py | 1 + frontend/src/global.scss | 7 + frontend/src/lib/api.js | 2 +- frontend/src/scenes/persons/Cohort.tsx | 177 ++++++++++++++---- frontend/src/scenes/persons/Cohorts.tsx | 2 - frontend/src/scenes/persons/cohortLogic.js | 22 ++- frontend/src/types.ts | 1 + latest_migrations.manifest | 2 +- posthog/api/cohort.py | 27 ++- posthog/api/test/test_cohort.py | 47 +++++ posthog/migrations/0112_cohort_is_static.py | 14 ++ posthog/models/cohort.py | 45 ++++- posthog/settings.py | 2 +- posthog/tasks/calculate_cohort.py | 25 ++- posthog/test/test_cohort_model.py | 18 ++ 23 files changed, 598 insertions(+), 57 deletions(-) create mode 100644 ee/clickhouse/migrations/0007_static_cohorts_table.py create mode 100644 ee/idl/gen/person_static_cohort_pb2.py create mode 100644 ee/idl/person_static_cohort.proto create mode 100644 posthog/migrations/0112_cohort_is_static.py diff --git a/ee/clickhouse/migrations/0007_static_cohorts_table.py b/ee/clickhouse/migrations/0007_static_cohorts_table.py new file mode 100644 index 0000000000000..7f25ee94a0a66 --- /dev/null +++ b/ee/clickhouse/migrations/0007_static_cohorts_table.py @@ -0,0 +1,8 @@ +from infi.clickhouse_orm import migrations + +from ee.clickhouse.sql.person import KAFKA_PERSON_STATIC_COHORT_TABLE_SQL, PERSON_STATIC_COHORT_TABLE_SQL + +operations = [ + migrations.RunSQL(PERSON_STATIC_COHORT_TABLE_SQL), + migrations.RunSQL(KAFKA_PERSON_STATIC_COHORT_TABLE_SQL), +] diff --git a/ee/clickhouse/models/cohort.py b/ee/clickhouse/models/cohort.py index aaca378437487..0da4615dcac6a 100644 --- a/ee/clickhouse/models/cohort.py +++ b/ee/clickhouse/models/cohort.py @@ -1,15 +1,33 @@ -from typing import Any, Dict, Tuple +import uuid +from typing import Any, Dict, List, Optional, Tuple from ee.clickhouse.client import sync_execute from ee.clickhouse.models.action import format_action_filter from ee.clickhouse.sql.cohort import CALCULATE_COHORT_PEOPLE_SQL -from ee.clickhouse.sql.person import GET_LATEST_PERSON_ID_SQL, GET_PERSON_IDS_BY_FILTER +from ee.clickhouse.sql.person import ( + GET_LATEST_PERSON_ID_SQL, + GET_PERSON_IDS_BY_FILTER, + INSERT_PERSON_STATIC_COHORT, + PERSON_STATIC_COHORT_TABLE, +) +from ee.idl.gen import person_static_cohort_pb2 +from ee.kafka_client.client import ClickhouseProducer +from ee.kafka_client.topics import KAFKA_PERSON_STATIC_COHORT from posthog.models import Action, Cohort, Filter, Team def format_person_query(cohort: Cohort) -> Tuple[str, Dict[str, Any]]: filters = [] params: Dict[str, Any] = {} + + if cohort.is_static: + return ( + "person_id IN (SELECT person_id FROM {} WHERE cohort_id = {})".format( + PERSON_STATIC_COHORT_TABLE, cohort.pk + ), + {}, + ) + for group_idx, group in enumerate(cohort.groups): if group.get("action_id"): action = Action.objects.get(pk=group["action_id"], team_id=cohort.team.pk) @@ -52,3 +70,15 @@ def get_person_ids_by_cohort_id(team: Team, cohort_id: int): results = sync_execute(GET_PERSON_IDS_BY_FILTER.format(distinct_query=filter_query, query=""), filter_params) return [str(row[0]) for row in results] + + +def insert_static_cohort(person_uuids: List[Optional[uuid.UUID]], cohort_id: int, team: Team): + for person_uuid in person_uuids: + pb_event = person_static_cohort_pb2.PersonStaticCohort() + pb_event.id = str(uuid.uuid4()) + pb_event.person_id = str(person_uuid) + pb_event.cohort_id = cohort_id + pb_event.team_id = team.pk + + p = ClickhouseProducer() + p.produce_proto(sql=INSERT_PERSON_STATIC_COHORT, topic=KAFKA_PERSON_STATIC_COHORT, data=pb_event) diff --git a/ee/clickhouse/models/test/test_cohort.py b/ee/clickhouse/models/test/test_cohort.py index b792e645a54c0..02706d514f21b 100644 --- a/ee/clickhouse/models/test/test_cohort.py +++ b/ee/clickhouse/models/test/test_cohort.py @@ -192,3 +192,23 @@ def test_cohort_get_person_ids_by_cohort_id(self): self.assertEqual(len(results), 2) self.assertIn(user1.uuid, results) self.assertIn(user3.uuid, results) + + def test_insert_by_distinct_id_or_email(self): + Person.objects.create(team_id=self.team.pk, properties={"email": "email@example.org"}, distinct_ids=["1"]) + Person.objects.create(team_id=self.team.pk, distinct_ids=["123"]) + Person.objects.create(team_id=self.team.pk, distinct_ids=["2"]) + # Team leakage + team2 = Team.objects.create() + Person.objects.create(team=team2, properties={"email": "email@example.org"}) + + cohort = Cohort.objects.create(team=self.team, groups=[], is_static=True) + cohort.insert_users_by_list(["email@example.org", "123"]) + cohort = Cohort.objects.get() + results = get_person_ids_by_cohort_id(self.team, cohort.id) + self.assertEqual(len(results), 2) + self.assertEqual(cohort.is_calculating, False) + + #  If we accidentally call calculate_people it shouldn't erase people + cohort.calculate_people() + results = get_person_ids_by_cohort_id(self.team, cohort.id) + self.assertEqual(len(results), 2) diff --git a/ee/clickhouse/sql/person.py b/ee/clickhouse/sql/person.py index e145ab72dd2df..4166c780b4777 100644 --- a/ee/clickhouse/sql/person.py +++ b/ee/clickhouse/sql/person.py @@ -1,4 +1,4 @@ -from ee.kafka_client.topics import KAFKA_PERSON, KAFKA_PERSON_UNIQUE_ID +from ee.kafka_client.topics import KAFKA_PERSON, KAFKA_PERSON_STATIC_COHORT, KAFKA_PERSON_UNIQUE_ID from .clickhouse import KAFKA_COLUMNS, STORAGE_POLICY, kafka_engine, table_engine @@ -10,6 +10,7 @@ DROP TABLE person_distinct_id """ + PERSONS_TABLE = "person" PERSONS_TABLE_BASE_SQL = """ @@ -122,6 +123,54 @@ table_name=PERSONS_DISTINCT_ID_TABLE ) +# +# Static Cohort +# + +PERSON_STATIC_COHORT_TABLE = "person_static_cohort" +PERSON_STATIC_COHORT_BASE_SQL = """ +CREATE TABLE {table_name} +( + id UUID, + person_id UUID, + cohort_id Int64, + team_id Int64 + {extra_fields} +) ENGINE = {engine} +""" + +PERSON_STATIC_COHORT_TABLE_SQL = ( + PERSON_STATIC_COHORT_BASE_SQL + + """Order By (team_id, cohort_id, person_id, id) +{storage_policy} +""" +).format( + table_name=PERSON_STATIC_COHORT_TABLE, + engine=table_engine(PERSON_STATIC_COHORT_TABLE, "_timestamp"), + storage_policy=STORAGE_POLICY, + extra_fields=KAFKA_COLUMNS, +) + +KAFKA_PERSON_STATIC_COHORT_TABLE_SQL = PERSON_STATIC_COHORT_BASE_SQL.format( + table_name="kafka_" + PERSON_STATIC_COHORT_TABLE, engine=kafka_engine(KAFKA_PERSON_STATIC_COHORT), extra_fields="", +) + +DROP_PERSON_STATIC_COHORT_TABLE_SQL = """ +DROP TABLE {} +""".format( + PERSON_STATIC_COHORT_TABLE +) + +INSERT_PERSON_STATIC_COHORT = """ +INSERT INTO {} SELECT %(id)s, %(person_id)s, %(cohort_id)s, %(team_id)s, now(), 0 VALUES +""".format( + PERSON_STATIC_COHORT_TABLE +) + +# +# Other queries +# + GET_DISTINCT_IDS_SQL = """ SELECT * FROM person_distinct_id WHERE team_id = %(team_id)s """ diff --git a/ee/clickhouse/util.py b/ee/clickhouse/util.py index 39eb3e87912c6..82ce8c3bde3f6 100644 --- a/ee/clickhouse/util.py +++ b/ee/clickhouse/util.py @@ -12,7 +12,9 @@ ) from ee.clickhouse.sql.person import ( DROP_PERSON_DISTINCT_ID_TABLE_SQL, + DROP_PERSON_STATIC_COHORT_TABLE_SQL, DROP_PERSON_TABLE_SQL, + PERSON_STATIC_COHORT_TABLE_SQL, PERSONS_DISTINCT_ID_TABLE_SQL, PERSONS_TABLE_SQL, ) @@ -39,10 +41,12 @@ def tearDown(self): def _destroy_person_tables(self): sync_execute(DROP_PERSON_TABLE_SQL) sync_execute(DROP_PERSON_DISTINCT_ID_TABLE_SQL) + sync_execute(DROP_PERSON_STATIC_COHORT_TABLE_SQL) def _create_person_tables(self): sync_execute(PERSONS_TABLE_SQL) sync_execute(PERSONS_DISTINCT_ID_TABLE_SQL) + sync_execute(PERSON_STATIC_COHORT_TABLE_SQL) def _destroy_session_recording_tables(self): sync_execute(DROP_SESSION_RECORDING_EVENTS_TABLE_SQL) diff --git a/ee/idl/gen/person_static_cohort_pb2.py b/ee/idl/gen/person_static_cohort_pb2.py new file mode 100644 index 0000000000000..5bcd3a768dec3 --- /dev/null +++ b/ee/idl/gen/person_static_cohort_pb2.py @@ -0,0 +1,137 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: person_static_cohort.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database + +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor.FileDescriptor( + name="person_static_cohort.proto", + package="", + syntax="proto3", + serialized_options=None, + create_key=_descriptor._internal_create_key, + serialized_pb=b'\n\x1aperson_static_cohort.proto"W\n\x12PersonStaticCohort\x12\n\n\x02id\x18\x01 \x01(\t\x12\x11\n\tperson_id\x18\x02 \x01(\t\x12\x11\n\tcohort_id\x18\x03 \x01(\x04\x12\x0f\n\x07team_id\x18\x04 \x01(\x04\x62\x06proto3', +) + + +_PERSONSTATICCOHORT = _descriptor.Descriptor( + name="PersonStaticCohort", + full_name="PersonStaticCohort", + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name="id", + full_name="PersonStaticCohort.id", + index=0, + number=1, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="person_id", + full_name="PersonStaticCohort.person_id", + index=1, + number=2, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="cohort_id", + full_name="PersonStaticCohort.cohort_id", + index=2, + number=3, + type=4, + cpp_type=4, + label=1, + has_default_value=False, + default_value=0, + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="team_id", + full_name="PersonStaticCohort.team_id", + index=3, + number=4, + type=4, + cpp_type=4, + label=1, + has_default_value=False, + default_value=0, + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + ], + extensions=[], + nested_types=[], + enum_types=[], + serialized_options=None, + is_extendable=False, + syntax="proto3", + extension_ranges=[], + oneofs=[], + serialized_start=30, + serialized_end=117, +) + +DESCRIPTOR.message_types_by_name["PersonStaticCohort"] = _PERSONSTATICCOHORT +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +PersonStaticCohort = _reflection.GeneratedProtocolMessageType( + "PersonStaticCohort", + (_message.Message,), + { + "DESCRIPTOR": _PERSONSTATICCOHORT, + "__module__": "person_static_cohort_pb2" + # @@protoc_insertion_point(class_scope:PersonStaticCohort) + }, +) +_sym_db.RegisterMessage(PersonStaticCohort) + + +# @@protoc_insertion_point(module_scope) diff --git a/ee/idl/person_static_cohort.proto b/ee/idl/person_static_cohort.proto new file mode 100644 index 0000000000000..7900b602553ae --- /dev/null +++ b/ee/idl/person_static_cohort.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +message PersonStaticCohort { + string id = 1; + string person_id = 2; + uint64 cohort_id = 3; + uint64 team_id = 4; +} \ No newline at end of file diff --git a/ee/kafka_client/topics.py b/ee/kafka_client/topics.py index ed9dcdd6fb310..3da548400816f 100644 --- a/ee/kafka_client/topics.py +++ b/ee/kafka_client/topics.py @@ -1,5 +1,6 @@ KAFKA_EVENTS = "clickhouse_events_proto" KAFKA_PERSON = "clickhouse_person" KAFKA_PERSON_UNIQUE_ID = "clickhouse_person_unique_id" +KAFKA_PERSON_STATIC_COHORT = "clickhouse_static_cohort" KAFKA_SESSION_RECORDING_EVENTS = "clickhouse_session_recording_events" KAFKA_EVENTS_WAL = "events_write_ahead_log" diff --git a/ee/management/commands/migrate_clickhouse.py b/ee/management/commands/migrate_clickhouse.py index 9d59d775316a2..429b5dfa0309c 100644 --- a/ee/management/commands/migrate_clickhouse.py +++ b/ee/management/commands/migrate_clickhouse.py @@ -22,5 +22,6 @@ def handle(self, *args, **options): password=CLICKHOUSE_PASSWORD, verify_ssl_cert=False, ).migrate("ee.clickhouse.migrations") + print("migration successful") except Exception as e: print(e) diff --git a/frontend/src/global.scss b/frontend/src/global.scss index 36fc0ea156f65..e828c9e0a9a90 100644 --- a/frontend/src/global.scss +++ b/frontend/src/global.scss @@ -414,3 +414,10 @@ style files without adding already imported styles. */ ); } } + +.clickable-card { + cursor: pointer; + &:hover { + border-color: $primary; + } +} diff --git a/frontend/src/lib/api.js b/frontend/src/lib/api.js index 4b2ef1799fc04..b4da2fcbad14f 100644 --- a/frontend/src/lib/api.js +++ b/frontend/src/lib/api.js @@ -1,4 +1,4 @@ -function getCookie(name) { +export function getCookie(name) { var cookieValue = null if (document.cookie && document.cookie !== '') { var cookies = document.cookie.split(';') diff --git a/frontend/src/scenes/persons/Cohort.tsx b/frontend/src/scenes/persons/Cohort.tsx index ddba1e996610e..c73762a5e3868 100644 --- a/frontend/src/scenes/persons/Cohort.tsx +++ b/frontend/src/scenes/persons/Cohort.tsx @@ -1,25 +1,145 @@ import React from 'react' import { CohortGroup } from './CohortGroup' import { cohortLogic } from './cohortLogic' -import { Button, Divider, Input } from 'antd' +import { Button, Card, Col, Divider, Input, Row } from 'antd' +import { UploadOutlined, AimOutlined, ArrowLeftOutlined, InboxOutlined } from '@ant-design/icons' import { useValues, useActions } from 'kea' import { CohortType } from '~/types' import { Persons } from './Persons' +import { cohortLogicType } from 'types/scenes/persons/cohortLogicType' +import Dragger from 'antd/lib/upload/Dragger' const isSubmitDisabled = (cohort: CohortType): boolean => { + if (cohort && cohort.csv) { + return false + } if (cohort && cohort.groups) { return !cohort.groups.some((group) => Object.keys(group).length) } return true } -export function Cohort(props: { onChange: CallableFunction; cohort: CohortType }): JSX.Element { - const { setCohort, saveCohort } = useActions(cohortLogic(props)) - const { cohort, lastSavedAt } = useValues(cohortLogic(props)) +function StaticCohort({ logic }: { logic: cohortLogicType }): JSX.Element { + const { setCohort } = useActions(logic) + const { cohort } = useValues(logic) + const props = { + name: 'file', + multiple: false, + fileList: cohort.csv ? [cohort.csv] : [], + beforeUpload(file: File) { + setCohort({ ...cohort, csv: file }) - if (cohort.groups.length == 0) { - return null + return false + }, + accept: '.csv', } + return ( + <> + {cohort.id === 'new' && ( + <> + +
+
+ + )} + +

+ +

+

Click or drag CSV to this area to upload

+

+ Make sure the file has a single column with either the user's distinct_id or email. +

+
+ {cohort.id !== 'new' && ( +

+ This is a static cohort with {cohort.count} user{cohort.count !== 1 && 's'}. If you + upload another .csv file, those users will be added to this cohort. +

+ )} + + ) +} + +function DynamicCohort({ logic }: { logic: cohortLogicType }): JSX.Element { + const { setCohort } = useActions(logic) + const { cohort } = useValues(logic) + return ( + <> + {cohort.id === 'new' && ( + <> + +
+
+ + )} + {cohort.groups.map((group, index) => ( + + 1} + index={index} + onRemove={() => { + cohort.groups.splice(index, 1) + setCohort({ ...cohort }) + }} + onChange={(group: Record) => { + cohort.groups[index] = group + setCohort({ ...cohort }) + }} + /> + {index < cohort.groups.length - 1 && ( +
+ {' '} + OR{' '} +
+ )} +
+ ))} + + ) +} + +function CohortChoice({ setCohort, cohort }: { setCohort: CallableFunction; cohort: CohortType }): JSX.Element { + return ( + + + setCohort({ ...cohort, is_static: true })} + > +
+ +
+
+ + + setCohort({ ...cohort, is_static: false })} + > +
+ +
+
+ +
+ ) +} + +export function Cohort(props: { onChange: CallableFunction; cohort: CohortType }): JSX.Element { + const logic = cohortLogic(props) + const { setCohort, saveCohort } = useActions(logic) + const { cohort, lastSavedAt } = useValues(logic) + return (
setCohort({ ...cohort, name: e.target.value })} />
- {cohort.groups.map((group, index) => ( - - 1} - index={index} - onRemove={() => { - cohort.groups.splice(index, 1) - setCohort({ ...cohort }) - }} - onChange={(group: Record) => { - cohort.groups[index] = group - setCohort({ ...cohort }) - }} - /> - {index < cohort.groups.length - 1 && ( -
- {' '} - OR{' '} -
- )} -
- ))} + {cohort.id === 'new' && cohort.is_static === undefined && ( + + )} + {cohort.is_static && } + {(cohort.is_static === false || (cohort.is_static === null && cohort.id !== 'new')) && ( + + )} +
- + {!cohort.is_static && ( + + )}
diff --git a/frontend/src/scenes/persons/Cohorts.tsx b/frontend/src/scenes/persons/Cohorts.tsx index 969565ca51737..39bea5cebbb39 100644 --- a/frontend/src/scenes/persons/Cohorts.tsx +++ b/frontend/src/scenes/persons/Cohorts.tsx @@ -34,9 +34,7 @@ const cohortsUrlLogic = kea({ }), urlToAction: ({ actions, values }) => ({ '/cohorts(/:cohortId)': async ({ cohortId }: Record) => { - console.log(cohortId, values.openCohort) if (cohortId && cohortId !== 'new' && cohortId !== values.openCohort.id) { - console.log('hi') const cohort = await api.get('api/cohort/' + cohortId) actions.setOpenCohort(cohort) } diff --git a/frontend/src/scenes/persons/cohortLogic.js b/frontend/src/scenes/persons/cohortLogic.js index 482bf137bef36..1c309c7210342 100644 --- a/frontend/src/scenes/persons/cohortLogic.js +++ b/frontend/src/scenes/persons/cohortLogic.js @@ -46,15 +46,31 @@ export const cohortLogic = kea({ ], }), - listeners: ({ sharedListeners }) => ({ + listeners: ({ sharedListeners, actions }) => ({ saveCohort: async ({ cohort }) => { + const cohortFormData = new FormData() + for (const [key, value] of Object.entries(cohort)) { + if (key === 'groups') { + if (!cohort.csv) { + cohortFormData.append(key, JSON.stringify(value)) + } else { + // If we have a static cohort uploaded by CSV we don't need to send groups + cohortFormData.append(key, '[]') + } + } else { + cohortFormData.append(key, value) + } + } + if (cohort.id !== 'new') { - cohort = await api.update('api/cohort/' + cohort.id, cohort) + cohort = await api.update('api/cohort/' + cohort.id, cohortFormData) cohortsModel.actions.updateCohort(cohort) } else { - cohort = await api.create('api/cohort', cohort) + cohort = await api.create('api/cohort', cohortFormData) cohortsModel.actions.createCohort(cohort) } + delete cohort['csv'] + actions.setCohort(cohort) sharedListeners.pollIsFinished(cohort) }, checkIsFinished: async ({ cohort }) => { diff --git a/frontend/src/types.ts b/frontend/src/types.ts index afd2e1de22dda..4032392cadfae 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -187,6 +187,7 @@ export interface CohortType { is_calculating?: boolean last_calculation?: string name?: string + csv?: File groups: Record[] } diff --git a/latest_migrations.manifest b/latest_migrations.manifest index 85828b827295f..eba39889dd1f6 100644 --- a/latest_migrations.manifest +++ b/latest_migrations.manifest @@ -3,7 +3,7 @@ auth: 0011_update_proxy_permissions axes: 0006_remove_accesslog_trusted contenttypes: 0002_remove_content_type_name ee: 0002_hook -posthog: 0111_plugin_storage +posthog: 0112_cohort_is_static rest_hooks: 0002_swappable_hook_model sessions: 0001_initial social_django: 0008_partial_timestamp diff --git a/posthog/api/cohort.py b/posthog/api/cohort.py index a7a1a6aa4fcf5..13a0727cdce76 100644 --- a/posthog/api/cohort.py +++ b/posthog/api/cohort.py @@ -1,15 +1,19 @@ +import csv from typing import Any, Dict, Optional import posthoganalytics from django.db.models import Count, QuerySet from rest_framework import serializers, viewsets +from rest_framework.decorators import action from rest_framework.permissions import IsAuthenticated +from rest_framework.request import Request +from rest_framework.response import Response from posthog.api.routing import StructuredViewSetMixin from posthog.api.user import UserSerializer from posthog.models import Cohort from posthog.permissions import ProjectMembershipNecessaryPermissions -from posthog.tasks.calculate_cohort import calculate_cohort +from posthog.tasks.calculate_cohort import calculate_cohort, calculate_cohort_from_csv class CohortSerializer(serializers.ModelSerializer): @@ -29,15 +33,27 @@ class Meta: "last_calculation", "errors_calculating", "count", + "is_static", ] + def _handle_csv(self, file, cohort: Cohort) -> None: + decoded_file = file.read().decode("utf-8").splitlines() + reader = csv.reader(decoded_file) + distinct_ids_and_emails = [row[0] for row in reader if len(row) > 0 and row] + calculate_cohort_from_csv.delay(cohort.pk, distinct_ids_and_emails) + def create(self, validated_data: Dict, *args: Any, **kwargs: Any) -> Cohort: request = self.context["request"] validated_data["created_by"] = request.user validated_data["is_calculating"] = True cohort = Cohort.objects.create(team_id=self.context["team_id"], **validated_data) + + if request.FILES.get("csv"): + self._handle_csv(request.FILES["csv"], cohort) + posthoganalytics.capture(request.user.distinct_id, "cohort created", cohort.get_analytics_metadata()) - calculate_cohort.delay(cohort_id=cohort.pk) + if not cohort.is_static: + calculate_cohort.delay(cohort_id=cohort.pk) return cohort def update(self, cohort: Cohort, validated_data: Dict, *args: Any, **kwargs: Any) -> Cohort: # type: ignore @@ -47,12 +63,17 @@ def update(self, cohort: Cohort, validated_data: Dict, *args: Any, **kwargs: Any cohort.deleted = validated_data.get("deleted", cohort.deleted) cohort.is_calculating = True cohort.save() + + if request.FILES.get("csv") and cohort.is_static: + self._handle_csv(request.FILES["csv"], cohort) + posthoganalytics.capture( request.user.distinct_id, "cohort updated", {**cohort.get_analytics_metadata(), "updated_by_creator": request.user == cohort.created_by}, ) - calculate_cohort.delay(cohort_id=cohort.pk) + if not cohort.is_static: + calculate_cohort.delay(cohort_id=cohort.pk) return cohort def get_count(self, action: Cohort) -> Optional[int]: diff --git a/posthog/api/test/test_cohort.py b/posthog/api/test/test_cohort.py index f67ce72d9658d..52237fd23b9cc 100644 --- a/posthog/api/test/test_cohort.py +++ b/posthog/api/test/test_cohort.py @@ -1,6 +1,10 @@ from unittest.mock import patch +from django.core.files.uploadedfile import SimpleUploadedFile +from rest_framework.test import APIClient + from posthog.models import Person +from posthog.models.cohort import Cohort from posthog.test.base import BaseTest @@ -62,3 +66,46 @@ def test_creating_update_and_calculating(self, patch_calculate_cohort, patch_cap "updated_by_creator": True, }, ) + + @patch("posthog.tasks.calculate_cohort.calculate_cohort_from_csv.delay") + def test_static_cohort_csv_upload(self, patch_calculate_cohort_from_csv): + self.team.app_urls = ["http://somewebsite.com"] + self.team.save() + Person.objects.create(team=self.team, properties={"email": "email@example.org"}) + Person.objects.create(team=self.team, distinct_ids=["123"]) + Person.objects.create(team=self.team, distinct_ids=["456"]) + + csv = SimpleUploadedFile( + "example.csv", + str.encode( + """ +User ID, +email@example.org, +123 +""" + ), + content_type="application/csv", + ) + + response = self.client.post("/api/cohort/", {"name": "test", "csv": csv, "is_static": True},) + self.assertEqual(response.status_code, 201, response.content) + self.assertEqual(patch_calculate_cohort_from_csv.call_count, 1) + + csv = SimpleUploadedFile( + "example.csv", + str.encode( + """ +User ID, +456 +""" + ), + content_type="application/csv", + ) + + #  A weird issue with pytest client, need to user Rest framework's one + #  see https://stackoverflow.com/questions/39906956/patch-and-put-dont-work-as-expected-when-pytest-is-interacting-with-rest-framew + client = APIClient() + client.force_login(self.user) + response = client.patch("/api/cohort/%s/" % response.json()["id"], {"name": "test", "csv": csv,}) + self.assertEqual(response.status_code, 200, response.content) + self.assertEqual(patch_calculate_cohort_from_csv.call_count, 2) diff --git a/posthog/migrations/0112_cohort_is_static.py b/posthog/migrations/0112_cohort_is_static.py new file mode 100644 index 0000000000000..d0250e021511d --- /dev/null +++ b/posthog/migrations/0112_cohort_is_static.py @@ -0,0 +1,14 @@ +# Generated by Django 3.0.11 on 2021-01-12 13:42 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("posthog", "0111_plugin_storage"), + ] + + operations = [ + migrations.AddField(model_name="cohort", name="is_static", field=models.BooleanField(default=False),), + ] diff --git a/posthog/models/cohort.py b/posthog/models/cohort.py index da564850ef295..ae155971f7932 100644 --- a/posthog/models/cohort.py +++ b/posthog/models/cohort.py @@ -1,5 +1,5 @@ import json -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional from dateutil.relativedelta import relativedelta from django.contrib.postgres.fields import JSONField @@ -41,7 +41,8 @@ def __init__( class CohortManager(models.Manager): def create(self, *args: Any, **kwargs: Any): - kwargs["groups"] = [Group(**group).__dict__ for group in kwargs["groups"]] + if kwargs.get("groups"): + kwargs["groups"] = [Group(**group).__dict__ for group in kwargs["groups"]] cohort = super().create(*args, **kwargs) return cohort @@ -59,6 +60,8 @@ class Cohort(models.Model): last_calculation: models.DateTimeField = models.DateTimeField(blank=True, null=True) errors_calculating: models.IntegerField = models.IntegerField(default=0) + is_static: models.BooleanField = models.BooleanField(default=False) + objects = CohortManager() def get_analytics_metadata(self): @@ -69,7 +72,7 @@ def get_analytics_metadata(self): properties_groups_count += 1 if group.get("properties") else 0 return { - "name_length": len(self.name), + "name_length": len(self.name) if self.name else 0, "person_count_precalc": self.people.count(), "groups_count": len(self.groups), "action_groups_count": action_groups_count, @@ -78,6 +81,8 @@ def get_analytics_metadata(self): } def calculate_people(self, use_clickhouse=is_ee_enabled()): + if self.is_static: + return try: if not use_clickhouse: self.is_calculating = True @@ -109,6 +114,40 @@ def calculate_people(self, use_clickhouse=is_ee_enabled()): self.save() capture_exception() + def insert_users_by_list(self, items: List[str]) -> None: + """ + Items can be distinct_id or email + """ + batchsize = 1000 + use_clickhouse = is_ee_enabled() + if use_clickhouse: + from ee.clickhouse.models.cohort import insert_static_cohort + try: + cursor = connection.cursor() + for i in range(0, len(items), batchsize): + batch = items[i : i + batchsize] + persons_query = Person.objects.filter(team_id=self.team_id).filter( + Q(persondistinctid__distinct_id__in=batch) | Q(properties__email__in=batch) + ) + if use_clickhouse: + insert_static_cohort([p for p in persons_query.values_list("uuid", flat=True)], self.pk, self.team) + sql, params = persons_query.distinct("pk").only("pk").query.sql_with_params() + query = UPDATE_QUERY.format( + cohort_id=self.pk, + values_query=sql.replace('FROM "posthog_person"', ', {} FROM "posthog_person"'.format(self.pk), 1,), + ) + cursor.execute(query, params) + + self.is_calculating = False + self.last_calculation = timezone.now() + self.errors_calculating = 0 + self.save() + except Exception: + self.is_calculating = False + self.errors_calculating = F("errors_calculating") + 1 + self.save() + capture_exception() + def __str__(self): return self.name diff --git a/posthog/settings.py b/posthog/settings.py index e4a2a48b574ac..b2d71ff0f6129 100644 --- a/posthog/settings.py +++ b/posthog/settings.py @@ -448,8 +448,8 @@ def print_warning(warning_lines: Sequence[str]): ], "DEFAULT_PAGINATION_CLASS": "rest_framework.pagination.LimitOffsetPagination", "DEFAULT_PERMISSION_CLASSES": ["rest_framework.permissions.IsAuthenticated"], - "EXCEPTION_HANDLER": "exceptions_hog.exception_handler", "PAGE_SIZE": 100, + **({"EXCEPTION_HANDLER": "exceptions_hog.exception_handler"} if not TEST else {}), } EXCEPTIONS_HOG = { diff --git a/posthog/tasks/calculate_cohort.py b/posthog/tasks/calculate_cohort.py index c605bcdffa8ee..030573f9bf4a5 100644 --- a/posthog/tasks/calculate_cohort.py +++ b/posthog/tasks/calculate_cohort.py @@ -1,6 +1,7 @@ import logging import os import time +from typing import List from celery import shared_task from dateutil.relativedelta import relativedelta @@ -18,11 +19,15 @@ def calculate_cohorts() -> None: # This task will be run every minute # Every minute, grab a few cohorts off the list and execute them - for cohort in Cohort.objects.filter( - is_calculating=False, - last_calculation__lte=timezone.now() - relativedelta(minutes=MAX_AGE_MINUTES), - errors_calculating__lte=2, - ).order_by(F("last_calculation").asc(nulls_first=True))[0:PARALLEL_COHORTS]: + for cohort in ( + Cohort.objects.filter( + is_calculating=False, + last_calculation__lte=timezone.now() - relativedelta(minutes=MAX_AGE_MINUTES), + errors_calculating__lte=2, + ) + .exclude(is_static=True) + .order_by(F("last_calculation").asc(nulls_first=True))[0:PARALLEL_COHORTS] + ): calculate_cohort.delay(cohort.id) @@ -32,3 +37,13 @@ def calculate_cohort(cohort_id: int) -> None: cohort = Cohort.objects.get(pk=cohort_id) cohort.calculate_people() logger.info("Calculating cohort {} took {:.2f} seconds".format(cohort.pk, (time.time() - start_time))) + + +@shared_task(ignore_result=True, max_retries=1) +def calculate_cohort_from_csv(cohort_id: int, items: List[str]) -> None: + start_time = time.time() + cohort = Cohort.objects.get(pk=cohort_id) + + cohort.insert_users_by_list(items) + + logger.info("Calculating cohort {} from CSV took {:.2f} seconds".format(cohort.pk, (time.time() - start_time))) diff --git a/posthog/test/test_cohort_model.py b/posthog/test/test_cohort_model.py index 1549540d9d660..7d764d6909818 100644 --- a/posthog/test/test_cohort_model.py +++ b/posthog/test/test_cohort_model.py @@ -40,6 +40,24 @@ def test_postgres_get_distinct_ids_from_cohort(self): cohort.calculate_people(use_clickhouse=False) self.assertCountEqual([p for p in cohort.people.all()], [person1, person2]) + def test_insert_by_distinct_id_or_email(self): + team2 = Team.objects.create() + Person.objects.create(team=self.team, properties={"email": "email@example.org"}) + Person.objects.create(team=self.team, distinct_ids=["123"]) + Person.objects.create(team=self.team) + # Team leakage + Person.objects.create(team=team2, properties={"email": "email@example.org"}) + + cohort = Cohort.objects.create(team=self.team, groups=[], is_static=True) + cohort.insert_users_by_list(["email@example.org", "123"]) + cohort = Cohort.objects.get() + self.assertEqual(cohort.people.count(), 2) + self.assertEqual(cohort.is_calculating, False) + + #  If we accidentally call calculate_people it shouldn't erase people + cohort.calculate_people() + self.assertEqual(cohort.people.count(), 2) + @tag("ee") @patch("ee.clickhouse.models.cohort.get_person_ids_by_cohort_id") def test_calculating_cohort_clickhouse(self, get_person_ids_by_cohort_id): From d18dce9fed43b37d4e9350c053326fca689db75c Mon Sep 17 00:00:00 2001 From: Tim Glaser Date: Wed, 13 Jan 2021 14:36:49 +0100 Subject: [PATCH 02/16] fix tests --- posthog/settings.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posthog/settings.py b/posthog/settings.py index b2d71ff0f6129..4828e72d90216 100644 --- a/posthog/settings.py +++ b/posthog/settings.py @@ -449,7 +449,7 @@ def print_warning(warning_lines: Sequence[str]): "DEFAULT_PAGINATION_CLASS": "rest_framework.pagination.LimitOffsetPagination", "DEFAULT_PERMISSION_CLASSES": ["rest_framework.permissions.IsAuthenticated"], "PAGE_SIZE": 100, - **({"EXCEPTION_HANDLER": "exceptions_hog.exception_handler"} if not TEST else {}), + "EXCEPTION_HANDLER": "exceptions_hog.exception_handler", } EXCEPTIONS_HOG = { From 09a5c71358a2ee7559fab006f3371eb1945866d7 Mon Sep 17 00:00:00 2001 From: Tim Glaser Date: Wed, 13 Jan 2021 14:44:28 +0100 Subject: [PATCH 03/16] Fix tests --- ee/clickhouse/models/test/test_cohort.py | 2 +- latest_migrations.manifest | 6 +----- posthog/test/test_cohort_model.py | 13 +++++++++++-- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/ee/clickhouse/models/test/test_cohort.py b/ee/clickhouse/models/test/test_cohort.py index 7a9f935fe071d..c7dc08ae49e6e 100644 --- a/ee/clickhouse/models/test/test_cohort.py +++ b/ee/clickhouse/models/test/test_cohort.py @@ -199,7 +199,7 @@ def test_insert_by_distinct_id_or_email(self): Person.objects.create(team_id=self.team.pk, distinct_ids=["123"]) Person.objects.create(team_id=self.team.pk, distinct_ids=["2"]) # Team leakage - team2 = Team.objects.create() + team2 = Team.objects.create(organization=self.organization) Person.objects.create(team=team2, properties={"email": "email@example.org"}) cohort = Cohort.objects.create(team=self.team, groups=[], is_static=True) diff --git a/latest_migrations.manifest b/latest_migrations.manifest index 1006e68ea61ff..7b65830ffe48e 100644 --- a/latest_migrations.manifest +++ b/latest_migrations.manifest @@ -3,11 +3,7 @@ auth: 0011_update_proxy_permissions axes: 0006_remove_accesslog_trusted contenttypes: 0002_remove_content_type_name ee: 0002_hook -<<<<<<< HEAD -posthog: 0112_cohort_is_static -======= -posthog: 0112_sessions_filter ->>>>>>> master +posthog: 0113_cohort_is_static rest_hooks: 0002_swappable_hook_model sessions: 0001_initial social_django: 0008_partial_timestamp diff --git a/posthog/test/test_cohort_model.py b/posthog/test/test_cohort_model.py index 7d764d6909818..f44dfdf0babe9 100644 --- a/posthog/test/test_cohort_model.py +++ b/posthog/test/test_cohort_model.py @@ -3,7 +3,16 @@ from django.test import tag from freezegun import freeze_time -from posthog.models import Action, ActionStep, Cohort, Element, Event, Person, Team +from posthog.models import ( + Action, + ActionStep, + Cohort, + Element, + Event, + Person, + Team, + organization, +) from posthog.test.base import BaseTest @@ -41,11 +50,11 @@ def test_postgres_get_distinct_ids_from_cohort(self): self.assertCountEqual([p for p in cohort.people.all()], [person1, person2]) def test_insert_by_distinct_id_or_email(self): - team2 = Team.objects.create() Person.objects.create(team=self.team, properties={"email": "email@example.org"}) Person.objects.create(team=self.team, distinct_ids=["123"]) Person.objects.create(team=self.team) # Team leakage + team2 = Team.objects.create(organization=self.organization) Person.objects.create(team=team2, properties={"email": "email@example.org"}) cohort = Cohort.objects.create(team=self.team, groups=[], is_static=True) From 4c3d4e0ebb9c44f55a065ba52258404b3ea8e4bf Mon Sep 17 00:00:00 2001 From: Tim Glaser Date: Wed, 13 Jan 2021 15:07:04 +0100 Subject: [PATCH 04/16] Fix e2e test --- cypress/integration/cohorts.js | 1 + frontend/src/scenes/persons/Cohort.tsx | 2 ++ 2 files changed, 3 insertions(+) diff --git a/cypress/integration/cohorts.js b/cypress/integration/cohorts.js index 30025fe46f813..b43c08df36547 100644 --- a/cypress/integration/cohorts.js +++ b/cypress/integration/cohorts.js @@ -10,6 +10,7 @@ describe('Cohorts', () => { // go to create a new cohort cy.get('[data-attr="create-cohort"]').click() + cy.get('[data-attr="cohort-choice-definition"]').click() cy.get('[data-attr="cohort-name"]').type('Test Cohort') // select "add filter" and "property" diff --git a/frontend/src/scenes/persons/Cohort.tsx b/frontend/src/scenes/persons/Cohort.tsx index c73762a5e3868..b6d807c2c704f 100644 --- a/frontend/src/scenes/persons/Cohort.tsx +++ b/frontend/src/scenes/persons/Cohort.tsx @@ -112,6 +112,7 @@ function CohortChoice({ setCohort, cohort }: { setCohort: CallableFunction; coho title="Upload CSV" size="small" className="clickable-card" + data-attr="cohort-choice-upload-csv" onClick={() => setCohort({ ...cohort, is_static: true })} >
@@ -124,6 +125,7 @@ function CohortChoice({ setCohort, cohort }: { setCohort: CallableFunction; coho title="Create cohort by definition" size="small" className="clickable-card" + data-attr="cohort-choice-definition" onClick={() => setCohort({ ...cohort, is_static: false })} >
From def8c4c5a34d21625c122f23c0c6b79313c80080 Mon Sep 17 00:00:00 2001 From: Tim Glaser Date: Wed, 13 Jan 2021 16:45:38 +0100 Subject: [PATCH 05/16] Avoid double inserts --- ee/clickhouse/models/test/test_cohort.py | 5 +++++ posthog/models/cohort.py | 6 ++++-- posthog/test/test_cohort_model.py | 6 ++++++ 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/ee/clickhouse/models/test/test_cohort.py b/ee/clickhouse/models/test/test_cohort.py index c7dc08ae49e6e..79cb6a2f3410d 100644 --- a/ee/clickhouse/models/test/test_cohort.py +++ b/ee/clickhouse/models/test/test_cohort.py @@ -213,3 +213,8 @@ def test_insert_by_distinct_id_or_email(self): cohort.calculate_people() results = get_person_ids_by_cohort_id(self.team, cohort.id) self.assertEqual(len(results), 2) + + # if we add people again, don't increase the number of people in cohort + cohort.insert_users_by_list(["123"]) + results = get_person_ids_by_cohort_id(self.team, cohort.id) + self.assertEqual(len(results), 2) diff --git a/posthog/models/cohort.py b/posthog/models/cohort.py index ae155971f7932..6b5c1dbb10aaa 100644 --- a/posthog/models/cohort.py +++ b/posthog/models/cohort.py @@ -126,8 +126,10 @@ def insert_users_by_list(self, items: List[str]) -> None: cursor = connection.cursor() for i in range(0, len(items), batchsize): batch = items[i : i + batchsize] - persons_query = Person.objects.filter(team_id=self.team_id).filter( - Q(persondistinctid__distinct_id__in=batch) | Q(properties__email__in=batch) + persons_query = ( + Person.objects.filter(team_id=self.team_id) + .filter(Q(persondistinctid__distinct_id__in=batch) | Q(properties__email__in=batch)) + .exclude(cohort__id=self.id) ) if use_clickhouse: insert_static_cohort([p for p in persons_query.values_list("uuid", flat=True)], self.pk, self.team) diff --git a/posthog/test/test_cohort_model.py b/posthog/test/test_cohort_model.py index f44dfdf0babe9..cf7236efe3c4a 100644 --- a/posthog/test/test_cohort_model.py +++ b/posthog/test/test_cohort_model.py @@ -67,6 +67,12 @@ def test_insert_by_distinct_id_or_email(self): cohort.calculate_people() self.assertEqual(cohort.people.count(), 2) + # if we add people again, don't increase the number of people in cohort + cohort.insert_users_by_list(["123"]) + cohort = Cohort.objects.get() + self.assertEqual(cohort.people.count(), 2) + self.assertEqual(cohort.is_calculating, False) + @tag("ee") @patch("ee.clickhouse.models.cohort.get_person_ids_by_cohort_id") def test_calculating_cohort_clickhouse(self, get_person_ids_by_cohort_id): From e0442e084914eb065495ef8999f293b61d2f7c32 Mon Sep 17 00:00:00 2001 From: Tim Glaser Date: Wed, 13 Jan 2021 16:47:04 +0100 Subject: [PATCH 06/16] Speed up query --- ee/clickhouse/models/cohort.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ee/clickhouse/models/cohort.py b/ee/clickhouse/models/cohort.py index 0da4615dcac6a..015f062191ba8 100644 --- a/ee/clickhouse/models/cohort.py +++ b/ee/clickhouse/models/cohort.py @@ -22,8 +22,8 @@ def format_person_query(cohort: Cohort) -> Tuple[str, Dict[str, Any]]: if cohort.is_static: return ( - "person_id IN (SELECT person_id FROM {} WHERE cohort_id = {})".format( - PERSON_STATIC_COHORT_TABLE, cohort.pk + "person_id IN (SELECT person_id FROM {} WHERE cohort_id = {} AND team_id = {})".format( + PERSON_STATIC_COHORT_TABLE, cohort.pk, cohort.team_id ), {}, ) From 9945cdc5bb633eea0e2069dfb33ea4338f661e81 Mon Sep 17 00:00:00 2001 From: Tim Glaser Date: Wed, 13 Jan 2021 17:24:47 +0100 Subject: [PATCH 07/16] Move to params --- ee/clickhouse/models/cohort.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ee/clickhouse/models/cohort.py b/ee/clickhouse/models/cohort.py index 015f062191ba8..657a3da96f92d 100644 --- a/ee/clickhouse/models/cohort.py +++ b/ee/clickhouse/models/cohort.py @@ -22,10 +22,10 @@ def format_person_query(cohort: Cohort) -> Tuple[str, Dict[str, Any]]: if cohort.is_static: return ( - "person_id IN (SELECT person_id FROM {} WHERE cohort_id = {} AND team_id = {})".format( + "person_id IN (SELECT person_id FROM {} WHERE cohort_id = %(cohort_id)s AND team_id = %(team_id)s)".format( PERSON_STATIC_COHORT_TABLE, cohort.pk, cohort.team_id ), - {}, + {"cohort_id": cohort.pk, "team_id": cohort.team_id}, ) for group_idx, group in enumerate(cohort.groups): From b1f1898c7efa17f9b0d59272e48383e7c353eb2d Mon Sep 17 00:00:00 2001 From: Tim Glaser Date: Wed, 13 Jan 2021 17:49:09 +0100 Subject: [PATCH 08/16] fix tests --- ee/clickhouse/models/cohort.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ee/clickhouse/models/cohort.py b/ee/clickhouse/models/cohort.py index 657a3da96f92d..8323bb6411c4c 100644 --- a/ee/clickhouse/models/cohort.py +++ b/ee/clickhouse/models/cohort.py @@ -23,7 +23,7 @@ def format_person_query(cohort: Cohort) -> Tuple[str, Dict[str, Any]]: if cohort.is_static: return ( "person_id IN (SELECT person_id FROM {} WHERE cohort_id = %(cohort_id)s AND team_id = %(team_id)s)".format( - PERSON_STATIC_COHORT_TABLE, cohort.pk, cohort.team_id + PERSON_STATIC_COHORT_TABLE ), {"cohort_id": cohort.pk, "team_id": cohort.team_id}, ) From ac56920543fe2646f726d116e46e93e97d5f633f Mon Sep 17 00:00:00 2001 From: Tim Glaser Date: Thu, 14 Jan 2021 21:05:15 +0100 Subject: [PATCH 09/16] Use JSON instead of protobuf --- ee/clickhouse/models/cohort.py | 13 +-- ee/idl/gen/person_static_cohort_pb2.py | 137 ------------------------- 2 files changed, 7 insertions(+), 143 deletions(-) delete mode 100644 ee/idl/gen/person_static_cohort_pb2.py diff --git a/ee/clickhouse/models/cohort.py b/ee/clickhouse/models/cohort.py index 8323bb6411c4c..4a2924b054ad2 100644 --- a/ee/clickhouse/models/cohort.py +++ b/ee/clickhouse/models/cohort.py @@ -74,11 +74,12 @@ def get_person_ids_by_cohort_id(team: Team, cohort_id: int): def insert_static_cohort(person_uuids: List[Optional[uuid.UUID]], cohort_id: int, team: Team): for person_uuid in person_uuids: - pb_event = person_static_cohort_pb2.PersonStaticCohort() - pb_event.id = str(uuid.uuid4()) - pb_event.person_id = str(person_uuid) - pb_event.cohort_id = cohort_id - pb_event.team_id = team.pk + person_cohort = { + "id": str(uuid.uuid4()), + "person_id": str(person_uuid), + "cohort_id": cohort_id, + "team_id": team.pk, + } p = ClickhouseProducer() - p.produce_proto(sql=INSERT_PERSON_STATIC_COHORT, topic=KAFKA_PERSON_STATIC_COHORT, data=pb_event) + p.produce_proto(sql=INSERT_PERSON_STATIC_COHORT, topic=KAFKA_PERSON_STATIC_COHORT, data=person_cohort) diff --git a/ee/idl/gen/person_static_cohort_pb2.py b/ee/idl/gen/person_static_cohort_pb2.py deleted file mode 100644 index 5bcd3a768dec3..0000000000000 --- a/ee/idl/gen/person_static_cohort_pb2.py +++ /dev/null @@ -1,137 +0,0 @@ -# -*- coding: utf-8 -*- -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: person_static_cohort.proto -"""Generated protocol buffer code.""" -from google.protobuf import descriptor as _descriptor -from google.protobuf import message as _message -from google.protobuf import reflection as _reflection -from google.protobuf import symbol_database as _symbol_database - -# @@protoc_insertion_point(imports) - -_sym_db = _symbol_database.Default() - - -DESCRIPTOR = _descriptor.FileDescriptor( - name="person_static_cohort.proto", - package="", - syntax="proto3", - serialized_options=None, - create_key=_descriptor._internal_create_key, - serialized_pb=b'\n\x1aperson_static_cohort.proto"W\n\x12PersonStaticCohort\x12\n\n\x02id\x18\x01 \x01(\t\x12\x11\n\tperson_id\x18\x02 \x01(\t\x12\x11\n\tcohort_id\x18\x03 \x01(\x04\x12\x0f\n\x07team_id\x18\x04 \x01(\x04\x62\x06proto3', -) - - -_PERSONSTATICCOHORT = _descriptor.Descriptor( - name="PersonStaticCohort", - full_name="PersonStaticCohort", - filename=None, - file=DESCRIPTOR, - containing_type=None, - create_key=_descriptor._internal_create_key, - fields=[ - _descriptor.FieldDescriptor( - name="id", - full_name="PersonStaticCohort.id", - index=0, - number=1, - type=9, - cpp_type=9, - label=1, - has_default_value=False, - default_value=b"".decode("utf-8"), - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - create_key=_descriptor._internal_create_key, - ), - _descriptor.FieldDescriptor( - name="person_id", - full_name="PersonStaticCohort.person_id", - index=1, - number=2, - type=9, - cpp_type=9, - label=1, - has_default_value=False, - default_value=b"".decode("utf-8"), - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - create_key=_descriptor._internal_create_key, - ), - _descriptor.FieldDescriptor( - name="cohort_id", - full_name="PersonStaticCohort.cohort_id", - index=2, - number=3, - type=4, - cpp_type=4, - label=1, - has_default_value=False, - default_value=0, - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - create_key=_descriptor._internal_create_key, - ), - _descriptor.FieldDescriptor( - name="team_id", - full_name="PersonStaticCohort.team_id", - index=3, - number=4, - type=4, - cpp_type=4, - label=1, - has_default_value=False, - default_value=0, - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - create_key=_descriptor._internal_create_key, - ), - ], - extensions=[], - nested_types=[], - enum_types=[], - serialized_options=None, - is_extendable=False, - syntax="proto3", - extension_ranges=[], - oneofs=[], - serialized_start=30, - serialized_end=117, -) - -DESCRIPTOR.message_types_by_name["PersonStaticCohort"] = _PERSONSTATICCOHORT -_sym_db.RegisterFileDescriptor(DESCRIPTOR) - -PersonStaticCohort = _reflection.GeneratedProtocolMessageType( - "PersonStaticCohort", - (_message.Message,), - { - "DESCRIPTOR": _PERSONSTATICCOHORT, - "__module__": "person_static_cohort_pb2" - # @@protoc_insertion_point(class_scope:PersonStaticCohort) - }, -) -_sym_db.RegisterMessage(PersonStaticCohort) - - -# @@protoc_insertion_point(module_scope) From e42c0690b886f75566ee812966de01a9b6d5d29e Mon Sep 17 00:00:00 2001 From: Tim Glaser Date: Thu, 14 Jan 2021 21:38:33 +0100 Subject: [PATCH 10/16] unused import --- ee/clickhouse/models/cohort.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ee/clickhouse/models/cohort.py b/ee/clickhouse/models/cohort.py index 4a2924b054ad2..e12eb44b774aa 100644 --- a/ee/clickhouse/models/cohort.py +++ b/ee/clickhouse/models/cohort.py @@ -10,7 +10,6 @@ INSERT_PERSON_STATIC_COHORT, PERSON_STATIC_COHORT_TABLE, ) -from ee.idl.gen import person_static_cohort_pb2 from ee.kafka_client.client import ClickhouseProducer from ee.kafka_client.topics import KAFKA_PERSON_STATIC_COHORT from posthog.models import Action, Cohort, Filter, Team From 715cc17cab7e04dfbfc75909845fa060e842f56e Mon Sep 17 00:00:00 2001 From: Tim Glaser Date: Thu, 14 Jan 2021 21:56:34 +0100 Subject: [PATCH 11/16] produce instead of produce_proto --- ee/clickhouse/models/cohort.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ee/clickhouse/models/cohort.py b/ee/clickhouse/models/cohort.py index e12eb44b774aa..49b2624896306 100644 --- a/ee/clickhouse/models/cohort.py +++ b/ee/clickhouse/models/cohort.py @@ -81,4 +81,4 @@ def insert_static_cohort(person_uuids: List[Optional[uuid.UUID]], cohort_id: int } p = ClickhouseProducer() - p.produce_proto(sql=INSERT_PERSON_STATIC_COHORT, topic=KAFKA_PERSON_STATIC_COHORT, data=person_cohort) + p.produce(sql=INSERT_PERSON_STATIC_COHORT, topic=KAFKA_PERSON_STATIC_COHORT, data=person_cohort) From edcb5b33de975c16da55419a9ab8f71b20ade755 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Thu, 14 Jan 2021 16:02:51 -0800 Subject: [PATCH 12/16] Insert directly into clickhouse, no need for kafka --- .../migrations/0007_static_cohorts_table.py | 1 - ee/clickhouse/models/cohort.py | 18 +++++++----------- ee/clickhouse/sql/person.py | 6 +----- ee/kafka_client/topics.py | 1 - 4 files changed, 8 insertions(+), 18 deletions(-) diff --git a/ee/clickhouse/migrations/0007_static_cohorts_table.py b/ee/clickhouse/migrations/0007_static_cohorts_table.py index 7f25ee94a0a66..7d3a078389497 100644 --- a/ee/clickhouse/migrations/0007_static_cohorts_table.py +++ b/ee/clickhouse/migrations/0007_static_cohorts_table.py @@ -4,5 +4,4 @@ operations = [ migrations.RunSQL(PERSON_STATIC_COHORT_TABLE_SQL), - migrations.RunSQL(KAFKA_PERSON_STATIC_COHORT_TABLE_SQL), ] diff --git a/ee/clickhouse/models/cohort.py b/ee/clickhouse/models/cohort.py index 49b2624896306..37652fa19dfcc 100644 --- a/ee/clickhouse/models/cohort.py +++ b/ee/clickhouse/models/cohort.py @@ -11,7 +11,6 @@ PERSON_STATIC_COHORT_TABLE, ) from ee.kafka_client.client import ClickhouseProducer -from ee.kafka_client.topics import KAFKA_PERSON_STATIC_COHORT from posthog.models import Action, Cohort, Filter, Team @@ -72,13 +71,10 @@ def get_person_ids_by_cohort_id(team: Team, cohort_id: int): def insert_static_cohort(person_uuids: List[Optional[uuid.UUID]], cohort_id: int, team: Team): - for person_uuid in person_uuids: - person_cohort = { - "id": str(uuid.uuid4()), - "person_id": str(person_uuid), - "cohort_id": cohort_id, - "team_id": team.pk, - } - - p = ClickhouseProducer() - p.produce(sql=INSERT_PERSON_STATIC_COHORT, topic=KAFKA_PERSON_STATIC_COHORT, data=person_cohort) + p = ClickhouseProducer() + p.send_to_kafka = False + persons = ( + {"id": str(uuid.uuid4()), "person_id": str(person_uuid), "cohort_id": cohort_id, "team_id": team.pk,} + for person_uuid in person_uuids + ) + sync_execute(INSERT_PERSON_STATIC_COHORT, persons) diff --git a/ee/clickhouse/sql/person.py b/ee/clickhouse/sql/person.py index 4166c780b4777..cbb614d4c1c4e 100644 --- a/ee/clickhouse/sql/person.py +++ b/ee/clickhouse/sql/person.py @@ -151,10 +151,6 @@ extra_fields=KAFKA_COLUMNS, ) -KAFKA_PERSON_STATIC_COHORT_TABLE_SQL = PERSON_STATIC_COHORT_BASE_SQL.format( - table_name="kafka_" + PERSON_STATIC_COHORT_TABLE, engine=kafka_engine(KAFKA_PERSON_STATIC_COHORT), extra_fields="", -) - DROP_PERSON_STATIC_COHORT_TABLE_SQL = """ DROP TABLE {} """.format( @@ -162,7 +158,7 @@ ) INSERT_PERSON_STATIC_COHORT = """ -INSERT INTO {} SELECT %(id)s, %(person_id)s, %(cohort_id)s, %(team_id)s, now(), 0 VALUES +INSERT INTO {} VALUES """.format( PERSON_STATIC_COHORT_TABLE ) diff --git a/ee/kafka_client/topics.py b/ee/kafka_client/topics.py index 3da548400816f..ed9dcdd6fb310 100644 --- a/ee/kafka_client/topics.py +++ b/ee/kafka_client/topics.py @@ -1,6 +1,5 @@ KAFKA_EVENTS = "clickhouse_events_proto" KAFKA_PERSON = "clickhouse_person" KAFKA_PERSON_UNIQUE_ID = "clickhouse_person_unique_id" -KAFKA_PERSON_STATIC_COHORT = "clickhouse_static_cohort" KAFKA_SESSION_RECORDING_EVENTS = "clickhouse_session_recording_events" KAFKA_EVENTS_WAL = "events_write_ahead_log" From f1e0f272b97bd01eeb56538bf6c2de25fdd98044 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Thu, 14 Jan 2021 17:59:29 -0800 Subject: [PATCH 13/16] a few quick fixes --- ee/clickhouse/migrations/0007_static_cohorts_table.py | 2 +- ee/clickhouse/models/cohort.py | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/ee/clickhouse/migrations/0007_static_cohorts_table.py b/ee/clickhouse/migrations/0007_static_cohorts_table.py index 7d3a078389497..a377866bd55f3 100644 --- a/ee/clickhouse/migrations/0007_static_cohorts_table.py +++ b/ee/clickhouse/migrations/0007_static_cohorts_table.py @@ -1,6 +1,6 @@ from infi.clickhouse_orm import migrations -from ee.clickhouse.sql.person import KAFKA_PERSON_STATIC_COHORT_TABLE_SQL, PERSON_STATIC_COHORT_TABLE_SQL +from ee.clickhouse.sql.person import PERSON_STATIC_COHORT_TABLE_SQL operations = [ migrations.RunSQL(PERSON_STATIC_COHORT_TABLE_SQL), diff --git a/ee/clickhouse/models/cohort.py b/ee/clickhouse/models/cohort.py index 37652fa19dfcc..4d32da817f3f6 100644 --- a/ee/clickhouse/models/cohort.py +++ b/ee/clickhouse/models/cohort.py @@ -10,7 +10,6 @@ INSERT_PERSON_STATIC_COHORT, PERSON_STATIC_COHORT_TABLE, ) -from ee.kafka_client.client import ClickhouseProducer from posthog.models import Action, Cohort, Filter, Team @@ -71,8 +70,6 @@ def get_person_ids_by_cohort_id(team: Team, cohort_id: int): def insert_static_cohort(person_uuids: List[Optional[uuid.UUID]], cohort_id: int, team: Team): - p = ClickhouseProducer() - p.send_to_kafka = False persons = ( {"id": str(uuid.uuid4()), "person_id": str(person_uuid), "cohort_id": cohort_id, "team_id": team.pk,} for person_uuid in person_uuids From b66001ee6eea43b150e030cc47e628987cba0e14 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Thu, 14 Jan 2021 19:30:37 -0800 Subject: [PATCH 14/16] insert in batch to clickhouse --- ee/clickhouse/models/cohort.py | 9 ++++++++- ee/clickhouse/sql/person.py | 4 ++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/ee/clickhouse/models/cohort.py b/ee/clickhouse/models/cohort.py index 4d32da817f3f6..b51145da146dc 100644 --- a/ee/clickhouse/models/cohort.py +++ b/ee/clickhouse/models/cohort.py @@ -1,4 +1,5 @@ import uuid +from datetime import datetime from typing import Any, Dict, List, Optional, Tuple from ee.clickhouse.client import sync_execute @@ -71,7 +72,13 @@ def get_person_ids_by_cohort_id(team: Team, cohort_id: int): def insert_static_cohort(person_uuids: List[Optional[uuid.UUID]], cohort_id: int, team: Team): persons = ( - {"id": str(uuid.uuid4()), "person_id": str(person_uuid), "cohort_id": cohort_id, "team_id": team.pk,} + { + "id": str(uuid.uuid4()), + "person_id": str(person_uuid), + "cohort_id": cohort_id, + "team_id": team.pk, + "_timestamp": datetime.now(), + } for person_uuid in person_uuids ) sync_execute(INSERT_PERSON_STATIC_COHORT, persons) diff --git a/ee/clickhouse/sql/person.py b/ee/clickhouse/sql/person.py index cbb614d4c1c4e..a816cfc8ff4e7 100644 --- a/ee/clickhouse/sql/person.py +++ b/ee/clickhouse/sql/person.py @@ -1,4 +1,4 @@ -from ee.kafka_client.topics import KAFKA_PERSON, KAFKA_PERSON_STATIC_COHORT, KAFKA_PERSON_UNIQUE_ID +from ee.kafka_client.topics import KAFKA_PERSON, KAFKA_PERSON_UNIQUE_ID from .clickhouse import KAFKA_COLUMNS, STORAGE_POLICY, kafka_engine, table_engine @@ -158,7 +158,7 @@ ) INSERT_PERSON_STATIC_COHORT = """ -INSERT INTO {} VALUES +INSERT INTO {} (id, person_id, cohort_id, team_id, _timestamp) VALUES """.format( PERSON_STATIC_COHORT_TABLE ) From 3fa214862f20dc6a2f4185f39349f8e98f2b58d8 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Thu, 14 Jan 2021 19:52:35 -0800 Subject: [PATCH 15/16] test SQLi since we can't really trust what we are inserting --- ee/clickhouse/models/test/test_cohort.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/ee/clickhouse/models/test/test_cohort.py b/ee/clickhouse/models/test/test_cohort.py index 79cb6a2f3410d..f6b2c6407f7ac 100644 --- a/ee/clickhouse/models/test/test_cohort.py +++ b/ee/clickhouse/models/test/test_cohort.py @@ -209,12 +209,18 @@ def test_insert_by_distinct_id_or_email(self): self.assertEqual(len(results), 2) self.assertEqual(cohort.is_calculating, False) + # test SQLi + Person.objects.create(team_id=self.team.pk, distinct_ids=["'); truncate person_static_cohort; --"]) + cohort.insert_users_by_list(["'); truncate person_static_cohort; --", "123"]) + results = sync_execute("select count(1) from person_static_cohort")[0][0] + self.assertEqual(results, 3) + #  If we accidentally call calculate_people it shouldn't erase people cohort.calculate_people() results = get_person_ids_by_cohort_id(self.team, cohort.id) - self.assertEqual(len(results), 2) + self.assertEqual(len(results), 3) # if we add people again, don't increase the number of people in cohort cohort.insert_users_by_list(["123"]) results = get_person_ids_by_cohort_id(self.team, cohort.id) - self.assertEqual(len(results), 2) + self.assertEqual(len(results), 3) From 96b4a444c28b305d1314ca427bb229d8ec2bc93d Mon Sep 17 00:00:00 2001 From: Tim Glaser Date: Fri, 15 Jan 2021 11:09:02 +0100 Subject: [PATCH 16/16] Extra check for duplicates --- posthog/test/test_cohort_model.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posthog/test/test_cohort_model.py b/posthog/test/test_cohort_model.py index cf7236efe3c4a..105cef1b9dc10 100644 --- a/posthog/test/test_cohort_model.py +++ b/posthog/test/test_cohort_model.py @@ -58,7 +58,7 @@ def test_insert_by_distinct_id_or_email(self): Person.objects.create(team=team2, properties={"email": "email@example.org"}) cohort = Cohort.objects.create(team=self.team, groups=[], is_static=True) - cohort.insert_users_by_list(["email@example.org", "123"]) + cohort.insert_users_by_list(["email@example.org", "123", "123", "email@example.org"]) cohort = Cohort.objects.get() self.assertEqual(cohort.people.count(), 2) self.assertEqual(cohort.is_calculating, False)