From 199a7c061e3d009a973f8bcfa17eda97cf476766 Mon Sep 17 00:00:00 2001 From: Trent Hauck Date: Thu, 30 Nov 2023 11:43:36 -0800 Subject: [PATCH] feat: add execute method on session adds .execute(query) on the session context so that various queries like `COPY` can be run eagerly --- python/tests/test_session.py | 18 ++++++++++++++++++ src/session_context.rs | 12 +++++++++++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/python/tests/test_session.py b/python/tests/test_session.py index 3870d43..9827f95 100644 --- a/python/tests/test_session.py +++ b/python/tests/test_session.py @@ -64,6 +64,24 @@ def test_with_error(): session.sql(query) +def test_execute(tmp_path): + """Test the execute query returns immediately.""" + + output_path = tmp_path / "output.parquet" + + session = connect() + + gff_path = DATA / "test.gff" + + query = f"CREATE EXTERNAL TABLE gff_file STORED AS GFF LOCATION '{gff_path}'" + session.execute(query) + + copy_query = f"COPY (SELECT * FROM gff_file) TO '{output_path}' (FORMAT PARQUET)" + session.execute(copy_query) + + assert output_path.exists() + + def test_to_record_batch_reader(): """Test converting to a record batch reader.""" session = connect() diff --git a/src/session_context.rs b/src/session_context.rs index a16937f..19d9a0e 100644 --- a/src/session_context.rs +++ b/src/session_context.rs @@ -41,7 +41,7 @@ impl ExonSessionContext { Ok(Self::default()) } - /// Execute a SQL query and return the result as a [`PyExecutionResult`]. + /// Generate the plan from a SQL query and return the result as a [`PyExecutionResult`]. fn sql(&mut self, query: &str, py: Python) -> PyResult<PyExecutionResult> { let result = self.ctx.sql(query); let df = wait_for_future(py, result).map_err(error::BioBearError::from)?; @@ -49,6 +49,16 @@ impl ExonSessionContext { Ok(PyExecutionResult::new(df)) } + /// Execute the SQL query eagerly, discarding the results instead of returning them.
+ fn execute(&mut self, query: &str, py: Python) -> PyResult<()> { + let result = self.ctx.sql(query); + let df = wait_for_future(py, result).map_err(error::BioBearError::from)?; + + wait_for_future(py, df.collect()).map_err(error::BioBearError::from)?; + + Ok(()) + } + /// Register an object store with the given URI. fn register_object_store_from_url(&mut self, url: &str, py: Python) -> PyResult<()> { let runtime = self.ctx.runtime_env();