Skip to content

Commit

Permalink
Configure datafusion to prefer existing sort
Browse files Browse the repository at this point in the history
This ensures queries with filter clauses return
data in order.

apache/datafusion#10572 (comment)
  • Loading branch information
twitu committed Oct 3, 2024
1 parent c712216 commit 99d71af
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 5 deletions.
9 changes: 5 additions & 4 deletions nautilus_core/persistence/src/backend/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ impl DataBackendSession {
.enable_all()
.build()
.unwrap();
let session_cfg =
SessionConfig::new().set_str("datafusion.optimizer.repartition_file_scans", "false");
let session_cfg = SessionConfig::new()
.set_str("datafusion.optimizer.repartition_file_scans", "false")
.set_str("datafusion.optimizer.prefer_existing_sort", "true");
let session_ctx = SessionContext::new_with_config(session_cfg);
Self {
session_ctx,
Expand Down Expand Up @@ -119,7 +120,7 @@ impl DataBackendSession {
file_sort_order: vec![vec![Expr::Sort(Sort {
expr: Box::new(col("ts_init")),
asc: true,
nulls_first: true,
nulls_first: false,
})]],
..Default::default()
};
Expand All @@ -129,7 +130,7 @@ impl DataBackendSession {
parquet_options,
))?;

let default_query = format!("SELECT * FROM {}", &table_name);
let default_query = format!("SELECT * FROM {} ORDER BY ts_init", &table_name);
let sql_query = sql_query.unwrap_or(&default_query);
let query = self.runtime.block_on(self.session_ctx.sql(sql_query))?;

Expand Down
2 changes: 1 addition & 1 deletion nautilus_core/persistence/tests/test_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ fn test_quote_tick_query_with_filter() {
.add_file::<QuoteTick>(
"quote_005",
file_path,
Some("SELECT * FROM quote_005 WHERE ts_init >= 1701388832486000000"),
Some("SELECT * FROM quote_005 WHERE ts_init >= 1701388832486000000 ORDER BY ts_init"),
)
.unwrap();
let query_result: QueryResult = catalog.get_query_result();
Expand Down
1 change: 1 addition & 0 deletions nautilus_trader/persistence/catalog/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,7 @@ def _build_query(
if conditions:
query += f" WHERE {' AND '.join(conditions)}"

query += " ORDER BY ts_init"
return query

@staticmethod
Expand Down

0 comments on commit 99d71af

Please sign in to comment.