#22: Returned dataframe. Need to also pass the timestamp format.
Signed-off-by: Dominik Hoffmann <[email protected]>
dh1542 committed Nov 6, 2024
1 parent e8b6500 commit 3a41d10
Showing 2 changed files with 17 additions and 9 deletions.
@@ -106,16 +106,26 @@ def filter(self) -> DataFrame:
         for i in range(1, len(rows)):
             current_row = rows[i]
             current_time_stamp = current_row[self.time_stamp_column_name]
-            if ((last_time_stamp - current_time_stamp).total_seconds()) >= time_delta.total_seconds():
+
+            # Keep a row only once at least time_delta has elapsed since the last kept row
+            if (current_time_stamp - last_time_stamp).total_seconds() >= time_delta.total_seconds():
                 cleansed_df.append(current_row)
                 last_time_stamp = current_time_stamp

-        return self.df
+        # Create a DataFrame from the cleansed rows
+        result_df = pd.DataFrame(cleansed_df)
+
+        # Rename the columns back to the originals
+        result_df.columns = self.df.columns
+
+        # Convert the timestamp column back to strings
+        result_df[self.time_stamp_column_name] = result_df[
+            self.time_stamp_column_name
+        ].dt.strftime("%Y-%m-%d %H:%M:%S.%f")
+
+        return result_df
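
For context, the whole pass this hunk builds can be read as one function: walk the rows in timestamp order, keep a row only when at least the configured interval has elapsed since the last kept row, then rebuild a DataFrame and format the timestamps back to strings. Below is a minimal, self-contained sketch in plain pandas; the function name keep_rows_at_interval, its signature, and the timedelta-based interval are illustrative assumptions, not the repository's API.

from datetime import timedelta

import pandas as pd


def keep_rows_at_interval(
    df: pd.DataFrame, time_stamp_column: str, interval: timedelta
) -> pd.DataFrame:
    # Illustrative sketch, not the repository's implementation.
    rows = df.to_dict("records")
    if not rows:
        return df.copy()

    # Always keep the first row and measure gaps from the last *kept* row
    cleansed = [rows[0]]
    last_time_stamp = rows[0][time_stamp_column]

    for current_row in rows[1:]:
        current_time_stamp = current_row[time_stamp_column]
        if (current_time_stamp - last_time_stamp).total_seconds() >= interval.total_seconds():
            cleansed.append(current_row)
            last_time_stamp = current_time_stamp

    result_df = pd.DataFrame(cleansed, columns=df.columns)
    # Round-trip the timestamps back to strings, mirroring the hunk above
    result_df[time_stamp_column] = result_df[time_stamp_column].dt.strftime(
        "%Y-%m-%d %H:%M:%S.%f"
    )
    return result_df

Note that the output format "%Y-%m-%d %H:%M:%S.%f" is hardcoded here, as in the hunk; that is exactly the gap the commit message flags. The second changed file updates the corresponding test.
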
@@ -14,12 +14,10 @@
 import pytest

 from pyspark.sql import SparkSession
-from pyspark.sql.dataframe import DataFrame
+from pyspark.sql import DataFrame

 from rtdip_sdk.pipelines.data_wranglers.spark.data_quality.interval_filtering import IntervalFiltering
-from src.sdk.python.rtdip_sdk.pipelines.data_wranglers.spark.data_quality.duplicate_detection import (
-    DuplicateDetection,
-)

@pytest.fixture(scope="session")
@@ -53,7 +51,7 @@ def test_interval_detection(spark_session: SparkSession):
["TagName", "EventTime", "Status", "Value"],
)

interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "milliseconds")
interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "seconds", "EventTime")
actual_df = interval_filtering_wrangler.filter()

assert isinstance(actual_df, DataFrame)
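
The updated call passes five positional arguments, which suggests the constructor now takes the interval value, the interval unit, and the timestamp column name explicitly. A hedged usage sketch under that assumption follows; the sample rows and the local Spark session are illustrative, and the argument order is inferred only from the test above.

from pyspark.sql import SparkSession

from rtdip_sdk.pipelines.data_wranglers.spark.data_quality.interval_filtering import (
    IntervalFiltering,
)

spark = SparkSession.builder.master("local[1]").getOrCreate()

df = spark.createDataFrame(
    [
        ("Tag_A", "2024-01-02 20:03:46.000", "Good", 1.0),
        ("Tag_A", "2024-01-02 20:03:46.500", "Good", 2.0),  # < 1 s gap: expected to be dropped
        ("Tag_A", "2024-01-02 20:03:48.000", "Good", 3.0),  # >= 1 s gap: expected to be kept
    ],
    ["TagName", "EventTime", "Status", "Value"],
)

# Argument order mirrors the updated test: an interval of 1 second on "EventTime"
wrangler = IntervalFiltering(spark, df, 1, "seconds", "EventTime")
result = wrangler.filter()
print(result)

One caveat worth noting: at this commit, filter() builds its result with pd.DataFrame, so the test's isinstance(actual_df, DataFrame) assertion against the PySpark DataFrame type may not hold without a conversion step.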
