wip - CometNativeScan (apache#1076)
* wip - CometNativeScan

* fix and make config internal
parthchandra authored Nov 12, 2024
1 parent ad46821 commit 38e32f7
Showing 9 changed files with 716 additions and 148 deletions.
10 changes: 10 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -77,6 +77,16 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(true)

val COMET_FULL_NATIVE_SCAN_ENABLED: ConfigEntry[Boolean] = conf(
"spark.comet.native.scan.enabled")
.internal()
.doc(
"Whether to enable the fully native scan. When this is turned on, Spark will use Comet to " +
"read supported data sources (currently only Parquet is supported natively)." +
" By default, this config is true.")
.booleanConf
.createWithDefault(true)

val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.parquet.read.parallel.io.enabled")
.doc(
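For context, here is a minimal sketch of how the new flag could be toggled from a Spark session while debugging. Only the key spark.comet.native.scan.enabled and its default of true come from the diff above; the object name, app name, master setting, and Parquet path are illustrative, and since the entry is marked internal() it is not meant as a user-facing knob.

// Hypothetical sketch: opt out of the fully native scan for one session.
// Only the config key and its default come from this commit; everything else is illustrative.
import org.apache.spark.sql.SparkSession

object NativeScanToggleExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("comet-native-scan-toggle") // illustrative app name
      .master("local[*]")                  // local master just for a quick test
      .getOrCreate()

    // COMET_FULL_NATIVE_SCAN_ENABLED defaults to true, so this only matters when opting out.
    spark.conf.set("spark.comet.native.scan.enabled", "false")

    // Any Parquet read; with the flag off, the planner falls back to the existing scan paths.
    spark.read.parquet("/tmp/example.parquet").show() // illustrative path
    spark.stop()
  }
}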
186 changes: 92 additions & 94 deletions native/core/src/execution/datafusion/planner.rs
@@ -948,118 +948,116 @@ impl PhysicalPlanner {
Arc::new(SortExec::new(LexOrdering::new(exprs?), child).with_fetch(fetch)),
))
}
-            OpStruct::Scan(scan) => {
+            OpStruct::NativeScan(scan) => {
                 let data_types = scan.fields.iter().map(to_arrow_datatype).collect_vec();

-                if scan.source == "CometScan parquet (unknown)" {
-                    let data_schema = parse_message_type(&scan.data_schema).unwrap();
-                    let required_schema = parse_message_type(&scan.required_schema).unwrap();
-                    println!("data_schema: {:?}", data_schema);
-                    println!("required_schema: {:?}", required_schema);
+                println!("NATIVE: SCAN: {:?}", scan);
+                let data_schema = parse_message_type(&*scan.data_schema).unwrap();
+                let required_schema = parse_message_type(&*scan.required_schema).unwrap();
+                println!("data_schema: {:?}", data_schema);
+                println!("required_schema: {:?}", required_schema);

-                    let data_schema_descriptor =
-                        parquet::schema::types::SchemaDescriptor::new(Arc::new(data_schema));
-                    let data_schema_arrow = Arc::new(
-                        parquet::arrow::schema::parquet_to_arrow_schema(
-                            &data_schema_descriptor,
-                            None,
-                        )
-                        .unwrap(),
-                    );
-                    println!("data_schema_arrow: {:?}", data_schema_arrow);
+                let data_schema_descriptor =
+                    parquet::schema::types::SchemaDescriptor::new(Arc::new(data_schema));
+                let data_schema_arrow = Arc::new(
+                    parquet::arrow::schema::parquet_to_arrow_schema(&data_schema_descriptor, None)
+                        .unwrap(),
+                );
+                println!("data_schema_arrow: {:?}", data_schema_arrow);

-                    let required_schema_descriptor =
-                        parquet::schema::types::SchemaDescriptor::new(Arc::new(required_schema));
-                    let required_schema_arrow = Arc::new(
-                        parquet::arrow::schema::parquet_to_arrow_schema(
-                            &required_schema_descriptor,
-                            None,
-                        )
-                        .unwrap(),
-                    );
-                    println!("required_schema_arrow: {:?}", required_schema_arrow);
+                let required_schema_descriptor =
+                    parquet::schema::types::SchemaDescriptor::new(Arc::new(required_schema));
+                let required_schema_arrow = Arc::new(
+                    parquet::arrow::schema::parquet_to_arrow_schema(
+                        &required_schema_descriptor,
+                        None,
+                    )
+                    .unwrap(),
+                );
+                println!("required_schema_arrow: {:?}", required_schema_arrow);

-                    assert!(!required_schema_arrow.fields.is_empty());
+                assert!(!required_schema_arrow.fields.is_empty());

-                    let mut projection_vector: Vec<usize> =
-                        Vec::with_capacity(required_schema_arrow.fields.len());
-                    // TODO: could be faster with a hashmap rather than iterating over data_schema_arrow with index_of.
-                    required_schema_arrow.fields.iter().for_each(|field| {
-                        projection_vector.push(data_schema_arrow.index_of(field.name()).unwrap());
-                    });
-                    println!("projection_vector: {:?}", projection_vector);
+                let mut projection_vector: Vec<usize> =
+                    Vec::with_capacity(required_schema_arrow.fields.len());
+                // TODO: could be faster with a hashmap rather than iterating over data_schema_arrow with index_of.
+                required_schema_arrow.fields.iter().for_each(|field| {
+                    projection_vector.push(data_schema_arrow.index_of(field.name()).unwrap());
+                });
+                println!("projection_vector: {:?}", projection_vector);

-                    assert_eq!(projection_vector.len(), required_schema_arrow.fields.len());
+                assert_eq!(projection_vector.len(), required_schema_arrow.fields.len());

-                    // Convert the Spark expressions to Physical expressions
-                    let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = scan
-                        .data_filters
-                        .iter()
-                        .map(|expr| self.create_expr(expr, Arc::clone(&required_schema_arrow)))
-                        .collect();
+                // Convert the Spark expressions to Physical expressions
+                let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = scan
+                    .data_filters
+                    .iter()
+                    .map(|expr| self.create_expr(expr, Arc::clone(&required_schema_arrow)))
+                    .collect();

-                    // Create a conjunctive form of the vector because ParquetExecBuilder takes
-                    // a single expression
-                    let data_filters = data_filters?;
-                    let test_data_filters =
-                        data_filters.clone().into_iter().reduce(|left, right| {
-                            Arc::new(BinaryExpr::new(
-                                left,
-                                datafusion::logical_expr::Operator::And,
-                                right,
-                            ))
-                        });
+                // Create a conjunctive form of the vector because ParquetExecBuilder takes
+                // a single expression
+                let data_filters = data_filters?;
+                let test_data_filters = data_filters.clone().into_iter().reduce(|left, right| {
+                    Arc::new(BinaryExpr::new(
+                        left,
+                        datafusion::logical_expr::Operator::And,
+                        right,
+                    ))
+                });

-                    println!("data_filters: {:?}", data_filters);
-                    println!("test_data_filters: {:?}", test_data_filters);
+                println!("data_filters: {:?}", data_filters);
+                println!("test_data_filters: {:?}", test_data_filters);

-                    let object_store_url = ObjectStoreUrl::local_filesystem();
-                    let paths: Vec<Url> = scan
-                        .path
-                        .iter()
-                        .map(|path| Url::parse(path).unwrap())
-                        .collect();
+                let object_store_url = ObjectStoreUrl::local_filesystem();
+                let paths: Vec<Url> = scan
+                    .path
+                    .iter()
+                    .map(|path| Url::parse(path).unwrap())
+                    .collect();

-                    let object_store = object_store::local::LocalFileSystem::new();
-                    // register the object store with the runtime environment
-                    let url = Url::try_from("file://").unwrap();
-                    self.session_ctx
-                        .runtime_env()
-                        .register_object_store(&url, Arc::new(object_store));
+                let object_store = object_store::local::LocalFileSystem::new();
+                // register the object store with the runtime environment
+                let url = Url::try_from("file://").unwrap();
+                self.session_ctx
+                    .runtime_env()
+                    .register_object_store(&url, Arc::new(object_store));

-                    let files: Vec<PartitionedFile> = paths
-                        .iter()
-                        .map(|path| PartitionedFile::from_path(path.path().to_string()).unwrap())
-                        .collect();
+                let files: Vec<PartitionedFile> = paths
+                    .iter()
+                    .map(|path| PartitionedFile::from_path(path.path().to_string()).unwrap())
+                    .collect();

-                    // partition the files
-                    // TODO really should partition the row groups
+                // partition the files
+                // TODO really should partition the row groups

-                    let mut file_groups = vec![vec![]; partition_count];
-                    files.iter().enumerate().for_each(|(idx, file)| {
-                        file_groups[idx % partition_count].push(file.clone());
-                    });
+                let mut file_groups = vec![vec![]; partition_count];
+                files.iter().enumerate().for_each(|(idx, file)| {
+                    file_groups[idx % partition_count].push(file.clone());
+                });

-                    let file_scan_config =
-                        FileScanConfig::new(object_store_url, Arc::clone(&data_schema_arrow))
-                            .with_file_groups(file_groups)
-                            .with_projection(Some(projection_vector));
+                let file_scan_config =
+                    FileScanConfig::new(object_store_url, Arc::clone(&data_schema_arrow))
+                        .with_file_groups(file_groups)
+                        .with_projection(Some(projection_vector));

-                    let mut table_parquet_options = TableParquetOptions::new();
-                    table_parquet_options.global.pushdown_filters = true;
-                    table_parquet_options.global.reorder_filters = true;
+                let mut table_parquet_options = TableParquetOptions::new();
+                table_parquet_options.global.pushdown_filters = true;
+                table_parquet_options.global.reorder_filters = true;

-                    let mut builder = ParquetExecBuilder::new(file_scan_config)
-                        .with_table_parquet_options(table_parquet_options);
+                let mut builder = ParquetExecBuilder::new(file_scan_config)
+                    .with_table_parquet_options(table_parquet_options);

-                    if let Some(filter) = test_data_filters {
-                        builder = builder.with_predicate(filter);
-                    }
+                if let Some(filter) = test_data_filters {
+                    builder = builder.with_predicate(filter);
+                }

-                    let scan = builder.build();
-                    return Ok((vec![], Arc::new(scan)));
-                }
+                let scan = builder.build();
+                return Ok((vec![], Arc::new(scan)));
+            }
+            OpStruct::Scan(scan) => {
+                let data_types = scan.fields.iter().map(to_arrow_datatype).collect_vec();

// If it is not test execution context for unit test, we should have at least one
// input source
if self.exec_context_id != TEST_EXEC_CONTEXT_ID && inputs.is_empty() {
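Two pieces of the NativeScan planning above are easy to miss in the diff: the pushed-down filters are folded into a single conjunctive predicate before being handed to ParquetExecBuilder, and the input files are spread across partitions round-robin. The following standalone Scala sketch mirrors just those two steps, with plain strings standing in for physical expressions and PartitionedFiles; the names and values are illustrative and none of this is Comet or DataFusion API.

// Standalone sketch of two steps from the NativeScan planner arm:
// (1) reduce the filter list into one AND-ed predicate (cf. data_filters ... reduce with Operator::And),
// (2) assign files to partitions round-robin (cf. file_groups[idx % partition_count].push(file)).
object NativeScanPlanningSketch {
  def main(args: Array[String]): Unit = {
    // (1) Conjunctive form of the pushed-down filters.
    val filters = Seq("a > 1", "b IS NOT NULL", "c = 'x'") // stand-ins for physical expressions
    val predicate: Option[String] = filters.reduceOption((l, r) => s"($l) AND ($r)")
    println(predicate.getOrElse("<no predicate>"))

    // (2) Round-robin assignment of files to partition groups.
    val files = (0 until 7).map(i => s"part-0000$i.parquet") // stand-ins for PartitionedFile
    val partitionCount = 3
    val fileGroups: Map[Int, Seq[String]] =
      files.zipWithIndex.groupMap { case (_, idx) => idx % partitionCount } { case (f, _) => f }
    (0 until partitionCount).foreach { p =>
      println(s"partition $p -> ${fileGroups.getOrElse(p, Seq.empty).mkString(", ")}")
    }
  }
}

The TODO in the Rust code notes that splitting by row group rather than by whole file would balance work better; the round-robin file grouping sketched here is the interim behavior of this commit.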
9 changes: 9 additions & 0 deletions native/proto/src/proto/operator.proto
@@ -43,6 +43,7 @@ message Operator {
SortMergeJoin sort_merge_join = 108;
HashJoin hash_join = 109;
Window window = 110;
NativeScan native_scan = 111;
}
}

@@ -52,6 +53,14 @@ message Scan {
// is purely for informational purposes when viewing native query plans in
// debug mode.
string source = 2;
}

message NativeScan {
repeated spark.spark_expression.DataType fields = 1;
// The source of the scan (e.g. file scan, broadcast exchange, shuffle, etc). This
// is purely for informational purposes when viewing native query plans in
// debug mode.
string source = 2;
repeated string path = 3;
string required_schema = 4;
string data_schema = 5;
@@ -189,6 +189,22 @@ class CometSparkSessionExtensions
}

// data source V1
case scanExec @ FileSourceScanExec(
HadoopFsRelation(_, partitionSchema, _, _, _: ParquetFileFormat, _),
_: Seq[_],
requiredSchema,
_,
_,
_,
_,
_,
_)
if CometNativeScanExec.isSchemaSupported(requiredSchema)
&& CometNativeScanExec.isSchemaSupported(partitionSchema)
&& COMET_FULL_NATIVE_SCAN_ENABLED.get =>
logInfo("Comet extension enabled for v1 Scan")
CometNativeScanExec(scanExec, session)
// data source V1
case scanExec @ FileSourceScanExec(
HadoopFsRelation(_, partitionSchema, _, _, _: ParquetFileFormat, _),
_: Seq[_],
@@ -1205,7 +1221,8 @@ object CometSparkSessionExtensions extends Logging {
}

def isCometScan(op: SparkPlan): Boolean = {
op.isInstanceOf[CometBatchScanExec] || op.isInstanceOf[CometScanExec]
op.isInstanceOf[CometBatchScanExec] || op.isInstanceOf[CometScanExec] ||
op.isInstanceOf[CometNativeScanExec]
}

private def shouldApplySparkToColumnar(conf: SQLConf, op: SparkPlan): Boolean = {
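To see whether the new v1 rule actually fired for a given query, one option is to inspect the executed plan for the new operator. A hedged sketch follows; only the CometNativeScanExec name and the spark.comet.native.scan.enabled key come from this commit, and matching on the plan's string rendering is a debugging convenience rather than a stable API.

// Sketch: check whether a Parquet read was planned with the new native scan operator.
// Assumes a SparkSession that already has the Comet extensions installed.
import org.apache.spark.sql.SparkSession

object CheckNativeScanPlan {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("check-native-scan")
      .master("local[*]") // local master just for a quick test
      .getOrCreate()

    val df = spark.read.parquet("/tmp/example.parquet") // illustrative path

    // Accessing executedPlan triggers physical planning. CometNativeScanExec is the operator
    // added by this commit; if the required or partition schema is unsupported, or the flag
    // is off, the earlier scan paths are used instead.
    val plan = df.queryExecution.executedPlan.toString()
    println(s"uses native scan: ${plan.contains("CometNativeScan")}")

    spark.stop()
  }
}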
