Add read_json to SessionContext (#56)
* Expose read_json

* Add additional test cases

* Address review comments

* Fix Release Audit Tool error

* Fix fmt issues

* Add empty line to test data

Co-authored-by: Lars <[email protected]>
larskarg and Lars authored Nov 2, 2022
1 parent 940f118 commit 12bb587
Showing 3 changed files with 68 additions and 1 deletion.
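For orientation, here is a minimal usage sketch of the API this commit exposes (the file path is hypothetical; the keyword defaults mirror the #[args(...)] attribute in src/context.rs below):

from datafusion import SessionContext  # assuming the package-root export used by this repo's tests

ctx = SessionContext()
# Read newline-delimited JSON; with no schema given, it is inferred
# from up to 1000 records (schema_infer_max_records).
df = ctx.read_json("data.json")
batches = df.collect()  # a list of pyarrow.RecordBatch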
3 changes: 3 additions & 0 deletions datafusion/tests/data_test_context/data.json
@@ -0,0 +1,3 @@
{"A": "a", "B": 1}
{"A": "b", "B": 2}
{"A": "c", "B": 3}
34 changes: 34 additions & 0 deletions datafusion/tests/test_context.py
@@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.

import os

import pyarrow as pa
import pyarrow.dataset as ds

@@ -181,6 +183,38 @@ def test_table_exist(ctx):
    assert ctx.table_exist("t") is True


def test_read_json(ctx):
    path = os.path.dirname(os.path.abspath(__file__))

    # Default
    test_data_path = os.path.join(path, "data_test_context", "data.json")
    df = ctx.read_json(test_data_path)
    result = df.collect()

    assert result[0].column(0) == pa.array(["a", "b", "c"])
    assert result[0].column(1) == pa.array([1, 2, 3])

    # Schema
    schema = pa.schema(
        [
            pa.field("A", pa.string(), nullable=True),
        ]
    )
    df = ctx.read_json(test_data_path, schema=schema)
    result = df.collect()

    assert result[0].column(0) == pa.array(["a", "b", "c"])
    assert result[0].schema == schema

    # File extension
    test_data_path = os.path.join(path, "data_test_context", "data.json")
    df = ctx.read_json(test_data_path, file_extension=".json")
    result = df.collect()

    assert result[0].column(0) == pa.array(["a", "b", "c"])
    assert result[0].column(1) == pa.array([1, 2, 3])


def test_read_csv(ctx):
    csv_df = ctx.read_csv(path="testing/data/csv/aggregate_test_100.csv")
    csv_df.select(column("c1")).show()
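The tests above exercise the schema and file_extension keywords; the binding also exposes table_partition_cols (see the Rust signature below), which these tests do not cover. A hedged sketch of how a partitioned read might look, assuming a hypothetical Hive-style layout such as data/year=2022/part.json:

# Hypothetical: read a directory partitioned by "year"; the column is
# derived from the directory path rather than the JSON records themselves.
df = ctx.read_json("data", table_partition_cols=["year"])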
32 changes: 31 additions & 1 deletion src/context.rs
@@ -29,7 +29,7 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::datasource::TableProvider;
use datafusion::datasource::MemTable;
use datafusion::execution::context::{SessionConfig, SessionContext};
use datafusion::prelude::{AvroReadOptions, CsvReadOptions, ParquetReadOptions};
use datafusion::prelude::{AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions};

use crate::catalog::{PyCatalog, PyTable};
use crate::dataframe::PyDataFrame;
@@ -269,6 +269,36 @@ impl PySessionContext {
        Ok(self.ctx.session_id())
    }

    #[allow(clippy::too_many_arguments)]
    #[args(
        schema = "None",
        schema_infer_max_records = "1000",
        file_extension = "\".json\"",
        table_partition_cols = "vec![]"
    )]
    fn read_json(
        &mut self,
        path: PathBuf,
        schema: Option<PyArrowType<Schema>>,
        schema_infer_max_records: usize,
        file_extension: &str,
        table_partition_cols: Vec<String>,
        py: Python,
    ) -> PyResult<PyDataFrame> {
        let path = path
            .to_str()
            .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?;

        let mut options = NdJsonReadOptions::default().table_partition_cols(table_partition_cols);
        options.schema = schema.map(|s| Arc::new(s.0));
        options.schema_infer_max_records = schema_infer_max_records;
        options.file_extension = file_extension;

        let result = self.ctx.read_json(path, options);
        let df = wait_for_future(py, result).map_err(DataFusionError::from)?;
        Ok(PyDataFrame::new(df))
    }

    #[allow(clippy::too_many_arguments)]
    #[args(
        schema = "None",
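The #[args(...)] attribute gives the method its Python keyword defaults, so the effective Python-side signature is roughly the following (a sketch inferred from the attribute, not generated stubs):

# Rough Python-side view of the binding's signature:
def read_json(self, path, schema=None, schema_infer_max_records=1000,
              file_extension=".json", table_partition_cols=[]):
    ...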
