Skip to content

Commit

Permalink
remove rdd references
Browse files Browse the repository at this point in the history
  • Loading branch information
MrPowers committed Feb 7, 2024
1 parent 90d6044 commit 39b4e66
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 3 deletions.
4 changes: 2 additions & 2 deletions quinn/dataframe_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def column_to_list(df: DataFrame, col_name: str) -> list[Any]:

# sparksession from df is not available in older versions of pyspark
if sys.modules["pyspark"].__version__ < "3.3.0":
return [row[0] for row in df.select("mvv").collect()]
return [row[0] for row in df.select(col_name).collect()]

spark_config = df.sparkSession.sparkContext.getConf().getAll()

Expand All @@ -40,7 +40,7 @@ def column_to_list(df: DataFrame, col_name: str) -> list[Any]:
if pyarrow_valid and pandas_valid:
return df.select(col_name).toPandas()[col_name].tolist()

return [row[0] for row in df.select("mvv").collect()]
return [row[0] for row in df.select(col_name).collect()]


def two_columns_to_dictionary(
Expand Down
2 changes: 1 addition & 1 deletion quinn/transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ def fix_nullability(field: StructField, result_dict: dict) -> None:
spark = SparkSession.getActiveSession()
spark = spark if spark is not None else SparkSession.builder.getOrCreate()

return spark.createDataFrame(output.rdd, output.schema)
return output


def flatten_struct(df: DataFrame, col_name: str, separator: str = ":") -> DataFrame:
Expand Down

0 comments on commit 39b4e66

Please sign in to comment.