More granular incremental stats #3164

Merged
13 commits merged on Feb 11, 2025
15 changes: 10 additions & 5 deletions listenbrainz_spark/popularity/listens.py
@@ -1,4 +1,5 @@
from typing import List
from datetime import datetime
from typing import List, Optional

from listenbrainz_spark.path import LISTENBRAINZ_POPULARITY_DIRECTORY, RELEASE_METADATA_CACHE_DATAFRAME
from listenbrainz_spark.popularity.common import get_popularity_per_artist_query, \
@@ -23,11 +24,13 @@ def get_table_prefix(self) -> str:
def get_base_path(self) -> str:
return LISTENBRAINZ_POPULARITY_DIRECTORY

def get_filter_aggregate_query(self, existing_aggregate: str, incremental_aggregate: str) -> str:
def get_filter_aggregate_query(self, existing_aggregate: str, incremental_aggregate: str,
existing_created: Optional[datetime]) -> str:
inc_where_clause = f"WHERE created >= to_timestamp('{existing_created}')" if existing_created else ""
entity_id = self.get_entity_id()
return f"""
WITH incremental_users AS (
SELECT DISTINCT {entity_id} FROM {incremental_aggregate}
SELECT DISTINCT {entity_id} FROM {incremental_aggregate} {inc_where_clause}
)
SELECT *
FROM {existing_aggregate} ea
@@ -91,11 +94,13 @@ def get_table_prefix(self) -> str:
def get_base_path(self) -> str:
return LISTENBRAINZ_POPULARITY_DIRECTORY

def get_filter_aggregate_query(self, existing_aggregate: str, incremental_aggregate: str) -> str:
def get_filter_aggregate_query(self, existing_aggregate: str, incremental_aggregate: str,
existing_created: Optional[datetime]) -> str:
inc_where_clause = f"WHERE created >= to_timestamp('{existing_created}')" if existing_created else ""
entity_id = self.get_entity_id()
return f"""
WITH incremental_artists AS (
SELECT DISTINCT artist_mbid, {entity_id} FROM {incremental_aggregate}
SELECT DISTINCT artist_mbid, {entity_id} FROM {incremental_aggregate} {inc_where_clause}
)
SELECT *
FROM {existing_aggregate} ea
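The added `existing_created` parameter only constrains the incremental-users CTE when a previous incremental run has been bookkept; with no prior timestamp the WHERE clause is omitted and every row of the incremental aggregate is considered. A minimal standalone sketch of that conditional clause (the helper name is hypothetical and not part of this PR):

```python
from datetime import datetime
from typing import Optional


def build_inc_where_clause(existing_created: Optional[datetime]) -> str:
    # No bookkept timestamp yet (e.g. first incremental run): no filter at all.
    if not existing_created:
        return ""
    # Otherwise only listens inserted since the previous incremental run count.
    return f"WHERE created >= to_timestamp('{existing_created}')"


print(build_inc_where_clause(None))                  # -> ""
print(build_inc_where_clause(datetime(2025, 2, 1)))  # -> "WHERE created >= to_timestamp('2025-02-01 00:00:00')"
```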
5 changes: 5 additions & 0 deletions listenbrainz_spark/schema.py
@@ -10,7 +10,12 @@
BOOKKEEPING_SCHEMA = StructType([
StructField('from_date', TimestampType(), nullable=False),
StructField('to_date', TimestampType(), nullable=False),
StructField('updated_at', TimestampType(), nullable=False),
])

INCREMENTAL_BOOKKEEPING_SCHEMA = StructType([
StructField('created', TimestampType(), nullable=False),
StructField('updated_at', TimestampType(), nullable=False),
])

mlhd_schema = StructType([
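For context on how the two bookkeeping records differ, here is a standalone sketch that writes one row of each; it uses a local Spark session and temporary paths instead of the project's HDFS layout, and all values are invented for illustration:

```python
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, TimestampType

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Full-dump bookkeeping: the listen range covered by the partial aggregate,
# plus the time it was written (the new updated_at column above).
full_schema = StructType([
    StructField("from_date", TimestampType(), nullable=False),
    StructField("to_date", TimestampType(), nullable=False),
    StructField("updated_at", TimestampType(), nullable=False),
])

# Incremental bookkeeping: the newest listen `created` value processed so far.
incremental_schema = StructType([
    StructField("created", TimestampType(), nullable=False),
    StructField("updated_at", TimestampType(), nullable=False),
])

spark.createDataFrame(
    [(datetime(2025, 1, 1), datetime(2025, 2, 1), datetime.now())], schema=full_schema
).write.mode("overwrite").json("/tmp/bookkeeping/full")

spark.createDataFrame(
    [(datetime(2025, 2, 10, 12, 0), datetime.now())], schema=incremental_schema
).write.mode("overwrite").json("/tmp/bookkeeping/incremental")
```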
65 changes: 53 additions & 12 deletions listenbrainz_spark/stats/incremental/incremental_stats_engine.py
@@ -10,7 +10,7 @@
from listenbrainz_spark import hdfs_connection
from listenbrainz_spark.config import HDFS_CLUSTER_URI
from listenbrainz_spark.path import INCREMENTAL_DUMPS_SAVE_PATH
from listenbrainz_spark.schema import BOOKKEEPING_SCHEMA
from listenbrainz_spark.schema import BOOKKEEPING_SCHEMA, INCREMENTAL_BOOKKEEPING_SCHEMA
from listenbrainz_spark.stats import run_query
from listenbrainz_spark.stats.incremental.message_creator import MessageCreator
from listenbrainz_spark.stats.incremental.query_provider import QueryProvider
@@ -50,6 +50,7 @@ def __init__(self, provider: QueryProvider, message_creator: MessageCreator):
self.message_creator = message_creator
self._cache_tables = []
self._only_inc = None
self.incremental_table = None

@property
def only_inc(self):
@@ -68,7 +69,7 @@ def _setup_cache_tables(self):

def partial_aggregate_usable(self) -> bool:
""" Checks whether a partial aggregate exists and is fresh to generate the required stats. """
metadata_path = self.provider.get_bookkeeping_path()
metadata_path = f"{self.provider.get_bookkeeping_path()}/full"
existing_aggregate_path = self.provider.get_existing_aggregate_path()

try:
@@ -95,7 +96,7 @@ def create_partial_aggregate(self) -> DataFrame:
Returns:
DataFrame: The generated partial aggregate DataFrame.
"""
metadata_path = self.provider.get_bookkeeping_path()
metadata_path = f"{self.provider.get_bookkeeping_path()}/full"
existing_aggregate_path = self.provider.get_existing_aggregate_path()

table = f"{self.provider.get_table_prefix()}_full_listens"
@@ -129,12 +130,35 @@ def create_incremental_aggregate(self) -> DataFrame:
Returns:
DataFrame: The generated incremental aggregate DataFrame.
"""
table = f"{self.provider.get_table_prefix()}_incremental_listens"
self.incremental_table = f"{self.provider.get_table_prefix()}_incremental_listens"
read_files_from_HDFS(INCREMENTAL_DUMPS_SAVE_PATH) \
.createOrReplaceTempView(table)
inc_query = self.provider.get_aggregate_query(table, self._cache_tables)
.createOrReplaceTempView(self.incremental_table)
inc_query = self.provider.get_aggregate_query(self.incremental_table, self._cache_tables)
return run_query(inc_query)

def bookkeep_incremental_aggregate(self):
metadata_path = f"{self.provider.get_bookkeeping_path()}/incremental"
query = f"SELECT max(created) AS latest_created_at FROM {self.incremental_table}"
latest_created_at = run_query(query).collect()[0]["latest_created_at"]
metadata_df = listenbrainz_spark.session.createDataFrame(
[(latest_created_at, datetime.now())],
schema=INCREMENTAL_BOOKKEEPING_SCHEMA
)
metadata_df.write.mode("overwrite").json(metadata_path)

def get_incremental_dumps_existing_created(self):
metadata_path = f"{self.provider.get_bookkeeping_path()}/incremental"
try:
metadata = listenbrainz_spark \
.session \
.read \
.schema(INCREMENTAL_BOOKKEEPING_SCHEMA) \
.json(f"{HDFS_CLUSTER_URI}{metadata_path}") \
.collect()[0]
return metadata["created"]
except AnalysisException:
return None

def generate_stats(self) -> DataFrame:
self._setup_cache_tables()
prefix = self.provider.get_table_prefix()
@@ -155,14 +179,30 @@
inc_df.createOrReplaceTempView(inc_table)

if self._only_inc:
filter_query = self.provider.get_filter_aggregate_query(partial_table, inc_table)
filtered_aggregate_df = run_query(filter_query)
filtered_table = f"{prefix}_filtered_aggregate"
filtered_aggregate_df.createOrReplaceTempView(filtered_table)
existing_created = self.get_incremental_dumps_existing_created()

filter_existing_query = self.provider.get_filter_aggregate_query(
partial_table,
self.incremental_table,
existing_created
)
filtered_existing_aggregate_df = run_query(filter_existing_query)
filtered_existing_table = f"{prefix}_filtered_existing_aggregate"
filtered_existing_aggregate_df.createOrReplaceTempView(filtered_existing_table)

filter_incremental_query = self.provider.get_filter_aggregate_query(
inc_table,
self.incremental_table,
existing_created
)
filtered_incremental_aggregate_df = run_query(filter_incremental_query)
filtered_incremental_table = f"{prefix}_filtered_incremental_aggregate"
filtered_incremental_aggregate_df.createOrReplaceTempView(filtered_incremental_table)
else:
filtered_table = partial_table
filtered_existing_table = partial_table
filtered_incremental_table = inc_table

final_query = self.provider.get_combine_aggregates_query(filtered_table, inc_table)
final_query = self.provider.get_combine_aggregates_query(filtered_existing_table, filtered_incremental_table)
final_df = run_query(final_query)
else:
final_df = partial_df
@@ -183,3 +223,4 @@ def run(self) -> Iterator[Dict]:
yield message
if not self.only_inc:
yield self.message_creator.create_end_message()
self.bookkeep_incremental_aggregate()
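Taken together, the incremental-only branch of `generate_stats` now filters both sides of the combine step rather than only the existing aggregate, so only entities whose stats actually changed since the last run are recomputed and emitted. The condensed sketch below is not the project's code; `provider` and `engine` stand in for the query provider and stats engine objects above, and the helper name is made up:

```python
from listenbrainz_spark.stats import run_query


def combine_incremental(provider, engine, prefix, partial_table, inc_table):
    # Timestamp bookkept after the previous incremental run (None on the first run).
    existing_created = engine.get_incremental_dumps_existing_created()

    # 1. Keep only rows of the existing partial aggregate belonging to entities
    #    with listens created since the last run.
    filtered_existing = f"{prefix}_filtered_existing_aggregate"
    run_query(provider.get_filter_aggregate_query(
        partial_table, engine.incremental_table, existing_created
    )).createOrReplaceTempView(filtered_existing)

    # 2. Apply the same filter to the incremental aggregate, so unchanged entities
    #    are not re-emitted just because they appear in older incremental dumps.
    filtered_incremental = f"{prefix}_filtered_incremental_aggregate"
    run_query(provider.get_filter_aggregate_query(
        inc_table, engine.incremental_table, existing_created
    )).createOrReplaceTempView(filtered_incremental)

    # 3. Combine the two filtered views into the final stats dataframe.
    return run_query(provider.get_combine_aggregates_query(filtered_existing, filtered_incremental))
```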
15 changes: 8 additions & 7 deletions listenbrainz_spark/stats/incremental/query_provider.py
@@ -1,5 +1,6 @@
import abc
from typing import List
from datetime import datetime
from typing import List, Optional

from listenbrainz_spark.stats.incremental.range_selector import ListenRangeSelector

@@ -41,7 +42,7 @@ def get_existing_aggregate_path(self) -> str:
return f"{self.get_base_path()}/aggregates/{self.entity}/{self.stats_range}"

def get_bookkeeping_path(self) -> str:
""" Returns the HDFS path for bookkeeping metadata. """
""" Returns the HDFS path for bookkeeping metadata directory. """
return f"{self.get_base_path()}/bookkeeping/{self.entity}/{self.stats_range}"

@abc.abstractmethod
@@ -68,14 +69,14 @@ def get_combine_aggregates_query(self, existing_aggregate: str, incremental_aggr
raise NotImplementedError()

@abc.abstractmethod
def get_filter_aggregate_query(self, existing_aggregate: str, incremental_aggregate: str) -> str:
def get_filter_aggregate_query(self, aggregate: str, inc_listens_table: str, existing_created: Optional[datetime]) -> str:
"""
Return the query to filter the existing aggregate based on the listens present in incremental
aggregate.
Return the query to filter the given aggregate down to entities that have listens submitted since the existing created timestamp.

Args:
existing_aggregate: The table name for existing aggregate.
incremental_aggregate: The table name for incremental aggregate.
aggregate: The table name of the aggregate to filter.
inc_listens_table: The table name for incremental listens.
existing_created: The maximum listen created timestamp seen the last time incremental stats were generated for this query.
"""
raise NotImplementedError()

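Because `get_bookkeeping_path` now names a directory, the engine keeps the two kinds of metadata in separate subdirectories beneath it, as seen in the engine changes above. A hypothetical illustration of the resulting layout (the base path is an example value only):

```python
# Example base path only; real values come from get_base_path()/bookkeeping/{entity}/{stats_range}.
bookkeeping_dir = "/listenbrainz-stats/bookkeeping/artists/week"

# Written by create_partial_aggregate() using BOOKKEEPING_SCHEMA (from_date, to_date, updated_at).
full_metadata_path = f"{bookkeeping_dir}/full"

# Written by bookkeep_incremental_aggregate() using INCREMENTAL_BOOKKEEPING_SCHEMA (created, updated_at).
incremental_metadata_path = f"{bookkeeping_dir}/incremental"
```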
6 changes: 4 additions & 2 deletions listenbrainz_spark/stats/incremental/sitewide/entity.py
@@ -1,6 +1,7 @@
import abc
import logging
from typing import Iterator, Dict
from datetime import datetime
from typing import Iterator, Dict, Optional

from pydantic import ValidationError
from pyspark.sql import DataFrame
@@ -34,7 +35,8 @@ def get_base_path(self) -> str:
def get_table_prefix(self) -> str:
return f"sitewide_{self.entity}_{self.stats_range}"

def get_filter_aggregate_query(self, existing_aggregate: str, incremental_aggregate: str) -> str:
def get_filter_aggregate_query(self, existing_aggregate: str, incremental_aggregate: str,
existing_created: Optional[datetime]) -> str:
return f"SELECT * FROM {existing_aggregate}"


12 changes: 9 additions & 3 deletions listenbrainz_spark/stats/incremental/user/entity.py
@@ -11,6 +11,7 @@
from data.model.user_release_group_stat import ReleaseGroupRecord
from data.model.user_release_stat import ReleaseRecord
from listenbrainz_spark.path import LISTENBRAINZ_USER_STATS_DIRECTORY
from listenbrainz_spark.stats import run_query
from listenbrainz_spark.stats.incremental.message_creator import StatsMessageCreator
from listenbrainz_spark.stats.incremental.query_provider import QueryProvider
from listenbrainz_spark.stats.incremental.range_selector import ListenRangeSelector
@@ -37,17 +38,22 @@ def get_table_prefix(self) -> str:
def get_entity_id(self):
return "user_id"

def get_filter_aggregate_query(self, existing_aggregate, incremental_aggregate):
def get_filter_aggregate_query(self, aggregate, inc_listens_table, existing_created):
""" Filter listens from existing aggregate to only include listens for entities having listens in the
incremental dumps.
"""
entity_id = self.get_entity_id()
inc_clause = f"""
SELECT DISTINCT {entity_id}
FROM {inc_listens_table}
WHERE created >= to_timestamp('{existing_created}')
"""
return f"""
WITH incremental_users AS (
SELECT DISTINCT {entity_id} FROM {incremental_aggregate}
{inc_clause}
)
SELECT *
FROM {existing_aggregate} ea
FROM {aggregate} ea
WHERE EXISTS(SELECT 1 FROM incremental_users iu WHERE iu.{entity_id} = ea.{entity_id})
"""

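To make the filter's behaviour concrete, here is a self-contained toy run of the same query shape against in-memory data; the table names and values are invented for the example, and only the user with a listen created after the bookkept timestamp survives:

```python
from datetime import datetime

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# A previously computed aggregate for two users.
spark.createDataFrame(
    [(1, "artist-a", 10), (2, "artist-b", 5)],
    ["user_id", "artist_mbid", "listen_count"],
).createOrReplaceTempView("existing_aggregate")

# Incremental listens: user 1 has a listen created after the last run,
# user 2's listen was already processed by an earlier run.
spark.createDataFrame(
    [(1, datetime(2025, 2, 10)), (2, datetime(2025, 1, 1))],
    ["user_id", "created"],
).createOrReplaceTempView("incremental_listens")

existing_created = datetime(2025, 2, 1)

# Same shape as the query built by get_filter_aggregate_query() above.
spark.sql(f"""
    WITH incremental_users AS (
        SELECT DISTINCT user_id
        FROM incremental_listens
        WHERE created >= to_timestamp('{existing_created}')
    )
    SELECT *
    FROM existing_aggregate ea
    WHERE EXISTS(SELECT 1 FROM incremental_users iu WHERE iu.user_id = ea.user_id)
""").show()
# Only the user_id = 1 row is printed.
```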