
Spark Snowflake connector does not end the session with Snowflake after every write in a streaming application, which results in non-deletion of the temporary stage created in Snowflake #597

Open
OS-himanshupandey opened this issue Dec 16, 2024 · 0 comments


We have a Spark streaming application written in Scala that uses the Spark Snowflake connector (versions: net.snowflake.spark-snowflake:2.12.0-spark_3.4, net.snowflake.snowflake-jdbc:3.13.30, Spark version: 3.4.0, Scala version: 2.12.15).
As I understand it, ingestion into Snowflake works like this: the connector creates a temporary stage when it opens a session, the data is loaded into Snowflake through that stage, and the temporary stage is dropped once the session ends.
What we observe instead is that the session opened for each write from the Spark app is never closed, so the temporary stage is never deleted (which eventually raises our costs on the Snowflake side).
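
For reference, the session-scoping behaviour described above can be reproduced with the Snowflake JDBC driver alone: a temporary stage lives only for the duration of the session (connection) that created it and is dropped automatically when that session closes. The snippet below is only a minimal sketch; the account URL, credentials, and stage name are placeholders, not values from our setup.

import java.sql.DriverManager
import java.util.Properties

// Minimal sketch with placeholder account/credentials: a temporary stage is
// session-scoped, so a session that is never closed leaves its stage behind.
val props = new Properties()
props.put("user", "<user>")
props.put("password", "<password>")
props.put("warehouse", "<warehouse>")
props.put("db", "<database>")
props.put("schema", "<schema>")

val conn = DriverManager.getConnection("jdbc:snowflake://<account>.snowflakecomputing.com", props)
val stmt = conn.createStatement()

// The stage exists only within this session ...
stmt.execute("CREATE TEMPORARY STAGE my_tmp_stage")

// ... and Snowflake drops it automatically once the session ends.
conn.close()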

Can we add the capability to end the session after every micro-batch write, if it is not already there?
I expected this capability to exist already behind some configuration; if it does and I simply could not find it, could you let me know what the solution is?

We run the streaming application with the Foreach method, and below is the write function for the DataFrame to Snowflake:
snowFlakeDF
  .write
  .format(SNOWFLAKE_SOURCE_NAME)
  .options(configuration.getSnowFlakeConfigurations) // connection options: user credentials, table, warehouse name, etc.
  .option("columnmap", <column_map>)                 // maps the DataFrame columns to the Snowflake table columns
  .option("column_mapping", "name")
  .option("internal_execute_query_in_sync_mode", "true")
  .option("dbtable", <table_name_in_snowflake>)
  .mode(SaveMode.Append)
  .save()
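
For completeness, here is roughly how a write like the one above is invoked once per micro-batch from the streaming query. This is only a sketch: the source DataFrame, trigger interval, and checkpoint location are placeholders rather than our actual configuration; the write body is the same function shown above.

import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.streaming.Trigger

// Sketch only: sourceDF, the trigger interval, and <checkpoint_path> are placeholders.
// Every micro-batch runs the same DataFrame write shown above, so each batch goes
// through the connector's stage-based ingestion path.
val query = sourceDF.writeStream
  .trigger(Trigger.ProcessingTime("1 minute"))
  .option("checkpointLocation", "<checkpoint_path>")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write
      .format(SNOWFLAKE_SOURCE_NAME)
      .options(configuration.getSnowFlakeConfigurations)
      .option("dbtable", <table_name_in_snowflake>)
      .mode(SaveMode.Append)
      .save()
  }
  .start()

query.awaitTermination()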
