Skip to content

Commit

Permalink
feat: add integer dna and protein encoding (#511)
Browse files Browse the repository at this point in the history
  • Loading branch information
tshauck authored May 23, 2024
1 parent c15621b commit edc1d80
Show file tree
Hide file tree
Showing 19 changed files with 341 additions and 88 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.75 as builder
FROM rust:1.76 as builder

COPY . /usr/src/exon
WORKDIR /usr/src/exon
Expand Down
28 changes: 22 additions & 6 deletions exon/exon-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::str::FromStr;

use datafusion::{
common::extensions_options,
config::{ConfigExtension, ConfigOptions},
execution::context::SessionState,
prelude::SessionConfig,
};
use exon_fasta::SequenceDataType;

use crate::error::{ExonError, Result};

Expand Down Expand Up @@ -69,10 +72,21 @@ extensions_options! {
pub vcf_parse_info: bool, default = false
pub vcf_parse_formats: bool, default = false
pub fasta_sequence_buffer_capacity: usize, default = FASTA_READER_SEQUENCE_CAPACITY
pub fasta_large_utf8: bool, default = false
pub sam_parse_tags: bool, default = false
pub bam_parse_tags: bool, default = false
pub cram_parse_tags: bool, default = false
pub fasta_sequence_data_type: String, default = "utf8".to_string()
}
}

impl ExonConfigExtension {
    /// Parses the `fasta_sequence_data_type` option into a [`SequenceDataType`].
    ///
    /// # Errors
    ///
    /// Returns [`ExonError::Configuration`], naming the offending value, when
    /// the configured string is rejected by `SequenceDataType::from_str`.
    pub fn fasta_sequence_data_type(&self) -> Result<SequenceDataType> {
        let raw = &self.fasta_sequence_data_type;

        match SequenceDataType::from_str(raw) {
            Ok(data_type) => Ok(data_type),
            // The parse error itself is dropped; only the raw value is reported.
            Err(_) => Err(ExonError::Configuration(format!(
                "Invalid sequence data type: {}",
                raw
            ))),
        }
    }
}

