From b618afdc05a9ca6d905c88966d40745339885740 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 27 May 2022 13:52:52 +0100 Subject: [PATCH] Windows compatibility --- .../core/src/datasource/listing/path.rs | 66 ++++++++++++------- 1 file changed, 41 insertions(+), 25 deletions(-) diff --git a/datafusion/core/src/datasource/listing/path.rs b/datafusion/core/src/datasource/listing/path.rs index dcb179d52f171..6050686524050 100644 --- a/datafusion/core/src/datasource/listing/path.rs +++ b/datafusion/core/src/datasource/listing/path.rs @@ -21,6 +21,8 @@ use datafusion_data_access::FileMeta; use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; use glob::Pattern; +use std::borrow::Cow; +use std::path::{is_separator, MAIN_SEPARATOR}; use url::Url; /// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`] @@ -99,17 +101,26 @@ impl ListingTableUrl { /// Returns the path as expected by [`ObjectStore`] /// - /// In particular for file scheme URLs, this has a leading `/` - /// and describes an absolute path on the local filesystem + /// In particular for file scheme URLs, this has a leading is + /// an absolute path on the local filesystem with the local + /// filesystems path representation /// /// For other URLs, this also contains the host component /// and lacks a leading `/` /// /// TODO: Handle paths consistently (#2489) - fn prefix(&self) -> &str { + fn prefix(&self) -> Cow<'_, str> { match self.scheme() { - "file" => self.url.path(), - _ => &self.url[url::Position::BeforeHost..url::Position::AfterPath], + "file" => match MAIN_SEPARATOR { + '/' => Cow::Borrowed(self.url.path()), + _ => { + let path = self.url.to_file_path().unwrap(); + Cow::Owned(path.to_string_lossy().to_string()) + } + }, + _ => Cow::Borrowed( + &self.url[url::Position::BeforeHost..url::Position::AfterPath], + ), } } @@ -119,10 +130,12 @@ impl ListingTableUrl { &'a self, path: &'b str, ) -> Option + 'a> { + let prefix = self.prefix(); // Ignore empty path segments let diff = itertools::diff_with( - path.split('/').filter(|s| !s.is_empty()), - self.prefix().split('/').filter(|s| !s.is_empty()), + // TODO: Handle paths consistently (#2489) + path.split(is_separator).filter(|s| !s.is_empty()), + prefix.split(is_separator).filter(|s| !s.is_empty()), |a, b| a == b, ); @@ -139,24 +152,27 @@ impl ListingTableUrl { store: &'a dyn ObjectStore, file_extension: &'a str, ) -> BoxStream<'a, Result> { - futures::stream::once(store.list_file(self.prefix())) - .try_flatten() - .map_err(DataFusionError::IoError) - .try_filter(move |meta| { - let path = meta.path(); - - let extension_match = path.ends_with(file_extension); - let glob_match = match &self.glob { - Some(glob) => match path.strip_prefix(self.url.path()) { - Some(stripped) => glob.matches(stripped), - None => false, - }, - None => true, - }; - - futures::future::ready(extension_match && glob_match) - }) - .boxed() + futures::stream::once(async move { + let prefix = self.prefix(); + store.list_file(prefix.as_ref()).await + }) + .try_flatten() + .map_err(DataFusionError::IoError) + .try_filter(move |meta| { + let path = meta.path(); + + let extension_match = path.ends_with(file_extension); + let glob_match = match &self.glob { + Some(glob) => match path.strip_prefix(self.url.path()) { + Some(stripped) => glob.matches(stripped), + None => false, + }, + None => true, + }; + + futures::future::ready(extension_match && glob_match) + }) + .boxed() } }