diff --git a/docs/source/queries/paged.md b/docs/source/queries/paged.md index 516a149ab..3944de568 100644 --- a/docs/source/queries/paged.md +++ b/docs/source/queries/paged.md @@ -202,7 +202,7 @@ loop { .execute_single_page(&paged_prepared, &[], paging_state) .await?; - let rows_res = res.into_rows_result()?.unwrap(); + let rows_res = res.into_rows_result()?; println!( "Paging state response from the prepared statement execution: {:#?} ({} rows)", diff --git a/docs/source/queries/result.md b/docs/source/queries/result.md index 5e88e1510..db63637e7 100644 --- a/docs/source/queries/result.md +++ b/docs/source/queries/result.md @@ -41,20 +41,18 @@ Additionally, [`QueryResult`](https://docs.rs/scylla/latest/scylla/transport/que let result = session .query_unpaged("SELECT a from ks.tab", &[]) .await? - .into_rows_result()? - .unwrap(); + .into_rows_result()?; for row in result.rows::<(i32,)>()? { let (int_value,): (i32,) = row?; } // first_row gets the first row and parses it as the given type -let first_int_val: Option<(i32,)> = session +let first_int_val: (i32,) = session .query_unpaged("SELECT a from ks.tab", &[]) .await? .into_rows_result()? - .map(|res| res.first_row::<(i32,)>()) - .transpose()?; + .first_row::<(i32,)>()?; // result_not_rows fails when the response is rows session.query_unpaged("INSERT INTO ks.tab (a) VALUES (0)", &[]).await?.result_not_rows()?; @@ -75,13 +73,13 @@ To properly handle `NULL` values parse column as an `Option<>`: use scylla::IntoTypedRows; // Parse row as two columns containing an int and text which might be null -if let Some(rows_result) = session.query_unpaged("SELECT a, b from ks.tab", &[]) +let rows_result = session + .query_unpaged("SELECT a, b from ks.tab", &[]) .await? - .into_rows_result()? -{ - for row in rows_result.rows::<(i32, Option<&str>)>()? { - let (int_value, str_or_null): (i32, Option<&str>) = row?; - } + .into_rows_result()?; + +for row in rows_result.rows::<(i32, Option<&str>)>()? { + let (int_value, str_or_null): (i32, Option<&str>) = row?; } # Ok(()) # } @@ -111,13 +109,13 @@ struct MyRow { } // Parse row as two columns containing an int and text which might be null -if let Some(result_rows) = session.query_unpaged("SELECT a, b from ks.tab", &[]) +let result_rows = session + .query_unpaged("SELECT a, b from ks.tab", &[]) .await? - .into_rows_result()? -{ - for row in result_rows.rows::()? { - let my_row: MyRow = row?; - } + .into_rows_result()?; + +for row in result_rows.rows::()? { + let my_row: MyRow = row?; } # Ok(()) # } diff --git a/docs/source/queries/simple.md b/docs/source/queries/simple.md index 468c10c93..a91799859 100644 --- a/docs/source/queries/simple.md +++ b/docs/source/queries/simple.md @@ -103,8 +103,8 @@ use scylla::IntoTypedRows; // Query rows from the table and print them let result = session.query_unpaged("SELECT a FROM ks.tab", &[]) .await? - .into_rows_result()? - .unwrap(); + .into_rows_result()?; + let mut iter = result.rows::<(i32,)>()?; while let Some(read_row) = iter.next().transpose()? { println!("Read a value from row: {}", read_row.0); diff --git a/examples/compare-tokens.rs b/examples/compare-tokens.rs index 5350006b9..ab4bbb6b1 100644 --- a/examples/compare-tokens.rs +++ b/examples/compare-tokens.rs @@ -52,7 +52,6 @@ async fn main() -> Result<()> { ) .await? .into_rows_result()? - .expect("Got not Rows result") .single_row()?; assert_eq!(t, qt); println!("token for {}: {}", pk, t); diff --git a/examples/cqlsh-rs.rs b/examples/cqlsh-rs.rs index ba4651963..04e303d25 100644 --- a/examples/cqlsh-rs.rs +++ b/examples/cqlsh-rs.rs @@ -4,9 +4,10 @@ use rustyline::error::ReadlineError; use rustyline::{CompletionType, Config, Context, Editor}; use rustyline_derive::{Helper, Highlighter, Hinter, Validator}; use scylla::frame::response::result::Row; +use scylla::transport::query_result::IntoRowsResultError; use scylla::transport::session::Session; use scylla::transport::Compression; -use scylla::QueryRowsResult; +use scylla::QueryResult; use scylla::SessionBuilder; use std::env; @@ -176,24 +177,27 @@ impl Completer for CqlHelper { } } -fn print_result(result: Option<&QueryRowsResult>) { - if let Some(rows_result) = result { - for row in rows_result.rows::().unwrap() { - let row = row.unwrap(); - for column in &row.columns { - print!("|"); - print!( - " {:16}", - match column { - None => "null".to_owned(), - Some(value) => format!("{:?}", value), - } - ); +fn print_result(result: QueryResult) -> Result<(), IntoRowsResultError> { + match result.into_rows_result() { + Ok(rows_result) => { + for row in rows_result.rows::().unwrap() { + let row = row.unwrap(); + for column in &row.columns { + print!("|"); + print!( + " {:16}", + match column { + None => "null".to_owned(), + Some(value) => format!("{:?}", value), + } + ); + } + println!("|"); } - println!("|") + Ok(()) } - } else { - println!("OK"); + Err(IntoRowsResultError::ResultNotRows(_)) => Ok(println!("OK")), + Err(e) => Err(e), } } @@ -226,10 +230,7 @@ async fn main() -> Result<()> { let maybe_res = session.query_unpaged(line, &[]).await; match maybe_res { Err(err) => println!("Error: {}", err), - Ok(res) => { - let rows_res = res.into_rows_result()?; - print_result(rows_res.as_ref()) - } + Ok(res) => print_result(res)?, } } Err(ReadlineError::Interrupted) => continue, diff --git a/examples/custom_deserialization.rs b/examples/custom_deserialization.rs index 0306ebe87..5a5991edf 100644 --- a/examples/custom_deserialization.rs +++ b/examples/custom_deserialization.rs @@ -1,4 +1,4 @@ -use anyhow::{Context, Result}; +use anyhow::Result; use scylla::deserialize::DeserializeValue; use scylla::frame::response::result::ColumnType; use scylla::transport::session::Session; @@ -55,8 +55,7 @@ async fn main() -> Result<()> { (), ) .await? - .into_rows_result()? - .context("Expected Result:Rows response, got a different Result response.")?; + .into_rows_result()?; let (v,) = rows_result.single_row::<(MyType,)>()?; assert_eq!(v, MyType("asdf")); diff --git a/examples/get_by_name.rs b/examples/get_by_name.rs index 1caca3e3d..4aca66f66 100644 --- a/examples/get_by_name.rs +++ b/examples/get_by_name.rs @@ -1,4 +1,4 @@ -use anyhow::{anyhow, Context as _, Result}; +use anyhow::{anyhow, Result}; use scylla::frame::response::result::Row; use scylla::transport::session::Session; use scylla::SessionBuilder; @@ -39,8 +39,7 @@ async fn main() -> Result<()> { let rows_result = session .query_unpaged("SELECT pk, ck, value FROM examples_ks.get_by_name", &[]) .await? - .into_rows_result()? - .context("Response is not of Rows type")?; + .into_rows_result()?; let col_specs = rows_result.column_specs(); let (ck_idx, _) = col_specs .get_by_name("ck") diff --git a/examples/select-paging.rs b/examples/select-paging.rs index b3c7501fe..00aa961fc 100644 --- a/examples/select-paging.rs +++ b/examples/select-paging.rs @@ -51,9 +51,7 @@ async fn main() -> Result<()> { .query_single_page(paged_query.clone(), &[], paging_state) .await?; - let res = res - .into_rows_result()? - .expect("Got result different than Rows"); + let res = res.into_rows_result()?; println!( "Paging state: {:#?} ({} rows)", @@ -85,9 +83,7 @@ async fn main() -> Result<()> { .execute_single_page(&paged_prepared, &[], paging_state) .await?; - let res = res - .into_rows_result()? - .expect("Got result different than Rows"); + let res = res.into_rows_result()?; println!( "Paging state from the prepared statement execution: {:#?} ({} rows)", diff --git a/examples/tower.rs b/examples/tower.rs index c34c3f398..f521b1b61 100644 --- a/examples/tower.rs +++ b/examples/tower.rs @@ -45,8 +45,7 @@ async fn main() -> anyhow::Result<()> { let rows_result = session .call("SELECT keyspace_name, table_name FROM system_schema.tables;".into()) .await? - .into_rows_result()? - .expect("Got result different than Rows"); + .into_rows_result()?; let print_text = |t: &Option| { t.as_ref() diff --git a/scylla-cql/src/frame/frame_errors.rs b/scylla-cql/src/frame/frame_errors.rs index 94e455bb7..1f12a6008 100644 --- a/scylla-cql/src/frame/frame_errors.rs +++ b/scylla-cql/src/frame/frame_errors.rs @@ -14,7 +14,6 @@ pub use super::request::{ use super::response::result::TableSpec; use super::response::CqlResponseKind; use super::TryFromPrimitiveError; -use crate::types::deserialize::{DeserializationError, TypeCheckError}; use thiserror::Error; /// An error returned by `parse_response_body_extensions`. @@ -227,7 +226,7 @@ pub enum CqlResultParseError { #[error("RESULT:Prepared response deserialization failed: {0}")] PreparedParseError(#[from] PreparedParseError), #[error("RESULT:Rows response deserialization failed: {0}")] - RowsParseError(#[from] RowsParseError), + RawRowsParseError(#[from] RawRowsAndPagingStateResponseParseError), } #[non_exhaustive] @@ -301,48 +300,99 @@ pub enum PreparedParseError { #[error("Invalid result metadata: {0}")] ResultMetadataParseError(ResultMetadataParseError), #[error("Invalid prepared metadata: {0}")] - PreparedMetadataParseError(ResultMetadataParseError), + PreparedMetadataParseError(PreparedMetadataParseError), #[error("Non-zero paging state in result metadata: {0:?}")] NonZeroPagingState(Arc<[u8]>), } -/// An error type returned when deserialization -/// of `RESULT::Rows` response fails. +/// An error that occurred during initial deserialization of +/// `RESULT:Rows` response. Since the deserialization of rows is lazy, +/// we initially only need to deserialize: +/// - result metadata flags +/// - column count (result metadata) +/// - paging state response #[non_exhaustive] #[derive(Debug, Error, Clone)] -pub enum RowsParseError { - #[error("Invalid result metadata: {0}")] - ResultMetadataParseError(#[from] ResultMetadataParseError), - #[error("Invalid result metadata, server claims {col_count} columns, received {col_specs_count} col specs.")] - ColumnCountMismatch { - col_count: usize, - col_specs_count: usize, - }, - #[error("Malformed rows count: {0}")] - RowsCountParseError(LowLevelDeserializationError), - #[error("Data type check prior to deserialization failed: {0}")] - IncomingDataTypeCheckError(#[from] TypeCheckError), - #[error("Data deserialization failed: {0}")] - DataDeserializationError(#[from] DeserializationError), +pub enum RawRowsAndPagingStateResponseParseError { + /// Failed to parse metadata flags. + #[error("Malformed metadata flags: {0}")] + FlagsParseError(LowLevelDeserializationError), + + /// Failed to parse column count. + #[error("Malformed column count: {0}")] + ColumnCountParseError(LowLevelDeserializationError), + + /// Failed to parse paging state response. + #[error("Malformed paging state: {0}")] + PagingStateParseError(LowLevelDeserializationError), } /// An error type returned when deserialization -/// of `[Result/Prepared]Metadata` failed. +/// of statement's prepared metadata failed. #[non_exhaustive] #[derive(Error, Debug, Clone)] -pub enum ResultMetadataParseError { +pub enum PreparedMetadataParseError { + /// Failed to parse metadata flags. #[error("Malformed metadata flags: {0}")] FlagsParseError(LowLevelDeserializationError), + + /// Failed to parse column count. #[error("Malformed column count: {0}")] ColumnCountParseError(LowLevelDeserializationError), + + /// Failed to parse partition key count. #[error("Malformed partition key count: {0}")] PkCountParseError(LowLevelDeserializationError), + + /// Failed to parse partition key index. #[error("Malformed partition key index: {0}")] PkIndexParseError(LowLevelDeserializationError), + + /// Failed to parse global table spec. + #[error("Invalid global table spec: {0}")] + GlobalTableSpecParseError(#[from] TableSpecParseError), + + /// Failed to parse column spec. + #[error("Invalid column spec: {0}")] + ColumnSpecParseError(#[from] ColumnSpecParseError), +} + +/// An error returned when lazy deserialization of +/// result metadata and rows count fails. +#[non_exhaustive] +#[derive(Error, Debug, Clone)] +pub enum ResultMetadataAndRowsCountParseError { + /// Failed to deserialize result metadata. + #[error("Failed to lazily deserialize result metadata: {0}")] + ResultMetadataParseError(#[from] ResultMetadataParseError), + + /// Received malformed rows count from the server. + #[error("Malformed rows count: {0}")] + RowsCountParseError(LowLevelDeserializationError), +} + +/// An error type returned when deserialization +/// of result metadata failed. +#[non_exhaustive] +#[derive(Error, Debug, Clone)] +pub enum ResultMetadataParseError { + /// Failed to parse metadata flags. + #[error("Malformed metadata flags: {0}")] + FlagsParseError(LowLevelDeserializationError), + + /// Failed to parse column count. + #[error("Malformed column count: {0}")] + ColumnCountParseError(LowLevelDeserializationError), + + /// Failed to parse paging state response. #[error("Malformed paging state: {0}")] PagingStateParseError(LowLevelDeserializationError), + + /// Failed to parse global table spec. #[error("Invalid global table spec: {0}")] GlobalTableSpecParseError(#[from] TableSpecParseError), + + /// Failed to parse column spec. #[error("Invalid column spec: {0}")] ColumnSpecParseError(#[from] ColumnSpecParseError), } diff --git a/scylla-cql/src/frame/response/result.rs b/scylla-cql/src/frame/response/result.rs index 3a351b434..54a1567c7 100644 --- a/scylla-cql/src/frame/response/result.rs +++ b/scylla-cql/src/frame/response/result.rs @@ -1,8 +1,10 @@ use crate::cql_to_rust::{FromRow, FromRowError}; use crate::frame::frame_errors::{ ColumnSpecParseError, ColumnSpecParseErrorKind, CqlResultParseError, CqlTypeParseError, - LowLevelDeserializationError, PreparedParseError, ResultMetadataParseError, RowsParseError, - SchemaChangeEventParseError, SetKeyspaceParseError, TableSpecParseError, + LowLevelDeserializationError, PreparedMetadataParseError, PreparedParseError, + RawRowsAndPagingStateResponseParseError, ResultMetadataAndRowsCountParseError, + ResultMetadataParseError, SchemaChangeEventParseError, SetKeyspaceParseError, + TableSpecParseError, }; use crate::frame::request::query::PagingStateResponse; use crate::frame::response::event::SchemaChangeEvent; @@ -614,7 +616,7 @@ impl Row { /// /// Flags and paging state are deserialized, remaining part of metadata /// as well as rows remain serialized. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct RawMetadataAndRawRows { // Already deserialized part of metadata: col_count: usize, @@ -1142,20 +1144,20 @@ impl RawMetadataAndRawRows { fn deserialize( frame: &mut FrameSlice, cached_metadata: Option>>, - ) -> StdResult<(Self, PagingStateResponse), RowsParseError> { + ) -> StdResult<(Self, PagingStateResponse), RawRowsAndPagingStateResponseParseError> { let flags = types::read_int(frame.as_slice_mut()) - .map_err(|err| ResultMetadataParseError::FlagsParseError(err.into()))?; + .map_err(|err| RawRowsAndPagingStateResponseParseError::FlagsParseError(err.into()))?; let global_tables_spec = flags & 0x0001 != 0; let has_more_pages = flags & 0x0002 != 0; let no_metadata = flags & 0x0004 != 0; let col_count = types::read_int_length(frame.as_slice_mut()) - .map_err(ResultMetadataParseError::ColumnCountParseError)?; + .map_err(RawRowsAndPagingStateResponseParseError::ColumnCountParseError)?; let raw_paging_state = has_more_pages .then(|| { types::read_bytes(frame.as_slice_mut()) - .map_err(ResultMetadataParseError::PagingStateParseError) + .map_err(RawRowsAndPagingStateResponseParseError::PagingStateParseError) }) .transpose()?; @@ -1186,29 +1188,22 @@ impl RawMetadataAndRawRows { fn metadata_deserializer( col_count: usize, global_tables_spec: bool, - ) -> impl for<'frame> FnOnce(&mut &'frame [u8]) -> StdResult, RowsParseError> - { + ) -> impl for<'frame> FnOnce( + &mut &'frame [u8], + ) -> StdResult, ResultMetadataParseError> { move |buf| { let server_metadata = { let global_table_spec = global_tables_spec .then(|| deser_table_spec(buf)) - .transpose() - .map_err(ResultMetadataParseError::from)?; + .transpose()?; - let col_specs = deser_col_specs_borrowed(buf, global_table_spec, col_count) - .map_err(ResultMetadataParseError::from)?; + let col_specs = deser_col_specs_borrowed(buf, global_table_spec, col_count)?; ResultMetadata { col_count, col_specs, } }; - if server_metadata.col_count() != server_metadata.col_specs().len() { - return Err(RowsParseError::ColumnCountMismatch { - col_count: server_metadata.col_count(), - col_specs_count: server_metadata.col_specs().len(), - }); - } Ok(server_metadata) } } @@ -1217,7 +1212,9 @@ impl RawMetadataAndRawRows { /// /// If metadata is cached (in the PreparedStatement), it is reused (shared) from cache /// instead of deserializing. - pub fn deserialize_metadata(self) -> StdResult { + pub fn deserialize_metadata( + self, + ) -> StdResult { let (metadata_deserialized, row_count_and_raw_rows) = match self.cached_metadata { Some(cached) if self.no_metadata => { // Server sent no metadata, but we have metadata cached. This means that we asked the server @@ -1262,7 +1259,7 @@ impl RawMetadataAndRawRows { let mut frame_slice = FrameSlice::new(&row_count_and_raw_rows); let rows_count: usize = types::read_int_length(frame_slice.as_slice_mut()) - .map_err(RowsParseError::RowsCountParseError)?; + .map_err(ResultMetadataAndRowsCountParseError::RowsCountParseError)?; Ok(DeserializedMetadataAndRawRows { metadata: metadata_deserialized, @@ -1274,22 +1271,22 @@ impl RawMetadataAndRawRows { fn deser_prepared_metadata( buf: &mut &[u8], -) -> StdResult { +) -> StdResult { let flags = types::read_int(buf) - .map_err(|err| ResultMetadataParseError::FlagsParseError(err.into()))?; + .map_err(|err| PreparedMetadataParseError::FlagsParseError(err.into()))?; let global_tables_spec = flags & 0x0001 != 0; let col_count = - types::read_int_length(buf).map_err(ResultMetadataParseError::ColumnCountParseError)?; + types::read_int_length(buf).map_err(PreparedMetadataParseError::ColumnCountParseError)?; let pk_count: usize = - types::read_int_length(buf).map_err(ResultMetadataParseError::PkCountParseError)?; + types::read_int_length(buf).map_err(PreparedMetadataParseError::PkCountParseError)?; let mut pk_indexes = Vec::with_capacity(pk_count); for i in 0..pk_count { pk_indexes.push(PartitionKeyIndex { index: types::read_short(buf) - .map_err(|err| ResultMetadataParseError::PkIndexParseError(err.into()))? + .map_err(|err| PreparedMetadataParseError::PkIndexParseError(err.into()))? as u16, sequence: i as u16, }); @@ -1473,7 +1470,8 @@ pub fn deser_cql_value( fn deser_rows( buf_bytes: Bytes, cached_metadata: Option<&Arc>>, -) -> StdResult<(RawMetadataAndRawRows, PagingStateResponse), RowsParseError> { +) -> StdResult<(RawMetadataAndRawRows, PagingStateResponse), RawRowsAndPagingStateResponseParseError> +{ let mut frame_slice = FrameSlice::new(&buf_bytes); RawMetadataAndRawRows::deserialize(&mut frame_slice, cached_metadata.cloned()) } diff --git a/scylla-cql/src/types/deserialize/row.rs b/scylla-cql/src/types/deserialize/row.rs index ad6514518..8de97489d 100644 --- a/scylla-cql/src/types/deserialize/row.rs +++ b/scylla-cql/src/types/deserialize/row.rs @@ -347,7 +347,7 @@ pub enum BuiltinTypeCheckErrorKind { /// Duplicated column in DB metadata. DuplicatedColumn { - /// Column index of the second occurence of the column with the same name. + /// Column index of the second occurrence of the column with the same name. column_index: usize, /// The name of the duplicated column. @@ -401,7 +401,7 @@ impl Display for BuiltinTypeCheckErrorKind { ), BuiltinTypeCheckErrorKind::DuplicatedColumn { column_name, column_index } => write!( f, - "column {} occurs more than once in DB metadata; second occurence is at column index {}", + "column {} occurs more than once in DB metadata; second occurrence is at column index {}", column_name, column_index, ), diff --git a/scylla-cql/src/types/deserialize/value.rs b/scylla-cql/src/types/deserialize/value.rs index b07c6eb47..492984c3f 100644 --- a/scylla-cql/src/types/deserialize/value.rs +++ b/scylla-cql/src/types/deserialize/value.rs @@ -1572,7 +1572,7 @@ pub enum TupleTypeCheckErrorKind { /// The index of the field whose type check failed. position: usize, - /// The type check error that occured. + /// The type check error that occurred. err: TypeCheckError, }, } @@ -1651,7 +1651,7 @@ pub enum UdtTypeCheckErrorKind { /// The name of the field whose type check failed. field_name: String, - /// Inner type check error that occured. + /// Inner type check error that occurred. err: TypeCheckError, }, } diff --git a/scylla/src/lib.rs b/scylla/src/lib.rs index 8b62c0c2b..39387f621 100644 --- a/scylla/src/lib.rs +++ b/scylla/src/lib.rs @@ -82,12 +82,9 @@ //! .await? //! .into_rows_result()?; //! -//! -//! if let Some(rows) = query_rows { -//! for row in rows.rows()? { -//! // Parse row as int and text \ -//! let (int_val, text_val): (i32, &str) = row?; -//! } +//! for row in query_rows.rows()? { +//! // Parse row as int and text \ +//! let (int_val, text_val): (i32, &str) = row?; //! } //! # Ok(()) //! # } diff --git a/scylla/src/transport/caching_session.rs b/scylla/src/transport/caching_session.rs index 192ad6dd4..108752e4c 100644 --- a/scylla/src/transport/caching_session.rs +++ b/scylla/src/transport/caching_session.rs @@ -428,7 +428,7 @@ mod tests { .execute_unpaged("select * from test_table", &[]) .await .unwrap(); - let result_rows = result.into_rows_result().unwrap().unwrap(); + let result_rows = result.into_rows_result().unwrap(); assert_eq!(1, session.cache.len()); assert_eq!(1, result_rows.rows_num()); @@ -438,7 +438,7 @@ mod tests { .await .unwrap(); - let result_rows = result.into_rows_result().unwrap().unwrap(); + let result_rows = result.into_rows_result().unwrap(); assert_eq!(1, session.cache.len()); assert_eq!(1, result_rows.rows_num()); @@ -485,7 +485,7 @@ mod tests { .unwrap(); assert_eq!(1, session.cache.len()); - assert_eq!(1, result.into_rows_result().unwrap().unwrap().rows_num()); + assert_eq!(1, result.into_rows_result().unwrap().rows_num()); } async fn assert_test_batch_table_rows_contain( @@ -498,7 +498,6 @@ mod tests { .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(i32, i32)>() .unwrap() .map(|r| r.unwrap()) @@ -710,7 +709,6 @@ mod tests { .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(i32, i64)>() .unwrap() .collect::, _>>() diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 185286d2f..26e41ae72 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -1434,10 +1434,12 @@ impl Connection { let (version_id,) = self .query_unpaged(LOCAL_VERSION) .await? - .into_rows_result()? - .ok_or(QueryError::ProtocolError( - ProtocolError::SchemaVersionFetch(SchemaVersionFetchError::ResultNotRows), - ))? + .into_rows_result() + .map_err(|err| { + QueryError::ProtocolError(ProtocolError::SchemaVersionFetch( + SchemaVersionFetchError::TracesEventsIntoRowsResultError(err), + )) + })? .single_row::<(Uuid,)>() .map_err(|err| { ProtocolError::SchemaVersionFetch(SchemaVersionFetchError::SingleRowError(err)) @@ -2620,7 +2622,6 @@ mod tests { .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(i32, Vec)>() .unwrap() .collect::, _>>() diff --git a/scylla/src/transport/cql_collections_test.rs b/scylla/src/transport/cql_collections_test.rs index 475bd47ee..9cdb34ce5 100644 --- a/scylla/src/transport/cql_collections_test.rs +++ b/scylla/src/transport/cql_collections_test.rs @@ -52,7 +52,6 @@ async fn insert_and_select( .unwrap() .into_rows_result() .unwrap() - .unwrap() .single_row::<(SelectT,)>() .unwrap() .0; diff --git a/scylla/src/transport/cql_types_test.rs b/scylla/src/transport/cql_types_test.rs index 2863df76c..30f406a1d 100644 --- a/scylla/src/transport/cql_types_test.rs +++ b/scylla/src/transport/cql_types_test.rs @@ -101,7 +101,6 @@ where .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(T,)>() .unwrap() .map(Result::unwrap) @@ -222,7 +221,6 @@ async fn test_cql_varint() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(CqlVarint,)>() .unwrap() .map(Result::unwrap) @@ -300,7 +298,6 @@ async fn test_counter() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(Counter,)>() .unwrap() .map(Result::unwrap) @@ -379,7 +376,6 @@ async fn test_naive_date_04() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(NaiveDate,)>() .unwrap() .next() @@ -405,7 +401,6 @@ async fn test_naive_date_04() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .single_row::<(NaiveDate,)>() .unwrap(); assert_eq!(read_date, *naive_date); @@ -447,7 +442,6 @@ async fn test_cql_date() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .single_row::<(CqlDate,)>() .unwrap(); @@ -533,7 +527,6 @@ async fn test_date_03() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .first_row::<(Date,)>() .ok() .map(|val| val.0); @@ -556,7 +549,6 @@ async fn test_date_03() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .first_row::<(Date,)>() .unwrap(); assert_eq!(read_date, *date); @@ -602,7 +594,6 @@ async fn test_cql_time() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .single_row::<(CqlTime,)>() .unwrap(); @@ -623,7 +614,6 @@ async fn test_cql_time() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .single_row::<(CqlTime,)>() .unwrap(); @@ -704,7 +694,6 @@ async fn test_naive_time_04() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .first_row::<(NaiveTime,)>() .unwrap(); @@ -725,7 +714,6 @@ async fn test_naive_time_04() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .first_row::<(NaiveTime,)>() .unwrap(); assert_eq!(read_time, *time); @@ -790,7 +778,6 @@ async fn test_time_03() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .first_row::<(Time,)>() .unwrap(); @@ -811,7 +798,6 @@ async fn test_time_03() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .first_row::<(Time,)>() .unwrap(); assert_eq!(read_time, *time); @@ -867,7 +853,6 @@ async fn test_cql_timestamp() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .single_row::<(CqlTimestamp,)>() .unwrap(); @@ -888,7 +873,6 @@ async fn test_cql_timestamp() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .single_row::<(CqlTimestamp,)>() .unwrap(); @@ -968,7 +952,6 @@ async fn test_date_time_04() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .first_row::<(DateTime,)>() .unwrap(); @@ -989,7 +972,6 @@ async fn test_date_time_04() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .first_row::<(DateTime,)>() .unwrap(); assert_eq!(read_datetime, *datetime); @@ -1020,7 +1002,6 @@ async fn test_date_time_04() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .first_row::<(DateTime,)>() .unwrap(); assert_eq!(read_datetime, nanosecond_precision_1st_half_rounded); @@ -1049,7 +1030,6 @@ async fn test_date_time_04() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .first_row::<(DateTime,)>() .unwrap(); assert_eq!(read_datetime, nanosecond_precision_2nd_half_rounded); @@ -1141,7 +1121,6 @@ async fn test_offset_date_time_03() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .first_row::<(OffsetDateTime,)>() .unwrap(); @@ -1162,7 +1141,6 @@ async fn test_offset_date_time_03() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .first_row::<(OffsetDateTime,)>() .unwrap(); assert_eq!(read_datetime, *datetime); @@ -1193,7 +1171,6 @@ async fn test_offset_date_time_03() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .first_row::<(OffsetDateTime,)>() .unwrap(); assert_eq!(read_datetime, nanosecond_precision_1st_half_rounded); @@ -1222,7 +1199,6 @@ async fn test_offset_date_time_03() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .first_row::<(OffsetDateTime,)>() .unwrap(); assert_eq!(read_datetime, nanosecond_precision_2nd_half_rounded); @@ -1274,7 +1250,6 @@ async fn test_timeuuid() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .single_row::<(CqlTimeuuid,)>() .unwrap(); @@ -1296,7 +1271,6 @@ async fn test_timeuuid() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .single_row::<(CqlTimeuuid,)>() .unwrap(); @@ -1368,7 +1342,6 @@ async fn test_timeuuid_ordering() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(CqlTimeuuid,)>() .unwrap() .map(|r| r.unwrap().0) @@ -1450,7 +1423,6 @@ async fn test_inet() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .single_row::<(IpAddr,)>() .unwrap(); @@ -1468,7 +1440,6 @@ async fn test_inet() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .single_row::<(IpAddr,)>() .unwrap(); @@ -1522,7 +1493,6 @@ async fn test_blob() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .single_row::<(Vec,)>() .unwrap(); @@ -1540,7 +1510,6 @@ async fn test_blob() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .single_row::<(Vec,)>() .unwrap(); @@ -1631,7 +1600,6 @@ async fn test_udt_after_schema_update() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .single_row::<(UdtV1,)>() .unwrap(); @@ -1651,7 +1619,6 @@ async fn test_udt_after_schema_update() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .single_row::<(UdtV1,)>() .unwrap(); @@ -1675,7 +1642,6 @@ async fn test_udt_after_schema_update() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .single_row::<(UdtV2,)>() .unwrap(); @@ -1708,7 +1674,6 @@ async fn test_empty() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .first_row::<(CqlValue,)>() .unwrap(); @@ -1728,7 +1693,6 @@ async fn test_empty() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .first_row::<(CqlValue,)>() .unwrap(); @@ -1817,7 +1781,6 @@ async fn test_udt_with_missing_field() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .single_row::<(TR,)>() .unwrap() .0; diff --git a/scylla/src/transport/cql_value_test.rs b/scylla/src/transport/cql_value_test.rs index c5c2eedd5..932b72934 100644 --- a/scylla/src/transport/cql_value_test.rs +++ b/scylla/src/transport/cql_value_test.rs @@ -62,7 +62,6 @@ async fn test_cqlvalue_udt() { .await .unwrap() .into_rows_result() - .unwrap() .unwrap(); let (received_udt_cql_value,) = rows_result.single_row::<(CqlValue,)>().unwrap(); @@ -115,7 +114,6 @@ async fn test_cqlvalue_duration() { .await .unwrap() .into_rows_result() - .unwrap() .unwrap(); let mut rows_iter = rows_result.rows::<(CqlValue,)>().unwrap(); diff --git a/scylla/src/transport/errors.rs b/scylla/src/transport/errors.rs index 349d968a4..778f33f29 100644 --- a/scylla/src/transport/errors.rs +++ b/scylla/src/transport/errors.rs @@ -18,7 +18,7 @@ use scylla_cql::{ CqlAuthChallengeParseError, CqlAuthSuccessParseError, CqlAuthenticateParseError, CqlErrorParseError, CqlEventParseError, CqlRequestSerializationError, CqlResponseParseError, CqlResultParseError, CqlSupportedParseError, - FrameBodyExtensionsParseError, FrameHeaderParseError, RowsParseError, + FrameBodyExtensionsParseError, FrameHeaderParseError, }, request::CqlRequestKind, response::CqlResponseKind, @@ -34,7 +34,11 @@ use thiserror::Error; use crate::{authentication::AuthError, frame::response}; -use super::query_result::SingleRowError; +use super::{ + iterator::NextRowError, + legacy_query_result::IntoLegacyQueryResultError, + query_result::{IntoRowsResultError, SingleRowError}, +}; /// Error that occurred during query execution #[derive(Error, Debug, Clone)] @@ -100,6 +104,19 @@ pub enum QueryError { /// Client timeout occurred before any response arrived #[error("Request timeout: {0}")] RequestTimeout(String), + + // TODO: This should not belong here, but it requires changes to error types + // returned in async iterator API. This should be handled in separate PR. + // The reason this needs to be included is that topology.rs makes use of iter API and returns QueryError. + // Once iter API is adjusted, we can then adjust errors returned by topology module (e.g. refactor MetadataError and not include it in QueryError). + /// An error occurred during async iteration over rows of result. + #[error("An error occurred during async iteration over rows of result: {0}")] + NextRowError(#[from] NextRowError), + + /// Failed to convert [`QueryResult`][crate::transport::query_result::QueryResult] + /// into [`LegacyQueryResult`][crate::transport::legacy_query_result::LegacyQueryResult]. + #[error("Failed to convert `QueryResult` into `LegacyQueryResult`: {0}")] + IntoLegacyQueryResultError(#[from] IntoLegacyQueryResultError), } impl From for QueryError { @@ -164,6 +181,10 @@ impl From for NewSessionError { QueryError::BrokenConnection(e) => NewSessionError::BrokenConnection(e), QueryError::UnableToAllocStreamId => NewSessionError::UnableToAllocStreamId, QueryError::RequestTimeout(msg) => NewSessionError::RequestTimeout(msg), + QueryError::IntoLegacyQueryResultError(e) => { + NewSessionError::IntoLegacyQueryResultError(e) + } + QueryError::NextRowError(e) => NewSessionError::NextRowError(e), } } } @@ -180,18 +201,6 @@ impl From for QueryError { } } -impl From for QueryError { - fn from(err: RowsParseError) -> Self { - let err: CqlResultParseError = err.into(); - let err: CqlResponseParseError = err.into(); - let err: RequestError = err.into(); - let err: UserRequestError = err.into(); - let err: QueryError = err.into(); - - err - } -} - /// Error that occurred during session creation #[derive(Error, Debug, Clone)] #[non_exhaustive] @@ -266,6 +275,19 @@ pub enum NewSessionError { /// during `Session` creation. #[error("Client timeout: {0}")] RequestTimeout(String), + + // TODO: This should not belong here, but it requires changes to error types + // returned in async iterator API. This should be handled in separate PR. + // The reason this needs to be included is that topology.rs makes use of iter API and returns QueryError. + // Once iter API is adjusted, we can then adjust errors returned by topology module (e.g. refactor MetadataError and not include it in QueryError). + /// An error occurred during async iteration over rows of result. + #[error("An error occurred during async iteration over rows of result: {0}")] + NextRowError(#[from] NextRowError), + + /// Failed to convert [`QueryResult`][crate::transport::query_result::QueryResult] + /// into [`LegacyQueryResult`][crate::transport::legacy_query_result::LegacyQueryResult]. + #[error("Failed to convert `QueryResult` into `LegacyQueryResult`: {0}")] + IntoLegacyQueryResultError(#[from] IntoLegacyQueryResultError), } /// A protocol error. @@ -351,8 +373,11 @@ pub enum UseKeyspaceProtocolError { #[derive(Error, Debug, Clone)] #[non_exhaustive] pub enum SchemaVersionFetchError { - #[error("Schema version query returned non-rows result")] - ResultNotRows, + /// Failed to convert schema version query result into rows result. + #[error("Failed to convert schema version query result into rows result: {0}")] + TracesEventsIntoRowsResultError(IntoRowsResultError), + + /// Failed to deserialize a single row from schema version query response. #[error(transparent)] SingleRowError(SingleRowError), } @@ -361,9 +386,9 @@ pub enum SchemaVersionFetchError { #[derive(Error, Debug, Clone)] #[non_exhaustive] pub enum TracingProtocolError { - /// Response to system_traces.session is not RESULT:Rows. - #[error("Response to system_traces.session is not RESULT:Rows")] - TracesSessionNotRows, + /// Failed to convert result of system_traces.session query to rows result. + #[error("Failed to convert result of system_traces.session query to rows result")] + TracesSessionIntoRowsResultError(IntoRowsResultError), /// system_traces.session has invalid column type. #[error("system_traces.session has invalid column type: {0}")] @@ -373,9 +398,9 @@ pub enum TracingProtocolError { #[error("Response to system_traces.session failed to deserialize: {0}")] TracesSessionDeserializationFailed(DeserializationError), - /// Response to system_traces.events is not RESULT:Rows. - #[error("Response to system_traces.events is not RESULT:Rows")] - TracesEventsNotRows, + /// Failed to convert result of system_traces.events query to rows result. + #[error("Failed to convert result of system_traces.events query to rows result")] + TracesEventsIntoRowsResultError(IntoRowsResultError), /// system_traces.events has invalid column type. #[error("system_traces.events has invalid column type: {0}")] @@ -431,6 +456,14 @@ pub enum MetadataError { #[derive(Error, Debug, Clone)] #[non_exhaustive] pub enum PeersMetadataError { + /// system.peers has invalid column type. + #[error("system.peers has invalid column type: {0}")] + SystemPeersInvalidColumnType(TypeCheckError), + + /// system.local has invalid column type. + #[error("system.local has invalid column type: {0}")] + SystemLocalInvalidColumnType(TypeCheckError), + /// Empty peers list returned during peers metadata fetch. #[error("Peers list is empty")] EmptyPeers, diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs index a46de9d42..bda210b31 100644 --- a/scylla/src/transport/iterator.rs +++ b/scylla/src/transport/iterator.rs @@ -8,12 +8,12 @@ use std::sync::Arc; use std::task::{Context, Poll}; use futures::Stream; -use scylla_cql::frame::frame_errors::RowsParseError; +use scylla_cql::frame::frame_errors::ResultMetadataAndRowsCountParseError; use scylla_cql::frame::response::result::RawMetadataAndRawRows; use scylla_cql::frame::response::NonErrorResponse; use scylla_cql::types::deserialize::result::RawRowLendingIterator; use scylla_cql::types::deserialize::row::{ColumnIterator, DeserializeRow}; -use scylla_cql::types::deserialize::TypeCheckError; +use scylla_cql::types::deserialize::{DeserializationError, TypeCheckError}; use scylla_cql::types::serialize::row::SerializedValues; use std::result::Result; use thiserror::Error; @@ -587,7 +587,7 @@ impl QueryPager { self.current_page .next() .unwrap() - .map_err(|e| RowsParseError::from(e).into()), + .map_err(|err| NextRowError::RowDeserializationError(err).into()), ) } @@ -622,7 +622,14 @@ impl QueryPager { let mut s = self.as_mut(); let received_page = ready_some_ok!(Pin::new(&mut s.page_receiver).poll_recv(cx)); - let raw_rows_with_deserialized_metadata = received_page.rows.deserialize_metadata()?; + + // TODO: see my other comment next to QueryError::NextRowError + // This is the place where conversion happens. To fix this, we need to refactor error types in iterator API. + // The `page_receiver`'s error type should be narrowed from QueryError to some other error type. + let raw_rows_with_deserialized_metadata = + received_page.rows.deserialize_metadata().map_err(|err| { + NextRowError::NextPageError(NextPageError::ResultMetadataParseError(err)) + })?; s.current_page = RawRowLendingIterator::new(raw_rows_with_deserialized_metadata); if let Some(tracing_id) = received_page.tracing_id { @@ -935,7 +942,10 @@ impl QueryPager { // - That future is polled in a tokio::task which isn't going to be // cancelled let page_received = receiver.recv().await.unwrap()?; - let raw_rows_with_deserialized_metadata = page_received.rows.deserialize_metadata()?; + let raw_rows_with_deserialized_metadata = + page_received.rows.deserialize_metadata().map_err(|err| { + NextRowError::NextPageError(NextPageError::ResultMetadataParseError(err)) + })?; Ok(Self { current_page: RawRowLendingIterator::new(raw_rows_with_deserialized_metadata), @@ -1018,7 +1028,7 @@ where self.raw_row_lending_stream.next().await.map(|res| { res.and_then(|column_iterator| { ::deserialize(column_iterator) - .map_err(|err| RowsParseError::from(err).into()) + .map_err(|err| NextRowError::RowDeserializationError(err).into()) }) }) }; @@ -1029,6 +1039,31 @@ where } } +/// An error returned that occurred during next page fetch. +#[derive(Error, Debug, Clone)] +#[non_exhaustive] +pub enum NextPageError { + /// Failed to deserialize result metadata associated with next page response. + #[error("Failed to deserialize result metadata associated with next page response: {0}")] + ResultMetadataParseError(#[from] ResultMetadataAndRowsCountParseError), + // TODO: This should also include a variant representing an error that occurred during + // query that fetches the next page. However, as of now, it would require that we include QueryError here. + // This would introduce a cyclic dependency: QueryError -> NextRowError -> NextPageError -> QueryError. +} + +/// An error returned by async iterator API. +#[derive(Error, Debug, Clone)] +#[non_exhaustive] +pub enum NextRowError { + /// Failed to fetch next page of result. + #[error("Failed to fetch next page of result: {0}")] + NextPageError(#[from] NextPageError), + + /// An error occurred during row deserialization. + #[error("Row deserialization error: {0}")] + RowDeserializationError(#[from] DeserializationError), +} + mod legacy { use super::*; @@ -1040,7 +1075,7 @@ mod legacy { } impl Stream for LegacyRowIterator { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut s = self.as_mut(); @@ -1050,8 +1085,8 @@ mod legacy { let next_column_iter = ready_some_ok!(next_fut.poll(cx)); - let next_ready_row = - Row::deserialize(next_column_iter).map_err(|e| RowsParseError::from(e).into()); + let next_ready_row = Row::deserialize(next_column_iter) + .map_err(LegacyNextRowError::RowDeserializationError); Poll::Ready(Some(next_ready_row)) } @@ -1104,7 +1139,7 @@ mod legacy { /// Couldn't get next typed row from the iterator #[derive(Error, Debug, Clone)] - pub enum NextRowError { + pub enum LegacyNextRowError { /// Query to fetch next page has failed #[error(transparent)] QueryError(#[from] QueryError), @@ -1112,12 +1147,16 @@ mod legacy { /// Parsing values in row as given types failed #[error(transparent)] FromRowError(#[from] FromRowError), + + /// Row deserialization error + #[error("Row deserialization error: {0}")] + RowDeserializationError(#[from] DeserializationError), } /// Fetching pages is asynchronous so `LegacyTypedRowIterator` does not implement the `Iterator` trait.\ /// Instead it uses the asynchronous `Stream` trait impl Stream for LegacyTypedRowIterator { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut s = self.as_mut(); @@ -1131,4 +1170,4 @@ mod legacy { // LegacyTypedRowIterator can be moved freely for any RowT so it's Unpin impl Unpin for LegacyTypedRowIterator {} } -pub use legacy::{LegacyRowIterator, LegacyTypedRowIterator, NextRowError}; +pub use legacy::{LegacyNextRowError, LegacyRowIterator, LegacyTypedRowIterator}; diff --git a/scylla/src/transport/legacy_query_result.rs b/scylla/src/transport/legacy_query_result.rs index 46818a297..8dd6b7214 100644 --- a/scylla/src/transport/legacy_query_result.rs +++ b/scylla/src/transport/legacy_query_result.rs @@ -1,7 +1,9 @@ use crate::frame::response::cql_to_rust::{FromRow, FromRowError}; use crate::frame::response::result::ColumnSpec; use crate::frame::response::result::Row; +use scylla_cql::frame::frame_errors::ResultMetadataAndRowsCountParseError; use scylla_cql::frame::response::result::{self, ResultMetadataHolder}; +use scylla_cql::types::deserialize::{DeserializationError, TypeCheckError}; use thiserror::Error; use uuid::Uuid; @@ -174,6 +176,24 @@ impl LegacyQueryResult { } } +/// An error that occurred during [`QueryResult`](crate::transport::query_result::QueryResult) +/// to [`LegacyQueryResult`] conversion. +#[non_exhaustive] +#[derive(Error, Clone, Debug)] +pub enum IntoLegacyQueryResultError { + /// Failed to lazily deserialize result metadata. + #[error("Failed to lazily deserialize result metadata: {0}")] + ResultMetadataAndRowsCountParseError(#[from] ResultMetadataAndRowsCountParseError), + + /// Failed to perform the typecheck against [`Row`] type. + #[error("Typecheck error: {0}")] + TypecheckError(#[from] TypeCheckError), + + /// Failed to deserialize rows. + #[error("Failed to deserialize rows: {0}")] + DeserializationError(#[from] DeserializationError), +} + /// [`LegacyQueryResult::rows()`](LegacyQueryResult::rows) or a similar function called on a bad LegacyQueryResult.\ /// Expected `LegacyQueryResult.rows` to be `Some`, but it was `None`.\ /// `LegacyQueryResult.rows` is `Some` for queries that can return rows (e.g `SELECT`).\ diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index 51db7f97f..b445dea5f 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -2860,7 +2860,9 @@ mod latency_awareness { | QueryError::MetadataError(_) | QueryError::ProtocolError(_) | QueryError::TimeoutError - | QueryError::RequestTimeout(_) => true, + | QueryError::RequestTimeout(_) + | QueryError::NextRowError(_) + | QueryError::IntoLegacyQueryResultError(_) => true, } } } diff --git a/scylla/src/transport/query_result.rs b/scylla/src/transport/query_result.rs index 52326ba32..9a7745c50 100644 --- a/scylla/src/transport/query_result.rs +++ b/scylla/src/transport/query_result.rs @@ -3,7 +3,7 @@ use std::fmt::Debug; use thiserror::Error; use uuid::Uuid; -use scylla_cql::frame::frame_errors::RowsParseError; +use scylla_cql::frame::frame_errors::ResultMetadataAndRowsCountParseError; use scylla_cql::frame::response::result::{ ColumnSpec, ColumnType, DeserializedMetadataAndRawRows, RawMetadataAndRawRows, Row, TableSpec, }; @@ -11,7 +11,7 @@ use scylla_cql::types::deserialize::result::TypedRowIterator; use scylla_cql::types::deserialize::row::DeserializeRow; use scylla_cql::types::deserialize::{DeserializationError, TypeCheckError}; -use super::legacy_query_result::LegacyQueryResult; +use super::legacy_query_result::{IntoLegacyQueryResultError, LegacyQueryResult}; /// A view over specification of a table in the database. #[derive(Debug, Clone, Copy)] @@ -133,7 +133,7 @@ impl<'res> ColumnSpecs<'res> { /// /// NOTE: this is a result of a single CQL request. If you use paging for your query, /// this will contain exactly one page. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct QueryResult { raw_metadata_and_rows: Option, tracing_id: Option, @@ -203,47 +203,62 @@ impl QueryResult { /// Transforms itself into the Rows result type to enable deserializing rows. /// Deserializes result metadata and allocates it. /// - /// Returns `None` if the response is not of Rows kind. + /// Returns an error if the response is not of Rows kind or metadata deserialization failed. /// /// ```rust /// # use scylla::transport::query_result::{QueryResult, QueryRowsResult}; /// # fn example(query_result: QueryResult) -> Result<(), Box> { - /// let maybe_rows_result = query_result.into_rows_result()?; - /// if let Some(rows_result) = maybe_rows_result { - /// let mut rows_iter = rows_result.rows::<(i32, &str)>()?; - /// while let Some((num, text)) = rows_iter.next().transpose()? { - /// // do something with `num` and `text`` - /// } - /// } else { - /// // Response was not Result:Rows, but some other kind of Result. + /// let rows_result = query_result.into_rows_result()?; + /// + /// let mut rows_iter = rows_result.rows::<(i32, &str)>()?; + /// while let Some((num, text)) = rows_iter.next().transpose()? { + /// // do something with `num` and `text`` /// } /// /// Ok(()) /// # } /// /// ``` - pub fn into_rows_result(self) -> Result, RowsParseError> { - let QueryResult { - raw_metadata_and_rows, - tracing_id, + /// + /// If the response is not of Rows kind, the original [`QueryResult`] (self) is + /// returned back to the user in the error type. See [`IntoRowsResultError`] documentation. + /// + /// ```rust + /// # use scylla::transport::query_result::{QueryResult, QueryRowsResult, IntoRowsResultError}; + /// # fn example(non_rows_query_result: QueryResult) -> Result<(), Box> { + /// let err = non_rows_query_result.into_rows_result().unwrap_err(); + /// + /// match err { + /// IntoRowsResultError::ResultNotRows(query_result) => { + /// // do something with original `query_result` + /// } + /// _ => { + /// // deserialization failed - query result is not recovered + /// } + /// } + /// + /// Ok(()) + /// # } + /// ``` + pub fn into_rows_result(self) -> Result { + let Some(raw_metadata_and_rows) = self.raw_metadata_and_rows else { + return Err(IntoRowsResultError::ResultNotRows(self)); + }; + let tracing_id = self.tracing_id; + let warnings = self.warnings; + + let raw_rows_with_metadata = raw_metadata_and_rows.deserialize_metadata()?; + Ok(QueryRowsResult { + raw_rows_with_metadata, warnings, - } = self; - raw_metadata_and_rows - .map(|raw_rows| { - let raw_rows_with_metadata = raw_rows.deserialize_metadata()?; - Ok(QueryRowsResult { - raw_rows_with_metadata, - warnings, - tracing_id, - }) - }) - .transpose() + tracing_id, + }) } /// Transforms itself into the legacy result type, by eagerly deserializing rows /// into the Row type. This is inefficient, and should only be used during transition /// period to the new API. - pub fn into_legacy_result(self) -> Result { + pub fn into_legacy_result(self) -> Result { if let Some(raw_rows) = self.raw_metadata_and_rows { let raw_rows_with_metadata = raw_rows.deserialize_metadata()?; @@ -289,14 +304,11 @@ impl QueryResult { /// ```rust /// # use scylla::transport::query_result::QueryResult; /// # fn example(query_result: QueryResult) -> Result<(), Box> { -/// let maybe_rows_result = query_result.into_rows_result()?; -/// if let Some(rows_result) = maybe_rows_result { -/// let mut rows_iter = rows_result.rows::<(i32, &str)>()?; -/// while let Some((num, text)) = rows_iter.next().transpose()? { -/// // do something with `num` and `text`` -/// } -/// } else { -/// // Response was not Result:Rows, but some other kind of Result. +/// let rows_result = query_result.into_rows_result()?; +/// +/// let mut rows_iter = rows_result.rows::<(i32, &str)>()?; +/// while let Some((num, text)) = rows_iter.next().transpose()? { +/// // do something with `num` and `text`` /// } /// /// Ok(()) @@ -417,6 +429,22 @@ impl QueryRowsResult { } } +/// An error returned by [`QueryResult::into_rows_result`] +/// +/// The `ResultNotRows` variant contains original [`QueryResult`], +/// which otherwise would be consumed and lost. +#[derive(Debug, Error, Clone)] +pub enum IntoRowsResultError { + /// Result is not of Rows kind + #[error("Result is not of Rows kind")] + ResultNotRows(QueryResult), + + // transparent because the underlying error provides enough context. + /// Failed to lazily deserialize result metadata. + #[error(transparent)] + ResultMetadataLazyDeserializationError(#[from] ResultMetadataAndRowsCountParseError), +} + /// An error returned by [`QueryRowsResult::rows`]. #[derive(Debug, Error)] pub enum RowsError { @@ -564,8 +592,8 @@ mod tests { // Not RESULT::Rows response -> no column specs { let rqr = QueryResult::new(None, None, Vec::new()); - let qr = rqr.into_rows_result().unwrap(); - assert_matches!(qr, None); + let qr = rqr.into_rows_result(); + assert_matches!(qr, Err(IntoRowsResultError::ResultNotRows(_))); } // RESULT::Rows response -> some column specs @@ -575,7 +603,7 @@ mod tests { let rr = RawMetadataAndRawRows::new_for_test(None, Some(metadata), false, 0, &[]) .unwrap(); let rqr = QueryResult::new(Some(rr), None, Vec::new()); - let qr = rqr.into_rows_result().unwrap().unwrap(); + let qr = rqr.into_rows_result().unwrap(); let column_specs = qr.column_specs(); assert_eq!(column_specs.len(), n); @@ -622,8 +650,8 @@ mod tests { // Not RESULT::Rows { let rqr = QueryResult::new(None, None, Vec::new()); - let qr = rqr.into_rows_result().unwrap(); - assert_matches!(qr, None); + let qr = rqr.into_rows_result(); + assert_matches!(qr, Err(IntoRowsResultError::ResultNotRows(_))); } // RESULT::Rows with 0 rows @@ -632,7 +660,7 @@ mod tests { let rqr = QueryResult::new(Some(rr), None, Vec::new()); assert_matches!(rqr.result_not_rows(), Err(ResultNotRowsError)); - let qr = rqr.into_rows_result().unwrap().unwrap(); + let qr = rqr.into_rows_result().unwrap(); // Type check error { @@ -678,8 +706,8 @@ mod tests { assert_matches!(rqr.result_not_rows(), Err(ResultNotRowsError)); } - let qr_good_data = rqr_good_data.into_rows_result().unwrap().unwrap(); - let qr_bad_data = rqr_bad_data.into_rows_result().unwrap().unwrap(); + let qr_good_data = rqr_good_data.into_rows_result().unwrap(); + let qr_bad_data = rqr_bad_data.into_rows_result().unwrap(); for qr in [&qr_good_data, &qr_bad_data] { // Type check error @@ -735,7 +763,7 @@ mod tests { let rqr = QueryResult::new(Some(rr), None, Vec::new()); assert_matches!(rqr.result_not_rows(), Err(ResultNotRowsError)); - let qr = rqr.into_rows_result().unwrap().unwrap(); + let qr = rqr.into_rows_result().unwrap(); // Type check error { @@ -771,4 +799,40 @@ mod tests { } } } + + #[test] + fn test_query_result_returns_self_if_not_rows() { + // Check tracing ID + for tracing_id in [None, Some(Uuid::from_u128(0x_feed_dead))] { + let qr = QueryResult::new(None, tracing_id, vec![]); + let err = qr.into_rows_result().unwrap_err(); + match err { + IntoRowsResultError::ResultNotRows(query_result) => { + assert_eq!(query_result.tracing_id, tracing_id) + } + IntoRowsResultError::ResultMetadataLazyDeserializationError(_) => { + panic!("Expected ResultNotRows error") + } + } + } + + // Check warnings + { + let warnings = &["Ooops", "Meltdown..."]; + let qr = QueryResult::new( + None, + None, + warnings.iter().copied().map(String::from).collect(), + ); + let err = qr.into_rows_result().unwrap_err(); + match err { + IntoRowsResultError::ResultNotRows(query_result) => { + assert_eq!(query_result.warnings().collect_vec(), warnings) + } + IntoRowsResultError::ResultMetadataLazyDeserializationError(_) => { + panic!("Expected ResultNotRows error") + } + } + } + } } diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index 4db0bbde2..2ed5ca901 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -514,11 +514,9 @@ impl GenericSession { /// .await? /// .into_rows_result()?; /// - /// if let Some(rows) = query_rows { - /// for row in rows.rows()? { - /// // Parse row as int and text. - /// let (int_val, text_val): (i32, &str) = row?; - /// } + /// for row in query_rows.rows()? { + /// // Parse row as int and text. + /// let (int_val, text_val): (i32, &str) = row?; /// } /// # Ok(()) /// # } @@ -562,7 +560,6 @@ impl GenericSession { /// // Do something with a single page of results. /// for row in res /// .into_rows_result()? - /// .unwrap() /// .rows::<(i32, &str)>()? /// { /// let (a, b) = row?; @@ -725,7 +722,6 @@ impl GenericSession { /// // Do something with a single page of results. /// for row in res /// .into_rows_result()? - /// .unwrap() /// .rows::<(i32, &str)>()? /// { /// let (a, b) = row?; @@ -1799,10 +1795,10 @@ where // Get tracing info let maybe_tracing_info: Option = traces_session_res - .into_rows_result()? - .ok_or(ProtocolError::Tracing( - TracingProtocolError::TracesSessionNotRows, - ))? + .into_rows_result() + .map_err(|err| { + ProtocolError::Tracing(TracingProtocolError::TracesSessionIntoRowsResultError(err)) + })? .maybe_first_row() .map_err(|err| match err { MaybeFirstRowError::TypeCheckFailed(e) => { @@ -1819,12 +1815,9 @@ where }; // Get tracing events - let tracing_event_rows_result = - traces_events_res - .into_rows_result()? - .ok_or(ProtocolError::Tracing( - TracingProtocolError::TracesEventsNotRows, - ))?; + let tracing_event_rows_result = traces_events_res.into_rows_result().map_err(|err| { + ProtocolError::Tracing(TracingProtocolError::TracesEventsIntoRowsResultError(err)) + })?; let tracing_event_rows = tracing_event_rows_result.rows().map_err(|err| match err { RowsError::TypeCheckFailed(err) => { ProtocolError::Tracing(TracingProtocolError::TracesEventsInvalidColumnType(err)) diff --git a/scylla/src/transport/session_test.rs b/scylla/src/transport/session_test.rs index 6c4beeb4a..8027360a7 100644 --- a/scylla/src/transport/session_test.rs +++ b/scylla/src/transport/session_test.rs @@ -109,7 +109,7 @@ async fn test_unprepared_statement() { .await .unwrap(); - let rows = query_result.into_rows_result().unwrap().unwrap(); + let rows = query_result.into_rows_result().unwrap(); let col_specs = rows.column_specs(); assert_eq!(col_specs.get_by_name("a").unwrap().0, 0); @@ -154,7 +154,6 @@ async fn test_unprepared_statement() { let mut page_results = rs_manual .into_rows_result() .unwrap() - .unwrap() .rows::<(i32, i32, String)>() .unwrap() .collect::, _>>() @@ -244,7 +243,6 @@ async fn test_prepared_statement() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .single_row::<(i64,)>() .unwrap(); let token = Token::new(value); @@ -266,7 +264,6 @@ async fn test_prepared_statement() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .single_row::<(i64,)>() .unwrap(); let token = Token::new(value); @@ -291,7 +288,6 @@ async fn test_prepared_statement() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(i32, i32, String)>() .unwrap() .collect::, _>>() @@ -312,7 +308,6 @@ async fn test_prepared_statement() { let mut page_results = rs_manual .into_rows_result() .unwrap() - .unwrap() .rows::<(i32, i32, String)>() .unwrap() .collect::, _>>() @@ -336,7 +331,6 @@ async fn test_prepared_statement() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .single_row::<(i32, i32, String, i32, Option)>() .unwrap(); assert!(e.is_none()); @@ -385,7 +379,6 @@ async fn test_prepared_statement() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .single_row() .unwrap(); assert_eq!(input, output) @@ -509,7 +502,6 @@ async fn test_batch() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(i32, i32, String)>() .unwrap() .collect::>() @@ -549,7 +541,6 @@ async fn test_batch() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(i32, i32, String)>() .unwrap() .collect::>() @@ -605,7 +596,6 @@ async fn test_token_calculation() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .single_row::<(i64,)>() .unwrap(); let token = Token::new(value); @@ -716,7 +706,6 @@ async fn test_use_keyspace() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(String,)>() .unwrap() .map(|res| res.unwrap().0) @@ -768,7 +757,6 @@ async fn test_use_keyspace() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(String,)>() .unwrap() .map(|res| res.unwrap().0) @@ -831,7 +819,6 @@ async fn test_use_keyspace_case_sensitivity() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(String,)>() .unwrap() .map(|row| row.unwrap().0) @@ -849,7 +836,6 @@ async fn test_use_keyspace_case_sensitivity() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(String,)>() .unwrap() .map(|row| row.unwrap().0) @@ -893,7 +879,6 @@ async fn test_raw_use_keyspace() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(String,)>() .unwrap() .map(|res| res.unwrap().0) @@ -1188,7 +1173,6 @@ async fn assert_in_tracing_table(session: &Session, tracing_uuid: Uuid) { .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows_num(); if rows_num > 0 { // Ok there was some row for this tracing_uuid @@ -1302,7 +1286,6 @@ async fn test_timestamp() { .await .unwrap() .into_rows_result() - .unwrap() .unwrap(); let mut results = query_rows_result @@ -1961,7 +1944,6 @@ async fn test_named_bind_markers() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(i32, i32, i32)>() .unwrap() .map(|res| res.unwrap()) @@ -2115,7 +2097,6 @@ async fn test_unprepared_reprepare_in_execute() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(i32, i32, i32)>() .unwrap() .map(|r| r.unwrap()) @@ -2173,7 +2154,6 @@ async fn test_unusual_valuelists() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(i32, i32, String)>() .unwrap() .map(|r| r.unwrap()) @@ -2247,7 +2227,6 @@ async fn test_unprepared_reprepare_in_batch() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(i32, i32, i32)>() .unwrap() .map(|r| r.unwrap()) @@ -2317,7 +2296,6 @@ async fn test_unprepared_reprepare_in_caching_session_execute() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(i32, i32, i32)>() .unwrap() .map(|r| r.unwrap()) @@ -2387,7 +2365,6 @@ async fn assert_test_batch_table_rows_contain(sess: &Session, expected_rows: &[( .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(i32, i32)>() .unwrap() .map(|r| r.unwrap()) @@ -2616,7 +2593,7 @@ async fn test_batch_lwts() { batch.append_statement("UPDATE tab SET r1 = 1 WHERE p1 = 0 AND c1 = 0 IF r2 = 0"); let batch_res: QueryResult = session.batch(&batch, ((), (), ())).await.unwrap(); - let batch_deserializer = batch_res.into_rows_result().unwrap().unwrap(); + let batch_deserializer = batch_res.into_rows_result().unwrap(); // Scylla returns 5 columns, but Cassandra returns only 1 let is_scylla: bool = batch_deserializer.column_specs().len() == 5; @@ -2660,7 +2637,6 @@ async fn test_batch_lwts_for_scylla( prepared_batch_res .into_rows_result() .unwrap() - .unwrap() .rows() .unwrap() .map(|r| r.unwrap()) @@ -2705,7 +2681,6 @@ async fn test_batch_lwts_for_cassandra( prepared_batch_res .into_rows_result() .unwrap() - .unwrap() .rows() .unwrap() .map(|r| r.unwrap()) @@ -2972,7 +2947,6 @@ async fn simple_strategy_test() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(i32, i32, i32)>() .unwrap() .map(|r| r.unwrap()) @@ -3128,7 +3102,6 @@ async fn test_deserialize_empty_collections() { .await .unwrap() .into_rows_result() - .unwrap() .unwrap(); let (collection,) = query_rows_result.first_row::<(Collection,)>().unwrap(); diff --git a/scylla/src/transport/silent_prepare_batch_test.rs b/scylla/src/transport/silent_prepare_batch_test.rs index 48c0dc1f1..bca8ef183 100644 --- a/scylla/src/transport/silent_prepare_batch_test.rs +++ b/scylla/src/transport/silent_prepare_batch_test.rs @@ -98,7 +98,6 @@ async fn assert_test_batch_table_rows_contain(sess: &Session, expected_rows: &[( .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::<(i32, i32)>() .unwrap() .map(|r| r.unwrap()) diff --git a/scylla/src/transport/speculative_execution.rs b/scylla/src/transport/speculative_execution.rs index d66a9bb6f..60344d0a0 100644 --- a/scylla/src/transport/speculative_execution.rs +++ b/scylla/src/transport/speculative_execution.rs @@ -110,7 +110,9 @@ fn can_be_ignored(result: &Result) -> bool { QueryError::EmptyPlan => false, // Errors that should not appear here, thus should not be ignored - QueryError::TimeoutError + QueryError::NextRowError(_) + | QueryError::IntoLegacyQueryResultError(_) + | QueryError::TimeoutError | QueryError::RequestTimeout(_) | QueryError::MetadataError(_) => false, diff --git a/scylla/src/transport/topology.rs b/scylla/src/transport/topology.rs index ab29cd46b..2b83b7ade 100644 --- a/scylla/src/transport/topology.rs +++ b/scylla/src/transport/topology.rs @@ -15,7 +15,6 @@ use futures::stream::{self, StreamExt, TryStreamExt}; use futures::Stream; use rand::seq::SliceRandom; use rand::{thread_rng, Rng}; -use scylla_cql::frame::frame_errors::RowsParseError; use scylla_cql::types::deserialize::TypeCheckError; use scylla_macros::DeserializeRow; use std::borrow::BorrowMut; @@ -806,9 +805,9 @@ async fn query_peers(conn: &Arc, connect_port: u16) -> Result() - .map_err(RowsParseError::from)?; + let rows_stream = pager.rows_stream::().map_err(|err| { + MetadataError::Peers(PeersMetadataError::SystemPeersInvalidColumnType(err)) + })?; Ok::<_, QueryError>(rows_stream) }) .into_stream() @@ -823,9 +822,9 @@ async fn query_peers(conn: &Arc, connect_port: u16) -> Result() - .map_err(RowsParseError::from)?; + let rows_stream = pager.rows_stream::().map_err(|err| { + MetadataError::Peers(PeersMetadataError::SystemLocalInvalidColumnType(err)) + })?; Ok::<_, QueryError>(rows_stream) }) .into_stream() diff --git a/scylla/src/utils/test_utils.rs b/scylla/src/utils/test_utils.rs index 2a7a21f69..d15284d61 100644 --- a/scylla/src/utils/test_utils.rs +++ b/scylla/src/utils/test_utils.rs @@ -50,7 +50,6 @@ pub(crate) async fn supports_feature(session: &Session, feature: &str) -> bool { .unwrap() .into_rows_result() .unwrap() - .unwrap() .single_row() .unwrap(); @@ -108,10 +107,9 @@ pub async fn scylla_supports_tablets(session: &Session) -> bool { ) .await .unwrap() - .into_rows_result() - .unwrap(); + .into_rows_result(); - result.map_or(false, |rows_result| rows_result.single_row::().is_ok()) + result.is_ok_and(|rows_result| rows_result.single_row::().is_ok()) } #[cfg(test)] diff --git a/scylla/tests/integration/skip_metadata_optimization.rs b/scylla/tests/integration/skip_metadata_optimization.rs index 17f595400..dba646e89 100644 --- a/scylla/tests/integration/skip_metadata_optimization.rs +++ b/scylla/tests/integration/skip_metadata_optimization.rs @@ -115,7 +115,6 @@ async fn test_skip_result_metadata() { .unwrap() .into_rows_result() .unwrap() - .unwrap() .rows::() .unwrap() .collect::, _>>() @@ -134,7 +133,6 @@ async fn test_skip_result_metadata() { .unwrap(); results_from_manual_paging.extend( rs_manual.into_rows_result() - .unwrap() .unwrap() .rows::() .unwrap()