Skip to content

Commit

Permalink
Rename InvalidTypeAction to UnsupportedTypeAction
Browse files Browse the repository at this point in the history
  • Loading branch information
phillipleblanc committed Feb 10, 2025
1 parent dd8853f commit b612131
Show file tree
Hide file tree
Showing 11 changed files with 77 additions and 65 deletions.
20 changes: 11 additions & 9 deletions src/duckdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
duckdbpool::DuckDbConnectionPool,
DbConnectionPool, DbInstanceKey, Mode,
},
InvalidTypeAction,
UnsupportedTypeAction,
};
use arrow::{array::RecordBatch, datatypes::SchemaRef};
use async_trait::async_trait;
Expand Down Expand Up @@ -129,7 +129,7 @@ type Result<T, E = Error> = std::result::Result<T, E>;
pub struct DuckDBTableProviderFactory {
access_mode: AccessMode,
instances: Arc<Mutex<HashMap<DbInstanceKey, DuckDbConnectionPool>>>,
invalid_type_action: InvalidTypeAction,
unsupported_type_action: UnsupportedTypeAction,
dialect: Arc<dyn Dialect>,
}

Expand All @@ -139,7 +139,7 @@ impl std::fmt::Debug for DuckDBTableProviderFactory {
f.debug_struct("DuckDBTableProviderFactory")
.field("access_mode", &self.access_mode)
.field("instances", &self.instances)
.field("invalid_type_action", &self.invalid_type_action)
.field("unsupported_type_action", &self.unsupported_type_action)
.finish()
}
}
Expand All @@ -154,14 +154,17 @@ impl DuckDBTableProviderFactory {
Self {
access_mode,
instances: Arc::new(Mutex::new(HashMap::new())),
invalid_type_action: InvalidTypeAction::Error,
unsupported_type_action: UnsupportedTypeAction::Error,
dialect: Arc::new(DuckDBDialect::new()),
}
}

