Skip to content

Commit

Permalink
Merge pull request #1117 from muzarski/rows-parse-error-refactor
Browse files Browse the repository at this point in the history
errors: replace `RowsParseError` and narrow the return types after deserialization refactor
  • Loading branch information
wprzytula authored Nov 13, 2024
2 parents 8122c65 + 2054163 commit 8253681
Show file tree
Hide file tree
Showing 31 changed files with 420 additions and 305 deletions.
2 changes: 1 addition & 1 deletion docs/source/queries/paged.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ loop {
.execute_single_page(&paged_prepared, &[], paging_state)
.await?;

let rows_res = res.into_rows_result()?.unwrap();
let rows_res = res.into_rows_result()?;

println!(
"Paging state response from the prepared statement execution: {:#?} ({} rows)",
Expand Down
32 changes: 15 additions & 17 deletions docs/source/queries/result.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,18 @@ Additionally, [`QueryResult`](https://docs.rs/scylla/latest/scylla/transport/que
let result = session
.query_unpaged("SELECT a from ks.tab", &[])
.await?
.into_rows_result()?
.unwrap();
.into_rows_result()?;

for row in result.rows::<(i32,)>()? {
let (int_value,): (i32,) = row?;
}

// first_row gets the first row and parses it as the given type
let first_int_val: Option<(i32,)> = session
let first_int_val: (i32,) = session
.query_unpaged("SELECT a from ks.tab", &[])
.await?
.into_rows_result()?
.map(|res| res.first_row::<(i32,)>())
.transpose()?;
.first_row::<(i32,)>()?;

// result_not_rows fails when the response is rows
session.query_unpaged("INSERT INTO ks.tab (a) VALUES (0)", &[]).await?.result_not_rows()?;
Expand All @@ -75,13 +73,13 @@ To properly handle `NULL` values parse column as an `Option<>`:
use scylla::IntoTypedRows;

// Parse row as two columns containing an int and text which might be null
if let Some(rows_result) = session.query_unpaged("SELECT a, b from ks.tab", &[])
let rows_result = session
.query_unpaged("SELECT a, b from ks.tab", &[])
.await?
.into_rows_result()?
{
for row in rows_result.rows::<(i32, Option<&str>)>()? {
let (int_value, str_or_null): (i32, Option<&str>) = row?;
}
.into_rows_result()?;

for row in rows_result.rows::<(i32, Option<&str>)>()? {
let (int_value, str_or_null): (i32, Option<&str>) = row?;
}
# Ok(())
# }
Expand Down Expand Up @@ -111,13 +109,13 @@ struct MyRow {
}

// Parse row as two columns containing an int and text which might be null
if let Some(result_rows) = session.query_unpaged("SELECT a, b from ks.tab", &[])
let result_rows = session
.query_unpaged("SELECT a, b from ks.tab", &[])
.await?
.into_rows_result()?
{
for row in result_rows.rows::<MyRow>()? {
let my_row: MyRow = row?;
}
.into_rows_result()?;

for row in result_rows.rows::<MyRow>()? {
let my_row: MyRow = row?;
}
# Ok(())
# }
Expand Down
4 changes: 2 additions & 2 deletions docs/source/queries/simple.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ use scylla::IntoTypedRows;
// Query rows from the table and print them
let result = session.query_unpaged("SELECT a FROM ks.tab", &[])
.await?
.into_rows_result()?
.unwrap();
.into_rows_result()?;

let mut iter = result.rows::<(i32,)>()?;
while let Some(read_row) = iter.next().transpose()? {
println!("Read a value from row: {}", read_row.0);
Expand Down
1 change: 0 additions & 1 deletion examples/compare-tokens.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ async fn main() -> Result<()> {
)
.await?
.into_rows_result()?
.expect("Got not Rows result")
.single_row()?;
assert_eq!(t, qt);
println!("token for {}: {}", pk, t);
Expand Down
43 changes: 22 additions & 21 deletions examples/cqlsh-rs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use rustyline::error::ReadlineError;
use rustyline::{CompletionType, Config, Context, Editor};
use rustyline_derive::{Helper, Highlighter, Hinter, Validator};
use scylla::frame::response::result::Row;
use scylla::transport::query_result::IntoRowsResultError;
use scylla::transport::session::Session;
use scylla::transport::Compression;
use scylla::QueryRowsResult;
use scylla::QueryResult;
use scylla::SessionBuilder;
use std::env;

Expand Down Expand Up @@ -176,24 +177,27 @@ impl Completer for CqlHelper {
}
}

fn print_result(result: Option<&QueryRowsResult>) {
if let Some(rows_result) = result {
for row in rows_result.rows::<Row>().unwrap() {
let row = row.unwrap();
for column in &row.columns {
print!("|");
print!(
" {:16}",
match column {
None => "null".to_owned(),
Some(value) => format!("{:?}", value),
}
);
fn print_result(result: QueryResult) -> Result<(), IntoRowsResultError> {
match result.into_rows_result() {
Ok(rows_result) => {
for row in rows_result.rows::<Row>().unwrap() {
let row = row.unwrap();
for column in &row.columns {
print!("|");
print!(
" {:16}",
match column {
None => "null".to_owned(),
Some(value) => format!("{:?}", value),
}
);
}
println!("|");
}
println!("|")
Ok(())
}
} else {
println!("OK");
Err(IntoRowsResultError::ResultNotRows(_)) => Ok(println!("OK")),
Err(e) => Err(e),
}
}

Expand Down Expand Up @@ -226,10 +230,7 @@ async fn main() -> Result<()> {
let maybe_res = session.query_unpaged(line, &[]).await;
match maybe_res {
Err(err) => println!("Error: {}", err),
Ok(res) => {
let rows_res = res.into_rows_result()?;
print_result(rows_res.as_ref())
}
Ok(res) => print_result(res)?,
}
}
Err(ReadlineError::Interrupted) => continue,
Expand Down
5 changes: 2 additions & 3 deletions examples/custom_deserialization.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{Context, Result};
use anyhow::Result;
use scylla::deserialize::DeserializeValue;
use scylla::frame::response::result::ColumnType;
use scylla::transport::session::Session;
Expand Down Expand Up @@ -55,8 +55,7 @@ async fn main() -> Result<()> {
(),
)
.await?
.into_rows_result()?
.context("Expected Result:Rows response, got a different Result response.")?;
.into_rows_result()?;

let (v,) = rows_result.single_row::<(MyType,)>()?;
assert_eq!(v, MyType("asdf"));
Expand Down
5 changes: 2 additions & 3 deletions examples/get_by_name.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{anyhow, Context as _, Result};
use anyhow::{anyhow, Result};
use scylla::frame::response::result::Row;
use scylla::transport::session::Session;
use scylla::SessionBuilder;
Expand Down Expand Up @@ -39,8 +39,7 @@ async fn main() -> Result<()> {
let rows_result = session
.query_unpaged("SELECT pk, ck, value FROM examples_ks.get_by_name", &[])
.await?
.into_rows_result()?
.context("Response is not of Rows type")?;
.into_rows_result()?;
let col_specs = rows_result.column_specs();
let (ck_idx, _) = col_specs
.get_by_name("ck")
Expand Down
8 changes: 2 additions & 6 deletions examples/select-paging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ async fn main() -> Result<()> {
.query_single_page(paged_query.clone(), &[], paging_state)
.await?;

let res = res
.into_rows_result()?
.expect("Got result different than Rows");
let res = res.into_rows_result()?;

println!(
"Paging state: {:#?} ({} rows)",
Expand Down Expand Up @@ -85,9 +83,7 @@ async fn main() -> Result<()> {
.execute_single_page(&paged_prepared, &[], paging_state)
.await?;

let res = res
.into_rows_result()?
.expect("Got result different than Rows");
let res = res.into_rows_result()?;

println!(
"Paging state from the prepared statement execution: {:#?} ({} rows)",
Expand Down
3 changes: 1 addition & 2 deletions examples/tower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ async fn main() -> anyhow::Result<()> {
let rows_result = session
.call("SELECT keyspace_name, table_name FROM system_schema.tables;".into())
.await?
.into_rows_result()?
.expect("Got result different than Rows");
.into_rows_result()?;

let print_text = |t: &Option<scylla::frame::response::result::CqlValue>| {
t.as_ref()
Expand Down
92 changes: 71 additions & 21 deletions scylla-cql/src/frame/frame_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ pub use super::request::{
use super::response::result::TableSpec;
use super::response::CqlResponseKind;
use super::TryFromPrimitiveError;
use crate::types::deserialize::{DeserializationError, TypeCheckError};
use thiserror::Error;

/// An error returned by `parse_response_body_extensions`.
Expand Down Expand Up @@ -227,7 +226,7 @@ pub enum CqlResultParseError {
#[error("RESULT:Prepared response deserialization failed: {0}")]
PreparedParseError(#[from] PreparedParseError),
#[error("RESULT:Rows response deserialization failed: {0}")]
RowsParseError(#[from] RowsParseError),
RawRowsParseError(#[from] RawRowsAndPagingStateResponseParseError),
}

#[non_exhaustive]
Expand Down Expand Up @@ -301,48 +300,99 @@ pub enum PreparedParseError {
#[error("Invalid result metadata: {0}")]
ResultMetadataParseError(ResultMetadataParseError),
#[error("Invalid prepared metadata: {0}")]
PreparedMetadataParseError(ResultMetadataParseError),
PreparedMetadataParseError(PreparedMetadataParseError),
#[error("Non-zero paging state in result metadata: {0:?}")]
NonZeroPagingState(Arc<[u8]>),
}

/// An error type returned when deserialization
/// of `RESULT::Rows` response fails.
/// An error that occurred during initial deserialization of
/// `RESULT:Rows` response. Since the deserialization of rows is lazy,
/// we initially only need to deserialize:
/// - result metadata flags
/// - column count (result metadata)
/// - paging state response
#[non_exhaustive]
#[derive(Debug, Error, Clone)]
pub enum RowsParseError {
#[error("Invalid result metadata: {0}")]
ResultMetadataParseError(#[from] ResultMetadataParseError),
#[error("Invalid result metadata, server claims {col_count} columns, received {col_specs_count} col specs.")]
ColumnCountMismatch {
col_count: usize,
col_specs_count: usize,
},
#[error("Malformed rows count: {0}")]
RowsCountParseError(LowLevelDeserializationError),
#[error("Data type check prior to deserialization failed: {0}")]
IncomingDataTypeCheckError(#[from] TypeCheckError),
#[error("Data deserialization failed: {0}")]
DataDeserializationError(#[from] DeserializationError),
pub enum RawRowsAndPagingStateResponseParseError {
/// Failed to parse metadata flags.
#[error("Malformed metadata flags: {0}")]
FlagsParseError(LowLevelDeserializationError),

/// Failed to parse column count.
#[error("Malformed column count: {0}")]
ColumnCountParseError(LowLevelDeserializationError),

/// Failed to parse paging state response.
#[error("Malformed paging state: {0}")]
PagingStateParseError(LowLevelDeserializationError),
}

/// An error type returned when deserialization
/// of `[Result/Prepared]Metadata` failed.
/// of statement's prepared metadata failed.
#[non_exhaustive]
#[derive(Error, Debug, Clone)]
pub enum ResultMetadataParseError {
pub enum PreparedMetadataParseError {
/// Failed to parse metadata flags.
#[error("Malformed metadata flags: {0}")]
FlagsParseError(LowLevelDeserializationError),

/// Failed to parse column count.
#[error("Malformed column count: {0}")]
ColumnCountParseError(LowLevelDeserializationError),

/// Failed to parse partition key count.
#[error("Malformed partition key count: {0}")]
PkCountParseError(LowLevelDeserializationError),

/// Failed to parse partition key index.
#[error("Malformed partition key index: {0}")]
PkIndexParseError(LowLevelDeserializationError),

/// Failed to parse global table spec.
#[error("Invalid global table spec: {0}")]
GlobalTableSpecParseError(#[from] TableSpecParseError),

/// Failed to parse column spec.
#[error("Invalid column spec: {0}")]
ColumnSpecParseError(#[from] ColumnSpecParseError),
}

/// An error returned when lazy deserialization of
/// result metadata and rows count fails.
#[non_exhaustive]
#[derive(Error, Debug, Clone)]
pub enum ResultMetadataAndRowsCountParseError {
/// Failed to deserialize result metadata.
#[error("Failed to lazily deserialize result metadata: {0}")]
ResultMetadataParseError(#[from] ResultMetadataParseError),

/// Received malformed rows count from the server.
#[error("Malformed rows count: {0}")]
RowsCountParseError(LowLevelDeserializationError),
}

/// An error type returned when deserialization
/// of result metadata failed.
#[non_exhaustive]
#[derive(Error, Debug, Clone)]
pub enum ResultMetadataParseError {
/// Failed to parse metadata flags.
#[error("Malformed metadata flags: {0}")]
FlagsParseError(LowLevelDeserializationError),

/// Failed to parse column count.
#[error("Malformed column count: {0}")]
ColumnCountParseError(LowLevelDeserializationError),

/// Failed to parse paging state response.
#[error("Malformed paging state: {0}")]
PagingStateParseError(LowLevelDeserializationError),

/// Failed to parse global table spec.
#[error("Invalid global table spec: {0}")]
GlobalTableSpecParseError(#[from] TableSpecParseError),

/// Failed to parse column spec.
#[error("Invalid column spec: {0}")]
ColumnSpecParseError(#[from] ColumnSpecParseError),
}
Expand Down
Loading

0 comments on commit 8253681

Please sign in to comment.