diff --git a/WORKSPACE b/WORKSPACE index f8dce004..32336381 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -7,7 +7,7 @@ load("@bazel_tools//tools/build_defs/repo:git.bzl", "git_repository") git_repository( name = "asphr", - commit = "b4b457df06b76da857081570d2fd34af104eb40e", # autoupdate anysphere/asphr + commit = "fd692ae8914992f6c7267e038a771bfed337dd70", # autoupdate anysphere/asphr init_submodules = True, remote = "https://github.com/anysphere/asphr.git", ) diff --git a/daemon/crypto/constants.hpp b/daemon/crypto/constants.hpp index a92e4efd..7ab9f0e9 100644 --- a/daemon/crypto/constants.hpp +++ b/daemon/crypto/constants.hpp @@ -15,13 +15,14 @@ constexpr size_t CRYPTO_NPUBBYTES = // the maximum size of a message such that it can be sent in a single message // if the message is this size or shorter, it is guaranteed to be sent in a // single round. 1+5 is for the uint32 ID, 1+MESSAGE_SIZE is for the header of -// the string, and 1 + 1 + 5 is for the repeated acks containing at least one -// element. -1 at the end is for the padding which reserves one byte. +// the string, and 1 + 5 is for num_chunks and 1 + 5 is for +// chunks_start_sequence_number element. -1 at the end is for the padding which +// reserves one byte. constexpr size_t GUARANTEED_SINGLE_MESSAGE_SIZE = MESSAGE_SIZE - (1 + 5) - (1 + CEIL_DIV((sizeof MESSAGE_SIZE) * 8 - std::countl_zero(MESSAGE_SIZE), 8)) - - (1 + 1 + 5) - CRYPTO_ABYTES - 1 - CRYPTO_NPUBBYTES; + (1 + 5) - (1 + 5) - CRYPTO_ABYTES - 1 - CRYPTO_NPUBBYTES; // we support up to 4 billion messages! that's a lot. // (we use unsigned integers) diff --git a/daemon/crypto/crypto_test.cc b/daemon/crypto/crypto_test.cc index 92e6009c..1ede395a 100644 --- a/daemon/crypto/crypto_test.cc +++ b/daemon/crypto/crypto_test.cc @@ -31,7 +31,6 @@ TEST(CryptoTest, EncryptDecrypt) { EXPECT_EQ(decrypted->msg(), message.msg()); EXPECT_EQ(decrypted->sequence_number(), message.sequence_number()); - EXPECT_EQ(decrypted->acks_size(), message.acks_size()); } TEST(CryptoTest, EncryptDecryptMaxSize) { @@ -61,7 +60,6 @@ TEST(CryptoTest, EncryptDecryptMaxSize) { EXPECT_EQ(decrypted->msg(), message.msg()); EXPECT_EQ(decrypted->sequence_number(), message.sequence_number()); - EXPECT_EQ(decrypted->acks_size(), message.acks_size()); } TEST(CryptoTest, EncryptDecryptBiggerThanMaxSize) { diff --git a/daemon/db/db.rs b/daemon/db/db.rs index 2a1bdd72..4253393b 100644 --- a/daemon/db/db.rs +++ b/daemon/db/db.rs @@ -50,6 +50,9 @@ impl fmt::Display for DbError { } } +// keep this in sync with message.proto +pub const CONTROL_MESSAGE_OUTGOING_FRIEND_REQUEST: i32 = 0; + // TODO(arvid): manage a connection pool for the DB here? // i.e. pool: Box or something pub struct DB { @@ -228,6 +231,8 @@ pub mod ffi { pub content: String, pub write_key: Vec, pub num_chunks: i32, + pub control: bool, + pub control_message: i32, } #[derive(Queryable)] @@ -373,6 +378,12 @@ pub mod ffi { chunk: IncomingChunkFragment, num_chunks: i32, ) -> Result; + // receive the control message telling us to add the friend + fn receive_friend_request_control_message( + &self, + from_friend: i32, + sequence_number: i32, + ) -> Result<()>; // fails if there is no chunk to send // prioritizes by the given uid in order from first to last try @@ -807,6 +818,7 @@ impl DB { pub fn receive_ack(&self, uid: i32, ack: i32) -> Result { let mut conn = self.connect()?; + use crate::schema::friend; use crate::schema::outgoing_chunk; use crate::schema::sent; use crate::schema::status; @@ -818,6 +830,21 @@ impl DB { diesel::update(status::table.find(uid)) .set(status::sent_acked_seqnum.eq(ack)) .execute(conn_b)?; + // check if there are any control messages that were ACKed + let control_chunks = outgoing_chunk::table + .filter(outgoing_chunk::to_friend.eq(uid)) + .filter(outgoing_chunk::sequence_number.le(ack)) + .filter(outgoing_chunk::control.eq(true)) + .select(outgoing_chunk::control_message) + .load::(conn_b)?; + for control_message in control_chunks { + if control_message == CONTROL_MESSAGE_OUTGOING_FRIEND_REQUEST { + // yay! they shall now be considered a Real Friend + diesel::update(friend::table.find(uid)) + .set(friend::progress.eq(ACTUAL_FRIEND)) + .execute(conn_b)?; + } + } // delete all outgoing chunks with seqnum <= ack diesel::delete( outgoing_chunk::table @@ -859,37 +886,52 @@ impl DB { } } - pub fn receive_chunk( + pub fn update_sequence_number( &self, - chunk: ffi::IncomingChunkFragment, - num_chunks: i32, - ) -> Result { - let mut conn = self.connect()?; - use crate::schema::incoming_chunk; - use crate::schema::message; - use crate::schema::received; + conn: &mut SqliteConnection, + from_friend: i32, + sequence_number: i32, + ) -> Result { use crate::schema::status; - let r = conn.transaction::<_, diesel::result::Error, _>(|conn_b| { - let old_seqnum = status::table - .find(chunk.from_friend) - .select(status::received_seqnum) - .first::(conn_b)?; + conn.transaction::<_, diesel::result::Error, _>(|conn_b| { + let old_seqnum = + status::table.find(from_friend).select(status::received_seqnum).first::(conn_b)?; // if chunk is before old_seqnum, we just ignore it!! // in other words, once a chunk has been received, it can never be patched. // this is good. if something bad happens, you can always retransmit in a new // message. - if chunk.sequence_number <= old_seqnum { + if sequence_number <= old_seqnum { return Ok(ffi::ReceiveChunkStatus::OldChunk); } // we want to update received_seqnum in status! // so we only update the seqnum if we increase exactly by one (otherwise we might miss messages!) - if chunk.sequence_number == old_seqnum + 1 { - diesel::update(status::table.find(chunk.from_friend)) - .set(status::received_seqnum.eq(chunk.sequence_number)) + if sequence_number == old_seqnum + 1 { + diesel::update(status::table.find(from_friend)) + .set(status::received_seqnum.eq(sequence_number)) .execute(conn_b)?; } + Ok(ffi::ReceiveChunkStatus::NewChunk) + }) + } + + pub fn receive_chunk( + &self, + chunk: ffi::IncomingChunkFragment, + num_chunks: i32, + ) -> Result { + let mut conn = self.connect()?; + use crate::schema::incoming_chunk; + use crate::schema::message; + use crate::schema::received; + + let r = conn.transaction::<_, diesel::result::Error, _>(|conn_b| { + let chunk_status = + self.update_sequence_number(conn_b, chunk.from_friend, chunk.sequence_number)?; + if chunk_status == ffi::ReceiveChunkStatus::OldChunk { + return Ok(chunk_status); + } // check if there is already a message uid associated with this chunk sequence let q = @@ -981,6 +1023,33 @@ impl DB { } } + fn receive_friend_request_control_message( + &self, + from_friend: i32, + sequence_number: i32, + ) -> Result<(), DbError> { + let mut conn = self.connect()?; + use crate::schema::friend; + + let r = conn.transaction::<_, diesel::result::Error, _>(|conn_b| { + let chunk_status = self.update_sequence_number(conn_b, from_friend, sequence_number)?; + if chunk_status == ffi::ReceiveChunkStatus::OldChunk { + return Ok(chunk_status); + } + // move the friend to become an actual friend + diesel::update(friend::table.find(from_friend)) + .set(friend::progress.eq(ACTUAL_FRIEND)) + .execute(conn_b)?; + + Ok(ffi::ReceiveChunkStatus::NewChunk) + }); + + match r { + Ok(b) => Ok(()), + Err(e) => Err(DbError::Unknown(format!("receive_chunk: {}", e))), + } + } + pub fn chunk_to_send( &self, uid_priority: Vec, @@ -1033,6 +1102,8 @@ impl DB { outgoing_chunk::content, address::write_key, sent::num_chunks, + outgoing_chunk::control, + outgoing_chunk::control_message, )) .first::(conn_b)?; Ok(chunk_plusplus) @@ -1185,6 +1256,7 @@ impl DB { outgoing_chunk::chunks_start_sequence_number.eq(new_seqnum), outgoing_chunk::message_uid.eq(message_uid), outgoing_chunk::content.eq(chunk), + outgoing_chunk::control.eq(false), )) .execute(conn_b)?; } diff --git a/daemon/db/schema.rs b/daemon/db/schema.rs index 5936806c..a387d61a 100644 --- a/daemon/db/schema.rs +++ b/daemon/db/schema.rs @@ -61,6 +61,8 @@ diesel::table! { chunks_start_sequence_number -> Integer, message_uid -> Integer, content -> Text, + control -> Bool, + control_message -> Integer, } } @@ -122,15 +124,15 @@ diesel::joinable!(sent -> message (uid)); diesel::joinable!(status -> friend (uid)); diesel::allow_tables_to_appear_in_same_query!( - address, - config, - draft, - friend, - incoming_chunk, - message, - outgoing_chunk, - received, - registration, - sent, - status, + address, + config, + draft, + friend, + incoming_chunk, + message, + outgoing_chunk, + received, + registration, + sent, + status, ); diff --git a/daemon/migrations/2022-06-16-052955_create_initial_schema/up.sql b/daemon/migrations/2022-06-16-052955_create_initial_schema/up.sql index 1a679796..59cdcd25 100644 --- a/daemon/migrations/2022-06-16-052955_create_initial_schema/up.sql +++ b/daemon/migrations/2022-06-16-052955_create_initial_schema/up.sql @@ -100,6 +100,8 @@ CREATE TABLE outgoing_chunk ( chunks_start_sequence_number integer NOT NULL, message_uid integer NOT NULL, content text NOT NULL, + control boolean NOT NULL, + control_message integer NOT NULL, -- corresponds to the enum value in the protobuf PRIMARY KEY (to_friend, sequence_number), FOREIGN KEY(message_uid) REFERENCES sent(uid), FOREIGN KEY(to_friend) REFERENCES friend(uid) diff --git a/daemon/transmitter/transmitter.cc b/daemon/transmitter/transmitter.cc index 7a17de1b..bf9b53e1 100644 --- a/daemon/transmitter/transmitter.cc +++ b/daemon/transmitter/transmitter.cc @@ -266,36 +266,57 @@ auto Transmitter::retrieve() -> void { sequence_number, chunk.sequence_number(), chunks_start_sequence_number, chunk.chunks_start_sequence_number(), num_chunks, - chunk.num_chunks(), chunk_content, chunk.msg()); + chunk.num_chunks(), chunk_content, chunk.msg(), + control, chunk.control(), control_message, + chunk.control_message()); } - // we don't set these fields if we only have one chunk - auto num_chunks = chunk.num_chunks() > 1 ? chunk.num_chunks() : 1; - auto chunks_start_sequence_number = - chunk.num_chunks() > 1 ? chunk.chunks_start_sequence_number() - : chunk.sequence_number(); - - // TODO: we probably don't want to cast to int32 here... let's use - // int64s everywhere - auto receive_chunk_status = G.db->receive_chunk( - (db::IncomingChunkFragment){ - .from_friend = f.uid, - .sequence_number = static_cast(chunk.sequence_number()), - .chunks_start_sequence_number = - static_cast(chunks_start_sequence_number), - .content = chunk.msg()}, - static_cast(num_chunks)); - if (receive_chunk_status == - db::ReceiveChunkStatus::NewChunkAndNewMessage) { - std::lock_guard l(G.message_notification_cv_mutex); - G.message_notification_cv.notify_all(); + if (chunk.control()) { + switch (chunk.control_message()) { + case asphrclient::Message::ControlMessage::OUTGOING_FRIEND_REQUEST: + ASPHR_LOG_INFO( + "Received outgoing friend request from someone who's already " + "someone we wanted to add.", + friend_uid, f.uid); + G.db->receive_friend_request_control_message( + f.uid, chunk.sequence_number()); + break; + default: + ASPHR_LOG_ERR("Received unknown control message from friend.", + friend_uid, f.uid, control_message, + chunk.control_message()); + ASPHR_ASSERT(false); + } + } else { + // we don't set these fields if we only have one chunk + auto num_chunks = chunk.num_chunks() > 1 ? chunk.num_chunks() : 1; + auto chunks_start_sequence_number = + chunk.num_chunks() > 1 ? chunk.chunks_start_sequence_number() + : chunk.sequence_number(); + + // TODO: we probably don't want to cast to int32 here... let's use + // int64s everywhere + auto receive_chunk_status = G.db->receive_chunk( + (db::IncomingChunkFragment){ + .from_friend = f.uid, + .sequence_number = static_cast(chunk.sequence_number()), + .chunks_start_sequence_number = + static_cast(chunks_start_sequence_number), + .content = chunk.msg()}, + static_cast(num_chunks)); + if (receive_chunk_status == + db::ReceiveChunkStatus::NewChunkAndNewMessage) { + std::lock_guard l(G.message_notification_cv_mutex); + G.message_notification_cv.notify_all(); + } + + if (receive_chunk_status == db::ReceiveChunkStatus::NewChunk || + receive_chunk_status == + db::ReceiveChunkStatus::NewChunkAndNewMessage) { + previous_success_receive_friend = std::optional(f.uid); + } } - if (receive_chunk_status == db::ReceiveChunkStatus::NewChunk || - receive_chunk_status == - db::ReceiveChunkStatus::NewChunkAndNewMessage) { - previous_success_receive_friend = std::optional(f.uid); - } } else { ASPHR_LOG_INFO( "Failed to decrypt message (message was probably not for us, "