#[must_use]
pub fn with_invalid_type_action(mut self, invalid_type_action: InvalidTypeAction) -> Self {
self.invalid_type_action = invalid_type_action;
pub fn with_unsupported_type_action(
mut self,
unsupported_type_action: UnsupportedTypeAction,
) -> Self {
self.unsupported_type_action = unsupported_type_action;
self
}

Expand Down Expand Up @@ -219,7 +222,7 @@ impl DuckDBTableProviderFactory {

let pool = DuckDbConnectionPool::new_memory()
.context(DbConnectionPoolSnafu)?
.with_invalid_type_action(self.invalid_type_action);
.with_unsupported_type_action(self.unsupported_type_action);

instances.insert(key, pool.clone());

Expand All @@ -240,7 +243,7 @@ impl DuckDBTableProviderFactory {

let pool = DuckDbConnectionPool::new_file(&db_path, &self.access_mode)
.context(DbConnectionPoolSnafu)?
.with_invalid_type_action(self.invalid_type_action);
.with_unsupported_type_action(self.unsupported_type_action);

instances.insert(key, pool.clone());

Expand All @@ -259,7 +262,6 @@ impl TableProviderFactory for DuckDBTableProviderFactory {
_state: &dyn Session,
cmd: &CreateExternalTable,
) -> DataFusionResult<Arc<dyn TableProvider>> {

if cmd.name.schema().is_some() {
TableWithSchemaCreationNotSupportedSnafu {
table_name: cmd.name.to_string(),
Expand Down
7 changes: 6 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@ pub enum Error {

#[derive(PartialEq, Eq, Clone, Copy, Default, Debug, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum InvalidTypeAction {
pub enum UnsupportedTypeAction {
/// Refuse to create the table if any unsupported types are found
#[default]
Error,
/// Log a warning for any unsupported types
Warn,
/// Ignore any unsupported types (i.e. skip them)
Ignore,
/// Attempt to convert any unsupported types to a string
String,
}
4 changes: 2 additions & 2 deletions src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::sql::sql_provider_datafusion::{
SqlTable,
};
use crate::util::schema::SchemaValidator;
use crate::InvalidTypeAction;
use crate::UnsupportedTypeAction;
use arrow::{
array::RecordBatch,
datatypes::{Schema, SchemaRef},
Expand Down Expand Up @@ -267,7 +267,7 @@ impl TableProviderFactory for PostgresTableProviderFactory {
);

let schema: SchemaRef = Arc::new(schema);
PostgresConnection::handle_unsupported_schema(&schema, InvalidTypeAction::default())
PostgresConnection::handle_unsupported_schema(&schema, UnsupportedTypeAction::default())
.map_err(|e| DataFusionError::External(e.into()))?;

let postgres = Postgres::new(
Expand Down
21 changes: 11 additions & 10 deletions src/sql/db_connection_pool/dbconnection/duckdbconn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use snafu::{prelude::*, ResultExt};
use tokio::sync::mpsc::Sender;

use crate::util::schema::SchemaValidator;
use crate::InvalidTypeAction;
use crate::UnsupportedTypeAction;

use super::DbConnection;
use super::Result;
Expand Down Expand Up @@ -173,7 +173,7 @@ impl DuckDBAttachments {
pub struct DuckDbConnection {
pub conn: r2d2::PooledConnection<DuckdbConnectionManager>,
attachments: Option<Arc<DuckDBAttachments>>,
invalid_type_action: InvalidTypeAction,
unsupported_type_action: UnsupportedTypeAction,
}

impl SchemaValidator for DuckDbConnection {
Expand Down Expand Up @@ -222,8 +222,8 @@ impl DuckDbConnection {
}

#[must_use]
pub fn with_invalid_type_action(mut self, invalid_type_action: InvalidTypeAction) -> Self {
self.invalid_type_action = invalid_type_action;
pub fn with_unsupported_type_action(mut self, unsupported_type_action: UnsupportedTypeAction) -> Self {
self.unsupported_type_action = unsupported_type_action;
self
}

Expand Down Expand Up @@ -285,7 +285,7 @@ impl SyncDbConnection<r2d2::PooledConnection<DuckdbConnectionManager>, DuckDBPar
DuckDbConnection {
conn,
attachments: None,
invalid_type_action: InvalidTypeAction::default(),
unsupported_type_action: UnsupportedTypeAction::default(),
}
}

Expand All @@ -306,7 +306,7 @@ impl SyncDbConnection<r2d2::PooledConnection<DuckdbConnectionManager>, DuckDBPar
.boxed()
.context(super::UnableToGetSchemaSnafu)?;

Self::handle_unsupported_schema(&result.get_schema(), self.invalid_type_action)
Self::handle_unsupported_schema(&result.get_schema(), self.unsupported_type_action)
}

fn query_arrow(
Expand Down Expand Up @@ -522,7 +522,7 @@ mod tests {
let schema = Arc::new(SchemaBuilder::from(Fields::from(fields)).finish());

let rebuilt_schema =
DuckDbConnection::handle_unsupported_schema(&schema, InvalidTypeAction::Error)
DuckDbConnection::handle_unsupported_schema(&schema, UnsupportedTypeAction::Error)
.expect("should rebuild schema successfully");

assert_eq!(schema, rebuilt_schema);
Expand Down Expand Up @@ -574,17 +574,18 @@ mod tests {
Arc::new(SchemaBuilder::from(Fields::from(rebuilt_fields)).finish());

assert!(
DuckDbConnection::handle_unsupported_schema(&schema, InvalidTypeAction::Error).is_err()
DuckDbConnection::handle_unsupported_schema(&schema, UnsupportedTypeAction::Error)
.is_err()
);

let rebuilt_schema =
DuckDbConnection::handle_unsupported_schema(&schema, InvalidTypeAction::Warn)
DuckDbConnection::handle_unsupported_schema(&schema, UnsupportedTypeAction::Warn)
.expect("should rebuild schema successfully");

assert_eq!(rebuilt_schema, expected_rebuilt_schema);

let rebuilt_schema =
DuckDbConnection::handle_unsupported_schema(&schema, InvalidTypeAction::Ignore)
DuckDbConnection::handle_unsupported_schema(&schema, UnsupportedTypeAction::Ignore)
.expect("should rebuild schema successfully");

assert_eq!(rebuilt_schema, expected_rebuilt_schema);
Expand Down
12 changes: 6 additions & 6 deletions src/sql/db_connection_pool/dbconnection/postgresconn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use futures::StreamExt;
use postgres_native_tls::MakeTlsConnector;
use snafu::prelude::*;

use crate::InvalidTypeAction;
use crate::UnsupportedTypeAction;

use super::AsyncDbConnection;
use super::DbConnection;
Expand Down Expand Up @@ -131,7 +131,7 @@ pub enum PostgresError {

pub struct PostgresConnection {
pub conn: bb8::PooledConnection<'static, PostgresConnectionManager<MakeTlsConnector>>,
invalid_type_action: InvalidTypeAction,
unsupported_type_action: UnsupportedTypeAction,
}

impl SchemaValidator for PostgresConnection {
Expand Down Expand Up @@ -187,7 +187,7 @@ impl<'a>
) -> Self {
PostgresConnection {
conn,
invalid_type_action: InvalidTypeAction::default(),
unsupported_type_action: UnsupportedTypeAction::default(),
}
}

Expand Down Expand Up @@ -232,7 +232,7 @@ impl<'a>
let type_details = row.get::<usize, Option<serde_json::Value>>(3);
let Ok(arrow_type) = pg_data_type_to_arrow_type(&pg_type, type_details) else {
handle_invalid_type_error(
self.invalid_type_action,
self.unsupported_type_action,
super::Error::UnsupportedDataType {
data_type: pg_type.to_string(),
field_name: column_name.to_string(),
Expand Down Expand Up @@ -310,8 +310,8 @@ impl<'a>

impl PostgresConnection {
#[must_use]
pub fn with_invalid_type_action(mut self, action: InvalidTypeAction) -> Self {
self.invalid_type_action = action;
pub fn with_unsupported_type_action(mut self, action: UnsupportedTypeAction) -> Self {
self.unsupported_type_action = action;
self
}
}
4 changes: 2 additions & 2 deletions src/sql/db_connection_pool/dbconnection/sqliteconn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::any::Any;

use crate::sql::arrow_sql_gen::sqlite::rows_to_arrow;
use crate::util::schema::SchemaValidator;
use crate::InvalidTypeAction;
use crate::UnsupportedTypeAction;
use arrow::datatypes::SchemaRef;
use arrow_schema::DataType;
use async_trait::async_trait;
Expand Down Expand Up @@ -110,7 +110,7 @@ impl AsyncDbConnection<Connection, &'static (dyn ToSql + Sync)> for SqliteConnec
.boxed()
.context(super::UnableToGetSchemaSnafu)?;

Self::handle_unsupported_schema(&schema, InvalidTypeAction::Error)
Self::handle_unsupported_schema(&schema, UnsupportedTypeAction::Error)
}

async fn query_arrow(
Expand Down
18 changes: 9 additions & 9 deletions src/sql/db_connection_pool/duckdbpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
dbconnection::{duckdbconn::DuckDbConnection, DbConnection, SyncDbConnection},
JoinPushDown,
},
InvalidTypeAction,
UnsupportedTypeAction,
};

#[derive(Debug, Snafu)]
Expand All @@ -38,7 +38,7 @@ pub struct DuckDbConnectionPool {
join_push_down: JoinPushDown,
attached_databases: Vec<Arc<str>>,
mode: Mode,
invalid_type_action: InvalidTypeAction,
unsupported_type_action: UnsupportedTypeAction,
}

impl std::fmt::Debug for DuckDbConnectionPool {
Expand All @@ -48,7 +48,7 @@ impl std::fmt::Debug for DuckDbConnectionPool {
.field("join_push_down", &self.join_push_down)
.field("attached_databases", &self.attached_databases)
.field("mode", &self.mode)
.field("invalid_type_action", &self.invalid_type_action)
.field("unsupported_type_action", &self.unsupported_type_action)
.finish()
}
}
Expand Down Expand Up @@ -86,7 +86,7 @@ impl DuckDbConnectionPool {
join_push_down: JoinPushDown::AllowedFor(":memory:".to_string()),
attached_databases: Vec::new(),
mode: Mode::Memory,
invalid_type_action: InvalidTypeAction::Error,
unsupported_type_action: UnsupportedTypeAction::Error,
})
}

Expand Down Expand Up @@ -124,13 +124,13 @@ impl DuckDbConnectionPool {
join_push_down: JoinPushDown::AllowedFor(path.to_string()),
attached_databases: Vec::new(),
mode: Mode::File,
invalid_type_action: InvalidTypeAction::Error,
unsupported_type_action: UnsupportedTypeAction::Error,
})
}

#[must_use]
pub fn with_invalid_type_action(mut self, action: InvalidTypeAction) -> Self {
self.invalid_type_action = action;
pub fn with_unsupported_type_action(mut self, action: UnsupportedTypeAction) -> Self {
self.unsupported_type_action = action;
self
}

Expand Down Expand Up @@ -168,7 +168,7 @@ impl DuckDbConnectionPool {
Ok(Box::new(
DuckDbConnection::new(conn)
.with_attachments(attachments)
.with_invalid_type_action(self.invalid_type_action),
.with_unsupported_type_action(self.unsupported_type_action),
))
}

Expand Down Expand Up @@ -211,7 +211,7 @@ impl DbConnectionPool<r2d2::PooledConnection<DuckdbConnectionManager>, DuckDBPar
Ok(Box::new(
DuckDbConnection::new(conn)
.with_attachments(attachments)
.with_invalid_type_action(self.invalid_type_action),
.with_unsupported_type_action(self.unsupported_type_action),
))
}

Expand Down
12 changes: 6 additions & 6 deletions src/sql/db_connection_pool/postgrespool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{collections::HashMap, path::PathBuf, str::FromStr, sync::Arc};

use crate::{
util::{self, ns_lookup::verify_ns_lookup_and_tcp_connect},
InvalidTypeAction,
UnsupportedTypeAction,
};
use async_trait::async_trait;
use bb8::ErrorSink;
Expand Down Expand Up @@ -82,7 +82,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
pub struct PostgresConnectionPool {
pool: Arc<bb8::Pool<PostgresConnectionManager<MakeTlsConnector>>>,
join_push_down: JoinPushDown,
invalid_type_action: InvalidTypeAction,
unsupported_type_action: UnsupportedTypeAction,
}

impl PostgresConnectionPool {
Expand Down Expand Up @@ -208,14 +208,14 @@ impl PostgresConnectionPool {
Ok(PostgresConnectionPool {
pool: Arc::new(pool.clone()),
join_push_down,
invalid_type_action: InvalidTypeAction::default(),
unsupported_type_action: UnsupportedTypeAction::default(),
})
}

/// Specify the action to take when an invalid type is encountered.
#[must_use]
pub fn with_invalid_type_action(mut self, action: InvalidTypeAction) -> Self {
self.invalid_type_action = action;
pub fn with_unsupported_type_action(mut self, action: UnsupportedTypeAction) -> Self {
self.unsupported_type_action = action;
self
}

Expand Down Expand Up @@ -385,7 +385,7 @@ impl
let pool = Arc::clone(&self.pool);
let conn = pool.get_owned().await.context(ConnectionPoolRunSnafu)?;
Ok(Box::new(
PostgresConnection::new(conn).with_invalid_type_action(self.invalid_type_action),
PostgresConnection::new(conn).with_unsupported_type_action(self.unsupported_type_action),
))
}

Expand Down
4 changes: 2 additions & 2 deletions src/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::sql::db_connection_pool::{
};
use crate::sql::sql_provider_datafusion;
use crate::util::schema::SchemaValidator;
use crate::InvalidTypeAction;
use crate::UnsupportedTypeAction;
use arrow::array::{Int64Array, StringArray};
use arrow::{array::RecordBatch, datatypes::SchemaRef};
use async_trait::async_trait;
Expand Down Expand Up @@ -299,7 +299,7 @@ impl TableProviderFactory for SqliteTableProviderFactory {

let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into());
let schema: SchemaRef =
SqliteConnection::handle_unsupported_schema(&schema, InvalidTypeAction::Error)
SqliteConnection::handle_unsupported_schema(&schema, UnsupportedTypeAction::Error)
.map_err(|e| DataFusionError::External(e.into()))?;

let sqlite = Arc::new(Sqlite::new(
Expand Down
Loading

0 comments on commit b612131

Please sign in to comment.