Skip to content

Commit

Permalink
use fail
Browse files Browse the repository at this point in the history
  • Loading branch information
arvid220u committed Jun 30, 2022
1 parent d220c14 commit 2a144b8
Showing 1 changed file with 88 additions and 88 deletions.
176 changes: 88 additions & 88 deletions daemon/db/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1351,12 +1351,12 @@ impl DB {

pub fn receive_ack(&self, uid: i32, ack: i32) -> Result<bool, DbError> {
let mut conn = self.connect()?;
use crate::schema::complete_friend;
use crate::schema::friend;
use crate::schema::outgoing_async_invitation;
use crate::schema::outgoing_chunk;
use crate::schema::sent;
use crate::schema::transmission;
use crate::schema::friend;
use crate::schema::complete_friend;
use crate::schema::outgoing_async_invitation;

self.check_rep(&mut conn);

Expand All @@ -1368,20 +1368,19 @@ impl DB {
.set(transmission::sent_acked_seqnum.eq(ack))
.execute(conn_b)?;
// Special case: this is an ACK to an outgoing request system message.
let chunk_acked : (bool, ffi::SystemMessage) = outgoing_chunk::table
let chunk_acked: (bool, ffi::SystemMessage) = outgoing_chunk::table
.filter(outgoing_chunk::to_friend.eq(uid))
.filter(outgoing_chunk::sequence_number.eq(ack))
.select((outgoing_chunk::system,
outgoing_chunk::system_message))
.select((outgoing_chunk::system, outgoing_chunk::system_message))
.first::<(bool, ffi::SystemMessage)>(conn_b)?;
if (chunk_acked.0, chunk_acked.1) == (true, ffi::SystemMessage::OutgoingInvitation) {
// This is a system control message for an outgoing invitation.
// In this case, we become complete friends with the other party.
// ======================
// To deduplicate, we first check that the uid is in the outgoing async table
let invite = outgoing_async_invitation::table
.filter(outgoing_async_invitation::friend_uid.eq(uid))
.load::<ffi::JustOutgoingAsyncInvitation>(conn_b)?;
.filter(outgoing_async_invitation::friend_uid.eq(uid))
.load::<ffi::JustOutgoingAsyncInvitation>(conn_b)?;
// if invite.len() == 0, then we need to do nothing
// since the invite has already been approved before
if invite.len() == 1 {
Expand All @@ -1405,9 +1404,10 @@ impl DB {
.execute(conn_b)?;
// remove the invite from the table
diesel::delete(
outgoing_async_invitation::table.filter(
outgoing_async_invitation::friend_uid.eq(uid))
).execute(conn_b)?;
outgoing_async_invitation::table
.filter(outgoing_async_invitation::friend_uid.eq(uid)),
)
.execute(conn_b)?;
}
// ======================
}
Expand All @@ -1430,7 +1430,6 @@ impl DB {
.select(sent::uid)
.load::<i32>(conn_b)?;


// we use ii to make sure that times are guaranteed to be unique
for (ii, uid) in (0_i64..).zip(newly_delivered.into_iter()) {
diesel::update(sent::table.find(uid))
Expand Down Expand Up @@ -2261,10 +2260,9 @@ impl DB {
}
use rand::seq::SliceRandom;
let ack_index_opt = possible_ack_indices.choose(&mut rand::thread_rng());
let ack_index = ack_index_opt.ok_or(diesel::result::Error::RollbackTransaction)
.map_err(|e| {
anyhow::Error::new(e).context("failed to choose ack index")
})?;
let ack_index = ack_index_opt
.ok_or(diesel::result::Error::RollbackTransaction)
.map_err(|e| anyhow::Error::new(e).context("failed to choose ack index"))?;
diesel::insert_into(transmission::table)
.values((
transmission::friend_uid.eq(friend_uid),
Expand All @@ -2275,8 +2273,9 @@ impl DB {
transmission::sent_acked_seqnum.eq(0),
transmission::received_seqnum.eq(0),
))
.execute(conn_b).context("Fail to create transmission record")?;
Ok(())
.execute(conn_b)
.context("Fail to create transmission record")?;
Ok(())
})
}

Expand Down Expand Up @@ -2328,7 +2327,8 @@ impl DB {
friend::invitation_progress,
friend::deleted,
))
.get_result::<ffi::Friend>(conn_b).context("Fail to insert friend into friend::table")?;
.get_result::<ffi::Friend>(conn_b)
.context("Fail to insert friend into friend::table")?;

diesel::insert_into(outgoing_sync_invitation::table)
.values((
Expand All @@ -2337,16 +2337,17 @@ impl DB {
outgoing_sync_invitation::kx_public_key.eq(kx_public_key.clone()),
outgoing_sync_invitation::sent_at.eq(util::unix_micros_now()),
))
.execute(conn_b).context("Fail to insert into outgoing_sync_invitation::table")?;
.execute(conn_b)
.context("Fail to insert into outgoing_sync_invitation::table")?;

self.create_transmission_record(
conn_b,
friend.uid,
read_index,
read_key,
write_key,
max_friends,
)?;
self.create_transmission_record(
conn_b,
friend.uid,
read_index,
read_key,
write_key,
max_friends,
)?;

let my_public_id = self.get_public_id(conn_b)?;

Expand All @@ -2358,11 +2359,12 @@ impl DB {
outgoing_chunk::sequence_number.eq(new_seqnum),
outgoing_chunk::chunks_start_sequence_number.eq(new_seqnum),
outgoing_chunk::message_uid.eq::<Option<i32>>(None), //waived
outgoing_chunk::content.eq(my_public_id), // the content is public_id
outgoing_chunk::content.eq(my_public_id), // the content is public_id
outgoing_chunk::system.eq(true),
outgoing_chunk::system_message.eq(ffi::SystemMessage::OutgoingInvitation),
))
.execute(conn_b).context("Fail to insert into outgoing_chunk::table")?;
.execute(conn_b)
.context("Fail to insert into outgoing_chunk::table")?;

Ok(friend)
});
Expand Down Expand Up @@ -2764,71 +2766,69 @@ impl DB {
deleted: false,
};
// we change the progress field to Complete, meaning that the friend is approved
let r = conn
.transaction::<_, anyhow::Error, _>(|conn_b| {
let can_add =
self.can_add_friend(conn_b, unique_name, kx_public_key.clone(), max_friends)?;
if !can_add {
// return an anyhow error
anyhow::anyhow!("no free ack index");
}
// we create a new friend
let friend = diesel::insert_into(friend::table)
.values(&friend_fragment)
.returning((
friend::uid,
friend::unique_name,
friend::display_name,
friend::invitation_progress,
friend::deleted,
))
.get_result::<ffi::Friend>(conn_b)?;
let r = conn.transaction::<_, anyhow::Error, _>(|conn_b| {
let can_add = self.can_add_friend(conn_b, unique_name, kx_public_key.clone(), max_friends)?;
if !can_add {
// return an anyhow error
return Err(anyhow::anyhow!("no free ack index"));
}
// we create a new friend
let friend = diesel::insert_into(friend::table)
.values(&friend_fragment)
.returning((
friend::uid,
friend::unique_name,
friend::display_name,
friend::invitation_progress,
friend::deleted,
))
.get_result::<ffi::Friend>(conn_b)?;

let inc_invitation = incoming_invitation::table
.filter(incoming_invitation::public_id.eq(public_id.to_string()))
.get_result::<ffi::IncomingInvitation>(conn_b)?;
let inc_invitation = incoming_invitation::table
.filter(incoming_invitation::public_id.eq(public_id.to_string()))
.get_result::<ffi::IncomingInvitation>(conn_b)?;

diesel::delete(incoming_invitation::table.find(public_id)).execute(conn_b)?;
diesel::delete(incoming_invitation::table.find(public_id)).execute(conn_b)?;

diesel::insert_into(complete_friend::table)
.values((
complete_friend::friend_uid.eq(friend.uid),
complete_friend::public_id.eq(public_id),
complete_friend::invitation_public_key.eq(invitation_public_key),
complete_friend::kx_public_key.eq(kx_public_key),
complete_friend::completed_at.eq(util::unix_micros_now()),
))
.execute(conn_b)?;
diesel::insert_into(complete_friend::table)
.values((
complete_friend::friend_uid.eq(friend.uid),
complete_friend::public_id.eq(public_id),
complete_friend::invitation_public_key.eq(invitation_public_key),
complete_friend::kx_public_key.eq(kx_public_key),
complete_friend::completed_at.eq(util::unix_micros_now()),
))
.execute(conn_b)?;

// finally, create a message
let message_uid = diesel::insert_into(message::table)
.values((message::content.eq(inc_invitation.message),))
.returning(message::uid)
.get_result::<i32>(conn_b)?;
// finally, create a message
let message_uid = diesel::insert_into(message::table)
.values((message::content.eq(inc_invitation.message),))
.returning(message::uid)
.get_result::<i32>(conn_b)?;

diesel::insert_into(received::table)
.values((
received::uid.eq(message_uid),
received::from_friend.eq(friend.uid),
received::num_chunks.eq(1),
received::received_at.eq(inc_invitation.received_at),
received::delivered.eq(true),
received::delivered_at.eq(util::unix_micros_now()),
received::seen.eq(false),
))
.execute(conn_b)?;
diesel::insert_into(received::table)
.values((
received::uid.eq(message_uid),
received::from_friend.eq(friend.uid),
received::num_chunks.eq(1),
received::received_at.eq(inc_invitation.received_at),
received::delivered.eq(true),
received::delivered_at.eq(util::unix_micros_now()),
received::seen.eq(false),
))
.execute(conn_b)?;

self.create_transmission_record(
conn_b,
friend.uid,
read_index,
read_key,
write_key,
max_friends,
)?;
self.create_transmission_record(
conn_b,
friend.uid,
read_index,
read_key,
write_key,
max_friends,
)?;

Ok(())
});
Ok(())
});

self.check_rep(&mut conn);

Expand Down

0 comments on commit 2a144b8

Please sign in to comment.