diff --git a/pdr_backend/analytics/predictoor_stats.py b/pdr_backend/analytics/predictoor_stats.py
index 27520a85d..bdc8809c9 100644
--- a/pdr_backend/analytics/predictoor_stats.py
+++ b/pdr_backend/analytics/predictoor_stats.py
@@ -34,7 +34,7 @@ class PredictoorStat(TypedDict):
 def get_feed_summary_stats(predictions_df: pl.DataFrame) -> pl.DataFrame:
     # 1 - filter from lake only the rows that you're looking for
     df = predictions_df.filter(
-        ~((pl.col("trueval").is_null()) | (pl.col("payout").is_null()))
+        ~((pl.col("truevalue").is_null()) | (pl.col("payout").is_null()))
     )
 
     # Group by pair
@@ -42,8 +42,8 @@ def get_feed_summary_stats(predictions_df: pl.DataFrame) -> pl.DataFrame:
         pl.col("source").first().alias("source"),
         pl.col("payout").sum().alias("sum_payout"),
         pl.col("stake").sum().alias("sum_stake"),
-        pl.col("prediction").count().alias("num_predictions"),
-        (pl.col("prediction").sum() / pl.col("pair").count() * 100).alias("accuracy"),
+        pl.col("predvalue").count().alias("num_predictions"),
+        (pl.col("predvalue").sum() / pl.col("pair").count() * 100).alias("accuracy"),
     )
 
     return df
@@ -53,7 +53,7 @@ def get_feed_summary_stats(predictions_df: pl.DataFrame) -> pl.DataFrame:
 def get_predictoor_summary_stats(predictions_df: pl.DataFrame) -> pl.DataFrame:
     # 1 - filter from lake only the rows that you're looking for
     df = predictions_df.filter(
-        ~((pl.col("trueval").is_null()) | (pl.col("payout").is_null()))
+        ~((pl.col("truevalue").is_null()) | (pl.col("payout").is_null()))
     )
 
     # Group by pair
@@ -61,8 +61,8 @@ def get_predictoor_summary_stats(predictions_df: pl.DataFrame) -> pl.DataFrame:
         pl.col("source").first().alias("source"),
         pl.col("payout").sum().alias("sum_payout"),
         pl.col("stake").sum().alias("sum_stake"),
-        pl.col("prediction").count().alias("num_predictions"),
-        (pl.col("prediction").sum() / pl.col("pair").count() * 100).alias("accuracy"),
+        pl.col("predvalue").count().alias("num_predictions"),
+        (pl.col("predvalue").sum() / pl.col("pair").count() * 100).alias("accuracy"),
     )
 
     return df
diff --git a/pdr_backend/lake/base_data_store.py b/pdr_backend/lake/base_data_store.py
new file mode 100644
index 000000000..70c88e518
--- /dev/null
+++ b/pdr_backend/lake/base_data_store.py
@@ -0,0 +1,44 @@
+from hashlib import md5
+from abc import abstractmethod
+from typing import Optional, Literal
+
+import duckdb
+from enforce_typing import enforce_types
+
+
+class BaseDataStore:
+    @enforce_types
+    def __init__(self, base_directory: str):
+        """
+        Initialize a BaseDataStore instance.
+        @arguments:
+            base_directory - The base directory to store the data.
+        """
+
+        self.base_directory = base_directory
+        self.duckdb_conn = duckdb.connect(
+            database=f"{self.base_directory}/duckdb.db"
+        )  # Keep a persistent connection
+
+    @enforce_types
+    def _generate_view_name(self, base_path: str) -> str:
+        """
+        Generate a unique view name for a given base path.
+        @arguments:
+            base_path - The base path to generate a view name for.
+        @returns:
+            str - A unique view name.
+ """ + + path = f"{self.base_directory}/{base_path}" + hash_object = md5(path.encode()) + return f"dataset_{hash_object.hexdigest()}" + + @abstractmethod + def query_data( + self, + dataset_identifier: str, + query: str, + partition_type: Optional[Literal["date", "address"]] = None, + ): + pass diff --git a/pdr_backend/lake/csv_data_store.py b/pdr_backend/lake/csv_data_store.py new file mode 100644 index 000000000..a34b6aadf --- /dev/null +++ b/pdr_backend/lake/csv_data_store.py @@ -0,0 +1,315 @@ +import os +from typing import List, Optional +import polars as pl + +from polars.type_aliases import SchemaDict + + +class CSVDataStore: + def __init__(self, base_path: str): + self.base_path = base_path + + def _get_folder_path(self, dataset_identifier: str) -> str: + """ + Returns the folder path for the given dataset_identifier. + If the folder does not exist, it will be created. + @args: + dataset_identifier: str - identifier of the dataset + """ + folder_path = os.path.join(self.base_path, dataset_identifier) + os.makedirs(folder_path, exist_ok=True) + return folder_path + + def _fill_with_zero(self, number: int, length: int = 10) -> str: + """ + Fills the given number with zeros to make it 10 digits long. + @args: + number: int - number to fill with zeros + """ + number_str = str(number) + return f"{(length - len(number_str)) * '0'}{number_str}" + + def _create_file_name( + self, dataset_identifier: str, start_time: int, end_time: Optional[int] + ) -> str: + """ + Creates a file name using the given dataset_identifier, + start_time, end_time, and row_count. + @args: + dataset_identifier: str - identifier of the dataset + start_time: int - start time of the data TIMESTAMP + end_time: int - end time of the data TIMESTAMP + """ + start_time_str = self._fill_with_zero(start_time) + + start_phrase = f"_from_{start_time_str}" + + end_phrase = f"_to_{self._fill_with_zero(end_time)}" if end_time else "_to_" + + return f"{dataset_identifier}{start_phrase}{end_phrase}.csv" + + def _create_file_path( + self, dataset_identifier: str, start_time: int, end_time: Optional[int] + ) -> str: + """ + Creates the file path for the given dataset_identifier, + start_time, end_time. + @args: + dataset_identifier: str - identifier of the dataset + start_time: str - start time of the data + end_time: str - end time of the data + """ + + file_name = self._create_file_name(dataset_identifier, start_time, end_time) + folder_path = self._get_folder_path(dataset_identifier) + return os.path.join(folder_path, file_name) + + def _chunk_data(self, data: pl.DataFrame) -> List[pl.DataFrame]: + """ + Splits the given data into chunks of up to 1000 rows each. + @args: + data: pl.DataFrame - data to be chunked + """ + return [ + data.slice(i, min(1000, len(data) - i)) for i in range(0, len(data), 1000) + ] + + def write( + self, + dataset_identifier: str, + data: pl.DataFrame, + schema: Optional[SchemaDict] = None, + ): + """ + Writes the given data to a csv file in the folder + corresponding to the given dataset_identifier. 
+ @args: + data: pl.DataFrame - The data to write, it has to be sorted by timestamp + dataset_identifier: str - The dataset identifier + """ + + max_row_count = 1000 + last_file_row_count = self._get_last_file_row_count(dataset_identifier) + if last_file_row_count is not None: + if last_file_row_count < max_row_count: + remaining_rows = max_row_count - last_file_row_count + + # get the first remaining_rows rows + remaining_rows = min(remaining_rows, len(data)) + + remaining_data = data.slice(0, remaining_rows) + + last_file_path = self._get_last_file_path( + self._get_folder_path(dataset_identifier) + ) + last_file_data = pl.read_csv(last_file_path, schema=schema) + last_file_data = last_file_data.vstack(remaining_data) + + t_start_time = last_file_data["timestamp"][0] + t_end_time = last_file_data["timestamp"][-1] + + last_file_data.write_csv(last_file_path) + # change the name of the file to reflect the new row count + new_file_path = self._create_file_path( + dataset_identifier, + t_start_time, + t_end_time if len(data) >= remaining_rows else None, + ) + + print("new_file_path", new_file_path) + os.rename(last_file_path, new_file_path) + + data = data.slice(remaining_rows, len(data) - remaining_rows) + + chunks = [ + data.slice(i, min(max_row_count, len(data) - i)) + for i in range(0, len(data), max_row_count) + ] + + for i, chunk in enumerate(chunks): + start_time = int(chunk["timestamp"][0]) + end_time = int(chunk["timestamp"][-1]) + file_path = self._create_file_path( + dataset_identifier, + start_time, + end_time if len(chunk) >= max_row_count else None, + ) + chunk.write_csv(file_path) + + def bulk_write(self, data_list: List[pl.DataFrame], dataset_identifier: str): + """ + Writes the given list of data to csv files in the folder + corresponding to the given dataset_identifier. + @args: + data_list: List[pl.DataFrame] - list of data to be written + dataset_identifier: str - identifier of the dataset + """ + for data in data_list: + self.write(dataset_identifier, data) + + def _get_to_value(self, file_path: str) -> int: + """ + Returns the end time from the given file_path. + @args: + file_path: str - path of the file + @returns: + int - end time from the file_path + """ + return int(file_path.split("/")[-1].split("_")[4].replace(".csv", "")) + + def _get_from_value(self, file_path: str) -> int: + """ + Returns the start time from the given file_path. + @args: + file_path: str - path of the file + @returns: + int - start time from the file_path + """ + return int(file_path.split("/")[-1].split("_")[2]) + + def _get_file_paths( + self, folder_path: str, start_time: str, end_time: str + ) -> List[str]: + """ + Returns a list of file paths in the given folder_path + that contain the given start_time and end_time. 
+ @args: + folder_path: str - path of the folder + start_time: str - start time of the data + end_time: str - end time of the data + @returns: + List[str] - list of file paths + """ + + file_names = os.listdir(folder_path) + file_paths = [os.path.join(folder_path, file_name) for file_name in file_names] + + # find files which has a higher start time and lower end time + file_paths = [ + file_path + for file_path in file_paths + # firstly, take the filename from the path (/path/to/file.csv -> file.csv) + # then, split the filename by "_" and take the 4th and 5th elements + # then, convert them to int and check if they are in the range + if self._get_from_value(file_path) >= int(start_time) + and ( + self._get_to_value(file_path) <= int(end_time) + or self._get_to_value(file_path) == 0 + ) + ] + + return file_paths + + def read( + self, + dataset_identifier: str, + start_time: str, + end_time: str, + schema: Optional[SchemaDict] = None, + ) -> pl.DataFrame: + """ + Reads the data from the csv file in the folder + corresponding to the given dataset_identifier, + start_time, and end_time. + @args: + dataset_identifier: str - identifier of the dataset + start_time: str - start time of the data + end_time: str - end time of the data + @returns: + pl.DataFrame - data read from the csv file + """ + data = self.read_all(dataset_identifier, schema=schema) + # if the data is empty, return + if len(data) == 0: + return data + + # if the data is not empty, + # check the timestamp column exists and is of type int64 + if "timestamp" not in data.columns: + return data + + return data.filter(data["timestamp"] >= int(start_time)).filter( + data["timestamp"] <= int(end_time) + ) + # return pl.read_csv(file_paths[0]) if file_paths else pl.DataFrame() + + def read_all( + self, dataset_identifier: str, schema: Optional[SchemaDict] = None + ) -> pl.DataFrame: + """ + Reads all the data from the csv files in the folder + corresponding to the given dataset_identifier. + @args: + dataset_identifier: str - identifier of the dataset + @returns: + pl.DataFrame - data read from the csv files + """ + + folder_path = self._get_folder_path(dataset_identifier) + file_names = os.listdir(folder_path) + file_paths = [os.path.join(folder_path, file_name) for file_name in file_names] + file_paths.sort() + + # print("read_all_file_paths", file_paths) + if file_paths: + # Read the first file to create the DataFrame + data = pl.read_csv(file_paths[0], schema=schema) + # Read the remaining files and append them to the DataFrame + for file_path in file_paths[1:]: + data = data.vstack(pl.read_csv(file_path, schema=schema)) + return data + + return pl.DataFrame([], schema=schema) + + def _get_last_file_path(self, folder_path: str) -> str: + """ + Returns the path of the last file in the given folder_path. + @args: + folder_path: str - path of the folder + @returns: + str - path of the last file + """ + + file_names = sorted(os.listdir(folder_path)) + return os.path.join(folder_path, file_names[-1]) if file_names else "" + + def get_last_timestamp(self, dataset_identifier: str) -> Optional[int]: + """ + Returns the last timestamp from the csv files in the folder + corresponding to the given dataset_identifier. 
+ @args: + dataset_identifier: str - identifier of the dataset + @returns: + Optional[int] - last timestamp from the csv files + """ + folder_path = self._get_folder_path(dataset_identifier) + last_file_path = self._get_last_file_path(folder_path) + if len(last_file_path): + return int(last_file_path.split("_")[3]) + + return None + + def _get_last_file_row_count(self, dataset_identifier: str) -> Optional[int]: + """ + Returns the row count of the last file for the given dataset_identifier. + @args: + dataset_identifier: str - The dataset identifier + @returns: + row_count: Optional[int] - The row count of the last file + """ + + folder_path = self._get_folder_path(dataset_identifier) + file_names = os.listdir(folder_path) + + # Sort by file name + file_names.sort() + if len(file_names) == 0: + return None + + last_file_path = os.path.join(folder_path, file_names[-1]) + + # Read the last file + last_file = pl.read_csv(last_file_path) + row_count = last_file.shape[0] + + return row_count diff --git a/pdr_backend/lake/etl.py b/pdr_backend/lake/etl.py index bcdfd3a96..94af91f62 100644 --- a/pdr_backend/lake/etl.py +++ b/pdr_backend/lake/etl.py @@ -93,4 +93,4 @@ def update_bronze_pdr_predictions(self): self.tables[bronze_pdr_predictions_table_name] = table table = get_bronze_pdr_predictions_table(self.tables, self.ppss) - table.save() + table.append_to_sources(table.df) diff --git a/pdr_backend/lake/persistent_data_store.py b/pdr_backend/lake/persistent_data_store.py new file mode 100644 index 000000000..b87c807f6 --- /dev/null +++ b/pdr_backend/lake/persistent_data_store.py @@ -0,0 +1,150 @@ +# The PersistentDataStore class is a subclass of the Base +import os +import glob + +from enforce_typing import enforce_types +import polars as pl + +from pdr_backend.lake.base_data_store import BaseDataStore + + +class PersistentDataStore(BaseDataStore): + """ + A class to store and retrieve persistent data. + """ + + def __init__(self, base_directory: str): + """ + Initialize a PersistentDataStore instance. + @arguments: + base_directory - The base directory to store the persistent data. + """ + super().__init__(base_directory) + + @enforce_types + def _create_and_fill_table( + self, df: pl.DataFrame, dataset_identifier: str + ): # pylint: disable=unused-argument + """ + Create the dataset and insert data to the persistent dataset. + @arguments: + df - The Polars DataFrame to append. + dataset_identifier - A unique identifier for the dataset. + """ + + view_name = self._generate_view_name(dataset_identifier) + + # self.duckdb_conn.register(view_name, df) + # Create the table + self.duckdb_conn.execute(f"CREATE TABLE {view_name} AS SELECT * FROM df") + + @enforce_types + def insert_to_table(self, df: pl.DataFrame, dataset_identifier: str): + """ + Insert data to an persistent dataset. + @arguments: + df - The Polars DataFrame to append. + dataset_identifier - A unique identifier for the dataset. 
+ @example: + df = pl.DataFrame({ + "id": [1, 2, 3], + "name": ["John", "Jane", "Doe"], + "age": [25, 30, 35] + }) + insert_to_table(df, "people") + """ + + view_name = self._generate_view_name(dataset_identifier) + # Check if the table exists + tables = self.duckdb_conn.execute( + "SELECT table_name FROM information_schema.tables WHERE table_schema = 'main'" + ).fetchall() + + if view_name in [table[0] for table in tables]: + self.duckdb_conn.execute(f"INSERT INTO {view_name} SELECT * FROM df") + else: + self._create_and_fill_table(df, dataset_identifier) + + @enforce_types + def query_data( + self, dataset_identifier: str, query: str, partition_type: None = None + ) -> pl.DataFrame: + """ + Execute a SQL query across the persistent dataset using DuckDB. + @arguments: + dataset_identifier - A unique identifier for the dataset. + query - The SQL query to execute. + @returns: + pl.DataFrame - The result of the query. + @example: + query_data("people", "SELECT * FROM {view_name}") + """ + + view_name = self._generate_view_name(dataset_identifier) + result_df = self.duckdb_conn.execute(query.format(view_name=view_name)).df() + + return pl.DataFrame(result_df) + + @enforce_types + def drop_table(self, dataset_identifier: str, ds_type: str = "table"): + """ + Drop the persistent dataset. + @arguments: + dataset_identifier - A unique identifier for the dataset. + ds_type - The type of the dataset to drop. Either "table" or "view". + @example: + drop_table("people") + """ + + if ds_type not in ["view", "table"]: + raise ValueError("ds_type must be either 'view' or 'table'") + + view_name = self._generate_view_name(dataset_identifier) + self.duckdb_conn.execute(f"DROP {ds_type} {view_name}") + + @enforce_types + def fill_from_csv_destination(self, csv_folder_path: str, dataset_identifier: str): + """ + Fill the persistent dataset from CSV files. + @arguments: + csv_folder_path - The path to the folder containing the CSV files. + dataset_identifier - A unique identifier for the dataset. + @example: + fill_from_csv_destination("data/csv", "people") + """ + + csv_files = glob.glob(os.path.join(csv_folder_path, "*.csv")) + + print("csv_files", csv_files) + for csv_file in csv_files: + df = pl.read_csv(csv_file) + self.insert_to_table(df, dataset_identifier) + + @enforce_types + def update_data( + self, df: pl.DataFrame, dataset_identifier: str, identifier_column: str + ): + """ + Update the persistent dataset with the provided DataFrame. + @arguments: + df - The Polars DataFrame to update. + dataset_identifier - A unique identifier for the dataset. + identifier_column - The column to use as the identifier for the update. 
+ @example: + df = pl.DataFrame({ + "id": [1, 2, 3], + "name": ["John", "Jane", "Doe"], + "age": [25, 30, 35] + }) + update_data(df, "people", "id") + """ + + view_name = self._generate_view_name(dataset_identifier) + update_columns = ", ".join( + [f"{column} = {df[column]}" for column in df.columns] + ) + self.duckdb_conn.execute( + f"""UPDATE {view_name} + SET {update_columns} + WHERE {identifier_column} = {df[identifier_column]}""" + ) diff --git a/pdr_backend/lake/table.py b/pdr_backend/lake/table.py index 8c6b12e03..7c56bbbdc 100644 --- a/pdr_backend/lake/table.py +++ b/pdr_backend/lake/table.py @@ -2,6 +2,8 @@ import os from typing import Dict, Callable import polars as pl +from polars.type_aliases import SchemaDict + from enforce_typing import enforce_types from pdr_backend.ppss.ppss import PPSS from pdr_backend.lake.plutil import has_data, newest_ut @@ -9,17 +11,20 @@ from pdr_backend.util.time_types import UnixTimeMs from pdr_backend.lake.plutil import _object_list_to_df from pdr_backend.lake.table_pdr_predictions import _transform_timestamp_to_ms +from pdr_backend.lake.csv_data_store import CSVDataStore +from pdr_backend.lake.persistent_data_store import PersistentDataStore logger = logging.getLogger("table") @enforce_types class Table: - def __init__(self, table_name: str, df_schema: object, ppss: PPSS): + def __init__(self, table_name: str, df_schema: SchemaDict, ppss: PPSS): self.ppss = ppss self.table_name = table_name self.df_schema = df_schema - self.df = pl.DataFrame() + self.df = pl.DataFrame([], schema=df_schema) + print("self.df", self.df) self.load() @enforce_types @@ -27,58 +32,47 @@ def load(self): """ Read the data from the Parquet file into a DataFrame object """ - filename = self._parquet_filename() + print(f"Loading data for {self.table_name}") + self.csv_data_store = CSVDataStore(self.ppss.lake_ss.parquet_dir) + self.persistent_data_store = PersistentDataStore(self.ppss.lake_ss.parquet_dir) + st_ut = self.ppss.lake_ss.st_timestamp fin_ut = self.ppss.lake_ss.fin_timestamp + self.df = self.csv_data_store.read( + self.table_name, st_ut, fin_ut, schema=self.df_schema + ) - # load all data from file - # check if file exists - # if file doesn't exist, return an empty dataframe with the expected schema - if os.path.exists(filename): - logger.info("Loading parquet for %s", self.table_name) - df = pl.read_parquet(filename) - df = df.filter( - (pl.col("timestamp") >= st_ut) & (pl.col("timestamp") <= fin_ut) - ) - else: - logger.info("Create initial df for %s", self.table_name) - df = pl.DataFrame(schema=self.df_schema) - - # save data frame in memory - self.df = df + def append_to_sources(self, data: pl.DataFrame): + self._append_to_csv(data) + self._append_to_db(data) - @enforce_types - def save(self): + def _append_to_csv(self, data: pl.DataFrame): """ - Get the data from subgraph and write it to Parquet file - write to parquet file - parquet only supports appending via the pyarrow engine + Append the data from the DataFrame object into the CSV file + It only saves the new data that has been fetched + + @arguments: + data - The Polars DataFrame to save. 
""" + self.csv_data_store.write(self.table_name, data, schema=self.df_schema) + n_new = data.shape[0] + print( + f" Just saved df with {n_new} df rows to the csv files of {self.table_name}" + ) - assert "timestamp" in self.df.columns and self.df["timestamp"].dtype == pl.Int64 - assert len(self.df) > 0 - if len(self.df) > 2: - assert ( - self.df.head(1)["timestamp"].to_list()[0] - <= self.df.tail(1)["timestamp"].to_list()[0] - ) + def _append_to_db(self, data: pl.DataFrame): + """ + Append the data from the DataFrame object into the database + It only saves the new data that has been fetched - filename = self._parquet_filename() - - if os.path.exists(filename): # "append" existing file - cur_df = pl.read_parquet(filename) - self.df = pl.concat([cur_df, self.df]) - - # drop duplicates - self.df = self.df.filter(pl.struct("ID").is_unique()) - self.df.write_parquet(filename) - n_new = self.df.shape[0] - cur_df.shape[0] - print(f" Just appended {n_new} df rows to file {filename}") - else: # write new file - self.df.write_parquet(filename) - print( - f" Just saved df with {self.df.shape[0]} rows to new file {filename}" - ) + @arguments: + data - The Polars DataFrame to save. + """ + self.persistent_data_store.insert_to_table(data, self.table_name) + n_new = data.shape[0] + print( + f" Just saved df with {n_new} df rows to the database of {self.table_name}" + ) @enforce_types def get_pdr_df( @@ -97,13 +91,14 @@ def get_pdr_df( Update function for graphql query, returns raw data + Transforms ts into ms as required for data factory """ + print(f"Fetching data for {self.table_name}") network = get_sapphire_postfix(network) # save to file when this amount of data is fetched save_backoff_count = 0 pagination_offset = 0 - final_df = pl.DataFrame() + final_df = pl.DataFrame([], schema=self.df_schema) while True: # call the function @@ -127,7 +122,7 @@ def get_pdr_df( if len(final_df) == 0: final_df = df else: - final_df = pl.concat([final_df, df]) + final_df = final_df.vstack(df) save_backoff_count += len(df) @@ -137,10 +132,10 @@ def get_pdr_df( ) and len(final_df) > 0: assert df.schema == self.df_schema # save to parquet - self.df = final_df - self.save() + self.append_to_sources(final_df) + print(f"Saved {len(final_df)} records to file while fetching") - final_df = pl.DataFrame() + final_df = pl.DataFrame([], schema=self.df_schema) save_backoff_count = 0 # avoids doing next fetch if we've reached the end @@ -148,6 +143,11 @@ def get_pdr_df( break pagination_offset += pagination_limit + if len(final_df) > 0: + self.append_to_sources(final_df) + + print(f"Saved {len(final_df)} records to file while fetching") + @enforce_types def _parquet_filename(self) -> str: """ diff --git a/pdr_backend/lake/table_bronze_pdr_predictions.py b/pdr_backend/lake/table_bronze_pdr_predictions.py index 8146ec391..314254a3d 100644 --- a/pdr_backend/lake/table_bronze_pdr_predictions.py +++ b/pdr_backend/lake/table_bronze_pdr_predictions.py @@ -59,8 +59,8 @@ def get_slot_id(_id: str) -> str: bronze_predictions_df = predictions_df.with_columns( [ pl.col("ID").map_elements(get_slot_id, return_dtype=Utf8).alias("slot_id"), - pl.col("prediction").alias("predvalue"), - pl.col("trueval").alias("truevalue"), + pl.col("predvalue").alias("predvalue"), + pl.col("truevalue").alias("truevalue"), pl.col("timestamp").alias("timestamp"), pl.col("timestamp").alias("last_event_timestamp"), ] @@ -93,14 +93,14 @@ def _process_truevals(tables: Dict[str, Table], ppss: PPSS) -> Dict[str, Table]: predictions_df.join(truevals_df, left_on="slot_id", 
right_on="ID", how="left") .with_columns( [ - pl.col("trueval").fill_null(pl.col("truevalue")), + pl.col("truevalue_right").fill_null(pl.col("truevalue")), pl.col("timestamp_right").fill_null(pl.col("last_event_timestamp")), ] ) .drop(["truevalue", "last_event_timestamp"]) .rename( { - "trueval": "truevalue", + "truevalue_right": "truevalue", "timestamp_right": "last_event_timestamp", } ) @@ -135,7 +135,7 @@ def _process_payouts(tables: Dict[str, Table], ppss: PPSS) -> Dict[str, Table]: .with_columns( [ pl.col("payout_right").fill_null(pl.col("payout")), - pl.col("predictedValue").fill_null(pl.col("predvalue")), + pl.col("predvalue_right").fill_null(pl.col("predvalue")), pl.col("stake_right").fill_null(pl.col("stake")), pl.col("timestamp_right").fill_null(pl.col("last_event_timestamp")), ] @@ -144,7 +144,7 @@ def _process_payouts(tables: Dict[str, Table], ppss: PPSS) -> Dict[str, Table]: .rename( { "payout_right": "payout", - "predictedValue": "predvalue", + "predvalue_right": "predvalue", "stake_right": "stake", "timestamp_right": "last_event_timestamp", } diff --git a/pdr_backend/lake/table_pdr_payouts.py b/pdr_backend/lake/table_pdr_payouts.py index 9304f5bdb..e76a281ba 100644 --- a/pdr_backend/lake/table_pdr_payouts.py +++ b/pdr_backend/lake/table_pdr_payouts.py @@ -10,7 +10,7 @@ "slot": Int64, "timestamp": Int64, "payout": Float64, - "predictedValue": Boolean, + "predvalue": Boolean, "revenue": Float64, "roundSumStakesUp": Float64, "roundSumStakes": Float64, diff --git a/pdr_backend/lake/table_pdr_predictions.py b/pdr_backend/lake/table_pdr_predictions.py index 8475af189..8b5903f74 100644 --- a/pdr_backend/lake/table_pdr_predictions.py +++ b/pdr_backend/lake/table_pdr_predictions.py @@ -10,9 +10,9 @@ "contract": Utf8, "pair": Utf8, "timeframe": Utf8, - "prediction": Boolean, + "predvalue": Boolean, "stake": Float64, - "trueval": Boolean, + "truevalue": Boolean, "timestamp": Int64, "source": Utf8, "payout": Float64, diff --git a/pdr_backend/lake/table_pdr_truevals.py b/pdr_backend/lake/table_pdr_truevals.py index 7f5b68fde..619bec2bc 100644 --- a/pdr_backend/lake/table_pdr_truevals.py +++ b/pdr_backend/lake/table_pdr_truevals.py @@ -7,6 +7,6 @@ "ID": Utf8, "token": Utf8, "timestamp": Int64, - "trueval": Boolean, + "truevalue": Boolean, "slot": Int64, } diff --git a/pdr_backend/lake/test/test_base_data_store.py b/pdr_backend/lake/test/test_base_data_store.py new file mode 100644 index 000000000..6dd20cfc4 --- /dev/null +++ b/pdr_backend/lake/test/test_base_data_store.py @@ -0,0 +1,20 @@ +from pdr_backend.lake.base_data_store import BaseDataStore + + +def _get_test_manager(tmpdir): + return BaseDataStore(str(tmpdir)) + + +def test__generate_view_name(tmpdir): + """ + Test the _generate_view_name method. 
+ """ + test_manager = _get_test_manager(tmpdir) + view_name = test_manager._generate_view_name(str(tmpdir)) + + # check if the view name starts with "dataset_" + assert view_name.startswith( + "dataset_" + ), "The view name does not start with 'dataset_'" + # check if the view name continues with a hash + assert len(view_name) > 8, "The view name is too short" diff --git a/pdr_backend/lake/test/test_csv_data_store.py b/pdr_backend/lake/test/test_csv_data_store.py new file mode 100644 index 000000000..81c09063d --- /dev/null +++ b/pdr_backend/lake/test/test_csv_data_store.py @@ -0,0 +1,221 @@ +import os + +import polars as pl +from pdr_backend.lake.csv_data_store import CSVDataStore + + +def _get_test_manager(tmpdir): + return CSVDataStore(str(tmpdir)) + + +def _clean_up(tmpdir): + for root, dirs, files in os.walk(tmpdir): + for file in files: + os.remove(os.path.join(root, file)) + for directory in dirs: + # clean up the directory + _clean_up(os.path.join(root, directory)) + os.rmdir(os.path.join(root, directory)) + + +def test_get_folder_path(tmpdir): + manager = _get_test_manager(tmpdir) + folder_path = manager._get_folder_path("test") + assert folder_path == f"{tmpdir}/test" + + +def test_create_file_name(tmpdir): + manager = _get_test_manager(tmpdir) + file_name = manager._create_file_name("test", 1707030362, 1709060200) + print("file_name---", file_name) + assert file_name == "test_from_1707030362_to_1709060200.csv" + + +def test_get_file_paths(tmpdir): + manager = _get_test_manager(tmpdir) + file_name_1 = manager._create_file_name("test", 0, 20) + file_name_2 = manager._create_file_name("test", 21, 40) + file_name_3 = manager._create_file_name("test", 41, 60) + file_name_4 = manager._create_file_name("test", 61, 80) + + files = [file_name_1, file_name_2, file_name_3, file_name_4] + + folder_path = manager._get_folder_path("test") + + if not os.path.exists(folder_path): + os.makedirs(folder_path) + + for file in files: + # create empty files + with open(os.path.join(folder_path, file), "w"): + pass + + # check if empty files are created + for file in files: + assert os.path.exists(folder_path + "/" + file) + + file_paths = manager._get_file_paths(folder_path, 21, 60) + + for file_path in file_paths: + assert file_path in [ + folder_path + "/" + file_name_2, + folder_path + "/" + file_name_3, + ] + + _clean_up(tmpdir) + + +def test_create_file_path(tmpdir): + manager = _get_test_manager(tmpdir) + file_path = manager._create_file_path("test", 1, 2) + assert file_path == f"{tmpdir}/test/test_from_0000000001_to_0000000002.csv" + + +def test_create_file_path_without_endtime(tmpdir): + manager = _get_test_manager(tmpdir) + file_path = manager._create_file_path("test", 1, None) + assert file_path == f"{tmpdir}/test/test_from_0000000001_to_.csv" + + +def test_read(tmpdir): + manager = _get_test_manager(tmpdir) + file_path = manager._create_file_path("test", 1, 2) + + with open(file_path, "w") as file: + file.write("a,b,c\n1,2,3\n4,5,6") + + data = manager.read("test", 1, 2) + assert data.equals(pl.DataFrame({"a": [1, 4], "b": [2, 5], "c": [3, 6]})) + + _clean_up(tmpdir) + + +def test_read_all(tmpdir): + manager = _get_test_manager(tmpdir) + + file_path_1 = manager._create_file_path("test", 0, 20) + file_path_2 = manager._create_file_path("test", 21, 41) + + with open(file_path_1, "w") as file: + file.write("a,b,c\n1,2,3\n4,5,6") + + with open(file_path_2, "w") as file: + file.write("a,b,c\n7,8,9\n10,11,12") + + data = manager.read_all("test") + assert data["a"].to_list() == [1, 4, 7, 10] + 
assert data["b"].to_list() == [2, 5, 8, 11] + assert data["c"].to_list() == [3, 6, 9, 12] + + _clean_up(tmpdir) + + +def test_get_last_file_path(tmpdir): + manager = _get_test_manager(tmpdir) + file_path_1 = manager._create_file_path("test", 0, 20) + file_path_2 = manager._create_file_path("test", 21, 41) + file_path_3 = manager._create_file_path("test", 42, 62) + file_path_4 = manager._create_file_path("test", 63, 83) + + files = [file_path_1, file_path_2, file_path_3, file_path_4] + + folder_path = manager._get_folder_path("test") + + if not os.path.exists(folder_path): + os.makedirs(folder_path) + + for file in files: + # create empty files + with open(os.path.join(folder_path, file), "w"): + pass + + assert manager._get_last_file_path(f"{tmpdir}/test") == os.path.join( + folder_path, file_path_4 + ) + + _clean_up(tmpdir) + + +def test_write(tmpdir): + manager = _get_test_manager(tmpdir) + data = pl.DataFrame({"a": [1, 4], "b": [2, 5], "timestamp": [3, 6]}) + manager.write("test", data) + file_name = manager._create_file_path("test", 3, None) + + data = pl.read_csv(file_name) + + assert data["a"].to_list() == [1, 4] + assert data["b"].to_list() == [2, 5] + assert data["timestamp"].to_list() == [3, 6] + + _clean_up(tmpdir) + + +def test_write_1000_rows(tmpdir): + _clean_up(tmpdir) + + manager = _get_test_manager(tmpdir) + data = pl.DataFrame( + { + "a": list(range(1000)), + "b": list(range(1000)), + "timestamp": list(range(1000)), + } + ) + manager.write("test", data) + + # folder_path = manager._get_folder_path("test") + + # get folder including files + # folder = os.listdir(folder_path) + # print folder files + # print("folder---", folder) + + file_name = manager._create_file_path("test", 0, 999) + + data = pl.read_csv(file_name) + + assert data["a"].to_list() == list(range(1000)) + assert data["b"].to_list() == list(range(1000)) + assert data["timestamp"].to_list() == list(range(1000)) + + _clean_up(tmpdir) + + +def test_write_append(tmpdir): + manager = _get_test_manager(tmpdir) + data = pl.DataFrame({"a": [1, 4], "b": [2, 5], "timestamp": [3, 6]}) + manager.write("test", data) + + # new data + data = pl.DataFrame({"a": [11, 41], "b": [21, 51], "timestamp": [31, 61]}) + manager.write("test", data) + + file_name = manager._create_file_path("test", 3, 61) + + data = pl.read_csv(file_name) + + assert data["a"].to_list() == [1, 4, 11, 41] + assert data["b"].to_list() == [2, 5, 21, 51] + assert data["timestamp"].to_list() == [3, 6, 31, 61] + + _clean_up(tmpdir) + + +def test_fill_with_zero(): + manager = CSVDataStore("test") + assert manager._fill_with_zero(1, 10) == "0000000001" + assert manager._fill_with_zero(100) == "0000000100" + assert manager._fill_with_zero(1000) == "0000001000" + + +def test_get_to_value(): + manager = CSVDataStore("test") + assert manager._get_to_value("test/test_from_0_to_0000000001.csv") == 1 + assert manager._get_to_value("test/test_from_0_to_0000000005.csv") == 5 + + +def test_get_from_value(): + manager = CSVDataStore("test") + assert manager._get_from_value("test/test_from_0000000001_to_0000000001.csv") == 1 + assert manager._get_from_value("test/test_from_0000000005_to_.csv") == 5 diff --git a/pdr_backend/lake/test/test_etl.py b/pdr_backend/lake/test/test_etl.py index 64d747642..091b00b1e 100644 --- a/pdr_backend/lake/test/test_etl.py +++ b/pdr_backend/lake/test/test_etl.py @@ -201,11 +201,11 @@ def test_etl_do_bronze_step( assert ( bronze_pdr_predictions_df["truevalue"][1] - == _gql_datafactory_etl_truevals_df["trueval"][1] + == 
_gql_datafactory_etl_truevals_df["truevalue"][1] ) assert ( bronze_pdr_predictions_df["truevalue"][2] - == _gql_datafactory_etl_truevals_df["trueval"][2] + == _gql_datafactory_etl_truevals_df["truevalue"][2] ) # Assert payout ts > prediction ts diff --git a/pdr_backend/lake/test/test_gql_data_factory.py b/pdr_backend/lake/test/test_gql_data_factory.py index b15d86c29..9c14993c3 100644 --- a/pdr_backend/lake/test/test_gql_data_factory.py +++ b/pdr_backend/lake/test/test_gql_data_factory.py @@ -66,7 +66,7 @@ def test_update(): assert count_updates == len(gql_data_factory.record_config["tables"].items()) -def test_load_parquet(): +def test_load_parquet(tmpdir): """ Test GQLDataFactory loads the data for all the tables """ @@ -75,7 +75,7 @@ def test_load_parquet(): ppss = mock_ppss( ["binance BTC/USDT c 5m"], "sapphire-mainnet", - ".", + str(tmpdir), st_timestr=st_timestr, fin_timestr=fin_timestr, ) @@ -85,6 +85,7 @@ def test_load_parquet(): assert len(gql_data_factory.record_config["tables"].items()) == 4 table = gql_data_factory.record_config["tables"]["pdr_predictions"] + assert table is not None assert type(table.df) == pl.DataFrame assert table.df.schema == table.df_schema diff --git a/pdr_backend/lake/test/test_persistent_data_store.py b/pdr_backend/lake/test/test_persistent_data_store.py new file mode 100644 index 000000000..c5677a67f --- /dev/null +++ b/pdr_backend/lake/test/test_persistent_data_store.py @@ -0,0 +1,174 @@ +import os +import polars as pl +from pdr_backend.lake.persistent_data_store import ( + PersistentDataStore, +) # Adjust the import based on your project structure + + +# Initialize the PartitionedDataStore instance for testing +def _get_test_manager(tmpdir): + example_df = pl.DataFrame( + {"timestamp": ["2022-01-01", "2022-02-01", "2022-03-01"], "value": [10, 20, 30]} + ) + dataset_identifier = "test_df" + + return [PersistentDataStore(str(tmpdir)), example_df, dataset_identifier] + + +def _clean_up_test_manager(tmpdir, dataset_identifier): + # Clean up the test manager + dataset_path = os.path.join(str(tmpdir), dataset_identifier) + + persistent_ds_instance = PersistentDataStore(str(tmpdir)) + + view_name = persistent_ds_instance._generate_view_name(dataset_path) + + # Select tables from duckdb + views = persistent_ds_instance.duckdb_conn.execute( + "SELECT table_name FROM information_schema.tables WHERE table_schema = 'main'" + ).fetchall() + + # Drop the view and table + if view_name in [table[0] for table in views]: + persistent_ds_instance.duckdb_conn.execute(f"DROP TABLE {view_name}") + + +def _check_view_exists(test_manager, dataset_identifier): + view_name = test_manager._generate_view_name(dataset_identifier) + tables = test_manager.duckdb_conn.execute( + "SELECT table_name FROM information_schema.tables WHERE table_schema = 'main'" + ).fetchall() + return [view_name in [table[0] for table in tables], view_name] + + +def test_create_and_fill_table(tmpdir): + test_manager, example_df, dataset_identifier = _get_test_manager(tmpdir) + + test_manager._create_and_fill_table(example_df, dataset_identifier) + + # Check if the view is registered + assert _check_view_exists(test_manager, dataset_identifier) + _clean_up_test_manager(tmpdir, dataset_identifier) + + +def test_insert_to_exist_table(tmpdir): + test_manager, example_df, dataset_identifier = _get_test_manager(tmpdir) + + test_manager._create_and_fill_table(example_df, dataset_identifier) + + # Check if the view is registered + check_result, view_name = _check_view_exists(test_manager, dataset_identifier) + 
assert check_result + + # Insert new data to the table + example_df = pl.DataFrame( + {"timestamp": ["2022-04-01", "2022-05-01", "2022-06-01"], "value": [40, 50, 60]} + ) + test_manager.insert_to_table(example_df, dataset_identifier) + + # Check if the view is registered + check_result, view_name = _check_view_exists(test_manager, dataset_identifier) + assert check_result + + # Check if the new data is inserted + result = test_manager.duckdb_conn.execute(f"SELECT * FROM {view_name}").fetchall() + assert len(result) == 6 + print(result) + assert result[3][0] == "2022-04-01" + assert result[3][1] == 40 + assert result[4][0] == "2022-05-01" + assert result[4][1] == 50 + assert result[5][0] == "2022-06-01" + assert result[5][1] == 60 + _clean_up_test_manager(tmpdir, dataset_identifier) + + +def test_insert_to_new_table(tmpdir): + test_manager, example_df, dataset_identifier = _get_test_manager(tmpdir) + + test_manager.insert_to_table(example_df, dataset_identifier) + + # Check if the view is registered + check_result, view_name = _check_view_exists(test_manager, dataset_identifier) + assert check_result + + # Check if the new data is inserted + result = test_manager.duckdb_conn.execute(f"SELECT * FROM {view_name}").fetchall() + assert len(result) == 3 + assert result[0][0] == "2022-01-01" + assert result[0][1] == 10 + assert result[1][0] == "2022-02-01" + assert result[1][1] == 20 + assert result[2][0] == "2022-03-01" + assert result[2][1] == 30 + _clean_up_test_manager(tmpdir, dataset_identifier) + + +def test_query_data(tmpdir): + test_manager, example_df, dataset_identifier = _get_test_manager(tmpdir) + test_manager.insert_to_table(example_df, dataset_identifier) + + # Check if the view is registered + check_result, _ = _check_view_exists(test_manager, dataset_identifier) + assert check_result + + # Execute the provided SQL query + result_df = test_manager.query_data( + dataset_identifier, "SELECT * FROM {view_name} WHERE value > 15" + ) + assert len(result_df) == 2, "Query did not return the expected number of rows." 
+ _clean_up_test_manager(tmpdir, dataset_identifier) + + +def test_drop_table(tmpdir): + test_manager, example_df, dataset_identifier = _get_test_manager(tmpdir) + + test_manager.insert_to_table(example_df, dataset_identifier) + + # Check if the view is registered + check_result, view_name = _check_view_exists(test_manager, dataset_identifier) + assert check_result + + # Drop the table + test_manager.drop_table(dataset_identifier, ds_type="table") + + # Check if the view is dropped + tables = test_manager.duckdb_conn.execute( + "SELECT table_name FROM information_schema.tables WHERE table_schema = 'main'" + ).fetchall() + assert view_name not in [table[0] for table in tables] + _clean_up_test_manager(tmpdir, dataset_identifier) + + +def test_fill_from_csv_destination(tmpdir): + test_manager, example_df, dataset_identifier = _get_test_manager(tmpdir) + csv_folder_path = os.path.join(str(tmpdir), "csv_folder") + os.makedirs(csv_folder_path, exist_ok=True) + example_df.write_csv(os.path.join(str(csv_folder_path), "data.csv")) + + test_manager.fill_from_csv_destination(csv_folder_path, dataset_identifier) + + # Check if the view is registered + check_result, view_name = _check_view_exists(test_manager, dataset_identifier) + + assert check_result + + # Check if the new data is inserted + result = test_manager.duckdb_conn.execute(f"SELECT * FROM {view_name}").fetchall() + assert len(result) == 3 + assert result[0][0] == "2022-01-01" + assert result[0][1] == 10 + assert result[1][0] == "2022-02-01" + assert result[1][1] == 20 + assert result[2][0] == "2022-03-01" + assert result[2][1] == 30 + + _clean_up_test_manager(tmpdir, dataset_identifier) + # clean csv folder + # delete files in the folder + for file in os.listdir(csv_folder_path): + file_path = os.path.join(csv_folder_path, file) + os.remove(file_path) + + # delete the folder + os.rmdir(csv_folder_path) diff --git a/pdr_backend/lake/test/test_table.py b/pdr_backend/lake/test/test_table.py index ceb300574..b08153fd9 100644 --- a/pdr_backend/lake/test/test_table.py +++ b/pdr_backend/lake/test/test_table.py @@ -1,6 +1,7 @@ from io import StringIO import os import sys + from polars import Boolean, Float64, Int64, Utf8 import polars as pl from pdr_backend.ppss.ppss import mock_ppss @@ -16,7 +17,7 @@ def __init__(self, data): self.ID = data["ID"] self.pair = data["pair"] self.timeframe = data["timeframe"] - self.prediction = data["prediction"] + self.predvalue = data["predvalue"] self.payout = data["payout"] self.timestamp = data["timestamp"] self.slot = data["slot"] @@ -27,14 +28,28 @@ def __init__(self, data): "ID": "0x123", "pair": "ADA-USDT", "timeframe": "5m", - "prediction": True, + "predvalue": True, "payout": 28.2, - "timestamp": 1701634400000, - "slot": 1701634400000, + "timestamp": 1701634400, + "slot": 1701634400, "user": "0x123", } +def _clean_up(tmp_path): + """ + Delete test file if already exists + """ + folder_path = os.path.join(tmp_path, table_name) + + if os.path.exists(folder_path): + # delete files + for file in os.listdir(folder_path): + file_path = os.path.join(folder_path, file) + os.remove(file_path) + os.remove(folder_path) + + def mock_fetch_function( network, st_ut, fin_ut, save_backoff_limit, pagination_limit, config ): @@ -51,21 +66,13 @@ def get_table_df(network, st_ut, fin_ut, config): "ID": Utf8, "pair": Utf8, "timeframe": Utf8, - "prediction": Boolean, + "predvalue": Boolean, "payout": Float64, "timestamp": Int64, "slot": Int64, "user": Utf8, } table_name = "pdr_test_df" -file_path = 
f"./parquet_data/{table_name}.parquet" -file_path2 = "./parquet_data/test_prediction_table_multiple.parquet" - -# delete test file if already exists -if os.path.exists(file_path): - os.remove(file_path) -if os.path.exists(file_path2): - os.remove(file_path2) def test_table_initialization(): @@ -111,79 +118,28 @@ def test_load_table(): assert len(table.df) == 0 -def test_save_table(): - """ - Test that table is saving to local file - """ +def test_get_pdr_df(tmpdir): st_timestr = "2023-12-03" fin_timestr = "2023-12-05" ppss = mock_ppss( ["binance BTC/USDT c 5m"], "sapphire-mainnet", - ".", + str(tmpdir), st_timestr=st_timestr, fin_timestr=fin_timestr, ) + _clean_up(ppss.lake_ss.parquet_dir) + table = Table(table_name, table_df_schema, ppss) captured_output = StringIO() sys.stdout = captured_output - assert len(table.df) == 0 - table.df = pl.DataFrame([mocked_object], table_df_schema) - table.save() - - assert os.path.exists(file_path) - printed_text = captured_output.getvalue().strip() - - assert "Just saved df with" in printed_text - - -def test_all(): - """ - Test multiple table actions in one go - """ - st_timestr = "2023-12-03" - fin_timestr = "2023-12-05" - ppss = mock_ppss( - ["binance BTC/USDT c 5m"], - "sapphire-mainnet", - ".", - st_timestr=st_timestr, - fin_timestr=fin_timestr, - ) - - table = Table(table_name, table_df_schema, ppss) - table.df = pl.DataFrame([], table_df_schema) - assert len(table.df) == 0 - table.df = pl.DataFrame([mocked_object], table_df_schema) - table.load() - - assert len(table.df) == 1 - - -def test_get_pdr_df(): - """ - Test multiple table actions in one go - """ - - st_timestr = "2023-12-03" - fin_timestr = "2023-12-05" - ppss = mock_ppss( - ["binance BTC/USDT c 5m"], - "sapphire-mainnet", - ".", - st_timestr=st_timestr, - fin_timestr=fin_timestr, - ) - - table = Table(table_name, table_df_schema, ppss) - save_backoff_limit = 5000 pagination_limit = 1000 - st_timest = UnixTimeMs(1701634400000) - fin_timest = UnixTimeMs(1701634400000) + st_timest = UnixTimeMs(1701634300000) + fin_timest = UnixTimeMs(1701634500000) table.get_pdr_df( mock_fetch_function, "sapphire-mainnet", @@ -193,10 +149,14 @@ def test_get_pdr_df(): pagination_limit, {"contract_list": ["0x123"]}, ) - assert len(table.df) == 1 + + printed_text = captured_output.getvalue().strip() + count_fetches = printed_text.count("Fetched") + assert count_fetches == 1 + # assert table.df.shape[0] == 1 -def test_get_pdr_df_multiple_fetches(): +def test_get_pdr_df_multiple_fetches(tmpdir): """ Test multiple table actions in one go """ @@ -206,12 +166,15 @@ def test_get_pdr_df_multiple_fetches(): ppss = mock_ppss( ["binance BTC/USDT c 5m"], "sapphire-mainnet", - ".", + str(tmpdir), st_timestr=st_timestr, fin_timestr=fin_timestr, ) + _clean_up(ppss.lake_ss.parquet_dir) + table = Table("test_prediction_table_multiple", predictions_schema, ppss) + captured_output = StringIO() sys.stdout = captured_output @@ -228,6 +191,7 @@ def test_get_pdr_df_multiple_fetches(): pagination_limit=pagination_limit, config={"contract_list": ["0x18f54cc21b7a2fdd011bea06bba7801b280e3151"]}, ) + printed_text = captured_output.getvalue().strip() # test fetches multiple times @@ -238,4 +202,110 @@ def test_get_pdr_df_multiple_fetches(): count_saves = printed_text.count("Saved") assert count_saves == 2 - assert len(table.df) == 50 + +def test_all(tmpdir): + """ + Test multiple table actions in one go + """ + st_timestr = "2021-12-03" + fin_timestr = "2023-12-31" + ppss = mock_ppss( + ["binance BTC/USDT c 5m"], + "sapphire-mainnet", + 
str(tmpdir), + st_timestr=st_timestr, + fin_timestr=fin_timestr, + ) + + _clean_up(ppss.lake_ss.parquet_dir) + + folder_path = os.path.join(ppss.lake_ss.parquet_dir, table_name) + if not os.path.exists(folder_path): + os.makedirs(folder_path) + + # create the csv file + file_path = os.path.join( + folder_path, f"{table_name}_from_1701634400_to_1701634400_1.csv" + ) + + # write the file + with open(file_path, "w") as file: + file.write("ID,pair,timeframe,prediction,payout,timestamp,slot,user\n") + file.write("0x123,ADA-USDT,5m,True,28.2,1701634400000,1701634400000,0x123\n") + + table = Table(table_name, table_df_schema, ppss) + table.df = pl.DataFrame([], table_df_schema) + assert len(table.df) == 0 + + table.load() + assert len(table.df) == 1 + + +def test_append_to_db(tmpdir): + """ + Test that table is loading the data from file + """ + st_timestr = "2023-12-03" + fin_timestr = "2024-12-05" + ppss = mock_ppss( + ["binance BTC/USDT c 5m"], + "sapphire-mainnet", + str(tmpdir), + st_timestr=st_timestr, + fin_timestr=fin_timestr, + ) + + _clean_up(ppss.lake_ss.parquet_dir) + + table = Table(table_name, table_df_schema, ppss) + table.load() + + assert len(table.df) == 0 + + table._append_to_db(pl.DataFrame([mocked_object] * 1000, schema=table_df_schema)) + + result = table.persistent_data_store.query_data( + table.table_name, "SELECT * FROM {view_name}" + ) + + assert result["ID"][0] == "0x123" + assert result["pair"][0] == "ADA-USDT" + assert result["timeframe"][0] == "5m" + assert result["predvalue"][0] is True + assert len(result) == 1000 + + +def test_append_to_csv(tmpdir): + """ + Test that table is loading the data from file + """ + st_timestr = "2023-12-03" + fin_timestr = "2024-12-05" + ppss = mock_ppss( + ["binance BTC/USDT c 5m"], + "sapphire-mainnet", + str(tmpdir), + st_timestr=st_timestr, + fin_timestr=fin_timestr, + ) + + _clean_up(ppss.lake_ss.parquet_dir) + + table = Table(table_name, table_df_schema, ppss) + table.load() + + assert len(table.df) == 0 + + table._append_to_csv(pl.DataFrame([mocked_object] * 1000, schema=table_df_schema)) + + file_path = os.path.join( + ppss.lake_ss.parquet_dir, + table_name, + f"{table_name}_from_1701634400_to_1701634400.csv", + ) + + assert os.path.exists(file_path) + + with open(file_path, "r") as file: + lines = file.readlines() + assert len(lines) == 1001 diff --git a/pdr_backend/subgraph/payout.py b/pdr_backend/subgraph/payout.py index 5fcd6dd2f..f1baead0e 100644 --- a/pdr_backend/subgraph/payout.py +++ b/pdr_backend/subgraph/payout.py @@ -14,7 +14,7 @@ def __init__( slot: UnixTimeS, timestamp: UnixTimeS, payout: float, - predictedValue: bool, + predvalue: bool, revenue: float, roundSumStakesUp: float, roundSumStakes: float, @@ -26,7 +26,7 @@ def __init__( self.token = token self.slot = slot self.payout = payout - self.predictedValue = predictedValue + self.predvalue = predvalue self.revenue = revenue self.roundSumStakesUp = roundSumStakesUp self.roundSumStakes = roundSumStakes @@ -42,7 +42,7 @@ def mock_payout(payout_tuple: tuple) -> Payout: token, slot, payout, - predictedValue, + predvalue, revenue, roundSumStakesUp, roundSumStakes, @@ -56,7 +56,7 @@ def mock_payout(payout_tuple: tuple) -> Payout: token=token, slot=UnixTimeS(slot), payout=payout, - predictedValue=predictedValue, + predvalue=predvalue, revenue=revenue, roundSumStakesUp=roundSumStakesUp, roundSumStakes=roundSumStakes, diff --git a/pdr_backend/subgraph/prediction.py b/pdr_backend/subgraph/prediction.py index c9d5dbb47..b959df666 100644 --- 
a/pdr_backend/subgraph/prediction.py +++ b/pdr_backend/subgraph/prediction.py @@ -13,9 +13,9 @@ def __init__( contract: str, pair: str, timeframe: str, - prediction: Union[bool, None], # prediction = subgraph.predicted_value + predvalue: Union[bool, None], stake: Union[float, None], - trueval: Union[bool, None], + truevalue: Union[bool, None], timestamp: UnixTimeS, # timestamp == prediction submitted timestamp source: str, payout: Union[float, None], @@ -26,9 +26,9 @@ def __init__( self.contract = contract self.pair = pair self.timeframe = timeframe - self.prediction = prediction # predvalue + self.predvalue = predvalue self.stake = stake - self.trueval = trueval # truevalue + self.truevalue = truevalue self.timestamp = timestamp self.source = source self.payout = payout @@ -46,9 +46,9 @@ def mock_prediction(prediction_tuple: tuple) -> Prediction: contract, pair_str, timeframe_str, - prediction, + predvalue, stake, - trueval, + truevalue, timestamp, source, payout, @@ -62,9 +62,9 @@ def mock_prediction(prediction_tuple: tuple) -> Prediction: contract=contract, pair=pair_str, timeframe=timeframe_str, - prediction=prediction, + predvalue=predvalue, stake=stake, - trueval=trueval, + truevalue=truevalue, timestamp=UnixTimeS(timestamp), source=source, payout=payout, diff --git a/pdr_backend/subgraph/subgraph_payout.py b/pdr_backend/subgraph/subgraph_payout.py index 069b64f10..2a77e94a0 100644 --- a/pdr_backend/subgraph/subgraph_payout.py +++ b/pdr_backend/subgraph/subgraph_payout.py @@ -156,8 +156,8 @@ def fetch_payouts( "token": payout["prediction"]["slot"]["predictContract"]["token"][ "name" ], + "predvalue": bool(payout["predictedValue"]), "slot": UnixTimeS(int(payout["id"].split("-")[1])), - "predictedValue": bool(payout["predictedValue"]), "revenue": float(payout["prediction"]["slot"]["revenue"]), "roundSumStakesUp": float( payout["prediction"]["slot"]["roundSumStakesUp"] diff --git a/pdr_backend/subgraph/subgraph_predictions.py b/pdr_backend/subgraph/subgraph_predictions.py index 81b48d665..62fa0d4ad 100644 --- a/pdr_backend/subgraph/subgraph_predictions.py +++ b/pdr_backend/subgraph/subgraph_predictions.py @@ -139,14 +139,14 @@ def fetch_filtered_predictions( slot = UnixTimeS(int(prediction_sg_dict["slot"]["slot"])) user = prediction_sg_dict["user"]["id"] address = prediction_sg_dict["id"].split("-")[0] - trueval = None + truevalue = None payout = None predicted_value = None stake = None if not prediction_sg_dict["payout"] is None: stake = float(prediction_sg_dict["stake"]) - trueval = prediction_sg_dict["payout"]["trueValue"] + truevalue = prediction_sg_dict["payout"]["trueValue"] predicted_value = prediction_sg_dict["payout"]["predictedValue"] payout = float(prediction_sg_dict["payout"]["payout"]) @@ -155,9 +155,9 @@ def fetch_filtered_predictions( contract=address, pair=pair, timeframe=timeframe, - prediction=predicted_value, + predvalue=predicted_value, stake=stake, - trueval=trueval, + truevalue=truevalue, timestamp=timestamp, source=source, payout=payout, diff --git a/pdr_backend/subgraph/subgraph_trueval.py b/pdr_backend/subgraph/subgraph_trueval.py index b6578c3cb..e9dcc885d 100644 --- a/pdr_backend/subgraph/subgraph_trueval.py +++ b/pdr_backend/subgraph/subgraph_trueval.py @@ -118,7 +118,7 @@ def fetch_truevals( ID=ID, token=token, timestamp=timestamp, - trueval=truevalue, + truevalue=truevalue, slot=slot, ) diff --git a/pdr_backend/subgraph/test/test_subgraph_payout.py b/pdr_backend/subgraph/test/test_subgraph_payout.py index 9d89a36be..fd71bc978 100644 --- 
a/pdr_backend/subgraph/test/test_subgraph_payout.py +++ b/pdr_backend/subgraph/test/test_subgraph_payout.py @@ -81,7 +81,7 @@ def test_fetch_payouts(mock_query_subgraph): assert payouts[0].timestamp == 1698527000 assert payouts[0].slot == 1696880700 assert payouts[0].payout == float(0) - assert payouts[0].predictedValue is True + assert payouts[0].predvalue is True assert payouts[0].user == "0xd2a24cb4ff2584bad80ff5f109034a891c3d88dd" assert payouts[0].stake == float(1.2) assert mock_query_subgraph.call_count == 1 diff --git a/pdr_backend/subgraph/test/test_subgraph_predictions.py b/pdr_backend/subgraph/test/test_subgraph_predictions.py index 266525b59..5cc9a5f85 100644 --- a/pdr_backend/subgraph/test/test_subgraph_predictions.py +++ b/pdr_backend/subgraph/test/test_subgraph_predictions.py @@ -20,9 +20,9 @@ contract="0x18f54cc21b7a2fdd011bea06bba7801b280e3151", pair="ADA/USDT", timeframe="5m", - prediction=True, + predvalue=True, stake=0.050051425480971974, - trueval=False, + truevalue=False, timestamp=UnixTimeS(1698527000), source="binance", payout=0.0, @@ -128,8 +128,8 @@ def test_fetch_filtered_predictions(mock_query_subgraph): assert predictions[0].user == "0xd2a24cb4ff2584bad80ff5f109034a891c3d88dd" assert predictions[0].pair == "ADA/USDT" assert predictions[0].contract == "0x18f54cc21b7a2fdd011bea06bba7801b280e3151" - assert predictions[0].trueval is False - assert predictions[0].prediction is True + assert predictions[0].truevalue is False + assert predictions[0].predvalue is True assert mock_query_subgraph.call_count == 1 diff --git a/pdr_backend/subgraph/test/test_subgraph_trueval.py b/pdr_backend/subgraph/test/test_subgraph_trueval.py index 44e4d92f2..72e76f0b6 100644 --- a/pdr_backend/subgraph/test/test_subgraph_trueval.py +++ b/pdr_backend/subgraph/test/test_subgraph_trueval.py @@ -63,5 +63,5 @@ def test_fetch_filtered_truevals(mock_query_subgraph): assert truevals[0].token == "ADA/USDT" assert truevals[0].timestamp == 1698527000 assert truevals[0].slot == 1698527100 - assert truevals[0].trueval is True + assert truevals[0].truevalue is True assert mock_query_subgraph.call_count == 1 diff --git a/pdr_backend/subgraph/trueval.py b/pdr_backend/subgraph/trueval.py index 946ec7906..67841249c 100644 --- a/pdr_backend/subgraph/trueval.py +++ b/pdr_backend/subgraph/trueval.py @@ -12,11 +12,11 @@ def __init__( ID: str, timestamp: UnixTimeS, token: str, - trueval: Union[bool, None], + truevalue: Union[bool, None], slot: UnixTimeS, # slot/epoch timestamp ) -> None: self.ID = ID - self.trueval = trueval + self.truevalue = truevalue self.timestamp = timestamp self.token = token self.slot = slot @@ -28,12 +28,12 @@ def __init__( @enforce_types def mock_trueval(trueval_tuple: tuple) -> Trueval: - (ID, timestamp, token, trueval, slot) = trueval_tuple + (ID, timestamp, token, truevalue, slot) = trueval_tuple return Trueval( ID=ID, token=token, + truevalue=truevalue, slot=UnixTimeS(slot), - trueval=trueval, timestamp=UnixTimeS(timestamp), ) diff --git a/pdr_backend/util/csvs.py b/pdr_backend/util/csvs.py index c3300c5c8..ebf5f38fa 100644 --- a/pdr_backend/util/csvs.py +++ b/pdr_backend/util/csvs.py @@ -80,8 +80,8 @@ def save_prediction_csv(all_predictions: List[Prediction], csv_output_dir: str): all_predictions, csv_output_dir, { - "Predicted Value": "prediction", - "True Value": "trueval", + "Predicted Value": "predvalue", + "True Value": "truevalue", "Timestamp": "timestamp", "Stake": "stake", "Payout": "payout", @@ -101,7 +101,7 @@ def save_analysis_csv(all_predictions: List[Prediction], 
csv_output_dir: str): "Stake": "stake", "Wallet": "user", "Payout": "payout", - "True Value": "trueval", - "Predicted Value": "prediction", + "True Value": "truevalue", + "Predicted Value": "predvalue", }, ) diff --git a/pdr_backend/util/test_noganache/test_csvs.py b/pdr_backend/util/test_noganache/test_csvs.py index 35a5f91e0..d285797bc 100644 --- a/pdr_backend/util/test_noganache/test_csvs.py +++ b/pdr_backend/util/test_noganache/test_csvs.py @@ -18,8 +18,8 @@ def test_save_analysis_csv(tmpdir): data = csv.DictReader(f) data_rows = list(data) - assert data_rows[0]["Predicted Value"] == str(predictions[0].prediction) - assert data_rows[0]["True Value"] == str(predictions[0].trueval) + assert data_rows[0]["Predicted Value"] == str(predictions[0].predvalue) + assert data_rows[0]["True Value"] == str(predictions[0].truevalue) assert data_rows[0]["Timestamp"] == str(predictions[0].timestamp) assert list(data_rows[0].keys()) == [ "PredictionID", @@ -46,8 +46,8 @@ def test_save_prediction_csv(tmpdir): data = csv.DictReader(f) data_rows = list(row for row in data) - assert data_rows[0]["Predicted Value"] == str(predictions[0].prediction) - assert data_rows[0]["True Value"] == str(predictions[0].trueval) + assert data_rows[0]["Predicted Value"] == str(predictions[0].predvalue) + assert data_rows[0]["True Value"] == str(predictions[0].truevalue) assert data_rows[0]["Timestamp"] == str(predictions[0].timestamp) assert list(data_rows[0].keys()) == [ "Predicted Value", diff --git a/setup.py b/setup.py index 14dca3d36..8d0c1cbb1 100644 --- a/setup.py +++ b/setup.py @@ -12,6 +12,7 @@ "bumpversion", "ccxt>=4.1.59", "coverage", + "duckdb", "enforce_typing", "eth-account==0.11.0", "eth-keys==0.5.0", diff --git a/system_tests/test_get_traction_info_system.py b/system_tests/test_get_traction_info_system.py index 17c8e0e0c..c6125d14b 100644 --- a/system_tests/test_get_traction_info_system.py +++ b/system_tests/test_get_traction_info_system.py @@ -17,7 +17,7 @@ @patch("pdr_backend.analytics.get_predictions_info.plot_slot_daily_statistics") @patch("pdr_backend.analytics.get_predictions_info.GQLDataFactory.get_gql_tables") -def test_traction_info_system(mock_get_gql_tables, mock_plot_stats, caplog): +def test_traction_info_system(mock_get_gql_tables, mock_plot_stats, caplog, tmpdir): feed_addr = "0x2d8e2267779d27c2b3ed5408408ff15d9f3a3152" user_addr = "0xaaaa4cb4ff2584bad80ff5f109034a891c3d88dd" mock_predictions = [ @@ -42,7 +42,7 @@ def test_traction_info_system(mock_get_gql_tables, mock_plot_stats, caplog): ppss = mock_ppss( ["binance BTC/USDT c 5m"], "sapphire-mainnet", - ".", + str(tmpdir), st_timestr=st_timestr, fin_timestr=fin_timestr, ) @@ -72,7 +72,7 @@ def test_traction_info_system(mock_get_gql_tables, mock_plot_stats, caplog): "get_traction_info", "2023-12-01", "2023-12-31", - "./dir", + str(tmpdir), "ppss.yaml", "sapphire-testnet", ]
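
A minimal usage sketch of the new CSVDataStore (pdr_backend/lake/csv_data_store.py). It only calls methods defined in this patch; the directory name and sample rows are hypothetical, and the commented file name assumes write()/_create_file_name() behave as implemented above (10-digit zero-padded timestamps, "_to_" left open while the last chunk holds fewer than 1000 rows).

import polars as pl

from pdr_backend.lake.csv_data_store import CSVDataStore

store = CSVDataStore("./lake_data")  # hypothetical base path

# write() expects a "timestamp" column, sorted ascending, and splits rows
# into CSV files of at most 1000 rows each.
df = pl.DataFrame({"ID": ["0x1", "0x2"], "timestamp": [1701634400, 1701634460]})
store.write("pdr_predictions", df)
# -> ./lake_data/pdr_predictions/pdr_predictions_from_1701634400_to_.csv

# read() concatenates the dataset's CSV files and filters on the timestamp column.
subset = store.read("pdr_predictions", 1701634400, 1701634460)
print(subset)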
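
A similar sketch for the DuckDB-backed PersistentDataStore (pdr_backend/lake/persistent_data_store.py). Queries are written against a "{view_name}" placeholder, which query_data() fills in with the md5-derived table name from BaseDataStore._generate_view_name(); the base directory and data below are hypothetical.

import os

import polars as pl

from pdr_backend.lake.persistent_data_store import PersistentDataStore

os.makedirs("./lake_data", exist_ok=True)  # duckdb.connect() needs the directory to exist
pds = PersistentDataStore("./lake_data")   # opens ./lake_data/duckdb.db

df = pl.DataFrame({"ID": ["0x1", "0x2"], "timestamp": [1701634400, 1701634460]})
pds.insert_to_table(df, "pdr_predictions")  # creates the table on first insert

recent = pds.query_data(
    "pdr_predictions",
    "SELECT * FROM {view_name} WHERE timestamp >= 1701634460",
)
print(recent)

pds.drop_table("pdr_predictions", ds_type="table")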
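
A sketch of how the refactored Table ties the two stores together: append_to_sources() now writes fetched rows to the chunked CSVs and inserts them into DuckDB in one call, replacing the old save()-to-parquet path. The ppss construction mirrors mock_ppss() as used in the updated tests; the table name, schema, and row below are hypothetical.

import polars as pl
from polars import Int64, Utf8

from pdr_backend.lake.table import Table
from pdr_backend.ppss.ppss import mock_ppss

ppss = mock_ppss(
    ["binance BTC/USDT c 5m"],
    "sapphire-mainnet",
    "./lake_data",  # assumed to already exist, as tmpdir does in the tests
    st_timestr="2023-12-03",
    fin_timestr="2023-12-05",
)

schema = {"ID": Utf8, "timestamp": Int64}
table = Table("pdr_test_df", schema, ppss)  # load() reads any existing CSV data

rows = pl.DataFrame({"ID": ["0x123"], "timestamp": [1701634400]}, schema=schema)
table.append_to_sources(rows)  # -> _append_to_csv() + _append_to_db()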