Expand Down Expand Up @@ -102,7 +116,7 @@ mod tests {
exon_config.fasta_sequence_buffer_capacity,
super::FASTA_READER_SEQUENCE_CAPACITY
);
assert!(!exon_config.fasta_large_utf8);
assert_eq!(exon_config.fasta_sequence_data_type, "utf8");
assert!(!exon_config.sam_parse_tags);
assert!(!exon_config.bam_parse_tags);
assert!(!exon_config.cram_parse_tags);
Expand All @@ -118,7 +132,7 @@ mod tests {
options.set("exon.vcf_parse_info", "false")?;
options.set("exon.vcf_parse_formats", "false")?;
options.set("exon.fasta_sequence_buffer_capacity", "1024")?;
options.set("exon.fasta_large_utf8", "true")?;
options.set("exon.fasta_sequence_data_type", "large_utf8")?;
options.set("exon.sam_parse_tags", "true")?;
options.set("exon.bam_parse_tags", "true")?;
options.set("exon.cram_parse_tags", "true")?;
Expand All @@ -132,11 +146,12 @@ mod tests {
assert!(!exon_config.vcf_parse_info);
assert!(!exon_config.vcf_parse_formats);
assert_eq!(exon_config.fasta_sequence_buffer_capacity, 1024);
assert!(exon_config.fasta_large_utf8);
assert!(exon_config.sam_parse_tags);
assert!(exon_config.bam_parse_tags);
assert!(exon_config.cram_parse_tags);

assert_eq!(exon_config.fasta_sequence_data_type, "large_utf8");

Ok(())
}

Expand All @@ -148,10 +163,11 @@ mod tests {
ctx.sql("SET exon.vcf_parse_formats = true").await?;
ctx.sql("SET exon.fasta_sequence_buffer_capacity = 1024")
.await?;
ctx.sql("SET exon.fasta_large_utf8 = true").await?;
ctx.sql("SET exon.sam_parse_tags = true").await?;
ctx.sql("SET exon.bam_parse_tags = true").await?;
ctx.sql("SET exon.cram_parse_tags = true").await?;
ctx.sql("SET exon.fasta_sequence_data_type = 'large_utf8'")
.await?;

let state = ctx.state();
let exon_config = state
Expand All @@ -164,7 +180,7 @@ mod tests {
assert!(exon_config.vcf_parse_info);
assert!(exon_config.vcf_parse_formats);
assert_eq!(exon_config.fasta_sequence_buffer_capacity, 1024);
assert!(exon_config.fasta_large_utf8);
assert_eq!(exon_config.fasta_sequence_data_type, "large_utf8");
assert!(exon_config.sam_parse_tags);
assert!(exon_config.bam_parse_tags);
assert!(exon_config.cram_parse_tags);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ impl ExonListingTableFactory {

let table_options = ListingFASTATableOptions::new(file_compression_type)
.with_table_partition_cols(table_partition_cols)
.with_sequence_data_type(exon_config_extension.fasta_sequence_data_type()?)
.with_some_file_extension(extension);

let schema = table_options.infer_schema(state).await?;
Expand Down
8 changes: 8 additions & 0 deletions exon/exon-core/src/datasources/exon_listing_table_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use datafusion::{
execution::runtime_env::RuntimeEnv,
physical_plan::ExecutionPlan,
};
use exon_fasta::SequenceDataType;
use noodles::core::Region;
use object_store::{path::Path, ObjectStore};

Expand Down Expand Up @@ -121,6 +122,13 @@ pub trait ExonFileIndexedListingOptions: ExonIndexedListingOptions {
}
}

#[async_trait]
/// Options for a listing table with configurable sequence data type
pub trait ExonSequenceDataTypeOptions {
/// The sequence data type for the table
fn sequence_data_type(&self) -> &SequenceDataType;
}

#[derive(Debug, Clone)]
/// Configuration for listing a table
pub struct ExonListingConfig<T> {
Expand Down
8 changes: 7 additions & 1 deletion exon/exon-core/src/datasources/fasta/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use datafusion::{
PlanProperties, SendableRecordBatchStream,
},
};
use exon_fasta::FASTAConfig;
use exon_fasta::{FASTAConfig, SequenceDataType};

use crate::datasources::ExonFileScanConfig;

Expand Down Expand Up @@ -57,6 +57,9 @@ pub struct FASTAScan {

/// The statistics for the scan.
statistics: Statistics,

/// The sequence data type.
sequence_data_type: SequenceDataType,
}

impl FASTAScan {
Expand All @@ -65,6 +68,7 @@ impl FASTAScan {
base_config: FileScanConfig,
file_compression_type: FileCompressionType,
fasta_sequence_buffer_capacity: usize,
sequence_data_type: SequenceDataType,
) -> Self {
let (projected_schema, statistics, properties) = base_config.project_with_properties();

Expand All @@ -76,6 +80,7 @@ impl FASTAScan {
fasta_sequence_buffer_capacity,
properties,
statistics,
sequence_data_type,
}
}

Expand Down Expand Up @@ -158,6 +163,7 @@ impl ExecutionPlan for FASTAScan {
let config = FASTAConfig::new(object_store, self.base_config.file_schema.clone())
.with_batch_size(batch_size)
.with_fasta_sequence_buffer_capacity(self.fasta_sequence_buffer_capacity)
.with_sequence_data_type(self.sequence_data_type.clone())
.with_projection(self.base_config.file_projection());

let opener = FASTAOpener::new(Arc::new(config), self.file_compression_type);
Expand Down
40 changes: 34 additions & 6 deletions exon/exon-core/src/datasources/fasta/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
exon_file_type::get_file_extension_with_compression,
exon_listing_table_options::{
ExonFileIndexedListingOptions, ExonIndexedListingOptions, ExonListingConfig,
ExonListingOptions,
ExonListingOptions, ExonSequenceDataTypeOptions,
},
hive_partition::filter_matches_partition_cols,
indexed_file::{fai::compute_fai_range, region::RegionObjectStoreExtension},
Expand All @@ -42,7 +42,7 @@ use datafusion::{
prelude::Expr,
};
use exon_common::TableSchema;
use exon_fasta::FASTASchemaBuilder;
use exon_fasta::{FASTASchemaBuilder, SequenceDataType};
use futures::TryStreamExt;
use noodles::{core::Region, fasta::fai::Reader};
use object_store::{path::Path, ObjectStore};
Expand All @@ -66,6 +66,16 @@ pub struct ListingFASTATableOptions {

/// The region file to read from
region_file: Option<String>,

/// The sequence data type for the table
sequence_data_type: SequenceDataType,
}

// No `#[async_trait]` here: the impl contains only a synchronous getter.
impl ExonSequenceDataTypeOptions for ListingFASTATableOptions {
    /// Returns a reference to the sequence data type stored in these options.
    fn sequence_data_type(&self) -> &SequenceDataType {
        &self.sequence_data_type
    }
}

#[async_trait]
Expand All @@ -86,7 +96,12 @@ impl ExonListingOptions for ListingFASTATableOptions {
&self,
conf: datafusion::datasource::physical_plan::FileScanConfig,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
let scan = FASTAScan::new(conf, self.file_compression_type(), 2000);
let scan = FASTAScan::new(
conf,
self.file_compression_type(),
2000,
self.sequence_data_type.clone(),
);

Ok(Arc::new(scan))
}
Expand Down Expand Up @@ -134,6 +149,7 @@ impl Default for ListingFASTATableOptions {
table_partition_cols: Vec::new(),
regions: Vec::new(),
region_file: None,
sequence_data_type: SequenceDataType::Utf8,
}
}
}
Expand All @@ -149,6 +165,15 @@ impl ListingFASTATableOptions {
table_partition_cols: Vec::new(),
regions: Vec::new(),
region_file: None,
sequence_data_type: SequenceDataType::Utf8,
}
}

/// Builder-style setter: consumes the options and returns them with
/// `sequence_data_type` replaced by the given value. All other fields are
/// carried over unchanged.
pub fn with_sequence_data_type(mut self, sequence_data_type: SequenceDataType) -> Self {
    self.sequence_data_type = sequence_data_type;
    self
}

Expand Down Expand Up @@ -194,7 +219,7 @@ impl ListingFASTATableOptions {
))?;

let mut fasta_schema_builder = FASTASchemaBuilder::default()
.with_large_utf8(exon_settings.fasta_large_utf8)
.with_sequence_data_type(exon_settings.fasta_sequence_data_type()?)
.with_partition_fields(self.table_partition_cols.clone());

Ok(fasta_schema_builder.build())
Expand All @@ -217,7 +242,7 @@ pub struct ListingFASTATable<T> {
table_schema: TableSchema,
}

impl<T: ExonFileIndexedListingOptions> ListingFASTATable<T> {
impl<T: ExonFileIndexedListingOptions + ExonSequenceDataTypeOptions> ListingFASTATable<T> {
/// Create a new FASTA listing table
pub fn try_new(config: ExonListingConfig<T>, table_schema: TableSchema) -> Result<Self> {
Ok(Self {
Expand Down Expand Up @@ -280,7 +305,9 @@ impl<T: ExonFileIndexedListingOptions> ListingFASTATable<T> {
}

#[async_trait]
impl<T: ExonFileIndexedListingOptions + 'static> TableProvider for ListingFASTATable<T> {
impl<T: ExonFileIndexedListingOptions + ExonSequenceDataTypeOptions + 'static> TableProvider
for ListingFASTATable<T>
{
fn as_any(&self) -> &dyn Any {
self
}
Expand Down Expand Up @@ -428,6 +455,7 @@ impl<T: ExonFileIndexedListingOptions + 'static> TableProvider for ListingFASTAT
file_scan_config,
self.config.options.file_compression_type(),
fasta_sequence_buffer_capacity,
self.config.options.sequence_data_type().clone(),
);

Ok(Arc::new(scan))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl TableFunctionImpl for FastaIndexedScanFunction {
))?;

let fasta_schema = FASTASchemaBuilder::default()
.with_large_utf8(exon_settings.fasta_large_utf8)
.with_sequence_data_type(exon_settings.fasta_sequence_data_type()?)
.build();

futures::executor::block_on(async {
Expand Down
2 changes: 1 addition & 1 deletion exon/exon-core/src/datasources/fasta/udtfs/fasta_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl TableFunctionImpl for FastaScanFunction {
))?;

let fasta_schema = FASTASchemaBuilder::default()
.with_large_utf8(exon_settings.fasta_large_utf8)
.with_sequence_data_type(exon_settings.fasta_sequence_data_type()?)
.build();

let listing_table_options =
Expand Down
1 change: 1 addition & 0 deletions exon/exon-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
mod session_context;
pub use session_context::ExonSessionExt;

#[allow(clippy::cmp_owned)]
mod config;
pub use config::new_exon_config;

Expand Down
4 changes: 2 additions & 2 deletions exon/exon-core/test-data/datasources/faa/test.faa
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
>a description
MMM
ACDEFGHIKLMNPQRSTVWY
>b description2
MMM
ACDEFGHIKLMNPQRSTVWY
Binary file modified exon/exon-core/test-data/datasources/fasta/test.parquet
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
control substitution on

statement ok
SET exon.fasta_sequence_data_type = 'utf8';

query III
SELECT * FROM fasta_indexed_scan('$CARGO_MANIFEST_DIR/test-data/datasources/fasta-indexed/test.fasta', 'a');
----
Expand Down
34 changes: 33 additions & 1 deletion exon/exon-core/tests/sqllogictests/slt/fasta-scan-tests.slt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ SELECT id, description, sequence FROM fasta_table;
a description ATCG
b description2 ATCG


query T
SELECT * FROM fasta_table WHERE id = 'a';
----
Expand Down Expand Up @@ -115,3 +114,36 @@ query T
SELECT COUNT(*) FROM fa_table;
----
2

statement ok
DROP TABLE fa_table;

statement ok
SET exon.fasta_sequence_data_type = 'one_hot_dna';

statement ok
CREATE EXTERNAL TABLE exon_table STORED AS FASTA LOCATION '$CARGO_MANIFEST_DIR/test-data/datasources/fasta/test.fasta';

query T
SELECT id, description, sequence FROM exon_table;
----
a description [1, 4, 2, 3]
b description2 [1, 4, 2, 3]

statement ok
DROP TABLE exon_table;

statement ok
SET exon.fasta_sequence_data_type = 'one_hot_protein';

statement ok
CREATE EXTERNAL TABLE exon_table STORED AS FASTA OPTIONS (file_extension 'faa') LOCATION '$CARGO_MANIFEST_DIR/test-data/datasources/faa/test.faa';

query T
SELECT id, description, sequence FROM exon_table;
----
a description [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
b description2 [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]

statement ok
DROP TABLE exon_table;
Loading

0 comments on commit edc1d80

Please sign in to comment.