Skip to content

Commit

Permalink
feat: Add delete stream feature back. (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
YoEight authored Jan 9, 2025
1 parent 70a6c7c commit 0d6e86d
Show file tree
Hide file tree
Showing 25 changed files with 639 additions and 443 deletions.
51 changes: 34 additions & 17 deletions geth-client-tests/src/append_read_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use uuid::Uuid;

use geth_client::GrpcClient;
use geth_common::{
AppendError, AppendStreamCompleted, Client, Direction, ExpectedRevision, Propose, Revision,
AppendError, AppendStreamCompleted, Client, ContentType, Direction, ExpectedRevision, Propose,
Revision,
};

use crate::tests::{client_endpoint, random_valid_options, Toto};
Expand All @@ -22,7 +23,8 @@ async fn simple_append() -> eyre::Result<()> {
tokio::spawn(geth_engine::run(options.clone()));

let stream_name: String = Name().fake();
let event_type: String = Name().fake();
let class: String = Name().fake();
let content_type = ContentType::Json;
let event_id = Uuid::new_v4();
let expected: Toto = Faker.fake();

Expand All @@ -32,7 +34,8 @@ async fn simple_append() -> eyre::Result<()> {
ExpectedRevision::Any,
vec![Propose {
id: event_id,
r#type: event_type.clone(),
content_type,
class: class.clone(),
data: serde_json::to_vec(&expected)?.into(),
}],
)
Expand All @@ -50,13 +53,15 @@ async fn simple_append() -> eyre::Result<()> {

let mut stream = client
.read_stream(&stream_name, Direction::Forward, Revision::Start, 1)
.await;
.await?
.success()?;

let event = stream.try_next().await?.unwrap();

assert_eq!(event_id, event.id);
assert_eq!(event_type, event.r#type);
assert_eq!(content_type, event.content_type);
assert_eq!(stream_name, event.stream_name);
assert_eq!(class, event.class);
assert_eq!(0, event.revision);

let actual = serde_json::from_slice::<Toto>(&event.data)?;
Expand All @@ -76,7 +81,8 @@ async fn simple_append_expecting_no_stream_on_non_existing_stream() -> eyre::Res
tokio::spawn(geth_engine::run(options.clone()));

let stream_name: String = Name().fake();
let event_type: String = Name().fake();
let content_type = ContentType::Json;
let class: String = Name().fake();
let event_id = Uuid::new_v4();
let expected: Toto = Faker.fake();

Expand All @@ -86,7 +92,8 @@ async fn simple_append_expecting_no_stream_on_non_existing_stream() -> eyre::Res
ExpectedRevision::NoStream,
vec![Propose {
id: event_id,
r#type: event_type.clone(),
content_type,
class,
data: serde_json::to_vec(&expected)?.into(),
}],
)
Expand Down Expand Up @@ -114,7 +121,8 @@ async fn simple_append_expecting_existence_on_non_existing_stream() -> eyre::Res
tokio::spawn(geth_engine::run(options.clone()));

let stream_name: String = Name().fake();
let event_type: String = Name().fake();
let content_type = ContentType::Json;
let class: String = Name().fake();
let event_id = Uuid::new_v4();
let expected: Toto = Faker.fake();

Expand All @@ -124,7 +132,8 @@ async fn simple_append_expecting_existence_on_non_existing_stream() -> eyre::Res
ExpectedRevision::StreamExists,
vec![Propose {
id: event_id,
r#type: event_type.clone(),
content_type,
class,
data: serde_json::to_vec(&expected)?.into(),
}],
)
Expand All @@ -151,7 +160,8 @@ async fn simple_append_expecting_revision_on_non_existing_stream() -> eyre::Resu
tokio::spawn(geth_engine::run(options.clone()));

let stream_name: String = Name().fake();
let event_type: String = Name().fake();
let content_type = ContentType::Json;
let class: String = Name().fake();
let event_id = Uuid::new_v4();
let expected: Toto = Faker.fake();

Expand All @@ -161,7 +171,8 @@ async fn simple_append_expecting_revision_on_non_existing_stream() -> eyre::Resu
ExpectedRevision::Revision(42),
vec![Propose {
id: event_id,
r#type: event_type.clone(),
content_type,
class,
data: serde_json::to_vec(&expected)?.into(),
}],
)
Expand Down Expand Up @@ -192,7 +203,8 @@ async fn simple_append_expecting_revision_on_existing_stream() -> eyre::Result<(
tokio::spawn(geth_engine::run(options.clone()));

let stream_name: String = Name().fake();
let event_type: String = Name().fake();
let content_type = ContentType::Json;
let class: String = Name().fake();
let event_id = Uuid::new_v4();
let expected: Toto = Faker.fake();

Expand All @@ -202,7 +214,8 @@ async fn simple_append_expecting_revision_on_existing_stream() -> eyre::Result<(
ExpectedRevision::Revision(42),
vec![Propose {
id: event_id,
r#type: event_type.clone(),
content_type,
class,
data: serde_json::to_vec(&expected)?.into(),
}],
)
Expand Down Expand Up @@ -233,15 +246,17 @@ async fn read_whole_stream_forward() -> eyre::Result<()> {
tokio::spawn(geth_engine::run(options.clone()));

let stream_name: String = Name().fake();
let event_type: String = Name().fake();
let class: String = Name().fake();
let content_type = ContentType::Json;
let expected: Toto = Faker.fake();
let data: Bytes = serde_json::to_vec(&expected)?.into();
let mut events = vec![];

for _ in 0..100 {
events.push(Propose {
id: Uuid::new_v4(),
r#type: event_type.clone(),
content_type,
class: class.clone(),
data: data.clone(),
});
}
Expand All @@ -252,7 +267,8 @@ async fn read_whole_stream_forward() -> eyre::Result<()> {

let stream = client
.read_stream(&stream_name, Direction::Forward, Revision::Start, u64::MAX)
.await;
.await?
.success()?;

let actuals = stream.try_collect::<Vec<_>>().await?;

Expand All @@ -263,9 +279,10 @@ async fn read_whole_stream_forward() -> eyre::Result<()> {
let actual = actuals.get(i).unwrap();

assert_eq!(expected.id, actual.id);
assert_eq!(expected.r#type, actual.r#type);
assert_eq!(expected.content_type, actual.content_type);
assert_eq!(expected.data, actual.data);
assert_eq!(stream_name, actual.stream_name);
assert_eq!(class, actual.class);
}

Ok(())
Expand Down
51 changes: 51 additions & 0 deletions geth-client-tests/src/delete_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use std::u64;

use bytes::Bytes;
use fake::{faker::name::en::Name, Fake};
use geth_client::GrpcClient;
use geth_common::{Client, ContentType, Direction, ExpectedRevision, Propose, Revision};
use temp_dir::TempDir;
use uuid::Uuid;

use crate::tests::{client_endpoint, random_valid_options};

#[tokio::test]
async fn simple_delete() -> eyre::Result<()> {
let db_dir = TempDir::new()?;
let options = random_valid_options(&db_dir);

let client = GrpcClient::new(client_endpoint(&options));
tokio::spawn(geth_engine::run(options.clone()));

let stream_name: String = Name().fake();
let class: String = Name().fake();
let content_type = ContentType::Binary;
let event_id = Uuid::new_v4();

client
.append_stream(
&stream_name,
ExpectedRevision::Any,
vec![Propose {
id: event_id,
content_type,
class: class.clone(),
data: Bytes::default(),
}],
)
.await?
.success()?;

client
.delete_stream(&stream_name, ExpectedRevision::Any)
.await?
.success()?;

let stream = client
.read_stream(&stream_name, Direction::Forward, Revision::Start, u64::MAX)
.await?;

assert!(stream.is_stream_deleted());

Ok(())
}
3 changes: 3 additions & 0 deletions geth-client-tests/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#[cfg(test)]
mod append_read_tests;

#[cfg(test)]
mod delete_tests;

#[cfg(test)]
pub mod tests {
use fake::{Dummy, Fake};
Expand Down
79 changes: 53 additions & 26 deletions geth-client/src/next/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use uuid::Uuid;
use geth_common::{
AppendStream, AppendStreamCompleted, Client, DeleteStream, DeleteStreamCompleted, Direction,
EndPoint, ExpectedRevision, GetProgram, KillProgram, ListPrograms, Operation, ProgramKilled,
ProgramObtained, ProgramSummary, Propose, ReadStream, Record, Reply, Revision, StreamRead,
Subscribe, SubscribeToProgram, SubscribeToStream, SubscriptionEvent, UnsubscribeReason,
ProgramObtained, ProgramSummary, Propose, ReadStream, ReadStreamCompleted, Record, Reply,
Revision, StreamRead, Subscribe, SubscribeToProgram, SubscribeToStream, SubscriptionEvent,
UnsubscribeReason,
};

use crate::next::driver::Driver;
Expand Down Expand Up @@ -118,40 +119,66 @@ impl Client for GrpcClient {
direction: Direction,
revision: Revision<u64>,
max_count: u64,
) -> BoxStream<'static, eyre::Result<Record>> {
let outcome = self
) -> eyre::Result<ReadStreamCompleted<BoxStream<'static, eyre::Result<Record>>>> {
let mut task = self
.mailbox
.send_operation(Operation::ReadStream(ReadStream {
stream_name: stream_id.to_string(),
direction,
revision,
max_count,
}))
.await;
.await?;

let stream_id = stream_id.to_string();
Box::pin(async_stream::try_stream! {
let mut task = outcome?;
while let Some(event) = task.recv().await? {
match event {
Reply::StreamRead(read) => match read {
StreamRead::EventAppeared(record) => {
yield record;
}

StreamRead::Error(e) => {
read_error(&stream_id, e)?;
}

StreamRead::EndOfStream => break,
if let Some(event) = task.recv().await? {
match event {
Reply::StreamRead(event) => match event {
StreamRead::EventAppeared(record) => {
let stream_id = stream_id.to_string();
Ok(ReadStreamCompleted::Success(Box::pin(
async_stream::try_stream! {
yield record;
while let Some(event) = task.recv().await? {
match event {
Reply::StreamRead(read) => match read {
StreamRead::EventAppeared(record) => {
yield record;
}

StreamRead::Unexpected(e) => {
read_error(&stream_id, e)?;
}

StreamRead::EndOfStream => break,

_ => {
unexpected_reply_when_reading(&stream_id)?;
}
}

_ => {
unexpected_reply_when_reading(&stream_id)?;
}
}
}
},
)))
}

_ => {
unexpected_reply_when_reading(&stream_id)?;
}
}
StreamRead::StreamDeleted => Ok(ReadStreamCompleted::StreamDeleted),

StreamRead::Unexpected(e) => Err(e),

StreamRead::EndOfStream => Ok(ReadStreamCompleted::Success(Box::pin(
futures_util::stream::empty(),
))),
},

_ => unexpected_reply_when_reading(stream_id),
}
})
} else {
eyre::bail!("multiplex process is no longer reachable");
}
}

async fn subscribe_to_stream(
Expand Down Expand Up @@ -284,7 +311,7 @@ fn produce_subscription_stream<'a>(
})
}

fn unexpected_reply_when_reading(stream_id: &str) -> eyre::Result<()> {
fn unexpected_reply_when_reading<A>(stream_id: &str) -> eyre::Result<A> {
eyre::bail!("unexpected reply when reading: {}", stream_id)
}

Expand Down
7 changes: 4 additions & 3 deletions geth-common/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use uuid::Uuid;

use crate::{
AppendStreamCompleted, DeleteStreamCompleted, Direction, ExpectedRevision, ProgramKilled,
ProgramObtained, ProgramSummary, Propose, Record, Revision, SubscriptionConfirmation,
ProgramObtained, ProgramSummary, Propose, ReadStreamCompleted, Record, Revision,
SubscriptionConfirmation,
};

pub enum SubscriptionEvent {
Expand Down Expand Up @@ -37,7 +38,7 @@ pub trait Client {
direction: Direction,
revision: Revision<u64>,
max_count: u64,
) -> BoxStream<'static, eyre::Result<Record>>;
) -> eyre::Result<ReadStreamCompleted<BoxStream<'static, eyre::Result<Record>>>>;

async fn subscribe_to_stream(
&self,
Expand Down Expand Up @@ -86,7 +87,7 @@ where
direction: Direction,
revision: Revision<u64>,
max_count: u64,
) -> BoxStream<'static, eyre::Result<Record>> {
) -> eyre::Result<ReadStreamCompleted<BoxStream<'static, eyre::Result<Record>>>> {
self.as_ref()
.read_stream(stream_id, direction, revision, max_count)
.await
Expand Down
Loading

0 comments on commit 0d6e86d

Please sign in to comment.