From dbb1a1c73fd990a19657cc113f45b6ac0049b9dc Mon Sep 17 00:00:00 2001 From: Arvid Lunnemark Date: Sun, 27 Feb 2022 20:42:34 +0000 Subject: [PATCH 1/4] fix prio system! --- daemon/config.cc | 8 ++- daemon/config.hpp | 3 +- daemon/constants.hpp | 10 +++ daemon/friend.hpp | 10 ++- daemon/transmitter.cc | 141 ++++++++++++++++++++++++++--------------- daemon/transmitter.hpp | 4 ++ 6 files changed, 119 insertions(+), 57 deletions(-) diff --git a/daemon/config.cc b/daemon/config.cc index 2b0421d3..4fe5520e 100644 --- a/daemon/config.cc +++ b/daemon/config.cc @@ -194,14 +194,15 @@ auto Config::num_enabled_friends() -> int { return num_enabled_friends; } -auto Config::random_enabled_friend() -> asphr::StatusOr { +auto Config::random_enabled_friend(const std::unordered_set& excluded) + -> asphr::StatusOr { const std::lock_guard l(config_mtx); check_rep(); vector enabled_friends; for (auto& friend_pair : friendTable) { - if (friend_pair.second.enabled) { + if (friend_pair.second.enabled && !excluded.contains(friend_pair.first)) { enabled_friends.push_back(friend_pair.second); } } @@ -355,6 +356,7 @@ auto Config::check_rep() const -> void { assert(registrationInfo.allocation.size() > 0); assert(dummyMe.name == "dummyMe"); + assert(dummyMe.dummy); assert(dummyMe.write_key.size() == crypto_aead_xchacha20poly1305_ietf_KEYBYTES); assert(dummyMe.read_key.size() == @@ -400,7 +402,7 @@ auto Config::initialize_dummy_me() -> void { dummy_friend_keypair.first); dummyMe = Friend("dummyMe", 0, dummy_read_write_keys.first, - dummy_read_write_keys.second, 0, false, 0, 0, 0); + dummy_read_write_keys.second, 0, false, 0, 0, 0, true); save(); } \ No newline at end of file diff --git a/daemon/config.hpp b/daemon/config.hpp index 5728d759..6b580372 100644 --- a/daemon/config.hpp +++ b/daemon/config.hpp @@ -45,7 +45,8 @@ class Config { auto has_space_for_friends() -> bool; auto num_enabled_friends() -> int; - auto random_enabled_friend() -> asphr::StatusOr; + auto random_enabled_friend(const std::unordered_set& excluded) + -> asphr::StatusOr; auto dummy_me() -> Friend; auto has_registered() -> bool; diff --git a/daemon/constants.hpp b/daemon/constants.hpp index e15aca2e..9aad6698 100644 --- a/daemon/constants.hpp +++ b/daemon/constants.hpp @@ -23,6 +23,16 @@ constexpr auto DEFAULT_SERVER_ADDRESS = "server1.anysphere.co:443"; // this commit hash will be automatically updated by gui/package.json. constexpr auto RELEASE_COMMIT_HASH = "32622be70c454ef4fc64517444727226d4df983b"; +// this is the number of friends that will be received from in each round +// (ideally, they can all be received in a single PIR request using batch PIR) +// it needs to be at least 2, or else we will get a liveness problem where we +// are sending to one person who is not responding, and then continually trying +// to read from that person while never receiving from anyone else. +// TODO: when batch PIR implemented, increase this to the batch PIR number +constexpr auto RECEIVE_FRIENDS_PER_ROUND = 2; +static_assert(RECEIVE_FRIENDS_PER_ROUND >= 2, + "RECEIVE_FRIENDS_PER_ROUND must be >= 2"); + // this is copied from amazon-roots.pem // ideally we would do some kind of compile-time read file operation here, but // that seems to not exist diff --git a/daemon/friend.hpp b/daemon/friend.hpp index ab0fb4f7..c3f98072 100644 --- a/daemon/friend.hpp +++ b/daemon/friend.hpp @@ -20,7 +20,8 @@ class Friend { enabled(false), latest_ack_id(0), latest_send_id(0), - last_receive_id(0) { + last_receive_id(0), + dummy(false) { auto rng = std::default_random_engine{}; auto all_ack_indexes_not_used = asphr::unordered_set{}; @@ -45,7 +46,7 @@ class Friend { Friend(const string& name, const int read_index, const string& read_key, const string& write_key, const int ack_index, const bool enabled, const uint32_t latest_ack_id, const uint32_t latest_send_id, - const uint32_t last_receive_id) + const uint32_t last_receive_id, bool dummy) : name(name), read_index(read_index), read_key(read_key), @@ -54,7 +55,8 @@ class Friend { enabled(enabled), latest_ack_id(latest_ack_id), latest_send_id(latest_send_id), - last_receive_id(last_receive_id) { + last_receive_id(last_receive_id), + dummy(dummy) { check_rep(); } @@ -83,6 +85,8 @@ class Friend { // have received all IDs up to and including this value. Note that this refers // to ID in the sequence_number space, not the message ID space. uint32_t last_receive_id; + // dummy is true if the friend is a dummy friend! + bool dummy; auto to_json() -> asphr::json; static auto from_json(const asphr::json& j) -> Friend; diff --git a/daemon/transmitter.cc b/daemon/transmitter.cc index 8eaee693..61faeffb 100644 --- a/daemon/transmitter.cc +++ b/daemon/transmitter.cc @@ -29,73 +29,83 @@ auto Transmitter::retrieve_messages() -> void { } auto& client = config->pir_client(); - asphrserver::ReceiveMessageInfo request; - // choose a friend to receive from!! + // choose RECEIVE_FRIENDS_PER_ROUND friends to receive from!! // priority 1: the friend that we just sent a message to - // priority 2: the friend that we successfully received a message from the - // previous round priority 3: random! - Friend friend_info; - bool dummy = false; + vector receive_friends; + std::unordered_set receive_friend_names; if (auto friend_info_status = config->get_friend(just_sent_friend); friend_info_status.ok()) { - friend_info = friend_info_status.value(); - } else if (auto friend_info_status = - config->get_friend(previous_success_receive_friend); - friend_info_status.ok()) { - friend_info = friend_info_status.value(); - } else if (auto friend_info_status = config->random_enabled_friend(); - friend_info_status.ok()) { + receive_friends.push_back(friend_info_status.value()); + receive_friend_names.insert(friend_info_status.value().name); + } + // priority 2: the friend that we successfully received a message from the + // previous round + if (auto friend_info_status = + config->get_friend(previous_success_receive_friend); + friend_info_status.ok() && + previous_success_receive_friend != just_sent_friend) { + receive_friends.push_back(friend_info_status.value()); + receive_friend_names.insert(friend_info_status.value().name); + } + // priority 3: random! + auto receive_friends_old_size = receive_friends.size(); + for (size_t i = 0; i < RECEIVE_FRIENDS_PER_ROUND - receive_friends_old_size; + i++) { // note: we do not need cryptographic randomness here. randomness is only // for liveness - friend_info = friend_info_status.value(); - } else { - friend_info = config->dummy_me(); - dummy = true; + auto friend_info_status = + config->random_enabled_friend(receive_friend_names); + if (friend_info_status.ok()) { + receive_friends.push_back(friend_info_status.value()); + assert(!receive_friend_names.contains(friend_info_status.value().name)); + receive_friend_names.insert(friend_info_status.value().name); + } else { + // dummy if no friends left :') + receive_friends.push_back(config->dummy_me()); + receive_friend_names.insert(config->dummy_me().name); + } } + assert(receive_friends.size() == RECEIVE_FRIENDS_PER_ROUND); - auto query = client.query(friend_info.read_index, config->db_rows()); - - auto serialized_query = query.serialize_to_string(); - - request.set_pir_query(serialized_query); - - asphrserver::ReceiveMessageResponse reply; - grpc::ClientContext context; + auto receive_friend_indices = vector(RECEIVE_FRIENDS_PER_ROUND); + for (auto i = 0; i < RECEIVE_FRIENDS_PER_ROUND; i++) { + receive_friend_indices.at(i) = receive_friends.at(i).read_index; + } + auto pir_replies = batch_retrieve_pir(client, receive_friend_indices); - grpc::Status status = stub->ReceiveMessage(&context, request, &reply); + assert(pir_replies.size() == RECEIVE_FRIENDS_PER_ROUND); previous_success_receive_friend = ""; check_rep(); - // if dummy, we do not actually care about the answer. - // we still do the rpc to not leak information. - if (dummy) { - return; - } - - if (status.ok()) { - cout << "received message!!!" << endl; - - // TODO: inbox.receive_message and msgstore->add_incoming_message need to be - // atomic!!! - // TODO: msgstore should mark a message as delivered here possibly, - // depending on the ACKs. EDIT: this is currently done in outbox which seems - // fine - auto message_opt = - inbox.receive_message(client, *config, reply, friend_info, crypto, - previous_success_receive_friend); - if (message_opt.has_value()) { - auto message = message_opt.value(); - msgstore->add_incoming_message(message.id, message.friend_name, - message.message); + for (auto i = 0; i < RECEIVE_FRIENDS_PER_ROUND; i++) { + auto& friend_info = receive_friends.at(i); + if (friend_info.dummy) { + continue; + } + if (pir_replies.at(i).ok()) { + auto& reply = pir_replies.at(i).value(); + // TODO: inbox.receive_message and msgstore->add_incoming_message need to + // be atomic!!! + auto message_opt = + inbox.receive_message(client, *config, reply, friend_info, crypto, + previous_success_receive_friend); + if (message_opt.has_value()) { + auto message = message_opt.value(); + msgstore->add_incoming_message(message.id, message.friend_name, + message.message); + } else { + cout << "no message received from " << friend_info.name << endl; + } } else { - cout << "no message received" << endl; + cout << "could not retrieve message from " << friend_info.name << endl; + cout << pir_replies.at(i).status().code() << ": " + << pir_replies.at(i).status().message() << endl; } - } else { - cout << status.error_code() << ": " << status.error_message() << endl; } + check_rep(); } @@ -189,6 +199,37 @@ auto Transmitter::send_messages() -> void { check_rep(); } +auto Transmitter::batch_retrieve_pir(FastPIRClient& client, + vector indices) + -> vector> { + // TODO: use batch PIR here!!! we don't want to do this many PIR requests + vector> pir_replies; + + for (auto& index : indices) { + asphrserver::ReceiveMessageInfo request; + + auto query = client.query(index, config->db_rows()); + + auto serialized_query = query.serialize_to_string(); + + request.set_pir_query(serialized_query); + + asphrserver::ReceiveMessageResponse reply; + grpc::ClientContext context; + + grpc::Status status = stub->ReceiveMessage(&context, request, &reply); + + if (status.ok()) { + pir_replies.push_back(reply); + } else { + pir_replies.push_back(absl::UnknownError( + "ReceiveMessage RPC failed with error: " + status.error_message())); + } + } + + return pir_replies; +} + auto Transmitter::check_rep() const noexcept -> void { assert(config->has_registered() || !config->has_registered()); } \ No newline at end of file diff --git a/daemon/transmitter.hpp b/daemon/transmitter.hpp index 5b233981..65aa2207 100644 --- a/daemon/transmitter.hpp +++ b/daemon/transmitter.hpp @@ -44,5 +44,9 @@ class Transmitter { string previous_success_receive_friend; Time last_ui_timestamp; + // for each index, get the PIR response for that index + auto batch_retrieve_pir(FastPIRClient& client, vector indices) + -> vector>; + auto check_rep() const noexcept -> void; }; \ No newline at end of file From b38a5ec7a22ff92642e16b406b6b9dbcf84a65e1 Mon Sep 17 00:00:00 2001 From: Arvid Lunnemark Date: Sun, 27 Feb 2022 20:46:22 +0000 Subject: [PATCH 2/4] fix prioritization for previous success receive friend --- daemon/inbox.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/daemon/inbox.cc b/daemon/inbox.cc index 589f0992..27f561d9 100644 --- a/daemon/inbox.cc +++ b/daemon/inbox.cc @@ -167,8 +167,6 @@ auto Inbox::receive_message(FastPIRClient& client, Config& config, << decrypted.status() << endl; save(); return std::nullopt; - } else { - previous_success_receive_friend = friend_info.name; } auto& message = decrypted.value(); @@ -214,6 +212,8 @@ auto Inbox::receive_message(FastPIRClient& client, Config& config, return std::nullopt; } + previous_success_receive_friend = friend_info.name; + if (message.num_chunks() == 0) { save(); return InboxMessage{message.msg(), friend_info.name, message.id()}; From 94bf389485c51920754653e82d0055663056e78b Mon Sep 17 00:00:00 2001 From: Arvid Lunnemark Date: Sun, 27 Feb 2022 20:49:59 +0000 Subject: [PATCH 3/4] fix prio system --- daemon/inbox.cc | 6 ++++-- daemon/inbox.hpp | 2 +- daemon/transmitter.cc | 4 ++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/daemon/inbox.cc b/daemon/inbox.cc index 27f561d9..b714027d 100644 --- a/daemon/inbox.cc +++ b/daemon/inbox.cc @@ -134,7 +134,7 @@ auto Inbox::update_ack_from_friend(Config& config, pir_value_t& pir_acks, auto Inbox::receive_message(FastPIRClient& client, Config& config, const asphrserver::ReceiveMessageResponse& reply, const Friend& friend_info_in, const Crypto& crypto, - string& previous_success_receive_friend) + string* previous_success_receive_friend) -> std::optional { check_rep(); Friend friend_info = friend_info_in; @@ -212,7 +212,9 @@ auto Inbox::receive_message(FastPIRClient& client, Config& config, return std::nullopt; } - previous_success_receive_friend = friend_info.name; + // we have received a new message (chunk) that we haven't received before! + // this is a success! + *previous_success_receive_friend = friend_info.name; if (message.num_chunks() == 0) { save(); diff --git a/daemon/inbox.hpp b/daemon/inbox.hpp index e536dece..a077ca92 100644 --- a/daemon/inbox.hpp +++ b/daemon/inbox.hpp @@ -42,7 +42,7 @@ class Inbox { auto receive_message(FastPIRClient& client, Config& config, const asphrserver::ReceiveMessageResponse& reply, const Friend& friend_info_in, const Crypto& crypto, - string& previous_success_receive_friend) + string* previous_success_receive_friend) -> std::optional; private: diff --git a/daemon/transmitter.cc b/daemon/transmitter.cc index 61faeffb..fcef0068 100644 --- a/daemon/transmitter.cc +++ b/daemon/transmitter.cc @@ -39,7 +39,7 @@ auto Transmitter::retrieve_messages() -> void { receive_friends.push_back(friend_info_status.value()); receive_friend_names.insert(friend_info_status.value().name); } - // priority 2: the friend that we successfully received a message from the + // priority 2: the friend that we successfully received a new message from the // previous round if (auto friend_info_status = config->get_friend(previous_success_receive_friend); @@ -91,7 +91,7 @@ auto Transmitter::retrieve_messages() -> void { // be atomic!!! auto message_opt = inbox.receive_message(client, *config, reply, friend_info, crypto, - previous_success_receive_friend); + &previous_success_receive_friend); if (message_opt.has_value()) { auto message = message_opt.value(); msgstore->add_incoming_message(message.id, message.friend_name, From f3037b10f59f918beaef976178093afd9c89cff6 Mon Sep 17 00:00:00 2001 From: Arvid Lunnemark Date: Mon, 28 Feb 2022 00:11:52 +0000 Subject: [PATCH 4/4] fix undefined behavior! --- daemon/config.cc | 10 ++++++---- daemon/friend.cc | 20 +++++++++----------- daemon/friend.hpp | 1 - daemon/outbox.cc | 16 ++++++++-------- 4 files changed, 23 insertions(+), 24 deletions(-) diff --git a/daemon/config.cc b/daemon/config.cc index 4fe5520e..4e12feb9 100644 --- a/daemon/config.cc +++ b/daemon/config.cc @@ -63,7 +63,9 @@ Config::Config(const string& config_file_address) Config::Config(const asphr::json& config_json_input, const string& config_file_address) - : saved_file_address(config_file_address), db_rows_(CLIENT_DB_ROWS) { + : saved_file_address(config_file_address), + db_rows_(CLIENT_DB_ROWS), + dummyMe("dummyMe", 0, "", "", 0, false, 0, 0, 0, true) { auto config_json = config_json_input; if (!config_json.contains("has_registered")) { cout << "WARNING (invalid config file): config file does not contain " @@ -112,7 +114,7 @@ Config::Config(const asphr::json& config_json_input, for (auto& friend_json : config_json.at("friends")) { Friend f = Friend::from_json(friend_json); - friendTable[f.name] = f; + friendTable.try_emplace(f.name, f); } data_dir = config_json.at("data_dir").get(); @@ -232,7 +234,7 @@ auto Config::add_friend(const Friend& f) -> void { check_rep(); assert(!friendTable.contains(f.name)); - friendTable[f.name] = f; + friendTable.try_emplace(f.name, f); save(); check_rep(); @@ -288,7 +290,7 @@ auto Config::update_friend(const Friend& f) -> void { check_rep(); assert(friendTable.contains(f.name)); - friendTable[f.name] = f; + friendTable.insert_or_assign(f.name, f); save(); check_rep(); diff --git a/daemon/friend.cc b/daemon/friend.cc index 176ab36f..2f4ffaf7 100644 --- a/daemon/friend.cc +++ b/daemon/friend.cc @@ -21,17 +21,15 @@ auto Friend::to_json() -> asphr::json { } auto Friend::from_json(const asphr::json& j) -> Friend { - Friend f; - f.name = j.at("name").get(); - f.read_index = j.at("read_index").get(); - asphr::Base64Unescape(j.at("write_key").get(), &f.write_key); - asphr::Base64Unescape(j.at("read_key").get(), &f.read_key); - f.ack_index = j.at("ack_index").get(); - f.enabled = j.at("enabled").get(); - f.latest_ack_id = j.at("latest_ack_id").get(); - f.latest_send_id = j.at("latest_send_id").get(); - f.last_receive_id = j.at("last_receive_id").get(); - f.check_rep(); + string read_key; + string write_key; + asphr::Base64Unescape(j.at("read_key").get(), &read_key); + asphr::Base64Unescape(j.at("write_key").get(), &write_key); + Friend f(j.at("name").get(), j.at("read_index").get(), read_key, + write_key, j.at("ack_index").get(), j.at("enabled").get(), + j.at("latest_ack_id").get(), + j.at("latest_send_id").get(), + j.at("last_receive_id").get(), false); return f; } diff --git a/daemon/friend.hpp b/daemon/friend.hpp index c3f98072..b28a3e8c 100644 --- a/daemon/friend.hpp +++ b/daemon/friend.hpp @@ -10,7 +10,6 @@ class Friend { public: - Friend() = default; Friend(const string& name, const vector& friends) : name(name), read_index(0), diff --git a/daemon/outbox.cc b/daemon/outbox.cc index 3a33a0d5..ffa8fd81 100644 --- a/daemon/outbox.cc +++ b/daemon/outbox.cc @@ -8,14 +8,14 @@ #include "client_lib/client_lib.hpp" auto MessageToSend::from_json(const asphr::json& json) -> MessageToSend { - MessageToSend message; - message.to = Friend::from_json(json.at("to")); - message.sequence_number = json.at("sequence_number").get(); - message.msg = json.at("msg").get(); - message.chunked = json.at("chunked").get(); - message.num_chunks = json.at("num_chunks").get(); - message.chunks_start_id = json.at("chunks_start_id").get(); - message.full_message_id = json.at("full_message_id").get(); + auto message = MessageToSend{ + .to = Friend::from_json(json.at("to")), + .sequence_number = json.at("sequence_number").get(), + .msg = json.at("msg").get(), + .chunked = json.at("chunked").get(), + .num_chunks = json.at("num_chunks").get(), + .chunks_start_id = json.at("chunks_start_id").get(), + .full_message_id = json.at("full_message_id").get()}; return message; }