diff --git a/datafusion/tests/data_test_context/data.json b/datafusion/tests/data_test_context/data.json
new file mode 100644
index 000000000..ff895b61f
--- /dev/null
+++ b/datafusion/tests/data_test_context/data.json
@@ -0,0 +1,3 @@
+{"A": "a", "B": 1}
+{"A": "b", "B": 2}
+{"A": "c", "B": 3}
diff --git a/datafusion/tests/test_context.py b/datafusion/tests/test_context.py
index 19b9d0e31..50bdf4363 100644
--- a/datafusion/tests/test_context.py
+++ b/datafusion/tests/test_context.py
@@ -15,6 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import os
+
 import pyarrow as pa
 import pyarrow.dataset as ds
 
@@ -181,6 +183,38 @@ def test_table_exist(ctx):
     assert ctx.table_exist("t") is True
 
 
+def test_read_json(ctx):
+    path = os.path.dirname(os.path.abspath(__file__))
+
+    # Default
+    test_data_path = os.path.join(path, "data_test_context", "data.json")
+    df = ctx.read_json(test_data_path)
+    result = df.collect()
+
+    assert result[0].column(0) == pa.array(["a", "b", "c"])
+    assert result[0].column(1) == pa.array([1, 2, 3])
+
+    # Schema
+    schema = pa.schema(
+        [
+            pa.field("A", pa.string(), nullable=True),
+        ]
+    )
+    df = ctx.read_json(test_data_path, schema=schema)
+    result = df.collect()
+
+    assert result[0].column(0) == pa.array(["a", "b", "c"])
+    assert result[0].schema == schema
+
+    # File extension
+    test_data_path = os.path.join(path, "data_test_context", "data.json")
+    df = ctx.read_json(test_data_path, file_extension=".json")
+    result = df.collect()
+
+    assert result[0].column(0) == pa.array(["a", "b", "c"])
+    assert result[0].column(1) == pa.array([1, 2, 3])
+
+
 def test_read_csv(ctx):
     csv_df = ctx.read_csv(path="testing/data/csv/aggregate_test_100.csv")
     csv_df.select(column("c1")).show()
diff --git a/src/context.rs b/src/context.rs
index 918624007..21b3f06c4 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -29,7 +29,7 @@ use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::datasource::TableProvider;
 use datafusion::datasource::MemTable;
 use datafusion::execution::context::{SessionConfig, SessionContext};
-use datafusion::prelude::{AvroReadOptions, CsvReadOptions, ParquetReadOptions};
+use datafusion::prelude::{AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions};
 
 use crate::catalog::{PyCatalog, PyTable};
 use crate::dataframe::PyDataFrame;
@@ -269,6 +269,36 @@ impl PySessionContext {
         Ok(self.ctx.session_id())
     }
 
+    #[allow(clippy::too_many_arguments)]
+    #[args(
+        schema = "None",
+        schema_infer_max_records = "1000",
+        file_extension = "\".json\"",
+        table_partition_cols = "vec![]"
+    )]
+    fn read_json(
+        &mut self,
+        path: PathBuf,
+        schema: Option<PyArrowType<Schema>>,
+        schema_infer_max_records: usize,
+        file_extension: &str,
+        table_partition_cols: Vec<String>,
+        py: Python,
+    ) -> PyResult<PyDataFrame> {
+        let path = path
+            .to_str()
+            .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;
+
+        let mut options = NdJsonReadOptions::default().table_partition_cols(table_partition_cols);
+        options.schema = schema.map(|s| Arc::new(s.0));
+        options.schema_infer_max_records = schema_infer_max_records;
+        options.file_extension = file_extension;
+
+        let result = self.ctx.read_json(path, options);
+        let df = wait_for_future(py, result).map_err(DataFusionError::from)?;
+        Ok(PyDataFrame::new(df))
+    }
+
     #[allow(clippy::too_many_arguments)]
     #[args(
         schema = "None",
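
Example usage of the new binding from Python (a minimal sketch, mirroring the tests above; it assumes the package exposes the context class as `SessionContext`, as the tests' `ctx` fixture suggests, and that the fixture file added in this diff exists at the given relative path):

    import pyarrow as pa
    from datafusion import SessionContext  # assumed export name

    ctx = SessionContext()

    # Let DataFusion infer the schema from the newline-delimited JSON
    # (samples up to 1000 records by default, per schema_infer_max_records).
    df = ctx.read_json("datafusion/tests/data_test_context/data.json")
    print(df.collect())

    # Or pin the schema explicitly; inference is skipped and only the
    # declared columns are produced.
    schema = pa.schema([pa.field("A", pa.string(), nullable=True)])
    df = ctx.read_json(
        "datafusion/tests/data_test_context/data.json",
        schema=schema,
        file_extension=".json",
    )
    print(df.collect())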