Skip to content

Commit

Permalink
Implement friend acking (#41)
Browse files Browse the repository at this point in the history
* update message.proto with control message

* πŸ”„ "update message.proto with control message
"
Update anysphere/asphr commit SHA
πŸ”— anysphere/asphr@e92e27b

* handle control message properly

* πŸ”„ "handle control message properly
"
Update anysphere/asphr commit SHA
πŸ”— anysphere/asphr@420185b

* check for ack of control message

* πŸ”„ "check for ack of control message
"
Update anysphere/asphr commit SHA
πŸ”— anysphere/asphr@fd692ae

* normal message shall not be control message

* make compile (almost)
  • Loading branch information
arvid220u authored Jun 27, 2022
1 parent 9498b75 commit b607bfc
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 60 deletions.
2 changes: 1 addition & 1 deletion WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ load("@bazel_tools//tools/build_defs/repo:git.bzl", "git_repository")

git_repository(
name = "asphr",
commit = "b4b457df06b76da857081570d2fd34af104eb40e", # autoupdate anysphere/asphr
commit = "fd692ae8914992f6c7267e038a771bfed337dd70", # autoupdate anysphere/asphr
init_submodules = True,
remote = "https://github.com/anysphere/asphr.git",
)
Expand Down
7 changes: 4 additions & 3 deletions daemon/crypto/constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ constexpr size_t CRYPTO_NPUBBYTES =
// the maximum size of a message such that it can be sent in a single message
// if the message is this size or shorter, it is guaranteed to be sent in a
// single round. 1+5 is for the uint32 ID, 1+MESSAGE_SIZE is for the header of
// the string, and 1 + 1 + 5 is for the repeated acks containing at least one
// element. -1 at the end is for the padding which reserves one byte.
// the string, and 1 + 5 is for num_chunks and 1 + 5 is for
// chunks_start_sequence_number element. -1 at the end is for the padding which
// reserves one byte.
constexpr size_t GUARANTEED_SINGLE_MESSAGE_SIZE =
MESSAGE_SIZE - (1 + 5) -
(1 +
CEIL_DIV((sizeof MESSAGE_SIZE) * 8 - std::countl_zero(MESSAGE_SIZE), 8)) -
(1 + 1 + 5) - CRYPTO_ABYTES - 1 - CRYPTO_NPUBBYTES;
(1 + 5) - (1 + 5) - CRYPTO_ABYTES - 1 - CRYPTO_NPUBBYTES;

// we support up to 4 billion messages! that's a lot.
// (we use unsigned integers)
Expand Down
2 changes: 0 additions & 2 deletions daemon/crypto/crypto_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ TEST(CryptoTest, EncryptDecrypt) {

EXPECT_EQ(decrypted->msg(), message.msg());
EXPECT_EQ(decrypted->sequence_number(), message.sequence_number());
EXPECT_EQ(decrypted->acks_size(), message.acks_size());
}

TEST(CryptoTest, EncryptDecryptMaxSize) {
Expand Down Expand Up @@ -61,7 +60,6 @@ TEST(CryptoTest, EncryptDecryptMaxSize) {

EXPECT_EQ(decrypted->msg(), message.msg());
EXPECT_EQ(decrypted->sequence_number(), message.sequence_number());
EXPECT_EQ(decrypted->acks_size(), message.acks_size());
}

TEST(CryptoTest, EncryptDecryptBiggerThanMaxSize) {
Expand Down
106 changes: 89 additions & 17 deletions daemon/db/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ impl fmt::Display for DbError {
}
}

// keep this in sync with message.proto
pub const CONTROL_MESSAGE_OUTGOING_FRIEND_REQUEST: i32 = 0;

// TODO(arvid): manage a connection pool for the DB here?
// i.e. pool: Box<ConnectionPool> or something
pub struct DB {
Expand Down Expand Up @@ -228,6 +231,8 @@ pub mod ffi {
pub content: String,
pub write_key: Vec<u8>,
pub num_chunks: i32,
pub control: bool,
pub control_message: i32,
}

#[derive(Queryable)]
Expand Down Expand Up @@ -373,6 +378,12 @@ pub mod ffi {
chunk: IncomingChunkFragment,
num_chunks: i32,
) -> Result<ReceiveChunkStatus>;
// receive the control message telling us to add the friend
fn receive_friend_request_control_message(
&self,
from_friend: i32,
sequence_number: i32,
) -> Result<()>;

// fails if there is no chunk to send
// prioritizes by the given uid in order from first to last try
Expand Down Expand Up @@ -807,6 +818,7 @@ impl DB {

pub fn receive_ack(&self, uid: i32, ack: i32) -> Result<bool, DbError> {
let mut conn = self.connect()?;
use crate::schema::friend;
use crate::schema::outgoing_chunk;
use crate::schema::sent;
use crate::schema::status;
Expand All @@ -818,6 +830,21 @@ impl DB {
diesel::update(status::table.find(uid))
.set(status::sent_acked_seqnum.eq(ack))
.execute(conn_b)?;
// check if there are any control messages that were ACKed
let control_chunks = outgoing_chunk::table
.filter(outgoing_chunk::to_friend.eq(uid))
.filter(outgoing_chunk::sequence_number.le(ack))
.filter(outgoing_chunk::control.eq(true))
.select(outgoing_chunk::control_message)
.load::<i32>(conn_b)?;
for control_message in control_chunks {
if control_message == CONTROL_MESSAGE_OUTGOING_FRIEND_REQUEST {
// yay! they shall now be considered a Real Friend
diesel::update(friend::table.find(uid))
.set(friend::progress.eq(ACTUAL_FRIEND))
.execute(conn_b)?;
}
}
// delete all outgoing chunks with seqnum <= ack
diesel::delete(
outgoing_chunk::table
Expand Down Expand Up @@ -859,37 +886,52 @@ impl DB {
}
}

pub fn receive_chunk(
pub fn update_sequence_number(
&self,
chunk: ffi::IncomingChunkFragment,
num_chunks: i32,
) -> Result<ffi::ReceiveChunkStatus, DbError> {
let mut conn = self.connect()?;
use crate::schema::incoming_chunk;
use crate::schema::message;
use crate::schema::received;
conn: &mut SqliteConnection,
from_friend: i32,
sequence_number: i32,
) -> Result<ffi::ReceiveChunkStatus, diesel::result::Error> {
use crate::schema::status;

let r = conn.transaction::<_, diesel::result::Error, _>(|conn_b| {
let old_seqnum = status::table
.find(chunk.from_friend)
.select(status::received_seqnum)
.first::<i32>(conn_b)?;
conn.transaction::<_, diesel::result::Error, _>(|conn_b| {
let old_seqnum =
status::table.find(from_friend).select(status::received_seqnum).first::<i32>(conn_b)?;
// if chunk is before old_seqnum, we just ignore it!!
// in other words, once a chunk has been received, it can never be patched.
// this is good. if something bad happens, you can always retransmit in a new
// message.
if chunk.sequence_number <= old_seqnum {
if sequence_number <= old_seqnum {
return Ok(ffi::ReceiveChunkStatus::OldChunk);
}
// we want to update received_seqnum in status!

// so we only update the seqnum if we increase exactly by one (otherwise we might miss messages!)
if chunk.sequence_number == old_seqnum + 1 {
diesel::update(status::table.find(chunk.from_friend))
.set(status::received_seqnum.eq(chunk.sequence_number))
if sequence_number == old_seqnum + 1 {
diesel::update(status::table.find(from_friend))
.set(status::received_seqnum.eq(sequence_number))
.execute(conn_b)?;
}
Ok(ffi::ReceiveChunkStatus::NewChunk)
})
}

pub fn receive_chunk(
&self,
chunk: ffi::IncomingChunkFragment,
num_chunks: i32,
) -> Result<ffi::ReceiveChunkStatus, DbError> {
let mut conn = self.connect()?;
use crate::schema::incoming_chunk;
use crate::schema::message;
use crate::schema::received;

let r = conn.transaction::<_, diesel::result::Error, _>(|conn_b| {
let chunk_status =
self.update_sequence_number(conn_b, chunk.from_friend, chunk.sequence_number)?;
if chunk_status == ffi::ReceiveChunkStatus::OldChunk {
return Ok(chunk_status);
}

// check if there is already a message uid associated with this chunk sequence
let q =
Expand Down Expand Up @@ -981,6 +1023,33 @@ impl DB {
}
}

fn receive_friend_request_control_message(
&self,
from_friend: i32,
sequence_number: i32,
) -> Result<(), DbError> {
let mut conn = self.connect()?;
use crate::schema::friend;

let r = conn.transaction::<_, diesel::result::Error, _>(|conn_b| {
let chunk_status = self.update_sequence_number(conn_b, from_friend, sequence_number)?;
if chunk_status == ffi::ReceiveChunkStatus::OldChunk {
return Ok(chunk_status);
}
// move the friend to become an actual friend
diesel::update(friend::table.find(from_friend))
.set(friend::progress.eq(ACTUAL_FRIEND))
.execute(conn_b)?;

Ok(ffi::ReceiveChunkStatus::NewChunk)
});

match r {
Ok(b) => Ok(()),
Err(e) => Err(DbError::Unknown(format!("receive_chunk: {}", e))),
}
}

pub fn chunk_to_send(
&self,
uid_priority: Vec<i32>,
Expand Down Expand Up @@ -1033,6 +1102,8 @@ impl DB {
outgoing_chunk::content,
address::write_key,
sent::num_chunks,
outgoing_chunk::control,
outgoing_chunk::control_message,
))
.first::<ffi::OutgoingChunkPlusPlus>(conn_b)?;
Ok(chunk_plusplus)
Expand Down Expand Up @@ -1185,6 +1256,7 @@ impl DB {
outgoing_chunk::chunks_start_sequence_number.eq(new_seqnum),
outgoing_chunk::message_uid.eq(message_uid),
outgoing_chunk::content.eq(chunk),
outgoing_chunk::control.eq(false),
))
.execute(conn_b)?;
}
Expand Down
24 changes: 13 additions & 11 deletions daemon/db/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ diesel::table! {
chunks_start_sequence_number -> Integer,
message_uid -> Integer,
content -> Text,
control -> Bool,
control_message -> Integer,
}
}

Expand Down Expand Up @@ -122,15 +124,15 @@ diesel::joinable!(sent -> message (uid));
diesel::joinable!(status -> friend (uid));

diesel::allow_tables_to_appear_in_same_query!(
address,
config,
draft,
friend,
incoming_chunk,
message,
outgoing_chunk,
received,
registration,
sent,
status,
address,
config,
draft,
friend,
incoming_chunk,
message,
outgoing_chunk,
received,
registration,
sent,
status,
);
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ CREATE TABLE outgoing_chunk (
chunks_start_sequence_number integer NOT NULL,
message_uid integer NOT NULL,
content text NOT NULL,
control boolean NOT NULL,
control_message integer NOT NULL, -- corresponds to the enum value in the protobuf
PRIMARY KEY (to_friend, sequence_number),
FOREIGN KEY(message_uid) REFERENCES sent(uid),
FOREIGN KEY(to_friend) REFERENCES friend(uid)
Expand Down
73 changes: 47 additions & 26 deletions daemon/transmitter/transmitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -266,36 +266,57 @@ auto Transmitter::retrieve() -> void {
sequence_number, chunk.sequence_number(),
chunks_start_sequence_number,
chunk.chunks_start_sequence_number(), num_chunks,
chunk.num_chunks(), chunk_content, chunk.msg());
chunk.num_chunks(), chunk_content, chunk.msg(),
control, chunk.control(), control_message,
chunk.control_message());
}

// we don't set these fields if we only have one chunk
auto num_chunks = chunk.num_chunks() > 1 ? chunk.num_chunks() : 1;
auto chunks_start_sequence_number =
chunk.num_chunks() > 1 ? chunk.chunks_start_sequence_number()
: chunk.sequence_number();

// TODO: we probably don't want to cast to int32 here... let's use
// int64s everywhere
auto receive_chunk_status = G.db->receive_chunk(
(db::IncomingChunkFragment){
.from_friend = f.uid,
.sequence_number = static_cast<int>(chunk.sequence_number()),
.chunks_start_sequence_number =
static_cast<int>(chunks_start_sequence_number),
.content = chunk.msg()},
static_cast<int>(num_chunks));
if (receive_chunk_status ==
db::ReceiveChunkStatus::NewChunkAndNewMessage) {
std::lock_guard<std::mutex> l(G.message_notification_cv_mutex);
G.message_notification_cv.notify_all();
if (chunk.control()) {
switch (chunk.control_message()) {
case asphrclient::Message::ControlMessage::OUTGOING_FRIEND_REQUEST:
ASPHR_LOG_INFO(
"Received outgoing friend request from someone who's already "
"someone we wanted to add.",
friend_uid, f.uid);
G.db->receive_friend_request_control_message(
f.uid, chunk.sequence_number());
break;
default:
ASPHR_LOG_ERR("Received unknown control message from friend.",
friend_uid, f.uid, control_message,
chunk.control_message());
ASPHR_ASSERT(false);
}
} else {
// we don't set these fields if we only have one chunk
auto num_chunks = chunk.num_chunks() > 1 ? chunk.num_chunks() : 1;
auto chunks_start_sequence_number =
chunk.num_chunks() > 1 ? chunk.chunks_start_sequence_number()
: chunk.sequence_number();

// TODO: we probably don't want to cast to int32 here... let's use
// int64s everywhere
auto receive_chunk_status = G.db->receive_chunk(
(db::IncomingChunkFragment){
.from_friend = f.uid,
.sequence_number = static_cast<int>(chunk.sequence_number()),
.chunks_start_sequence_number =
static_cast<int>(chunks_start_sequence_number),
.content = chunk.msg()},
static_cast<int>(num_chunks));
if (receive_chunk_status ==
db::ReceiveChunkStatus::NewChunkAndNewMessage) {
std::lock_guard<std::mutex> l(G.message_notification_cv_mutex);
G.message_notification_cv.notify_all();
}

if (receive_chunk_status == db::ReceiveChunkStatus::NewChunk ||
receive_chunk_status ==
db::ReceiveChunkStatus::NewChunkAndNewMessage) {
previous_success_receive_friend = std::optional<int>(f.uid);
}
}

if (receive_chunk_status == db::ReceiveChunkStatus::NewChunk ||
receive_chunk_status ==
db::ReceiveChunkStatus::NewChunkAndNewMessage) {
previous_success_receive_friend = std::optional<int>(f.uid);
}
} else {
ASPHR_LOG_INFO(
"Failed to decrypt message (message was probably not for us, "
Expand Down

0 comments on commit b607bfc

Please sign in to comment.