From e300ab04576d3dfb7abf8a8bf6836c1528ed529c Mon Sep 17 00:00:00 2001 From: Trent Hauck Date: Wed, 6 Dec 2023 17:15:44 -0800 Subject: [PATCH] feat: add execute method on session (#77) adds .execute(query) on the session context so that various queries like `COPY` can be run eagerly --- python/tests/test_session.py | 20 ++++++++++++++++++++ src/session_context.rs | 12 +++++++++++- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/python/tests/test_session.py b/python/tests/test_session.py index 3870d43..6f09918 100644 --- a/python/tests/test_session.py +++ b/python/tests/test_session.py @@ -64,6 +64,26 @@ def test_with_error(): session.sql(query) +def test_execute(tmp_path): + """Test the execute query returns immediately.""" + + output_path = tmp_path / "output.parquet" + + session = connect() + + gff_path = DATA / "test.gff" + + query = f"CREATE EXTERNAL TABLE gff_file STORED AS GFF LOCATION '{gff_path}'" + session.execute(query) + + copy_query = ( + f"COPY (SELECT seqname FROM gff_file) TO '{output_path}' (FORMAT PARQUET)" + ) + session.execute(copy_query) + + assert output_path.exists() + + def test_to_record_batch_reader(): """Test converting to a record batch reader.""" session = connect() diff --git a/src/session_context.rs b/src/session_context.rs index a16937f..19d9a0e 100644 --- a/src/session_context.rs +++ b/src/session_context.rs @@ -41,7 +41,7 @@ impl ExonSessionContext { Ok(Self::default()) } - /// Execute a SQL query and return the result as a [`PyExecutionResult`]. + /// Generate the plan from a SQL query and return the result as a [`PyExecutionResult`]. fn sql(&mut self, query: &str, py: Python) -> PyResult { let result = self.ctx.sql(query); let df = wait_for_future(py, result).map_err(error::BioBearError::from)?; @@ -49,6 +49,16 @@ impl ExonSessionContext { Ok(PyExecutionResult::new(df)) } + /// Execute the SQL query eagerly, collecting and then discarding the results. 
+ fn execute(&mut self, query: &str, py: Python) -> PyResult<()> { + let result = self.ctx.sql(query); + let df = wait_for_future(py, result).map_err(error::BioBearError::from)?; + + wait_for_future(py, df.collect()).map_err(error::BioBearError::from)?; + + Ok(()) + } + /// Register an object store with the given URI. fn register_object_store_from_url(&mut self, url: &str, py: Python) -> PyResult<()> { let runtime = self.ctx.runtime_env();